// Command v5-export recomputes a residual-corrected ("V5") hourly rain
// forecast for a list of stations over a time range and writes the result as
// an SQL script (one upsert per forecast point, wrapped in BEGIN/COMMIT)
// together with a log of every processed hour.
package main

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"path/filepath"
	"strings"
	"time"

	"weatherstation/core/internal/data"
)

// tsLayout is the minute-resolution layout used both for parsing the
// --start/--end flags and for timestamps in log messages.
const tsLayout = "2006-01-02 15:04"

// v5Item pairs a forecast time with a rain value. It is not referenced by the
// code in this file.
type v5Item struct {
	FT   time.Time
	Rain float64
}

// v5Result is the outcome of computeV5 for one station and one issue hour.
type v5Result struct {
	Station string
	Issued  time.Time
	// Base holds the base provider's values for issued+1h, +2h and +3h.
	Base [3]float64
	// Actual is the value returned by data.FetchActualHourlyRain for the
	// hour ending at Issued.
	Actual float64
	// Prev holds the values returned by pickPrevPredict for the buckets
	// issued-1h, issued-2h and issued-3h, all targeting issued+1h.
	Prev [3]float64
	// Out holds the corrected values for issued+1h, +2h and +3h.
	Out [3]float64
	// SQLRows holds one upsert statement per element of Out.
	SQLRows []string
	// Skipped is set, with SkipReason, when any required input is missing;
	// SQLRows is then empty.
	Skipped    bool
	SkipReason string
}

// main parses the flags, validates the time range, then iterates every
// station and every hour in [start, end), logging each result and appending
// the generated statements to the SQL output file.
func main() {
	var (
		stationsCSV  string
		startStr     string
		endStr       string
		sqlOut       string
		logOut       string
		tzName       string
		baseProvider string
		outProvider  string
	)
	flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表,例如: RS485-000001,RS485-000002")
	flag.StringVar(&startStr, "start", "", "开始时间,格式: 2006-01-02 15:04 或 2006-01-02(按整点对齐)")
	flag.StringVar(&endStr, "end", "", "结束时间,格式: 2006-01-02 15:04 或 2006-01-02(不包含该时刻)")
	flag.StringVar(&sqlOut, "sql", "v5_output.sql", "输出 SQL 文件路径")
	flag.StringVar(&logOut, "log", "v5_output.log", "输出日志文件路径")
	flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai")
	flag.StringVar(&baseProvider, "base", "imdroid_mix", "基础预报源 provider")
	flag.StringVar(&outProvider, "out", "imdroid_V5", "输出预报源 provider")
	flag.Parse()

	if stationsCSV == "" || startStr == "" || endStr == "" {
		fmt.Println("用法示例: v5-export --stations RS485-002A6E --start '2024-08-01 00:00' --end '2024-08-02 00:00' --sql out.sql --log out.log")
		os.Exit(2)
	}

	// Make sure both output directories exist. MkdirAll is a no-op for an
	// existing directory such as ".", so no special case is needed.
	if err := os.MkdirAll(filepath.Dir(sqlOut), 0755); err != nil {
		log.Fatalf("create sql dir: %v", err)
	}
	if err := os.MkdirAll(filepath.Dir(logOut), 0755); err != nil {
		log.Fatalf("create log dir: %v", err)
	}

	// Logger: every message goes to stdout and to the log file.
	lf, err := os.Create(logOut)
	if err != nil {
		log.Fatalf("open log file: %v", err)
	}
	defer lf.Close()
	logger := log.New(io.MultiWriter(os.Stdout, lf), "", log.LstdFlags)

	loc, err := time.LoadLocation(tzName)
	if err != nil {
		// Keep the fixed UTC+8 fallback, but make it visible instead of
		// silently interpreting the range in a different zone.
		logger.Printf("load timezone %q failed: %v; falling back to fixed UTC+8", tzName, err)
		loc = time.FixedZone("CST", 8*3600)
	}

	start, err := parseTimeIn(startStr, loc)
	if err != nil {
		logger.Fatalf("parse start: %v", err)
	}
	end, err := parseTimeIn(endStr, loc)
	if err != nil {
		logger.Fatalf("parse end: %v", err)
	}
	// NOTE(review): Truncate works on absolute time, so this aligns to local
	// hours only for zones with a whole-hour UTC offset — confirm if zones
	// such as +05:30 must be supported.
	start = start.Truncate(time.Hour)
	end = end.Truncate(time.Hour)
	if !end.After(start) {
		logger.Fatalf("end 必须大于 start")
	}

	stations := splitStations(stationsCSV)
	if len(stations) == 0 {
		logger.Fatalf("no valid station ids in %q", stationsCSV)
	}

	// The SQL file is created only after all arguments are validated, so a
	// bad invocation does not truncate an existing output file.
	sf, err := os.Create(sqlOut)
	if err != nil {
		logger.Fatalf("open sql file: %v", err)
	}
	// Buffer the many small row writes. bufio.Writer keeps the first write
	// error and reports it from Flush, which is checked below.
	sw := bufio.NewWriter(sf)

	ctx := context.Background()

	// File header.
	fmt.Fprintf(sw, "-- V5 Export generated at %s\n", time.Now().Format(time.RFC3339))
	fmt.Fprintf(sw, "BEGIN;\n")

	for _, st := range stations {
		logger.Printf("处理站点 %s: %s → %s", st, start.Format(tsLayout), end.Format(tsLayout))
		for t := start; t.Before(end); t = t.Add(time.Hour) {
			res := computeV5(ctx, st, t, baseProvider, outProvider, loc)
			if res.Skipped {
				logger.Printf("skip station=%s issued=%s: %s", st, t.Format(tsLayout), res.SkipReason)
				continue
			}
			logger.Printf("V5 station=%s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f]",
				st, t.Format(tsLayout),
				res.Base[0], res.Base[1], res.Base[2],
				res.Actual,
				res.Prev[0], res.Prev[1], res.Prev[2],
				res.Out[0], res.Out[1], res.Out[2])
			for _, row := range res.SQLRows {
				fmt.Fprintln(sw, row)
			}
		}
	}

	fmt.Fprintf(sw, "COMMIT;\n")
	// Flush and close explicitly so that a failed write is reported instead
	// of announcing success for a truncated script.
	if err := sw.Flush(); err != nil {
		logger.Fatalf("write sql file: %v", err)
	}
	if err := sf.Close(); err != nil {
		logger.Fatalf("close sql file: %v", err)
	}
	logger.Printf("完成,SQL 已写入: %s ,日志: %s", sqlOut, logOut)
}

