272 lines
7.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"flag"
"fmt"
"log"
"strings"
"time"
"weatherstation/core/internal/data"
)
// Provider identifiers used when reading and writing forecast rows.
const (
// baseProvider is the provider whose forecasts are read as the baseline:
// it is passed to the station lookup query, to data.ResolveIssuedAtInBucket
// and to data.ForecastRainAtIssued.
baseProvider = "imdroid_mix"
// outProvider is the provider under which the corrected forecast is stored
// via data.UpsertForecastRain.
outProvider = "imdroid_V5"
)
func main() {
var stationsCSV, issuedStr, startStr, endStr, tzName string
flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表;为空则自动扫描有基线的站点")
flag.StringVar(&issuedStr, "issued", "", "指定 issued 时间(整点),格式: 2006-01-02 15:00为空用当前整点")
flag.StringVar(&startStr, "start", "", "开始时间(整点),格式: 2006-01-02 15:00与 --end 一起使用end 为开区间")
flag.StringVar(&endStr, "end", "", "结束时间(整点),格式: 2006-01-02 15:00与 --start 一起使用end 为开区间")
flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai")
flag.Parse()
ctx := context.Background()
loc, _ := time.LoadLocation(tzName)
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
parse := func(s string) (time.Time, error) {
var t time.Time
var err error
for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} {
t, err = time.ParseInLocation(ly, s, loc)
if err == nil {
return t.Truncate(time.Hour), nil
}
}
return time.Time{}, err
}
// Determine mode: single issued or range
if strings.TrimSpace(startStr) != "" && strings.TrimSpace(endStr) != "" {
start, err := parse(startStr)
if err != nil {
log.Fatalf("无法解析 start: %v", err)
}
end, err := parse(endStr)
if err != nil {
log.Fatalf("无法解析 end: %v", err)
}
if !end.After(start) {
log.Fatalf("end 必须大于 start")
}
for t := start; t.Before(end); t = t.Add(time.Hour) {
var stations []string
if strings.TrimSpace(stationsCSV) != "" {
stations = splitStations(stationsCSV)
} else {
var err error
stations, err = listStationsWithBase(ctx, baseProvider, t)
if err != nil {
log.Fatalf("list stations failed: %v", err)
}
}
if len(stations) == 0 {
log.Printf("no stations to process for issued=%s", t.Format("2006-01-02 15:04:05"))
continue
}
for _, st := range stations {
if err := runForStation(ctx, st, t); err != nil {
log.Printf("V5 station=%s issued=%s error: %v", st, t.Format("2006-01-02 15:04:05"), err)
}
}
}
return
}
// Single issued
var issued time.Time
if strings.TrimSpace(issuedStr) != "" {
var err error
issued, err = parse(issuedStr)
if err != nil || issued.IsZero() {
log.Fatalf("无法解析 issued: %v", err)
}
} else {
issued = time.Now().In(loc).Truncate(time.Hour)
}
var stations []string
if strings.TrimSpace(stationsCSV) != "" {
stations = splitStations(stationsCSV)
} else {
var err error
stations, err = listStationsWithBase(ctx, baseProvider, issued)
if err != nil {
log.Fatalf("list stations failed: %v", err)
}
}
if len(stations) == 0 {
log.Printf("no stations to process for issued=%s", issued.Format("2006-01-02 15:04:05"))
return
}
for _, st := range stations {
if err := runForStation(ctx, st, issued); err != nil {
log.Printf("V5 station=%s error: %v", st, err)
}
}
}
// listStationsWithBase returns the distinct station IDs that have at least
// one forecast_hourly row for provider with issued_at in
// [issued, issued + 1 hour).
//
// A row whose station_id cannot be scanned is logged and skipped. An error
// that interrupts row iteration is returned, so a truncated list is never
// reported as success.
func listStationsWithBase(ctx context.Context, provider string, issued time.Time) ([]string, error) {
	const q = `
SELECT DISTINCT station_id
FROM forecast_hourly
WHERE provider=$1 AND issued_at >= $2 AND issued_at < $2 + interval '1 hour'`
	rows, err := data.DB().QueryContext(ctx, q, provider, issued)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			// Skipping stays best-effort, but no longer happens silently.
			log.Printf("list stations: skipping unreadable station_id: %v", err)
			continue
		}
		out = append(out, id)
	}
	// rows.Next returns false both at end of data and on failure; only
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}
// splitStations breaks a comma-separated list into station IDs, trimming
// surrounding whitespace from each entry and dropping entries that are empty
// after trimming. The result is never nil.
func splitStations(s string) []string {
	isComma := func(r rune) bool { return r == ',' }
	ids := make([]string, 0, strings.Count(s, ",")+1)
	for _, field := range strings.FieldsFunc(s, isComma) {
		if id := strings.TrimSpace(field); id != "" {
			ids = append(ids, id)
		}
	}
	return ids
}
// runForStation computes the corrected rain forecast for one station and one
// issued hour and stores it under outProvider.
//
// Steps:
//  1. Resolve the baseProvider issue time inside the issued bucket and load
//     its forecast points; fewer than three points is an error.
//  2. Pick the base rain for issued+1h, issued+2h and issued+3h. A missing
//     point is an error.
//  3. Fetch the observed rain for [issued-1h, issued) and the base forecasts
//     for forecast time issued that were issued one, two and three hours
//     earlier (see pickPrevPredict).
//  4. For each lead k, add residual (observed - earlier forecast) scaled by
//     1, 1/2 and 1/3 to the base value; a negative candidate falls back to
//     the unmodified base value.
//  5. Upsert the three values and log a summary line.
//
// It returns an error when any input is unavailable or the upsert fails.
func runForStation(ctx context.Context, stationID string, issued time.Time) error {
	// Resolve the base provider's issue time within the current issued bucket.
	baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued)
	if err != nil || !ok {
		return fmt.Errorf("resolve base issued failed: %v ok=%v", err, ok)
	}
	basePoints, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, baseIssued)
	if err != nil || len(basePoints) < 3 {
		return fmt.Errorf("load base points failed: %v len=%d", err, len(basePoints))
	}

	// Forecast times covered by the output.
	ft1 := issued.Add(1 * time.Hour)
	ft2 := issued.Add(2 * time.Hour)
	ft3 := issued.Add(3 * time.Hour)
	base1, base2, base3 := pickRain(basePoints, ft1), pickRain(basePoints, ft2), pickRain(basePoints, ft3)
	// pickRain signals "no point at this forecast time" with -1. Stop here so
	// the sentinel is never used as rainfall or written to storage.
	if base1 < 0 || base2 < 0 || base3 < 0 {
		return fmt.Errorf("base forecast missing a +1h/+2h/+3h point: base=[%.3f,%.3f,%.3f]", base1, base2, base3)
	}

	// Residuals for the three horizons:
	//   r1 = observed[issued-1h, issued) - (+1 forecast from the issued-1h bucket)
	//   r2 = observed[issued-1h, issued) - (+2 forecast from the issued-2h bucket)
	//   r3 = observed[issued-1h, issued) - (+3 forecast from the issued-3h bucket)
	actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued)
	if err != nil || !okA {
		return fmt.Errorf("actual not ready: %v ok=%v", err, okA)
	}
	p1, err := pickPrevPredict(ctx, stationID, issued.Add(-1*time.Hour), 1, issued)
	if err != nil {
		return err
	}
	p2, err := pickPrevPredict(ctx, stationID, issued.Add(-2*time.Hour), 2, issued)
	if err != nil {
		return err
	}
	p3, err := pickPrevPredict(ctx, stationID, issued.Add(-3*time.Hour), 3, issued)
	if err != nil {
		return err
	}
	r1 := actual - p1
	r2 := actual - p2
	r3 := actual - p3

	// A corrected value below zero is replaced by the uncorrected base value.
	orBase := func(cand, base float64) float64 {
		if cand < 0 {
			return base
		}
		return cand
	}
	out1 := orBase(base1+1.0*r1, base1)
	out2 := orBase(base2+0.5*r2, base2)
	out3 := orBase(base3+(1.0/3.0)*r3, base3)

	items := []data.UpsertRainItem{
		{ForecastTime: ft1, RainMMx1000: toX1000(out1)},
		{ForecastTime: ft2, RainMMx1000: toX1000(out2)},
		{ForecastTime: ft3, RainMMx1000: toX1000(out3)},
	}
	if err := data.UpsertForecastRain(ctx, stationID, outProvider, issued, items); err != nil {
		return err
	}
	log.Printf("V5 %s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f]",
		stationID, issued.Format("2006-01-02 15:04:05"),
		base1, base2, base3, actual, p1, p2, p3, out1, out2, out3,
	)
	return nil
}
// pickPrevPredict returns, in millimetres, the baseProvider rain forecast for
// validFT taken from the run issued inside prevBucket.
//
// The point whose forecast time equals validFT is preferred. When no such
// point exists, the lead-th point of the run (1-based, leads 1 to 3 only) is
// used instead. An error is returned when the run cannot be resolved, when it
// has fewer than lead points, or when lead is outside 1..3 and no exact match
// was found.
func pickPrevPredict(ctx context.Context, stationID string, prevBucket time.Time, lead int, validFT time.Time) (float64, error) {
	prevIssued, found, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, prevBucket)
	if !found || err != nil {
		return 0, fmt.Errorf("resolve prev issued fail bucket=%s: %v ok=%v", prevBucket, err, found)
	}

	points, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, prevIssued)
	if n := len(points); err != nil || n < lead {
		return 0, fmt.Errorf("load prev points fail lead=%d: %v len=%d", lead, err, n)
	}

	// Exact forecast-time match first; pickRain yields a negative value when
	// there is none.
	if exact := pickRain(points, validFT); exact >= 0 {
		return exact, nil
	}

	// Positional fallback. The length check above already guarantees that
	// points[lead-1] exists for any lead in 1..3.
	if lead < 1 || lead > 3 {
		return 0, fmt.Errorf("insufficient points for lead=%d", lead)
	}
	return toMM(points[lead-1].RainMMx1000), nil
}
// pickRain returns the rain amount in millimetres of the first point whose
// ForecastTime equals ft, or -1 when no point matches.
func pickRain(points []data.PredictPoint, ft time.Time) float64 {
	for i := range points {
		if !points[i].ForecastTime.Equal(ft) {
			continue
		}
		return toMM(points[i].RainMMx1000)
	}
	return -1
}
// toMM converts a value stored in thousandths of a millimetre to millimetres.
func toMM(vx1000 int32) float64 {
	const unitsPerMM = 1000.0
	mm := float64(vx1000) / unitsPerMM
	return mm
}
// toX1000 converts millimetres to thousandths of a millimetre, rounding to
// the nearest integer with halves rounded away from zero.
//
// Adding 0.5 before the truncating conversion is only correct for
// non-negative input (it turned -1.0 mm into -999), so negative values
// subtract 0.5 instead.
func toX1000(mm float64) int32 {
	scaled := mm * 1000
	if scaled < 0 {
		return int32(scaled - 0.5)
	}
	return int32(scaled + 0.5)
}
// clamp0 returns v unchanged unless it is strictly below zero, in which case
// it returns 0.
func clamp0(v float64) float64 {
	clamped := v
	if v < 0 {
		clamped = 0
	}
	return clamped
}