2025-12-01 14:32:45 +08:00

262 lines
7.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"flag"
"fmt"
"log"
"strings"
"time"
"weatherstation/core/internal/data"
)
const (
	// v6OutProvider is the provider name under which this job upserts its
	// corrected forecasts, and under which it later looks up its own
	// previously issued forecasts.
	v6OutProvider = "imdroid_V6"
)
// main is the command-line entry point of the V6 forecast job.
//
// It runs in one of two modes:
//   - range mode, when both --start and --end are given: every whole hour in
//     [start, end) is processed as an issued time;
//   - single mode otherwise: --issued (or the current hour when it is empty)
//     is processed once.
//
// For each issued time the stations come from --stations when it is set,
// otherwise from listStationsWithBase. Per-station failures are logged and do
// not stop the run; flag-parsing and station-listing failures are fatal.
func main() {
	var stationsCSV, issuedStr, startStr, endStr, tzName, baseProvider string
	flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表;为空则自动扫描有基线的站点")
	flag.StringVar(&issuedStr, "issued", "", "指定 issued 时间(整点),格式: 2006-01-02 15:00为空用当前整点")
	flag.StringVar(&startStr, "start", "", "开始时间(整点),格式: 2006-01-02 15:00与 --end 一起使用end 为开区间")
	flag.StringVar(&endStr, "end", "", "结束时间(整点),格式: 2006-01-02 15:00与 --start 一起使用end 为开区间")
	flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai")
	flag.StringVar(&baseProvider, "base", "imdroid_mix", "基础预报源 provider例如: imdroid_mix, caiyun, open-meteo")
	flag.Parse()

	ctx := context.Background()

	// Fall back to a fixed UTC+8 zone when the named zone cannot be loaded,
	// but say so instead of silently switching time zones.
	loc, locErr := time.LoadLocation(tzName)
	if locErr != nil || loc == nil {
		log.Printf("load timezone %q failed: %v; falling back to fixed UTC+8", tzName, locErr)
		loc = time.FixedZone("CST", 8*3600)
	}

	// parse accepts minute, hour or day precision and truncates to the hour.
	// The input is trimmed so that a value accepted by the TrimSpace checks
	// below is not rejected here because of surrounding whitespace.
	// NOTE(review): Time.Truncate works on absolute time, so hour alignment
	// assumes the zone has a whole-hour UTC offset.
	parse := func(s string) (time.Time, error) {
		s = strings.TrimSpace(s)
		var t time.Time
		var err error
		for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} {
			t, err = time.ParseInLocation(ly, s, loc)
			if err == nil {
				return t.Truncate(time.Hour), nil
			}
		}
		return time.Time{}, err
	}

	// An explicit --stations list does not depend on the issued time, so it
	// is split once up front. A non-blank list that yields no IDs still
	// disables auto-scanning, as before.
	useExplicit := strings.TrimSpace(stationsCSV) != ""
	var explicit []string
	if useExplicit {
		explicit = splitStations(stationsCSV)
	}
	stationsFor := func(issued time.Time) []string {
		if useExplicit {
			return explicit
		}
		found, err := listStationsWithBase(ctx, baseProvider, issued)
		if err != nil {
			log.Fatalf("list stations failed: %v", err)
		}
		return found
	}

	startGiven := strings.TrimSpace(startStr) != ""
	endGiven := strings.TrimSpace(endStr) != ""

	if startGiven && endGiven {
		start, err := parse(startStr)
		if err != nil {
			log.Fatalf("无法解析 start: %v", err)
		}
		end, err := parse(endStr)
		if err != nil {
			log.Fatalf("无法解析 end: %v", err)
		}
		if !end.After(start) {
			log.Fatalf("end 必须大于 start")
		}
		// end is exclusive.
		for t := start; t.Before(end); t = t.Add(time.Hour) {
			stations := stationsFor(t)
			if len(stations) == 0 {
				log.Printf("no stations to process for issued=%s", t.Format("2006-01-02 15:04:05"))
				continue
			}
			for _, st := range stations {
				if err := runV6ForStation(ctx, st, t, baseProvider); err != nil {
					log.Printf("V6 station=%s issued=%s error: %v", st, t.Format("2006-01-02 15:04:05"), err)
				}
			}
		}
		return
	}

	if startGiven != endGiven {
		// Only one bound was supplied: the range is ignored (unchanged
		// behaviour), but the operator is now told about it.
		log.Printf("--start and --end must be used together; ignoring the range and processing a single issued time")
	}

	var issued time.Time
	if strings.TrimSpace(issuedStr) != "" {
		var err error
		issued, err = parse(issuedStr)
		if err != nil || issued.IsZero() {
			log.Fatalf("无法解析 issued: %v", err)
		}
	} else {
		issued = time.Now().In(loc).Truncate(time.Hour)
	}

	stations := stationsFor(issued)
	if len(stations) == 0 {
		log.Printf("no stations to process for issued=%s", issued.Format("2006-01-02 15:04:05"))
		return
	}
	for _, st := range stations {
		if err := runV6ForStation(ctx, st, issued, baseProvider); err != nil {
			log.Printf("V6 station=%s error: %v", st, err)
		}
	}
}
// splitStations breaks a comma-separated list into station IDs, trimming
// surrounding whitespace from each entry and dropping entries that are empty
// after trimming. The result is never nil, but may be empty.
func splitStations(s string) []string {
	fields := strings.Split(s, ",")
	ids := make([]string, 0, len(fields))
	for i := range fields {
		id := strings.TrimSpace(fields[i])
		if id == "" {
			continue
		}
		ids = append(ids, id)
	}
	return ids
}
// listStationsWithBase returns the distinct station IDs that have
// forecast_hourly rows for provider with issued_at in [issued, issued+1h).
// (The original note says this logic is shared with v5.)
//
// A row that fails to scan, or an error that ends the iteration early, is
// returned as an error rather than producing a silently truncated list.
func listStationsWithBase(ctx context.Context, provider string, issued time.Time) ([]string, error) {
	const q = `SELECT DISTINCT station_id FROM forecast_hourly WHERE provider=$1 AND issued_at >= $2 AND issued_at < $2 + interval '1 hour'`
	rows, err := data.DB().QueryContext(ctx, q, provider, issued)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, fmt.Errorf("scan station_id: %w", err)
		}
		out = append(out, id)
	}
	// rows.Next returns false both at end of data and on failure; only
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate stations: %w", err)
	}
	return out, nil
}
// runV6ForStation computes and stores the V6 rain forecast of one station for
// the +1h, +2h and +3h leads of issued.
//
// Each lead starts from the baseProvider forecast and adds a weighted residual
// (actual minus the earlier forecast that targeted issued), with weights 1,
// 1/2 and 1/3. A correction that would make the value negative is discarded
// and the baseline is kept. The three values are upserted under v6OutProvider.
//
// It returns an error when the baseline issue, a baseline point for any of
// the three forecast times, the actual value, or any earlier forecast is
// missing, or when the upsert fails.
func runV6ForStation(ctx context.Context, stationID string, issued time.Time, baseProvider string) error {
	// Baseline: the issue time resolved for baseProvider in issued's bucket
	// (original note: the latest mix issue within the current hour bucket).
	baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued)
	if err != nil || !ok {
		return fmt.Errorf("base issued missing: %v ok=%v", err, ok)
	}
	pts, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, baseIssued)
	if err != nil || len(pts) < 3 {
		return fmt.Errorf("base points insufficient: %v len=%d", err, len(pts))
	}
	ft1, ft2, ft3 := issued.Add(time.Hour), issued.Add(2*time.Hour), issued.Add(3*time.Hour)
	base1, base2, base3 := pickRain(pts, ft1), pickRain(pts, ft2), pickRain(pts, ft3)
	// pickRain reports "no point at this forecast time" as -1. Having three
	// points does not guarantee they are these three times, so reject the
	// sentinel here instead of feeding it into the arithmetic below and
	// persisting a negative rain amount.
	if base1 < 0 || base2 < 0 || base3 < 0 {
		return fmt.Errorf("base points missing for leads: h1=%v h2=%v h3=%v", base1 >= 0, base2 >= 0, base3 >= 0)
	}

	// Actual value for the window issued-1h .. issued.
	actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued)
	if err != nil || !okA {
		return fmt.Errorf("actual missing: %v ok=%v", err, okA)
	}

	// Residuals: prefer V6 history, otherwise fall back to the base provider.
	vPrev1, ok1 := prevV6OrMix(ctx, stationID, issued, 1, baseProvider)
	vPrev2, ok2 := prevV6OrMix(ctx, stationID, issued, 2, baseProvider)
	vPrev3, ok3 := prevV6OrMix(ctx, stationID, issued, 3, baseProvider)
	if !(ok1 && ok2 && ok3) {
		return fmt.Errorf("prev missing leads: h1=%v h2=%v h3=%v", ok1, ok2, ok3)
	}
	e1 := actual - vPrev1
	e2 := actual - vPrev2
	e3 := actual - vPrev3

	// corrected applies a weighted residual to a baseline value and keeps the
	// baseline when the corrected value would be negative.
	corrected := func(base, weight, residual float64) float64 {
		cand := base + weight*residual
		if cand < 0 {
			return base
		}
		return cand
	}
	out1 := corrected(base1, 1.0, e1)
	out2 := corrected(base2, 0.5, e2)
	out3 := corrected(base3, 1.0/3.0, e3)

	items := []data.UpsertRainItem{
		{ForecastTime: ft1, RainMMx1000: toX1000(out1)},
		{ForecastTime: ft2, RainMMx1000: toX1000(out2)},
		{ForecastTime: ft3, RainMMx1000: toX1000(out3)},
	}
	if err := data.UpsertForecastRain(ctx, stationID, v6OutProvider, issued, items); err != nil {
		return err
	}
	log.Printf("V6 %s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f]",
		stationID, issued.Format("2006-01-02 15:04:05"), base1, base2, base3, actual, vPrev1, vPrev2, vPrev3, out1, out2, out3)
	return nil
}
func prevV6OrMix(ctx context.Context, stationID string, issued time.Time, lead int, baseProvider string) (float64, bool) {
// 验证时刻
vt := issued
// 先找 V6 历史:在 (t-lead) 桶内找 v6 的 issued取 +lead @ vt
if iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, v6OutProvider, issued.Add(-time.Duration(lead)*time.Hour)); err == nil && ok {
if pts, err := data.ForecastRainAtIssued(ctx, stationID, v6OutProvider, iss); err == nil && len(pts) >= lead {
if v := pickRain(pts, vt); v >= 0 {
return v, true
}
switch lead {
case 1:
return toMM(pts[0].RainMMx1000), true
case 2:
if len(pts) >= 2 {
return toMM(pts[1].RainMMx1000), true
}
case 3:
if len(pts) >= 3 {
return toMM(pts[2].RainMMx1000), true
}
}
}
}
// 退回 mix 历史
if iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued.Add(-time.Duration(lead)*time.Hour)); err == nil && ok {
if pts, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, iss); err == nil && len(pts) >= lead {
if v := pickRain(pts, vt); v >= 0 {
return v, true
}
switch lead {
case 1:
return toMM(pts[0].RainMMx1000), true
case 2:
if len(pts) >= 2 {
return toMM(pts[1].RainMMx1000), true
}
case 3:
if len(pts) >= 3 {
return toMM(pts[2].RainMMx1000), true
}
}
}
}
return 0, false
}
// pickRain returns the rain amount, converted with toMM, of the first point
// whose ForecastTime equals ft. It returns -1 when no point matches.
func pickRain(points []data.PredictPoint, ft time.Time) float64 {
	for i := range points {
		if !points[i].ForecastTime.Equal(ft) {
			continue
		}
		return toMM(points[i].RainMMx1000)
	}
	return -1
}
// toMM converts a value stored in thousandths into a float64 by dividing it
// by 1000.
func toMM(vx1000 int32) float64 {
	const scale = 1000.0
	return float64(vx1000) / scale
}
// toX1000 converts mm to thousandths, rounded to the nearest integer with
// halves rounded away from zero.
//
// Go's float-to-integer conversion truncates toward zero, so adding 0.5 only
// rounds correctly for non-negative values; negative values need 0.5
// subtracted instead. Results for non-negative input are unchanged.
func toX1000(mm float64) int32 {
	scaled := mm * 1000
	if scaled < 0 {
		return int32(scaled - 0.5)
	}
	return int32(scaled + 0.5)
}