From bc8027a3d852cfff990353f8cef6c8e5e7fce66d Mon Sep 17 00:00:00 2001 From: yarnom Date: Fri, 22 Aug 2025 20:49:10 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=20=E4=BF=AE=E6=AD=A3=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E5=85=B3=E8=81=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/tools/exporter.go | 62 +++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/internal/tools/exporter.go b/internal/tools/exporter.go index 885a79d..52d26d6 100644 --- a/internal/tools/exporter.go +++ b/internal/tools/exporter.go @@ -95,27 +95,36 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay)) e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath) - // 先查询所有符合条件的站点,用于后续比对缺失 - var eligibleStations []string + // 先查询所有符合条件的站点,用于后续比对缺失 - 修正:使用station_id和device_id + var eligibleStations []struct { + StationID string + DeviceID string + } stationsRows, err := e.pg.QueryContext(ctx, ` - SELECT device_id + SELECT station_id, device_id FROM stations WHERE device_type = 'WH65LP' AND latitude IS NOT NULL AND longitude IS NOT NULL AND latitude <> 0 AND longitude <> 0 - ORDER BY device_id + ORDER BY station_id `) if err != nil { e.logger.Printf("查询合格站点失败: %v", err) } else { defer stationsRows.Close() for stationsRows.Next() { - var deviceID string - if err := stationsRows.Scan(&deviceID); err == nil { - eligibleStations = append(eligibleStations, deviceID) + var station struct { + StationID string + DeviceID string + } + if err := stationsRows.Scan(&station.StationID, &station.DeviceID); err == nil { + eligibleStations = append(eligibleStations, station) } } - e.logger.Printf("合格站点总数: %d, 站点列表: %v", len(eligibleStations), eligibleStations) + e.logger.Printf("合格站点总数: %d", len(eligibleStations)) + for _, s := range eligibleStations { + e.logger.Printf("合格站点: station_id=%s, device_id=%s", s.StationID, s.DeviceID) + } } // 轮转上一 UTC 日的文件(若存在且未压缩) @@ -125,7 +134,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time needHeader := ensureFileWithHeader(activePath) - // 查询该桶的数据 + // 查询该桶的数据 - 修正:使用正确的JOIN条件 rows, err := e.pg.QueryContext(ctx, ` SELECT s.latitude, @@ -137,13 +146,14 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time r.wind_speed_ms_x1000, r.wind_dir_deg, r.humidity_pct, - r.bucket_start + r.bucket_start, + s.station_id FROM stations s JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1 WHERE s.device_type = 'WH65LP' AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL AND s.latitude <> 0 AND s.longitude <> 0 - ORDER BY s.device_id + ORDER BY s.station_id `, bucketStart) if err != nil { return fmt.Errorf("查询10分钟数据失败: %v", err) @@ -162,7 +172,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time } var total, ztdHit, ztdMiss int - processedStations := make(map[string]bool) + processedStationIDs := make(map[string]bool) for rows.Next() { var ( @@ -173,14 +183,15 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time wdDeg sql.NullInt64 rh sql.NullInt64 bucketStartTS time.Time + stationID string ) - if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS); err != nil { + if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS, &stationID); err != nil { e.logger.Printf("扫描行失败: %v", err) continue } - processedStations[deviceID] = true - e.logger.Printf("处理站点 %s: 经度=%.6f, 纬度=%.6f", deviceID, lon.Float64, lat.Float64) + processedStationIDs[stationID] = true + e.logger.Printf("处理站点: station_id=%s, device_id=%s, 经度=%.6f, 纬度=%.6f", stationID, deviceID, lon.Float64, lat.Float64) dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05") @@ -201,13 +212,14 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time rhStr = fmtFloat(float64(rh.Int64), 0) } + // 使用device_id查询ZTD ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd) if ztdStr != "" { ztdHit++ - e.logger.Printf("站点 %s: ZTD 数据正常, 值=%s", deviceID, ztdStr) + e.logger.Printf("站点 %s (device_id=%s): ZTD 数据正常, 值=%s", stationID, deviceID, ztdStr) } else { ztdMiss++ - e.logger.Printf("站点 %s: ZTD 数据缺失或超出5分钟窗口", deviceID) + e.logger.Printf("站点 %s (device_id=%s): ZTD 数据缺失或超出5分钟窗口", stationID, deviceID) } var b strings.Builder @@ -215,7 +227,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time b.WriteByte(',') b.WriteString(fmtNullFloat(lon)) b.WriteByte(',') - b.WriteString(deviceID) + b.WriteString(deviceID) // CSV输出用device_id作为station_id b.WriteByte(',') b.WriteByte(',') // station_name 留空 b.WriteString(dateTimeStr) @@ -245,18 +257,20 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time return err } - // 检查哪些站点在这个桶完全没有数据 + // 检查哪些站点在这个桶完全没有数据 - 修正:使用station_id比较 + var missingCount int for _, station := range eligibleStations { - if !processedStations[station] { - e.logger.Printf("站点 %s: 本桶完全无数据(rs485_weather_10min 表中未找到记录)", station) + if !processedStationIDs[station.StationID] { + e.logger.Printf("站点缺失: station_id=%s, device_id=%s (rs485_weather_10min 表中未找到记录)", + station.StationID, station.DeviceID) + missingCount++ } } e.logger.Printf("导出完成: 桶=%s-%s, 总行数=%d, ZTD命中=%d, ZTD未命中=%d, 缺失站点数=%d", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), - total, ztdHit, ztdMiss, - len(eligibleStations)-len(processedStations)) + total, ztdHit, ztdMiss, missingCount) return nil } @@ -280,7 +294,7 @@ func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd tim return "" } if !ztd.Valid { - e.logger.Printf("站点 %s: ZTD值为NULL", deviceID) + e.logger.Printf("站点 device_id=%s: ZTD值为NULL", deviceID) return "" } return fmtFloat(ztd.Float64*1000.0, -1)