fix: 修正数据库关联

This commit is contained in:
yarnom 2025-08-22 20:49:10 +08:00
parent 1defe32470
commit bc8027a3d8

View File

@ -95,27 +95,36 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay)) activePath := filepath.Join(outDir, fmt.Sprintf("weather_data_%s.csv", utcDay))
e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath) e.logger.Printf("开始导出: 桶=%s-%s, 文件=%s", bucketStart.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"), activePath)
// 先查询所有符合条件的站点,用于后续比对缺失 // 先查询所有符合条件的站点,用于后续比对缺失 - 修正使用station_id和device_id
var eligibleStations []string var eligibleStations []struct {
StationID string
DeviceID string
}
stationsRows, err := e.pg.QueryContext(ctx, ` stationsRows, err := e.pg.QueryContext(ctx, `
SELECT device_id SELECT station_id, device_id
FROM stations FROM stations
WHERE device_type = 'WH65LP' WHERE device_type = 'WH65LP'
AND latitude IS NOT NULL AND longitude IS NOT NULL AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude <> 0 AND longitude <> 0 AND latitude <> 0 AND longitude <> 0
ORDER BY device_id ORDER BY station_id
`) `)
if err != nil { if err != nil {
e.logger.Printf("查询合格站点失败: %v", err) e.logger.Printf("查询合格站点失败: %v", err)
} else { } else {
defer stationsRows.Close() defer stationsRows.Close()
for stationsRows.Next() { for stationsRows.Next() {
var deviceID string var station struct {
if err := stationsRows.Scan(&deviceID); err == nil { StationID string
eligibleStations = append(eligibleStations, deviceID) DeviceID string
}
if err := stationsRows.Scan(&station.StationID, &station.DeviceID); err == nil {
eligibleStations = append(eligibleStations, station)
} }
} }
e.logger.Printf("合格站点总数: %d, 站点列表: %v", len(eligibleStations), eligibleStations) e.logger.Printf("合格站点总数: %d", len(eligibleStations))
for _, s := range eligibleStations {
e.logger.Printf("合格站点: station_id=%s, device_id=%s", s.StationID, s.DeviceID)
}
} }
// 轮转上一 UTC 日的文件(若存在且未压缩) // 轮转上一 UTC 日的文件(若存在且未压缩)
@ -125,7 +134,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
needHeader := ensureFileWithHeader(activePath) needHeader := ensureFileWithHeader(activePath)
// 查询该桶的数据 // 查询该桶的数据 - 修正使用正确的JOIN条件
rows, err := e.pg.QueryContext(ctx, ` rows, err := e.pg.QueryContext(ctx, `
SELECT SELECT
s.latitude, s.latitude,
@ -137,13 +146,14 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
r.wind_speed_ms_x1000, r.wind_speed_ms_x1000,
r.wind_dir_deg, r.wind_dir_deg,
r.humidity_pct, r.humidity_pct,
r.bucket_start r.bucket_start,
s.station_id
FROM stations s FROM stations s
JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1 JOIN rs485_weather_10min r ON r.station_id = s.station_id AND r.bucket_start = $1
WHERE s.device_type = 'WH65LP' WHERE s.device_type = 'WH65LP'
AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL AND s.latitude IS NOT NULL AND s.longitude IS NOT NULL
AND s.latitude <> 0 AND s.longitude <> 0 AND s.latitude <> 0 AND s.longitude <> 0
ORDER BY s.device_id ORDER BY s.station_id
`, bucketStart) `, bucketStart)
if err != nil { if err != nil {
return fmt.Errorf("查询10分钟数据失败: %v", err) return fmt.Errorf("查询10分钟数据失败: %v", err)
@ -162,7 +172,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
} }
var total, ztdHit, ztdMiss int var total, ztdHit, ztdMiss int
processedStations := make(map[string]bool) processedStationIDs := make(map[string]bool)
for rows.Next() { for rows.Next() {
var ( var (
@ -173,14 +183,15 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
wdDeg sql.NullInt64 wdDeg sql.NullInt64
rh sql.NullInt64 rh sql.NullInt64
bucketStartTS time.Time bucketStartTS time.Time
stationID string
) )
if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS); err != nil { if err := rows.Scan(&lat, &lon, &deviceID, &elev, &pX100, &tX100, &wsX1000, &wdDeg, &rh, &bucketStartTS, &stationID); err != nil {
e.logger.Printf("扫描行失败: %v", err) e.logger.Printf("扫描行失败: %v", err)
continue continue
} }
processedStations[deviceID] = true processedStationIDs[stationID] = true
e.logger.Printf("处理站点 %s: 经度=%.6f, 纬度=%.6f", deviceID, lon.Float64, lat.Float64) e.logger.Printf("处理站点: station_id=%s, device_id=%s, 经度=%.6f, 纬度=%.6f", stationID, deviceID, lon.Float64, lat.Float64)
dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05") dateTimeStr := bucketStartTS.In(e.loc).Format("2006-01-02 15:04:05")
@ -201,13 +212,14 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
rhStr = fmtFloat(float64(rh.Int64), 0) rhStr = fmtFloat(float64(rh.Int64), 0)
} }
// 使用device_id查询ZTD
ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd) ztdStr := e.lookupZTD(ctx, deviceID, bucketEnd)
if ztdStr != "" { if ztdStr != "" {
ztdHit++ ztdHit++
e.logger.Printf("站点 %s: ZTD 数据正常, 值=%s", deviceID, ztdStr) e.logger.Printf("站点 %s (device_id=%s): ZTD 数据正常, 值=%s", stationID, deviceID, ztdStr)
} else { } else {
ztdMiss++ ztdMiss++
e.logger.Printf("站点 %s: ZTD 数据缺失或超出5分钟窗口", deviceID) e.logger.Printf("站点 %s (device_id=%s): ZTD 数据缺失或超出5分钟窗口", stationID, deviceID)
} }
var b strings.Builder var b strings.Builder
@ -215,7 +227,7 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
b.WriteByte(',') b.WriteByte(',')
b.WriteString(fmtNullFloat(lon)) b.WriteString(fmtNullFloat(lon))
b.WriteByte(',') b.WriteByte(',')
b.WriteString(deviceID) b.WriteString(deviceID) // CSV输出用device_id作为station_id
b.WriteByte(',') b.WriteByte(',')
b.WriteByte(',') // station_name 留空 b.WriteByte(',') // station_name 留空
b.WriteString(dateTimeStr) b.WriteString(dateTimeStr)
@ -245,18 +257,20 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
return err return err
} }
// 检查哪些站点在这个桶完全没有数据 // 检查哪些站点在这个桶完全没有数据 - 修正使用station_id比较
var missingCount int
for _, station := range eligibleStations { for _, station := range eligibleStations {
if !processedStations[station] { if !processedStationIDs[station.StationID] {
e.logger.Printf("站点 %s: 本桶完全无数据rs485_weather_10min 表中未找到记录)", station) e.logger.Printf("站点缺失: station_id=%s, device_id=%s (rs485_weather_10min 表中未找到记录)",
station.StationID, station.DeviceID)
missingCount++
} }
} }
e.logger.Printf("导出完成: 桶=%s-%s, 总行数=%d, ZTD命中=%d, ZTD未命中=%d, 缺失站点数=%d", e.logger.Printf("导出完成: 桶=%s-%s, 总行数=%d, ZTD命中=%d, ZTD未命中=%d, 缺失站点数=%d",
bucketStart.Format("2006-01-02 15:04:05"), bucketStart.Format("2006-01-02 15:04:05"),
bucketEnd.Format("2006-01-02 15:04:05"), bucketEnd.Format("2006-01-02 15:04:05"),
total, ztdHit, ztdMiss, total, ztdHit, ztdMiss, missingCount)
len(eligibleStations)-len(processedStations))
return nil return nil
} }
@ -280,7 +294,7 @@ func (e *Exporter) lookupZTD(ctx context.Context, deviceID string, bucketEnd tim
return "" return ""
} }
if !ztd.Valid { if !ztd.Valid {
e.logger.Printf("站点 %s: ZTD值为NULL", deviceID) e.logger.Printf("站点 device_id=%s: ZTD值为NULL", deviceID)
return "" return ""
} }
return fmtFloat(ztd.Float64*1000.0, -1) return fmtFloat(ztd.Float64*1000.0, -1)