package main import ( "database/sql" "encoding/binary" "errors" "flag" "fmt" "log" "math" "os" "sort" "strings" "time" "weatherstation/internal/database" ) const ( providerOpenMeteo = "open-meteo" providerCaiyun = "caiyun" providerImdroid = "imdroid" outputProvider = "imdroid_mix" ) var ( defaultWeights = []float64{0.4, 0.3, 0.3} asiaShanghai *time.Location debugEnabled bool ) func init() { loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { loc = time.FixedZone("CST", 8*3600) } asiaShanghai = loc log.SetFlags(log.LstdFlags | log.Lmsgprefix) log.SetPrefix("[imdroidmix] ") } type stationInfo struct { ID string Alias string Lat sql.NullFloat64 Lon sql.NullFloat64 Z sql.NullInt64 Y sql.NullInt64 X sql.NullInt64 } type forecastSample struct { Rain float64 Temp float64 Humidity float64 WindSpeed float64 WindGust float64 WindDir float64 Prob float64 Pressure float64 UV float64 } type providerMatrix struct { Provider string Samples []forecastSample } type fusedRecord struct { StationID string IssuedAt time.Time ForecastTime time.Time Sample forecastSample MaskApplied bool } type maskContext struct { id string alias string lat float64 lon float64 z int y int x int } func main() { var ( issuedRange string stationFlag string outSQLPath string showProgress bool applyUpserts bool includeAll bool debugLog bool noMask bool minSources int maskHoursStr string ) flag.StringVar(&issuedRange, "issued_range", "", "时间范围,格式 \"YYYY-MM-DD HH:MM:SS,YYYY-MM-DD HH:MM:SS\"(CST)") flag.StringVar(&stationFlag, "station", "", "目标站点(可选,多个以逗号分隔)") flag.BoolVar(&includeAll, "all_stations", false, "处理全部有经纬度的站点") flag.StringVar(&outSQLPath, "export_sql", "imdroid_mix.sql", "输出SQL文件路径") flag.BoolVar(&showProgress, "progress", false, "展示处理进度") flag.BoolVar(&applyUpserts, "apply", false, "直接写入数据库(默认仅导出SQL)") flag.BoolVar(&debugLog, "debug", false, "输出调试日志") flag.BoolVar(&noMask, "no_mask", false, "关闭雷达掩码(不清零融合雨量)") flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过") flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0") flag.Parse() // 让库内辅助函数也能访问调试开关 debugEnabled = debugLog if strings.TrimSpace(issuedRange) == "" { log.Fatalln("必须提供 --issued_range") } startIssued, endIssued, err := parseRange(issuedRange) if err != nil { log.Fatalf("解析 issued_range 失败: %v", err) } if !endIssued.After(startIssued) { log.Fatalln("issued_range 结束时间必须大于起始时间") } db := database.GetDB() defer database.Close() stations, err := resolveStations(db, stationFlag, includeAll) if err != nil { log.Fatalf("加载站点失败: %v", err) } if len(stations) == 0 { log.Fatalln("没有符合条件的站点") } type stationWork struct { info stationInfo issuedTimes []time.Time providers []string } var jobs []stationWork var totalSteps int for _, st := range stations { pSet, err := resolveProviderSet(db, st.ID, startIssued, endIssued) if err != nil { log.Fatalf("检测站点 %s 预报源失败: %v", st.ID, err) } if debugLog { log.Printf("站点 %s 使用预报源: %v", st.ID, pSet) } issuedTimes, err := fetchIssuedHourBuckets(db, st.ID, startIssued, endIssued, pSet, minSources) if err != nil { log.Fatalf("查询站点 %s 的 issued_at 失败: %v", st.ID, err) } if len(issuedTimes) == 0 { if debugLog { // 打印每个provider的 issued_at 数量辅助定位 for _, p := range pSet { cnt, first, last := debugIssuedSummary(db, st.ID, startIssued, endIssued, p) log.Printf("站点 %s provider=%s issued_at数量=%d 时间范围[%s, %s]", st.ID, p, cnt, first, last) } } continue } jobs = append(jobs, stationWork{info: st, issuedTimes: issuedTimes, providers: pSet}) totalSteps += len(issuedTimes) } if len(jobs) == 0 { log.Fatalln("指定时间范围内无可融合的预报数据") } var ( results []fusedRecord step int progressFn func(current int, total int, ctx string) ) if showProgress { progressFn = func(current, total int, ctx string) { fmt.Printf("\r处理进度 %d/%d (%s)...", current, total, ctx) } defer fmt.Println() } // 解析 mask_hours maskHours := [3]bool{true, true, true} if s := strings.TrimSpace(maskHoursStr); s != "" { parts := strings.Split(s, ",") for i := 0; i < len(parts) && i < 3; i++ { v := strings.TrimSpace(parts[i]) maskHours[i] = (v == "1" || strings.EqualFold(v, "true")) } } for _, job := range jobs { info := job.info maskCtx, hasMaskCtx := buildMaskContext(info) stWeights := newWeightState(job.providers) for idx, issued := range job.issuedTimes { matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers) if err != nil { log.Printf("站点 %s issued=%s 载入预报失败: %v", info.ID, issued.Format(time.RFC3339), err) continue } if len(matrix) < minSources { log.Printf("站点 %s issued=%s 可用源=%d < min_sources=%d,跳过", info.ID, issued.Format(time.RFC3339), len(matrix), minSources) if debugLog { probeProviderAtBucket(db, info.ID, job.providers, issued) } continue } if debugLog { log.Printf("") log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), info.ID) } // 学习先行:先得到学习前权重,再学习,再得到本轮使用权重 effBefore, _ := stWeights.effectiveForMatrix(matrix) // 学习对齐:用刚结束的小时 [issued-1h, issued) 与 (issued-2h) 的 +1 预测 learnStart := issued.Add(-time.Hour) learnEnd := issued actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, learnStart, learnEnd) if err != nil { log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err) } else if actualOK { prevIssueBucket := issued.Add(-2 * time.Hour) preds := make([]float64, len(matrix)) valids := make([]bool, len(matrix)) for i, pm := range matrix { if v, ok := lead1RainAtIssued(db, info.ID, pm.Provider, prevIssueBucket); ok { preds[i] = v valids[i] = true } } if debugLog { log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s", learnStart.Format("15:04"), learnEnd.Format("15:04"), actualRain, prevIssueBucket.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"), formatWeights(preds, false)) } stWeights.learnFromPreds(matrix, preds, valids, actualRain) } eff, _ := stWeights.effectiveForMatrix(matrix) if debugLog { // 以 provider=weight 的形式输出,顺序与 sources 一致 toNamed := func(ws []float64) string { pairs := make([]string, 0, len(matrix)) for i, pm := range matrix { pairs = append(pairs, fmt.Sprintf("%s=%.2f", pm.Provider, ws[i])) } return strings.Join(pairs, ", ") } log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore), toNamed(eff)) } fused := fuseForecastsWith(matrix, eff) // 记录掩码前的融合雨量 var preMaskRain = []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} rainAllZero := isRainAllZero(fused) maskApplied := false if !rainAllZero && hasMaskCtx && !noMask { rep, err := checkRadarMask(db, maskCtx, issued) if err != nil { log.Printf("站点 %s issued=%s 掩码计算出错: %v", info.ID, issued.Format(time.RFC3339), err) } else { if rep.Scanned { // 逐小时按 mask_hours 应用 // 解析 mask_hours 上移至批次开始 // 在此应用逐小时清零 // 重用 maskHours 变量 for hidx := 0; hidx < 3; hidx++ { if !maskHours[hidx] { continue } if rep.HitsByHour[hidx] == 0 { fused[hidx].Rain = 0 maskApplied = true } } } if debugLog { log.Printf("mask t=%s wind=%.1fm/s %.0f° tiles=%d hits=[%d,%d,%d] applied=%v", issued.Format("2006-01-02 15:04:05"), rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskApplied) } } } if debugLog { // 展示预报与掩码后的融合雨量 [h+1,h+2,h+3] rp := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} log.Printf("预报 %s , mask=%v , 融合的结果 %s", formatWeights(preMaskRain, false), maskApplied, formatWeights(rp, false)) } for h := 1; h <= 3; h++ { results = append(results, fusedRecord{ StationID: info.ID, IssuedAt: issued, // 输出按“小时桶”归一的发布时间 ForecastTime: issued.Add(time.Duration(h) * time.Hour), Sample: fused[h-1], MaskApplied: maskApplied, }) } // 学习已在融合前完成 step++ if progressFn != nil { progressFn(step, totalSteps, fmt.Sprintf("%s #%d", info.ID, idx+1)) } } } if len(results) == 0 { log.Println("未生成任何融合结果") return } sqlLines := buildSQL(results) if err := os.WriteFile(outSQLPath, []byte(strings.Join(sqlLines, "\n")+"\n"), 0o644); err != nil { log.Fatalf("写入SQL文件失败: %v", err) } log.Printf("已写出SQL到 %s(共 %d 条记录)", outSQLPath, len(results)) if applyUpserts { if err := applySQL(db, results); err != nil { log.Fatalf("写入数据库失败: %v", err) } log.Printf("已写入数据库 forecast_hourly(provider=%s)", outputProvider) } } func parseRange(r string) (time.Time, time.Time, error) { parts := strings.Split(r, ",") if len(parts) != 2 { return time.Time{}, time.Time{}, errors.New("格式应为 start,end") } start, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[0]), asiaShanghai) if err != nil { return time.Time{}, time.Time{}, err } end, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[1]), asiaShanghai) if err != nil { return time.Time{}, time.Time{}, err } return start, end, nil } func resolveStations(db *sql.DB, stationFlag string, includeAll bool) ([]stationInfo, error) { if includeAll { const q = ` SELECT station_id, COALESCE(NULLIF(station_alias,''), station_id) AS alias, latitude, longitude, z, y, x FROM stations WHERE latitude IS NOT NULL AND longitude IS NOT NULL AND device_type='WH65LP' ORDER BY station_id` rows, err := db.Query(q) if err != nil { return nil, err } defer rows.Close() var out []stationInfo for rows.Next() { var s stationInfo if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil { return nil, err } out = append(out, s) } return out, nil } list := fieldsFromFlag(stationFlag) if len(list) == 0 { return nil, nil } var out []stationInfo const q = ` SELECT station_id, COALESCE(NULLIF(station_alias,''), station_id) AS alias, latitude, longitude, z, y, x FROM stations WHERE station_id=$1` for _, id := range list { var s stationInfo err := db.QueryRow(q, id).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X) if err != nil { if err == sql.ErrNoRows { log.Printf("警告:站点 %s 不存在,跳过", id) continue } return nil, err } out = append(out, s) } return out, nil } func fieldsFromFlag(v string) []string { if strings.TrimSpace(v) == "" { return nil } parts := strings.Split(v, ",") out := make([]string, 0, len(parts)) for _, p := range parts { p = strings.TrimSpace(p) if p != "" { out = append(out, p) } } return out } func resolveProviderSet(_ *sql.DB, _ string, _ time.Time, _ time.Time) ([]string, error) { // 固定三源,不做别名回退 return []string{providerOpenMeteo, providerCaiyun, providerImdroid}, nil } func hasProviderData(db *sql.DB, stationID, provider string, from, to time.Time) (bool, error) { var cnt int err := db.QueryRow(` SELECT COUNT(*) FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`, stationID, provider, from, to).Scan(&cnt) if err != nil { return false, err } return cnt > 0, nil } func fetchIssuedHourBuckets(db *sql.DB, stationID string, from, to time.Time, providers []string, minSources int) ([]time.Time, error) { if len(providers) != 3 { return nil, fmt.Errorf("provider 列表长度异常: %v", providers) } if minSources < 1 { minSources = 1 } if minSources > 3 { minSources = 3 } const q = ` WITH base AS ( SELECT date_trunc('hour', issued_at) AS bucket, provider FROM forecast_hourly WHERE station_id=$1 AND provider IN ($2, $3, $4) AND issued_at BETWEEN $5 AND $6 GROUP BY 1,2 ) SELECT bucket FROM base GROUP BY bucket HAVING COUNT(DISTINCT provider) >= $7 ORDER BY bucket` rows, err := db.Query(q, stationID, providers[0], providers[1], providers[2], from, to, minSources) if err != nil { return nil, err } defer rows.Close() var buckets []time.Time for rows.Next() { var t time.Time if err := rows.Scan(&t); err != nil { return nil, err } buckets = append(buckets, t.In(asiaShanghai)) } return buckets, nil } // debugIssuedSummary 仅用于 --debug:统计provider在范围内的issued_at计数及首尾时间 func debugIssuedSummary(db *sql.DB, stationID string, from, to time.Time, provider string) (count int, first string, last string) { const q = ` SELECT COUNT(*), MIN(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS first, MAX(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS last FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4` var f, l sql.NullString if err := db.QueryRow(q, stationID, provider, from, to).Scan(&count, &f, &l); err != nil { return 0, "", "" } if f.Valid { first = f.String } if l.Valid { last = l.String } return } func loadProviderMatrix(db *sql.DB, stationID string, bucketHour time.Time, providers []string) ([]providerMatrix, error) { out := make([]providerMatrix, 0, len(providers)) for _, provider := range providers { issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour) if err != nil { return nil, err } if !ok { // 该源缺席,跳过 continue } samples, err := loadForecastSamples(db, stationID, provider, issued) if err != nil { return nil, err } if len(samples) != 3 { // 样本不足,视为该源缺席 continue } out = append(out, providerMatrix{Provider: provider, Samples: samples}) } return out, nil } // 在某个小时桶内,选取该 provider 的最新 issued_at([bucket, bucket+1h)) func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) { const q = ` SELECT issued_at FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour' ORDER BY issued_at DESC LIMIT 1` var t time.Time err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t) if err == sql.ErrNoRows { return time.Time{}, false, nil } if err != nil { return time.Time{}, false, err } return t, true, nil } func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) { const q = ` SELECT forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3 ORDER BY forecast_time ASC` rows, err := db.Query(q, stationID, provider, issued) if err != nil { return nil, err } defer rows.Close() var list []forecastSample for rows.Next() { var ( ft time.Time rain, temp, humidity, ws, gust, wdir, prob sql.NullInt64 pressure, uv sql.NullInt64 ) if err := rows.Scan(&ft, &rain, &temp, &humidity, &ws, &gust, &wdir, &prob, &pressure, &uv); err != nil { return nil, err } // 使用“向上取整”的方式计算前视小时(与查询端 lead_hours=CEIL(...) 逻辑对齐) dh := ft.Sub(issued).Hours() hours := int(math.Ceil(dh - 1e-9)) if hours < 1 || hours > 3 { continue } s := forecastSample{ Rain: float64(rain.Int64) / 1000.0, Temp: float64(temp.Int64) / 100.0, Humidity: float64(humidity.Int64), WindSpeed: float64(ws.Int64) / 1000.0, WindGust: float64(gust.Int64) / 1000.0, WindDir: float64(wdir.Int64), Prob: float64(prob.Int64), Pressure: float64(pressure.Int64) / 100.0, UV: float64(uv.Int64), } list = append(list, s) } return list, nil } // lead1RainAtIssued 返回某站点、某provider在给定小时桶(J)内的发行的 +1 小时降雨预测(若存在)。 func lead1RainAtIssued(db *sql.DB, stationID, provider string, bucketHour time.Time) (float64, bool) { issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour) if err != nil || !ok { return 0, false } samples, err := loadForecastSamples(db, stationID, provider, issued) if err != nil || len(samples) < 1 { return 0, false } return samples[0].Rain, true } func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample { result := make([]forecastSample, 3) for h := 0; h < 3; h++ { var windX, windY float64 for pIdx, pm := range matrix { if len(pm.Samples) <= h { continue } s := pm.Samples[h] w := weights[pIdx] result[h].Rain += s.Rain * w result[h].Temp += s.Temp * w result[h].Humidity += s.Humidity * w result[h].WindSpeed += s.WindSpeed * w result[h].WindGust += s.WindGust * w result[h].Prob += s.Prob * w result[h].Pressure += s.Pressure * w result[h].UV += s.UV * w rad := s.WindDir * math.Pi / 180 windX += math.Cos(rad) * w windY += math.Sin(rad) * w } if windX == 0 && windY == 0 { result[h].WindDir = 0 } else { dir := math.Atan2(windY, windX) * 180 / math.Pi if dir < 0 { dir += 360 } result[h].WindDir = dir } result[h].Humidity = math.Round(result[h].Humidity) result[h].Prob = math.Round(result[h].Prob) result[h].UV = math.Round(result[h].UV) } return result } // fuseForecastsWith 使用与 matrix 对齐的有效权重进行融合 func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample { if len(eff) != len(matrix) { return fuseForecasts(matrix, uniform(len(matrix))) } result := make([]forecastSample, 3) for h := 0; h < 3; h++ { var windX, windY float64 for pIdx, pm := range matrix { if len(pm.Samples) <= h { continue } s := pm.Samples[h] w := eff[pIdx] result[h].Rain += s.Rain * w result[h].Temp += s.Temp * w result[h].Humidity += s.Humidity * w result[h].WindSpeed += s.WindSpeed * w result[h].WindGust += s.WindGust * w result[h].Prob += s.Prob * w result[h].Pressure += s.Pressure * w result[h].UV += s.UV * w rad := s.WindDir * math.Pi / 180 windX += math.Cos(rad) * w windY += math.Sin(rad) * w } if windX == 0 && windY == 0 { result[h].WindDir = 0 } else { dir := math.Atan2(windY, windX) * 180 / math.Pi if dir < 0 { dir += 360 } result[h].WindDir = dir } result[h].Humidity = math.Round(result[h].Humidity) result[h].Prob = math.Round(result[h].Prob) result[h].UV = math.Round(result[h].UV) } return result } func uniform(n int) []float64 { if n <= 0 { return nil } w := make([]float64, n) v := 1.0 / float64(n) for i := range w { w[i] = v } return w } // weightState: 维护三源与各二源对的基础权重 type weightState struct { providersAll []string baseTriad []float64 // [om, cy, im] 按 providersAll 对齐 basePairs map[string][]float64 // 保留字段但不再使用成对权重 lastPresent map[string]bool // 上一次有效参与的provider集合 lastEff map[string]float64 // 上一次按provider的有效权重(仅在场时更新) } func newWeightState(providersAll []string) *weightState { ws := &weightState{providersAll: providersAll} ws.baseTriad = append([]float64{}, defaultWeights...) ws.basePairs = map[string][]float64{} ws.lastPresent = map[string]bool{} ws.lastEff = map[string]float64{} return ws } func (ws *weightState) idx(p string) int { for i, v := range ws.providersAll { if v == p { return i } } return 0 } func pairKey(a, b string) string { if a < b { return a + "|" + b } return b + "|" + a } // providersList: 用于debug展示参与源 func providersList(matrix []providerMatrix) string { names := make([]string, len(matrix)) for i, pm := range matrix { names[i] = pm.Provider } return fmt.Sprintf("%v", names) } // formatWeights 将权重格式化为字符串,如 "[0.70,0.10,0.20]" // 若 quantizeTenths=true,则先按 0.1 就近取整再输出两位小数 func formatWeights(ws []float64, quantizeTenths bool) string { if len(ws) == 0 { return "[]" } var parts []string for _, v := range ws { if quantizeTenths { v = math.Round(v*10.0) / 10.0 } parts = append(parts, fmt.Sprintf("%.2f", v)) } return "[" + strings.Join(parts, ",") + "]" } // effectiveForMatrix: 返回与 matrix 顺序对齐的有效权重(两源/三源),在子集上归一化 func (ws *weightState) effectiveForMatrix(matrix []providerMatrix) ([]float64, bool) { // 根据当前在场provider集合,对 triad 进行“缺失转移/新增分配”并输出有效权重。 n := len(matrix) eff := make([]float64, n) if n == 0 { return eff, false } // 建立当前在场集合 present := make(map[string]bool, n) for _, pm := range matrix { present[pm.Provider] = true } // 识别缺失与新增 var missing []string var added []string if len(ws.lastPresent) == 0 { // 首次:按照 providersAll 判断缺失(需要立即应用缺失转移) for _, p := range ws.providersAll { if !present[p] { missing = append(missing, p) } } } else { for p := range ws.lastPresent { if !present[p] { missing = append(missing, p) } } for p := range present { if !ws.lastPresent[p] { added = append(added, p) } } } if debugEnabled && (len(missing) > 0 || len(added) > 0) { // 稳定顺序:按 providersAll 顺序输出 present/added/removed presList := make([]string, 0, len(present)) for _, p := range ws.providersAll { if present[p] { presList = append(presList, p) } } // added/missing 也按 providersAll 顺序 orderIdx := func(p string) int { return ws.idx(p) } sort.Slice(added, func(i, j int) bool { return orderIdx(added[i]) < orderIdx(added[j]) }) sort.Slice(missing, func(i, j int) bool { return orderIdx(missing[i]) < orderIdx(missing[j]) }) log.Printf("sources %v added=%v removed=%v", presList, added, missing) } // 缺失:将缺失源的权重转移给当前在场中权重最大的源,缺失源置0 if len(missing) > 0 && len(present) > 0 { // 找当前在场的最大权重源 maxP := "" maxW := -1.0 for p := range present { w := ws.baseTriad[ws.idx(p)] if w > maxW { maxW, maxP = w, p } } gain := 0.0 for _, m := range missing { w := ws.baseTriad[ws.idx(m)] if w > 0 { gain += w ws.baseTriad[ws.idx(m)] = 0 } } if maxP != "" && gain > 0 { ws.baseTriad[ws.idx(maxP)] += gain if debugEnabled { tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} log.Printf("removed %v -> %s +%.2f triad %s", missing, maxP, gain, formatWeights(tri, true)) } } } // 新增:从当前最大源挪出0.2给新加入的源(每个新增一次性处理) for _, a := range added { // 仅在此前为0时进行分配,避免重复分配 if ws.baseTriad[ws.idx(a)] <= 0 { // 找在场的最大源(不包含自身) maxP := "" maxW := -1.0 for p := range present { if p == a { continue } w := ws.baseTriad[ws.idx(p)] if w > maxW { maxW, maxP = w, p } } if maxP != "" && maxW > 0 { delta := 0.2 if maxW < delta { delta = maxW } ws.baseTriad[ws.idx(maxP)] -= delta ws.baseTriad[ws.idx(a)] += delta if debugEnabled { tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} log.Printf("added %s from %s %.2f triad %s", a, maxP, delta, formatWeights(tri, true)) } } } } // 更新 lastPresent 为当前集合 ws.lastPresent = present // 组装有效权重,按 matrix 顺序输出。此时 triad 在场分量应合计为1。 for i, pm := range matrix { eff[i] = ws.baseTriad[ws.idx(pm.Provider)] } // 不在此处打印权重对比,由主循环根据“学习前/后”输出 // 更新上一次有效权重 for i, pm := range matrix { ws.lastEff[pm.Provider] = eff[i] } return eff, false } // learn: 只在本集合中做 +0.1/-0.1,学习后在该集合内归一化,并写回对应的基础权重集 func (ws *weightState) learn(matrix []providerMatrix, actual float64) { n := len(matrix) if n <= 1 { return } preds := extractProviderRainAt(matrix, 0) if len(preds) != n { return } // 找出当前在场集合中的最优与最差 best, worst := 0, 0 for i := range preds { if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { best = i } if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) { worst = i } } alpha := 0.1 pBest := matrix[best].Provider pWorst := matrix[worst].Provider // 仅在在场集合中调整,第三方可能为0(绝对不可信)且不参与两源学习 decCap := ws.baseTriad[ws.idx(pWorst)] delta := alpha if decCap < delta { delta = decCap } if delta <= 0 { return } prev := append([]float64{}, ws.baseTriad...) ws.baseTriad[ws.idx(pWorst)] -= delta ws.baseTriad[ws.idx(pBest)] += delta if debugEnabled { before := []float64{prev[ws.idx(providerOpenMeteo)], prev[ws.idx(providerCaiyun)], prev[ws.idx(providerImdroid)]} after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} log.Printf("rain=%.3f best=%s worst=%s weights %s -> %s", actual, pBest, pWorst, formatWeights(before, true), formatWeights(after, true)) } } // learnFromPreds: 使用外部提供的预测值(与 matrix 顺序对齐)进行学习。 func (ws *weightState) learnFromPreds(matrix []providerMatrix, preds []float64, valids []bool, actual float64) { n := len(matrix) if n == 0 || len(preds) != n || len(valids) != n { return } // 仅在有效的、且当前权重>0 的来源中选择最优/最差 idxs := make([]int, 0, n) for i := 0; i < n; i++ { if !valids[i] { continue } idxs = append(idxs, i) } if len(idxs) < 2 { return } best, worst := idxs[0], idxs[0] for _, i := range idxs { if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { best = i } if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) { worst = i } } alpha := 0.1 pBest := matrix[best].Provider pWorst := matrix[worst].Provider decCap := ws.baseTriad[ws.idx(pWorst)] delta := alpha if decCap < delta { delta = decCap } if delta <= 0 { return } // 日志:绝对误差详情(OM/CY/IM)和最优/最差 if debugEnabled { name := func(p string) string { return p } // 组装当前在场源的误差摘要 parts := make([]string, 0, len(matrix)) for i, pm := range matrix { if !valids[i] || ws.baseTriad[ws.idx(pm.Provider)] <= 0 { continue } parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", name(pm.Provider), preds[i], actual, math.Abs(preds[i]-actual))) } log.Printf("绝对误差:%s → best=%s,worst=%s。", strings.Join(parts, ","), name(pBest), name(pWorst)) } ws.baseTriad[ws.idx(pWorst)] -= delta ws.baseTriad[ws.idx(pBest)] += delta if debugEnabled { after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatWeights(after, true)) } } // probeProviderAtBucket: 调试辅助,打印该小时桶内每个provider的issued与是否有+1/+2/+3样本 func probeProviderAtBucket(db *sql.DB, stationID string, providers []string, bucket time.Time) { for _, p := range providers { issued, ok, err := resolveIssuedAtInBucket(db, stationID, p, bucket) if err != nil { log.Printf(" provider=%s 探测错误: %v", p, err) continue } if !ok { log.Printf(" provider=%s: 小时桶内无 issued", p) continue } leads := presentLeads(db, stationID, p, issued) log.Printf(" provider=%s: issued=%s leads=%v (count=%d)", p, issued.In(asiaShanghai).Format("2006-01-02 15:04:05"), leads, len(leads)) } } // presentLeads 返回该 issued 的预测中,存在的 lead 小时集合(1..3) func presentLeads(db *sql.DB, stationID, provider string, issued time.Time) []int { rows, err := db.Query(`SELECT forecast_time FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3`, stationID, provider, issued) if err != nil { return nil } defer rows.Close() var leads []int for rows.Next() { var ft time.Time if err := rows.Scan(&ft); err != nil { continue } dh := ft.Sub(issued).Hours() h := int(math.Ceil(dh - 1e-9)) if h >= 1 && h <= 3 { leads = append(leads, h) } } return leads } func isRainAllZero(samples []forecastSample) bool { for _, s := range samples { if math.Abs(s.Rain) > 1e-6 { return false } } return true } func extractProviderRainAt(matrix []providerMatrix, hourIdx int) []float64 { out := make([]float64, 0, len(matrix)) for _, pm := range matrix { if len(pm.Samples) > hourIdx { out = append(out, pm.Samples[hourIdx].Rain) } else { out = append(out, 0) } } return out } func adjustWeights(current []float64, preds []float64, actual float64) []float64 { if len(current) != len(preds) || len(current) != 3 { return current } diffs := make([]float64, len(preds)) for i := range preds { diffs[i] = math.Abs(preds[i] - actual) } minIdx, maxIdx := 0, 0 for i, d := range diffs { if d < diffs[minIdx] { minIdx = i } if d > diffs[maxIdx] { maxIdx = i } } if minIdx != maxIdx { current[minIdx] += 0.1 current[maxIdx] -= 0.1 } var sum float64 for i := range current { if current[i] < 0 { current[i] = 0 } sum += current[i] } if sum == 0 { return append([]float64{}, defaultWeights...) } for i := range current { current[i] /= sum } return current } func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) { const q = ` SELECT SUM(rain_10m_mm_x1000) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` var sum sql.NullInt64 err := db.QueryRow(q, stationID, start, end).Scan(&sum) if err != nil { return 0, false, err } if !sum.Valid { return 0, false, nil } return float64(sum.Int64) / 1000.0, true, nil } type tileRec struct { DT time.Time Width int Height int West float64 South float64 East float64 North float64 ResDeg float64 Data []byte } type maskReport struct { Scanned bool WindOK bool UsedAlias string WindSpeed float64 WindDir float64 Tiles int HitsByHour [3]int } func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) { rep := maskReport{} windTime := issued.Add(-time.Hour) // 优先用 station_id,其次别名 if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil { return rep, err } else if ok { rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id } else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil { return rep, err2 } else if ok2 { rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias } else { // 没有风场:不扫描,不应用掩码 return rep, nil } tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued) if err != nil { return rep, err } if len(tiles) == 0 { // 无瓦片:不扫描,不应用掩码 return rep, nil } rep.Tiles = len(tiles) rep.Scanned = true const circleR = 8000.0 const halfAngle = 30.0 for _, t := range tiles { vals, xs, ys, err := decodeTile(t) if err != nil { continue } for r := 0; r < len(vals); r++ { row := vals[r] lat := ys[r] for c := 0; c < len(row); c++ { v := row[c] if v == nil { continue } dbz := *v if dbz < 40 { continue } lon := xs[c] dist := haversine(ctx.lat, ctx.lon, lat, lon) // +1h:圆 或 扇形(3h) if dist <= circleR { rep.HitsByHour[0]++ } if rep.WindSpeed > 0 { brg := bearingDeg(ctx.lat, ctx.lon, lat, lon) if angDiff(brg, rep.WindDir) <= halfAngle { if dist <= rep.WindSpeed*3*3600 { rep.HitsByHour[0]++ } // +2h:圆 或 扇形(2h) if dist <= circleR { rep.HitsByHour[1]++ } if dist <= rep.WindSpeed*2*3600 { rep.HitsByHour[1]++ } // +3h:扇形(3h) if dist <= rep.WindSpeed*3*3600 { rep.HitsByHour[2]++ } } } } } } return rep, nil } func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) { const q = ` SELECT wind_speed, wind_direction FROM radar_weather WHERE alias=$1 AND dt <= $2 ORDER BY dt DESC LIMIT 1` var speed, dir sql.NullFloat64 err := db.QueryRow(q, alias, target).Scan(&speed, &dir) if err == sql.ErrNoRows { return 0, 0, false, nil } if err != nil { return 0, 0, false, err } if !speed.Valid || !dir.Valid { return 0, 0, false, nil } return speed.Float64, dir.Float64, true, nil } func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) { const q = ` SELECT dt, width, height, west, south, east, north, res_deg, data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5 ORDER BY dt` rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to) if err != nil { return nil, err } defer rows.Close() var list []tileRec for rows.Next() { var t tileRec if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil { return nil, err } list = append(list, t) } return list, nil } func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) { w, h := t.Width, t.Height if w <= 0 || h <= 0 { return nil, nil, nil, fmt.Errorf("非法瓦片尺寸") } if len(t.Data) < w*h*2 { return nil, nil, nil, fmt.Errorf("数据长度不足") } xs := make([]float64, w) for c := 0; c < w; c++ { xs[c] = t.West + (float64(c)+0.5)*t.ResDeg } ys := make([]float64, h) for r := 0; r < h; r++ { ys[r] = t.South + (float64(r)+0.5)*t.ResDeg } vals := make([][]*float64, h) off := 0 for r := 0; r < h; r++ { row := make([]*float64, w) for c := 0; c < w; c++ { v := int16(binary.BigEndian.Uint16(t.Data[off : off+2])) off += 2 if v >= 32766 { row[c] = nil continue } dbz := float64(v) / 10.0 if dbz < 0 { dbz = 0 } else if dbz > 75 { dbz = 75 } value := dbz row[c] = &value } vals[r] = row } return vals, xs, ys, nil } func haversine(lat1, lon1, lat2, lon2 float64) float64 { const R = 6371000.0 dLat := toRad(lat2 - lat1) dLon := toRad(lon2 - lon1) a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2) c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) return R * c } func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 { φ1 := toRad(lat1) φ2 := toRad(lat2) Δλ := toRad(lon2 - lon1) y := math.Sin(Δλ) * math.Cos(φ2) x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ) brg := toDeg(math.Atan2(y, x)) if brg < 0 { brg += 360 } return brg } func angDiff(a, b float64) float64 { d := math.Mod(a-b+540, 360) - 180 if d < 0 { d = -d } return math.Abs(d) } func toRad(d float64) float64 { return d * math.Pi / 180 } func toDeg(r float64) float64 { return r * 180 / math.Pi } func buildMaskContext(info stationInfo) (maskContext, bool) { if !info.Lat.Valid || !info.Lon.Valid { return maskContext{}, false } if !info.Z.Valid || !info.Y.Valid || !info.X.Valid { return maskContext{}, false } alias := info.Alias if strings.TrimSpace(alias) == "" { alias = info.ID } return maskContext{ id: info.ID, alias: alias, lat: info.Lat.Float64, lon: info.Lon.Float64, z: int(info.Z.Int64), y: int(info.Y.Int64), x: int(info.X.Int64), }, true } func buildSQL(records []fusedRecord) []string { lines := make([]string, 0, len(records)*2) header := "-- SQL 输出 (默认未执行)" lines = append(lines, header) for _, rec := range records { sql := fmt.Sprintf( "INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index)\nVALUES ('%s', '%s', '%s'::timestamptz, '%s'::timestamptz, %d, %d, %d, %d, %d, %d, %d, %d, %d)\nON CONFLICT (station_id, provider, issued_at, forecast_time)\nDO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000, temp_c_x100=EXCLUDED.temp_c_x100, humidity_pct=EXCLUDED.humidity_pct, wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, wind_dir_deg=EXCLUDED.wind_dir_deg, precip_prob_pct=EXCLUDED.precip_prob_pct, pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, uv_index=EXCLUDED.uv_index;", rec.StationID, outputProvider, rec.IssuedAt.In(asiaShanghai).Format(time.RFC3339), rec.ForecastTime.In(asiaShanghai).Format(time.RFC3339), int(math.Round(rec.Sample.Rain*1000)), int(math.Round(rec.Sample.Temp*100)), int(math.Round(rec.Sample.Humidity)), int(math.Round(rec.Sample.WindSpeed*1000)), int(math.Round(rec.Sample.WindGust*1000)), int(math.Round(rec.Sample.WindDir)), int(math.Round(rec.Sample.Prob)), int(math.Round(rec.Sample.Pressure*100)), int(math.Round(rec.Sample.UV)), ) lines = append(lines, sql, "") } return lines } func applySQL(db *sql.DB, records []fusedRecord) error { tx, err := db.Begin() if err != nil { return err } stmt, err := tx.Prepare(` INSERT INTO forecast_hourly ( station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000 = EXCLUDED.rain_mm_x1000, temp_c_x100 = EXCLUDED.temp_c_x100, humidity_pct = EXCLUDED.humidity_pct, wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000, wind_dir_deg = EXCLUDED.wind_dir_deg, precip_prob_pct = EXCLUDED.precip_prob_pct, pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100, uv_index = EXCLUDED.uv_index`) if err != nil { tx.Rollback() return err } defer stmt.Close() for _, rec := range records { if _, err := stmt.Exec( rec.StationID, outputProvider, rec.IssuedAt, rec.ForecastTime, int(math.Round(rec.Sample.Rain*1000)), int(math.Round(rec.Sample.Temp*100)), int(math.Round(rec.Sample.Humidity)), int(math.Round(rec.Sample.WindSpeed*1000)), int(math.Round(rec.Sample.WindGust*1000)), int(math.Round(rec.Sample.WindDir)), int(math.Round(rec.Sample.Prob)), int(math.Round(rec.Sample.Pressure*100)), int(math.Round(rec.Sample.UV)), ); err != nil { tx.Rollback() return err } } return tx.Commit() }