// parseTimeIn parses s in loc, trying minute, hour and date resolution in
// that order. It returns the error of the last layout tried when none match.
func parseTimeIn(s string, loc *time.Location) (time.Time, error) {
	var lastErr error
	for _, layout := range []string{tsLayout, "2006-01-02 15", "2006-01-02"} {
		t, err := time.ParseInLocation(layout, s, loc)
		if err == nil {
			return t, nil
		}
		lastErr = err
	}
	return time.Time{}, lastErr
}

// splitStations splits a comma-separated list into trimmed, non-empty items.
func splitStations(s string) []string {
	parts := strings.Split(s, ",")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		if p = strings.TrimSpace(p); p != "" {
			out = append(out, p)
		}
	}
	return out
}

// computeV5 builds the corrected forecast for one station and issue hour.
//
// For each lead k in 1..3 the output is
//
//	base[k] + (1/k) * (actual - prev[k])
//
// where base[k] is the base provider's value for issued+k hours, actual is
// the observed value for the hour ending at issued, and prev[k] is what the
// base provider issued k hours earlier for issued+1h (see pickPrevPredict).
// A negative candidate falls back to base[k].
//
// The result is marked Skipped, with a reason, when any input is missing.
// loc is not used by the computation; it is kept so the signature stays
// unchanged for callers.
func computeV5(ctx context.Context, stationID string, issued time.Time, baseProvider, outProvider string, loc *time.Location) v5Result {
	res := v5Result{Station: stationID, Issued: issued}
	skip := func(reason string) v5Result {
		res.Skipped, res.SkipReason = true, reason
		return res
	}

	// Base forecast issued inside this hour bucket.
	baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued)
	if err != nil || !ok {
		return skip(fmt.Sprintf("base issued missing: %v ok=%v", err, ok))
	}
	basePoints, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, baseIssued)
	if err != nil || len(basePoints) < 3 {
		return skip(fmt.Sprintf("base points insufficient: %v len=%d", err, len(basePoints)))
	}

	// Target forecast times: issued+1h, +2h, +3h.
	fts := [3]time.Time{
		issued.Add(1 * time.Hour),
		issued.Add(2 * time.Hour),
		issued.Add(3 * time.Hour),
	}
	for i, ft := range fts {
		v := pickRain(basePoints, ft)
		if v < 0 {
			// pickRain returns -1 when no point matches; never let that
			// sentinel flow into the arithmetic or the generated SQL.
			return skip(fmt.Sprintf("base point missing ft=%s", ft.Format(tsLayout)))
		}
		res.Base[i] = v
	}

	// Observation for the hour that just finished.
	actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued)
	if err != nil || !okA {
		return skip(fmt.Sprintf("actual missing: %v ok=%v", err, okA))
	}
	res.Actual = actual

	// Earlier predictions, all aligned to the same validation time (fts[0]).
	for i := range res.Prev {
		lead := i + 1
		bucket := issued.Add(-time.Duration(lead) * time.Hour)
		p, err := pickPrevPredict(ctx, stationID, baseProvider, bucket, lead, fts[0])
		if err != nil {
			return skip(err.Error())
		}
		res.Prev[i] = p
	}

	// Apply the residual with a weight that decays with the lead; keep the
	// base value whenever the corrected candidate would be negative.
	weights := [3]float64{1.0, 0.5, 1.0 / 3.0}
	for i := range res.Out {
		cand := res.Base[i] + weights[i]*(actual-res.Prev[i])
		if cand < 0 {
			cand = res.Base[i]
		}
		res.Out[i] = cand
	}

	res.SQLRows = make([]string, 0, len(fts))
	for i, ft := range fts {
		res.SQLRows = append(res.SQLRows, insertRainSQL(stationID, outProvider, issued, ft, toX1000(res.Out[i])))
	}
	return res
}

