2025-12-01 14:34:39 +08:00

293 lines
10 KiB
Go

package data
import (
"database/sql"
"fmt"
"time"
)
type BinaryScores struct {
N int64 `json:"n"`
N11 int64 `json:"n11"` // 预报雨 且 实况雨
N01 int64 `json:"n01"` // 预报不雨 但 实况雨(漏报)
N10 int64 `json:"n10"` // 预报雨 但 实况不雨(空报)
N00 int64 `json:"n00"` // 预报不雨 且 实况不雨
CSI float64 `json:"csi"` // Threat Score / Critical Success Index
POD float64 `json:"pod"` // Probability of Detection
FAR float64 `json:"far"` // False Alarm Rate
}
// ForecastBinaryScores computes contingency counts and TS/POD/FAR over [since, until) by hour.
// provider may be empty -> treat as required provider? We'll require explicit provider; if empty, default to imdroid_mix.
func ForecastBinaryScores(stationID string, since, until time.Time, provider string, thresholdMM float64) (BinaryScores, error) {
if provider == "" {
provider = "imdroid_mix"
}
thrX1000 := int64(thresholdMM * 1000.0)
const q = `
WITH base AS (
SELECT bucket_start, rain_10m_mm_x1000
FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
), actual AS (
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
FROM base
GROUP BY 1
), fc AS (
SELECT forecast_time,
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
FROM forecast_hourly
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
), pair AS (
SELECT a.hour AS t,
a.rain_sum AS actual_x1000,
COALESCE(f.rain_x1000,0) AS pred_x1000
FROM actual a
LEFT JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
)
SELECT
COUNT(*) AS n,
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n11,
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n01,
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n10,
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n00
FROM pair`
row := DB().QueryRow(q, stationID, since, until, provider, thrX1000)
var s BinaryScores
if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil {
if err == sql.ErrNoRows {
return BinaryScores{}, nil
}
return BinaryScores{}, fmt.Errorf("query scores: %w", err)
}
h, m, f := float64(s.N11), float64(s.N01), float64(s.N10)
if (h + m + f) > 0 {
s.CSI = h / (h + m + f)
}
if (h + m) > 0 {
s.POD = h / (h + m)
}
if (h + f) > 0 {
s.FAR = f / (h + f)
}
return s, nil
}
// ForecastBinaryScoresMixedOnlyForecast computes scores where a forecast exists for that hour (mixed latest), using INNER JOIN on forecast.
func ForecastBinaryScoresMixedOnlyForecast(stationID string, since, until time.Time, provider string, thresholdMM float64) (BinaryScores, error) {
if provider == "" {
provider = "imdroid_mix"
}
thrX1000 := int64(thresholdMM * 1000.0)
const q = `
WITH base AS (
SELECT bucket_start, rain_10m_mm_x1000
FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
), actual AS (
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
FROM base
GROUP BY 1
), fc AS (
SELECT forecast_time,
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
FROM forecast_hourly
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
), pair AS (
SELECT a.hour AS t,
a.rain_sum AS actual_x1000,
f.rain_x1000 AS pred_x1000
FROM actual a
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
)
SELECT
COUNT(*) AS n,
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n11,
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n01,
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n10,
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n00
FROM pair`
row := DB().QueryRow(q, stationID, since, until, provider, thrX1000)
var s BinaryScores
if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil {
if err == sql.ErrNoRows {
return BinaryScores{}, nil
}
return BinaryScores{}, fmt.Errorf("query scores(mixed): %w", err)
}
h, m, f := float64(s.N11), float64(s.N01), float64(s.N10)
if (h + m + f) > 0 {
s.CSI = h / (h + m + f)
}
if (h + m) > 0 {
s.POD = h / (h + m)
}
if (h + f) > 0 {
s.FAR = f / (h + f)
}
return s, nil
}
// ForecastBinaryScoresLead computes scores for a fixed lead (1/2/3), counting only hours where that lead exists.
func ForecastBinaryScoresLead(stationID string, since, until time.Time, provider string, thresholdMM float64, lead int) (BinaryScores, error) {
if provider == "" {
provider = "imdroid_mix"
}
thrX1000 := int64(thresholdMM * 1000.0)
const q = `
WITH base AS (
SELECT bucket_start, rain_10m_mm_x1000
FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
), actual AS (
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
FROM base
GROUP BY 1
), raw AS (
SELECT forecast_time, issued_at,
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours,
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
FROM forecast_hourly
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
), fc AS (
SELECT forecast_time, rain_x1000,
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn
FROM raw WHERE lead_hours = $5
), pair AS (
SELECT a.hour AS t,
a.rain_sum AS actual_x1000,
f.rain_x1000 AS pred_x1000
FROM actual a
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
)
SELECT
COUNT(*) AS n,
SUM(CASE WHEN pred_x1000 > $6 AND actual_x1000 > $6 THEN 1 ELSE 0 END) AS n11,
SUM(CASE WHEN pred_x1000 <= $6 AND actual_x1000 > $6 THEN 1 ELSE 0 END) AS n01,
SUM(CASE WHEN pred_x1000 > $6 AND actual_x1000 <= $6 THEN 1 ELSE 0 END) AS n10,
SUM(CASE WHEN pred_x1000 <= $6 AND actual_x1000 <= $6 THEN 1 ELSE 0 END) AS n00
FROM pair`
row := DB().QueryRow(q, stationID, since, until, provider, lead, thrX1000)
var s BinaryScores
if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil {
if err == sql.ErrNoRows {
return BinaryScores{}, nil
}
return BinaryScores{}, fmt.Errorf("query scores(lead): %w", err)
}
h, m, f := float64(s.N11), float64(s.N01), float64(s.N10)
if (h + m + f) > 0 {
s.CSI = h / (h + m + f)
}
if (h + m) > 0 {
s.POD = h / (h + m)
}
if (h + f) > 0 {
s.FAR = f / (h + f)
}
return s, nil
}
// BinaryRow holds per-hour aligned actual/pred pair for detail view.
type BinaryRow struct {
T time.Time
ActualMM float64
PredMM float64
}
// ForecastBinarySeriesMixed returns per-hour rows for hours with a forecast (mixed latest).
func ForecastBinarySeriesMixed(stationID string, since, until time.Time, provider string) ([]BinaryRow, error) {
if provider == "" {
provider = "imdroid_mix"
}
const q = `
WITH base AS (
SELECT bucket_start, rain_10m_mm_x1000
FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
), actual AS (
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
FROM base
GROUP BY 1
), fc AS (
SELECT forecast_time,
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
FROM forecast_hourly
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
)
SELECT a.hour, a.rain_sum, f.rain_x1000
FROM actual a
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
ORDER BY a.hour`
rows, err := DB().Query(q, stationID, since, until, provider)
if err != nil {
return nil, err
}
defer rows.Close()
var out []BinaryRow
for rows.Next() {
var t time.Time
var ax, px int64
if err := rows.Scan(&t, &ax, &px); err != nil {
continue
}
out = append(out, BinaryRow{T: t, ActualMM: float64(ax) / 1000.0, PredMM: float64(px) / 1000.0})
}
return out, nil
}
// ForecastBinarySeriesLead returns per-hour rows for a fixed lead (latest issued per hour for that lead only).
func ForecastBinarySeriesLead(stationID string, since, until time.Time, provider string, lead int) ([]BinaryRow, error) {
if provider == "" {
provider = "imdroid_mix"
}
const q = `
WITH base AS (
SELECT bucket_start, rain_10m_mm_x1000
FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
), actual AS (
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
FROM base
GROUP BY 1
), raw AS (
SELECT forecast_time, issued_at,
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours,
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
FROM forecast_hourly
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
), fc AS (
SELECT forecast_time, rain_x1000,
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn
FROM raw WHERE lead_hours = $5
)
SELECT a.hour, a.rain_sum, f.rain_x1000
FROM actual a
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
ORDER BY a.hour`
rows, err := DB().Query(q, stationID, since, until, provider, lead)
if err != nil {
return nil, err
}
defer rows.Close()
var out []BinaryRow
for rows.Next() {
var t time.Time
var ax, px int64
if err := rows.Scan(&t, &ax, &px); err != nil {
continue
}
out = append(out, BinaryRow{T: t, ActualMM: float64(ax) / 1000.0, PredMM: float64(px) / 1000.0})
}
return out, nil
}