1420 lines
40 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"database/sql"
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"math"
"os"
"sort"
"strings"
"time"
"weatherstation/internal/database"
)
const (
providerOpenMeteo = "open-meteo"
providerCaiyun = "caiyun"
providerImdroid = "imdroid"
outputProvider = "imdroid_mix"
)
var (
defaultWeights = []float64{0.4, 0.3, 0.3}
asiaShanghai *time.Location
debugEnabled bool
)
func init() {
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
loc = time.FixedZone("CST", 8*3600)
}
asiaShanghai = loc
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[imdroidmix] ")
}
type stationInfo struct {
ID string
Alias string
Lat sql.NullFloat64
Lon sql.NullFloat64
Z sql.NullInt64
Y sql.NullInt64
X sql.NullInt64
}
type forecastSample struct {
Rain float64
Temp float64
Humidity float64
WindSpeed float64
WindGust float64
WindDir float64
Prob float64
Pressure float64
UV float64
}
type providerMatrix struct {
Provider string
Samples []forecastSample
}
type fusedRecord struct {
StationID string
IssuedAt time.Time
ForecastTime time.Time
Sample forecastSample
MaskApplied bool
}
type maskContext struct {
id string
alias string
lat float64
lon float64
z int
y int
x int
}
func main() {
var (
issuedRange string
stationFlag string
outSQLPath string
showProgress bool
applyUpserts bool
includeAll bool
debugLog bool
noMask bool
minSources int
maskHoursStr string
)
flag.StringVar(&issuedRange, "issued_range", "", "时间范围,格式 \"YYYY-MM-DD HH:MM:SS,YYYY-MM-DD HH:MM:SS\"CST")
flag.StringVar(&stationFlag, "station", "", "目标站点(可选,多个以逗号分隔)")
flag.BoolVar(&includeAll, "all_stations", false, "处理全部有经纬度的站点")
flag.StringVar(&outSQLPath, "export_sql", "imdroid_mix.sql", "输出SQL文件路径")
flag.BoolVar(&showProgress, "progress", false, "展示处理进度")
flag.BoolVar(&applyUpserts, "apply", false, "直接写入数据库默认仅导出SQL")
flag.BoolVar(&debugLog, "debug", false, "输出调试日志")
flag.BoolVar(&noMask, "no_mask", false, "关闭雷达掩码(不清零融合雨量)")
flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过")
flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0")
flag.Parse()
// 让库内辅助函数也能访问调试开关
debugEnabled = debugLog
if strings.TrimSpace(issuedRange) == "" {
log.Fatalln("必须提供 --issued_range")
}
startIssued, endIssued, err := parseRange(issuedRange)
if err != nil {
log.Fatalf("解析 issued_range 失败: %v", err)
}
if !endIssued.After(startIssued) {
log.Fatalln("issued_range 结束时间必须大于起始时间")
}
db := database.GetDB()
defer database.Close()
stations, err := resolveStations(db, stationFlag, includeAll)
if err != nil {
log.Fatalf("加载站点失败: %v", err)
}
if len(stations) == 0 {
log.Fatalln("没有符合条件的站点")
}
type stationWork struct {
info stationInfo
issuedTimes []time.Time
providers []string
}
var jobs []stationWork
var totalSteps int
for _, st := range stations {
pSet, err := resolveProviderSet(db, st.ID, startIssued, endIssued)
if err != nil {
log.Fatalf("检测站点 %s 预报源失败: %v", st.ID, err)
}
if debugLog {
log.Printf("站点 %s 使用预报源: %v", st.ID, pSet)
}
issuedTimes, err := fetchIssuedHourBuckets(db, st.ID, startIssued, endIssued, pSet, minSources)
if err != nil {
log.Fatalf("查询站点 %s 的 issued_at 失败: %v", st.ID, err)
}
if len(issuedTimes) == 0 {
if debugLog {
// 打印每个provider的 issued_at 数量辅助定位
for _, p := range pSet {
cnt, first, last := debugIssuedSummary(db, st.ID, startIssued, endIssued, p)
log.Printf("站点 %s provider=%s issued_at数量=%d 时间范围[%s, %s]", st.ID, p, cnt, first, last)
}
}
continue
}
jobs = append(jobs, stationWork{info: st, issuedTimes: issuedTimes, providers: pSet})
totalSteps += len(issuedTimes)
}
if len(jobs) == 0 {
log.Fatalln("指定时间范围内无可融合的预报数据")
}
var (
results []fusedRecord
step int
progressFn func(current int, total int, ctx string)
)
if showProgress {
progressFn = func(current, total int, ctx string) {
fmt.Printf("\r处理进度 %d/%d (%s)...", current, total, ctx)
}
defer fmt.Println()
}
// 解析 mask_hours
maskHours := [3]bool{true, true, true}
if s := strings.TrimSpace(maskHoursStr); s != "" {
parts := strings.Split(s, ",")
for i := 0; i < len(parts) && i < 3; i++ {
v := strings.TrimSpace(parts[i])
maskHours[i] = (v == "1" || strings.EqualFold(v, "true"))
}
}
for _, job := range jobs {
info := job.info
maskCtx, hasMaskCtx := buildMaskContext(info)
stWeights := newWeightState(job.providers)
for idx, issued := range job.issuedTimes {
matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers)
if err != nil {
log.Printf("站点 %s issued=%s 载入预报失败: %v", info.ID, issued.Format(time.RFC3339), err)
continue
}
if len(matrix) < minSources {
log.Printf("站点 %s issued=%s 可用源=%d < min_sources=%d跳过", info.ID, issued.Format(time.RFC3339), len(matrix), minSources)
if debugLog {
probeProviderAtBucket(db, info.ID, job.providers, issued)
}
continue
}
if debugLog {
log.Printf("")
log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), info.ID)
}
// 学习先行:先得到学习前权重,再学习,再得到本轮使用权重
effBefore, _ := stWeights.effectiveForMatrix(matrix)
// 学习对齐:用刚结束的小时 [issued-1h, issued) 与 (issued-2h) 的 +1 预测
learnStart := issued.Add(-time.Hour)
learnEnd := issued
actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, learnStart, learnEnd)
if err != nil {
log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err)
} else if actualOK {
prevIssueBucket := issued.Add(-2 * time.Hour)
preds := make([]float64, len(matrix))
valids := make([]bool, len(matrix))
for i, pm := range matrix {
if v, ok := lead1RainAtIssued(db, info.ID, pm.Provider, prevIssueBucket); ok {
preds[i] = v
valids[i] = true
}
}
if debugLog {
log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s",
learnStart.Format("15:04"), learnEnd.Format("15:04"), actualRain,
prevIssueBucket.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"),
formatWeights(preds, false))
}
stWeights.learnFromPreds(matrix, preds, valids, actualRain)
}
eff, _ := stWeights.effectiveForMatrix(matrix)
if debugLog {
// 以 provider=weight 的形式输出,顺序与 sources 一致
toNamed := func(ws []float64) string {
pairs := make([]string, 0, len(matrix))
for i, pm := range matrix {
pairs = append(pairs, fmt.Sprintf("%s=%.2f", pm.Provider, ws[i]))
}
return strings.Join(pairs, ", ")
}
log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore), toNamed(eff))
}
fused := fuseForecastsWith(matrix, eff)
// 记录掩码前的融合雨量
var preMaskRain = []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain}
rainAllZero := isRainAllZero(fused)
maskApplied := false
if !rainAllZero && hasMaskCtx && !noMask {
rep, err := checkRadarMask(db, maskCtx, issued)
if err != nil {
log.Printf("站点 %s issued=%s 掩码计算出错: %v", info.ID, issued.Format(time.RFC3339), err)
} else {
if rep.Scanned {
// 逐小时按 mask_hours 应用
// 解析 mask_hours 上移至批次开始
// 在此应用逐小时清零
// 重用 maskHours 变量
for hidx := 0; hidx < 3; hidx++ {
if !maskHours[hidx] {
continue
}
if rep.HitsByHour[hidx] == 0 {
fused[hidx].Rain = 0
maskApplied = true
}
}
}
if debugLog {
log.Printf("mask t=%s wind=%.1fm/s %.0f° tiles=%d hits=[%d,%d,%d] applied=%v",
issued.Format("2006-01-02 15:04:05"), rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskApplied)
}
}
}
if debugLog {
// 展示预报与掩码后的融合雨量 [h+1,h+2,h+3]
rp := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain}
log.Printf("预报 %s , mask=%v , 融合的结果 %s", formatWeights(preMaskRain, false), maskApplied, formatWeights(rp, false))
}
for h := 1; h <= 3; h++ {
results = append(results, fusedRecord{
StationID: info.ID,
IssuedAt: issued, // 输出按“小时桶”归一的发布时间
ForecastTime: issued.Add(time.Duration(h) * time.Hour),
Sample: fused[h-1],
MaskApplied: maskApplied,
})
}
// 学习已在融合前完成
step++
if progressFn != nil {
progressFn(step, totalSteps, fmt.Sprintf("%s #%d", info.ID, idx+1))
}
}
}
if len(results) == 0 {
log.Println("未生成任何融合结果")
return
}
sqlLines := buildSQL(results)
if err := os.WriteFile(outSQLPath, []byte(strings.Join(sqlLines, "\n")+"\n"), 0o644); err != nil {
log.Fatalf("写入SQL文件失败: %v", err)
}
log.Printf("已写出SQL到 %s共 %d 条记录)", outSQLPath, len(results))
if applyUpserts {
if err := applySQL(db, results); err != nil {
log.Fatalf("写入数据库失败: %v", err)
}
log.Printf("已写入数据库 forecast_hourlyprovider=%s", outputProvider)
}
}
func parseRange(r string) (time.Time, time.Time, error) {
parts := strings.Split(r, ",")
if len(parts) != 2 {
return time.Time{}, time.Time{}, errors.New("格式应为 start,end")
}
start, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[0]), asiaShanghai)
if err != nil {
return time.Time{}, time.Time{}, err
}
end, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[1]), asiaShanghai)
if err != nil {
return time.Time{}, time.Time{}, err
}
return start, end, nil
}
func resolveStations(db *sql.DB, stationFlag string, includeAll bool) ([]stationInfo, error) {
if includeAll {
const q = `
SELECT station_id,
COALESCE(NULLIF(station_alias,''), station_id) AS alias,
latitude, longitude, z, y, x
FROM stations
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
AND device_type='WH65LP'
ORDER BY station_id`
rows, err := db.Query(q)
if err != nil {
return nil, err
}
defer rows.Close()
var out []stationInfo
for rows.Next() {
var s stationInfo
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil {
return nil, err
}
out = append(out, s)
}
return out, nil
}
list := fieldsFromFlag(stationFlag)
if len(list) == 0 {
return nil, nil
}
var out []stationInfo
const q = `
SELECT station_id,
COALESCE(NULLIF(station_alias,''), station_id) AS alias,
latitude, longitude, z, y, x
FROM stations
WHERE station_id=$1`
for _, id := range list {
var s stationInfo
err := db.QueryRow(q, id).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
if err != nil {
if err == sql.ErrNoRows {
log.Printf("警告:站点 %s 不存在,跳过", id)
continue
}
return nil, err
}
out = append(out, s)
}
return out, nil
}
func fieldsFromFlag(v string) []string {
if strings.TrimSpace(v) == "" {
return nil
}
parts := strings.Split(v, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
out = append(out, p)
}
}
return out
}
func resolveProviderSet(_ *sql.DB, _ string, _ time.Time, _ time.Time) ([]string, error) {
// 固定三源,不做别名回退
return []string{providerOpenMeteo, providerCaiyun, providerImdroid}, nil
}
func hasProviderData(db *sql.DB, stationID, provider string, from, to time.Time) (bool, error) {
var cnt int
err := db.QueryRow(`
SELECT COUNT(*)
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`,
stationID, provider, from, to).Scan(&cnt)
if err != nil {
return false, err
}
return cnt > 0, nil
}
func fetchIssuedHourBuckets(db *sql.DB, stationID string, from, to time.Time, providers []string, minSources int) ([]time.Time, error) {
if len(providers) != 3 {
return nil, fmt.Errorf("provider 列表长度异常: %v", providers)
}
if minSources < 1 {
minSources = 1
}
if minSources > 3 {
minSources = 3
}
const q = `
WITH base AS (
SELECT date_trunc('hour', issued_at) AS bucket, provider
FROM forecast_hourly
WHERE station_id=$1 AND provider IN ($2, $3, $4)
AND issued_at BETWEEN $5 AND $6
GROUP BY 1,2
)
SELECT bucket
FROM base
GROUP BY bucket
HAVING COUNT(DISTINCT provider) >= $7
ORDER BY bucket`
rows, err := db.Query(q, stationID, providers[0], providers[1], providers[2], from, to, minSources)
if err != nil {
return nil, err
}
defer rows.Close()
var buckets []time.Time
for rows.Next() {
var t time.Time
if err := rows.Scan(&t); err != nil {
return nil, err
}
buckets = append(buckets, t.In(asiaShanghai))
}
return buckets, nil
}
// debugIssuedSummary 仅用于 --debug统计provider在范围内的issued_at计数及首尾时间
func debugIssuedSummary(db *sql.DB, stationID string, from, to time.Time, provider string) (count int, first string, last string) {
const q = `
SELECT COUNT(*),
MIN(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS first,
MAX(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS last
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`
var f, l sql.NullString
if err := db.QueryRow(q, stationID, provider, from, to).Scan(&count, &f, &l); err != nil {
return 0, "", ""
}
if f.Valid {
first = f.String
}
if l.Valid {
last = l.String
}
return
}
func loadProviderMatrix(db *sql.DB, stationID string, bucketHour time.Time, providers []string) ([]providerMatrix, error) {
out := make([]providerMatrix, 0, len(providers))
for _, provider := range providers {
issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour)
if err != nil {
return nil, err
}
if !ok {
// 该源缺席,跳过
continue
}
samples, err := loadForecastSamples(db, stationID, provider, issued)
if err != nil {
return nil, err
}
if len(samples) != 3 {
// 样本不足,视为该源缺席
continue
}
out = append(out, providerMatrix{Provider: provider, Samples: samples})
}
return out, nil
}
// 在某个小时桶内,选取该 provider 的最新 issued_at[bucket, bucket+1h)
func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) {
const q = `
SELECT issued_at
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour'
ORDER BY issued_at DESC
LIMIT 1`
var t time.Time
err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t)
if err == sql.ErrNoRows {
return time.Time{}, false, nil
}
if err != nil {
return time.Time{}, false, err
}
return t, true, nil
}
func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) {
const q = `
SELECT forecast_time,
rain_mm_x1000,
temp_c_x100,
humidity_pct,
wind_speed_ms_x1000,
wind_gust_ms_x1000,
wind_dir_deg,
precip_prob_pct,
pressure_hpa_x100,
uv_index
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at=$3
ORDER BY forecast_time ASC`
rows, err := db.Query(q, stationID, provider, issued)
if err != nil {
return nil, err
}
defer rows.Close()
var list []forecastSample
for rows.Next() {
var (
ft time.Time
rain, temp, humidity, ws, gust, wdir, prob sql.NullInt64
pressure, uv sql.NullInt64
)
if err := rows.Scan(&ft, &rain, &temp, &humidity, &ws, &gust, &wdir, &prob, &pressure, &uv); err != nil {
return nil, err
}
// 使用“向上取整”的方式计算前视小时(与查询端 lead_hours=CEIL(...) 逻辑对齐)
dh := ft.Sub(issued).Hours()
hours := int(math.Ceil(dh - 1e-9))
if hours < 1 || hours > 3 {
continue
}
s := forecastSample{
Rain: float64(rain.Int64) / 1000.0,
Temp: float64(temp.Int64) / 100.0,
Humidity: float64(humidity.Int64),
WindSpeed: float64(ws.Int64) / 1000.0,
WindGust: float64(gust.Int64) / 1000.0,
WindDir: float64(wdir.Int64),
Prob: float64(prob.Int64),
Pressure: float64(pressure.Int64) / 100.0,
UV: float64(uv.Int64),
}
list = append(list, s)
}
return list, nil
}
// lead1RainAtIssued 返回某站点、某provider在给定小时桶(J)内的发行的 +1 小时降雨预测(若存在)。
func lead1RainAtIssued(db *sql.DB, stationID, provider string, bucketHour time.Time) (float64, bool) {
issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour)
if err != nil || !ok {
return 0, false
}
samples, err := loadForecastSamples(db, stationID, provider, issued)
if err != nil || len(samples) < 1 {
return 0, false
}
return samples[0].Rain, true
}
func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample {
result := make([]forecastSample, 3)
for h := 0; h < 3; h++ {
var windX, windY float64
for pIdx, pm := range matrix {
if len(pm.Samples) <= h {
continue
}
s := pm.Samples[h]
w := weights[pIdx]
result[h].Rain += s.Rain * w
result[h].Temp += s.Temp * w
result[h].Humidity += s.Humidity * w
result[h].WindSpeed += s.WindSpeed * w
result[h].WindGust += s.WindGust * w
result[h].Prob += s.Prob * w
result[h].Pressure += s.Pressure * w
result[h].UV += s.UV * w
rad := s.WindDir * math.Pi / 180
windX += math.Cos(rad) * w
windY += math.Sin(rad) * w
}
if windX == 0 && windY == 0 {
result[h].WindDir = 0
} else {
dir := math.Atan2(windY, windX) * 180 / math.Pi
if dir < 0 {
dir += 360
}
result[h].WindDir = dir
}
result[h].Humidity = math.Round(result[h].Humidity)
result[h].Prob = math.Round(result[h].Prob)
result[h].UV = math.Round(result[h].UV)
}
return result
}
// fuseForecastsWith 使用与 matrix 对齐的有效权重进行融合
func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample {
if len(eff) != len(matrix) {
return fuseForecasts(matrix, uniform(len(matrix)))
}
result := make([]forecastSample, 3)
for h := 0; h < 3; h++ {
var windX, windY float64
for pIdx, pm := range matrix {
if len(pm.Samples) <= h {
continue
}
s := pm.Samples[h]
w := eff[pIdx]
result[h].Rain += s.Rain * w
result[h].Temp += s.Temp * w
result[h].Humidity += s.Humidity * w
result[h].WindSpeed += s.WindSpeed * w
result[h].WindGust += s.WindGust * w
result[h].Prob += s.Prob * w
result[h].Pressure += s.Pressure * w
result[h].UV += s.UV * w
rad := s.WindDir * math.Pi / 180
windX += math.Cos(rad) * w
windY += math.Sin(rad) * w
}
if windX == 0 && windY == 0 {
result[h].WindDir = 0
} else {
dir := math.Atan2(windY, windX) * 180 / math.Pi
if dir < 0 {
dir += 360
}
result[h].WindDir = dir
}
result[h].Humidity = math.Round(result[h].Humidity)
result[h].Prob = math.Round(result[h].Prob)
result[h].UV = math.Round(result[h].UV)
}
return result
}
func uniform(n int) []float64 {
if n <= 0 {
return nil
}
w := make([]float64, n)
v := 1.0 / float64(n)
for i := range w {
w[i] = v
}
return w
}
// weightState: 维护三源与各二源对的基础权重
type weightState struct {
providersAll []string
baseTriad []float64 // [om, cy, im] 按 providersAll 对齐
basePairs map[string][]float64 // 保留字段但不再使用成对权重
lastPresent map[string]bool // 上一次有效参与的provider集合
lastEff map[string]float64 // 上一次按provider的有效权重仅在场时更新
}
func newWeightState(providersAll []string) *weightState {
ws := &weightState{providersAll: providersAll}
ws.baseTriad = append([]float64{}, defaultWeights...)
ws.basePairs = map[string][]float64{}
ws.lastPresent = map[string]bool{}
ws.lastEff = map[string]float64{}
return ws
}
func (ws *weightState) idx(p string) int {
for i, v := range ws.providersAll {
if v == p {
return i
}
}
return 0
}
func pairKey(a, b string) string {
if a < b {
return a + "|" + b
}
return b + "|" + a
}
// providersList: 用于debug展示参与源
func providersList(matrix []providerMatrix) string {
names := make([]string, len(matrix))
for i, pm := range matrix {
names[i] = pm.Provider
}
return fmt.Sprintf("%v", names)
}
// formatWeights 将权重格式化为字符串,如 "[0.70,0.10,0.20]"
// 若 quantizeTenths=true则先按 0.1 就近取整再输出两位小数
func formatWeights(ws []float64, quantizeTenths bool) string {
if len(ws) == 0 {
return "[]"
}
var parts []string
for _, v := range ws {
if quantizeTenths {
v = math.Round(v*10.0) / 10.0
}
parts = append(parts, fmt.Sprintf("%.2f", v))
}
return "[" + strings.Join(parts, ",") + "]"
}
// effectiveForMatrix: 返回与 matrix 顺序对齐的有效权重(两源/三源),在子集上归一化
func (ws *weightState) effectiveForMatrix(matrix []providerMatrix) ([]float64, bool) {
// 根据当前在场provider集合对 triad 进行“缺失转移/新增分配”并输出有效权重。
n := len(matrix)
eff := make([]float64, n)
if n == 0 {
return eff, false
}
// 建立当前在场集合
present := make(map[string]bool, n)
for _, pm := range matrix {
present[pm.Provider] = true
}
// 识别缺失与新增
var missing []string
var added []string
if len(ws.lastPresent) == 0 {
// 首次:按照 providersAll 判断缺失(需要立即应用缺失转移)
for _, p := range ws.providersAll {
if !present[p] {
missing = append(missing, p)
}
}
} else {
for p := range ws.lastPresent {
if !present[p] {
missing = append(missing, p)
}
}
for p := range present {
if !ws.lastPresent[p] {
added = append(added, p)
}
}
}
if debugEnabled && (len(missing) > 0 || len(added) > 0) {
// 稳定顺序:按 providersAll 顺序输出 present/added/removed
presList := make([]string, 0, len(present))
for _, p := range ws.providersAll {
if present[p] {
presList = append(presList, p)
}
}
// added/missing 也按 providersAll 顺序
orderIdx := func(p string) int { return ws.idx(p) }
sort.Slice(added, func(i, j int) bool { return orderIdx(added[i]) < orderIdx(added[j]) })
sort.Slice(missing, func(i, j int) bool { return orderIdx(missing[i]) < orderIdx(missing[j]) })
log.Printf("sources %v added=%v removed=%v", presList, added, missing)
}
// 缺失将缺失源的权重转移给当前在场中权重最大的源缺失源置0
if len(missing) > 0 && len(present) > 0 {
// 找当前在场的最大权重源
maxP := ""
maxW := -1.0
for p := range present {
w := ws.baseTriad[ws.idx(p)]
if w > maxW {
maxW, maxP = w, p
}
}
gain := 0.0
for _, m := range missing {
w := ws.baseTriad[ws.idx(m)]
if w > 0 {
gain += w
ws.baseTriad[ws.idx(m)] = 0
}
}
if maxP != "" && gain > 0 {
ws.baseTriad[ws.idx(maxP)] += gain
if debugEnabled {
tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("removed %v -> %s +%.2f triad %s",
missing, maxP, gain, formatWeights(tri, true))
}
}
}
// 新增从当前最大源挪出0.2给新加入的源(每个新增一次性处理)
for _, a := range added {
// 仅在此前为0时进行分配避免重复分配
if ws.baseTriad[ws.idx(a)] <= 0 {
// 找在场的最大源(不包含自身)
maxP := ""
maxW := -1.0
for p := range present {
if p == a {
continue
}
w := ws.baseTriad[ws.idx(p)]
if w > maxW {
maxW, maxP = w, p
}
}
if maxP != "" && maxW > 0 {
delta := 0.2
if maxW < delta {
delta = maxW
}
ws.baseTriad[ws.idx(maxP)] -= delta
ws.baseTriad[ws.idx(a)] += delta
if debugEnabled {
tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("added %s from %s %.2f triad %s",
a, maxP, delta, formatWeights(tri, true))
}
}
}
}
// 更新 lastPresent 为当前集合
ws.lastPresent = present
// 组装有效权重,按 matrix 顺序输出。此时 triad 在场分量应合计为1。
for i, pm := range matrix {
eff[i] = ws.baseTriad[ws.idx(pm.Provider)]
}
// 不在此处打印权重对比,由主循环根据“学习前/后”输出
// 更新上一次有效权重
for i, pm := range matrix {
ws.lastEff[pm.Provider] = eff[i]
}
return eff, false
}
// learn: 只在本集合中做 +0.1/-0.1,学习后在该集合内归一化,并写回对应的基础权重集
func (ws *weightState) learn(matrix []providerMatrix, actual float64) {
n := len(matrix)
if n <= 1 {
return
}
preds := extractProviderRainAt(matrix, 0)
if len(preds) != n {
return
}
// 找出当前在场集合中的最优与最差
best, worst := 0, 0
for i := range preds {
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
best = i
}
if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
worst = i
}
}
alpha := 0.1
pBest := matrix[best].Provider
pWorst := matrix[worst].Provider
// 仅在在场集合中调整第三方可能为0绝对不可信且不参与两源学习
decCap := ws.baseTriad[ws.idx(pWorst)]
delta := alpha
if decCap < delta {
delta = decCap
}
if delta <= 0 {
return
}
prev := append([]float64{}, ws.baseTriad...)
ws.baseTriad[ws.idx(pWorst)] -= delta
ws.baseTriad[ws.idx(pBest)] += delta
if debugEnabled {
before := []float64{prev[ws.idx(providerOpenMeteo)], prev[ws.idx(providerCaiyun)], prev[ws.idx(providerImdroid)]}
after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("rain=%.3f best=%s worst=%s weights %s -> %s",
actual, pBest, pWorst, formatWeights(before, true), formatWeights(after, true))
}
}
// learnFromPreds: 使用外部提供的预测值(与 matrix 顺序对齐)进行学习。
func (ws *weightState) learnFromPreds(matrix []providerMatrix, preds []float64, valids []bool, actual float64) {
n := len(matrix)
if n == 0 || len(preds) != n || len(valids) != n {
return
}
// 仅在有效的、且当前权重>0 的来源中选择最优/最差
idxs := make([]int, 0, n)
for i := 0; i < n; i++ {
if !valids[i] {
continue
}
idxs = append(idxs, i)
}
if len(idxs) < 2 {
return
}
best, worst := idxs[0], idxs[0]
for _, i := range idxs {
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
best = i
}
if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
worst = i
}
}
alpha := 0.1
pBest := matrix[best].Provider
pWorst := matrix[worst].Provider
decCap := ws.baseTriad[ws.idx(pWorst)]
delta := alpha
if decCap < delta {
delta = decCap
}
if delta <= 0 {
return
}
// 日志绝对误差详情OM/CY/IM和最优/最差
if debugEnabled {
name := func(p string) string { return p }
// 组装当前在场源的误差摘要
parts := make([]string, 0, len(matrix))
for i, pm := range matrix {
if !valids[i] || ws.baseTriad[ws.idx(pm.Provider)] <= 0 {
continue
}
parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", name(pm.Provider), preds[i], actual, math.Abs(preds[i]-actual)))
}
log.Printf("绝对误差:%s → best=%sworst=%s。", strings.Join(parts, ""), name(pBest), name(pWorst))
}
ws.baseTriad[ws.idx(pWorst)] -= delta
ws.baseTriad[ws.idx(pBest)] += delta
if debugEnabled {
after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
log.Printf("按规则best +0.1worst 0.1 → 权重在下一轮将为 %s", formatWeights(after, true))
}
}
// probeProviderAtBucket: 调试辅助打印该小时桶内每个provider的issued与是否有+1/+2/+3样本
func probeProviderAtBucket(db *sql.DB, stationID string, providers []string, bucket time.Time) {
for _, p := range providers {
issued, ok, err := resolveIssuedAtInBucket(db, stationID, p, bucket)
if err != nil {
log.Printf(" provider=%s 探测错误: %v", p, err)
continue
}
if !ok {
log.Printf(" provider=%s: 小时桶内无 issued", p)
continue
}
leads := presentLeads(db, stationID, p, issued)
log.Printf(" provider=%s: issued=%s leads=%v (count=%d)", p, issued.In(asiaShanghai).Format("2006-01-02 15:04:05"), leads, len(leads))
}
}
// presentLeads 返回该 issued 的预测中,存在的 lead 小时集合1..3
func presentLeads(db *sql.DB, stationID, provider string, issued time.Time) []int {
rows, err := db.Query(`SELECT forecast_time FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3`, stationID, provider, issued)
if err != nil {
return nil
}
defer rows.Close()
var leads []int
for rows.Next() {
var ft time.Time
if err := rows.Scan(&ft); err != nil {
continue
}
dh := ft.Sub(issued).Hours()
h := int(math.Ceil(dh - 1e-9))
if h >= 1 && h <= 3 {
leads = append(leads, h)
}
}
return leads
}
func isRainAllZero(samples []forecastSample) bool {
for _, s := range samples {
if math.Abs(s.Rain) > 1e-6 {
return false
}
}
return true
}
func extractProviderRainAt(matrix []providerMatrix, hourIdx int) []float64 {
out := make([]float64, 0, len(matrix))
for _, pm := range matrix {
if len(pm.Samples) > hourIdx {
out = append(out, pm.Samples[hourIdx].Rain)
} else {
out = append(out, 0)
}
}
return out
}
func adjustWeights(current []float64, preds []float64, actual float64) []float64 {
if len(current) != len(preds) || len(current) != 3 {
return current
}
diffs := make([]float64, len(preds))
for i := range preds {
diffs[i] = math.Abs(preds[i] - actual)
}
minIdx, maxIdx := 0, 0
for i, d := range diffs {
if d < diffs[minIdx] {
minIdx = i
}
if d > diffs[maxIdx] {
maxIdx = i
}
}
if minIdx != maxIdx {
current[minIdx] += 0.1
current[maxIdx] -= 0.1
}
var sum float64
for i := range current {
if current[i] < 0 {
current[i] = 0
}
sum += current[i]
}
if sum == 0 {
return append([]float64{}, defaultWeights...)
}
for i := range current {
current[i] /= sum
}
return current
}
func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) {
const q = `
SELECT SUM(rain_10m_mm_x1000)
FROM rs485_weather_10min
WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
var sum sql.NullInt64
err := db.QueryRow(q, stationID, start, end).Scan(&sum)
if err != nil {
return 0, false, err
}
if !sum.Valid {
return 0, false, nil
}
return float64(sum.Int64) / 1000.0, true, nil
}
type tileRec struct {
DT time.Time
Width int
Height int
West float64
South float64
East float64
North float64
ResDeg float64
Data []byte
}
type maskReport struct {
Scanned bool
WindOK bool
UsedAlias string
WindSpeed float64
WindDir float64
Tiles int
HitsByHour [3]int
}
func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) {
rep := maskReport{}
windTime := issued.Add(-time.Hour)
// 优先用 station_id其次别名
if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil {
return rep, err
} else if ok {
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id
} else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil {
return rep, err2
} else if ok2 {
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias
} else {
// 没有风场:不扫描,不应用掩码
return rep, nil
}
tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued)
if err != nil {
return rep, err
}
if len(tiles) == 0 {
// 无瓦片:不扫描,不应用掩码
return rep, nil
}
rep.Tiles = len(tiles)
rep.Scanned = true
const circleR = 8000.0
const halfAngle = 30.0
for _, t := range tiles {
vals, xs, ys, err := decodeTile(t)
if err != nil {
continue
}
for r := 0; r < len(vals); r++ {
row := vals[r]
lat := ys[r]
for c := 0; c < len(row); c++ {
v := row[c]
if v == nil {
continue
}
dbz := *v
if dbz < 40 {
continue
}
lon := xs[c]
dist := haversine(ctx.lat, ctx.lon, lat, lon)
// +1h圆 或 扇形(3h)
if dist <= circleR {
rep.HitsByHour[0]++
}
if rep.WindSpeed > 0 {
brg := bearingDeg(ctx.lat, ctx.lon, lat, lon)
if angDiff(brg, rep.WindDir) <= halfAngle {
if dist <= rep.WindSpeed*3*3600 {
rep.HitsByHour[0]++
}
// +2h圆 或 扇形(2h)
if dist <= circleR {
rep.HitsByHour[1]++
}
if dist <= rep.WindSpeed*2*3600 {
rep.HitsByHour[1]++
}
// +3h扇形(3h)
if dist <= rep.WindSpeed*3*3600 {
rep.HitsByHour[2]++
}
}
}
}
}
}
return rep, nil
}
func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) {
const q = `
SELECT wind_speed, wind_direction
FROM radar_weather
WHERE alias=$1 AND dt <= $2
ORDER BY dt DESC
LIMIT 1`
var speed, dir sql.NullFloat64
err := db.QueryRow(q, alias, target).Scan(&speed, &dir)
if err == sql.ErrNoRows {
return 0, 0, false, nil
}
if err != nil {
return 0, 0, false, err
}
if !speed.Valid || !dir.Valid {
return 0, 0, false, nil
}
return speed.Float64, dir.Float64, true, nil
}
func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) {
const q = `
SELECT dt, width, height, west, south, east, north, res_deg, data
FROM radar_tiles
WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5
ORDER BY dt`
rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
var list []tileRec
for rows.Next() {
var t tileRec
if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil {
return nil, err
}
list = append(list, t)
}
return list, nil
}
func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) {
w, h := t.Width, t.Height
if w <= 0 || h <= 0 {
return nil, nil, nil, fmt.Errorf("非法瓦片尺寸")
}
if len(t.Data) < w*h*2 {
return nil, nil, nil, fmt.Errorf("数据长度不足")
}
xs := make([]float64, w)
for c := 0; c < w; c++ {
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
}
ys := make([]float64, h)
for r := 0; r < h; r++ {
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
}
vals := make([][]*float64, h)
off := 0
for r := 0; r < h; r++ {
row := make([]*float64, w)
for c := 0; c < w; c++ {
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
off += 2
if v >= 32766 {
row[c] = nil
continue
}
dbz := float64(v) / 10.0
if dbz < 0 {
dbz = 0
} else if dbz > 75 {
dbz = 75
}
value := dbz
row[c] = &value
}
vals[r] = row
}
return vals, xs, ys, nil
}
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
const R = 6371000.0
dLat := toRad(lat2 - lat1)
dLon := toRad(lon2 - lon1)
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
return R * c
}
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
φ1 := toRad(lat1)
φ2 := toRad(lat2)
Δλ := toRad(lon2 - lon1)
y := math.Sin(Δλ) * math.Cos(φ2)
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
brg := toDeg(math.Atan2(y, x))
if brg < 0 {
brg += 360
}
return brg
}
func angDiff(a, b float64) float64 {
d := math.Mod(a-b+540, 360) - 180
if d < 0 {
d = -d
}
return math.Abs(d)
}
func toRad(d float64) float64 { return d * math.Pi / 180 }
func toDeg(r float64) float64 { return r * 180 / math.Pi }
func buildMaskContext(info stationInfo) (maskContext, bool) {
if !info.Lat.Valid || !info.Lon.Valid {
return maskContext{}, false
}
if !info.Z.Valid || !info.Y.Valid || !info.X.Valid {
return maskContext{}, false
}
alias := info.Alias
if strings.TrimSpace(alias) == "" {
alias = info.ID
}
return maskContext{
id: info.ID,
alias: alias,
lat: info.Lat.Float64,
lon: info.Lon.Float64,
z: int(info.Z.Int64),
y: int(info.Y.Int64),
x: int(info.X.Int64),
}, true
}
func buildSQL(records []fusedRecord) []string {
lines := make([]string, 0, len(records)*2)
header := "-- SQL 输出 (默认未执行)"
lines = append(lines, header)
for _, rec := range records {
sql := fmt.Sprintf(
"INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index)\nVALUES ('%s', '%s', '%s'::timestamptz, '%s'::timestamptz, %d, %d, %d, %d, %d, %d, %d, %d, %d)\nON CONFLICT (station_id, provider, issued_at, forecast_time)\nDO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000, temp_c_x100=EXCLUDED.temp_c_x100, humidity_pct=EXCLUDED.humidity_pct, wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, wind_dir_deg=EXCLUDED.wind_dir_deg, precip_prob_pct=EXCLUDED.precip_prob_pct, pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, uv_index=EXCLUDED.uv_index;",
rec.StationID,
outputProvider,
rec.IssuedAt.In(asiaShanghai).Format(time.RFC3339),
rec.ForecastTime.In(asiaShanghai).Format(time.RFC3339),
int(math.Round(rec.Sample.Rain*1000)),
int(math.Round(rec.Sample.Temp*100)),
int(math.Round(rec.Sample.Humidity)),
int(math.Round(rec.Sample.WindSpeed*1000)),
int(math.Round(rec.Sample.WindGust*1000)),
int(math.Round(rec.Sample.WindDir)),
int(math.Round(rec.Sample.Prob)),
int(math.Round(rec.Sample.Pressure*100)),
int(math.Round(rec.Sample.UV)),
)
lines = append(lines, sql, "")
}
return lines
}
func applySQL(db *sql.DB, records []fusedRecord) error {
tx, err := db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`
INSERT INTO forecast_hourly (
station_id, provider, issued_at, forecast_time,
rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000,
wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (station_id, provider, issued_at, forecast_time)
DO UPDATE SET
rain_mm_x1000 = EXCLUDED.rain_mm_x1000,
temp_c_x100 = EXCLUDED.temp_c_x100,
humidity_pct = EXCLUDED.humidity_pct,
wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
wind_dir_deg = EXCLUDED.wind_dir_deg,
precip_prob_pct = EXCLUDED.precip_prob_pct,
pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100,
uv_index = EXCLUDED.uv_index`)
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()
for _, rec := range records {
if _, err := stmt.Exec(
rec.StationID,
outputProvider,
rec.IssuedAt,
rec.ForecastTime,
int(math.Round(rec.Sample.Rain*1000)),
int(math.Round(rec.Sample.Temp*100)),
int(math.Round(rec.Sample.Humidity)),
int(math.Round(rec.Sample.WindSpeed*1000)),
int(math.Round(rec.Sample.WindGust*1000)),
int(math.Round(rec.Sample.WindDir)),
int(math.Round(rec.Sample.Prob)),
int(math.Round(rec.Sample.Pressure*100)),
int(math.Round(rec.Sample.UV)),
); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}