feat: 新增一个雨量融合的工具

This commit is contained in:
yarnom 2025-10-14 10:16:25 +08:00
parent cbeb623f20
commit 8797af23b5

View File

@ -9,6 +9,7 @@ import (
"log" "log"
"math" "math"
"os" "os"
"sort"
"strings" "strings"
"time" "time"
@ -25,6 +26,7 @@ const (
var ( var (
defaultWeights = []float64{0.4, 0.3, 0.3} defaultWeights = []float64{0.4, 0.3, 0.3}
asiaShanghai *time.Location asiaShanghai *time.Location
debugEnabled bool
) )
func init() { func init() {
@ -107,6 +109,8 @@ func main() {
flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过") flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过")
flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0") flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0")
flag.Parse() flag.Parse()
// 让库内辅助函数也能访问调试开关
debugEnabled = debugLog
if strings.TrimSpace(issuedRange) == "" { if strings.TrimSpace(issuedRange) == "" {
log.Fatalln("必须提供 --issued_range") log.Fatalln("必须提供 --issued_range")
@ -195,7 +199,7 @@ func main() {
for _, job := range jobs { for _, job := range jobs {
info := job.info info := job.info
maskCtx, hasMaskCtx := buildMaskContext(info) maskCtx, hasMaskCtx := buildMaskContext(info)
weights := append([]float64{}, defaultWeights...) stWeights := newWeightState(job.providers)
for idx, issued := range job.issuedTimes { for idx, issued := range job.issuedTimes {
matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers) matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers)
@ -212,12 +216,54 @@ func main() {
} }
if debugLog { if debugLog {
log.Printf("权重(使用前): station=%s issued=%s %s=%.2f %s=%.2f %s=%.2f", log.Printf("")
info.ID, issued.Format("2006-01-02 15:04:05"), log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), info.ID)
job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2])
} }
fused := fuseForecastsWith(matrix, effectiveWeights(weights, job.providers, matrix)) // 学习先行:先得到学习前权重,再学习,再得到本轮使用权重
effBefore, _ := stWeights.effectiveForMatrix(matrix)
// 学习对齐:用刚结束的小时 [issued-1h, issued) 与 (issued-2h) 的 +1 预测
learnStart := issued.Add(-time.Hour)
learnEnd := issued
actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, learnStart, learnEnd)
if err != nil {
log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err)
} else if actualOK {
prevIssueBucket := issued.Add(-2 * time.Hour)
preds := make([]float64, len(matrix))
valids := make([]bool, len(matrix))
for i, pm := range matrix {
if v, ok := lead1RainAtIssued(db, info.ID, pm.Provider, prevIssueBucket); ok {
preds[i] = v
valids[i] = true
}
}
if debugLog {
log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s",
learnStart.Format("15:04"), learnEnd.Format("15:04"), actualRain,
prevIssueBucket.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"),
formatWeights(preds, false))
}
stWeights.learnFromPreds(matrix, preds, valids, actualRain)
}
eff, _ := stWeights.effectiveForMatrix(matrix)
if debugLog {
// 以 provider=weight 的形式输出,顺序与 sources 一致
toNamed := func(ws []float64) string {
pairs := make([]string, 0, len(matrix))
for i, pm := range matrix {
pairs = append(pairs, fmt.Sprintf("%s=%.2f", pm.Provider, ws[i]))
}
return strings.Join(pairs, ", ")
}
log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore), toNamed(eff))
}
fused := fuseForecastsWith(matrix, eff)
// 记录掩码前的融合雨量
var preMaskRain = []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain}
rainAllZero := isRainAllZero(fused) rainAllZero := isRainAllZero(fused)
maskApplied := false maskApplied := false
@ -242,11 +288,17 @@ func main() {
} }
} }
if debugLog { if debugLog {
log.Printf("掩码报告: station=%s issued=%s scanned=%v windOK=%v used=%s wind=%.2fm/s %.0f° tiles=%d hits=[%d,%d,%d] mask_hours=%v", info.ID, issued.Format("2006-01-02 15:04:05"), rep.Scanned, rep.WindOK, rep.UsedAlias, rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskHours) log.Printf("mask t=%s wind=%.1fm/s %.0f° tiles=%d hits=[%d,%d,%d] applied=%v",
issued.Format("2006-01-02 15:04:05"), rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskApplied)
} }
} }
} }
if debugLog {
// 展示预报与掩码后的融合雨量 [h+1,h+2,h+3]
rp := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain}
log.Printf("预报 %s , mask=%v , 融合的结果 %s", formatWeights(preMaskRain, false), maskApplied, formatWeights(rp, false))
}
for h := 1; h <= 3; h++ { for h := 1; h <= 3; h++ {
results = append(results, fusedRecord{ results = append(results, fusedRecord{
StationID: info.ID, StationID: info.ID,
@ -257,20 +309,7 @@ func main() {
}) })
} }
actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, issued, issued.Add(time.Hour)) // 学习已在融合前完成
if err != nil {
log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err)
} else if actualOK {
before := append([]float64{}, weights...)
weights = adjustWeightsSubset(weights, job.providers, matrix, actualRain)
if debugLog {
log.Printf("权重调整: station=%s issued=%s actual=%.3f | 之前[%s=%.2f,%s=%.2f,%s=%.2f] 之后[%s=%.2f,%s=%.2f,%s=%.2f] 参与源=%d",
info.ID, issued.Format("2006-01-02 15:04:05"), actualRain,
job.providers[0], before[0], job.providers[1], before[1], job.providers[2], before[2],
job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2],
len(matrix))
}
}
step++ step++
if progressFn != nil { if progressFn != nil {
@ -554,6 +593,19 @@ func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Tim
return list, nil return list, nil
} }
// lead1RainAtIssued 返回某站点、某provider在给定小时桶(J)内的发行的 +1 小时降雨预测(若存在)。
func lead1RainAtIssued(db *sql.DB, stationID, provider string, bucketHour time.Time) (float64, bool) {
issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour)
if err != nil || !ok {
return 0, false
}
samples, err := loadForecastSamples(db, stationID, provider, issued)
if err != nil || len(samples) < 1 {
return 0, false
}
return samples[0].Rain, true
}
func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample { func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample {
result := make([]forecastSample, 3) result := make([]forecastSample, 3)
for h := 0; h < 3; h++ { for h := 0; h < 3; h++ {
@ -646,36 +698,204 @@ func uniform(n int) []float64 {
return w return w
} }
// effectiveWeights 将基础权重映射到参与融合的 providers按 matrix 顺序),并在子集上归一化 // weightState: 维护三源与各二源对的基础权重
func effectiveWeights(base []float64, providersAll []string, matrix []providerMatrix) []float64 { type weightState struct {
m := make(map[string]int) providersAll []string
for i, p := range providersAll { baseTriad []float64 // [om, cy, im] 按 providersAll 对齐
m[p] = i basePairs map[string][]float64 // 保留字段但不再使用成对权重
} lastPresent map[string]bool // 上一次有效参与的provider集合
eff := make([]float64, len(matrix)) lastEff map[string]float64 // 上一次按provider的有效权重仅在场时更新
sum := 0.0
for i, pm := range matrix {
if idx, ok := m[pm.Provider]; ok {
eff[i] = base[idx]
sum += eff[i]
}
}
if sum <= 0 {
return uniform(len(matrix))
}
for i := range eff {
eff[i] /= sum
}
return eff
} }
// adjustWeightsSubset 仅对当前参与的 providers 调整权重,其它分量保持不变 func newWeightState(providersAll []string) *weightState {
func adjustWeightsSubset(base []float64, providersAll []string, matrix []providerMatrix, actual float64) []float64 { ws := &weightState{providersAll: providersAll}
preds := extractProviderRainAt(matrix, 0) ws.baseTriad = append([]float64{}, defaultWeights...)
if len(preds) == 0 { ws.basePairs = map[string][]float64{}
return base ws.lastPresent = map[string]bool{}
ws.lastEff = map[string]float64{}
return ws
} }
// 找最优和最差(绝对误差)
func (ws *weightState) idx(p string) int {
for i, v := range ws.providersAll {
if v == p {
return i
}
}
return 0
}
func pairKey(a, b string) string {
if a < b {
return a + "|" + b
}
return b + "|" + a
}
// providersList: 用于debug展示参与源
func providersList(matrix []providerMatrix) string {
names := make([]string, len(matrix))
for i, pm := range matrix {
names[i] = pm.Provider
}
return fmt.Sprintf("%v", names)
}
// formatWeights 将权重格式化为字符串,如 "[0.70,0.10,0.20]"
// 若 quantizeTenths=true则先按 0.1 就近取整再输出两位小数
func formatWeights(ws []float64, quantizeTenths bool) string {
if len(ws) == 0 {
return "[]"
}
var parts []string
for _, v := range ws {
if quantizeTenths {
v = math.Round(v*10.0) / 10.0
}
parts = append(parts, fmt.Sprintf("%.2f", v))
}
return "[" + strings.Join(parts, ",") + "]"
}
// effectiveForMatrix: 返回与 matrix 顺序对齐的有效权重(两源/三源),在子集上归一化
func (ws *weightState) effectiveForMatrix(matrix []providerMatrix) ([]float64, bool) {
// 根据当前在场provider集合对 triad 进行“缺失转移/新增分配”并输出有效权重。
n := len(matrix)
eff := make([]float64, n)
if n == 0 {
return eff, false
}
// 建立当前在场集合
present := make(map[string]bool, n)
for _, pm := range matrix {
present[pm.Provider] = true
}
// 识别缺失与新增
var missing []string
var added []string
if len(ws.lastPresent) == 0 {
// 首次:按照 providersAll 判断缺失(需要立即应用缺失转移)
for _, p := range ws.providersAll {
if !present[p] {
missing = append(missing, p)
}
}
} else {
for p := range ws.lastPresent {
if !present[p] {
missing = append(missing, p)
}
}
for p := range present {
if !ws.lastPresent[p] {
added = append(added, p)
}
}
}
if debugEnabled && (len(missing) > 0 || len(added) > 0) {
// 稳定顺序:按 providersAll 顺序输出 present/added/removed
presList := make([]string, 0, len(present))
for _, p := range ws.providersAll {
if present[p] {
presList = append(presList, p)
}
}
// added/missing 也按 providersAll 顺序
orderIdx := func(p string) int { return ws.idx(p) }
sort.Slice(added, func(i, j int) bool { return orderIdx(added[i]) < orderIdx(added[j]) })
sort.Slice(missing, func(i, j int) bool { return orderIdx(missing[i]) < orderIdx(missing[j]) })
log.Printf("sources %v added=%v removed=%v", presList, added, missing)
}
// 缺失将缺失源的权重转移给当前在场中权重最大的源缺失源置0
if len(missing) > 0 && len(present) > 0 {
// 找当前在场的最大权重源
maxP := ""
maxW := -1.0
for p := range present {
w := ws.baseTriad[ws.idx(p)]
if w > maxW {
maxW, maxP = w, p
}
}
gain := 0.0
for _, m := range missing {
w := ws.baseTriad[ws.idx(m)]
if w > 0 {
gain += w
ws.baseTriad[ws.idx(m)] = 0
}
}
if maxP != "" && gain > 0 {
ws.baseTriad[ws.idx(maxP)] += gain
if debugEnabled {
tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("removed %v -> %s +%.2f triad %s",
missing, maxP, gain, formatWeights(tri, true))
}
}
}
// 新增从当前最大源挪出0.2给新加入的源(每个新增一次性处理)
for _, a := range added {
// 仅在此前为0时进行分配避免重复分配
if ws.baseTriad[ws.idx(a)] <= 0 {
// 找在场的最大源(不包含自身)
maxP := ""
maxW := -1.0
for p := range present {
if p == a {
continue
}
w := ws.baseTriad[ws.idx(p)]
if w > maxW {
maxW, maxP = w, p
}
}
if maxP != "" && maxW > 0 {
delta := 0.2
if maxW < delta {
delta = maxW
}
ws.baseTriad[ws.idx(maxP)] -= delta
ws.baseTriad[ws.idx(a)] += delta
if debugEnabled {
tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("added %s from %s %.2f triad %s",
a, maxP, delta, formatWeights(tri, true))
}
}
}
}
// 更新 lastPresent 为当前集合
ws.lastPresent = present
// 组装有效权重,按 matrix 顺序输出。此时 triad 在场分量应合计为1。
for i, pm := range matrix {
eff[i] = ws.baseTriad[ws.idx(pm.Provider)]
}
// 不在此处打印权重对比,由主循环根据“学习前/后”输出
// 更新上一次有效权重
for i, pm := range matrix {
ws.lastEff[pm.Provider] = eff[i]
}
return eff, false
}
// learn: 只在本集合中做 +0.1/-0.1,学习后在该集合内归一化,并写回对应的基础权重集
func (ws *weightState) learn(matrix []providerMatrix, actual float64) {
n := len(matrix)
if n <= 1 {
return
}
preds := extractProviderRainAt(matrix, 0)
if len(preds) != n {
return
}
// 找出当前在场集合中的最优与最差
best, worst := 0, 0 best, worst := 0, 0
for i := range preds { for i := range preds {
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
@ -685,35 +905,87 @@ func adjustWeightsSubset(base []float64, providersAll []string, matrix []provide
worst = i worst = i
} }
} }
// provider 名到 base 索引
m := make(map[string]int) alpha := 0.1
for i, p := range providersAll { pBest := matrix[best].Provider
m[p] = i pWorst := matrix[worst].Provider
// 仅在在场集合中调整第三方可能为0绝对不可信且不参与两源学习
decCap := ws.baseTriad[ws.idx(pWorst)]
delta := alpha
if decCap < delta {
delta = decCap
} }
out := append([]float64{}, base...) if delta <= 0 {
// 应用 ±0.1 调整 return
if bi, ok := m[matrix[best].Provider]; ok {
out[bi] += 0.1
} }
if wi, ok := m[matrix[worst].Provider]; ok { prev := append([]float64{}, ws.baseTriad...)
out[wi] -= 0.1 ws.baseTriad[ws.idx(pWorst)] -= delta
ws.baseTriad[ws.idx(pBest)] += delta
if debugEnabled {
before := []float64{prev[ws.idx(providerOpenMeteo)], prev[ws.idx(providerCaiyun)], prev[ws.idx(providerImdroid)]}
after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("rain=%.3f best=%s worst=%s weights %s -> %s",
actual, pBest, pWorst, formatWeights(before, true), formatWeights(after, true))
} }
// 夹紧并归一化
sum := 0.0
for i := range out {
if out[i] < 0 {
out[i] = 0
} }
sum += out[i]
// learnFromPreds: 使用外部提供的预测值(与 matrix 顺序对齐)进行学习。
func (ws *weightState) learnFromPreds(matrix []providerMatrix, preds []float64, valids []bool, actual float64) {
n := len(matrix)
if n == 0 || len(preds) != n || len(valids) != n {
return
} }
if sum == 0 { // 仅在有效的、且当前权重>0 的来源中选择最优/最差
// 回到默认 idxs := make([]int, 0, n)
return append([]float64{}, defaultWeights...) for i := 0; i < n; i++ {
if !valids[i] {
continue
} }
for i := range out { idxs = append(idxs, i)
out[i] /= sum }
if len(idxs) < 2 {
return
}
best, worst := idxs[0], idxs[0]
for _, i := range idxs {
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
best = i
}
if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
worst = i
}
}
alpha := 0.1
pBest := matrix[best].Provider
pWorst := matrix[worst].Provider
decCap := ws.baseTriad[ws.idx(pWorst)]
delta := alpha
if decCap < delta {
delta = decCap
}
if delta <= 0 {
return
}
// 日志绝对误差详情OM/CY/IM和最优/最差
if debugEnabled {
name := func(p string) string { return p }
// 组装当前在场源的误差摘要
parts := make([]string, 0, len(matrix))
for i, pm := range matrix {
if !valids[i] || ws.baseTriad[ws.idx(pm.Provider)] <= 0 {
continue
}
parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", name(pm.Provider), preds[i], actual, math.Abs(preds[i]-actual)))
}
log.Printf("绝对误差:%s → best=%sworst=%s。", strings.Join(parts, ""), name(pBest), name(pWorst))
}
ws.baseTriad[ws.idx(pWorst)] -= delta
ws.baseTriad[ws.idx(pBest)] += delta
if debugEnabled {
after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("按规则best +0.1worst 0.1 → 权重在下一轮将为 %s", formatWeights(after, true))
} }
return out
} }
// probeProviderAtBucket: 调试辅助打印该小时桶内每个provider的issued与是否有+1/+2/+3样本 // probeProviderAtBucket: 调试辅助打印该小时桶内每个provider的issued与是否有+1/+2/+3样本