From 1defe32470a41b489e7afaee1d857a1d772ddff5 Mon Sep 17 00:00:00 2001 From: yarnom Date: Fri, 22 Aug 2025 20:30:44 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E6=96=B0=E5=A2=9E=E5=AF=BC?= =?UTF-8?q?=E5=87=BA=E6=97=A5=E5=BF=97=EF=BC=8C=E6=96=B9=E4=BE=BF=E6=9F=A5?= =?UTF-8?q?=E6=89=BE=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/tools/exporter.go | 91 +++++++++++++++++++++++++++++++------- 1 file changed, 75 insertions(+), 16 deletions(-) diff --git a/internal/tools/exporter.go b/internal/tools/exporter.go index e439f4c..885a79d 100644 --- a/internal/tools/exporter.go +++ b/internal/tools/exporter.go @@ -18,9 +18,10 @@ import ( // Exporter 负责每10分钟导出 CSV(含ZTD融合) type Exporter struct { - pg *sql.DB - my *sql.DB - loc *time.Location // Asia/Shanghai + pg *sql.DB + my *sql.DB + loc *time.Location // Asia/Shanghai + logger *log.Logger // 专用日志记录器 } func NewExporter() *Exporter { @@ -28,10 +29,25 @@ func NewExporter() *Exporter { if loc == nil { loc = time.FixedZone("CST", 8*3600) } + + // 创建导出专用日志文件(追加模式) + if err := os.MkdirAll("export_data", 0o755); err != nil { + log.Printf("创建导出日志目录失败: %v", err) + } + f, err := os.OpenFile(filepath.Join("export_data", "export.log"), + os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + log.Printf("创建导出日志文件失败: %v", err) + } + + // 使用自定义日志格式,包含时间戳 + logger := log.New(f, "", log.Ldate|log.Ltime|log.Lmicroseconds) + return &Exporter{ - pg: database.GetDB(), - my: database.GetMySQL(), - loc: loc, + pg: database.GetDB(), + my: database.GetMySQL(), + loc: loc, + logger: logger, } } @@ -46,7 +62,7 @@ func (e *Exporter) Start(ctx context.Context) error { now := time.Now().In(e.loc) next := alignToNextBucketEnd(now, 10).Add(30 * time.Second) - log.Printf("exporter: now=%s next_run=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05")) + e.logger.Printf("调度: 当前=%s, 下次执行=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05")) delay := time.Until(next) if delay > 0 { timer := time.NewTimer(delay) @@ -63,7 +79,7 @@ func (e *Exporter) Start(ctx context.Context) error { start := end.Add(-10 * time.Minute) if err := e.exportBucket(ctx, start, end); err != nil { - log.Printf("export bucket %s-%s failed: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err) + e.logger.Printf("导出桶 %s-%s 失败: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err) } } } @@ -77,15 +93,39 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time return err } activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay)) - log.Printf("exporter: begin bucket start=%s end=%s -> %s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath) + e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath) + + // 先查询所有符合条件的站点,用于后续比对缺失 + var eligibleStations []string + stationsRows, err := e.pg.QueryContext(ctx, ` + SELECT device_id + FROM stations + WHERE device_type = 'WH65LP' + AND latitude IS NOT NULL AND longitude IS NOT NULL + AND latitude <> 0 AND longitude <> 0 + ORDER BY device_id + `) + if err != nil { + e.logger.Printf("查询合格站点失败: %v", err) + } else { + defer stationsRows.Close() + for stationsRows.Next() { + var deviceID string + if err := stationsRows.Scan(&deviceID); err == nil { + eligibleStations = append(eligibleStations, deviceID) + } + } + e.logger.Printf("合格站点总数: %d, 站点列表: %v", len(eligibleStations), eligibleStations) + } // 轮转上一 UTC 日的文件(若存在且未压缩) if err := rotatePreviousUTC(outDir, histDir, utcDay); err != nil { - log.Printf("rotate previous day failed: %v", err) + e.logger.Printf("轮转上一日文件失败: %v", err) } needHeader := ensureFileWithHeader(activePath) + // 查询该桶的数据 rows, err := e.pg.QueryContext(ctx, ` SELECT s.latitude, @@ -106,7 +146,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time ORDER BY s.device_id `, bucketStart) if err != nil { - return err + return fmt.Errorf("查询10分钟数据失败: %v", err) } defer rows.Close() @@ -122,6 +162,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time } var total, ztdHit, ztdMiss int + processedStations := make(map[string]bool) for rows.Next() { var ( @@ -134,11 +175,13 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time bucketStartTS time.Time ) if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS); err != nil { - log.Printf("scan row failed: %v", err) + e.logger.Printf("扫描行失败: %v", err) continue } - // date_time 使用 Go 端按 CST 格式化 + processedStations[deviceID] = true + e.logger.Printf("处理站点 %s: 经度=%.6f, 纬度=%.6f", deviceID, lon.Float64, lat.Float64) + dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05") var pressureStr, tempStr, wsStr, wdStr, rhStr string @@ -161,8 +204,10 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd) if ztdStr != "" { ztdHit++ + e.logger.Printf("站点 %s: ZTD 数据正常, 值=%s", deviceID, ztdStr) } else { ztdMiss++ + e.logger.Printf("站点 %s: ZTD 数据缺失或超出5分钟窗口", deviceID) } var b strings.Builder @@ -192,14 +237,27 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time b.WriteByte(',') // pwv 留空 if _, err := f.WriteString(b.String() + "\n"); err != nil { - log.Printf("write csv failed: %v", err) + e.logger.Printf("写入CSV失败: %v", err) } total++ } if err := rows.Err(); err != nil { return err } - log.Printf("exporter: done bucket start=%s end=%s, rows=%d, ztd_hit=%d, ztd_miss=%d", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), total, ztdHit, ztdMiss) + + // 检查哪些站点在这个桶完全没有数据 + for _, station := range eligibleStations { + if !processedStations[station] { + e.logger.Printf("站点 %s: 本桶完全无数据(rs485_weather_10min 表中未找到记录)", station) + } + } + + e.logger.Printf("导出完成: 桶=%s-%s, 总行数=%d, ZTD命中=%d, ZTD未命中=%d, 缺失站点数=%d", + bucketStart.Format("2006-01-02 15:04:05"), + bucketEnd.Format("2006-01-02 15:04:05"), + total, ztdHit, ztdMiss, + len(eligibleStations)-len(processedStations)) + return nil } @@ -217,11 +275,12 @@ func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd tim `, deviceID, bucketEnd).Scan(&ztd, &ts) if err != nil { if !errors.Is(err, sql.ErrNoRows) { - log.Printf("mysql query ztd failed: %v", err) + e.logger.Printf("查询ZTD失败: station_id=%s, error=%v", deviceID, err) } return "" } if !ztd.Valid { + e.logger.Printf("站点 %s: ZTD值为NULL", deviceID) return "" } return fmtFloat(ztd.Float64*1000.0, -1)