From cbeb623f20fe48b8945682d4bda090437b925d79 Mon Sep 17 00:00:00 2001 From: yarnom Date: Sun, 12 Oct 2025 14:37:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E9=9B=A8=E9=87=8F=E8=9E=8D=E5=90=88=E7=9A=84=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/imdroidmix/main.go | 1147 ++++++++++++++++++++++++++++++++++++++++ templates/index.html | 2 + 2 files changed, 1149 insertions(+) create mode 100644 cmd/imdroidmix/main.go diff --git a/cmd/imdroidmix/main.go b/cmd/imdroidmix/main.go new file mode 100644 index 0000000..6c3aa16 --- /dev/null +++ b/cmd/imdroidmix/main.go @@ -0,0 +1,1147 @@ +package main + +import ( + "database/sql" + "encoding/binary" + "errors" + "flag" + "fmt" + "log" + "math" + "os" + "strings" + "time" + + "weatherstation/internal/database" +) + +const ( + providerOpenMeteo = "open-meteo" + providerCaiyun = "caiyun" + providerImdroid = "imdroid" + outputProvider = "imdroid_mix" +) + +var ( + defaultWeights = []float64{0.4, 0.3, 0.3} + asiaShanghai *time.Location +) + +func init() { + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + loc = time.FixedZone("CST", 8*3600) + } + asiaShanghai = loc + log.SetFlags(log.LstdFlags | log.Lmsgprefix) + log.SetPrefix("[imdroidmix] ") +} + +type stationInfo struct { + ID string + Alias string + Lat sql.NullFloat64 + Lon sql.NullFloat64 + Z sql.NullInt64 + Y sql.NullInt64 + X sql.NullInt64 +} + +type forecastSample struct { + Rain float64 + Temp float64 + Humidity float64 + WindSpeed float64 + WindGust float64 + WindDir float64 + Prob float64 + Pressure float64 + UV float64 +} + +type providerMatrix struct { + Provider string + Samples []forecastSample +} + +type fusedRecord struct { + StationID string + IssuedAt time.Time + ForecastTime time.Time + Sample forecastSample + MaskApplied bool +} + +type maskContext struct { + id string + alias string + lat float64 + lon float64 + z int + y int + x int +} + +func main() { + var ( + issuedRange string + stationFlag string + outSQLPath string + showProgress bool + applyUpserts bool + includeAll bool + debugLog bool + noMask bool + minSources int + maskHoursStr string + ) + + flag.StringVar(&issuedRange, "issued_range", "", "时间范围,格式 \"YYYY-MM-DD HH:MM:SS,YYYY-MM-DD HH:MM:SS\"(CST)") + flag.StringVar(&stationFlag, "station", "", "目标站点(可选,多个以逗号分隔)") + flag.BoolVar(&includeAll, "all_stations", false, "处理全部有经纬度的站点") + flag.StringVar(&outSQLPath, "export_sql", "imdroid_mix.sql", "输出SQL文件路径") + flag.BoolVar(&showProgress, "progress", false, "展示处理进度") + flag.BoolVar(&applyUpserts, "apply", false, "直接写入数据库(默认仅导出SQL)") + flag.BoolVar(&debugLog, "debug", false, "输出调试日志") + flag.BoolVar(&noMask, "no_mask", false, "关闭雷达掩码(不清零融合雨量)") + flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过") + flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0") + flag.Parse() + + if strings.TrimSpace(issuedRange) == "" { + log.Fatalln("必须提供 --issued_range") + } + + startIssued, endIssued, err := parseRange(issuedRange) + if err != nil { + log.Fatalf("解析 issued_range 失败: %v", err) + } + if !endIssued.After(startIssued) { + log.Fatalln("issued_range 结束时间必须大于起始时间") + } + + db := database.GetDB() + defer database.Close() + + stations, err := resolveStations(db, stationFlag, includeAll) + if err != nil { + log.Fatalf("加载站点失败: %v", err) + } + if len(stations) == 0 { + log.Fatalln("没有符合条件的站点") + } + + type stationWork struct { + info stationInfo + issuedTimes []time.Time + providers []string + } + var jobs []stationWork + var totalSteps int + + for _, st := range stations { + pSet, err := resolveProviderSet(db, st.ID, startIssued, endIssued) + if err != nil { + log.Fatalf("检测站点 %s 预报源失败: %v", st.ID, err) + } + if debugLog { + log.Printf("站点 %s 使用预报源: %v", st.ID, pSet) + } + issuedTimes, err := fetchIssuedHourBuckets(db, st.ID, startIssued, endIssued, pSet, minSources) + if err != nil { + log.Fatalf("查询站点 %s 的 issued_at 失败: %v", st.ID, err) + } + if len(issuedTimes) == 0 { + if debugLog { + // 打印每个provider的 issued_at 数量辅助定位 + for _, p := range pSet { + cnt, first, last := debugIssuedSummary(db, st.ID, startIssued, endIssued, p) + log.Printf("站点 %s provider=%s issued_at数量=%d 时间范围[%s, %s]", st.ID, p, cnt, first, last) + } + } + continue + } + jobs = append(jobs, stationWork{info: st, issuedTimes: issuedTimes, providers: pSet}) + totalSteps += len(issuedTimes) + } + + if len(jobs) == 0 { + log.Fatalln("指定时间范围内无可融合的预报数据") + } + + var ( + results []fusedRecord + step int + progressFn func(current int, total int, ctx string) + ) + + if showProgress { + progressFn = func(current, total int, ctx string) { + fmt.Printf("\r处理进度 %d/%d (%s)...", current, total, ctx) + } + defer fmt.Println() + } + + // 解析 mask_hours + maskHours := [3]bool{true, true, true} + if s := strings.TrimSpace(maskHoursStr); s != "" { + parts := strings.Split(s, ",") + for i := 0; i < len(parts) && i < 3; i++ { + v := strings.TrimSpace(parts[i]) + maskHours[i] = (v == "1" || strings.EqualFold(v, "true")) + } + } + + for _, job := range jobs { + info := job.info + maskCtx, hasMaskCtx := buildMaskContext(info) + weights := append([]float64{}, defaultWeights...) + + for idx, issued := range job.issuedTimes { + matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers) + if err != nil { + log.Printf("站点 %s issued=%s 载入预报失败: %v", info.ID, issued.Format(time.RFC3339), err) + continue + } + if len(matrix) < minSources { + log.Printf("站点 %s issued=%s 可用源=%d < min_sources=%d,跳过", info.ID, issued.Format(time.RFC3339), len(matrix), minSources) + if debugLog { + probeProviderAtBucket(db, info.ID, job.providers, issued) + } + continue + } + + if debugLog { + log.Printf("权重(使用前): station=%s issued=%s %s=%.2f %s=%.2f %s=%.2f", + info.ID, issued.Format("2006-01-02 15:04:05"), + job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2]) + } + + fused := fuseForecastsWith(matrix, effectiveWeights(weights, job.providers, matrix)) + rainAllZero := isRainAllZero(fused) + + maskApplied := false + if !rainAllZero && hasMaskCtx && !noMask { + rep, err := checkRadarMask(db, maskCtx, issued) + if err != nil { + log.Printf("站点 %s issued=%s 掩码计算出错: %v", info.ID, issued.Format(time.RFC3339), err) + } else { + if rep.Scanned { + // 逐小时按 mask_hours 应用 + // 解析 mask_hours 上移至批次开始 + // 在此应用逐小时清零 + // 重用 maskHours 变量 + for hidx := 0; hidx < 3; hidx++ { + if !maskHours[hidx] { + continue + } + if rep.HitsByHour[hidx] == 0 { + fused[hidx].Rain = 0 + maskApplied = true + } + } + } + if debugLog { + log.Printf("掩码报告: station=%s issued=%s scanned=%v windOK=%v used=%s wind=%.2fm/s %.0f° tiles=%d hits=[%d,%d,%d] mask_hours=%v", info.ID, issued.Format("2006-01-02 15:04:05"), rep.Scanned, rep.WindOK, rep.UsedAlias, rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskHours) + } + } + } + + for h := 1; h <= 3; h++ { + results = append(results, fusedRecord{ + StationID: info.ID, + IssuedAt: issued, // 输出按“小时桶”归一的发布时间 + ForecastTime: issued.Add(time.Duration(h) * time.Hour), + Sample: fused[h-1], + MaskApplied: maskApplied, + }) + } + + actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, issued, issued.Add(time.Hour)) + if err != nil { + log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err) + } else if actualOK { + before := append([]float64{}, weights...) + weights = adjustWeightsSubset(weights, job.providers, matrix, actualRain) + if debugLog { + log.Printf("权重调整: station=%s issued=%s actual=%.3f | 之前[%s=%.2f,%s=%.2f,%s=%.2f] 之后[%s=%.2f,%s=%.2f,%s=%.2f] 参与源=%d", + info.ID, issued.Format("2006-01-02 15:04:05"), actualRain, + job.providers[0], before[0], job.providers[1], before[1], job.providers[2], before[2], + job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2], + len(matrix)) + } + } + + step++ + if progressFn != nil { + progressFn(step, totalSteps, fmt.Sprintf("%s #%d", info.ID, idx+1)) + } + } + } + + if len(results) == 0 { + log.Println("未生成任何融合结果") + return + } + + sqlLines := buildSQL(results) + if err := os.WriteFile(outSQLPath, []byte(strings.Join(sqlLines, "\n")+"\n"), 0o644); err != nil { + log.Fatalf("写入SQL文件失败: %v", err) + } + log.Printf("已写出SQL到 %s(共 %d 条记录)", outSQLPath, len(results)) + + if applyUpserts { + if err := applySQL(db, results); err != nil { + log.Fatalf("写入数据库失败: %v", err) + } + log.Printf("已写入数据库 forecast_hourly(provider=%s)", outputProvider) + } +} + +func parseRange(r string) (time.Time, time.Time, error) { + parts := strings.Split(r, ",") + if len(parts) != 2 { + return time.Time{}, time.Time{}, errors.New("格式应为 start,end") + } + start, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[0]), asiaShanghai) + if err != nil { + return time.Time{}, time.Time{}, err + } + end, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[1]), asiaShanghai) + if err != nil { + return time.Time{}, time.Time{}, err + } + return start, end, nil +} + +func resolveStations(db *sql.DB, stationFlag string, includeAll bool) ([]stationInfo, error) { + if includeAll { + const q = ` + SELECT station_id, + COALESCE(NULLIF(station_alias,''), station_id) AS alias, + latitude, longitude, z, y, x + FROM stations + WHERE latitude IS NOT NULL AND longitude IS NOT NULL + AND device_type='WH65LP' + ORDER BY station_id` + rows, err := db.Query(q) + if err != nil { + return nil, err + } + defer rows.Close() + var out []stationInfo + for rows.Next() { + var s stationInfo + if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil { + return nil, err + } + out = append(out, s) + } + return out, nil + } + + list := fieldsFromFlag(stationFlag) + if len(list) == 0 { + return nil, nil + } + var out []stationInfo + const q = ` + SELECT station_id, + COALESCE(NULLIF(station_alias,''), station_id) AS alias, + latitude, longitude, z, y, x + FROM stations + WHERE station_id=$1` + for _, id := range list { + var s stationInfo + err := db.QueryRow(q, id).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X) + if err != nil { + if err == sql.ErrNoRows { + log.Printf("警告:站点 %s 不存在,跳过", id) + continue + } + return nil, err + } + out = append(out, s) + } + return out, nil +} + +func fieldsFromFlag(v string) []string { + if strings.TrimSpace(v) == "" { + return nil + } + parts := strings.Split(v, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} + +func resolveProviderSet(_ *sql.DB, _ string, _ time.Time, _ time.Time) ([]string, error) { + // 固定三源,不做别名回退 + return []string{providerOpenMeteo, providerCaiyun, providerImdroid}, nil +} + +func hasProviderData(db *sql.DB, stationID, provider string, from, to time.Time) (bool, error) { + var cnt int + err := db.QueryRow(` + SELECT COUNT(*) + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`, + stationID, provider, from, to).Scan(&cnt) + if err != nil { + return false, err + } + return cnt > 0, nil +} + +func fetchIssuedHourBuckets(db *sql.DB, stationID string, from, to time.Time, providers []string, minSources int) ([]time.Time, error) { + if len(providers) != 3 { + return nil, fmt.Errorf("provider 列表长度异常: %v", providers) + } + if minSources < 1 { + minSources = 1 + } + if minSources > 3 { + minSources = 3 + } + const q = ` + WITH base AS ( + SELECT date_trunc('hour', issued_at) AS bucket, provider + FROM forecast_hourly + WHERE station_id=$1 AND provider IN ($2, $3, $4) + AND issued_at BETWEEN $5 AND $6 + GROUP BY 1,2 + ) + SELECT bucket + FROM base + GROUP BY bucket + HAVING COUNT(DISTINCT provider) >= $7 + ORDER BY bucket` + rows, err := db.Query(q, stationID, providers[0], providers[1], providers[2], from, to, minSources) + if err != nil { + return nil, err + } + defer rows.Close() + var buckets []time.Time + for rows.Next() { + var t time.Time + if err := rows.Scan(&t); err != nil { + return nil, err + } + buckets = append(buckets, t.In(asiaShanghai)) + } + return buckets, nil +} + +// debugIssuedSummary 仅用于 --debug:统计provider在范围内的issued_at计数及首尾时间 +func debugIssuedSummary(db *sql.DB, stationID string, from, to time.Time, provider string) (count int, first string, last string) { + const q = ` + SELECT COUNT(*), + MIN(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS first, + MAX(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS last + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4` + var f, l sql.NullString + if err := db.QueryRow(q, stationID, provider, from, to).Scan(&count, &f, &l); err != nil { + return 0, "", "" + } + if f.Valid { + first = f.String + } + if l.Valid { + last = l.String + } + return +} + +func loadProviderMatrix(db *sql.DB, stationID string, bucketHour time.Time, providers []string) ([]providerMatrix, error) { + out := make([]providerMatrix, 0, len(providers)) + for _, provider := range providers { + issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour) + if err != nil { + return nil, err + } + if !ok { + // 该源缺席,跳过 + continue + } + samples, err := loadForecastSamples(db, stationID, provider, issued) + if err != nil { + return nil, err + } + if len(samples) != 3 { + // 样本不足,视为该源缺席 + continue + } + out = append(out, providerMatrix{Provider: provider, Samples: samples}) + } + return out, nil +} + +// 在某个小时桶内,选取该 provider 的最新 issued_at([bucket, bucket+1h)) +func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) { + const q = ` + SELECT issued_at + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour' + ORDER BY issued_at DESC + LIMIT 1` + var t time.Time + err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t) + if err == sql.ErrNoRows { + return time.Time{}, false, nil + } + if err != nil { + return time.Time{}, false, err + } + return t, true, nil +} + +func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) { + const q = ` + SELECT forecast_time, + rain_mm_x1000, + temp_c_x100, + humidity_pct, + wind_speed_ms_x1000, + wind_gust_ms_x1000, + wind_dir_deg, + precip_prob_pct, + pressure_hpa_x100, + uv_index + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND issued_at=$3 + ORDER BY forecast_time ASC` + rows, err := db.Query(q, stationID, provider, issued) + if err != nil { + return nil, err + } + defer rows.Close() + var list []forecastSample + for rows.Next() { + var ( + ft time.Time + rain, temp, humidity, ws, gust, wdir, prob sql.NullInt64 + pressure, uv sql.NullInt64 + ) + if err := rows.Scan(&ft, &rain, &temp, &humidity, &ws, &gust, &wdir, &prob, &pressure, &uv); err != nil { + return nil, err + } + // 使用“向上取整”的方式计算前视小时(与查询端 lead_hours=CEIL(...) 逻辑对齐) + dh := ft.Sub(issued).Hours() + hours := int(math.Ceil(dh - 1e-9)) + if hours < 1 || hours > 3 { + continue + } + s := forecastSample{ + Rain: float64(rain.Int64) / 1000.0, + Temp: float64(temp.Int64) / 100.0, + Humidity: float64(humidity.Int64), + WindSpeed: float64(ws.Int64) / 1000.0, + WindGust: float64(gust.Int64) / 1000.0, + WindDir: float64(wdir.Int64), + Prob: float64(prob.Int64), + Pressure: float64(pressure.Int64) / 100.0, + UV: float64(uv.Int64), + } + list = append(list, s) + } + return list, nil +} + +func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample { + result := make([]forecastSample, 3) + for h := 0; h < 3; h++ { + var windX, windY float64 + for pIdx, pm := range matrix { + if len(pm.Samples) <= h { + continue + } + s := pm.Samples[h] + w := weights[pIdx] + result[h].Rain += s.Rain * w + result[h].Temp += s.Temp * w + result[h].Humidity += s.Humidity * w + result[h].WindSpeed += s.WindSpeed * w + result[h].WindGust += s.WindGust * w + result[h].Prob += s.Prob * w + result[h].Pressure += s.Pressure * w + result[h].UV += s.UV * w + rad := s.WindDir * math.Pi / 180 + windX += math.Cos(rad) * w + windY += math.Sin(rad) * w + } + if windX == 0 && windY == 0 { + result[h].WindDir = 0 + } else { + dir := math.Atan2(windY, windX) * 180 / math.Pi + if dir < 0 { + dir += 360 + } + result[h].WindDir = dir + } + result[h].Humidity = math.Round(result[h].Humidity) + result[h].Prob = math.Round(result[h].Prob) + result[h].UV = math.Round(result[h].UV) + } + return result +} + +// fuseForecastsWith 使用与 matrix 对齐的有效权重进行融合 +func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample { + if len(eff) != len(matrix) { + return fuseForecasts(matrix, uniform(len(matrix))) + } + result := make([]forecastSample, 3) + for h := 0; h < 3; h++ { + var windX, windY float64 + for pIdx, pm := range matrix { + if len(pm.Samples) <= h { + continue + } + s := pm.Samples[h] + w := eff[pIdx] + result[h].Rain += s.Rain * w + result[h].Temp += s.Temp * w + result[h].Humidity += s.Humidity * w + result[h].WindSpeed += s.WindSpeed * w + result[h].WindGust += s.WindGust * w + result[h].Prob += s.Prob * w + result[h].Pressure += s.Pressure * w + result[h].UV += s.UV * w + rad := s.WindDir * math.Pi / 180 + windX += math.Cos(rad) * w + windY += math.Sin(rad) * w + } + if windX == 0 && windY == 0 { + result[h].WindDir = 0 + } else { + dir := math.Atan2(windY, windX) * 180 / math.Pi + if dir < 0 { + dir += 360 + } + result[h].WindDir = dir + } + result[h].Humidity = math.Round(result[h].Humidity) + result[h].Prob = math.Round(result[h].Prob) + result[h].UV = math.Round(result[h].UV) + } + return result +} + +func uniform(n int) []float64 { + if n <= 0 { + return nil + } + w := make([]float64, n) + v := 1.0 / float64(n) + for i := range w { + w[i] = v + } + return w +} + +// effectiveWeights 将基础权重映射到参与融合的 providers(按 matrix 顺序),并在子集上归一化 +func effectiveWeights(base []float64, providersAll []string, matrix []providerMatrix) []float64 { + m := make(map[string]int) + for i, p := range providersAll { + m[p] = i + } + eff := make([]float64, len(matrix)) + sum := 0.0 + for i, pm := range matrix { + if idx, ok := m[pm.Provider]; ok { + eff[i] = base[idx] + sum += eff[i] + } + } + if sum <= 0 { + return uniform(len(matrix)) + } + for i := range eff { + eff[i] /= sum + } + return eff +} + +// adjustWeightsSubset 仅对当前参与的 providers 调整权重,其它分量保持不变 +func adjustWeightsSubset(base []float64, providersAll []string, matrix []providerMatrix, actual float64) []float64 { + preds := extractProviderRainAt(matrix, 0) + if len(preds) == 0 { + return base + } + // 找最优和最差(绝对误差) + best, worst := 0, 0 + for i := range preds { + if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) { + best = i + } + if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) { + worst = i + } + } + // provider 名到 base 索引 + m := make(map[string]int) + for i, p := range providersAll { + m[p] = i + } + out := append([]float64{}, base...) + // 应用 ±0.1 调整 + if bi, ok := m[matrix[best].Provider]; ok { + out[bi] += 0.1 + } + if wi, ok := m[matrix[worst].Provider]; ok { + out[wi] -= 0.1 + } + // 夹紧并归一化 + sum := 0.0 + for i := range out { + if out[i] < 0 { + out[i] = 0 + } + sum += out[i] + } + if sum == 0 { + // 回到默认 + return append([]float64{}, defaultWeights...) + } + for i := range out { + out[i] /= sum + } + return out +} + +// probeProviderAtBucket: 调试辅助,打印该小时桶内每个provider的issued与是否有+1/+2/+3样本 +func probeProviderAtBucket(db *sql.DB, stationID string, providers []string, bucket time.Time) { + for _, p := range providers { + issued, ok, err := resolveIssuedAtInBucket(db, stationID, p, bucket) + if err != nil { + log.Printf(" provider=%s 探测错误: %v", p, err) + continue + } + if !ok { + log.Printf(" provider=%s: 小时桶内无 issued", p) + continue + } + leads := presentLeads(db, stationID, p, issued) + log.Printf(" provider=%s: issued=%s leads=%v (count=%d)", p, issued.In(asiaShanghai).Format("2006-01-02 15:04:05"), leads, len(leads)) + } +} + +// presentLeads 返回该 issued 的预测中,存在的 lead 小时集合(1..3) +func presentLeads(db *sql.DB, stationID, provider string, issued time.Time) []int { + rows, err := db.Query(`SELECT forecast_time FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3`, stationID, provider, issued) + if err != nil { + return nil + } + defer rows.Close() + var leads []int + for rows.Next() { + var ft time.Time + if err := rows.Scan(&ft); err != nil { + continue + } + dh := ft.Sub(issued).Hours() + h := int(math.Ceil(dh - 1e-9)) + if h >= 1 && h <= 3 { + leads = append(leads, h) + } + } + return leads +} + +func isRainAllZero(samples []forecastSample) bool { + for _, s := range samples { + if math.Abs(s.Rain) > 1e-6 { + return false + } + } + return true +} + +func extractProviderRainAt(matrix []providerMatrix, hourIdx int) []float64 { + out := make([]float64, 0, len(matrix)) + for _, pm := range matrix { + if len(pm.Samples) > hourIdx { + out = append(out, pm.Samples[hourIdx].Rain) + } else { + out = append(out, 0) + } + } + return out +} + +func adjustWeights(current []float64, preds []float64, actual float64) []float64 { + if len(current) != len(preds) || len(current) != 3 { + return current + } + diffs := make([]float64, len(preds)) + for i := range preds { + diffs[i] = math.Abs(preds[i] - actual) + } + minIdx, maxIdx := 0, 0 + for i, d := range diffs { + if d < diffs[minIdx] { + minIdx = i + } + if d > diffs[maxIdx] { + maxIdx = i + } + } + if minIdx != maxIdx { + current[minIdx] += 0.1 + current[maxIdx] -= 0.1 + } + var sum float64 + for i := range current { + if current[i] < 0 { + current[i] = 0 + } + sum += current[i] + } + if sum == 0 { + return append([]float64{}, defaultWeights...) + } + for i := range current { + current[i] /= sum + } + return current +} + +func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) { + const q = ` + SELECT SUM(rain_10m_mm_x1000) + FROM rs485_weather_10min + WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` + var sum sql.NullInt64 + err := db.QueryRow(q, stationID, start, end).Scan(&sum) + if err != nil { + return 0, false, err + } + if !sum.Valid { + return 0, false, nil + } + return float64(sum.Int64) / 1000.0, true, nil +} + +type tileRec struct { + DT time.Time + Width int + Height int + West float64 + South float64 + East float64 + North float64 + ResDeg float64 + Data []byte +} + +type maskReport struct { + Scanned bool + WindOK bool + UsedAlias string + WindSpeed float64 + WindDir float64 + Tiles int + HitsByHour [3]int +} + +func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) { + rep := maskReport{} + windTime := issued.Add(-time.Hour) + // 优先用 station_id,其次别名 + if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil { + return rep, err + } else if ok { + rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id + } else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil { + return rep, err2 + } else if ok2 { + rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias + } else { + // 没有风场:不扫描,不应用掩码 + return rep, nil + } + + tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued) + if err != nil { + return rep, err + } + if len(tiles) == 0 { + // 无瓦片:不扫描,不应用掩码 + return rep, nil + } + rep.Tiles = len(tiles) + + rep.Scanned = true + const circleR = 8000.0 + const halfAngle = 30.0 + + for _, t := range tiles { + vals, xs, ys, err := decodeTile(t) + if err != nil { + continue + } + for r := 0; r < len(vals); r++ { + row := vals[r] + lat := ys[r] + for c := 0; c < len(row); c++ { + v := row[c] + if v == nil { + continue + } + dbz := *v + if dbz < 40 { + continue + } + lon := xs[c] + dist := haversine(ctx.lat, ctx.lon, lat, lon) + // +1h:圆 或 扇形(3h) + if dist <= circleR { + rep.HitsByHour[0]++ + } + if rep.WindSpeed > 0 { + brg := bearingDeg(ctx.lat, ctx.lon, lat, lon) + if angDiff(brg, rep.WindDir) <= halfAngle { + if dist <= rep.WindSpeed*3*3600 { + rep.HitsByHour[0]++ + } + // +2h:圆 或 扇形(2h) + if dist <= circleR { + rep.HitsByHour[1]++ + } + if dist <= rep.WindSpeed*2*3600 { + rep.HitsByHour[1]++ + } + // +3h:扇形(3h) + if dist <= rep.WindSpeed*3*3600 { + rep.HitsByHour[2]++ + } + } + } + } + } + } + return rep, nil +} + +func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) { + const q = ` + SELECT wind_speed, wind_direction + FROM radar_weather + WHERE alias=$1 AND dt <= $2 + ORDER BY dt DESC + LIMIT 1` + var speed, dir sql.NullFloat64 + err := db.QueryRow(q, alias, target).Scan(&speed, &dir) + if err == sql.ErrNoRows { + return 0, 0, false, nil + } + if err != nil { + return 0, 0, false, err + } + if !speed.Valid || !dir.Valid { + return 0, 0, false, nil + } + return speed.Float64, dir.Float64, true, nil +} + +func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) { + const q = ` + SELECT dt, width, height, west, south, east, north, res_deg, data + FROM radar_tiles + WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5 + ORDER BY dt` + rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to) + if err != nil { + return nil, err + } + defer rows.Close() + var list []tileRec + for rows.Next() { + var t tileRec + if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil { + return nil, err + } + list = append(list, t) + } + return list, nil +} + +func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) { + w, h := t.Width, t.Height + if w <= 0 || h <= 0 { + return nil, nil, nil, fmt.Errorf("非法瓦片尺寸") + } + if len(t.Data) < w*h*2 { + return nil, nil, nil, fmt.Errorf("数据长度不足") + } + xs := make([]float64, w) + for c := 0; c < w; c++ { + xs[c] = t.West + (float64(c)+0.5)*t.ResDeg + } + ys := make([]float64, h) + for r := 0; r < h; r++ { + ys[r] = t.South + (float64(r)+0.5)*t.ResDeg + } + vals := make([][]*float64, h) + off := 0 + for r := 0; r < h; r++ { + row := make([]*float64, w) + for c := 0; c < w; c++ { + v := int16(binary.BigEndian.Uint16(t.Data[off : off+2])) + off += 2 + if v >= 32766 { + row[c] = nil + continue + } + dbz := float64(v) / 10.0 + if dbz < 0 { + dbz = 0 + } else if dbz > 75 { + dbz = 75 + } + value := dbz + row[c] = &value + } + vals[r] = row + } + return vals, xs, ys, nil +} + +func haversine(lat1, lon1, lat2, lon2 float64) float64 { + const R = 6371000.0 + dLat := toRad(lat2 - lat1) + dLon := toRad(lon2 - lon1) + a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2) + c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a)) + return R * c +} + +func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 { + φ1 := toRad(lat1) + φ2 := toRad(lat2) + Δλ := toRad(lon2 - lon1) + y := math.Sin(Δλ) * math.Cos(φ2) + x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ) + brg := toDeg(math.Atan2(y, x)) + if brg < 0 { + brg += 360 + } + return brg +} + +func angDiff(a, b float64) float64 { + d := math.Mod(a-b+540, 360) - 180 + if d < 0 { + d = -d + } + return math.Abs(d) +} + +func toRad(d float64) float64 { return d * math.Pi / 180 } +func toDeg(r float64) float64 { return r * 180 / math.Pi } + +func buildMaskContext(info stationInfo) (maskContext, bool) { + if !info.Lat.Valid || !info.Lon.Valid { + return maskContext{}, false + } + if !info.Z.Valid || !info.Y.Valid || !info.X.Valid { + return maskContext{}, false + } + alias := info.Alias + if strings.TrimSpace(alias) == "" { + alias = info.ID + } + return maskContext{ + id: info.ID, + alias: alias, + lat: info.Lat.Float64, + lon: info.Lon.Float64, + z: int(info.Z.Int64), + y: int(info.Y.Int64), + x: int(info.X.Int64), + }, true +} + +func buildSQL(records []fusedRecord) []string { + lines := make([]string, 0, len(records)*2) + header := "-- SQL 输出 (默认未执行)" + lines = append(lines, header) + for _, rec := range records { + sql := fmt.Sprintf( + "INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index)\nVALUES ('%s', '%s', '%s'::timestamptz, '%s'::timestamptz, %d, %d, %d, %d, %d, %d, %d, %d, %d)\nON CONFLICT (station_id, provider, issued_at, forecast_time)\nDO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000, temp_c_x100=EXCLUDED.temp_c_x100, humidity_pct=EXCLUDED.humidity_pct, wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, wind_dir_deg=EXCLUDED.wind_dir_deg, precip_prob_pct=EXCLUDED.precip_prob_pct, pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, uv_index=EXCLUDED.uv_index;", + rec.StationID, + outputProvider, + rec.IssuedAt.In(asiaShanghai).Format(time.RFC3339), + rec.ForecastTime.In(asiaShanghai).Format(time.RFC3339), + int(math.Round(rec.Sample.Rain*1000)), + int(math.Round(rec.Sample.Temp*100)), + int(math.Round(rec.Sample.Humidity)), + int(math.Round(rec.Sample.WindSpeed*1000)), + int(math.Round(rec.Sample.WindGust*1000)), + int(math.Round(rec.Sample.WindDir)), + int(math.Round(rec.Sample.Prob)), + int(math.Round(rec.Sample.Pressure*100)), + int(math.Round(rec.Sample.UV)), + ) + lines = append(lines, sql, "") + } + return lines +} + +func applySQL(db *sql.DB, records []fusedRecord) error { + tx, err := db.Begin() + if err != nil { + return err + } + stmt, err := tx.Prepare(` + INSERT INTO forecast_hourly ( + station_id, provider, issued_at, forecast_time, + rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, + wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (station_id, provider, issued_at, forecast_time) + DO UPDATE SET + rain_mm_x1000 = EXCLUDED.rain_mm_x1000, + temp_c_x100 = EXCLUDED.temp_c_x100, + humidity_pct = EXCLUDED.humidity_pct, + wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000, + wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000, + wind_dir_deg = EXCLUDED.wind_dir_deg, + precip_prob_pct = EXCLUDED.precip_prob_pct, + pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100, + uv_index = EXCLUDED.uv_index`) + if err != nil { + tx.Rollback() + return err + } + defer stmt.Close() + + for _, rec := range records { + if _, err := stmt.Exec( + rec.StationID, + outputProvider, + rec.IssuedAt, + rec.ForecastTime, + int(math.Round(rec.Sample.Rain*1000)), + int(math.Round(rec.Sample.Temp*100)), + int(math.Round(rec.Sample.Humidity)), + int(math.Round(rec.Sample.WindSpeed*1000)), + int(math.Round(rec.Sample.WindGust*1000)), + int(math.Round(rec.Sample.WindDir)), + int(math.Round(rec.Sample.Prob)), + int(math.Round(rec.Sample.Pressure*100)), + int(math.Round(rec.Sample.UV)), + ); err != nil { + tx.Rollback() + return err + } + } + return tx.Commit() +} diff --git a/templates/index.html b/templates/index.html index cadc7ef..c4fa18e 100644 --- a/templates/index.html +++ b/templates/index.html @@ -476,9 +476,11 @@