From 75cb5722f88ddac168193ac32067ea86305a77c7 Mon Sep 17 00:00:00 2001 From: yarnom Date: Fri, 22 Aug 2025 20:57:27 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=20=E4=BF=AE=E6=AD=A3=E6=97=B6?= =?UTF-8?q?=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/tools/exporter.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/internal/tools/exporter.go b/internal/tools/exporter.go index 52d26d6..79526de 100644 --- a/internal/tools/exporter.go +++ b/internal/tools/exporter.go @@ -61,6 +61,7 @@ func (e *Exporter) Start(ctx context.Context) error { } now := time.Now().In(e.loc) + // 下一个10分钟边界 + 30s next := alignToNextBucketEnd(now, 10).Add(30 * time.Second) e.logger.Printf("调度: 当前=%s, 下次执行=%s", now.Format("2006-01-02 15:04:05"), next.Format("2006-01-02 15:04:05")) delay := time.Until(next) @@ -74,12 +75,19 @@ func (e *Exporter) Start(ctx context.Context) error { } } - end := time.Now().In(e.loc).Truncate(time.Minute) - end = alignToPrevBucketEnd(end, 10) - start := end.Add(-10 * time.Minute) + // 修正:当前时间是桶末,需要导出的是刚结束的上一个桶 + // 例如:现在是20:50:30,应该导出的是20:40桶(表示20:40-20:50的数据) + currentTime := time.Now().In(e.loc) + bucketEnd := alignToPrevBucketEnd(currentTime, 10) // 上一个桶末,如20:50 + bucketStart := bucketEnd.Add(-10 * time.Minute) // 上一个桶开始,如20:40 - if err := e.exportBucket(ctx, start, end); err != nil { - e.logger.Printf("导出桶 %s-%s 失败: %v", start.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), err) + e.logger.Printf("当前时间=%s, 导出桶=%s-%s", + currentTime.Format("2006-01-02 15:04:05"), + bucketStart.Format("2006-01-02 15:04:05"), + bucketEnd.Format("2006-01-02 15:04:05")) + + if err := e.exportBucket(ctx, bucketStart, bucketEnd); err != nil { + e.logger.Printf("导出桶 %s-%s 失败: %v", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), err) } } } @@ -95,7 +103,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay)) e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath) - // 先查询所有符合条件的站点,用于后续比对缺失 - 修正:使用station_id和device_id + // 先查询所有符合条件的站点,用于后续比对缺失 var eligibleStations []struct { StationID string DeviceID string @@ -134,7 +142,8 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time needHeader := ensureFileWithHeader(activePath) - // 查询该桶的数据 - 修正:使用正确的JOIN条件 + // 查询该桶的数据 - 使用桶开始时间查询 + e.logger.Printf("查询数据: 使用bucket_start=%s", bucketStart.Format("2006-01-02 15:04:05")) rows, err := e.pg.QueryContext(ctx, ` SELECT s.latitude, @@ -193,7 +202,9 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time processedStationIDs[stationID] = true e.logger.Printf("处理站点: station_id=%s, device_id=%s, 经度=%.6f, 纬度=%.6f", stationID, deviceID, lon.Float64, lat.Float64) + // 使用桶开始时间作为date_time dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05") + e.logger.Printf("站点 %s: 使用时间=%s", stationID, dateTimeStr) var pressureStr, tempStr, wsStr, wdStr, rhStr string if pX100.Valid { @@ -212,7 +223,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time rhStr = fmtFloat(float64(rh.Int64), 0) } - // 使用device_id查询ZTD + // 使用device_id查询ZTD,使用桶末时间 ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd) if ztdStr != "" { ztdHit++ @@ -257,7 +268,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time return err } - // 检查哪些站点在这个桶完全没有数据 - 修正:使用station_id比较 + // 检查哪些站点在这个桶完全没有数据 var missingCount int for _, station := range eligibleStations { if !processedStationIDs[station.StationID] {