1148 lines
31 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"database/sql"
"encoding/binary"
"errors"
"flag"
"fmt"
"log"
"math"
"os"
"strings"
"time"
"weatherstation/internal/database"
)
const (
providerOpenMeteo = "open-meteo"
providerCaiyun = "caiyun"
providerImdroid = "imdroid"
outputProvider = "imdroid_mix"
)
var (
defaultWeights = []float64{0.4, 0.3, 0.3}
asiaShanghai *time.Location
)
func init() {
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
loc = time.FixedZone("CST", 8*3600)
}
asiaShanghai = loc
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
log.SetPrefix("[imdroidmix] ")
}
type stationInfo struct {
ID string
Alias string
Lat sql.NullFloat64
Lon sql.NullFloat64
Z sql.NullInt64
Y sql.NullInt64
X sql.NullInt64
}
type forecastSample struct {
Rain float64
Temp float64
Humidity float64
WindSpeed float64
WindGust float64
WindDir float64
Prob float64
Pressure float64
UV float64
}
type providerMatrix struct {
Provider string
Samples []forecastSample
}
type fusedRecord struct {
StationID string
IssuedAt time.Time
ForecastTime time.Time
Sample forecastSample
MaskApplied bool
}
type maskContext struct {
id string
alias string
lat float64
lon float64
z int
y int
x int
}
func main() {
var (
issuedRange string
stationFlag string
outSQLPath string
showProgress bool
applyUpserts bool
includeAll bool
debugLog bool
noMask bool
minSources int
maskHoursStr string
)
flag.StringVar(&issuedRange, "issued_range", "", "时间范围,格式 \"YYYY-MM-DD HH:MM:SS,YYYY-MM-DD HH:MM:SS\"CST")
flag.StringVar(&stationFlag, "station", "", "目标站点(可选,多个以逗号分隔)")
flag.BoolVar(&includeAll, "all_stations", false, "处理全部有经纬度的站点")
flag.StringVar(&outSQLPath, "export_sql", "imdroid_mix.sql", "输出SQL文件路径")
flag.BoolVar(&showProgress, "progress", false, "展示处理进度")
flag.BoolVar(&applyUpserts, "apply", false, "直接写入数据库默认仅导出SQL")
flag.BoolVar(&debugLog, "debug", false, "输出调试日志")
flag.BoolVar(&noMask, "no_mask", false, "关闭雷达掩码(不清零融合雨量)")
flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过")
flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0")
flag.Parse()
if strings.TrimSpace(issuedRange) == "" {
log.Fatalln("必须提供 --issued_range")
}
startIssued, endIssued, err := parseRange(issuedRange)
if err != nil {
log.Fatalf("解析 issued_range 失败: %v", err)
}
if !endIssued.After(startIssued) {
log.Fatalln("issued_range 结束时间必须大于起始时间")
}
db := database.GetDB()
defer database.Close()
stations, err := resolveStations(db, stationFlag, includeAll)
if err != nil {
log.Fatalf("加载站点失败: %v", err)
}
if len(stations) == 0 {
log.Fatalln("没有符合条件的站点")
}
type stationWork struct {
info stationInfo
issuedTimes []time.Time
providers []string
}
var jobs []stationWork
var totalSteps int
for _, st := range stations {
pSet, err := resolveProviderSet(db, st.ID, startIssued, endIssued)
if err != nil {
log.Fatalf("检测站点 %s 预报源失败: %v", st.ID, err)
}
if debugLog {
log.Printf("站点 %s 使用预报源: %v", st.ID, pSet)
}
issuedTimes, err := fetchIssuedHourBuckets(db, st.ID, startIssued, endIssued, pSet, minSources)
if err != nil {
log.Fatalf("查询站点 %s 的 issued_at 失败: %v", st.ID, err)
}
if len(issuedTimes) == 0 {
if debugLog {
// 打印每个provider的 issued_at 数量辅助定位
for _, p := range pSet {
cnt, first, last := debugIssuedSummary(db, st.ID, startIssued, endIssued, p)
log.Printf("站点 %s provider=%s issued_at数量=%d 时间范围[%s, %s]", st.ID, p, cnt, first, last)
}
}
continue
}
jobs = append(jobs, stationWork{info: st, issuedTimes: issuedTimes, providers: pSet})
totalSteps += len(issuedTimes)
}
if len(jobs) == 0 {
log.Fatalln("指定时间范围内无可融合的预报数据")
}
var (
results []fusedRecord
step int
progressFn func(current int, total int, ctx string)
)
if showProgress {
progressFn = func(current, total int, ctx string) {
fmt.Printf("\r处理进度 %d/%d (%s)...", current, total, ctx)
}
defer fmt.Println()
}
// 解析 mask_hours
maskHours := [3]bool{true, true, true}
if s := strings.TrimSpace(maskHoursStr); s != "" {
parts := strings.Split(s, ",")
for i := 0; i < len(parts) && i < 3; i++ {
v := strings.TrimSpace(parts[i])
maskHours[i] = (v == "1" || strings.EqualFold(v, "true"))
}
}
for _, job := range jobs {
info := job.info
maskCtx, hasMaskCtx := buildMaskContext(info)
weights := append([]float64{}, defaultWeights...)
for idx, issued := range job.issuedTimes {
matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers)
if err != nil {
log.Printf("站点 %s issued=%s 载入预报失败: %v", info.ID, issued.Format(time.RFC3339), err)
continue
}
if len(matrix) < minSources {
log.Printf("站点 %s issued=%s 可用源=%d < min_sources=%d跳过", info.ID, issued.Format(time.RFC3339), len(matrix), minSources)
if debugLog {
probeProviderAtBucket(db, info.ID, job.providers, issued)
}
continue
}
if debugLog {
log.Printf("权重(使用前): station=%s issued=%s %s=%.2f %s=%.2f %s=%.2f",
info.ID, issued.Format("2006-01-02 15:04:05"),
job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2])
}
fused := fuseForecastsWith(matrix, effectiveWeights(weights, job.providers, matrix))
rainAllZero := isRainAllZero(fused)
maskApplied := false
if !rainAllZero && hasMaskCtx && !noMask {
rep, err := checkRadarMask(db, maskCtx, issued)
if err != nil {
log.Printf("站点 %s issued=%s 掩码计算出错: %v", info.ID, issued.Format(time.RFC3339), err)
} else {
if rep.Scanned {
// 逐小时按 mask_hours 应用
// 解析 mask_hours 上移至批次开始
// 在此应用逐小时清零
// 重用 maskHours 变量
for hidx := 0; hidx < 3; hidx++ {
if !maskHours[hidx] {
continue
}
if rep.HitsByHour[hidx] == 0 {
fused[hidx].Rain = 0
maskApplied = true
}
}
}
if debugLog {
log.Printf("掩码报告: station=%s issued=%s scanned=%v windOK=%v used=%s wind=%.2fm/s %.0f° tiles=%d hits=[%d,%d,%d] mask_hours=%v", info.ID, issued.Format("2006-01-02 15:04:05"), rep.Scanned, rep.WindOK, rep.UsedAlias, rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskHours)
}
}
}
for h := 1; h <= 3; h++ {
results = append(results, fusedRecord{
StationID: info.ID,
IssuedAt: issued, // 输出按“小时桶”归一的发布时间
ForecastTime: issued.Add(time.Duration(h) * time.Hour),
Sample: fused[h-1],
MaskApplied: maskApplied,
})
}
actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, issued, issued.Add(time.Hour))
if err != nil {
log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err)
} else if actualOK {
before := append([]float64{}, weights...)
weights = adjustWeightsSubset(weights, job.providers, matrix, actualRain)
if debugLog {
log.Printf("权重调整: station=%s issued=%s actual=%.3f | 之前[%s=%.2f,%s=%.2f,%s=%.2f] 之后[%s=%.2f,%s=%.2f,%s=%.2f] 参与源=%d",
info.ID, issued.Format("2006-01-02 15:04:05"), actualRain,
job.providers[0], before[0], job.providers[1], before[1], job.providers[2], before[2],
job.providers[0], weights[0], job.providers[1], weights[1], job.providers[2], weights[2],
len(matrix))
}
}
step++
if progressFn != nil {
progressFn(step, totalSteps, fmt.Sprintf("%s #%d", info.ID, idx+1))
}
}
}
if len(results) == 0 {
log.Println("未生成任何融合结果")
return
}
sqlLines := buildSQL(results)
if err := os.WriteFile(outSQLPath, []byte(strings.Join(sqlLines, "\n")+"\n"), 0o644); err != nil {
log.Fatalf("写入SQL文件失败: %v", err)
}
log.Printf("已写出SQL到 %s共 %d 条记录)", outSQLPath, len(results))
if applyUpserts {
if err := applySQL(db, results); err != nil {
log.Fatalf("写入数据库失败: %v", err)
}
log.Printf("已写入数据库 forecast_hourlyprovider=%s", outputProvider)
}
}
func parseRange(r string) (time.Time, time.Time, error) {
parts := strings.Split(r, ",")
if len(parts) != 2 {
return time.Time{}, time.Time{}, errors.New("格式应为 start,end")
}
start, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[0]), asiaShanghai)
if err != nil {
return time.Time{}, time.Time{}, err
}
end, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[1]), asiaShanghai)
if err != nil {
return time.Time{}, time.Time{}, err
}
return start, end, nil
}
func resolveStations(db *sql.DB, stationFlag string, includeAll bool) ([]stationInfo, error) {
if includeAll {
const q = `
SELECT station_id,
COALESCE(NULLIF(station_alias,''), station_id) AS alias,
latitude, longitude, z, y, x
FROM stations
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
AND device_type='WH65LP'
ORDER BY station_id`
rows, err := db.Query(q)
if err != nil {
return nil, err
}
defer rows.Close()
var out []stationInfo
for rows.Next() {
var s stationInfo
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil {
return nil, err
}
out = append(out, s)
}
return out, nil
}
list := fieldsFromFlag(stationFlag)
if len(list) == 0 {
return nil, nil
}
var out []stationInfo
const q = `
SELECT station_id,
COALESCE(NULLIF(station_alias,''), station_id) AS alias,
latitude, longitude, z, y, x
FROM stations
WHERE station_id=$1`
for _, id := range list {
var s stationInfo
err := db.QueryRow(q, id).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
if err != nil {
if err == sql.ErrNoRows {
log.Printf("警告:站点 %s 不存在,跳过", id)
continue
}
return nil, err
}
out = append(out, s)
}
return out, nil
}
func fieldsFromFlag(v string) []string {
if strings.TrimSpace(v) == "" {
return nil
}
parts := strings.Split(v, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
out = append(out, p)
}
}
return out
}
func resolveProviderSet(_ *sql.DB, _ string, _ time.Time, _ time.Time) ([]string, error) {
// 固定三源,不做别名回退
return []string{providerOpenMeteo, providerCaiyun, providerImdroid}, nil
}
func hasProviderData(db *sql.DB, stationID, provider string, from, to time.Time) (bool, error) {
var cnt int
err := db.QueryRow(`
SELECT COUNT(*)
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`,
stationID, provider, from, to).Scan(&cnt)
if err != nil {
return false, err
}
return cnt > 0, nil
}
func fetchIssuedHourBuckets(db *sql.DB, stationID string, from, to time.Time, providers []string, minSources int) ([]time.Time, error) {
if len(providers) != 3 {
return nil, fmt.Errorf("provider 列表长度异常: %v", providers)
}
if minSources < 1 {
minSources = 1
}
if minSources > 3 {
minSources = 3
}
const q = `
WITH base AS (
SELECT date_trunc('hour', issued_at) AS bucket, provider
FROM forecast_hourly
WHERE station_id=$1 AND provider IN ($2, $3, $4)
AND issued_at BETWEEN $5 AND $6
GROUP BY 1,2
)
SELECT bucket
FROM base
GROUP BY bucket
HAVING COUNT(DISTINCT provider) >= $7
ORDER BY bucket`
rows, err := db.Query(q, stationID, providers[0], providers[1], providers[2], from, to, minSources)
if err != nil {
return nil, err
}
defer rows.Close()
var buckets []time.Time
for rows.Next() {
var t time.Time
if err := rows.Scan(&t); err != nil {
return nil, err
}
buckets = append(buckets, t.In(asiaShanghai))
}
return buckets, nil
}
// debugIssuedSummary 仅用于 --debug统计provider在范围内的issued_at计数及首尾时间
func debugIssuedSummary(db *sql.DB, stationID string, from, to time.Time, provider string) (count int, first string, last string) {
const q = `
SELECT COUNT(*),
MIN(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS first,
MAX(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS last
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`
var f, l sql.NullString
if err := db.QueryRow(q, stationID, provider, from, to).Scan(&count, &f, &l); err != nil {
return 0, "", ""
}
if f.Valid {
first = f.String
}
if l.Valid {
last = l.String
}
return
}
func loadProviderMatrix(db *sql.DB, stationID string, bucketHour time.Time, providers []string) ([]providerMatrix, error) {
out := make([]providerMatrix, 0, len(providers))
for _, provider := range providers {
issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour)
if err != nil {
return nil, err
}
if !ok {
// 该源缺席,跳过
continue
}
samples, err := loadForecastSamples(db, stationID, provider, issued)
if err != nil {
return nil, err
}
if len(samples) != 3 {
// 样本不足,视为该源缺席
continue
}
out = append(out, providerMatrix{Provider: provider, Samples: samples})
}
return out, nil
}
// 在某个小时桶内,选取该 provider 的最新 issued_at[bucket, bucket+1h)
func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) {
const q = `
SELECT issued_at
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour'
ORDER BY issued_at DESC
LIMIT 1`
var t time.Time
err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t)
if err == sql.ErrNoRows {
return time.Time{}, false, nil
}
if err != nil {
return time.Time{}, false, err
}
return t, true, nil
}
func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) {
const q = `
SELECT forecast_time,
rain_mm_x1000,
temp_c_x100,
humidity_pct,
wind_speed_ms_x1000,
wind_gust_ms_x1000,
wind_dir_deg,
precip_prob_pct,
pressure_hpa_x100,
uv_index
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND issued_at=$3
ORDER BY forecast_time ASC`
rows, err := db.Query(q, stationID, provider, issued)
if err != nil {
return nil, err
}
defer rows.Close()
var list []forecastSample
for rows.Next() {
var (
ft time.Time
rain, temp, humidity, ws, gust, wdir, prob sql.NullInt64
pressure, uv sql.NullInt64
)
if err := rows.Scan(&ft, &rain, &temp, &humidity, &ws, &gust, &wdir, &prob, &pressure, &uv); err != nil {
return nil, err
}
// 使用“向上取整”的方式计算前视小时(与查询端 lead_hours=CEIL(...) 逻辑对齐)
dh := ft.Sub(issued).Hours()
hours := int(math.Ceil(dh - 1e-9))
if hours < 1 || hours > 3 {
continue
}
s := forecastSample{
Rain: float64(rain.Int64) / 1000.0,
Temp: float64(temp.Int64) / 100.0,
Humidity: float64(humidity.Int64),
WindSpeed: float64(ws.Int64) / 1000.0,
WindGust: float64(gust.Int64) / 1000.0,
WindDir: float64(wdir.Int64),
Prob: float64(prob.Int64),
Pressure: float64(pressure.Int64) / 100.0,
UV: float64(uv.Int64),
}
list = append(list, s)
}
return list, nil
}
func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample {
result := make([]forecastSample, 3)
for h := 0; h < 3; h++ {
var windX, windY float64
for pIdx, pm := range matrix {
if len(pm.Samples) <= h {
continue
}
s := pm.Samples[h]
w := weights[pIdx]
result[h].Rain += s.Rain * w
result[h].Temp += s.Temp * w
result[h].Humidity += s.Humidity * w
result[h].WindSpeed += s.WindSpeed * w
result[h].WindGust += s.WindGust * w
result[h].Prob += s.Prob * w
result[h].Pressure += s.Pressure * w
result[h].UV += s.UV * w
rad := s.WindDir * math.Pi / 180
windX += math.Cos(rad) * w
windY += math.Sin(rad) * w
}
if windX == 0 && windY == 0 {
result[h].WindDir = 0
} else {
dir := math.Atan2(windY, windX) * 180 / math.Pi
if dir < 0 {
dir += 360
}
result[h].WindDir = dir
}
result[h].Humidity = math.Round(result[h].Humidity)
result[h].Prob = math.Round(result[h].Prob)
result[h].UV = math.Round(result[h].UV)
}
return result
}
// fuseForecastsWith 使用与 matrix 对齐的有效权重进行融合
func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample {
if len(eff) != len(matrix) {
return fuseForecasts(matrix, uniform(len(matrix)))
}
result := make([]forecastSample, 3)
for h := 0; h < 3; h++ {
var windX, windY float64
for pIdx, pm := range matrix {
if len(pm.Samples) <= h {
continue
}
s := pm.Samples[h]
w := eff[pIdx]
result[h].Rain += s.Rain * w
result[h].Temp += s.Temp * w
result[h].Humidity += s.Humidity * w
result[h].WindSpeed += s.WindSpeed * w
result[h].WindGust += s.WindGust * w
result[h].Prob += s.Prob * w
result[h].Pressure += s.Pressure * w
result[h].UV += s.UV * w
rad := s.WindDir * math.Pi / 180
windX += math.Cos(rad) * w
windY += math.Sin(rad) * w
}
if windX == 0 && windY == 0 {
result[h].WindDir = 0
} else {
dir := math.Atan2(windY, windX) * 180 / math.Pi
if dir < 0 {
dir += 360
}
result[h].WindDir = dir
}
result[h].Humidity = math.Round(result[h].Humidity)
result[h].Prob = math.Round(result[h].Prob)
result[h].UV = math.Round(result[h].UV)
}
return result
}
func uniform(n int) []float64 {
if n <= 0 {
return nil
}
w := make([]float64, n)
v := 1.0 / float64(n)
for i := range w {
w[i] = v
}
return w
}
// effectiveWeights 将基础权重映射到参与融合的 providers按 matrix 顺序),并在子集上归一化
func effectiveWeights(base []float64, providersAll []string, matrix []providerMatrix) []float64 {
m := make(map[string]int)
for i, p := range providersAll {
m[p] = i
}
eff := make([]float64, len(matrix))
sum := 0.0
for i, pm := range matrix {
if idx, ok := m[pm.Provider]; ok {
eff[i] = base[idx]
sum += eff[i]
}
}
if sum <= 0 {
return uniform(len(matrix))
}
for i := range eff {
eff[i] /= sum
}
return eff
}
// adjustWeightsSubset 仅对当前参与的 providers 调整权重,其它分量保持不变
func adjustWeightsSubset(base []float64, providersAll []string, matrix []providerMatrix, actual float64) []float64 {
preds := extractProviderRainAt(matrix, 0)
if len(preds) == 0 {
return base
}
// 找最优和最差(绝对误差)
best, worst := 0, 0
for i := range preds {
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
best = i
}
if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
worst = i
}
}
// provider 名到 base 索引
m := make(map[string]int)
for i, p := range providersAll {
m[p] = i
}
out := append([]float64{}, base...)
// 应用 ±0.1 调整
if bi, ok := m[matrix[best].Provider]; ok {
out[bi] += 0.1
}
if wi, ok := m[matrix[worst].Provider]; ok {
out[wi] -= 0.1
}
// 夹紧并归一化
sum := 0.0
for i := range out {
if out[i] < 0 {
out[i] = 0
}
sum += out[i]
}
if sum == 0 {
// 回到默认
return append([]float64{}, defaultWeights...)
}
for i := range out {
out[i] /= sum
}
return out
}
// probeProviderAtBucket: 调试辅助打印该小时桶内每个provider的issued与是否有+1/+2/+3样本
func probeProviderAtBucket(db *sql.DB, stationID string, providers []string, bucket time.Time) {
for _, p := range providers {
issued, ok, err := resolveIssuedAtInBucket(db, stationID, p, bucket)
if err != nil {
log.Printf(" provider=%s 探测错误: %v", p, err)
continue
}
if !ok {
log.Printf(" provider=%s: 小时桶内无 issued", p)
continue
}
leads := presentLeads(db, stationID, p, issued)
log.Printf(" provider=%s: issued=%s leads=%v (count=%d)", p, issued.In(asiaShanghai).Format("2006-01-02 15:04:05"), leads, len(leads))
}
}
// presentLeads 返回该 issued 的预测中,存在的 lead 小时集合1..3
func presentLeads(db *sql.DB, stationID, provider string, issued time.Time) []int {
rows, err := db.Query(`SELECT forecast_time FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3`, stationID, provider, issued)
if err != nil {
return nil
}
defer rows.Close()
var leads []int
for rows.Next() {
var ft time.Time
if err := rows.Scan(&ft); err != nil {
continue
}
dh := ft.Sub(issued).Hours()
h := int(math.Ceil(dh - 1e-9))
if h >= 1 && h <= 3 {
leads = append(leads, h)
}
}
return leads
}
func isRainAllZero(samples []forecastSample) bool {
for _, s := range samples {
if math.Abs(s.Rain) > 1e-6 {
return false
}
}
return true
}
func extractProviderRainAt(matrix []providerMatrix, hourIdx int) []float64 {
out := make([]float64, 0, len(matrix))
for _, pm := range matrix {
if len(pm.Samples) > hourIdx {
out = append(out, pm.Samples[hourIdx].Rain)
} else {
out = append(out, 0)
}
}
return out
}
func adjustWeights(current []float64, preds []float64, actual float64) []float64 {
if len(current) != len(preds) || len(current) != 3 {
return current
}
diffs := make([]float64, len(preds))
for i := range preds {
diffs[i] = math.Abs(preds[i] - actual)
}
minIdx, maxIdx := 0, 0
for i, d := range diffs {
if d < diffs[minIdx] {
minIdx = i
}
if d > diffs[maxIdx] {
maxIdx = i
}
}
if minIdx != maxIdx {
current[minIdx] += 0.1
current[maxIdx] -= 0.1
}
var sum float64
for i := range current {
if current[i] < 0 {
current[i] = 0
}
sum += current[i]
}
if sum == 0 {
return append([]float64{}, defaultWeights...)
}
for i := range current {
current[i] /= sum
}
return current
}
func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) {
const q = `
SELECT SUM(rain_10m_mm_x1000)
FROM rs485_weather_10min
WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
var sum sql.NullInt64
err := db.QueryRow(q, stationID, start, end).Scan(&sum)
if err != nil {
return 0, false, err
}
if !sum.Valid {
return 0, false, nil
}
return float64(sum.Int64) / 1000.0, true, nil
}
type tileRec struct {
DT time.Time
Width int
Height int
West float64
South float64
East float64
North float64
ResDeg float64
Data []byte
}
type maskReport struct {
Scanned bool
WindOK bool
UsedAlias string
WindSpeed float64
WindDir float64
Tiles int
HitsByHour [3]int
}
func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) {
rep := maskReport{}
windTime := issued.Add(-time.Hour)
// 优先用 station_id其次别名
if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil {
return rep, err
} else if ok {
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id
} else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil {
return rep, err2
} else if ok2 {
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias
} else {
// 没有风场:不扫描,不应用掩码
return rep, nil
}
tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued)
if err != nil {
return rep, err
}
if len(tiles) == 0 {
// 无瓦片:不扫描,不应用掩码
return rep, nil
}
rep.Tiles = len(tiles)
rep.Scanned = true
const circleR = 8000.0
const halfAngle = 30.0
for _, t := range tiles {
vals, xs, ys, err := decodeTile(t)
if err != nil {
continue
}
for r := 0; r < len(vals); r++ {
row := vals[r]
lat := ys[r]
for c := 0; c < len(row); c++ {
v := row[c]
if v == nil {
continue
}
dbz := *v
if dbz < 40 {
continue
}
lon := xs[c]
dist := haversine(ctx.lat, ctx.lon, lat, lon)
// +1h圆 或 扇形(3h)
if dist <= circleR {
rep.HitsByHour[0]++
}
if rep.WindSpeed > 0 {
brg := bearingDeg(ctx.lat, ctx.lon, lat, lon)
if angDiff(brg, rep.WindDir) <= halfAngle {
if dist <= rep.WindSpeed*3*3600 {
rep.HitsByHour[0]++
}
// +2h圆 或 扇形(2h)
if dist <= circleR {
rep.HitsByHour[1]++
}
if dist <= rep.WindSpeed*2*3600 {
rep.HitsByHour[1]++
}
// +3h扇形(3h)
if dist <= rep.WindSpeed*3*3600 {
rep.HitsByHour[2]++
}
}
}
}
}
}
return rep, nil
}
func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) {
const q = `
SELECT wind_speed, wind_direction
FROM radar_weather
WHERE alias=$1 AND dt <= $2
ORDER BY dt DESC
LIMIT 1`
var speed, dir sql.NullFloat64
err := db.QueryRow(q, alias, target).Scan(&speed, &dir)
if err == sql.ErrNoRows {
return 0, 0, false, nil
}
if err != nil {
return 0, 0, false, err
}
if !speed.Valid || !dir.Valid {
return 0, 0, false, nil
}
return speed.Float64, dir.Float64, true, nil
}
func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) {
const q = `
SELECT dt, width, height, west, south, east, north, res_deg, data
FROM radar_tiles
WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5
ORDER BY dt`
rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
var list []tileRec
for rows.Next() {
var t tileRec
if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil {
return nil, err
}
list = append(list, t)
}
return list, nil
}
func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) {
w, h := t.Width, t.Height
if w <= 0 || h <= 0 {
return nil, nil, nil, fmt.Errorf("非法瓦片尺寸")
}
if len(t.Data) < w*h*2 {
return nil, nil, nil, fmt.Errorf("数据长度不足")
}
xs := make([]float64, w)
for c := 0; c < w; c++ {
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
}
ys := make([]float64, h)
for r := 0; r < h; r++ {
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
}
vals := make([][]*float64, h)
off := 0
for r := 0; r < h; r++ {
row := make([]*float64, w)
for c := 0; c < w; c++ {
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
off += 2
if v >= 32766 {
row[c] = nil
continue
}
dbz := float64(v) / 10.0
if dbz < 0 {
dbz = 0
} else if dbz > 75 {
dbz = 75
}
value := dbz
row[c] = &value
}
vals[r] = row
}
return vals, xs, ys, nil
}
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
const R = 6371000.0
dLat := toRad(lat2 - lat1)
dLon := toRad(lon2 - lon1)
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
return R * c
}
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
φ1 := toRad(lat1)
φ2 := toRad(lat2)
Δλ := toRad(lon2 - lon1)
y := math.Sin(Δλ) * math.Cos(φ2)
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
brg := toDeg(math.Atan2(y, x))
if brg < 0 {
brg += 360
}
return brg
}
func angDiff(a, b float64) float64 {
d := math.Mod(a-b+540, 360) - 180
if d < 0 {
d = -d
}
return math.Abs(d)
}
func toRad(d float64) float64 { return d * math.Pi / 180 }
func toDeg(r float64) float64 { return r * 180 / math.Pi }
func buildMaskContext(info stationInfo) (maskContext, bool) {
if !info.Lat.Valid || !info.Lon.Valid {
return maskContext{}, false
}
if !info.Z.Valid || !info.Y.Valid || !info.X.Valid {
return maskContext{}, false
}
alias := info.Alias
if strings.TrimSpace(alias) == "" {
alias = info.ID
}
return maskContext{
id: info.ID,
alias: alias,
lat: info.Lat.Float64,
lon: info.Lon.Float64,
z: int(info.Z.Int64),
y: int(info.Y.Int64),
x: int(info.X.Int64),
}, true
}
func buildSQL(records []fusedRecord) []string {
lines := make([]string, 0, len(records)*2)
header := "-- SQL 输出 (默认未执行)"
lines = append(lines, header)
for _, rec := range records {
sql := fmt.Sprintf(
"INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index)\nVALUES ('%s', '%s', '%s'::timestamptz, '%s'::timestamptz, %d, %d, %d, %d, %d, %d, %d, %d, %d)\nON CONFLICT (station_id, provider, issued_at, forecast_time)\nDO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000, temp_c_x100=EXCLUDED.temp_c_x100, humidity_pct=EXCLUDED.humidity_pct, wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, wind_dir_deg=EXCLUDED.wind_dir_deg, precip_prob_pct=EXCLUDED.precip_prob_pct, pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, uv_index=EXCLUDED.uv_index;",
rec.StationID,
outputProvider,
rec.IssuedAt.In(asiaShanghai).Format(time.RFC3339),
rec.ForecastTime.In(asiaShanghai).Format(time.RFC3339),
int(math.Round(rec.Sample.Rain*1000)),
int(math.Round(rec.Sample.Temp*100)),
int(math.Round(rec.Sample.Humidity)),
int(math.Round(rec.Sample.WindSpeed*1000)),
int(math.Round(rec.Sample.WindGust*1000)),
int(math.Round(rec.Sample.WindDir)),
int(math.Round(rec.Sample.Prob)),
int(math.Round(rec.Sample.Pressure*100)),
int(math.Round(rec.Sample.UV)),
)
lines = append(lines, sql, "")
}
return lines
}
func applySQL(db *sql.DB, records []fusedRecord) error {
tx, err := db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`
INSERT INTO forecast_hourly (
station_id, provider, issued_at, forecast_time,
rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000,
wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (station_id, provider, issued_at, forecast_time)
DO UPDATE SET
rain_mm_x1000 = EXCLUDED.rain_mm_x1000,
temp_c_x100 = EXCLUDED.temp_c_x100,
humidity_pct = EXCLUDED.humidity_pct,
wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
wind_dir_deg = EXCLUDED.wind_dir_deg,
precip_prob_pct = EXCLUDED.precip_prob_pct,
pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100,
uv_index = EXCLUDED.uv_index`)
if err != nil {
tx.Rollback()
return err
}
defer stmt.Close()
for _, rec := range records {
if _, err := stmt.Exec(
rec.StationID,
outputProvider,
rec.IssuedAt,
rec.ForecastTime,
int(math.Round(rec.Sample.Rain*1000)),
int(math.Round(rec.Sample.Temp*100)),
int(math.Round(rec.Sample.Humidity)),
int(math.Round(rec.Sample.WindSpeed*1000)),
int(math.Round(rec.Sample.WindGust*1000)),
int(math.Round(rec.Sample.WindDir)),
int(math.Round(rec.Sample.Prob)),
int(math.Round(rec.Sample.Pressure*100)),
int(math.Round(rec.Sample.UV)),
); err != nil {
tx.Rollback()
return err
}
}
return tx.Commit()
}