292 lines
8.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"
"weatherstation/core/internal/data"
)
type v5Item struct {
FT time.Time
Rain float64
}
type v5Result struct {
Station string
Issued time.Time
Base [3]float64
Actual float64
Prev [3]float64
Out [3]float64
SQLRows []string
Skipped bool
SkipReason string
}
func main() {
var (
stationsCSV string
startStr string
endStr string
sqlOut string
logOut string
tzName string
baseProvider string
outProvider string
)
flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表,例如: RS485-000001,RS485-000002")
flag.StringVar(&startStr, "start", "", "开始时间,格式: 2006-01-02 15:04 或 2006-01-02按整点对齐")
flag.StringVar(&endStr, "end", "", "结束时间,格式: 2006-01-02 15:04 或 2006-01-02不包含该时刻")
flag.StringVar(&sqlOut, "sql", "v5_output.sql", "输出 SQL 文件路径")
flag.StringVar(&logOut, "log", "v5_output.log", "输出日志文件路径")
flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai")
flag.StringVar(&baseProvider, "base", "imdroid_mix", "基础预报源 provider")
flag.StringVar(&outProvider, "out", "imdroid_V5", "输出预报源 provider")
flag.Parse()
if stationsCSV == "" || startStr == "" || endStr == "" {
fmt.Println("用法示例: v5-export --stations RS485-002A6E --start '2024-08-01 00:00' --end '2024-08-02 00:00' --sql out.sql --log out.log")
os.Exit(2)
}
// logger: stdout + file
if err := os.MkdirAll(filepath.Dir(sqlOut), 0755); err != nil && filepath.Dir(sqlOut) != "." {
log.Fatalf("create sql dir: %v", err)
}
if err := os.MkdirAll(filepath.Dir(logOut), 0755); err != nil && filepath.Dir(logOut) != "." {
log.Fatalf("create log dir: %v", err)
}
lf, err := os.Create(logOut)
if err != nil {
log.Fatalf("open log file: %v", err)
}
defer lf.Close()
mw := io.MultiWriter(os.Stdout, lf)
logger := log.New(mw, "", log.LstdFlags)
sf, err := os.Create(sqlOut)
if err != nil {
logger.Fatalf("open sql file: %v", err)
}
defer sf.Close()
loc, _ := time.LoadLocation(tzName)
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
parseTime := func(s string) (time.Time, error) {
layouts := []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"}
var lastErr error
for _, ly := range layouts {
t, err := time.ParseInLocation(ly, s, loc)
if err == nil {
return t, nil
}
lastErr = err
}
return time.Time{}, lastErr
}
start, err := parseTime(startStr)
if err != nil {
logger.Fatalf("parse start: %v", err)
}
end, err := parseTime(endStr)
if err != nil {
logger.Fatalf("parse end: %v", err)
}
start = start.Truncate(time.Hour)
end = end.Truncate(time.Hour)
if !end.After(start) {
logger.Fatalf("end 必须大于 start")
}
stations := splitStations(stationsCSV)
ctx := context.Background()
// 写文件头
fmt.Fprintf(sf, "-- V5 Export generated at %s\n", time.Now().Format(time.RFC3339))
fmt.Fprintf(sf, "BEGIN;\n")
for _, st := range stations {
logger.Printf("处理站点 %s: %s → %s", st, start.Format("2006-01-02 15:04"), end.Format("2006-01-02 15:04"))
for t := start; t.Before(end); t = t.Add(1 * time.Hour) {
res := computeV5(ctx, st, t, baseProvider, outProvider, loc)
if res.Skipped {
logger.Printf("skip station=%s issued=%s: %s", st, t.Format("2006-01-02 15:04"), res.SkipReason)
continue
}
// 日志
logger.Printf("V5 station=%s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f]",
st, t.Format("2006-01-02 15:04"),
res.Base[0], res.Base[1], res.Base[2], res.Actual,
res.Prev[0], res.Prev[1], res.Prev[2],
res.Out[0], res.Out[1], res.Out[2])
// SQL
for _, row := range res.SQLRows {
fmt.Fprintln(sf, row)
}
}
}
fmt.Fprintf(sf, "COMMIT;\n")
logger.Printf("完成SQL 已写入: %s ,日志: %s", sqlOut, logOut)
}
func splitStations(s string) []string {
parts := strings.Split(s, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
out = append(out, p)
}
}
return out
}
func computeV5(ctx context.Context, stationID string, issued time.Time, baseProvider, outProvider string, loc *time.Location) v5Result {
res := v5Result{Station: stationID, Issued: issued}
// base issued in this bucket
baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued)
if err != nil || !ok {
res.Skipped, res.SkipReason = true, fmt.Sprintf("base issued missing: %v ok=%v", err, ok)
return res
}
basePoints, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, baseIssued)
if err != nil || len(basePoints) < 3 {
res.Skipped, res.SkipReason = true, fmt.Sprintf("base points insufficient: %v len=%d", err, len(basePoints))
return res
}
// targets times
ft1 := issued.Add(1 * time.Hour)
ft2 := issued.Add(2 * time.Hour)
ft3 := issued.Add(3 * time.Hour)
base1 := pickRain(basePoints, ft1)
base2 := pickRain(basePoints, ft2)
base3 := pickRain(basePoints, ft3)
res.Base = [3]float64{base1, base2, base3}
// actual just-finished hour
actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued)
if err != nil || !okA {
res.Skipped, res.SkipReason = true, fmt.Sprintf("actual missing: %v ok=%v", err, okA)
return res
}
res.Actual = actual
// previous preds aligned to same validation time (ft1)
p1, e1 := pickPrevPredict(ctx, stationID, baseProvider, issued.Add(-1*time.Hour), 1, ft1)
if e1 != nil {
res.Skipped, res.SkipReason = true, e1.Error()
return res
}
p2, e2 := pickPrevPredict(ctx, stationID, baseProvider, issued.Add(-2*time.Hour), 2, ft1)
if e2 != nil {
res.Skipped, res.SkipReason = true, e2.Error()
return res
}
p3, e3 := pickPrevPredict(ctx, stationID, baseProvider, issued.Add(-3*time.Hour), 3, ft1)
if e3 != nil {
res.Skipped, res.SkipReason = true, e3.Error()
return res
}
res.Prev = [3]float64{p1, p2, p3}
r1 := actual - p1
r2 := actual - p2
r3 := actual - p3
// Baseline-fallback if negative for all leads
cand1 := base1 + 1.0*r1
cand2 := base2 + 0.5*r2
cand3 := base3 + (1.0/3.0)*r3
var out1, out2, out3 float64
if cand1 < 0 {
out1 = base1
} else {
out1 = cand1
}
if cand2 < 0 {
out2 = base2
} else {
out2 = cand2
}
if cand3 < 0 {
out3 = base3
} else {
out3 = cand3
}
res.Out = [3]float64{out1, out2, out3}
rows := make([]string, 0, 3)
rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft1, toX1000(out1)))
rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft2, toX1000(out2)))
rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft3, toX1000(out3)))
res.SQLRows = rows
return res
}
func pickPrevPredict(ctx context.Context, stationID, provider string, prevBucket time.Time, lead int, validFT time.Time) (float64, error) {
iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, provider, prevBucket)
if err != nil || !ok {
return 0, fmt.Errorf("prev issued missing bucket=%s: %v ok=%v", prevBucket.Format("2006-01-02 15:04"), err, ok)
}
pts, err := data.ForecastRainAtIssued(ctx, stationID, provider, iss)
if err != nil || len(pts) < lead {
return 0, fmt.Errorf("prev points insufficient lead=%d: %v len=%d", lead, err, len(pts))
}
if v := pickRain(pts, validFT); v >= 0 {
return v, nil
}
switch lead {
case 1:
return toMM(pts[0].RainMMx1000), nil
case 2:
if len(pts) >= 2 {
return toMM(pts[1].RainMMx1000), nil
}
case 3:
if len(pts) >= 3 {
return toMM(pts[2].RainMMx1000), nil
}
}
return 0, fmt.Errorf("prev choose failed lead=%d", lead)
}
func pickRain(points []data.PredictPoint, ft time.Time) float64 {
for _, p := range points {
if p.ForecastTime.Equal(ft) {
return toMM(p.RainMMx1000)
}
}
return -1
}
func toMM(vx1000 int32) float64 { return float64(vx1000) / 1000.0 }
func toX1000(mm float64) int32 { return int32(mm*1000 + 0.5) }
func clamp0(v float64) float64 {
if v < 0 {
return 0
}
return v
}
func insertRainSQL(stationID, provider string, issued, ft time.Time, rainX1000 int32) string {
// 使用 RFC3339 和 Postgres timestamptz 解析兼容的格式
return fmt.Sprintf(
"INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000) VALUES ('%s','%s','%s','%s',%d) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000;",
escapeSQL(stationID), provider, issued.Format(time.RFC3339), ft.Format(time.RFC3339), rainX1000,
)
}
func escapeSQL(s string) string { return strings.ReplaceAll(s, "'", "''") }