// Package tools contains operational helpers for the weather station,
// including the periodic CSV exporter with ZTD fusion.
package tools

import (
	"compress/gzip"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"path/filepath"
	"strings"
	"time"

	"weatherstation/internal/database"
)

// Exporter exports a CSV every 10 minutes (with ZTD fusion from MySQL).
type Exporter struct {
	pg     *sql.DB
	my     *sql.DB
	loc    *time.Location // Asia/Shanghai
	logger *log.Logger    // dedicated export log
}

// NewExporter builds an Exporter wired to the shared Postgres and MySQL
// handles. Export logging is appended to export_data/export.log; if that
// file cannot be opened, logging falls back to stderr so output is never
// silently dropped (the original wrote to a nil *os.File on failure).
func NewExporter() *Exporter {
	loc, _ := time.LoadLocation("Asia/Shanghai") // fallback below covers missing tzdata
	if loc == nil {
		loc = time.FixedZone("CST", 8*3600)
	}
	if err := os.MkdirAll("export_data", 0o755); err != nil {
		log.Printf("创建导出日志目录失败: %v", err)
	}
	// Fall back to stderr when the log file cannot be opened.
	var sink io.Writer = os.Stderr
	f, err := os.OpenFile(filepath.Join("export_data", "export.log"),
		os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		log.Printf("创建导出日志文件失败: %v", err)
	} else {
		sink = f
	}
	// Custom format including a microsecond timestamp.
	logger := log.New(sink, "", log.Ldate|log.Ltime|log.Lmicroseconds)
	return &Exporter{
		pg:     database.GetDB(),
		my:     database.GetMySQL(),
		loc:    loc,
		logger: logger,
	}
}

// Start runs the scheduling loop until ctx is cancelled (blocking).
// Each iteration sleeps until the next 10-minute boundary plus a
// 10-second grace period, then exports the bucket that just closed.
func (e *Exporter) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		now := time.Now().In(e.loc)
		// Next 10-minute boundary + 10s so the bucket row has time to commit.
		next := alignToNextBucketEnd(now, 10).Add(10 * time.Second)
		e.logger.Printf("调度: 当前=%s, 下次执行=%s (延迟10秒)",
			now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05"))
		if delay := time.Until(next); delay > 0 {
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}
		// At e.g. 21:11:xx we export the bucket [21:00, 21:10).
		currentTime := time.Now().In(e.loc)
		bucketEnd := alignToPrevBucketEnd(currentTime, 10)
		bucketStart := bucketEnd.Add(-10 * time.Minute)
		e.logger.Printf("当前时间=%s, 导出桶开始时间=%s, 桶结束时间=%s",
			currentTime.Format("2006-01-02 15:04:05"),
			bucketStart.Format("2006-01-02 15:04:05"),
			bucketEnd.Format("2006-01-02 15:04:05"))
		if err := e.exportBucket(ctx, bucketStart, bucketEnd); err != nil {
			e.logger.Printf("导出桶 %s-%s 失败: %v",
				bucketStart.Format("2006-01-02 15:04:05"),
				bucketEnd.Format("2006-01-02 15:04:05"), err)
		}
	}
}

// stationRef identifies one eligible station for missing-data reporting.
type stationRef struct {
	StationID string
	DeviceID  string
}

