From de18d4f297497819178d21723068b04f6ecf4654 Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 12 Nov 2025 16:38:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E9=9B=B7=E8=BE=BE?= =?UTF-8?q?=E7=AB=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/tools/exporter.go | 114 +++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/internal/tools/exporter.go b/internal/tools/exporter.go index fde213c..a6591b3 100644 --- a/internal/tools/exporter.go +++ b/internal/tools/exporter.go @@ -1,6 +1,7 @@ package tools import ( + "bufio" "compress/gzip" "context" "database/sql" @@ -308,6 +309,119 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time return err } + // 追加 RADAR 设备:按 stations.station_alias 在 radar_weather 中取不晚于桶末的最新一条 + e.logger.Printf("开始写出 RADAR 设备(按 radar_weather 最新一条)...") + radarRows, err := e.pg.QueryContext(ctx, ` + SELECT name, latitude, longitude, altitude, station_alias + FROM stations + WHERE device_type = 'RADAR' + AND latitude IS NOT NULL AND longitude IS NOT NULL + `) + if err != nil { + e.logger.Printf("查询 RADAR 站点失败: %v", err) + } else { + defer radarRows.Close() + // 复用已打开的 CSV 文件句柄 + rf, rerr := os.OpenFile(activePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if rerr != nil { + return rerr + } + rfw := bufio.NewWriter(rf) + defer func() { _ = rfw.Flush(); _ = rf.Close() }() + + for radarRows.Next() { + var ( + name string + lat, lon, elev sql.NullFloat64 + alias sql.NullString + ) + if err := radarRows.Scan(&name, &lat, &lon, &elev, &alias); err != nil { + e.logger.Printf("扫描 RADAR 站点失败: %v", err) + continue + } + if !alias.Valid || strings.TrimSpace(alias.String) == "" { + continue + } + // radar_weather 中取 <= 桶末时间的最新一条 + var ( + rwTemp, rwHum, rwWS, rwWD, rwP sql.NullFloat64 + rwDT time.Time + ) + qerr := e.pg.QueryRowContext(ctx, ` + SELECT temperature, humidity, wind_speed, wind_direction, pressure, dt + FROM radar_weather + WHERE alias = $1 AND dt <= $2 + ORDER BY dt DESC + LIMIT 1 + `, alias.String, bucketEnd).Scan(&rwTemp, &rwHum, &rwWS, &rwWD, &rwP, &rwDT) + if qerr != nil { + if !errors.Is(qerr, sql.ErrNoRows) { + e.logger.Printf("查询 radar_weather 失败: alias=%s, err=%v", alias.String, qerr) + } + continue + } + + // 字段映射:pressure = pressure/100,温度/风/湿度直接使用,露点/PWV 留空;station_id 与 station_name 都使用 stations.name + pressureStr := "" + if rwP.Valid { + pressureStr = fmtFloat(rwP.Float64/100.0, 2) + } + tempStr := "" + if rwTemp.Valid { + tempStr = fmtFloat(rwTemp.Float64, -1) + } + wsStr := "" + if rwWS.Valid { + wsStr = fmtFloat(rwWS.Float64, 3) + } + wdStr := "" + if rwWD.Valid { + wdStr = fmtFloat(rwWD.Float64, 0) + } + rhStr := "" + if rwHum.Valid { + rhStr = fmtFloat(rwHum.Float64, -1) + } + + dateTimeStr := bucketEnd.In(e.loc).Format("2006-01-02 15:04:05") + var b strings.Builder + b.WriteString(fmtNullFloat(lat)) + b.WriteByte(',') + b.WriteString(fmtNullFloat(lon)) + b.WriteByte(',') + b.WriteString(name) // station_id = stations.name + b.WriteByte(',') + b.WriteString(name) // station_name = stations.name + b.WriteByte(',') + b.WriteString(dateTimeStr) + b.WriteByte(',') + b.WriteString(fmtNullFloat(elev)) + b.WriteByte(',') + b.WriteString(pressureStr) + b.WriteByte(',') + b.WriteString(tempStr) + b.WriteByte(',') + b.WriteByte(',') // dewpoint 留空 + b.WriteString(wsStr) + b.WriteByte(',') + b.WriteString(wdStr) + b.WriteByte(',') + b.WriteString(rhStr) + b.WriteByte(',') + b.WriteByte(',') // ztd 留空 + b.WriteByte(',') // pwv 留空 + + if _, err := rfw.WriteString(b.String() + "\n"); err != nil { + e.logger.Printf("写入 RADAR CSV 失败: %v", err) + } else { + total++ + } + } + if err := radarRows.Err(); err != nil { + return err + } + } + // 检查哪些站点在这个桶完全没有数据 var missingCount int for _, station := range eligibleStations {