feat: 新增雷达站
This commit is contained in:
parent
8123ecb5c8
commit
de18d4f297
@ -1,6 +1,7 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"database/sql"
|
||||
@ -308,6 +309,119 @@ func (e *Exporter) exportBucket(ctx context.Context, bucketStart, bucketEnd time
|
||||
return err
|
||||
}
|
||||
|
||||
// 追加 RADAR 设备:按 stations.station_alias 在 radar_weather 中取不晚于桶末的最新一条
|
||||
e.logger.Printf("开始写出 RADAR 设备(按 radar_weather 最新一条)...")
|
||||
radarRows, err := e.pg.QueryContext(ctx, `
|
||||
SELECT name, latitude, longitude, altitude, station_alias
|
||||
FROM stations
|
||||
WHERE device_type = 'RADAR'
|
||||
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
||||
`)
|
||||
if err != nil {
|
||||
e.logger.Printf("查询 RADAR 站点失败: %v", err)
|
||||
} else {
|
||||
defer radarRows.Close()
|
||||
// 复用已打开的 CSV 文件句柄
|
||||
rf, rerr := os.OpenFile(activePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
|
||||
if rerr != nil {
|
||||
return rerr
|
||||
}
|
||||
rfw := bufio.NewWriter(rf)
|
||||
defer func() { _ = rfw.Flush(); _ = rf.Close() }()
|
||||
|
||||
for radarRows.Next() {
|
||||
var (
|
||||
name string
|
||||
lat, lon, elev sql.NullFloat64
|
||||
alias sql.NullString
|
||||
)
|
||||
if err := radarRows.Scan(&name, &lat, &lon, &elev, &alias); err != nil {
|
||||
e.logger.Printf("扫描 RADAR 站点失败: %v", err)
|
||||
continue
|
||||
}
|
||||
if !alias.Valid || strings.TrimSpace(alias.String) == "" {
|
||||
continue
|
||||
}
|
||||
// radar_weather 中取 <= 桶末时间的最新一条
|
||||
var (
|
||||
rwTemp, rwHum, rwWS, rwWD, rwP sql.NullFloat64
|
||||
rwDT time.Time
|
||||
)
|
||||
qerr := e.pg.QueryRowContext(ctx, `
|
||||
SELECT temperature, humidity, wind_speed, wind_direction, pressure, dt
|
||||
FROM radar_weather
|
||||
WHERE alias = $1 AND dt <= $2
|
||||
ORDER BY dt DESC
|
||||
LIMIT 1
|
||||
`, alias.String, bucketEnd).Scan(&rwTemp, &rwHum, &rwWS, &rwWD, &rwP, &rwDT)
|
||||
if qerr != nil {
|
||||
if !errors.Is(qerr, sql.ErrNoRows) {
|
||||
e.logger.Printf("查询 radar_weather 失败: alias=%s, err=%v", alias.String, qerr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 字段映射:pressure = pressure/100,温度/风/湿度直接使用,露点/PWV 留空;station_id 与 station_name 都使用 stations.name
|
||||
pressureStr := ""
|
||||
if rwP.Valid {
|
||||
pressureStr = fmtFloat(rwP.Float64/100.0, 2)
|
||||
}
|
||||
tempStr := ""
|
||||
if rwTemp.Valid {
|
||||
tempStr = fmtFloat(rwTemp.Float64, -1)
|
||||
}
|
||||
wsStr := ""
|
||||
if rwWS.Valid {
|
||||
wsStr = fmtFloat(rwWS.Float64, 3)
|
||||
}
|
||||
wdStr := ""
|
||||
if rwWD.Valid {
|
||||
wdStr = fmtFloat(rwWD.Float64, 0)
|
||||
}
|
||||
rhStr := ""
|
||||
if rwHum.Valid {
|
||||
rhStr = fmtFloat(rwHum.Float64, -1)
|
||||
}
|
||||
|
||||
dateTimeStr := bucketEnd.In(e.loc).Format("2006-01-02 15:04:05")
|
||||
var b strings.Builder
|
||||
b.WriteString(fmtNullFloat(lat))
|
||||
b.WriteByte(',')
|
||||
b.WriteString(fmtNullFloat(lon))
|
||||
b.WriteByte(',')
|
||||
b.WriteString(name) // station_id = stations.name
|
||||
b.WriteByte(',')
|
||||
b.WriteString(name) // station_name = stations.name
|
||||
b.WriteByte(',')
|
||||
b.WriteString(dateTimeStr)
|
||||
b.WriteByte(',')
|
||||
b.WriteString(fmtNullFloat(elev))
|
||||
b.WriteByte(',')
|
||||
b.WriteString(pressureStr)
|
||||
b.WriteByte(',')
|
||||
b.WriteString(tempStr)
|
||||
b.WriteByte(',')
|
||||
b.WriteByte(',') // dewpoint 留空
|
||||
b.WriteString(wsStr)
|
||||
b.WriteByte(',')
|
||||
b.WriteString(wdStr)
|
||||
b.WriteByte(',')
|
||||
b.WriteString(rhStr)
|
||||
b.WriteByte(',')
|
||||
b.WriteByte(',') // ztd 留空
|
||||
b.WriteByte(',') // pwv 留空
|
||||
|
||||
if _, err := rfw.WriteString(b.String() + "\n"); err != nil {
|
||||
e.logger.Printf("写入 RADAR CSV 失败: %v", err)
|
||||
} else {
|
||||
total++
|
||||
}
|
||||
}
|
||||
if err := radarRows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 检查哪些站点在这个桶完全没有数据
|
||||
var missingCount int
|
||||
for _, station := range eligibleStations {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user