// pickPrevPredict returns the value that provider issued in prevBucket for
// validFT. When no point has exactly that forecast time it falls back to the
// lead-th point (1-based) of that issue; only leads 1 through 3 are
// supported by the fallback. An error is returned when the issue or enough
// points cannot be found.
func pickPrevPredict(ctx context.Context, stationID, provider string, prevBucket time.Time, lead int, validFT time.Time) (float64, error) {
	iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, provider, prevBucket)
	if err != nil || !ok {
		return 0, fmt.Errorf("prev issued missing bucket=%s: %v ok=%v", prevBucket.Format(tsLayout), err, ok)
	}
	pts, err := data.ForecastRainAtIssued(ctx, stationID, provider, iss)
	if err != nil || len(pts) < lead {
		return 0, fmt.Errorf("prev points insufficient lead=%d: %v len=%d", lead, err, len(pts))
	}
	if v := pickRain(pts, validFT); v >= 0 {
		return v, nil
	}
	// len(pts) >= lead is guaranteed by the check above, so indexing is safe.
	if lead >= 1 && lead <= 3 {
		return toMM(pts[lead-1].RainMMx1000), nil
	}
	return 0, fmt.Errorf("prev choose failed lead=%d", lead)
}

// pickRain returns the value of the first point whose ForecastTime equals ft,
// converted with toMM, or -1 when no point matches.
func pickRain(points []data.PredictPoint, ft time.Time) float64 {
	for _, p := range points {
		if p.ForecastTime.Equal(ft) {
			return toMM(p.RainMMx1000)
		}
	}
	return -1
}

// toMM converts a value scaled by 1000 back to its unscaled float form.
func toMM(vx1000 int32) float64 { return float64(vx1000) / 1000.0 }

// toX1000 scales mm by 1000 and rounds to the nearest integer, with halves
// rounded away from zero so negative inputs round correctly as well.
func toX1000(mm float64) int32 { return int32(math.Round(mm * 1000)) }

// clamp0 returns v, or 0 when v is negative.
func clamp0(v float64) float64 {
	if v < 0 {
		return 0
	}
	return v
}

// insertRainSQL renders one upsert statement for forecast_hourly. Both string
// values are escaped for use inside single-quoted SQL literals; timestamps
// are written in RFC 3339 form.
func insertRainSQL(stationID, provider string, issued, ft time.Time, rainX1000 int32) string {
	return fmt.Sprintf(
		"INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000) VALUES ('%s','%s','%s','%s',%d) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000;",
		escapeSQL(stationID), escapeSQL(provider), issued.Format(time.RFC3339), ft.Format(time.RFC3339), rainX1000,
	)
}

// escapeSQL doubles single quotes so s can be embedded in a single-quoted
// SQL string literal.
func escapeSQL(s string) string { return strings.ReplaceAll(s, "'", "''") }