diff --git a/core/internal/data/metrics.go b/core/internal/data/metrics.go new file mode 100644 index 0000000..06c2a28 --- /dev/null +++ b/core/internal/data/metrics.go @@ -0,0 +1,292 @@ +package data + +import ( + "database/sql" + "fmt" + "time" +) + +type BinaryScores struct { + N int64 `json:"n"` + N11 int64 `json:"n11"` // 预报雨 且 实况雨 + N01 int64 `json:"n01"` // 预报不雨 但 实况雨(漏报) + N10 int64 `json:"n10"` // 预报雨 但 实况不雨(空报) + N00 int64 `json:"n00"` // 预报不雨 且 实况不雨 + CSI float64 `json:"csi"` // Threat Score / Critical Success Index + POD float64 `json:"pod"` // Probability of Detection + FAR float64 `json:"far"` // False Alarm Rate +} + +// ForecastBinaryScores computes contingency counts and TS/POD/FAR over [since, until) by hour. +// provider may be empty -> treat as required provider? We'll require explicit provider; if empty, default to imdroid_mix. +func ForecastBinaryScores(stationID string, since, until time.Time, provider string, thresholdMM float64) (BinaryScores, error) { + if provider == "" { + provider = "imdroid_mix" + } + thrX1000 := int64(thresholdMM * 1000.0) + const q = ` +WITH base AS ( + SELECT bucket_start, rain_10m_mm_x1000 + FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3 +), actual AS ( + SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour, + SUM(rain_10m_mm_x1000)::bigint AS rain_sum + FROM base + GROUP BY 1 +), fc AS ( + SELECT forecast_time, + ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn, + COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000 + FROM forecast_hourly + WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3 +), pair AS ( + SELECT a.hour AS t, + a.rain_sum AS actual_x1000, + COALESCE(f.rain_x1000,0) AS pred_x1000 + FROM actual a + LEFT JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour +) +SELECT + COUNT(*) AS n, + SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n11, + SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n01, + SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n10, + SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n00 +FROM pair` + + row := DB().QueryRow(q, stationID, since, until, provider, thrX1000) + var s BinaryScores + if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil { + if err == sql.ErrNoRows { + return BinaryScores{}, nil + } + return BinaryScores{}, fmt.Errorf("query scores: %w", err) + } + h, m, f := float64(s.N11), float64(s.N01), float64(s.N10) + if (h + m + f) > 0 { + s.CSI = h / (h + m + f) + } + if (h + m) > 0 { + s.POD = h / (h + m) + } + if (h + f) > 0 { + s.FAR = f / (h + f) + } + return s, nil +} + +// ForecastBinaryScoresMixedOnlyForecast computes scores where a forecast exists for that hour (mixed latest), using INNER JOIN on forecast. +func ForecastBinaryScoresMixedOnlyForecast(stationID string, since, until time.Time, provider string, thresholdMM float64) (BinaryScores, error) { + if provider == "" { + provider = "imdroid_mix" + } + thrX1000 := int64(thresholdMM * 1000.0) + const q = ` +WITH base AS ( + SELECT bucket_start, rain_10m_mm_x1000 + FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3 +), actual AS ( + SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour, + SUM(rain_10m_mm_x1000)::bigint AS rain_sum + FROM base + GROUP BY 1 +), fc AS ( + SELECT forecast_time, + ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn, + COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000 + FROM forecast_hourly + WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3 +), pair AS ( + SELECT a.hour AS t, + a.rain_sum AS actual_x1000, + f.rain_x1000 AS pred_x1000 + FROM actual a + INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour +) +SELECT + COUNT(*) AS n, + SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n11, + SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n01, + SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n10, + SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n00 +FROM pair` + row := DB().QueryRow(q, stationID, since, until, provider, thrX1000) + var s BinaryScores + if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil { + if err == sql.ErrNoRows { + return BinaryScores{}, nil + } + return BinaryScores{}, fmt.Errorf("query scores(mixed): %w", err) + } + h, m, f := float64(s.N11), float64(s.N01), float64(s.N10) + if (h + m + f) > 0 { + s.CSI = h / (h + m + f) + } + if (h + m) > 0 { + s.POD = h / (h + m) + } + if (h + f) > 0 { + s.FAR = f / (h + f) + } + return s, nil +} + +// ForecastBinaryScoresLead computes scores for a fixed lead (1/2/3), counting only hours where that lead exists. +func ForecastBinaryScoresLead(stationID string, since, until time.Time, provider string, thresholdMM float64, lead int) (BinaryScores, error) { + if provider == "" { + provider = "imdroid_mix" + } + thrX1000 := int64(thresholdMM * 1000.0) + const q = ` +WITH base AS ( + SELECT bucket_start, rain_10m_mm_x1000 + FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3 +), actual AS ( + SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour, + SUM(rain_10m_mm_x1000)::bigint AS rain_sum + FROM base + GROUP BY 1 +), raw AS ( + SELECT forecast_time, issued_at, + CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours, + COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000 + FROM forecast_hourly + WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3 +), fc AS ( + SELECT forecast_time, rain_x1000, + ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn + FROM raw WHERE lead_hours = $5 +), pair AS ( + SELECT a.hour AS t, + a.rain_sum AS actual_x1000, + f.rain_x1000 AS pred_x1000 + FROM actual a + INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour +) +SELECT + COUNT(*) AS n, + SUM(CASE WHEN pred_x1000 > $6 AND actual_x1000 > $6 THEN 1 ELSE 0 END) AS n11, + SUM(CASE WHEN pred_x1000 <= $6 AND actual_x1000 > $6 THEN 1 ELSE 0 END) AS n01, + SUM(CASE WHEN pred_x1000 > $6 AND actual_x1000 <= $6 THEN 1 ELSE 0 END) AS n10, + SUM(CASE WHEN pred_x1000 <= $6 AND actual_x1000 <= $6 THEN 1 ELSE 0 END) AS n00 +FROM pair` + row := DB().QueryRow(q, stationID, since, until, provider, lead, thrX1000) + var s BinaryScores + if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil { + if err == sql.ErrNoRows { + return BinaryScores{}, nil + } + return BinaryScores{}, fmt.Errorf("query scores(lead): %w", err) + } + h, m, f := float64(s.N11), float64(s.N01), float64(s.N10) + if (h + m + f) > 0 { + s.CSI = h / (h + m + f) + } + if (h + m) > 0 { + s.POD = h / (h + m) + } + if (h + f) > 0 { + s.FAR = f / (h + f) + } + return s, nil +} + +// BinaryRow holds per-hour aligned actual/pred pair for detail view. +type BinaryRow struct { + T time.Time + ActualMM float64 + PredMM float64 +} + +// ForecastBinarySeriesMixed returns per-hour rows for hours with a forecast (mixed latest). +func ForecastBinarySeriesMixed(stationID string, since, until time.Time, provider string) ([]BinaryRow, error) { + if provider == "" { + provider = "imdroid_mix" + } + const q = ` +WITH base AS ( + SELECT bucket_start, rain_10m_mm_x1000 + FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3 +), actual AS ( + SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour, + SUM(rain_10m_mm_x1000)::bigint AS rain_sum + FROM base + GROUP BY 1 +), fc AS ( + SELECT forecast_time, + ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn, + COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000 + FROM forecast_hourly + WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3 +) +SELECT a.hour, a.rain_sum, f.rain_x1000 +FROM actual a +INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour +ORDER BY a.hour` + rows, err := DB().Query(q, stationID, since, until, provider) + if err != nil { + return nil, err + } + defer rows.Close() + var out []BinaryRow + for rows.Next() { + var t time.Time + var ax, px int64 + if err := rows.Scan(&t, &ax, &px); err != nil { + continue + } + out = append(out, BinaryRow{T: t, ActualMM: float64(ax) / 1000.0, PredMM: float64(px) / 1000.0}) + } + return out, nil +} + +// ForecastBinarySeriesLead returns per-hour rows for a fixed lead (latest issued per hour for that lead only). +func ForecastBinarySeriesLead(stationID string, since, until time.Time, provider string, lead int) ([]BinaryRow, error) { + if provider == "" { + provider = "imdroid_mix" + } + const q = ` +WITH base AS ( + SELECT bucket_start, rain_10m_mm_x1000 + FROM rs485_weather_10min + WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3 +), actual AS ( + SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour, + SUM(rain_10m_mm_x1000)::bigint AS rain_sum + FROM base + GROUP BY 1 +), raw AS ( + SELECT forecast_time, issued_at, + CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours, + COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000 + FROM forecast_hourly + WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3 +), fc AS ( + SELECT forecast_time, rain_x1000, + ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn + FROM raw WHERE lead_hours = $5 +) +SELECT a.hour, a.rain_sum, f.rain_x1000 +FROM actual a +INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour +ORDER BY a.hour` + rows, err := DB().Query(q, stationID, since, until, provider, lead) + if err != nil { + return nil, err + } + defer rows.Close() + var out []BinaryRow + for rows.Next() { + var t time.Time + var ax, px int64 + if err := rows.Scan(&t, &ax, &px); err != nil { + continue + } + out = append(out, BinaryRow{T: t, ActualMM: float64(ax) / 1000.0, PredMM: float64(px) / 1000.0}) + } + return out, nil +}