diff --git a/cmd/imdroidmix/main.go b/cmd/imdroidmix/main.go index 6c3aa16..b8b251e 100644 --- a/cmd/imdroidmix/main.go +++ b/cmd/imdroidmix/main.go @@ -9,6 +9,7 @@ import ( "log" "math" "os" + "sort" "strings" "time" @@ -25,6 +26,7 @@ const ( var ( defaultWeights = []float64{0.4, 0.3, 0.3} asiaShanghai *time.Location + debugEnabled bool ) func init() { @@ -107,6 +109,8 @@ func main() { flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过") flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0") flag.Parse() + // 让库内辅助函数也能访问调试开关 + debugEnabled = debugLog if strings.TrimSpace(issuedRange) == "" { log.Fatalln("必须提供 --issued_range") @@ -195,7 +199,7 @@ func main() { for _, job := range jobs { info := job.info maskCtx, hasMaskCtx := buildMaskContext(info) - weights := append([]float64{}, defaultWeights...) + stWeights := newWeightState(job.providers) for idx, issued := range job.issuedTimes { matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers) @@ -212,12 +216,54 @@ func main() { } if debugLog { - log.Printf("权重(使用前): station=%s issued=%s %s=%.2f %s=%.2f %s=%.2f", - info.ID, issued.Format("2006-01-02 15:04:05"), - job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2]) + log.Printf("") + log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), info.ID) } - fused := fuseForecastsWith(matrix, effectiveWeights(weights, job.providers, matrix)) + // 学习先行:先得到学习前权重,再学习,再得到本轮使用权重 + effBefore, _ := stWeights.effectiveForMatrix(matrix) + + // 学习对齐:用刚结束的小时 [issued-1h, issued) 与 (issued-2h) 的 +1 预测 + learnStart := issued.Add(-time.Hour) + learnEnd := issued + actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, learnStart, learnEnd) + if err != nil { + log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err) + } else if actualOK { + prevIssueBucket := issued.Add(-2 * time.Hour) + preds := make([]float64, len(matrix)) + valids := make([]bool, len(matrix)) + for i, pm := range matrix { + if v, ok := lead1RainAtIssued(db, info.ID, pm.Provider, prevIssueBucket); ok { + preds[i] = v + valids[i] = true + } + } + if debugLog { + log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s", + learnStart.Format("15:04"), learnEnd.Format("15:04"), actualRain, + prevIssueBucket.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"), + formatWeights(preds, false)) + } + stWeights.learnFromPreds(matrix, preds, valids, actualRain) + } + + eff, _ := stWeights.effectiveForMatrix(matrix) + if debugLog { + // 以 provider=weight 的形式输出,顺序与 sources 一致 + toNamed := func(ws []float64) string { + pairs := make([]string, 0, len(matrix)) + for i, pm := range matrix { + pairs = append(pairs, fmt.Sprintf("%s=%.2f", pm.Provider, ws[i])) + } + return strings.Join(pairs, ", ") + } + log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore), toNamed(eff)) + } + + fused := fuseForecastsWith(matrix, eff) + // 记录掩码前的融合雨量 + var preMaskRain = []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} rainAllZero := isRainAllZero(fused) maskApplied := false @@ -242,11 +288,17 @@ func main() { } } if debugLog { - log.Printf("掩码报告: station=%s issued=%s scanned=%v windOK=%v used=%s wind=%.2fm/s %.0f° tiles=%d hits=[%d,%d,%d] mask_hours=%v", info.ID, issued.Format("2006-01-02 15:04:05"), rep.Scanned, rep.WindOK, rep.UsedAlias, rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskHours) + log.Printf("mask t=%s wind=%.1fm/s %.0f° tiles=%d hits=[%d,%d,%d] applied=%v", + issued.Format("2006-01-02 15:04:05"), rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskApplied) } } } + if debugLog { + // 展示预报与掩码后的融合雨量 [h+1,h+2,h+3] + rp := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain} + log.Printf("预报 %s , mask=%v , 融合的结果 %s", formatWeights(preMaskRain, false), maskApplied, formatWeights(rp, false)) + } for h := 1; h <= 3; h++ { results = append(results, fusedRecord{ StationID: info.ID, @@ -257,20 +309,7 @@ func main() { }) } - actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, issued, issued.Add(time.Hour)) - if err != nil { - log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err) - } else if actualOK { - before := append([]float64{}, weights...) - weights = adjustWeightsSubset(weights, job.providers, matrix, actualRain) - if debugLog { - log.Printf("权重调整: station=%s issued=%s actual=%.3f | 之前[%s=%.2f,%s=%.2f,%s=%.2f] 之后[%s=%.2f,%s=%.2f,%s=%.2f] 参与源=%d", - info.ID, issued.Format("2006-01-02 15:04:05"), actualRain, - job.providers[0], before[0], job.providers[1], before[1], job.providers[2], before[2], - job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2], - len(matrix)) - } - } + // 学习已在融合前完成 step++ if progressFn != nil { @@ -554,6 +593,19 @@ func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Tim return list, nil } +// lead1RainAtIssued 返回某站点、某provider在给定小时桶(J)内的发行的 +1 小时降雨预测(若存在)。 +func lead1RainAtIssued(db *sql.DB, stationID, provider string, bucketHour time.Time) (float64, bool) { + issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour) + if err != nil || !ok { + return 0, false + } + samples, err := loadForecastSamples(db, stationID, provider, issued) + if err != nil || len(samples) < 1 { + return 0, false + } + return samples[0].Rain, true +} + func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample { result := make([]forecastSample, 3) for h := 0; h < 3; h++ { @@ -646,36 +698,204 @@ func uniform(n int) []float64 { return w } -// effectiveWeights 将基础权重映射到参与融合的 providers(按 matrix 顺序),并在子集上归一化 -func effectiveWeights(base []float64, providersAll []string, matrix []providerMatrix) []float64 { - m := make(map[string]int) - for i, p := range providersAll { - m[p] = i - } - eff := make([]float64, len(matrix)) - sum := 0.0 - for i, pm := range matrix { - if idx, ok := m[pm.Provider]; ok { - eff[i] = base[idx] - sum += eff[i] - } - } - if sum <= 0 { - return uniform(len(matrix)) - } - for i := range eff { - eff[i] /= sum - } - return eff +// weightState: 维护三源与各二源对的基础权重 +type weightState struct { + providersAll []string + baseTriad []float64 // [om, cy, im] 按 providersAll 对齐 + basePairs map[string][]float64 // 保留字段但不再使用成对权重 + lastPresent map[string]bool // 上一次有效参与的provider集合 + lastEff map[string]float64 // 上一次按provider的有效权重(仅在场时更新) } -// adjustWeightsSubset 仅对当前参与的 providers 调整权重,其它分量保持不变 -func adjustWeightsSubset(base []float64, providersAll []string, matrix []providerMatrix, actual float64) []float64 { - preds := extractProviderRainAt(matrix, 0) - if len(preds) == 0 { - return base +func newWeightState(providersAll []string) *weightState { + ws := &weightState{providersAll: providersAll} + ws.baseTriad = append([]float64{}, defaultWeights...) + ws.basePairs = map[string][]float64{} + ws.lastPresent = map[string]bool{} + ws.lastEff = map[string]float64{} + return ws +} + +func (ws *weightState) idx(p string) int { + for i, v := range ws.providersAll { + if v == p { + return i + } } - // 找最优和最差(绝对误差) + return 0 +} + +func pairKey(a, b string) string { + if a < b { + return a + "|" + b + } + return b + "|" + a +} + +// providersList: 用于debug展示参与源 +func providersList(matrix []providerMatrix) string { + names := make([]string, len(matrix)) + for i, pm := range matrix { + names[i] = pm.Provider + } + return fmt.Sprintf("%v", names) +} + +// formatWeights 将权重格式化为字符串,如 "[0.70,0.10,0.20]" +// 若 quantizeTenths=true,则先按 0.1 就近取整再输出两位小数 +func formatWeights(ws []float64, quantizeTenths bool) string { + if len(ws) == 0 { + return "[]" + } + var parts []string + for _, v := range ws { + if quantizeTenths { + v = math.Round(v*10.0) / 10.0 + } + parts = append(parts, fmt.Sprintf("%.2f", v)) + } + return "[" + strings.Join(parts, ",") + "]" +} + +// effectiveForMatrix: 返回与 matrix 顺序对齐的有效权重(两源/三源),在子集上归一化 +func (ws *weightState) effectiveForMatrix(matrix []providerMatrix) ([]float64, bool) { + // 根据当前在场provider集合,对 triad 进行“缺失转移/新增分配”并输出有效权重。 + n := len(matrix) + eff := make([]float64, n) + if n == 0 { + return eff, false + } + + // 建立当前在场集合 + present := make(map[string]bool, n) + for _, pm := range matrix { + present[pm.Provider] = true + } + + // 识别缺失与新增 + var missing []string + var added []string + if len(ws.lastPresent) == 0 { + // 首次:按照 providersAll 判断缺失(需要立即应用缺失转移) + for _, p := range ws.providersAll { + if !present[p] { + missing = append(missing, p) + } + } + } else { + for p := range ws.lastPresent { + if !present[p] { + missing = append(missing, p) + } + } + for p := range present { + if !ws.lastPresent[p] { + added = append(added, p) + } + } + } + if debugEnabled && (len(missing) > 0 || len(added) > 0) { + // 稳定顺序:按 providersAll 顺序输出 present/added/removed + presList := make([]string, 0, len(present)) + for _, p := range ws.providersAll { + if present[p] { + presList = append(presList, p) + } + } + // added/missing 也按 providersAll 顺序 + orderIdx := func(p string) int { return ws.idx(p) } + sort.Slice(added, func(i, j int) bool { return orderIdx(added[i]) < orderIdx(added[j]) }) + sort.Slice(missing, func(i, j int) bool { return orderIdx(missing[i]) < orderIdx(missing[j]) }) + log.Printf("sources %v added=%v removed=%v", presList, added, missing) + } + + // 缺失:将缺失源的权重转移给当前在场中权重最大的源,缺失源置0 + if len(missing) > 0 && len(present) > 0 { + // 找当前在场的最大权重源 + maxP := "" + maxW := -1.0 + for p := range present { + w := ws.baseTriad[ws.idx(p)] + if w > maxW { + maxW, maxP = w, p + } + } + gain := 0.0 + for _, m := range missing { + w := ws.baseTriad[ws.idx(m)] + if w > 0 { + gain += w + ws.baseTriad[ws.idx(m)] = 0 + } + } + if maxP != "" && gain > 0 { + ws.baseTriad[ws.idx(maxP)] += gain + if debugEnabled { + tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} + log.Printf("removed %v -> %s +%.2f triad %s", + missing, maxP, gain, formatWeights(tri, true)) + } + } + } + + // 新增:从当前最大源挪出0.2给新加入的源(每个新增一次性处理) + for _, a := range added { + // 仅在此前为0时进行分配,避免重复分配 + if ws.baseTriad[ws.idx(a)] <= 0 { + // 找在场的最大源(不包含自身) + maxP := "" + maxW := -1.0 + for p := range present { + if p == a { + continue + } + w := ws.baseTriad[ws.idx(p)] + if w > maxW { + maxW, maxP = w, p + } + } + if maxP != "" && maxW > 0 { + delta := 0.2 + if maxW < delta { + delta = maxW + } + ws.baseTriad[ws.idx(maxP)] -= delta + ws.baseTriad[ws.idx(a)] += delta + if debugEnabled { + tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} + log.Printf("added %s from %s %.2f triad %s", + a, maxP, delta, formatWeights(tri, true)) + } + } + } + } + + // 更新 lastPresent 为当前集合 + ws.lastPresent = present + + // 组装有效权重,按 matrix 顺序输出。此时 triad 在场分量应合计为1。 + for i, pm := range matrix { + eff[i] = ws.baseTriad[ws.idx(pm.Provider)] + } + // 不在此处打印权重对比,由主循环根据“学习前/后”输出 + // 更新上一次有效权重 + for i, pm := range matrix { + ws.lastEff[pm.Provider] = eff[i] + } + return eff, false +} + +// learn: 只在本集合中做 +0.1/-0.1,学习后在该集合内归一化,并写回对应的基础权重集 +func (ws *weightState) learn(matrix []providerMatrix, actual float64) { + n := len(matrix) + if n <= 1 { + return + } + preds := extractProviderRainAt(matrix, 0) + if len(preds) != n { + return + } + // 找出当前在场集合中的最优与最差 best, worst := 0, 0 for i := range preds { if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { @@ -685,35 +905,87 @@ func adjustWeightsSubset(base []float64, providersAll []string, matrix []provide worst = i } } - // provider 名到 base 索引 - m := make(map[string]int) - for i, p := range providersAll { - m[p] = i + + alpha := 0.1 + pBest := matrix[best].Provider + pWorst := matrix[worst].Provider + + // 仅在在场集合中调整,第三方可能为0(绝对不可信)且不参与两源学习 + decCap := ws.baseTriad[ws.idx(pWorst)] + delta := alpha + if decCap < delta { + delta = decCap } - out := append([]float64{}, base...) - // 应用 ±0.1 调整 - if bi, ok := m[matrix[best].Provider]; ok { - out[bi] += 0.1 + if delta <= 0 { + return } - if wi, ok := m[matrix[worst].Provider]; ok { - out[wi] -= 0.1 + prev := append([]float64{}, ws.baseTriad...) + ws.baseTriad[ws.idx(pWorst)] -= delta + ws.baseTriad[ws.idx(pBest)] += delta + if debugEnabled { + before := []float64{prev[ws.idx(providerOpenMeteo)], prev[ws.idx(providerCaiyun)], prev[ws.idx(providerImdroid)]} + after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} + log.Printf("rain=%.3f best=%s worst=%s weights %s -> %s", + actual, pBest, pWorst, formatWeights(before, true), formatWeights(after, true)) } - // 夹紧并归一化 - sum := 0.0 - for i := range out { - if out[i] < 0 { - out[i] = 0 +} + +// learnFromPreds: 使用外部提供的预测值(与 matrix 顺序对齐)进行学习。 +func (ws *weightState) learnFromPreds(matrix []providerMatrix, preds []float64, valids []bool, actual float64) { + n := len(matrix) + if n == 0 || len(preds) != n || len(valids) != n { + return + } + // 仅在有效的、且当前权重>0 的来源中选择最优/最差 + idxs := make([]int, 0, n) + for i := 0; i < n; i++ { + if !valids[i] { + continue } - sum += out[i] + idxs = append(idxs, i) } - if sum == 0 { - // 回到默认 - return append([]float64{}, defaultWeights...) + if len(idxs) < 2 { + return } - for i := range out { - out[i] /= sum + best, worst := idxs[0], idxs[0] + for _, i := range idxs { + if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { + best = i + } + if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) { + worst = i + } + } + alpha := 0.1 + pBest := matrix[best].Provider + pWorst := matrix[worst].Provider + decCap := ws.baseTriad[ws.idx(pWorst)] + delta := alpha + if decCap < delta { + delta = decCap + } + if delta <= 0 { + return + } + // 日志:绝对误差详情(OM/CY/IM)和最优/最差 + if debugEnabled { + name := func(p string) string { return p } + // 组装当前在场源的误差摘要 + parts := make([]string, 0, len(matrix)) + for i, pm := range matrix { + if !valids[i] || ws.baseTriad[ws.idx(pm.Provider)] <= 0 { + continue + } + parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", name(pm.Provider), preds[i], actual, math.Abs(preds[i]-actual))) + } + log.Printf("绝对误差:%s → best=%s,worst=%s。", strings.Join(parts, ","), name(pBest), name(pWorst)) + } + ws.baseTriad[ws.idx(pWorst)] -= delta + ws.baseTriad[ws.idx(pBest)] += delta + if debugEnabled { + after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]} + log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatWeights(after, true)) } - return out } // probeProviderAtBucket: 调试辅助,打印该小时桶内每个provider的issued与是否有+1/+2/+3样本