package fusion import ( "context" "database/sql" "encoding/binary" "fmt" "log" "math" "sort" "strings" "time" "weatherstation/internal/database" ) const ( providerOpenMeteo = "open-meteo" providerCaiyun = "caiyun" providerImdroid = "imdroid" outputProvider = "imdroid_mix" ) var defaultTriad = database.Triad{OpenMeteo: 0.4, Caiyun: 0.3, Imdroid: 0.3} // RunForIssued runs fusion for all stations at the given issued (hour bucket, CST). // It persists weights into forecast_weights_current and fused rows into forecast_hourly. func RunForIssued(ctx context.Context, issued time.Time) error { db := database.GetDB() stations, err := loadStations(ctx, db) if err != nil { return err } for _, st := range stations { if err := runForStation(ctx, db, st, issued); err != nil { log.Printf("fusion: station=%s issued=%s error=%v", st.ID, issued.Format(time.RFC3339), err) } } return nil } type stationInfo struct { ID string Alias string Lat sql.NullFloat64 Lon sql.NullFloat64 Z sql.NullInt64 Y sql.NullInt64 X sql.NullInt64 } func loadStations(ctx context.Context, db *sql.DB) ([]stationInfo, error) { rows, err := db.QueryContext(ctx, ` SELECT station_id, COALESCE(NULLIF(station_alias,''), station_id) AS alias, latitude, longitude, z, y, x FROM stations WHERE latitude IS NOT NULL AND longitude IS NOT NULL ORDER BY station_id`) if err != nil { return nil, err } defer rows.Close() var list []stationInfo for rows.Next() { var s stationInfo if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil { return nil, err } list = append(list, s) } return list, nil } type providerMatrix struct { Provider string Samples []forecastSample } type forecastSample struct { Rain float64 Temp float64 Humidity float64 WindSpeed float64 WindGust float64 WindDir float64 Prob float64 Pressure float64 UV float64 } func runForStation(ctx context.Context, db *sql.DB, st stationInfo, issued time.Time) error { // Log header log.Printf("") log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), st.ID) // Presence at issued present := presentProviders(db, st.ID, issued) presOrdered := orderByProvidersAll(present) added, removed := diffPresence(db, st.ID, present) if len(added) > 0 || len(removed) > 0 { log.Printf("sources %v added=%v removed=%v", presOrdered, added, removed) } // Load previous triad snapshot or default triad, _, ok, _ := database.GetWeightsCurrent(ctx, st.ID) if !ok { triad = defaultTriad } // Apply missing/new rules triad = applyMissingTransfer(triad, present) triad = applyNewAllocation(triad, present) // Learning (I-1 vs (I-2)+1) learnStart := issued.Add(-time.Hour) learnEnd := issued actual, actualOK, err := fetchActualHourlyRain(db, st.ID, learnStart, learnEnd) if err != nil { log.Printf("fusion: actual error station=%s issued=%s err=%v", st.ID, issued.Format(time.RFC3339), err) } else if actualOK { prevIssue := issued.Add(-2 * time.Hour) preds, names := lead1PredsAt(db, st.ID, prevIssue, presOrdered) log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s", learnStart.Format("15:04"), learnEnd.Format("15:04"), actual, prevIssue.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"), formatFloatSlice(preds)) triad = learnTriad(triad, names, preds, actual) } // Build matrix samples for issued (providers present only) matrix := buildMatrix(db, st.ID, issued, presOrdered) // Effective weights (ordered by presOrdered) effBefore := triadToSlice(triad, presOrdered) eff := effBefore // learning already applied log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore, presOrdered), toNamed(eff, presOrdered)) // Fuse with mask fused := fuseForecastsWith(matrix, eff) preMask := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} maskApplied := false // Mask requires radar context if hasMaskCtx(st) { rep, err := checkRadarMask(db, buildMaskContext(st), issued) if err == nil && rep.Scanned { // per-hour apply: if hits==0 then zero rain for h := 0; h < 3; h++ { if rep.HitsByHour[h] == 0 { fused[h].Rain = 0 maskApplied = true } } log.Printf("mask t=%s wind=%.1fm/s %.0f° tiles=%d hits=[%d,%d,%d] applied=%v", issued.Format("2006-01-02 15:04:05"), rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskApplied) } } postMask := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} log.Printf("预报 %s , mask=%v , 融合的结果 %s", formatFloatSlice(preMask), maskApplied, formatFloatSlice(postMask)) // Persist fused rows if err := upsertFused(ctx, db, st.ID, issued, fused); err != nil { return err } // Persist triad snapshot for next hour if err := database.UpsertWeightsCurrent(ctx, st.ID, triad, issued); err != nil { return err } return nil } func formatFloatSlice(vs []float64) string { parts := make([]string, len(vs)) for i, v := range vs { parts[i] = fmt.Sprintf("%.2f", v) } return "[" + strings.Join(parts, ",") + "]" } func toNamed(ws []float64, names []string) string { parts := make([]string, len(ws)) for i := range ws { parts[i] = fmt.Sprintf("%s=%.2f", names[i], ws[i]) } return strings.Join(parts, ", ") } func triadToSlice(t database.Triad, names []string) []float64 { out := make([]float64, len(names)) for i, n := range names { switch n { case providerOpenMeteo: out[i] = t.OpenMeteo case providerCaiyun: out[i] = t.Caiyun case providerImdroid: out[i] = t.Imdroid } } return out } func sliceToTriad(vals []float64, names []string) database.Triad { t := database.Triad{} for i, n := range names { switch n { case providerOpenMeteo: t.OpenMeteo = vals[i] case providerCaiyun: t.Caiyun = vals[i] case providerImdroid: t.Imdroid = vals[i] } } return t } func presentProviders(db *sql.DB, stationID string, issued time.Time) map[string]bool { out := map[string]bool{} // Check providers within [issued, issued+1h) for _, p := range []string{providerOpenMeteo, providerCaiyun, providerImdroid} { var cnt int _ = db.QueryRow(`SELECT COUNT(*) FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour'`, stationID, p, issued).Scan(&cnt) if cnt > 0 { out[p] = true } } return out } func orderByProvidersAll(present map[string]bool) []string { base := []string{providerOpenMeteo, providerCaiyun, providerImdroid} out := make([]string, 0, len(present)) for _, p := range base { if present[p] { out = append(out, p) } } return out } func diffPresence(db *sql.DB, stationID string, present map[string]bool) (added, removed []string) { // Use last hour presence from forecast_hourly as proxy prev := map[string]bool{} // naive: if provider had any row at issued-1h bucket, consider present // We don't know issued here; caller logs only. // Return stable order using base sequence. base := []string{providerOpenMeteo, providerCaiyun, providerImdroid} for _, p := range base { if present[p] && !prev[p] { added = append(added, p) } if !present[p] && prev[p] { removed = append(removed, p) } } return added, removed } func applyMissingTransfer(t database.Triad, present map[string]bool) database.Triad { // Sum of missing weights missing := 0.0 if !present[providerOpenMeteo] { missing += t.OpenMeteo t.OpenMeteo = 0 } if !present[providerCaiyun] { missing += t.Caiyun t.Caiyun = 0 } if !present[providerImdroid] { missing += t.Imdroid t.Imdroid = 0 } if missing == 0 { return t } // Give to current max among present maxP := "" maxW := -1.0 for p, ok := range present { if !ok { continue } w := 0.0 switch p { case providerOpenMeteo: w = t.OpenMeteo case providerCaiyun: w = t.Caiyun case providerImdroid: w = t.Imdroid } if w > maxW { maxW = w maxP = p } } switch maxP { case providerOpenMeteo: t.OpenMeteo += missing case providerCaiyun: t.Caiyun += missing case providerImdroid: t.Imdroid += missing } return t } func applyNewAllocation(t database.Triad, present map[string]bool) database.Triad { // allocate 0.2 from current max to any newly present which currently has 0 for _, p := range []string{providerOpenMeteo, providerCaiyun, providerImdroid} { // we don't track prior presence; heuristic: if present and weight==0 allocate if present[p] && getW(t, p) <= 0 { // find current max (excluding p) type pair struct { name string w float64 } arr := []pair{{providerOpenMeteo, t.OpenMeteo}, {providerCaiyun, t.Caiyun}, {providerImdroid, t.Imdroid}} sort.Slice(arr, func(i, j int) bool { return arr[i].w > arr[j].w }) src := arr[0] if src.name == p && len(arr) > 1 { src = arr[1] } delta := 0.2 if src.w < delta { delta = src.w } t = setW(t, src.name, src.w-delta) t = setW(t, p, getW(t, p)+delta) } } return t } func getW(t database.Triad, p string) float64 { switch p { case providerOpenMeteo: return t.OpenMeteo case providerCaiyun: return t.Caiyun case providerImdroid: return t.Imdroid } return 0 } func setW(t database.Triad, p string, v float64) database.Triad { switch p { case providerOpenMeteo: t.OpenMeteo = v case providerCaiyun: t.Caiyun = v case providerImdroid: t.Imdroid = v } return t } func learnTriad(t database.Triad, names []string, preds []float64, actual float64) database.Triad { if len(names) != len(preds) || len(names) == 0 { return t } // find best/worst among names by absolute error best, worst := 0, 0 for i := range preds { if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { best = i } if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) { worst = i } } // log absolute errors parts := make([]string, 0, len(names)) for i := range names { parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", names[i], preds[i], actual, math.Abs(preds[i]-actual))) } log.Printf("绝对误差:%s → best=%s,worst=%s。", strings.Join(parts, ","), names[best], names[worst]) // apply ±0.1 (cap by available on worst) alpha := 0.1 worstW := getW(t, names[worst]) delta := alpha if worstW < delta { delta = worstW } if delta <= 0 { log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatFloatSlice(triadToSlice(t, []string{providerOpenMeteo, providerCaiyun, providerImdroid}))) return t } t = setW(t, names[worst], worstW-delta) t = setW(t, names[best], getW(t, names[best])+delta) log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatFloatSlice(triadToSlice(t, []string{providerOpenMeteo, providerCaiyun, providerImdroid}))) return t } func buildMatrix(db *sql.DB, stationID string, issued time.Time, ordered []string) []providerMatrix { out := make([]providerMatrix, 0, len(ordered)) for _, p := range ordered { iss, ok, err := resolveIssuedAtInBucket(db, stationID, p, issued) if err != nil || !ok { continue } samples, err := loadForecastSamples(db, stationID, p, iss) if err != nil || len(samples) < 3 { continue } out = append(out, providerMatrix{Provider: p, Samples: samples[:3]}) } return out } func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample { result := make([]forecastSample, 3) n := len(matrix) if n == 0 { return result } for h := 0; h < 3; h++ { var windX, windY float64 for i := 0; i < n; i++ { if len(matrix[i].Samples) <= h { continue } s := matrix[i].Samples[h] w := eff[i] result[h].Rain += s.Rain * w result[h].Temp += s.Temp * w result[h].Humidity += s.Humidity * w result[h].WindSpeed += s.WindSpeed * w result[h].WindGust += s.WindGust * w result[h].Prob += s.Prob * w result[h].Pressure += s.Pressure * w result[h].UV += s.UV * w rad := s.WindDir * math.Pi / 180 windX += math.Cos(rad) * w windY += math.Sin(rad) * w } if windX == 0 && windY == 0 { result[h].WindDir = 0 } else { dir := math.Atan2(windY, windX) * 180 / math.Pi if dir < 0 { dir += 360 } result[h].WindDir = dir } result[h].Humidity = math.Round(result[h].Humidity) result[h].Prob = math.Round(result[h].Prob) result[h].UV = math.Round(result[h].UV) } return result } func upsertFused(ctx context.Context, db *sql.DB, stationID string, issued time.Time, fused []forecastSample) error { for h := 1; h <= 3; h++ { ft := issued.Add(time.Duration(h) * time.Hour) _, err := db.ExecContext(ctx, ` INSERT INTO forecast_hourly ( station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000, temp_c_x100=EXCLUDED.temp_c_x100, humidity_pct=EXCLUDED.humidity_pct, wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, wind_dir_deg=EXCLUDED.wind_dir_deg, precip_prob_pct=EXCLUDED.precip_prob_pct, pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, uv_index=EXCLUDED.uv_index`, stationID, outputProvider, issued, ft, int(math.Round(fused[h-1].Rain*1000)), int(math.Round(fused[h-1].Temp*100)), int(math.Round(fused[h-1].Humidity)), int(math.Round(fused[h-1].WindSpeed*1000)), int(math.Round(fused[h-1].WindGust*1000)), int(math.Round(fused[h-1].WindDir)), int(math.Round(fused[h-1].Prob)), int(math.Round(fused[h-1].Pressure*100)), int(math.Round(fused[h-1].UV)), ) if err != nil { return err } } return nil } // ----- Helpers copied (simplified) from cmd/imdroidmix ----- func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) { const q = `SELECT issued_at FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour' ORDER BY issued_at DESC LIMIT 1` var t time.Time err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t) if err == sql.ErrNoRows { return time.Time{}, false, nil } if err != nil { return time.Time{}, false, err } return t, true, nil } func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) { const q = ` SELECT forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3 ORDER BY forecast_time ASC` rows, err := db.Query(q, stationID, provider, issued) if err != nil { return nil, err } defer rows.Close() var list []forecastSample for rows.Next() { var ft time.Time var rainX1000, tempX100, hum, wsX1000, gustX1000, wdir, prob, presX100, uv int if err := rows.Scan(&ft, &rainX1000, &tempX100, &hum, &wsX1000, &gustX1000, &wdir, &prob, &presX100, &uv); err != nil { return nil, err } list = append(list, forecastSample{ Rain: float64(rainX1000) / 1000.0, Temp: float64(tempX100) / 100.0, Humidity: float64(hum), WindSpeed: float64(wsX1000) / 1000.0, WindGust: float64(gustX1000) / 1000.0, WindDir: float64(wdir), Prob: float64(prob), Pressure: float64(presX100) / 100.0, UV: float64(uv), }) } return list, nil } func lead1PredsAt(db *sql.DB, stationID string, prevIssue time.Time, names []string) ([]float64, []string) { preds := make([]float64, 0, len(names)) used := make([]string, 0, len(names)) for _, p := range names { iss, ok, err := resolveIssuedAtInBucket(db, stationID, p, prevIssue) if err != nil || !ok { continue } samples, err := loadForecastSamples(db, stationID, p, iss) if err != nil || len(samples) < 1 { continue } preds = append(preds, samples[0].Rain) used = append(used, p) } return preds, used } func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) { const q = `SELECT SUM(rain_10m_mm_x1000) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` var sum sql.NullInt64 err := db.QueryRow(q, stationID, start, end).Scan(&sum) if err != nil { return 0, false, err } if !sum.Valid { return 0, false, nil } return float64(sum.Int64) / 1000.0, true, nil } // ----- Radar mask helpers (trimmed) ----- type maskContext struct { id, alias string lat, lon float64 z, y, x int } func hasMaskCtx(info stationInfo) bool { return info.Lat.Valid && info.Lon.Valid && info.Z.Valid && info.Y.Valid && info.X.Valid } func buildMaskContext(info stationInfo) maskContext { alias := info.Alias if strings.TrimSpace(alias) == "" { alias = info.ID } return maskContext{id: info.ID, alias: alias, lat: info.Lat.Float64, lon: info.Lon.Float64, z: int(info.Z.Int64), y: int(info.Y.Int64), x: int(info.X.Int64)} } type tileRec struct { DT time.Time Width int Height int West float64 South float64 East float64 North float64 ResDeg float64 Data []byte } type maskReport struct { Scanned bool WindOK bool UsedAlias string WindSpeed float64 WindDir float64 Tiles int HitsByHour [3]int } func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) { rep := maskReport{} windTime := issued.Add(-time.Hour) if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil { return rep, err } else if ok { rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id } else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil { return rep, err2 } else if ok2 { rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias } else { return rep, nil } tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued) if err != nil { return rep, err } if len(tiles) == 0 { return rep, nil } rep.Tiles = len(tiles) rep.Scanned = true const circleR = 8000.0 const halfAngle = 30.0 for _, t := range tiles { vals, xs, ys, err := decodeTile(t) if err != nil { continue } for r := 0; r < len(vals); r++ { row := vals[r] lat := ys[r] for c := 0; c < len(row); c++ { v := row[c] if v == nil { continue } dbz := *v if dbz < 40 { continue } lon := xs[c] dist := haversine(ctx.lat, ctx.lon, lat, lon) if dist <= circleR { rep.HitsByHour[0]++ } if rep.WindSpeed > 0 { brg := bearingDeg(ctx.lat, ctx.lon, lat, lon) if angDiff(brg, rep.WindDir) <= halfAngle { if dist <= rep.WindSpeed*3*3600 { rep.HitsByHour[0]++ } if dist <= circleR { rep.HitsByHour[1]++ } if dist <= rep.WindSpeed*2*3600 { rep.HitsByHour[1]++ } if dist <= rep.WindSpeed*3*3600 { rep.HitsByHour[2]++ } } } } } } return rep, nil } func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) { const q = `SELECT wind_speed, wind_direction FROM radar_weather WHERE alias=$1 AND dt <= $2 ORDER BY dt DESC LIMIT 1` var speed, dir sql.NullFloat64 err := db.QueryRow(q, alias, target).Scan(&speed, &dir) if err == sql.ErrNoRows { return 0, 0, false, nil } if err != nil { return 0, 0, false, err } if !speed.Valid || !dir.Valid { return 0, 0, false, nil } return speed.Float64, dir.Float64, true, nil } func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) { const q = `SELECT dt, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5 ORDER BY dt` rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to) if err != nil { return nil, err } defer rows.Close() var list []tileRec for rows.Next() { var t tileRec if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil { return nil, err } list = append(list, t) } return list, nil } func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) { w, h := t.Width, t.Height if w <= 0 || h <= 0 { return nil, nil, nil, fmt.Errorf("非法瓦片尺寸") } if len(t.Data) < w*h*2 { return nil, nil, nil, fmt.Errorf("数据长度不足") } xs := make([]float64, w) for c := 0; c < w; c++ { xs[c] = t.West + (float64(c)+0.5)*t.ResDeg } ys := make([]float64, h) for r := 0; r < h; r++ { ys[r] = t.South + (float64(r)+0.5)*t.ResDeg } vals := make([][]*float64, h) off := 0 for r := 0; r < h; r++ { row := make([]*float64, w) for c := 0; c < w; c++ { v := int16(binary.BigEndian.Uint16(t.Data[off : off+2])) off += 2 if v >= 32766 { row[c] = nil continue } dbz := float64(v) / 10.0 if dbz < 0 { dbz = 0 } else if dbz > 75 { dbz = 75 } value := dbz row[c] = &value } vals[r] = row } return vals, xs, ys, nil } func haversine(lat1, lon1, lat2, lon2 float64) float64 { const R = 6371000.0 dLat := toRad(lat2 - lat1) dLon := toRad(lon2 - lon1) a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2) c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) return R * c } func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 { φ1 := toRad(lat1) φ2 := toRad(lat2) Δλ := toRad(lon2 - lon1) y := math.Sin(Δλ) * math.Cos(φ2) x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ) brg := toDeg(math.Atan2(y, x)) if brg < 0 { brg += 360 } return brg } func angDiff(a, b float64) float64 { d := math.Mod(a-b+540, 360) - 180 if d < 0 { d = -d } return math.Abs(d) } func toRad(d float64) float64 { return d * math.Pi / 180 } func toDeg(r float64) float64 { return r * 180 / math.Pi }