783 lines
22 KiB
Go
783 lines
22 KiB
Go
package fusion
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"encoding/binary"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"sort"
|
||
"strings"
|
||
"time"
|
||
|
||
"weatherstation/internal/database"
|
||
)
|
||
|
||
const (
|
||
providerOpenMeteo = "open-meteo"
|
||
providerCaiyun = "caiyun"
|
||
providerImdroid = "imdroid"
|
||
outputProvider = "imdroid_mix"
|
||
)
|
||
|
||
var defaultTriad = database.Triad{OpenMeteo: 0.4, Caiyun: 0.3, Imdroid: 0.3}
|
||
|
||
// RunForIssued runs fusion for all stations at the given issued (hour bucket, CST).
|
||
// It persists weights into forecast_weights_current and fused rows into forecast_hourly.
|
||
func RunForIssued(ctx context.Context, issued time.Time) error {
|
||
db := database.GetDB()
|
||
stations, err := loadStations(ctx, db)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, st := range stations {
|
||
if err := runForStation(ctx, db, st, issued); err != nil {
|
||
log.Printf("fusion: station=%s issued=%s error=%v", st.ID, issued.Format(time.RFC3339), err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
type stationInfo struct {
|
||
ID string
|
||
Alias string
|
||
Lat sql.NullFloat64
|
||
Lon sql.NullFloat64
|
||
Z sql.NullInt64
|
||
Y sql.NullInt64
|
||
X sql.NullInt64
|
||
}
|
||
|
||
func loadStations(ctx context.Context, db *sql.DB) ([]stationInfo, error) {
|
||
rows, err := db.QueryContext(ctx, `
|
||
SELECT station_id,
|
||
COALESCE(NULLIF(station_alias,''), station_id) AS alias,
|
||
latitude, longitude, z, y, x
|
||
FROM stations
|
||
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
|
||
ORDER BY station_id`)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
var list []stationInfo
|
||
for rows.Next() {
|
||
var s stationInfo
|
||
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil {
|
||
return nil, err
|
||
}
|
||
list = append(list, s)
|
||
}
|
||
return list, nil
|
||
}
|
||
|
||
type providerMatrix struct {
|
||
Provider string
|
||
Samples []forecastSample
|
||
}
|
||
|
||
type forecastSample struct {
|
||
Rain float64
|
||
Temp float64
|
||
Humidity float64
|
||
WindSpeed float64
|
||
WindGust float64
|
||
WindDir float64
|
||
Prob float64
|
||
Pressure float64
|
||
UV float64
|
||
}
|
||
|
||
func runForStation(ctx context.Context, db *sql.DB, st stationInfo, issued time.Time) error {
|
||
// Log header
|
||
log.Printf("")
|
||
log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), st.ID)
|
||
|
||
// Presence at issued
|
||
present := presentProviders(db, st.ID, issued)
|
||
presOrdered := orderByProvidersAll(present)
|
||
added, removed := diffPresence(db, st.ID, present)
|
||
if len(added) > 0 || len(removed) > 0 {
|
||
log.Printf("sources %v added=%v removed=%v", presOrdered, added, removed)
|
||
}
|
||
|
||
// Load previous triad snapshot or default
|
||
triad, _, ok, _ := database.GetWeightsCurrent(ctx, st.ID)
|
||
if !ok {
|
||
triad = defaultTriad
|
||
}
|
||
|
||
// Apply missing/new rules
|
||
triad = applyMissingTransfer(triad, present)
|
||
triad = applyNewAllocation(triad, present)
|
||
|
||
// Learning (I-1 vs (I-2)+1)
|
||
learnStart := issued.Add(-time.Hour)
|
||
learnEnd := issued
|
||
actual, actualOK, err := fetchActualHourlyRain(db, st.ID, learnStart, learnEnd)
|
||
if err != nil {
|
||
log.Printf("fusion: actual error station=%s issued=%s err=%v", st.ID, issued.Format(time.RFC3339), err)
|
||
} else if actualOK {
|
||
prevIssue := issued.Add(-2 * time.Hour)
|
||
preds, names := lead1PredsAt(db, st.ID, prevIssue, presOrdered)
|
||
log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s",
|
||
learnStart.Format("15:04"), learnEnd.Format("15:04"), actual,
|
||
prevIssue.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"),
|
||
formatFloatSlice(preds))
|
||
triad = learnTriad(triad, names, preds, actual)
|
||
}
|
||
|
||
// Build matrix samples for issued (providers present only)
|
||
matrix := buildMatrix(db, st.ID, issued, presOrdered)
|
||
|
||
// Effective weights (ordered by presOrdered)
|
||
effBefore := triadToSlice(triad, presOrdered)
|
||
eff := effBefore // learning already applied
|
||
log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore, presOrdered), toNamed(eff, presOrdered))
|
||
|
||
// Fuse with mask
|
||
fused := fuseForecastsWith(matrix, eff)
|
||
preMask := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain}
|
||
maskApplied := false
|
||
// Radar mask disabled by request: skip scanning tiles and altering rain.
|
||
// if hasMaskCtx(st) { ... }
|
||
postMask := preMask
|
||
log.Printf("预报 %s , mask(disabled)=%v , 融合的结果 %s", formatFloatSlice(preMask), maskApplied, formatFloatSlice(postMask))
|
||
|
||
// Persist fused rows
|
||
if err := upsertFused(ctx, db, st.ID, issued, fused); err != nil {
|
||
return err
|
||
}
|
||
// Persist triad snapshot for next hour
|
||
if err := database.UpsertWeightsCurrent(ctx, st.ID, triad, issued); err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func formatFloatSlice(vs []float64) string {
|
||
parts := make([]string, len(vs))
|
||
for i, v := range vs {
|
||
parts[i] = fmt.Sprintf("%.2f", v)
|
||
}
|
||
return "[" + strings.Join(parts, ",") + "]"
|
||
}
|
||
|
||
func toNamed(ws []float64, names []string) string {
|
||
parts := make([]string, len(ws))
|
||
for i := range ws {
|
||
parts[i] = fmt.Sprintf("%s=%.2f", names[i], ws[i])
|
||
}
|
||
return strings.Join(parts, ", ")
|
||
}
|
||
|
||
func triadToSlice(t database.Triad, names []string) []float64 {
|
||
out := make([]float64, len(names))
|
||
for i, n := range names {
|
||
switch n {
|
||
case providerOpenMeteo:
|
||
out[i] = t.OpenMeteo
|
||
case providerCaiyun:
|
||
out[i] = t.Caiyun
|
||
case providerImdroid:
|
||
out[i] = t.Imdroid
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func sliceToTriad(vals []float64, names []string) database.Triad {
|
||
t := database.Triad{}
|
||
for i, n := range names {
|
||
switch n {
|
||
case providerOpenMeteo:
|
||
t.OpenMeteo = vals[i]
|
||
case providerCaiyun:
|
||
t.Caiyun = vals[i]
|
||
case providerImdroid:
|
||
t.Imdroid = vals[i]
|
||
}
|
||
}
|
||
return t
|
||
}
|
||
|
||
func presentProviders(db *sql.DB, stationID string, issued time.Time) map[string]bool {
|
||
out := map[string]bool{}
|
||
// Check providers within [issued, issued+1h)
|
||
for _, p := range []string{providerOpenMeteo, providerCaiyun, providerImdroid} {
|
||
var cnt int
|
||
_ = db.QueryRow(`SELECT COUNT(*) FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour'`, stationID, p, issued).Scan(&cnt)
|
||
if cnt > 0 {
|
||
out[p] = true
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func orderByProvidersAll(present map[string]bool) []string {
|
||
base := []string{providerOpenMeteo, providerCaiyun, providerImdroid}
|
||
out := make([]string, 0, len(present))
|
||
for _, p := range base {
|
||
if present[p] {
|
||
out = append(out, p)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func diffPresence(db *sql.DB, stationID string, present map[string]bool) (added, removed []string) {
|
||
// Use last hour presence from forecast_hourly as proxy
|
||
prev := map[string]bool{}
|
||
// naive: if provider had any row at issued-1h bucket, consider present
|
||
// We don't know issued here; caller logs only.
|
||
// Return stable order using base sequence.
|
||
base := []string{providerOpenMeteo, providerCaiyun, providerImdroid}
|
||
for _, p := range base {
|
||
if present[p] && !prev[p] {
|
||
added = append(added, p)
|
||
}
|
||
if !present[p] && prev[p] {
|
||
removed = append(removed, p)
|
||
}
|
||
}
|
||
return added, removed
|
||
}
|
||
|
||
func applyMissingTransfer(t database.Triad, present map[string]bool) database.Triad {
|
||
// Sum of missing weights
|
||
missing := 0.0
|
||
if !present[providerOpenMeteo] {
|
||
missing += t.OpenMeteo
|
||
t.OpenMeteo = 0
|
||
}
|
||
if !present[providerCaiyun] {
|
||
missing += t.Caiyun
|
||
t.Caiyun = 0
|
||
}
|
||
if !present[providerImdroid] {
|
||
missing += t.Imdroid
|
||
t.Imdroid = 0
|
||
}
|
||
if missing == 0 {
|
||
return t
|
||
}
|
||
// Give to current max among present
|
||
maxP := ""
|
||
maxW := -1.0
|
||
for p, ok := range present {
|
||
if !ok {
|
||
continue
|
||
}
|
||
w := 0.0
|
||
switch p {
|
||
case providerOpenMeteo:
|
||
w = t.OpenMeteo
|
||
case providerCaiyun:
|
||
w = t.Caiyun
|
||
case providerImdroid:
|
||
w = t.Imdroid
|
||
}
|
||
if w > maxW {
|
||
maxW = w
|
||
maxP = p
|
||
}
|
||
}
|
||
switch maxP {
|
||
case providerOpenMeteo:
|
||
t.OpenMeteo += missing
|
||
case providerCaiyun:
|
||
t.Caiyun += missing
|
||
case providerImdroid:
|
||
t.Imdroid += missing
|
||
}
|
||
return t
|
||
}
|
||
|
||
func applyNewAllocation(t database.Triad, present map[string]bool) database.Triad {
|
||
// allocate 0.2 from current max to any newly present which currently has 0
|
||
for _, p := range []string{providerOpenMeteo, providerCaiyun, providerImdroid} {
|
||
// we don't track prior presence; heuristic: if present and weight==0 allocate
|
||
if present[p] && getW(t, p) <= 0 {
|
||
// find current max (excluding p)
|
||
type pair struct {
|
||
name string
|
||
w float64
|
||
}
|
||
arr := []pair{{providerOpenMeteo, t.OpenMeteo}, {providerCaiyun, t.Caiyun}, {providerImdroid, t.Imdroid}}
|
||
sort.Slice(arr, func(i, j int) bool { return arr[i].w > arr[j].w })
|
||
src := arr[0]
|
||
if src.name == p && len(arr) > 1 {
|
||
src = arr[1]
|
||
}
|
||
delta := 0.2
|
||
if src.w < delta {
|
||
delta = src.w
|
||
}
|
||
t = setW(t, src.name, src.w-delta)
|
||
t = setW(t, p, getW(t, p)+delta)
|
||
}
|
||
}
|
||
return t
|
||
}
|
||
|
||
func getW(t database.Triad, p string) float64 {
|
||
switch p {
|
||
case providerOpenMeteo:
|
||
return t.OpenMeteo
|
||
case providerCaiyun:
|
||
return t.Caiyun
|
||
case providerImdroid:
|
||
return t.Imdroid
|
||
}
|
||
return 0
|
||
}
|
||
func setW(t database.Triad, p string, v float64) database.Triad {
|
||
switch p {
|
||
case providerOpenMeteo:
|
||
t.OpenMeteo = v
|
||
case providerCaiyun:
|
||
t.Caiyun = v
|
||
case providerImdroid:
|
||
t.Imdroid = v
|
||
}
|
||
return t
|
||
}
|
||
|
||
func learnTriad(t database.Triad, names []string, preds []float64, actual float64) database.Triad {
|
||
if len(names) != len(preds) || len(names) == 0 {
|
||
return t
|
||
}
|
||
// find best/worst among names by absolute error
|
||
best, worst := 0, 0
|
||
for i := range preds {
|
||
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
|
||
best = i
|
||
}
|
||
if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
|
||
worst = i
|
||
}
|
||
}
|
||
// log absolute errors
|
||
parts := make([]string, 0, len(names))
|
||
for i := range names {
|
||
parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", names[i], preds[i], actual, math.Abs(preds[i]-actual)))
|
||
}
|
||
log.Printf("绝对误差:%s → best=%s,worst=%s。", strings.Join(parts, ","), names[best], names[worst])
|
||
|
||
// apply ±0.1 (cap by available on worst)
|
||
alpha := 0.1
|
||
worstW := getW(t, names[worst])
|
||
delta := alpha
|
||
if worstW < delta {
|
||
delta = worstW
|
||
}
|
||
if delta <= 0 {
|
||
log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatFloatSlice(triadToSlice(t, []string{providerOpenMeteo, providerCaiyun, providerImdroid})))
|
||
return t
|
||
}
|
||
t = setW(t, names[worst], worstW-delta)
|
||
t = setW(t, names[best], getW(t, names[best])+delta)
|
||
log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatFloatSlice(triadToSlice(t, []string{providerOpenMeteo, providerCaiyun, providerImdroid})))
|
||
return t
|
||
}
|
||
|
||
func buildMatrix(db *sql.DB, stationID string, issued time.Time, ordered []string) []providerMatrix {
|
||
out := make([]providerMatrix, 0, len(ordered))
|
||
for _, p := range ordered {
|
||
iss, ok, err := resolveIssuedAtInBucket(db, stationID, p, issued)
|
||
if err != nil || !ok {
|
||
continue
|
||
}
|
||
samples, err := loadForecastSamples(db, stationID, p, iss)
|
||
if err != nil || len(samples) < 3 {
|
||
continue
|
||
}
|
||
out = append(out, providerMatrix{Provider: p, Samples: samples[:3]})
|
||
}
|
||
return out
|
||
}
|
||
|
||
func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample {
|
||
result := make([]forecastSample, 3)
|
||
n := len(matrix)
|
||
if n == 0 {
|
||
return result
|
||
}
|
||
for h := 0; h < 3; h++ {
|
||
var windX, windY float64
|
||
for i := 0; i < n; i++ {
|
||
if len(matrix[i].Samples) <= h {
|
||
continue
|
||
}
|
||
s := matrix[i].Samples[h]
|
||
w := eff[i]
|
||
result[h].Rain += s.Rain * w
|
||
result[h].Temp += s.Temp * w
|
||
result[h].Humidity += s.Humidity * w
|
||
result[h].WindSpeed += s.WindSpeed * w
|
||
result[h].WindGust += s.WindGust * w
|
||
result[h].Prob += s.Prob * w
|
||
result[h].Pressure += s.Pressure * w
|
||
result[h].UV += s.UV * w
|
||
rad := s.WindDir * math.Pi / 180
|
||
windX += math.Cos(rad) * w
|
||
windY += math.Sin(rad) * w
|
||
}
|
||
if windX == 0 && windY == 0 {
|
||
result[h].WindDir = 0
|
||
} else {
|
||
dir := math.Atan2(windY, windX) * 180 / math.Pi
|
||
if dir < 0 {
|
||
dir += 360
|
||
}
|
||
result[h].WindDir = dir
|
||
}
|
||
result[h].Humidity = math.Round(result[h].Humidity)
|
||
result[h].Prob = math.Round(result[h].Prob)
|
||
result[h].UV = math.Round(result[h].UV)
|
||
}
|
||
return result
|
||
}
|
||
|
||
func upsertFused(ctx context.Context, db *sql.DB, stationID string, issued time.Time, fused []forecastSample) error {
|
||
for h := 1; h <= 3; h++ {
|
||
ft := issued.Add(time.Duration(h) * time.Hour)
|
||
_, err := db.ExecContext(ctx, `
|
||
INSERT INTO forecast_hourly (
|
||
station_id, provider, issued_at, forecast_time,
|
||
rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000,
|
||
wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index)
|
||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
|
||
ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET
|
||
rain_mm_x1000=EXCLUDED.rain_mm_x1000,
|
||
temp_c_x100=EXCLUDED.temp_c_x100,
|
||
humidity_pct=EXCLUDED.humidity_pct,
|
||
wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000,
|
||
wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000,
|
||
wind_dir_deg=EXCLUDED.wind_dir_deg,
|
||
precip_prob_pct=EXCLUDED.precip_prob_pct,
|
||
pressure_hpa_x100=EXCLUDED.pressure_hpa_x100,
|
||
uv_index=EXCLUDED.uv_index`,
|
||
stationID, outputProvider, issued, ft,
|
||
int(math.Round(fused[h-1].Rain*1000)),
|
||
int(math.Round(fused[h-1].Temp*100)),
|
||
int(math.Round(fused[h-1].Humidity)),
|
||
int(math.Round(fused[h-1].WindSpeed*1000)),
|
||
int(math.Round(fused[h-1].WindGust*1000)),
|
||
int(math.Round(fused[h-1].WindDir)),
|
||
int(math.Round(fused[h-1].Prob)),
|
||
int(math.Round(fused[h-1].Pressure*100)),
|
||
int(math.Round(fused[h-1].UV)),
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ----- Helpers copied (simplified) from cmd/imdroidmix -----
|
||
|
||
func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) {
|
||
const q = `SELECT issued_at FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour' ORDER BY issued_at DESC LIMIT 1`
|
||
var t time.Time
|
||
err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t)
|
||
if err == sql.ErrNoRows {
|
||
return time.Time{}, false, nil
|
||
}
|
||
if err != nil {
|
||
return time.Time{}, false, err
|
||
}
|
||
return t, true, nil
|
||
}
|
||
|
||
func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) {
|
||
const q = `
|
||
SELECT forecast_time,
|
||
COALESCE(rain_mm_x1000, 0),
|
||
COALESCE(temp_c_x100, 0),
|
||
COALESCE(humidity_pct, 0),
|
||
COALESCE(wind_speed_ms_x1000, 0),
|
||
COALESCE(wind_gust_ms_x1000, 0),
|
||
COALESCE(wind_dir_deg, 0),
|
||
COALESCE(precip_prob_pct, 0),
|
||
COALESCE(pressure_hpa_x100, 0),
|
||
COALESCE(uv_index, 0)
|
||
FROM forecast_hourly
|
||
WHERE station_id=$1 AND provider=$2 AND issued_at=$3
|
||
ORDER BY forecast_time ASC`
|
||
rows, err := db.Query(q, stationID, provider, issued)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
var list []forecastSample
|
||
for rows.Next() {
|
||
var ft time.Time
|
||
var rainX1000, tempX100, hum, wsX1000, gustX1000, wdir, prob, presX100, uv int
|
||
if err := rows.Scan(&ft, &rainX1000, &tempX100, &hum, &wsX1000, &gustX1000, &wdir, &prob, &presX100, &uv); err != nil {
|
||
return nil, err
|
||
}
|
||
list = append(list, forecastSample{
|
||
Rain: float64(rainX1000) / 1000.0,
|
||
Temp: float64(tempX100) / 100.0,
|
||
Humidity: float64(hum),
|
||
WindSpeed: float64(wsX1000) / 1000.0,
|
||
WindGust: float64(gustX1000) / 1000.0,
|
||
WindDir: float64(wdir),
|
||
Prob: float64(prob),
|
||
Pressure: float64(presX100) / 100.0,
|
||
UV: float64(uv),
|
||
})
|
||
}
|
||
return list, nil
|
||
}
|
||
|
||
func lead1PredsAt(db *sql.DB, stationID string, prevIssue time.Time, names []string) ([]float64, []string) {
|
||
preds := make([]float64, 0, len(names))
|
||
used := make([]string, 0, len(names))
|
||
for _, p := range names {
|
||
iss, ok, err := resolveIssuedAtInBucket(db, stationID, p, prevIssue)
|
||
if err != nil || !ok {
|
||
continue
|
||
}
|
||
samples, err := loadForecastSamples(db, stationID, p, iss)
|
||
if err != nil || len(samples) < 1 {
|
||
continue
|
||
}
|
||
preds = append(preds, samples[0].Rain)
|
||
used = append(used, p)
|
||
}
|
||
return preds, used
|
||
}
|
||
|
||
func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) {
|
||
const q = `SELECT SUM(rain_10m_mm_x1000) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
|
||
var sum sql.NullInt64
|
||
err := db.QueryRow(q, stationID, start, end).Scan(&sum)
|
||
if err != nil {
|
||
return 0, false, err
|
||
}
|
||
if !sum.Valid {
|
||
return 0, false, nil
|
||
}
|
||
return float64(sum.Int64) / 1000.0, true, nil
|
||
}
|
||
|
||
// ----- Radar mask helpers (trimmed) -----
|
||
|
||
type maskContext struct {
|
||
id, alias string
|
||
lat, lon float64
|
||
z, y, x int
|
||
}
|
||
|
||
func hasMaskCtx(info stationInfo) bool {
|
||
return info.Lat.Valid && info.Lon.Valid && info.Z.Valid && info.Y.Valid && info.X.Valid
|
||
}
|
||
func buildMaskContext(info stationInfo) maskContext {
|
||
alias := info.Alias
|
||
if strings.TrimSpace(alias) == "" {
|
||
alias = info.ID
|
||
}
|
||
return maskContext{id: info.ID, alias: alias, lat: info.Lat.Float64, lon: info.Lon.Float64, z: int(info.Z.Int64), y: int(info.Y.Int64), x: int(info.X.Int64)}
|
||
}
|
||
|
||
type tileRec struct {
|
||
DT time.Time
|
||
Width int
|
||
Height int
|
||
West float64
|
||
South float64
|
||
East float64
|
||
North float64
|
||
ResDeg float64
|
||
Data []byte
|
||
}
|
||
|
||
type maskReport struct {
|
||
Scanned bool
|
||
WindOK bool
|
||
UsedAlias string
|
||
WindSpeed float64
|
||
WindDir float64
|
||
Tiles int
|
||
HitsByHour [3]int
|
||
}
|
||
|
||
func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) {
|
||
rep := maskReport{}
|
||
windTime := issued.Add(-time.Hour)
|
||
if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil {
|
||
return rep, err
|
||
} else if ok {
|
||
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id
|
||
} else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil {
|
||
return rep, err2
|
||
} else if ok2 {
|
||
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias
|
||
} else {
|
||
return rep, nil
|
||
}
|
||
tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued)
|
||
if err != nil {
|
||
return rep, err
|
||
}
|
||
if len(tiles) == 0 {
|
||
return rep, nil
|
||
}
|
||
rep.Tiles = len(tiles)
|
||
rep.Scanned = true
|
||
const circleR = 8000.0
|
||
const halfAngle = 30.0
|
||
for _, t := range tiles {
|
||
vals, xs, ys, err := decodeTile(t)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
for r := 0; r < len(vals); r++ {
|
||
row := vals[r]
|
||
lat := ys[r]
|
||
for c := 0; c < len(row); c++ {
|
||
v := row[c]
|
||
if v == nil {
|
||
continue
|
||
}
|
||
dbz := *v
|
||
if dbz < 40 {
|
||
continue
|
||
}
|
||
lon := xs[c]
|
||
dist := haversine(ctx.lat, ctx.lon, lat, lon)
|
||
if dist <= circleR {
|
||
rep.HitsByHour[0]++
|
||
}
|
||
if rep.WindSpeed > 0 {
|
||
brg := bearingDeg(ctx.lat, ctx.lon, lat, lon)
|
||
if angDiff(brg, rep.WindDir) <= halfAngle {
|
||
if dist <= rep.WindSpeed*3*3600 {
|
||
rep.HitsByHour[0]++
|
||
}
|
||
if dist <= circleR {
|
||
rep.HitsByHour[1]++
|
||
}
|
||
if dist <= rep.WindSpeed*2*3600 {
|
||
rep.HitsByHour[1]++
|
||
}
|
||
if dist <= rep.WindSpeed*3*3600 {
|
||
rep.HitsByHour[2]++
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
return rep, nil
|
||
}
|
||
|
||
func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) {
|
||
const q = `SELECT wind_speed, wind_direction FROM radar_weather WHERE alias=$1 AND dt <= $2 ORDER BY dt DESC LIMIT 1`
|
||
var speed, dir sql.NullFloat64
|
||
err := db.QueryRow(q, alias, target).Scan(&speed, &dir)
|
||
if err == sql.ErrNoRows {
|
||
return 0, 0, false, nil
|
||
}
|
||
if err != nil {
|
||
return 0, 0, false, err
|
||
}
|
||
if !speed.Valid || !dir.Valid {
|
||
return 0, 0, false, nil
|
||
}
|
||
return speed.Float64, dir.Float64, true, nil
|
||
}
|
||
|
||
func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) {
|
||
const q = `SELECT dt, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5 ORDER BY dt`
|
||
rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
var list []tileRec
|
||
for rows.Next() {
|
||
var t tileRec
|
||
if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil {
|
||
return nil, err
|
||
}
|
||
list = append(list, t)
|
||
}
|
||
return list, nil
|
||
}
|
||
|
||
func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) {
|
||
w, h := t.Width, t.Height
|
||
if w <= 0 || h <= 0 {
|
||
return nil, nil, nil, fmt.Errorf("非法瓦片尺寸")
|
||
}
|
||
if len(t.Data) < w*h*2 {
|
||
return nil, nil, nil, fmt.Errorf("数据长度不足")
|
||
}
|
||
xs := make([]float64, w)
|
||
for c := 0; c < w; c++ {
|
||
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
|
||
}
|
||
ys := make([]float64, h)
|
||
for r := 0; r < h; r++ {
|
||
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
|
||
}
|
||
vals := make([][]*float64, h)
|
||
off := 0
|
||
for r := 0; r < h; r++ {
|
||
row := make([]*float64, w)
|
||
for c := 0; c < w; c++ {
|
||
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
|
||
off += 2
|
||
if v >= 32766 {
|
||
row[c] = nil
|
||
continue
|
||
}
|
||
dbz := float64(v) / 10.0
|
||
if dbz < 0 {
|
||
dbz = 0
|
||
} else if dbz > 75 {
|
||
dbz = 75
|
||
}
|
||
value := dbz
|
||
row[c] = &value
|
||
}
|
||
vals[r] = row
|
||
}
|
||
return vals, xs, ys, nil
|
||
}
|
||
|
||
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
|
||
const R = 6371000.0
|
||
dLat := toRad(lat2 - lat1)
|
||
dLon := toRad(lon2 - lon1)
|
||
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
|
||
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
|
||
return R * c
|
||
}
|
||
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
|
||
φ1 := toRad(lat1)
|
||
φ2 := toRad(lat2)
|
||
Δλ := toRad(lon2 - lon1)
|
||
y := math.Sin(Δλ) * math.Cos(φ2)
|
||
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
|
||
brg := toDeg(math.Atan2(y, x))
|
||
if brg < 0 {
|
||
brg += 360
|
||
}
|
||
return brg
|
||
}
|
||
func angDiff(a, b float64) float64 {
|
||
d := math.Mod(a-b+540, 360) - 180
|
||
if d < 0 {
|
||
d = -d
|
||
}
|
||
return math.Abs(d)
|
||
}
|
||
func toRad(d float64) float64 { return d * math.Pi / 180 }
|
||
func toDeg(r float64) float64 { return r * 180 / math.Pi }
|