diff --git a/core/cmd/weather_data_export/main.go b/core/cmd/weather_data_export/main.go new file mode 100644 index 0000000..919e098 --- /dev/null +++ b/core/cmd/weather_data_export/main.go @@ -0,0 +1,667 @@ +package main + +import ( + "bufio" + "context" + "database/sql" + "encoding/csv" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "math" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "weatherstation/internal/config" + "weatherstation/internal/database" +) + +// This command maintains a rolling CSV export at weather_data_export/main.csv +// holding the last 48 hours of 10-minute bucket data, functionally mirroring +// internal/tools/exporter.go without reusing it directly. + +const ( + outBaseDir = "weather_data_export" + mainCSVName = "main.csv" + historyDir = "history" + csvHeader = "latitude,longitude,station_id,station_name,date_time,elevation,pressure,temperature,dewpoint,wind_speed,wind_direction,relative_humidity,ztd,pwv" + bucketMin = 10 + windowHours = 48 +) + +type options struct { + // If true and CAIYUN_TOKEN provided, override wind fields from Caiyun realtime API. + overrideWind bool + caiyunToken string +} + +func main() { + // Load config to initialize DB connections used by internal/database + _ = config.GetConfig() + pg := database.GetDB() + my := database.GetMySQL() // may be nil if not configured; functions handle nil + _ = my + + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + // Options from env + opts := options{ + overrideWind: isTruthy(os.Getenv("EXPORT_OVERRIDE_WIND")), + caiyunToken: getenvDefault("CAIYUN_TOKEN", ""), + } + + // Ensure directories and header + base := outBaseDir + mainPath := filepath.Join(base, mainCSVName) + histPath := filepath.Join(base, historyDir) + mustMkdirAll(base) + mustMkdirAll(histPath) + ensureFileWithHeader(mainPath) + + // On startup, backfill the last 48 hours (10-minute buckets) and enforce retention + now := time.Now().In(loc) + lastEnd := alignToPrevBucketEnd(now, bucketMin) + firstStart := lastEnd.Add(-windowHours * time.Hour) + // Iterate buckets: [firstStart, lastEnd] stepping 10 minutes + for b := firstStart; !b.After(lastEnd); b = b.Add(bucketMin * time.Minute) { + bucketStart := b + bucketEnd := b.Add(bucketMin * time.Minute) + if err := exportBucket(context.Background(), pg, my, loc, opts, bucketStart, bucketEnd, mainPath); err != nil { + log.Printf("startup export bucket %s-%s failed: %v", tf(bucketStart), tf(bucketEnd), err) + } + } + if err := enforceRetention(mainPath, histPath, loc, windowHours); err != nil { + log.Printf("startup retention failed: %v", err) + } + + // Scheduler loop: every 10 minutes aligned to next bucket end + 10s + for { + now = time.Now().In(loc) + next := alignToNextBucketEnd(now, bucketMin).Add(10 * time.Second) + sleep := time.Until(next) + if sleep > 0 { + time.Sleep(sleep) + } + + // Current bucket is (prevEnd-10m, prevEnd] + cur := time.Now().In(loc) + bucketEnd := alignToPrevBucketEnd(cur, bucketMin) + bucketStart := bucketEnd.Add(-bucketMin * time.Minute) + + if err := exportBucket(context.Background(), pg, my, loc, opts, bucketStart, bucketEnd, mainPath); err != nil { + log.Printf("export bucket %s-%s failed: %v", tf(bucketStart), tf(bucketEnd), err) + } + if err := enforceRetention(mainPath, histPath, loc, windowHours); err != nil { + log.Printf("retention failed: %v", err) + } + } +} + +// exportBucket renders one 10-minute bucket and appends to mainPath after removing +// any existing lines for that bucket to keep idempotence. +func exportBucket(ctx context.Context, pg, my *sql.DB, loc *time.Location, opts options, bucketStart, bucketEnd time.Time, mainPath string) error { + // Remove any existing lines for this bucket from mainPath first + if err := removeBucketFromMain(mainPath, loc, bucketEnd); err != nil { + return err + } + + // First: WH65LP (10-min aggregated table) + rows, err := pg.QueryContext(ctx, ` + SELECT + s.latitude, + s.longitude, + s.device_id, + s.altitude, + r.pressure_hpa_x100, + r.temp_c_x100, + r.wind_speed_ms_x1000, + r.wind_dir_deg, + r.humidity_pct, + r.bucket_start, + s.station_id + FROM stations s + JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1 + WHERE s.device_type = 'WH65LP' + AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL + AND s.latitude <> 0 AND s.longitude <> 0 + ORDER BY s.station_id`, bucketStart) + if err != nil { + return fmt.Errorf("query bucket rows failed: %w", err) + } + defer rows.Close() + + // Append rows + f, err := os.OpenFile(mainPath, os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return err + } + defer f.Close() + w := bufio.NewWriter(f) + defer w.Flush() + + dateTimeStr := bucketEnd.In(loc).Format("2006-01-02 15:04:05") + for rows.Next() { + var ( + lat, lon, elev sql.NullFloat64 + deviceID string + pX100, tX100 sql.NullInt64 + wsX1000 sql.NullInt64 + wdDeg sql.NullInt64 + rh sql.NullInt64 + bucketStartTS time.Time + stationID string + ) + if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS, &stationID); err != nil { + log.Printf("scan row failed: %v", err) + continue + } + + var pressureStr, tempStr, wsStr, wdStr, rhStr string + if pX100.Valid { + pressureStr = fmtFloat(float64(pX100.Int64)/100.0, 2) + } + if tX100.Valid { + tempStr = fmtFloat(float64(tX100.Int64)/100.0, 2) + } + if wsX1000.Valid { + wsStr = fmtFloat(float64(wsX1000.Int64)/1000.0, 3) + } + if wdDeg.Valid { + wdStr = fmtFloat(float64(wdDeg.Int64), 0) + } + if rh.Valid { + rhStr = fmtFloat(float64(rh.Int64), 0) + } + + // Optional: override wind from Caiyun realtime + if opts.overrideWind && opts.caiyunToken != "" && lat.Valid && lon.Valid { + if spd, dir, ok := fetchCaiyunRealtimeWind(ctx, opts.caiyunToken, lat.Float64, lon.Float64); ok { + wsStr = fmtFloat(spd, 3) + wdStr = fmtFloat(dir, 0) + } + } + + // ZTD lookup from MySQL within ±5 minutes around bucketEnd + ztdStr := lookupZTD(ctx, my, deviceID, bucketEnd) + + // Build CSV line: use device_id as station_id, station_name empty, dewpoint/pwv empty + var b strings.Builder + b.WriteString(fmtNullFloat(lat)) + b.WriteByte(',') + b.WriteString(fmtNullFloat(lon)) + b.WriteByte(',') + b.WriteString(deviceID) + b.WriteByte(',') + b.WriteByte(',') // station_name + b.WriteString(dateTimeStr) + b.WriteByte(',') + b.WriteString(fmtNullFloat(elev)) + b.WriteByte(',') + b.WriteString(pressureStr) + b.WriteByte(',') + b.WriteString(tempStr) + b.WriteByte(',') + b.WriteByte(',') // dewpoint + b.WriteString(wsStr) + b.WriteByte(',') + b.WriteString(wdStr) + b.WriteByte(',') + b.WriteString(rhStr) + b.WriteByte(',') + b.WriteString(ztdStr) + b.WriteByte(',') // pwv + b.WriteByte('\n') + + if _, err := w.WriteString(b.String()); err != nil { + log.Printf("write csv failed: %v", err) + } + } + if err := rows.Err(); err != nil { + return err + } + + // Second: RADAR stations -> latest radar_weather by station_alias + if err := exportRadarStations(ctx, pg, loc, bucketEnd, mainPath); err != nil { + return err + } + return nil +} + +// exportRadarStations appends rows for device_type='RADAR' using latest radar_weather by alias. +func exportRadarStations(ctx context.Context, pg *sql.DB, loc *time.Location, bucketEnd time.Time, mainPath string) error { + // Load RADAR stations + stRows, err := pg.QueryContext(ctx, ` + SELECT name, latitude, longitude, altitude, station_alias + FROM stations + WHERE device_type = 'RADAR' + AND latitude IS NOT NULL AND longitude IS NOT NULL + `) + if err != nil { + return fmt.Errorf("query RADAR stations failed: %w", err) + } + defer stRows.Close() + + f, err := os.OpenFile(mainPath, os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return err + } + defer f.Close() + w := bufio.NewWriter(f) + defer w.Flush() + + dateTimeStr := bucketEnd.In(loc).Format("2006-01-02 15:04:05") + for stRows.Next() { + var ( + name string + lat, lon, elev sql.NullFloat64 + alias sql.NullString + ) + if err := stRows.Scan(&name, &lat, &lon, &elev, &alias); err != nil { + log.Printf("scan RADAR station failed: %v", err) + continue + } + if !alias.Valid || strings.TrimSpace(alias.String) == "" { + continue + } + + // Latest radar_weather for this alias, not later than bucketEnd + var ( + rwTemp, rwHum, rwWS, rwWD, rwP sql.NullFloat64 + rwDT time.Time + ) + err := pg.QueryRowContext(ctx, ` + SELECT temperature, humidity, wind_speed, wind_direction, pressure, dt + FROM radar_weather + WHERE alias = $1 AND dt <= $2 + ORDER BY dt DESC + LIMIT 1 + `, alias.String, bucketEnd).Scan(&rwTemp, &rwHum, &rwWS, &rwWD, &rwP, &rwDT) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Printf("query radar_weather failed: alias=%s err=%v", alias.String, err) + } + continue + } + + // Map fields + pressureStr := "" + if rwP.Valid { + // Convert: DB value / 100.0 + pressureStr = fmtFloat(rwP.Float64/100.0, 2) + } + tempStr := "" + if rwTemp.Valid { + tempStr = fmtFloat(rwTemp.Float64, -1) + } + wsStr := "" + if rwWS.Valid { + wsStr = fmtFloat(rwWS.Float64, 3) + } + wdStr := "" + if rwWD.Valid { + wdStr = fmtFloat(rwWD.Float64, 0) + } + rhStr := "" + if rwHum.Valid { + rhStr = fmtFloat(rwHum.Float64, -1) + } + + var b strings.Builder + b.WriteString(fmtNullFloat(lat)) + b.WriteByte(',') + b.WriteString(fmtNullFloat(lon)) + b.WriteByte(',') + // station_id = stations.name + b.WriteString(name) + b.WriteByte(',') + // station_name = stations.name + b.WriteString(name) + b.WriteByte(',') + b.WriteString(dateTimeStr) + b.WriteByte(',') + b.WriteString(fmtNullFloat(elev)) + b.WriteByte(',') + b.WriteString(pressureStr) + b.WriteByte(',') + b.WriteString(tempStr) + b.WriteByte(',') + b.WriteByte(',') // dewpoint blank + b.WriteString(wsStr) + b.WriteByte(',') + b.WriteString(wdStr) + b.WriteByte(',') + b.WriteString(rhStr) + b.WriteByte(',') + b.WriteByte(',') // ztd blank + b.WriteByte(',') // pwv blank + b.WriteByte('\n') + + if _, err := w.WriteString(b.String()); err != nil { + log.Printf("write RADAR csv failed: %v", err) + } + } + return stRows.Err() +} + +func lookupZTD(ctx context.Context, my *sql.DB, deviceID string, bucketEnd time.Time) string { + if my == nil { + return "" + } + var ztd sql.NullFloat64 + var ts time.Time + err := my.QueryRowContext(ctx, ` + SELECT ztd, timestamp FROM rtk_data + WHERE station_id = ? + AND ABS(TIMESTAMPDIFF(MINUTE, timestamp, ?)) <= 5 + LIMIT 1 + `, deviceID, bucketEnd).Scan(&ztd, &ts) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Printf("lookup ZTD failed: station=%s err=%v", deviceID, err) + } + return "" + } + if !ztd.Valid { + return "" + } + // Exported as ztd*100 (to match existing exporter behavior) + return fmtFloat(ztd.Float64*100.0, -1) +} + +// fetchCaiyunRealtimeWind returns speed (m/s) and direction (deg) +func fetchCaiyunRealtimeWind(ctx context.Context, token string, lat, lon float64) (float64, float64, bool) { + type realtimeResp struct { + Status string `json:"status"` + Unit string `json:"unit"` + Result struct { + Realtime struct { + Status string `json:"status"` + Wind struct { + Speed float64 `json:"speed"` + Direction float64 `json:"direction"` + } `json:"wind"` + } `json:"realtime"` + } `json:"result"` + } + url := fmt.Sprintf("https://api.caiyunapp.com/v2.6/%s/%f,%f/realtime?unit=SI", token, lon, lat) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return 0, 0, false + } + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return 0, 0, false + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + return 0, 0, false + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, 0, false + } + var data realtimeResp + if err := json.Unmarshal(body, &data); err != nil { + return 0, 0, false + } + if strings.ToLower(data.Status) != "ok" || strings.ToLower(data.Result.Realtime.Status) != "ok" { + return 0, 0, false + } + spd := data.Result.Realtime.Wind.Speed + dirRad := data.Result.Realtime.Wind.Direction + dirDeg := dirRad * 180.0 / math.Pi + for dirDeg < 0 { + dirDeg += 360 + } + for dirDeg >= 360 { + dirDeg -= 360 + } + return spd, dirDeg, true +} + +// enforceRetention keeps only rows with date_time >= now-keepHours in mainPath, +// moving older rows to history files grouped by UTC date (CSV header ensured). +func enforceRetention(mainPath, histDir string, loc *time.Location, keepHours int) error { + cutoff := time.Now().In(loc).Add(-time.Duration(keepHours) * time.Hour) + + f, err := os.Open(mainPath) + if err != nil { + return err + } + defer f.Close() + r := csv.NewReader(f) + r.FieldsPerRecord = -1 + + // Read header + records, err := r.ReadAll() + if err != nil { + return err + } + if len(records) == 0 { + return nil + } + + // Prepare buffers + header := records[0] + var keep [][]string + keep = append(keep, header) + + // History writers cache per day + writers := map[string]*csv.Writer{} + files := map[string]*os.File{} + ensureWriter := func(day string) (*csv.Writer, error) { + if w, ok := writers[day]; ok { + return w, nil + } + // history file path + histPath := filepath.Join(histDir, fmt.Sprintf("weather_data_%s.csv", day)) + needHeader := ensureFileWithHeader(histPath) + hf, err := os.OpenFile(histPath, os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return nil, err + } + hw := csv.NewWriter(hf) + if needHeader { + if err := hw.Write(header); err != nil { + _ = hf.Close() + return nil, err + } + hw.Flush() + } + writers[day] = hw + files[day] = hf + return hw, nil + } + + for i := 1; i < len(records); i++ { + rec := records[i] + if len(rec) < 5 { + continue + } + dtStr := strings.TrimSpace(rec[4]) + dt, err := time.ParseInLocation("2006-01-02 15:04:05", dtStr, loc) + if err != nil { + // keep malformed lines to avoid data loss + keep = append(keep, rec) + continue + } + if !dt.Before(cutoff) { + keep = append(keep, rec) + continue + } + // Move to history file by UTC day of dt + day := dt.UTC().Format("2006-01-02") + w, err := ensureWriter(day) + if err != nil { + return err + } + if err := w.Write(rec); err != nil { + return err + } + } + // Flush & close writers + for _, w := range writers { + w.Flush() + } + for _, f := range files { + _ = f.Close() + } + + // Rewrite main.csv with kept rows + tmp := mainPath + ".part" + outf, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + if err != nil { + return err + } + cw := csv.NewWriter(outf) + if err := cw.WriteAll(keep); err != nil { + _ = outf.Close() + return err + } + cw.Flush() + if err := outf.Close(); err != nil { + return err + } + return os.Rename(tmp, mainPath) +} + +// removeBucketFromMain removes all rows with date_time == bucketEnd from main CSV. +func removeBucketFromMain(mainPath string, loc *time.Location, bucketEnd time.Time) error { + dtStr := bucketEnd.In(loc).Format("2006-01-02 15:04:05") + // Fast path: if file small, rewrite; otherwise stream + in, err := os.Open(mainPath) + if err != nil { + return err + } + defer in.Close() + + r := csv.NewReader(in) + r.FieldsPerRecord = -1 + recs, err := r.ReadAll() + if err != nil { + return err + } + if len(recs) == 0 { + return nil + } + out := make([][]string, 0, len(recs)) + out = append(out, recs[0]) // header + for i := 1; i < len(recs); i++ { + rec := recs[i] + if len(rec) < 5 { + out = append(out, rec) + continue + } + if strings.TrimSpace(rec[4]) == dtStr { + continue // drop + } + out = append(out, rec) + } + tmp := mainPath + ".part" + outf, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + if err != nil { + return err + } + cw := csv.NewWriter(outf) + if err := cw.WriteAll(out); err != nil { + _ = outf.Close() + return err + } + cw.Flush() + if err := outf.Close(); err != nil { + return err + } + return os.Rename(tmp, mainPath) +} + +func ensureFileWithHeader(path string) bool { + if _, err := os.Stat(path); err == nil { + return false + } + _ = os.MkdirAll(filepath.Dir(path), 0o755) + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + log.Printf("create csv failed: %v", err) + return false + } + // Write header + if _, err := f.WriteString(csvHeader + "\n"); err != nil { + _ = f.Close() + return false + } + _ = f.Close() + return true +} + +func alignToNextBucketEnd(t time.Time, minutes int) time.Time { + m := t.Minute() + next := (m/minutes + 1) * minutes + dt := time.Duration(next-m) * time.Minute + return t.Truncate(time.Minute).Add(dt).Truncate(time.Minute) +} + +func alignToPrevBucketEnd(t time.Time, minutes int) time.Time { + m := t.Minute() + prev := (m / minutes) * minutes + return t.Truncate(time.Minute).Add(time.Duration(prev-m) * time.Minute) +} + +func fmtNullFloat(v sql.NullFloat64) string { + if v.Valid { + return fmtFloat(v.Float64, -1) + } + return "" +} + +// fmtFloat: prec < 0 -> trim trailing zeros +func fmtFloat(fv float64, prec int) string { + if prec >= 0 { + return fmt.Sprintf("%.*f", prec, fv) + } + s := fmt.Sprintf("%.10f", fv) + s = strings.TrimRight(s, "0") + s = strings.TrimRight(s, ".") + if s == "-0" { + s = "0" + } + if s == "" || s == "-" || s == "+" || s == "." { + return "0" + } + if math.Abs(fv) < 1e-9 { + return "0" + } + return s +} + +func isTruthy(s string) bool { + switch strings.ToLower(strings.TrimSpace(s)) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} + +func getenvDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func mustMkdirAll(dir string) { + if err := os.MkdirAll(dir, 0o755); err != nil { + log.Fatalf("mkdir %s: %v", dir, err) + } +} + +func tf(t time.Time) string { return t.Format("2006-01-02 15:04:05") }