diff --git a/db/migrations/20251013_add_forecast_weights_current.sql b/db/migrations/20251013_add_forecast_weights_current.sql new file mode 100644 index 0000000..2ea3aec --- /dev/null +++ b/db/migrations/20251013_add_forecast_weights_current.sql @@ -0,0 +1,10 @@ +-- Create snapshot table for per-station forecast fusion weights +CREATE TABLE IF NOT EXISTS forecast_weights_current ( + station_id TEXT PRIMARY KEY, + w_open_meteo DOUBLE PRECISION NOT NULL, + w_caiyun DOUBLE PRECISION NOT NULL, + w_imdroid DOUBLE PRECISION NOT NULL, + last_issued_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ NOT NULL +); + diff --git a/internal/database/weights.go b/internal/database/weights.go new file mode 100644 index 0000000..bc9afd2 --- /dev/null +++ b/internal/database/weights.go @@ -0,0 +1,44 @@ +package database + +import ( + "context" + "time" +) + +type Triad struct { + OpenMeteo float64 + Caiyun float64 + Imdroid float64 +} + +// GetWeightsCurrent returns the last saved triad and its issued_at for a station. +// If no row exists, returns ok=false. +func GetWeightsCurrent(ctx context.Context, stationID string) (triad Triad, lastIssued time.Time, ok bool, err error) { + db := GetDB() + row := db.QueryRowContext(ctx, ` + SELECT w_open_meteo, w_caiyun, w_imdroid, last_issued_at + FROM forecast_weights_current + WHERE station_id=$1`, stationID) + var w1, w2, w3 float64 + var li time.Time + if e := row.Scan(&w1, &w2, &w3, &li); e != nil { + return Triad{}, time.Time{}, false, nil + } + return Triad{OpenMeteo: w1, Caiyun: w2, Imdroid: w3}, li, true, nil +} + +// UpsertWeightsCurrent saves the triad snapshot for the station. +func UpsertWeightsCurrent(ctx context.Context, stationID string, triad Triad, issued time.Time) error { + db := GetDB() + _, err := db.ExecContext(ctx, ` + INSERT INTO forecast_weights_current (station_id, w_open_meteo, w_caiyun, w_imdroid, last_issued_at, updated_at) + VALUES ($1,$2,$3,$4,$5, NOW()) + ON CONFLICT (station_id) + DO UPDATE SET w_open_meteo=EXCLUDED.w_open_meteo, + w_caiyun=EXCLUDED.w_caiyun, + w_imdroid=EXCLUDED.w_imdroid, + last_issued_at=EXCLUDED.last_issued_at, + updated_at=NOW()`, + stationID, triad.OpenMeteo, triad.Caiyun, triad.Imdroid, issued) + return err +} diff --git a/internal/fusion/fusion.go b/internal/fusion/fusion.go new file mode 100644 index 0000000..d921e0b --- /dev/null +++ b/internal/fusion/fusion.go @@ -0,0 +1,796 @@ +package fusion + +import ( + "context" + "database/sql" + "encoding/binary" + "fmt" + "log" + "math" + "sort" + "strings" + "time" + + "weatherstation/internal/database" +) + +const ( + providerOpenMeteo = "open-meteo" + providerCaiyun = "caiyun" + providerImdroid = "imdroid" + outputProvider = "imdroid_mix" +) + +var defaultTriad = database.Triad{OpenMeteo: 0.4, Caiyun: 0.3, Imdroid: 0.3} + +// RunForIssued runs fusion for all stations at the given issued (hour bucket, CST). +// It persists weights into forecast_weights_current and fused rows into forecast_hourly. +func RunForIssued(ctx context.Context, issued time.Time) error { + db := database.GetDB() + stations, err := loadStations(ctx, db) + if err != nil { + return err + } + for _, st := range stations { + if err := runForStation(ctx, db, st, issued); err != nil { + log.Printf("fusion: station=%s issued=%s error=%v", st.ID, issued.Format(time.RFC3339), err) + } + } + return nil +} + +type stationInfo struct { + ID string + Alias string + Lat sql.NullFloat64 + Lon sql.NullFloat64 + Z sql.NullInt64 + Y sql.NullInt64 + X sql.NullInt64 +} + +func loadStations(ctx context.Context, db *sql.DB) ([]stationInfo, error) { + rows, err := db.QueryContext(ctx, ` + SELECT station_id, + COALESCE(NULLIF(station_alias,''), station_id) AS alias, + latitude, longitude, z, y, x + FROM stations + WHERE latitude IS NOT NULL AND longitude IS NOT NULL + ORDER BY station_id`) + if err != nil { + return nil, err + } + defer rows.Close() + var list []stationInfo + for rows.Next() { + var s stationInfo + if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil { + return nil, err + } + list = append(list, s) + } + return list, nil +} + +type providerMatrix struct { + Provider string + Samples []forecastSample +} + +type forecastSample struct { + Rain float64 + Temp float64 + Humidity float64 + WindSpeed float64 + WindGust float64 + WindDir float64 + Prob float64 + Pressure float64 + UV float64 +} + +func runForStation(ctx context.Context, db *sql.DB, st stationInfo, issued time.Time) error { + // Log header + log.Printf("") + log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), st.ID) + + // Presence at issued + present := presentProviders(db, st.ID, issued) + presOrdered := orderByProvidersAll(present) + added, removed := diffPresence(db, st.ID, present) + if len(added) > 0 || len(removed) > 0 { + log.Printf("sources %v added=%v removed=%v", presOrdered, added, removed) + } + + // Load previous triad snapshot or default + triad, _, ok, _ := database.GetWeightsCurrent(ctx, st.ID) + if !ok { + triad = defaultTriad + } + + // Apply missing/new rules + triad = applyMissingTransfer(triad, present) + triad = applyNewAllocation(triad, present) + + // Learning (I-1 vs (I-2)+1) + learnStart := issued.Add(-time.Hour) + learnEnd := issued + actual, actualOK, err := fetchActualHourlyRain(db, st.ID, learnStart, learnEnd) + if err != nil { + log.Printf("fusion: actual error station=%s issued=%s err=%v", st.ID, issued.Format(time.RFC3339), err) + } else if actualOK { + prevIssue := issued.Add(-2 * time.Hour) + preds, names := lead1PredsAt(db, st.ID, prevIssue, presOrdered) + log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s", + learnStart.Format("15:04"), learnEnd.Format("15:04"), actual, + prevIssue.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"), + formatFloatSlice(preds)) + triad = learnTriad(triad, names, preds, actual) + } + + // Build matrix samples for issued (providers present only) + matrix := buildMatrix(db, st.ID, issued, presOrdered) + + // Effective weights (ordered by presOrdered) + effBefore := triadToSlice(triad, presOrdered) + eff := effBefore // learning already applied + log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore, presOrdered), toNamed(eff, presOrdered)) + + // Fuse with mask + fused := fuseForecastsWith(matrix, eff) + preMask := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} + maskApplied := false + // Mask requires radar context + if hasMaskCtx(st) { + rep, err := checkRadarMask(db, buildMaskContext(st), issued) + if err == nil && rep.Scanned { + // per-hour apply: if hits==0 then zero rain + for h := 0; h < 3; h++ { + if rep.HitsByHour[h] == 0 { + fused[h].Rain = 0 + maskApplied = true + } + } + log.Printf("mask t=%s wind=%.1fm/s %.0f° tiles=%d hits=[%d,%d,%d] applied=%v", + issued.Format("2006-01-02 15:04:05"), rep.WindSpeed, rep.WindDir, rep.Tiles, + rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskApplied) + } + } + postMask := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} + log.Printf("预报 %s , mask=%v , 融合的结果 %s", formatFloatSlice(preMask), maskApplied, formatFloatSlice(postMask)) + + // Persist fused rows + if err := upsertFused(ctx, db, st.ID, issued, fused); err != nil { + return err + } + // Persist triad snapshot for next hour + if err := database.UpsertWeightsCurrent(ctx, st.ID, triad, issued); err != nil { + return err + } + return nil +} + +func formatFloatSlice(vs []float64) string { + parts := make([]string, len(vs)) + for i, v := range vs { + parts[i] = fmt.Sprintf("%.2f", v) + } + return "[" + strings.Join(parts, ",") + "]" +} + +func toNamed(ws []float64, names []string) string { + parts := make([]string, len(ws)) + for i := range ws { + parts[i] = fmt.Sprintf("%s=%.2f", names[i], ws[i]) + } + return strings.Join(parts, ", ") +} + +func triadToSlice(t database.Triad, names []string) []float64 { + out := make([]float64, len(names)) + for i, n := range names { + switch n { + case providerOpenMeteo: + out[i] = t.OpenMeteo + case providerCaiyun: + out[i] = t.Caiyun + case providerImdroid: + out[i] = t.Imdroid + } + } + return out +} + +func sliceToTriad(vals []float64, names []string) database.Triad { + t := database.Triad{} + for i, n := range names { + switch n { + case providerOpenMeteo: + t.OpenMeteo = vals[i] + case providerCaiyun: + t.Caiyun = vals[i] + case providerImdroid: + t.Imdroid = vals[i] + } + } + return t +} + +func presentProviders(db *sql.DB, stationID string, issued time.Time) map[string]bool { + out := map[string]bool{} + // Check providers within [issued, issued+1h) + for _, p := range []string{providerOpenMeteo, providerCaiyun, providerImdroid} { + var cnt int + _ = db.QueryRow(`SELECT COUNT(*) FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour'`, stationID, p, issued).Scan(&cnt) + if cnt > 0 { + out[p] = true + } + } + return out +} + +func orderByProvidersAll(present map[string]bool) []string { + base := []string{providerOpenMeteo, providerCaiyun, providerImdroid} + out := make([]string, 0, len(present)) + for _, p := range base { + if present[p] { + out = append(out, p) + } + } + return out +} + +func diffPresence(db *sql.DB, stationID string, present map[string]bool) (added, removed []string) { + // Use last hour presence from forecast_hourly as proxy + prev := map[string]bool{} + // naive: if provider had any row at issued-1h bucket, consider present + // We don't know issued here; caller logs only. + // Return stable order using base sequence. + base := []string{providerOpenMeteo, providerCaiyun, providerImdroid} + for _, p := range base { + if present[p] && !prev[p] { + added = append(added, p) + } + if !present[p] && prev[p] { + removed = append(removed, p) + } + } + return added, removed +} + +func applyMissingTransfer(t database.Triad, present map[string]bool) database.Triad { + // Sum of missing weights + missing := 0.0 + if !present[providerOpenMeteo] { + missing += t.OpenMeteo + t.OpenMeteo = 0 + } + if !present[providerCaiyun] { + missing += t.Caiyun + t.Caiyun = 0 + } + if !present[providerImdroid] { + missing += t.Imdroid + t.Imdroid = 0 + } + if missing == 0 { + return t + } + // Give to current max among present + maxP := "" + maxW := -1.0 + for p, ok := range present { + if !ok { + continue + } + w := 0.0 + switch p { + case providerOpenMeteo: + w = t.OpenMeteo + case providerCaiyun: + w = t.Caiyun + case providerImdroid: + w = t.Imdroid + } + if w > maxW { + maxW = w + maxP = p + } + } + switch maxP { + case providerOpenMeteo: + t.OpenMeteo += missing + case providerCaiyun: + t.Caiyun += missing + case providerImdroid: + t.Imdroid += missing + } + return t +} + +func applyNewAllocation(t database.Triad, present map[string]bool) database.Triad { + // allocate 0.2 from current max to any newly present which currently has 0 + for _, p := range []string{providerOpenMeteo, providerCaiyun, providerImdroid} { + // we don't track prior presence; heuristic: if present and weight==0 allocate + if present[p] && getW(t, p) <= 0 { + // find current max (excluding p) + type pair struct { + name string + w float64 + } + arr := []pair{{providerOpenMeteo, t.OpenMeteo}, {providerCaiyun, t.Caiyun}, {providerImdroid, t.Imdroid}} + sort.Slice(arr, func(i, j int) bool { return arr[i].w > arr[j].w }) + src := arr[0] + if src.name == p && len(arr) > 1 { + src = arr[1] + } + delta := 0.2 + if src.w < delta { + delta = src.w + } + t = setW(t, src.name, src.w-delta) + t = setW(t, p, getW(t, p)+delta) + } + } + return t +} + +func getW(t database.Triad, p string) float64 { + switch p { + case providerOpenMeteo: + return t.OpenMeteo + case providerCaiyun: + return t.Caiyun + case providerImdroid: + return t.Imdroid + } + return 0 +} +func setW(t database.Triad, p string, v float64) database.Triad { + switch p { + case providerOpenMeteo: + t.OpenMeteo = v + case providerCaiyun: + t.Caiyun = v + case providerImdroid: + t.Imdroid = v + } + return t +} + +func learnTriad(t database.Triad, names []string, preds []float64, actual float64) database.Triad { + if len(names) != len(preds) || len(names) == 0 { + return t + } + // find best/worst among names by absolute error + best, worst := 0, 0 + for i := range preds { + if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { + best = i + } + if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) { + worst = i + } + } + // log absolute errors + parts := make([]string, 0, len(names)) + for i := range names { + parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", names[i], preds[i], actual, math.Abs(preds[i]-actual))) + } + log.Printf("绝对误差:%s → best=%s,worst=%s。", strings.Join(parts, ","), names[best], names[worst]) + + // apply ±0.1 (cap by available on worst) + alpha := 0.1 + worstW := getW(t, names[worst]) + delta := alpha + if worstW < delta { + delta = worstW + } + if delta <= 0 { + log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatFloatSlice(triadToSlice(t, []string{providerOpenMeteo, providerCaiyun, providerImdroid}))) + return t + } + t = setW(t, names[worst], worstW-delta) + t = setW(t, names[best], getW(t, names[best])+delta) + log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatFloatSlice(triadToSlice(t, []string{providerOpenMeteo, providerCaiyun, providerImdroid}))) + return t +} + +func buildMatrix(db *sql.DB, stationID string, issued time.Time, ordered []string) []providerMatrix { + out := make([]providerMatrix, 0, len(ordered)) + for _, p := range ordered { + iss, ok, err := resolveIssuedAtInBucket(db, stationID, p, issued) + if err != nil || !ok { + continue + } + samples, err := loadForecastSamples(db, stationID, p, iss) + if err != nil || len(samples) < 3 { + continue + } + out = append(out, providerMatrix{Provider: p, Samples: samples[:3]}) + } + return out +} + +func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample { + result := make([]forecastSample, 3) + n := len(matrix) + if n == 0 { + return result + } + for h := 0; h < 3; h++ { + var windX, windY float64 + for i := 0; i < n; i++ { + if len(matrix[i].Samples) <= h { + continue + } + s := matrix[i].Samples[h] + w := eff[i] + result[h].Rain += s.Rain * w + result[h].Temp += s.Temp * w + result[h].Humidity += s.Humidity * w + result[h].WindSpeed += s.WindSpeed * w + result[h].WindGust += s.WindGust * w + result[h].Prob += s.Prob * w + result[h].Pressure += s.Pressure * w + result[h].UV += s.UV * w + rad := s.WindDir * math.Pi / 180 + windX += math.Cos(rad) * w + windY += math.Sin(rad) * w + } + if windX == 0 && windY == 0 { + result[h].WindDir = 0 + } else { + dir := math.Atan2(windY, windX) * 180 / math.Pi + if dir < 0 { + dir += 360 + } + result[h].WindDir = dir + } + result[h].Humidity = math.Round(result[h].Humidity) + result[h].Prob = math.Round(result[h].Prob) + result[h].UV = math.Round(result[h].UV) + } + return result +} + +func upsertFused(ctx context.Context, db *sql.DB, stationID string, issued time.Time, fused []forecastSample) error { + for h := 1; h <= 3; h++ { + ft := issued.Add(time.Duration(h) * time.Hour) + _, err := db.ExecContext(ctx, ` + INSERT INTO forecast_hourly ( + station_id, provider, issued_at, forecast_time, + rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, + wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) + ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET + rain_mm_x1000=EXCLUDED.rain_mm_x1000, + temp_c_x100=EXCLUDED.temp_c_x100, + humidity_pct=EXCLUDED.humidity_pct, + wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, + wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, + wind_dir_deg=EXCLUDED.wind_dir_deg, + precip_prob_pct=EXCLUDED.precip_prob_pct, + pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, + uv_index=EXCLUDED.uv_index`, + stationID, outputProvider, issued, ft, + int(math.Round(fused[h-1].Rain*1000)), + int(math.Round(fused[h-1].Temp*100)), + int(math.Round(fused[h-1].Humidity)), + int(math.Round(fused[h-1].WindSpeed*1000)), + int(math.Round(fused[h-1].WindGust*1000)), + int(math.Round(fused[h-1].WindDir)), + int(math.Round(fused[h-1].Prob)), + int(math.Round(fused[h-1].Pressure*100)), + int(math.Round(fused[h-1].UV)), + ) + if err != nil { + return err + } + } + return nil +} + +// ----- Helpers copied (simplified) from cmd/imdroidmix ----- + +func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) { + const q = `SELECT issued_at FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour' ORDER BY issued_at DESC LIMIT 1` + var t time.Time + err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t) + if err == sql.ErrNoRows { + return time.Time{}, false, nil + } + if err != nil { + return time.Time{}, false, err + } + return t, true, nil +} + +func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) { + const q = ` + SELECT forecast_time, + rain_mm_x1000, + temp_c_x100, + humidity_pct, + wind_speed_ms_x1000, + wind_gust_ms_x1000, + wind_dir_deg, + precip_prob_pct, + pressure_hpa_x100, + uv_index + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND issued_at=$3 + ORDER BY forecast_time ASC` + rows, err := db.Query(q, stationID, provider, issued) + if err != nil { + return nil, err + } + defer rows.Close() + var list []forecastSample + for rows.Next() { + var ft time.Time + var rainX1000, tempX100, hum, wsX1000, gustX1000, wdir, prob, presX100, uv int + if err := rows.Scan(&ft, &rainX1000, &tempX100, &hum, &wsX1000, &gustX1000, &wdir, &prob, &presX100, &uv); err != nil { + return nil, err + } + list = append(list, forecastSample{ + Rain: float64(rainX1000) / 1000.0, + Temp: float64(tempX100) / 100.0, + Humidity: float64(hum), + WindSpeed: float64(wsX1000) / 1000.0, + WindGust: float64(gustX1000) / 1000.0, + WindDir: float64(wdir), + Prob: float64(prob), + Pressure: float64(presX100) / 100.0, + UV: float64(uv), + }) + } + return list, nil +} + +func lead1PredsAt(db *sql.DB, stationID string, prevIssue time.Time, names []string) ([]float64, []string) { + preds := make([]float64, 0, len(names)) + used := make([]string, 0, len(names)) + for _, p := range names { + iss, ok, err := resolveIssuedAtInBucket(db, stationID, p, prevIssue) + if err != nil || !ok { + continue + } + samples, err := loadForecastSamples(db, stationID, p, iss) + if err != nil || len(samples) < 1 { + continue + } + preds = append(preds, samples[0].Rain) + used = append(used, p) + } + return preds, used +} + +func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) { + const q = `SELECT SUM(rain_10m_mm_x1000) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` + var sum sql.NullInt64 + err := db.QueryRow(q, stationID, start, end).Scan(&sum) + if err != nil { + return 0, false, err + } + if !sum.Valid { + return 0, false, nil + } + return float64(sum.Int64) / 1000.0, true, nil +} + +// ----- Radar mask helpers (trimmed) ----- + +type maskContext struct { + id, alias string + lat, lon float64 + z, y, x int +} + +func hasMaskCtx(info stationInfo) bool { + return info.Lat.Valid && info.Lon.Valid && info.Z.Valid && info.Y.Valid && info.X.Valid +} +func buildMaskContext(info stationInfo) maskContext { + alias := info.Alias + if strings.TrimSpace(alias) == "" { + alias = info.ID + } + return maskContext{id: info.ID, alias: alias, lat: info.Lat.Float64, lon: info.Lon.Float64, z: int(info.Z.Int64), y: int(info.Y.Int64), x: int(info.X.Int64)} +} + +type tileRec struct { + DT time.Time + Width int + Height int + West float64 + South float64 + East float64 + North float64 + ResDeg float64 + Data []byte +} + +type maskReport struct { + Scanned bool + WindOK bool + UsedAlias string + WindSpeed float64 + WindDir float64 + Tiles int + HitsByHour [3]int +} + +func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) { + rep := maskReport{} + windTime := issued.Add(-time.Hour) + if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil { + return rep, err + } else if ok { + rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id + } else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil { + return rep, err2 + } else if ok2 { + rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias + } else { + return rep, nil + } + tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued) + if err != nil { + return rep, err + } + if len(tiles) == 0 { + return rep, nil + } + rep.Tiles = len(tiles) + rep.Scanned = true + const circleR = 8000.0 + const halfAngle = 30.0 + for _, t := range tiles { + vals, xs, ys, err := decodeTile(t) + if err != nil { + continue + } + for r := 0; r < len(vals); r++ { + row := vals[r] + lat := ys[r] + for c := 0; c < len(row); c++ { + v := row[c] + if v == nil { + continue + } + dbz := *v + if dbz < 40 { + continue + } + lon := xs[c] + dist := haversine(ctx.lat, ctx.lon, lat, lon) + if dist <= circleR { + rep.HitsByHour[0]++ + } + if rep.WindSpeed > 0 { + brg := bearingDeg(ctx.lat, ctx.lon, lat, lon) + if angDiff(brg, rep.WindDir) <= halfAngle { + if dist <= rep.WindSpeed*3*3600 { + rep.HitsByHour[0]++ + } + if dist <= circleR { + rep.HitsByHour[1]++ + } + if dist <= rep.WindSpeed*2*3600 { + rep.HitsByHour[1]++ + } + if dist <= rep.WindSpeed*3*3600 { + rep.HitsByHour[2]++ + } + } + } + } + } + } + return rep, nil +} + +func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) { + const q = `SELECT wind_speed, wind_direction FROM radar_weather WHERE alias=$1 AND dt <= $2 ORDER BY dt DESC LIMIT 1` + var speed, dir sql.NullFloat64 + err := db.QueryRow(q, alias, target).Scan(&speed, &dir) + if err == sql.ErrNoRows { + return 0, 0, false, nil + } + if err != nil { + return 0, 0, false, err + } + if !speed.Valid || !dir.Valid { + return 0, 0, false, nil + } + return speed.Float64, dir.Float64, true, nil +} + +func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) { + const q = `SELECT dt, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5 ORDER BY dt` + rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to) + if err != nil { + return nil, err + } + defer rows.Close() + var list []tileRec + for rows.Next() { + var t tileRec + if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil { + return nil, err + } + list = append(list, t) + } + return list, nil +} + +func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) { + w, h := t.Width, t.Height + if w <= 0 || h <= 0 { + return nil, nil, nil, fmt.Errorf("非法瓦片尺寸") + } + if len(t.Data) < w*h*2 { + return nil, nil, nil, fmt.Errorf("数据长度不足") + } + xs := make([]float64, w) + for c := 0; c < w; c++ { + xs[c] = t.West + (float64(c)+0.5)*t.ResDeg + } + ys := make([]float64, h) + for r := 0; r < h; r++ { + ys[r] = t.South + (float64(r)+0.5)*t.ResDeg + } + vals := make([][]*float64, h) + off := 0 + for r := 0; r < h; r++ { + row := make([]*float64, w) + for c := 0; c < w; c++ { + v := int16(binary.BigEndian.Uint16(t.Data[off : off+2])) + off += 2 + if v >= 32766 { + row[c] = nil + continue + } + dbz := float64(v) / 10.0 + if dbz < 0 { + dbz = 0 + } else if dbz > 75 { + dbz = 75 + } + value := dbz + row[c] = &value + } + vals[r] = row + } + return vals, xs, ys, nil +} + +func haversine(lat1, lon1, lat2, lon2 float64) float64 { + const R = 6371000.0 + dLat := toRad(lat2 - lat1) + dLon := toRad(lon2 - lon1) + a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2) + c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) + return R * c +} +func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 { + φ1 := toRad(lat1) + φ2 := toRad(lat2) + Δλ := toRad(lon2 - lon1) + y := math.Sin(Δλ) * math.Cos(φ2) + x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ) + brg := toDeg(math.Atan2(y, x)) + if brg < 0 { + brg += 360 + } + return brg +} +func angDiff(a, b float64) float64 { + d := math.Mod(a-b+540, 360) - 180 + if d < 0 { + d = -d + } + return math.Abs(d) +} +func toRad(d float64) float64 { return d * math.Pi / 180 } +func toDeg(r float64) float64 { return r * 180 / math.Pi } diff --git a/internal/server/udp.go b/internal/server/udp.go index 5a71ddb..a6c930d 100644 --- a/internal/server/udp.go +++ b/internal/server/udp.go @@ -16,6 +16,7 @@ import ( "unicode/utf8" "weatherstation/internal/config" "weatherstation/internal/forecast" + "weatherstation/internal/fusion" "weatherstation/internal/tools" "weatherstation/model" ) @@ -192,6 +193,21 @@ func StartUDPServer() error { } }() + // 后台定时:每小时整点+5分 执行融合任务(全站),发布 imdroid_mix + go func() { + for { + now := time.Now() + next := now.Truncate(time.Hour).Add(time.Hour).Add(5 * time.Minute) + time.Sleep(time.Until(next)) + issued := next.Truncate(time.Hour) + if err := fusion.RunForIssued(context.Background(), issued); err != nil { + log.Printf("fusion 定时执行失败: %v", err) + } else { + log.Printf("fusion 定时执行完成 issued=%s", issued.Format("2006-01-02 15:04:05")) + } + } + }() + for { n, addr, err := conn.ReadFrom(buffer) if err != nil {