feat:支持 TS 评分
This commit is contained in:
parent
da75f4a93e
commit
13c181f5f7
292
core/internal/data/metrics.go
Normal file
292
core/internal/data/metrics.go
Normal file
@ -0,0 +1,292 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BinaryScores struct {
|
||||
N int64 `json:"n"`
|
||||
N11 int64 `json:"n11"` // 预报雨 且 实况雨
|
||||
N01 int64 `json:"n01"` // 预报不雨 但 实况雨(漏报)
|
||||
N10 int64 `json:"n10"` // 预报雨 但 实况不雨(空报)
|
||||
N00 int64 `json:"n00"` // 预报不雨 且 实况不雨
|
||||
CSI float64 `json:"csi"` // Threat Score / Critical Success Index
|
||||
POD float64 `json:"pod"` // Probability of Detection
|
||||
FAR float64 `json:"far"` // False Alarm Rate
|
||||
}
|
||||
|
||||
// ForecastBinaryScores computes contingency counts and TS/POD/FAR over [since, until) by hour.
|
||||
// provider may be empty -> treat as required provider? We'll require explicit provider; if empty, default to imdroid_mix.
|
||||
func ForecastBinaryScores(stationID string, since, until time.Time, provider string, thresholdMM float64) (BinaryScores, error) {
|
||||
if provider == "" {
|
||||
provider = "imdroid_mix"
|
||||
}
|
||||
thrX1000 := int64(thresholdMM * 1000.0)
|
||||
const q = `
|
||||
WITH base AS (
|
||||
SELECT bucket_start, rain_10m_mm_x1000
|
||||
FROM rs485_weather_10min
|
||||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
|
||||
), actual AS (
|
||||
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
|
||||
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
|
||||
FROM base
|
||||
GROUP BY 1
|
||||
), fc AS (
|
||||
SELECT forecast_time,
|
||||
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
|
||||
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
|
||||
FROM forecast_hourly
|
||||
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
|
||||
), pair AS (
|
||||
SELECT a.hour AS t,
|
||||
a.rain_sum AS actual_x1000,
|
||||
COALESCE(f.rain_x1000,0) AS pred_x1000
|
||||
FROM actual a
|
||||
LEFT JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
|
||||
)
|
||||
SELECT
|
||||
COUNT(*) AS n,
|
||||
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n11,
|
||||
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n01,
|
||||
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n10,
|
||||
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n00
|
||||
FROM pair`
|
||||
|
||||
row := DB().QueryRow(q, stationID, since, until, provider, thrX1000)
|
||||
var s BinaryScores
|
||||
if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return BinaryScores{}, nil
|
||||
}
|
||||
return BinaryScores{}, fmt.Errorf("query scores: %w", err)
|
||||
}
|
||||
h, m, f := float64(s.N11), float64(s.N01), float64(s.N10)
|
||||
if (h + m + f) > 0 {
|
||||
s.CSI = h / (h + m + f)
|
||||
}
|
||||
if (h + m) > 0 {
|
||||
s.POD = h / (h + m)
|
||||
}
|
||||
if (h + f) > 0 {
|
||||
s.FAR = f / (h + f)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ForecastBinaryScoresMixedOnlyForecast computes scores where a forecast exists for that hour (mixed latest), using INNER JOIN on forecast.
|
||||
func ForecastBinaryScoresMixedOnlyForecast(stationID string, since, until time.Time, provider string, thresholdMM float64) (BinaryScores, error) {
|
||||
if provider == "" {
|
||||
provider = "imdroid_mix"
|
||||
}
|
||||
thrX1000 := int64(thresholdMM * 1000.0)
|
||||
const q = `
|
||||
WITH base AS (
|
||||
SELECT bucket_start, rain_10m_mm_x1000
|
||||
FROM rs485_weather_10min
|
||||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
|
||||
), actual AS (
|
||||
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
|
||||
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
|
||||
FROM base
|
||||
GROUP BY 1
|
||||
), fc AS (
|
||||
SELECT forecast_time,
|
||||
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
|
||||
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
|
||||
FROM forecast_hourly
|
||||
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
|
||||
), pair AS (
|
||||
SELECT a.hour AS t,
|
||||
a.rain_sum AS actual_x1000,
|
||||
f.rain_x1000 AS pred_x1000
|
||||
FROM actual a
|
||||
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
|
||||
)
|
||||
SELECT
|
||||
COUNT(*) AS n,
|
||||
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n11,
|
||||
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 > $5 THEN 1 ELSE 0 END) AS n01,
|
||||
SUM(CASE WHEN pred_x1000 > $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n10,
|
||||
SUM(CASE WHEN pred_x1000 <= $5 AND actual_x1000 <= $5 THEN 1 ELSE 0 END) AS n00
|
||||
FROM pair`
|
||||
row := DB().QueryRow(q, stationID, since, until, provider, thrX1000)
|
||||
var s BinaryScores
|
||||
if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return BinaryScores{}, nil
|
||||
}
|
||||
return BinaryScores{}, fmt.Errorf("query scores(mixed): %w", err)
|
||||
}
|
||||
h, m, f := float64(s.N11), float64(s.N01), float64(s.N10)
|
||||
if (h + m + f) > 0 {
|
||||
s.CSI = h / (h + m + f)
|
||||
}
|
||||
if (h + m) > 0 {
|
||||
s.POD = h / (h + m)
|
||||
}
|
||||
if (h + f) > 0 {
|
||||
s.FAR = f / (h + f)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ForecastBinaryScoresLead computes scores for a fixed lead (1/2/3), counting only hours where that lead exists.
|
||||
func ForecastBinaryScoresLead(stationID string, since, until time.Time, provider string, thresholdMM float64, lead int) (BinaryScores, error) {
|
||||
if provider == "" {
|
||||
provider = "imdroid_mix"
|
||||
}
|
||||
thrX1000 := int64(thresholdMM * 1000.0)
|
||||
const q = `
|
||||
WITH base AS (
|
||||
SELECT bucket_start, rain_10m_mm_x1000
|
||||
FROM rs485_weather_10min
|
||||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
|
||||
), actual AS (
|
||||
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
|
||||
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
|
||||
FROM base
|
||||
GROUP BY 1
|
||||
), raw AS (
|
||||
SELECT forecast_time, issued_at,
|
||||
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours,
|
||||
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
|
||||
FROM forecast_hourly
|
||||
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
|
||||
), fc AS (
|
||||
SELECT forecast_time, rain_x1000,
|
||||
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn
|
||||
FROM raw WHERE lead_hours = $5
|
||||
), pair AS (
|
||||
SELECT a.hour AS t,
|
||||
a.rain_sum AS actual_x1000,
|
||||
f.rain_x1000 AS pred_x1000
|
||||
FROM actual a
|
||||
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
|
||||
)
|
||||
SELECT
|
||||
COUNT(*) AS n,
|
||||
SUM(CASE WHEN pred_x1000 > $6 AND actual_x1000 > $6 THEN 1 ELSE 0 END) AS n11,
|
||||
SUM(CASE WHEN pred_x1000 <= $6 AND actual_x1000 > $6 THEN 1 ELSE 0 END) AS n01,
|
||||
SUM(CASE WHEN pred_x1000 > $6 AND actual_x1000 <= $6 THEN 1 ELSE 0 END) AS n10,
|
||||
SUM(CASE WHEN pred_x1000 <= $6 AND actual_x1000 <= $6 THEN 1 ELSE 0 END) AS n00
|
||||
FROM pair`
|
||||
row := DB().QueryRow(q, stationID, since, until, provider, lead, thrX1000)
|
||||
var s BinaryScores
|
||||
if err := row.Scan(&s.N, &s.N11, &s.N01, &s.N10, &s.N00); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return BinaryScores{}, nil
|
||||
}
|
||||
return BinaryScores{}, fmt.Errorf("query scores(lead): %w", err)
|
||||
}
|
||||
h, m, f := float64(s.N11), float64(s.N01), float64(s.N10)
|
||||
if (h + m + f) > 0 {
|
||||
s.CSI = h / (h + m + f)
|
||||
}
|
||||
if (h + m) > 0 {
|
||||
s.POD = h / (h + m)
|
||||
}
|
||||
if (h + f) > 0 {
|
||||
s.FAR = f / (h + f)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// BinaryRow holds per-hour aligned actual/pred pair for detail view.
|
||||
type BinaryRow struct {
|
||||
T time.Time
|
||||
ActualMM float64
|
||||
PredMM float64
|
||||
}
|
||||
|
||||
// ForecastBinarySeriesMixed returns per-hour rows for hours with a forecast (mixed latest).
|
||||
func ForecastBinarySeriesMixed(stationID string, since, until time.Time, provider string) ([]BinaryRow, error) {
|
||||
if provider == "" {
|
||||
provider = "imdroid_mix"
|
||||
}
|
||||
const q = `
|
||||
WITH base AS (
|
||||
SELECT bucket_start, rain_10m_mm_x1000
|
||||
FROM rs485_weather_10min
|
||||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
|
||||
), actual AS (
|
||||
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
|
||||
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
|
||||
FROM base
|
||||
GROUP BY 1
|
||||
), fc AS (
|
||||
SELECT forecast_time,
|
||||
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn,
|
||||
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
|
||||
FROM forecast_hourly
|
||||
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
|
||||
)
|
||||
SELECT a.hour, a.rain_sum, f.rain_x1000
|
||||
FROM actual a
|
||||
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
|
||||
ORDER BY a.hour`
|
||||
rows, err := DB().Query(q, stationID, since, until, provider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []BinaryRow
|
||||
for rows.Next() {
|
||||
var t time.Time
|
||||
var ax, px int64
|
||||
if err := rows.Scan(&t, &ax, &px); err != nil {
|
||||
continue
|
||||
}
|
||||
out = append(out, BinaryRow{T: t, ActualMM: float64(ax) / 1000.0, PredMM: float64(px) / 1000.0})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ForecastBinarySeriesLead returns per-hour rows for a fixed lead (latest issued per hour for that lead only).
|
||||
func ForecastBinarySeriesLead(stationID string, since, until time.Time, provider string, lead int) ([]BinaryRow, error) {
|
||||
if provider == "" {
|
||||
provider = "imdroid_mix"
|
||||
}
|
||||
const q = `
|
||||
WITH base AS (
|
||||
SELECT bucket_start, rain_10m_mm_x1000
|
||||
FROM rs485_weather_10min
|
||||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
|
||||
), actual AS (
|
||||
SELECT date_trunc('hour', bucket_start) + interval '1 hour' AS hour,
|
||||
SUM(rain_10m_mm_x1000)::bigint AS rain_sum
|
||||
FROM base
|
||||
GROUP BY 1
|
||||
), raw AS (
|
||||
SELECT forecast_time, issued_at,
|
||||
CEIL(EXTRACT(EPOCH FROM (forecast_time - issued_at)) / 3600.0)::int AS lead_hours,
|
||||
COALESCE(rain_mm_x1000,0)::bigint AS rain_x1000
|
||||
FROM forecast_hourly
|
||||
WHERE station_id = $1 AND provider = $4 AND forecast_time >= $2 AND forecast_time < $3
|
||||
), fc AS (
|
||||
SELECT forecast_time, rain_x1000,
|
||||
ROW_NUMBER() OVER (PARTITION BY forecast_time ORDER BY issued_at DESC) AS rn
|
||||
FROM raw WHERE lead_hours = $5
|
||||
)
|
||||
SELECT a.hour, a.rain_sum, f.rain_x1000
|
||||
FROM actual a
|
||||
INNER JOIN (SELECT forecast_time, rain_x1000 FROM fc WHERE rn=1) f ON f.forecast_time = a.hour
|
||||
ORDER BY a.hour`
|
||||
rows, err := DB().Query(q, stationID, since, until, provider, lead)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []BinaryRow
|
||||
for rows.Next() {
|
||||
var t time.Time
|
||||
var ax, px int64
|
||||
if err := rows.Scan(&t, &ax, &px); err != nil {
|
||||
continue
|
||||
}
|
||||
out = append(out, BinaryRow{T: t, ActualMM: float64(ax) / 1000.0, PredMM: float64(px) / 1000.0})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user