// loadEligibleStations returns all WH65LP stations with usable coordinates.
func (e *Exporter) loadEligibleStations(ctx context.Context) ([]stationRef, error) {
	rows, err := e.pg.QueryContext(ctx, `
		SELECT station_id, device_id
		FROM stations
		WHERE device_type = 'WH65LP'
		  AND latitude IS NOT NULL AND longitude IS NOT NULL
		  AND latitude <> 0 AND longitude <> 0
		ORDER BY station_id
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var out []stationRef
	for rows.Next() {
		var s stationRef
		if err := rows.Scan(&s.StationID, &s.DeviceID); err == nil {
			out = append(out, s)
		}
	}
	// Surface iteration errors instead of silently returning a short list.
	if err := rows.Err(); err != nil {
		return out, err
	}
	return out, nil
}

// exportBucket exports one 10-minute bucket (CST) into the active UTC-day
// CSV file, rotating the previous day's file into history first.
func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time.Time) error {
	utcDay := bucketEnd.UTC().Format("2006-01-02")
	outDir := filepath.Join("export_data")
	histDir := filepath.Join("export_data", "history")
	if err := os.MkdirAll(histDir, 0o755); err != nil {
		return err
	}
	activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
	e.logger.Printf("开始导出: 桶开始时间=%s, 桶结束时间=%s, 文件=%s",
		bucketStart.Format("2006-01-02 15:04:05"),
		bucketEnd.Format("2006-01-02 15:04:05"), activePath)

	// Load all eligible stations up front so missing ones can be reported.
	eligibleStations, err := e.loadEligibleStations(ctx)
	if err != nil {
		e.logger.Printf("查询合格站点失败: %v", err)
	} else {
		e.logger.Printf("合格站点总数: %d", len(eligibleStations))
		for _, s := range eligibleStations {
			e.logger.Printf("合格站点: station_id=%s, device_id=%s", s.StationID, s.DeviceID)
		}
	}

	// Rotate the previous UTC day's file (if present and not yet compressed).
	if err := rotatePreviousUTC(outDir, histDir, utcDay); err != nil {
		e.logger.Printf("轮转上一日文件失败: %v", err)
	}
	needHeader := ensureFileWithHeader(activePath)

	// Query this bucket's data keyed on the bucket start time.
	e.logger.Printf("查询数据: 使用bucket_start=%s", bucketStart.Format("2006-01-02 15:04:05"))
	rows, err := e.pg.QueryContext(ctx, `
		SELECT s.latitude, s.longitude, s.device_id, s.altitude,
		       r.pressure_hpa_x100, r.temp_c_x100, r.wind_speed_ms_x1000,
		       r.wind_dir_deg, r.humidity_pct, r.bucket_start, s.station_id
		FROM stations s
		JOIN rs485_weather_10min r
		  ON r.station_id = s.station_id AND r.bucket_start = $1
		WHERE s.device_type = 'WH65LP'
		  AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL
		  AND s.latitude <> 0 AND s.longitude <> 0
		ORDER BY s.station_id
	`, bucketStart)
	if err != nil {
		return fmt.Errorf("查询10分钟数据失败: %w", err)
	}
	defer rows.Close()

	f, err := os.OpenFile(activePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	if needHeader {
		if _, err := f.WriteString(headerLine() + "\n"); err != nil {
			return err
		}
	}

	var total, ztdHit, ztdMiss int
	processedStationIDs := make(map[string]bool)
	for rows.Next() {
		var (
			lat, lon, elev sql.NullFloat64
			deviceID       string
			pX100, tX100   sql.NullInt64
			wsX1000        sql.NullInt64
			wdDeg          sql.NullInt64
			rh             sql.NullInt64
			bucketStartTS  time.Time
			stationID      string
		)
		if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100,
			&wsX1000, &wdDeg, &rh, &bucketStartTS, &stationID); err != nil {
			e.logger.Printf("扫描行失败: %v", err)
			continue
		}
		processedStationIDs[stationID] = true
		e.logger.Printf("处理站点: station_id=%s, device_id=%s, 经度=%.6f, 纬度=%.6f",
			stationID, deviceID, lon.Float64, lat.Float64)

		// The CSV date_time is the bucket end (right edge of the 10-minute span).
		dateTimeStr := bucketEnd.In(e.loc).Format("2006-01-02 15:04:05")
		e.logger.Printf("站点 %s: 写出时间(桶末)=%s,用于 ZTD 对齐参考", stationID, dateTimeStr)

		// Scaled integer columns → physical units.
		var pressureStr, tempStr, wsStr, wdStr, rhStr string
		if pX100.Valid {
			pressureStr = fmtFloat(float64(pX100.Int64)/100.0, 2)
		}
		if tX100.Valid {
			tempStr = fmtFloat(float64(tX100.Int64)/100.0, 2)
		}
		if wsX1000.Valid {
			wsStr = fmtFloat(float64(wsX1000.Int64)/1000.0, 3)
		}
		if wdDeg.Valid {
			wdStr = fmtFloat(float64(wdDeg.Int64), 0)
		}
		if rh.Valid {
			rhStr = fmtFloat(float64(rh.Int64), 0)
		}

		// ZTD lookup keys on device_id at the bucket end time.
		ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
		if ztdStr != "" {
			ztdHit++
			e.logger.Printf("站点 %s (device_id=%s): ZTD 数据正常, 值=%s", stationID, deviceID, ztdStr)
		} else {
			ztdMiss++
			e.logger.Printf("站点 %s (device_id=%s): ZTD 数据缺失或超出5分钟窗口", stationID, deviceID)
		}

		// Field order must match headerLine(); blanks are intentional.
		fields := []string{
			fmtNullFloat(lat),
			fmtNullFloat(lon),
			deviceID, // CSV uses device_id as station_id
			"",       // station_name left empty
			dateTimeStr,
			fmtNullFloat(elev),
			pressureStr,
			tempStr,
			"", // dewpoint left empty
			wsStr,
			wdStr,
			rhStr,
			ztdStr,
			"", // pwv left empty
		}
		if _, err := f.WriteString(strings.Join(fields, ",") + "\n"); err != nil {
			e.logger.Printf("写入CSV失败: %v", err)
		}
		total++
	}
	if err := rows.Err(); err != nil {
		return err
	}

	// Report eligible stations that produced no row in this bucket.
	var missingCount int
	for _, station := range eligibleStations {
		if !processedStationIDs[station.StationID] {
			e.logger.Printf("站点缺失: station_id=%s, device_id=%s (rs485_weather_10min 表中未找到记录)",
				station.StationID, station.DeviceID)
			missingCount++
		}
	}
	e.logger.Printf("导出完成: 桶开始时间=%s, 桶结束时间=%s, 总行数=%d, ZTD命中=%d, ZTD未命中=%d, 缺失站点数=%d",
		bucketStart.Format("2006-01-02 15:04:05"),
		bucketEnd.Format("2006-01-02 15:04:05"),
		total, ztdHit, ztdMiss, missingCount)
	return nil
}

// lookupZTD fetches the ZTD value nearest to bucketEnd within a 5-minute
// window, formatted in millimeters ("" when unavailable).
//
// Fixes over the original query: TIMESTAMPDIFF(MINUTE, ...) truncates to
// whole minutes, so "<= 5" actually admitted records up to ~5m59s away;
// a second-granularity window enforces the documented 5 minutes. LIMIT 1
// without ORDER BY returned an arbitrary in-window row; ordering by
// absolute distance makes the choice deterministic and closest-first.
func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd time.Time) string {
	if e.my == nil {
		return ""
	}
	var ztd sql.NullFloat64
	var ts time.Time
	err := e.my.QueryRowContext(ctx, `
		SELECT ztd, timestamp FROM rtk_data
		WHERE station_id = ?
		  AND ABS(TIMESTAMPDIFF(SECOND, timestamp, ?)) <= 300
		ORDER BY ABS(TIMESTAMPDIFF(SECOND, timestamp, ?))
		LIMIT 1
	`, deviceID, bucketEnd, bucketEnd).Scan(&ztd, &ts)
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			e.logger.Printf("查询ZTD失败: station_id=%s, error=%v", deviceID, err)
		}
		return ""
	}
	if !ztd.Valid {
		e.logger.Printf("站点 device_id=%s: ZTD值为NULL", deviceID)
		return ""
	}
	// Stored in meters; exported in millimeters.
	return fmtFloat(ztd.Float64*1000.0, -1)
}

// ensureFileWithHeader creates path if absent and reports whether the
// caller must write the CSV header (true only for a newly created file).
func ensureFileWithHeader(path string) bool {
	if _, err := os.Stat(path); err == nil {
		return false
	}
	dir := filepath.Dir(path)
	_ = os.MkdirAll(dir, 0o755) // best effort; OpenFile below reports real failures
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		log.Printf("create csv failed: %v", err)
		return false
	}
	_ = f.Close()
	return true
}

// headerLine returns the fixed CSV column header.
func headerLine() string {
	return "latitude,longitude,station_id,station_name,date_time,elevation,pressure,temperature,dewpoint,wind_speed,wind_direction,relative_humidity,ztd,pwv"
}

// alignToNextBucketEnd returns the next strictly-later multiple-of-minutes
// boundary (e.g. 21:13 with minutes=10 -> 21:20; 21:20 -> 21:30).
func alignToNextBucketEnd(t time.Time, minutes int) time.Time {
	step := minutes - t.Minute()%minutes // always in (0, minutes]
	return t.Truncate(time.Minute).Add(time.Duration(step) * time.Minute)
}

// alignToPrevBucketEnd returns the latest multiple-of-minutes boundary not
// after t (e.g. 21:21 with minutes=10 -> 21:20).
func alignToPrevBucketEnd(t time.Time, minutes int) time.Time {
	back := t.Minute() % minutes
	return t.Truncate(time.Minute).Add(-time.Duration(back) * time.Minute)
}

// fmtNullFloat renders a nullable float, empty string for NULL.
func fmtNullFloat(v sql.NullFloat64) string {
	if v.Valid {
		return fmtFloat(v.Float64, -1)
	}
	return ""
}

// fmtFloat formats fv: prec < 0 means variable precision with trailing
// zeros trimmed; otherwise exactly prec decimal places.
func fmtFloat(fv float64, prec int) string {
	if prec >= 0 {
		return fmt.Sprintf("%.*f", prec, fv)
	}
	s := fmt.Sprintf("%.10f", fv)
	s = strings.TrimRight(s, "0")
	s = strings.TrimRight(s, ".")
	if s == "-0" {
		s = "0"
	}
	if s == "" || s == "-" || s == "+" || s == "." {
		return "0"
	}
	if math.Abs(fv) < 1e-9 {
		return "0"
	}
	return s
}

// rotatePreviousUTC compresses the previous UTC day's active CSV into the
// history directory. Missing source or an already-existing archive is a no-op.
func rotatePreviousUTC(outDir, histDir, currentUTC string) error {
	curDay, err := time.Parse("2006-01-02", currentUTC)
	if err != nil {
		return nil // malformed day string: nothing to rotate
	}
	yesterday := curDay.Add(-24 * time.Hour).Format("2006-01-02")
	prevPath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", yesterday))
	gzPath := filepath.Join(histDir, fmt.Sprintf("weather_data_%s.csv.gz", yesterday))
	if _, err := os.Stat(prevPath); err != nil {
		return nil
	}
	if _, err := os.Stat(gzPath); err == nil {
		return nil // already compressed
	}
	return gzipFile(prevPath, gzPath)
}

// gzipFile compresses src into dst, then deletes src.
//
// Close errors are checked explicitly (gzip.Writer.Close flushes the final
// compressed block — the original discarded its error in a defer, so a
// corrupt archive could be reported as success while src was deleted).
// src is removed only after the archive is fully written and closed; on
// failure the partial dst is removed.
func gzipFile(src, dst string) error {
	srcF, err := os.Open(src)
	if err != nil {
		return err
	}
	defer srcF.Close()
	if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
		return err
	}
	dstF, err := os.Create(dst)
	if err != nil {
		return err
	}
	gw := gzip.NewWriter(dstF)
	gw.Name = filepath.Base(src)
	gw.ModTime = time.Now()

	cleanup := func() {
		_ = gw.Close()
		_ = dstF.Close()
		_ = os.Remove(dst) // don't leave a truncated archive behind
	}
	if _, err := io.Copy(gw, srcF); err != nil {
		cleanup()
		return err
	}
	if err := gw.Close(); err != nil {
		_ = dstF.Close()
		_ = os.Remove(dst)
		return err
	}
	if err := dstF.Close(); err != nil {
		_ = os.Remove(dst)
		return err
	}
	// Only now is it safe to drop the uncompressed original.
	return os.Remove(src)
}