diff --git a/cmd/imdroidmix/main.go b/cmd/imdroidmix/main.go
new file mode 100644
index 0000000..6c3aa16
--- /dev/null
+++ b/cmd/imdroidmix/main.go
@@ -0,0 +1,1147 @@
+package main
+
+import (
+ "database/sql"
+ "encoding/binary"
+ "errors"
+ "flag"
+ "fmt"
+ "log"
+ "math"
+ "os"
+ "strings"
+ "time"
+
+ "weatherstation/internal/database"
+)
+
+const (
+ providerOpenMeteo = "open-meteo"
+ providerCaiyun = "caiyun"
+ providerImdroid = "imdroid"
+ outputProvider = "imdroid_mix"
+)
+
+var (
+ defaultWeights = []float64{0.4, 0.3, 0.3}
+ asiaShanghai *time.Location
+)
+
+func init() {
+ loc, err := time.LoadLocation("Asia/Shanghai")
+ if err != nil {
+ loc = time.FixedZone("CST", 8*3600)
+ }
+ asiaShanghai = loc
+ log.SetFlags(log.LstdFlags | log.Lmsgprefix)
+ log.SetPrefix("[imdroidmix] ")
+}
+
+type stationInfo struct {
+ ID string
+ Alias string
+ Lat sql.NullFloat64
+ Lon sql.NullFloat64
+ Z sql.NullInt64
+ Y sql.NullInt64
+ X sql.NullInt64
+}
+
+type forecastSample struct {
+ Rain float64
+ Temp float64
+ Humidity float64
+ WindSpeed float64
+ WindGust float64
+ WindDir float64
+ Prob float64
+ Pressure float64
+ UV float64
+}
+
+type providerMatrix struct {
+ Provider string
+ Samples []forecastSample
+}
+
+type fusedRecord struct {
+ StationID string
+ IssuedAt time.Time
+ ForecastTime time.Time
+ Sample forecastSample
+ MaskApplied bool
+}
+
+type maskContext struct {
+ id string
+ alias string
+ lat float64
+ lon float64
+ z int
+ y int
+ x int
+}
+
+func main() {
+ var (
+ issuedRange string
+ stationFlag string
+ outSQLPath string
+ showProgress bool
+ applyUpserts bool
+ includeAll bool
+ debugLog bool
+ noMask bool
+ minSources int
+ maskHoursStr string
+ )
+
+ flag.StringVar(&issuedRange, "issued_range", "", "时间范围,格式 \"YYYY-MM-DD HH:MM:SS,YYYY-MM-DD HH:MM:SS\"(CST)")
+ flag.StringVar(&stationFlag, "station", "", "目标站点(可选,多个以逗号分隔)")
+ flag.BoolVar(&includeAll, "all_stations", false, "处理全部有经纬度的站点")
+ flag.StringVar(&outSQLPath, "export_sql", "imdroid_mix.sql", "输出SQL文件路径")
+ flag.BoolVar(&showProgress, "progress", false, "展示处理进度")
+ flag.BoolVar(&applyUpserts, "apply", false, "直接写入数据库(默认仅导出SQL)")
+ flag.BoolVar(&debugLog, "debug", false, "输出调试日志")
+ flag.BoolVar(&noMask, "no_mask", false, "关闭雷达掩码(不清零融合雨量)")
+ flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过")
+ flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0")
+ flag.Parse()
+
+ if strings.TrimSpace(issuedRange) == "" {
+ log.Fatalln("必须提供 --issued_range")
+ }
+
+ startIssued, endIssued, err := parseRange(issuedRange)
+ if err != nil {
+ log.Fatalf("解析 issued_range 失败: %v", err)
+ }
+ if !endIssued.After(startIssued) {
+ log.Fatalln("issued_range 结束时间必须大于起始时间")
+ }
+
+ db := database.GetDB()
+ defer database.Close()
+
+ stations, err := resolveStations(db, stationFlag, includeAll)
+ if err != nil {
+ log.Fatalf("加载站点失败: %v", err)
+ }
+ if len(stations) == 0 {
+ log.Fatalln("没有符合条件的站点")
+ }
+
+ type stationWork struct {
+ info stationInfo
+ issuedTimes []time.Time
+ providers []string
+ }
+ var jobs []stationWork
+ var totalSteps int
+
+ for _, st := range stations {
+ pSet, err := resolveProviderSet(db, st.ID, startIssued, endIssued)
+ if err != nil {
+ log.Fatalf("检测站点 %s 预报源失败: %v", st.ID, err)
+ }
+ if debugLog {
+ log.Printf("站点 %s 使用预报源: %v", st.ID, pSet)
+ }
+ issuedTimes, err := fetchIssuedHourBuckets(db, st.ID, startIssued, endIssued, pSet, minSources)
+ if err != nil {
+ log.Fatalf("查询站点 %s 的 issued_at 失败: %v", st.ID, err)
+ }
+ if len(issuedTimes) == 0 {
+ if debugLog {
+ // 打印每个provider的 issued_at 数量辅助定位
+ for _, p := range pSet {
+ cnt, first, last := debugIssuedSummary(db, st.ID, startIssued, endIssued, p)
+ log.Printf("站点 %s provider=%s issued_at数量=%d 时间范围[%s, %s]", st.ID, p, cnt, first, last)
+ }
+ }
+ continue
+ }
+ jobs = append(jobs, stationWork{info: st, issuedTimes: issuedTimes, providers: pSet})
+ totalSteps += len(issuedTimes)
+ }
+
+ if len(jobs) == 0 {
+ log.Fatalln("指定时间范围内无可融合的预报数据")
+ }
+
+ var (
+ results []fusedRecord
+ step int
+ progressFn func(current int, total int, ctx string)
+ )
+
+ if showProgress {
+ progressFn = func(current, total int, ctx string) {
+ fmt.Printf("\r处理进度 %d/%d (%s)...", current, total, ctx)
+ }
+ defer fmt.Println()
+ }
+
+ // 解析 mask_hours
+ maskHours := [3]bool{true, true, true}
+ if s := strings.TrimSpace(maskHoursStr); s != "" {
+ parts := strings.Split(s, ",")
+ for i := 0; i < len(parts) && i < 3; i++ {
+ v := strings.TrimSpace(parts[i])
+ maskHours[i] = (v == "1" || strings.EqualFold(v, "true"))
+ }
+ }
+
+ for _, job := range jobs {
+ info := job.info
+ maskCtx, hasMaskCtx := buildMaskContext(info)
+ weights := append([]float64{}, defaultWeights...)
+
+ for idx, issued := range job.issuedTimes {
+ matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers)
+ if err != nil {
+ log.Printf("站点 %s issued=%s 载入预报失败: %v", info.ID, issued.Format(time.RFC3339), err)
+ continue
+ }
+ if len(matrix) < minSources {
+ log.Printf("站点 %s issued=%s 可用源=%d < min_sources=%d,跳过", info.ID, issued.Format(time.RFC3339), len(matrix), minSources)
+ if debugLog {
+ probeProviderAtBucket(db, info.ID, job.providers, issued)
+ }
+ continue
+ }
+
+ if debugLog {
+ log.Printf("权重(使用前): station=%s issued=%s %s=%.2f %s=%.2f %s=%.2f",
+ info.ID, issued.Format("2006-01-02 15:04:05"),
+ job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2])
+ }
+
+ fused := fuseForecastsWith(matrix, effectiveWeights(weights, job.providers, matrix))
+ rainAllZero := isRainAllZero(fused)
+
+ maskApplied := false
+ if !rainAllZero && hasMaskCtx && !noMask {
+ rep, err := checkRadarMask(db, maskCtx, issued)
+ if err != nil {
+ log.Printf("站点 %s issued=%s 掩码计算出错: %v", info.ID, issued.Format(time.RFC3339), err)
+ } else {
+ if rep.Scanned {
+ // 逐小时按 mask_hours 应用
+ // 解析 mask_hours 上移至批次开始
+ // 在此应用逐小时清零
+ // 重用 maskHours 变量
+ for hidx := 0; hidx < 3; hidx++ {
+ if !maskHours[hidx] {
+ continue
+ }
+ if rep.HitsByHour[hidx] == 0 {
+ fused[hidx].Rain = 0
+ maskApplied = true
+ }
+ }
+ }
+ if debugLog {
+ log.Printf("掩码报告: station=%s issued=%s scanned=%v windOK=%v used=%s wind=%.2fm/s %.0f° tiles=%d hits=[%d,%d,%d] mask_hours=%v", info.ID, issued.Format("2006-01-02 15:04:05"), rep.Scanned, rep.WindOK, rep.UsedAlias, rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskHours)
+ }
+ }
+ }
+
+ for h := 1; h <= 3; h++ {
+ results = append(results, fusedRecord{
+ StationID: info.ID,
+ IssuedAt: issued, // 输出按“小时桶”归一的发布时间
+ ForecastTime: issued.Add(time.Duration(h) * time.Hour),
+ Sample: fused[h-1],
+ MaskApplied: maskApplied,
+ })
+ }
+
+ actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, issued, issued.Add(time.Hour))
+ if err != nil {
+ log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err)
+ } else if actualOK {
+ before := append([]float64{}, weights...)
+ weights = adjustWeightsSubset(weights, job.providers, matrix, actualRain)
+ if debugLog {
+ log.Printf("权重调整: station=%s issued=%s actual=%.3f | 之前[%s=%.2f,%s=%.2f,%s=%.2f] 之后[%s=%.2f,%s=%.2f,%s=%.2f] 参与源=%d",
+ info.ID, issued.Format("2006-01-02 15:04:05"), actualRain,
+ job.providers[0], before[0], job.providers[1], before[1], job.providers[2], before[2],
+ job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2],
+ len(matrix))
+ }
+ }
+
+ step++
+ if progressFn != nil {
+ progressFn(step, totalSteps, fmt.Sprintf("%s #%d", info.ID, idx+1))
+ }
+ }
+ }
+
+ if len(results) == 0 {
+ log.Println("未生成任何融合结果")
+ return
+ }
+
+ sqlLines := buildSQL(results)
+ if err := os.WriteFile(outSQLPath, []byte(strings.Join(sqlLines, "\n")+"\n"), 0o644); err != nil {
+ log.Fatalf("写入SQL文件失败: %v", err)
+ }
+ log.Printf("已写出SQL到 %s(共 %d 条记录)", outSQLPath, len(results))
+
+ if applyUpserts {
+ if err := applySQL(db, results); err != nil {
+ log.Fatalf("写入数据库失败: %v", err)
+ }
+ log.Printf("已写入数据库 forecast_hourly(provider=%s)", outputProvider)
+ }
+}
+
+func parseRange(r string) (time.Time, time.Time, error) {
+ parts := strings.Split(r, ",")
+ if len(parts) != 2 {
+ return time.Time{}, time.Time{}, errors.New("格式应为 start,end")
+ }
+ start, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[0]), asiaShanghai)
+ if err != nil {
+ return time.Time{}, time.Time{}, err
+ }
+ end, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[1]), asiaShanghai)
+ if err != nil {
+ return time.Time{}, time.Time{}, err
+ }
+ return start, end, nil
+}
+
+func resolveStations(db *sql.DB, stationFlag string, includeAll bool) ([]stationInfo, error) {
+ if includeAll {
+ const q = `
+ SELECT station_id,
+ COALESCE(NULLIF(station_alias,''), station_id) AS alias,
+ latitude, longitude, z, y, x
+ FROM stations
+ WHERE latitude IS NOT NULL AND longitude IS NOT NULL
+ AND device_type='WH65LP'
+ ORDER BY station_id`
+ rows, err := db.Query(q)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var out []stationInfo
+ for rows.Next() {
+ var s stationInfo
+ if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil {
+ return nil, err
+ }
+ out = append(out, s)
+ }
+ return out, nil
+ }
+
+ list := fieldsFromFlag(stationFlag)
+ if len(list) == 0 {
+ return nil, nil
+ }
+ var out []stationInfo
+ const q = `
+ SELECT station_id,
+ COALESCE(NULLIF(station_alias,''), station_id) AS alias,
+ latitude, longitude, z, y, x
+ FROM stations
+ WHERE station_id=$1`
+ for _, id := range list {
+ var s stationInfo
+ err := db.QueryRow(q, id).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ log.Printf("警告:站点 %s 不存在,跳过", id)
+ continue
+ }
+ return nil, err
+ }
+ out = append(out, s)
+ }
+ return out, nil
+}
+
+func fieldsFromFlag(v string) []string {
+ if strings.TrimSpace(v) == "" {
+ return nil
+ }
+ parts := strings.Split(v, ",")
+ out := make([]string, 0, len(parts))
+ for _, p := range parts {
+ p = strings.TrimSpace(p)
+ if p != "" {
+ out = append(out, p)
+ }
+ }
+ return out
+}
+
+func resolveProviderSet(_ *sql.DB, _ string, _ time.Time, _ time.Time) ([]string, error) {
+ // 固定三源,不做别名回退
+ return []string{providerOpenMeteo, providerCaiyun, providerImdroid}, nil
+}
+
+func hasProviderData(db *sql.DB, stationID, provider string, from, to time.Time) (bool, error) {
+ var cnt int
+ err := db.QueryRow(`
+ SELECT COUNT(*)
+ FROM forecast_hourly
+ WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`,
+ stationID, provider, from, to).Scan(&cnt)
+ if err != nil {
+ return false, err
+ }
+ return cnt > 0, nil
+}
+
+func fetchIssuedHourBuckets(db *sql.DB, stationID string, from, to time.Time, providers []string, minSources int) ([]time.Time, error) {
+ if len(providers) != 3 {
+ return nil, fmt.Errorf("provider 列表长度异常: %v", providers)
+ }
+ if minSources < 1 {
+ minSources = 1
+ }
+ if minSources > 3 {
+ minSources = 3
+ }
+ const q = `
+ WITH base AS (
+ SELECT date_trunc('hour', issued_at) AS bucket, provider
+ FROM forecast_hourly
+ WHERE station_id=$1 AND provider IN ($2, $3, $4)
+ AND issued_at BETWEEN $5 AND $6
+ GROUP BY 1,2
+ )
+ SELECT bucket
+ FROM base
+ GROUP BY bucket
+ HAVING COUNT(DISTINCT provider) >= $7
+ ORDER BY bucket`
+ rows, err := db.Query(q, stationID, providers[0], providers[1], providers[2], from, to, minSources)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var buckets []time.Time
+ for rows.Next() {
+ var t time.Time
+ if err := rows.Scan(&t); err != nil {
+ return nil, err
+ }
+ buckets = append(buckets, t.In(asiaShanghai))
+ }
+ return buckets, nil
+}
+
+// debugIssuedSummary 仅用于 --debug:统计provider在范围内的issued_at计数及首尾时间
+func debugIssuedSummary(db *sql.DB, stationID string, from, to time.Time, provider string) (count int, first string, last string) {
+ const q = `
+ SELECT COUNT(*),
+ MIN(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS first,
+ MAX(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS last
+ FROM forecast_hourly
+ WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`
+ var f, l sql.NullString
+ if err := db.QueryRow(q, stationID, provider, from, to).Scan(&count, &f, &l); err != nil {
+ return 0, "", ""
+ }
+ if f.Valid {
+ first = f.String
+ }
+ if l.Valid {
+ last = l.String
+ }
+ return
+}
+
+func loadProviderMatrix(db *sql.DB, stationID string, bucketHour time.Time, providers []string) ([]providerMatrix, error) {
+ out := make([]providerMatrix, 0, len(providers))
+ for _, provider := range providers {
+ issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour)
+ if err != nil {
+ return nil, err
+ }
+ if !ok {
+ // 该源缺席,跳过
+ continue
+ }
+ samples, err := loadForecastSamples(db, stationID, provider, issued)
+ if err != nil {
+ return nil, err
+ }
+ if len(samples) != 3 {
+ // 样本不足,视为该源缺席
+ continue
+ }
+ out = append(out, providerMatrix{Provider: provider, Samples: samples})
+ }
+ return out, nil
+}
+
+// 在某个小时桶内,选取该 provider 的最新 issued_at([bucket, bucket+1h))
+func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) {
+ const q = `
+ SELECT issued_at
+ FROM forecast_hourly
+ WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour'
+ ORDER BY issued_at DESC
+ LIMIT 1`
+ var t time.Time
+ err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t)
+ if err == sql.ErrNoRows {
+ return time.Time{}, false, nil
+ }
+ if err != nil {
+ return time.Time{}, false, err
+ }
+ return t, true, nil
+}
+
+func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) {
+ const q = `
+ SELECT forecast_time,
+ rain_mm_x1000,
+ temp_c_x100,
+ humidity_pct,
+ wind_speed_ms_x1000,
+ wind_gust_ms_x1000,
+ wind_dir_deg,
+ precip_prob_pct,
+ pressure_hpa_x100,
+ uv_index
+ FROM forecast_hourly
+ WHERE station_id=$1 AND provider=$2 AND issued_at=$3
+ ORDER BY forecast_time ASC`
+ rows, err := db.Query(q, stationID, provider, issued)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var list []forecastSample
+ for rows.Next() {
+ var (
+ ft time.Time
+ rain, temp, humidity, ws, gust, wdir, prob sql.NullInt64
+ pressure, uv sql.NullInt64
+ )
+ if err := rows.Scan(&ft, &rain, &temp, &humidity, &ws, &gust, &wdir, &prob, &pressure, &uv); err != nil {
+ return nil, err
+ }
+ // 使用“向上取整”的方式计算前视小时(与查询端 lead_hours=CEIL(...) 逻辑对齐)
+ dh := ft.Sub(issued).Hours()
+ hours := int(math.Ceil(dh - 1e-9))
+ if hours < 1 || hours > 3 {
+ continue
+ }
+ s := forecastSample{
+ Rain: float64(rain.Int64) / 1000.0,
+ Temp: float64(temp.Int64) / 100.0,
+ Humidity: float64(humidity.Int64),
+ WindSpeed: float64(ws.Int64) / 1000.0,
+ WindGust: float64(gust.Int64) / 1000.0,
+ WindDir: float64(wdir.Int64),
+ Prob: float64(prob.Int64),
+ Pressure: float64(pressure.Int64) / 100.0,
+ UV: float64(uv.Int64),
+ }
+ list = append(list, s)
+ }
+ return list, nil
+}
+
+func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample {
+ result := make([]forecastSample, 3)
+ for h := 0; h < 3; h++ {
+ var windX, windY float64
+ for pIdx, pm := range matrix {
+ if len(pm.Samples) <= h {
+ continue
+ }
+ s := pm.Samples[h]
+ w := weights[pIdx]
+ result[h].Rain += s.Rain * w
+ result[h].Temp += s.Temp * w
+ result[h].Humidity += s.Humidity * w
+ result[h].WindSpeed += s.WindSpeed * w
+ result[h].WindGust += s.WindGust * w
+ result[h].Prob += s.Prob * w
+ result[h].Pressure += s.Pressure * w
+ result[h].UV += s.UV * w
+ rad := s.WindDir * math.Pi / 180
+ windX += math.Cos(rad) * w
+ windY += math.Sin(rad) * w
+ }
+ if windX == 0 && windY == 0 {
+ result[h].WindDir = 0
+ } else {
+ dir := math.Atan2(windY, windX) * 180 / math.Pi
+ if dir < 0 {
+ dir += 360
+ }
+ result[h].WindDir = dir
+ }
+ result[h].Humidity = math.Round(result[h].Humidity)
+ result[h].Prob = math.Round(result[h].Prob)
+ result[h].UV = math.Round(result[h].UV)
+ }
+ return result
+}
+
+// fuseForecastsWith 使用与 matrix 对齐的有效权重进行融合
+func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample {
+ if len(eff) != len(matrix) {
+ return fuseForecasts(matrix, uniform(len(matrix)))
+ }
+ result := make([]forecastSample, 3)
+ for h := 0; h < 3; h++ {
+ var windX, windY float64
+ for pIdx, pm := range matrix {
+ if len(pm.Samples) <= h {
+ continue
+ }
+ s := pm.Samples[h]
+ w := eff[pIdx]
+ result[h].Rain += s.Rain * w
+ result[h].Temp += s.Temp * w
+ result[h].Humidity += s.Humidity * w
+ result[h].WindSpeed += s.WindSpeed * w
+ result[h].WindGust += s.WindGust * w
+ result[h].Prob += s.Prob * w
+ result[h].Pressure += s.Pressure * w
+ result[h].UV += s.UV * w
+ rad := s.WindDir * math.Pi / 180
+ windX += math.Cos(rad) * w
+ windY += math.Sin(rad) * w
+ }
+ if windX == 0 && windY == 0 {
+ result[h].WindDir = 0
+ } else {
+ dir := math.Atan2(windY, windX) * 180 / math.Pi
+ if dir < 0 {
+ dir += 360
+ }
+ result[h].WindDir = dir
+ }
+ result[h].Humidity = math.Round(result[h].Humidity)
+ result[h].Prob = math.Round(result[h].Prob)
+ result[h].UV = math.Round(result[h].UV)
+ }
+ return result
+}
+
+func uniform(n int) []float64 {
+ if n <= 0 {
+ return nil
+ }
+ w := make([]float64, n)
+ v := 1.0 / float64(n)
+ for i := range w {
+ w[i] = v
+ }
+ return w
+}
+
+// effectiveWeights 将基础权重映射到参与融合的 providers(按 matrix 顺序),并在子集上归一化
+func effectiveWeights(base []float64, providersAll []string, matrix []providerMatrix) []float64 {
+ m := make(map[string]int)
+ for i, p := range providersAll {
+ m[p] = i
+ }
+ eff := make([]float64, len(matrix))
+ sum := 0.0
+ for i, pm := range matrix {
+ if idx, ok := m[pm.Provider]; ok {
+ eff[i] = base[idx]
+ sum += eff[i]
+ }
+ }
+ if sum <= 0 {
+ return uniform(len(matrix))
+ }
+ for i := range eff {
+ eff[i] /= sum
+ }
+ return eff
+}
+
+// adjustWeightsSubset 仅对当前参与的 providers 调整权重,其它分量保持不变
+func adjustWeightsSubset(base []float64, providersAll []string, matrix []providerMatrix, actual float64) []float64 {
+ preds := extractProviderRainAt(matrix, 0)
+ if len(preds) == 0 {
+ return base
+ }
+ // 找最优和最差(绝对误差)
+ best, worst := 0, 0
+ for i := range preds {
+ if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
+ best = i
+ }
+ if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
+ worst = i
+ }
+ }
+ // provider 名到 base 索引
+ m := make(map[string]int)
+ for i, p := range providersAll {
+ m[p] = i
+ }
+ out := append([]float64{}, base...)
+ // 应用 ±0.1 调整
+ if bi, ok := m[matrix[best].Provider]; ok {
+ out[bi] += 0.1
+ }
+ if wi, ok := m[matrix[worst].Provider]; ok {
+ out[wi] -= 0.1
+ }
+ // 夹紧并归一化
+ sum := 0.0
+ for i := range out {
+ if out[i] < 0 {
+ out[i] = 0
+ }
+ sum += out[i]
+ }
+ if sum == 0 {
+ // 回到默认
+ return append([]float64{}, defaultWeights...)
+ }
+ for i := range out {
+ out[i] /= sum
+ }
+ return out
+}
+
+// probeProviderAtBucket: 调试辅助,打印该小时桶内每个provider的issued与是否有+1/+2/+3样本
+func probeProviderAtBucket(db *sql.DB, stationID string, providers []string, bucket time.Time) {
+ for _, p := range providers {
+ issued, ok, err := resolveIssuedAtInBucket(db, stationID, p, bucket)
+ if err != nil {
+ log.Printf(" provider=%s 探测错误: %v", p, err)
+ continue
+ }
+ if !ok {
+ log.Printf(" provider=%s: 小时桶内无 issued", p)
+ continue
+ }
+ leads := presentLeads(db, stationID, p, issued)
+ log.Printf(" provider=%s: issued=%s leads=%v (count=%d)", p, issued.In(asiaShanghai).Format("2006-01-02 15:04:05"), leads, len(leads))
+ }
+}
+
+// presentLeads 返回该 issued 的预测中,存在的 lead 小时集合(1..3)
+func presentLeads(db *sql.DB, stationID, provider string, issued time.Time) []int {
+ rows, err := db.Query(`SELECT forecast_time FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3`, stationID, provider, issued)
+ if err != nil {
+ return nil
+ }
+ defer rows.Close()
+ var leads []int
+ for rows.Next() {
+ var ft time.Time
+ if err := rows.Scan(&ft); err != nil {
+ continue
+ }
+ dh := ft.Sub(issued).Hours()
+ h := int(math.Ceil(dh - 1e-9))
+ if h >= 1 && h <= 3 {
+ leads = append(leads, h)
+ }
+ }
+ return leads
+}
+
+func isRainAllZero(samples []forecastSample) bool {
+ for _, s := range samples {
+ if math.Abs(s.Rain) > 1e-6 {
+ return false
+ }
+ }
+ return true
+}
+
+func extractProviderRainAt(matrix []providerMatrix, hourIdx int) []float64 {
+ out := make([]float64, 0, len(matrix))
+ for _, pm := range matrix {
+ if len(pm.Samples) > hourIdx {
+ out = append(out, pm.Samples[hourIdx].Rain)
+ } else {
+ out = append(out, 0)
+ }
+ }
+ return out
+}
+
+func adjustWeights(current []float64, preds []float64, actual float64) []float64 {
+ if len(current) != len(preds) || len(current) != 3 {
+ return current
+ }
+ diffs := make([]float64, len(preds))
+ for i := range preds {
+ diffs[i] = math.Abs(preds[i] - actual)
+ }
+ minIdx, maxIdx := 0, 0
+ for i, d := range diffs {
+ if d < diffs[minIdx] {
+ minIdx = i
+ }
+ if d > diffs[maxIdx] {
+ maxIdx = i
+ }
+ }
+ if minIdx != maxIdx {
+ current[minIdx] += 0.1
+ current[maxIdx] -= 0.1
+ }
+ var sum float64
+ for i := range current {
+ if current[i] < 0 {
+ current[i] = 0
+ }
+ sum += current[i]
+ }
+ if sum == 0 {
+ return append([]float64{}, defaultWeights...)
+ }
+ for i := range current {
+ current[i] /= sum
+ }
+ return current
+}
+
+func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) {
+ const q = `
+ SELECT SUM(rain_10m_mm_x1000)
+ FROM rs485_weather_10min
+ WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
+ var sum sql.NullInt64
+ err := db.QueryRow(q, stationID, start, end).Scan(&sum)
+ if err != nil {
+ return 0, false, err
+ }
+ if !sum.Valid {
+ return 0, false, nil
+ }
+ return float64(sum.Int64) / 1000.0, true, nil
+}
+
+type tileRec struct {
+ DT time.Time
+ Width int
+ Height int
+ West float64
+ South float64
+ East float64
+ North float64
+ ResDeg float64
+ Data []byte
+}
+
+type maskReport struct {
+ Scanned bool
+ WindOK bool
+ UsedAlias string
+ WindSpeed float64
+ WindDir float64
+ Tiles int
+ HitsByHour [3]int
+}
+
+func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) {
+ rep := maskReport{}
+ windTime := issued.Add(-time.Hour)
+ // 优先用 station_id,其次别名
+ if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil {
+ return rep, err
+ } else if ok {
+ rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id
+ } else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil {
+ return rep, err2
+ } else if ok2 {
+ rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias
+ } else {
+ // 没有风场:不扫描,不应用掩码
+ return rep, nil
+ }
+
+ tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued)
+ if err != nil {
+ return rep, err
+ }
+ if len(tiles) == 0 {
+ // 无瓦片:不扫描,不应用掩码
+ return rep, nil
+ }
+ rep.Tiles = len(tiles)
+
+ rep.Scanned = true
+ const circleR = 8000.0
+ const halfAngle = 30.0
+
+ for _, t := range tiles {
+ vals, xs, ys, err := decodeTile(t)
+ if err != nil {
+ continue
+ }
+ for r := 0; r < len(vals); r++ {
+ row := vals[r]
+ lat := ys[r]
+ for c := 0; c < len(row); c++ {
+ v := row[c]
+ if v == nil {
+ continue
+ }
+ dbz := *v
+ if dbz < 40 {
+ continue
+ }
+ lon := xs[c]
+ dist := haversine(ctx.lat, ctx.lon, lat, lon)
+ // +1h:圆 或 扇形(3h)
+ if dist <= circleR {
+ rep.HitsByHour[0]++
+ }
+ if rep.WindSpeed > 0 {
+ brg := bearingDeg(ctx.lat, ctx.lon, lat, lon)
+ if angDiff(brg, rep.WindDir) <= halfAngle {
+ if dist <= rep.WindSpeed*3*3600 {
+ rep.HitsByHour[0]++
+ }
+ // +2h:圆 或 扇形(2h)
+ if dist <= circleR {
+ rep.HitsByHour[1]++
+ }
+ if dist <= rep.WindSpeed*2*3600 {
+ rep.HitsByHour[1]++
+ }
+ // +3h:扇形(3h)
+ if dist <= rep.WindSpeed*3*3600 {
+ rep.HitsByHour[2]++
+ }
+ }
+ }
+ }
+ }
+ }
+ return rep, nil
+}
+
+func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) {
+ const q = `
+ SELECT wind_speed, wind_direction
+ FROM radar_weather
+ WHERE alias=$1 AND dt <= $2
+ ORDER BY dt DESC
+ LIMIT 1`
+ var speed, dir sql.NullFloat64
+ err := db.QueryRow(q, alias, target).Scan(&speed, &dir)
+ if err == sql.ErrNoRows {
+ return 0, 0, false, nil
+ }
+ if err != nil {
+ return 0, 0, false, err
+ }
+ if !speed.Valid || !dir.Valid {
+ return 0, 0, false, nil
+ }
+ return speed.Float64, dir.Float64, true, nil
+}
+
+func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) {
+ const q = `
+ SELECT dt, width, height, west, south, east, north, res_deg, data
+ FROM radar_tiles
+ WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5
+ ORDER BY dt`
+ rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var list []tileRec
+ for rows.Next() {
+ var t tileRec
+ if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil {
+ return nil, err
+ }
+ list = append(list, t)
+ }
+ return list, nil
+}
+
+func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) {
+ w, h := t.Width, t.Height
+ if w <= 0 || h <= 0 {
+ return nil, nil, nil, fmt.Errorf("非法瓦片尺寸")
+ }
+ if len(t.Data) < w*h*2 {
+ return nil, nil, nil, fmt.Errorf("数据长度不足")
+ }
+ xs := make([]float64, w)
+ for c := 0; c < w; c++ {
+ xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
+ }
+ ys := make([]float64, h)
+ for r := 0; r < h; r++ {
+ ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
+ }
+ vals := make([][]*float64, h)
+ off := 0
+ for r := 0; r < h; r++ {
+ row := make([]*float64, w)
+ for c := 0; c < w; c++ {
+ v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
+ off += 2
+ if v >= 32766 {
+ row[c] = nil
+ continue
+ }
+ dbz := float64(v) / 10.0
+ if dbz < 0 {
+ dbz = 0
+ } else if dbz > 75 {
+ dbz = 75
+ }
+ value := dbz
+ row[c] = &value
+ }
+ vals[r] = row
+ }
+ return vals, xs, ys, nil
+}
+
+func haversine(lat1, lon1, lat2, lon2 float64) float64 {
+ const R = 6371000.0
+ dLat := toRad(lat2 - lat1)
+ dLon := toRad(lon2 - lon1)
+ a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
+ c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
+ return R * c
+}
+
+func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
+ φ1 := toRad(lat1)
+ φ2 := toRad(lat2)
+ Δλ := toRad(lon2 - lon1)
+ y := math.Sin(Δλ) * math.Cos(φ2)
+ x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
+ brg := toDeg(math.Atan2(y, x))
+ if brg < 0 {
+ brg += 360
+ }
+ return brg
+}
+
+func angDiff(a, b float64) float64 {
+ d := math.Mod(a-b+540, 360) - 180
+ if d < 0 {
+ d = -d
+ }
+ return math.Abs(d)
+}
+
+func toRad(d float64) float64 { return d * math.Pi / 180 }
+func toDeg(r float64) float64 { return r * 180 / math.Pi }
+
+func buildMaskContext(info stationInfo) (maskContext, bool) {
+ if !info.Lat.Valid || !info.Lon.Valid {
+ return maskContext{}, false
+ }
+ if !info.Z.Valid || !info.Y.Valid || !info.X.Valid {
+ return maskContext{}, false
+ }
+ alias := info.Alias
+ if strings.TrimSpace(alias) == "" {
+ alias = info.ID
+ }
+ return maskContext{
+ id: info.ID,
+ alias: alias,
+ lat: info.Lat.Float64,
+ lon: info.Lon.Float64,
+ z: int(info.Z.Int64),
+ y: int(info.Y.Int64),
+ x: int(info.X.Int64),
+ }, true
+}
+
+func buildSQL(records []fusedRecord) []string {
+ lines := make([]string, 0, len(records)*2)
+ header := "-- SQL 输出 (默认未执行)"
+ lines = append(lines, header)
+ for _, rec := range records {
+ sql := fmt.Sprintf(
+ "INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index)\nVALUES ('%s', '%s', '%s'::timestamptz, '%s'::timestamptz, %d, %d, %d, %d, %d, %d, %d, %d, %d)\nON CONFLICT (station_id, provider, issued_at, forecast_time)\nDO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000, temp_c_x100=EXCLUDED.temp_c_x100, humidity_pct=EXCLUDED.humidity_pct, wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, wind_dir_deg=EXCLUDED.wind_dir_deg, precip_prob_pct=EXCLUDED.precip_prob_pct, pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, uv_index=EXCLUDED.uv_index;",
+ rec.StationID,
+ outputProvider,
+ rec.IssuedAt.In(asiaShanghai).Format(time.RFC3339),
+ rec.ForecastTime.In(asiaShanghai).Format(time.RFC3339),
+ int(math.Round(rec.Sample.Rain*1000)),
+ int(math.Round(rec.Sample.Temp*100)),
+ int(math.Round(rec.Sample.Humidity)),
+ int(math.Round(rec.Sample.WindSpeed*1000)),
+ int(math.Round(rec.Sample.WindGust*1000)),
+ int(math.Round(rec.Sample.WindDir)),
+ int(math.Round(rec.Sample.Prob)),
+ int(math.Round(rec.Sample.Pressure*100)),
+ int(math.Round(rec.Sample.UV)),
+ )
+ lines = append(lines, sql, "")
+ }
+ return lines
+}
+
+func applySQL(db *sql.DB, records []fusedRecord) error {
+ tx, err := db.Begin()
+ if err != nil {
+ return err
+ }
+ stmt, err := tx.Prepare(`
+ INSERT INTO forecast_hourly (
+ station_id, provider, issued_at, forecast_time,
+ rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000,
+ wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index
+ ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
+ ON CONFLICT (station_id, provider, issued_at, forecast_time)
+ DO UPDATE SET
+ rain_mm_x1000 = EXCLUDED.rain_mm_x1000,
+ temp_c_x100 = EXCLUDED.temp_c_x100,
+ humidity_pct = EXCLUDED.humidity_pct,
+ wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
+ wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
+ wind_dir_deg = EXCLUDED.wind_dir_deg,
+ precip_prob_pct = EXCLUDED.precip_prob_pct,
+ pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100,
+ uv_index = EXCLUDED.uv_index`)
+ if err != nil {
+ tx.Rollback()
+ return err
+ }
+ defer stmt.Close()
+
+ for _, rec := range records {
+ if _, err := stmt.Exec(
+ rec.StationID,
+ outputProvider,
+ rec.IssuedAt,
+ rec.ForecastTime,
+ int(math.Round(rec.Sample.Rain*1000)),
+ int(math.Round(rec.Sample.Temp*100)),
+ int(math.Round(rec.Sample.Humidity)),
+ int(math.Round(rec.Sample.WindSpeed*1000)),
+ int(math.Round(rec.Sample.WindGust*1000)),
+ int(math.Round(rec.Sample.WindDir)),
+ int(math.Round(rec.Sample.Prob)),
+ int(math.Round(rec.Sample.Pressure*100)),
+ int(math.Round(rec.Sample.UV)),
+ ); err != nil {
+ tx.Rollback()
+ return err
+ }
+ }
+ return tx.Commit()
+}
diff --git a/templates/index.html b/templates/index.html
index cadc7ef..c4fa18e 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -476,9 +476,11 @@