1420 lines
40 KiB
Go
1420 lines
40 KiB
Go
package main
|
||
|
||
import (
|
||
"database/sql"
|
||
"encoding/binary"
|
||
"errors"
|
||
"flag"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"os"
|
||
"sort"
|
||
"strings"
|
||
"time"
|
||
|
||
"weatherstation/internal/database"
|
||
)
|
||
|
||
const (
|
||
providerOpenMeteo = "open-meteo"
|
||
providerCaiyun = "caiyun"
|
||
providerImdroid = "imdroid"
|
||
outputProvider = "imdroid_mix"
|
||
)
|
||
|
||
var (
|
||
defaultWeights = []float64{0.4, 0.3, 0.3}
|
||
asiaShanghai *time.Location
|
||
debugEnabled bool
|
||
)
|
||
|
||
func init() {
|
||
loc, err := time.LoadLocation("Asia/Shanghai")
|
||
if err != nil {
|
||
loc = time.FixedZone("CST", 8*3600)
|
||
}
|
||
asiaShanghai = loc
|
||
log.SetFlags(log.LstdFlags | log.Lmsgprefix)
|
||
log.SetPrefix("[imdroidmix] ")
|
||
}
|
||
|
||
type stationInfo struct {
|
||
ID string
|
||
Alias string
|
||
Lat sql.NullFloat64
|
||
Lon sql.NullFloat64
|
||
Z sql.NullInt64
|
||
Y sql.NullInt64
|
||
X sql.NullInt64
|
||
}
|
||
|
||
type forecastSample struct {
|
||
Rain float64
|
||
Temp float64
|
||
Humidity float64
|
||
WindSpeed float64
|
||
WindGust float64
|
||
WindDir float64
|
||
Prob float64
|
||
Pressure float64
|
||
UV float64
|
||
}
|
||
|
||
type providerMatrix struct {
|
||
Provider string
|
||
Samples []forecastSample
|
||
}
|
||
|
||
type fusedRecord struct {
|
||
StationID string
|
||
IssuedAt time.Time
|
||
ForecastTime time.Time
|
||
Sample forecastSample
|
||
MaskApplied bool
|
||
}
|
||
|
||
type maskContext struct {
|
||
id string
|
||
alias string
|
||
lat float64
|
||
lon float64
|
||
z int
|
||
y int
|
||
x int
|
||
}
|
||
|
||
func main() {
|
||
var (
|
||
issuedRange string
|
||
stationFlag string
|
||
outSQLPath string
|
||
showProgress bool
|
||
applyUpserts bool
|
||
includeAll bool
|
||
debugLog bool
|
||
noMask bool
|
||
minSources int
|
||
maskHoursStr string
|
||
)
|
||
|
||
flag.StringVar(&issuedRange, "issued_range", "", "时间范围,格式 \"YYYY-MM-DD HH:MM:SS,YYYY-MM-DD HH:MM:SS\"(CST)")
|
||
flag.StringVar(&stationFlag, "station", "", "目标站点(可选,多个以逗号分隔)")
|
||
flag.BoolVar(&includeAll, "all_stations", false, "处理全部有经纬度的站点")
|
||
flag.StringVar(&outSQLPath, "export_sql", "imdroid_mix.sql", "输出SQL文件路径")
|
||
flag.BoolVar(&showProgress, "progress", false, "展示处理进度")
|
||
flag.BoolVar(&applyUpserts, "apply", false, "直接写入数据库(默认仅导出SQL)")
|
||
flag.BoolVar(&debugLog, "debug", false, "输出调试日志")
|
||
flag.BoolVar(&noMask, "no_mask", false, "关闭雷达掩码(不清零融合雨量)")
|
||
flag.IntVar(&minSources, "min_sources", 2, "最少参与融合的预报源个数(2或3),不足则跳过")
|
||
flag.StringVar(&maskHoursStr, "mask_hours", "1,1,1", "逐小时掩码开关(h+1,h+2,h+3),如: 1,1,0")
|
||
flag.Parse()
|
||
// 让库内辅助函数也能访问调试开关
|
||
debugEnabled = debugLog
|
||
|
||
if strings.TrimSpace(issuedRange) == "" {
|
||
log.Fatalln("必须提供 --issued_range")
|
||
}
|
||
|
||
startIssued, endIssued, err := parseRange(issuedRange)
|
||
if err != nil {
|
||
log.Fatalf("解析 issued_range 失败: %v", err)
|
||
}
|
||
if !endIssued.After(startIssued) {
|
||
log.Fatalln("issued_range 结束时间必须大于起始时间")
|
||
}
|
||
|
||
db := database.GetDB()
|
||
defer database.Close()
|
||
|
||
stations, err := resolveStations(db, stationFlag, includeAll)
|
||
if err != nil {
|
||
log.Fatalf("加载站点失败: %v", err)
|
||
}
|
||
if len(stations) == 0 {
|
||
log.Fatalln("没有符合条件的站点")
|
||
}
|
||
|
||
type stationWork struct {
|
||
info stationInfo
|
||
issuedTimes []time.Time
|
||
providers []string
|
||
}
|
||
var jobs []stationWork
|
||
var totalSteps int
|
||
|
||
for _, st := range stations {
|
||
pSet, err := resolveProviderSet(db, st.ID, startIssued, endIssued)
|
||
if err != nil {
|
||
log.Fatalf("检测站点 %s 预报源失败: %v", st.ID, err)
|
||
}
|
||
if debugLog {
|
||
log.Printf("站点 %s 使用预报源: %v", st.ID, pSet)
|
||
}
|
||
issuedTimes, err := fetchIssuedHourBuckets(db, st.ID, startIssued, endIssued, pSet, minSources)
|
||
if err != nil {
|
||
log.Fatalf("查询站点 %s 的 issued_at 失败: %v", st.ID, err)
|
||
}
|
||
if len(issuedTimes) == 0 {
|
||
if debugLog {
|
||
// 打印每个provider的 issued_at 数量辅助定位
|
||
for _, p := range pSet {
|
||
cnt, first, last := debugIssuedSummary(db, st.ID, startIssued, endIssued, p)
|
||
log.Printf("站点 %s provider=%s issued_at数量=%d 时间范围[%s, %s]", st.ID, p, cnt, first, last)
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
jobs = append(jobs, stationWork{info: st, issuedTimes: issuedTimes, providers: pSet})
|
||
totalSteps += len(issuedTimes)
|
||
}
|
||
|
||
if len(jobs) == 0 {
|
||
log.Fatalln("指定时间范围内无可融合的预报数据")
|
||
}
|
||
|
||
var (
|
||
results []fusedRecord
|
||
step int
|
||
progressFn func(current int, total int, ctx string)
|
||
)
|
||
|
||
if showProgress {
|
||
progressFn = func(current, total int, ctx string) {
|
||
fmt.Printf("\r处理进度 %d/%d (%s)...", current, total, ctx)
|
||
}
|
||
defer fmt.Println()
|
||
}
|
||
|
||
// 解析 mask_hours
|
||
maskHours := [3]bool{true, true, true}
|
||
if s := strings.TrimSpace(maskHoursStr); s != "" {
|
||
parts := strings.Split(s, ",")
|
||
for i := 0; i < len(parts) && i < 3; i++ {
|
||
v := strings.TrimSpace(parts[i])
|
||
maskHours[i] = (v == "1" || strings.EqualFold(v, "true"))
|
||
}
|
||
}
|
||
|
||
for _, job := range jobs {
|
||
info := job.info
|
||
maskCtx, hasMaskCtx := buildMaskContext(info)
|
||
stWeights := newWeightState(job.providers)
|
||
|
||
for idx, issued := range job.issuedTimes {
|
||
matrix, err := loadProviderMatrix(db, info.ID, issued, job.providers)
|
||
if err != nil {
|
||
log.Printf("站点 %s issued=%s 载入预报失败: %v", info.ID, issued.Format(time.RFC3339), err)
|
||
continue
|
||
}
|
||
if len(matrix) < minSources {
|
||
log.Printf("站点 %s issued=%s 可用源=%d < min_sources=%d,跳过", info.ID, issued.Format(time.RFC3339), len(matrix), minSources)
|
||
if debugLog {
|
||
probeProviderAtBucket(db, info.ID, job.providers, issued)
|
||
}
|
||
continue
|
||
}
|
||
|
||
if debugLog {
|
||
log.Printf("")
|
||
log.Printf("issue %s station=%s", issued.Format("2006-01-02 15:04:05"), info.ID)
|
||
}
|
||
|
||
// 学习先行:先得到学习前权重,再学习,再得到本轮使用权重
|
||
effBefore, _ := stWeights.effectiveForMatrix(matrix)
|
||
|
||
// 学习对齐:用刚结束的小时 [issued-1h, issued) 与 (issued-2h) 的 +1 预测
|
||
learnStart := issued.Add(-time.Hour)
|
||
learnEnd := issued
|
||
actualRain, actualOK, err := fetchActualHourlyRain(db, info.ID, learnStart, learnEnd)
|
||
if err != nil {
|
||
log.Printf("站点 %s issued=%s 聚合实况失败: %v", info.ID, issued.Format(time.RFC3339), err)
|
||
} else if actualOK {
|
||
prevIssueBucket := issued.Add(-2 * time.Hour)
|
||
preds := make([]float64, len(matrix))
|
||
valids := make([]bool, len(matrix))
|
||
for i, pm := range matrix {
|
||
if v, ok := lead1RainAtIssued(db, info.ID, pm.Provider, prevIssueBucket); ok {
|
||
preds[i] = v
|
||
valids[i] = true
|
||
}
|
||
}
|
||
if debugLog {
|
||
log.Printf("在 [%s,%s) ,该小时本站实况雨量累计为:%.3f,预报源在 issue=%s 发布的 +1h %s-%s 预报值是 %s",
|
||
learnStart.Format("15:04"), learnEnd.Format("15:04"), actualRain,
|
||
prevIssueBucket.Format("15:04"), learnStart.Format("15:04"), learnEnd.Format("15:04"),
|
||
formatWeights(preds, false))
|
||
}
|
||
stWeights.learnFromPreds(matrix, preds, valids, actualRain)
|
||
}
|
||
|
||
eff, _ := stWeights.effectiveForMatrix(matrix)
|
||
if debugLog {
|
||
// 以 provider=weight 的形式输出,顺序与 sources 一致
|
||
toNamed := func(ws []float64) string {
|
||
pairs := make([]string, 0, len(matrix))
|
||
for i, pm := range matrix {
|
||
pairs = append(pairs, fmt.Sprintf("%s=%.2f", pm.Provider, ws[i]))
|
||
}
|
||
return strings.Join(pairs, ", ")
|
||
}
|
||
log.Printf("学习前权重 %s ,本轮使用权重 %s", toNamed(effBefore), toNamed(eff))
|
||
}
|
||
|
||
fused := fuseForecastsWith(matrix, eff)
|
||
// 记录掩码前的融合雨量
|
||
var preMaskRain = []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain}
|
||
rainAllZero := isRainAllZero(fused)
|
||
|
||
maskApplied := false
|
||
if !rainAllZero && hasMaskCtx && !noMask {
|
||
rep, err := checkRadarMask(db, maskCtx, issued)
|
||
if err != nil {
|
||
log.Printf("站点 %s issued=%s 掩码计算出错: %v", info.ID, issued.Format(time.RFC3339), err)
|
||
} else {
|
||
if rep.Scanned {
|
||
// 逐小时按 mask_hours 应用
|
||
// 解析 mask_hours 上移至批次开始
|
||
// 在此应用逐小时清零
|
||
// 重用 maskHours 变量
|
||
for hidx := 0; hidx < 3; hidx++ {
|
||
if !maskHours[hidx] {
|
||
continue
|
||
}
|
||
if rep.HitsByHour[hidx] == 0 {
|
||
fused[hidx].Rain = 0
|
||
maskApplied = true
|
||
}
|
||
}
|
||
}
|
||
if debugLog {
|
||
log.Printf("mask t=%s wind=%.1fm/s %.0f° tiles=%d hits=[%d,%d,%d] applied=%v",
|
||
issued.Format("2006-01-02 15:04:05"), rep.WindSpeed, rep.WindDir, rep.Tiles, rep.HitsByHour[0], rep.HitsByHour[1], rep.HitsByHour[2], maskApplied)
|
||
}
|
||
}
|
||
}
|
||
|
||
if debugLog {
|
||
// 展示预报与掩码后的融合雨量 [h+1,h+2,h+3]
|
||
rp := []float64{fused[0].Rain, fused[1].Rain, fused[2].Rain}
|
||
log.Printf("预报 %s , mask=%v , 融合的结果 %s", formatWeights(preMaskRain, false), maskApplied, formatWeights(rp, false))
|
||
}
|
||
for h := 1; h <= 3; h++ {
|
||
results = append(results, fusedRecord{
|
||
StationID: info.ID,
|
||
IssuedAt: issued, // 输出按“小时桶”归一的发布时间
|
||
ForecastTime: issued.Add(time.Duration(h) * time.Hour),
|
||
Sample: fused[h-1],
|
||
MaskApplied: maskApplied,
|
||
})
|
||
}
|
||
|
||
// 学习已在融合前完成
|
||
|
||
step++
|
||
if progressFn != nil {
|
||
progressFn(step, totalSteps, fmt.Sprintf("%s #%d", info.ID, idx+1))
|
||
}
|
||
}
|
||
}
|
||
|
||
if len(results) == 0 {
|
||
log.Println("未生成任何融合结果")
|
||
return
|
||
}
|
||
|
||
sqlLines := buildSQL(results)
|
||
if err := os.WriteFile(outSQLPath, []byte(strings.Join(sqlLines, "\n")+"\n"), 0o644); err != nil {
|
||
log.Fatalf("写入SQL文件失败: %v", err)
|
||
}
|
||
log.Printf("已写出SQL到 %s(共 %d 条记录)", outSQLPath, len(results))
|
||
|
||
if applyUpserts {
|
||
if err := applySQL(db, results); err != nil {
|
||
log.Fatalf("写入数据库失败: %v", err)
|
||
}
|
||
log.Printf("已写入数据库 forecast_hourly(provider=%s)", outputProvider)
|
||
}
|
||
}
|
||
|
||
func parseRange(r string) (time.Time, time.Time, error) {
|
||
parts := strings.Split(r, ",")
|
||
if len(parts) != 2 {
|
||
return time.Time{}, time.Time{}, errors.New("格式应为 start,end")
|
||
}
|
||
start, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[0]), asiaShanghai)
|
||
if err != nil {
|
||
return time.Time{}, time.Time{}, err
|
||
}
|
||
end, err := time.ParseInLocation("2006-01-02 15:04:05", strings.TrimSpace(parts[1]), asiaShanghai)
|
||
if err != nil {
|
||
return time.Time{}, time.Time{}, err
|
||
}
|
||
return start, end, nil
|
||
}
|
||
|
||
func resolveStations(db *sql.DB, stationFlag string, includeAll bool) ([]stationInfo, error) {
|
||
if includeAll {
|
||
const q = `
|
||
SELECT station_id,
|
||
COALESCE(NULLIF(station_alias,''), station_id) AS alias,
|
||
latitude, longitude, z, y, x
|
||
FROM stations
|
||
WHERE latitude IS NOT NULL AND longitude IS NOT NULL
|
||
AND device_type='WH65LP'
|
||
ORDER BY station_id`
|
||
rows, err := db.Query(q)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
var out []stationInfo
|
||
for rows.Next() {
|
||
var s stationInfo
|
||
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err != nil {
|
||
return nil, err
|
||
}
|
||
out = append(out, s)
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
list := fieldsFromFlag(stationFlag)
|
||
if len(list) == 0 {
|
||
return nil, nil
|
||
}
|
||
var out []stationInfo
|
||
const q = `
|
||
SELECT station_id,
|
||
COALESCE(NULLIF(station_alias,''), station_id) AS alias,
|
||
latitude, longitude, z, y, x
|
||
FROM stations
|
||
WHERE station_id=$1`
|
||
for _, id := range list {
|
||
var s stationInfo
|
||
err := db.QueryRow(q, id).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
|
||
if err != nil {
|
||
if err == sql.ErrNoRows {
|
||
log.Printf("警告:站点 %s 不存在,跳过", id)
|
||
continue
|
||
}
|
||
return nil, err
|
||
}
|
||
out = append(out, s)
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func fieldsFromFlag(v string) []string {
|
||
if strings.TrimSpace(v) == "" {
|
||
return nil
|
||
}
|
||
parts := strings.Split(v, ",")
|
||
out := make([]string, 0, len(parts))
|
||
for _, p := range parts {
|
||
p = strings.TrimSpace(p)
|
||
if p != "" {
|
||
out = append(out, p)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func resolveProviderSet(_ *sql.DB, _ string, _ time.Time, _ time.Time) ([]string, error) {
|
||
// 固定三源,不做别名回退
|
||
return []string{providerOpenMeteo, providerCaiyun, providerImdroid}, nil
|
||
}
|
||
|
||
func hasProviderData(db *sql.DB, stationID, provider string, from, to time.Time) (bool, error) {
|
||
var cnt int
|
||
err := db.QueryRow(`
|
||
SELECT COUNT(*)
|
||
FROM forecast_hourly
|
||
WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`,
|
||
stationID, provider, from, to).Scan(&cnt)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
return cnt > 0, nil
|
||
}
|
||
|
||
func fetchIssuedHourBuckets(db *sql.DB, stationID string, from, to time.Time, providers []string, minSources int) ([]time.Time, error) {
|
||
if len(providers) != 3 {
|
||
return nil, fmt.Errorf("provider 列表长度异常: %v", providers)
|
||
}
|
||
if minSources < 1 {
|
||
minSources = 1
|
||
}
|
||
if minSources > 3 {
|
||
minSources = 3
|
||
}
|
||
const q = `
|
||
WITH base AS (
|
||
SELECT date_trunc('hour', issued_at) AS bucket, provider
|
||
FROM forecast_hourly
|
||
WHERE station_id=$1 AND provider IN ($2, $3, $4)
|
||
AND issued_at BETWEEN $5 AND $6
|
||
GROUP BY 1,2
|
||
)
|
||
SELECT bucket
|
||
FROM base
|
||
GROUP BY bucket
|
||
HAVING COUNT(DISTINCT provider) >= $7
|
||
ORDER BY bucket`
|
||
rows, err := db.Query(q, stationID, providers[0], providers[1], providers[2], from, to, minSources)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
var buckets []time.Time
|
||
for rows.Next() {
|
||
var t time.Time
|
||
if err := rows.Scan(&t); err != nil {
|
||
return nil, err
|
||
}
|
||
buckets = append(buckets, t.In(asiaShanghai))
|
||
}
|
||
return buckets, nil
|
||
}
|
||
|
||
// debugIssuedSummary 仅用于 --debug:统计provider在范围内的issued_at计数及首尾时间
|
||
func debugIssuedSummary(db *sql.DB, stationID string, from, to time.Time, provider string) (count int, first string, last string) {
|
||
const q = `
|
||
SELECT COUNT(*),
|
||
MIN(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS first,
|
||
MAX(to_char(issued_at, 'YYYY-MM-DD HH24:MI:SS')) AS last
|
||
FROM forecast_hourly
|
||
WHERE station_id=$1 AND provider=$2 AND issued_at BETWEEN $3 AND $4`
|
||
var f, l sql.NullString
|
||
if err := db.QueryRow(q, stationID, provider, from, to).Scan(&count, &f, &l); err != nil {
|
||
return 0, "", ""
|
||
}
|
||
if f.Valid {
|
||
first = f.String
|
||
}
|
||
if l.Valid {
|
||
last = l.String
|
||
}
|
||
return
|
||
}
|
||
|
||
func loadProviderMatrix(db *sql.DB, stationID string, bucketHour time.Time, providers []string) ([]providerMatrix, error) {
|
||
out := make([]providerMatrix, 0, len(providers))
|
||
for _, provider := range providers {
|
||
issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if !ok {
|
||
// 该源缺席,跳过
|
||
continue
|
||
}
|
||
samples, err := loadForecastSamples(db, stationID, provider, issued)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if len(samples) != 3 {
|
||
// 样本不足,视为该源缺席
|
||
continue
|
||
}
|
||
out = append(out, providerMatrix{Provider: provider, Samples: samples})
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
// 在某个小时桶内,选取该 provider 的最新 issued_at([bucket, bucket+1h))
|
||
func resolveIssuedAtInBucket(db *sql.DB, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) {
|
||
const q = `
|
||
SELECT issued_at
|
||
FROM forecast_hourly
|
||
WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour'
|
||
ORDER BY issued_at DESC
|
||
LIMIT 1`
|
||
var t time.Time
|
||
err := db.QueryRow(q, stationID, provider, bucketHour).Scan(&t)
|
||
if err == sql.ErrNoRows {
|
||
return time.Time{}, false, nil
|
||
}
|
||
if err != nil {
|
||
return time.Time{}, false, err
|
||
}
|
||
return t, true, nil
|
||
}
|
||
|
||
func loadForecastSamples(db *sql.DB, stationID, provider string, issued time.Time) ([]forecastSample, error) {
|
||
const q = `
|
||
SELECT forecast_time,
|
||
rain_mm_x1000,
|
||
temp_c_x100,
|
||
humidity_pct,
|
||
wind_speed_ms_x1000,
|
||
wind_gust_ms_x1000,
|
||
wind_dir_deg,
|
||
precip_prob_pct,
|
||
pressure_hpa_x100,
|
||
uv_index
|
||
FROM forecast_hourly
|
||
WHERE station_id=$1 AND provider=$2 AND issued_at=$3
|
||
ORDER BY forecast_time ASC`
|
||
rows, err := db.Query(q, stationID, provider, issued)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
var list []forecastSample
|
||
for rows.Next() {
|
||
var (
|
||
ft time.Time
|
||
rain, temp, humidity, ws, gust, wdir, prob sql.NullInt64
|
||
pressure, uv sql.NullInt64
|
||
)
|
||
if err := rows.Scan(&ft, &rain, &temp, &humidity, &ws, &gust, &wdir, &prob, &pressure, &uv); err != nil {
|
||
return nil, err
|
||
}
|
||
// 使用“向上取整”的方式计算前视小时(与查询端 lead_hours=CEIL(...) 逻辑对齐)
|
||
dh := ft.Sub(issued).Hours()
|
||
hours := int(math.Ceil(dh - 1e-9))
|
||
if hours < 1 || hours > 3 {
|
||
continue
|
||
}
|
||
s := forecastSample{
|
||
Rain: float64(rain.Int64) / 1000.0,
|
||
Temp: float64(temp.Int64) / 100.0,
|
||
Humidity: float64(humidity.Int64),
|
||
WindSpeed: float64(ws.Int64) / 1000.0,
|
||
WindGust: float64(gust.Int64) / 1000.0,
|
||
WindDir: float64(wdir.Int64),
|
||
Prob: float64(prob.Int64),
|
||
Pressure: float64(pressure.Int64) / 100.0,
|
||
UV: float64(uv.Int64),
|
||
}
|
||
list = append(list, s)
|
||
}
|
||
return list, nil
|
||
}
|
||
|
||
// lead1RainAtIssued 返回某站点、某provider在给定小时桶(J)内的发行的 +1 小时降雨预测(若存在)。
|
||
func lead1RainAtIssued(db *sql.DB, stationID, provider string, bucketHour time.Time) (float64, bool) {
|
||
issued, ok, err := resolveIssuedAtInBucket(db, stationID, provider, bucketHour)
|
||
if err != nil || !ok {
|
||
return 0, false
|
||
}
|
||
samples, err := loadForecastSamples(db, stationID, provider, issued)
|
||
if err != nil || len(samples) < 1 {
|
||
return 0, false
|
||
}
|
||
return samples[0].Rain, true
|
||
}
|
||
|
||
func fuseForecasts(matrix []providerMatrix, weights []float64) []forecastSample {
|
||
result := make([]forecastSample, 3)
|
||
for h := 0; h < 3; h++ {
|
||
var windX, windY float64
|
||
for pIdx, pm := range matrix {
|
||
if len(pm.Samples) <= h {
|
||
continue
|
||
}
|
||
s := pm.Samples[h]
|
||
w := weights[pIdx]
|
||
result[h].Rain += s.Rain * w
|
||
result[h].Temp += s.Temp * w
|
||
result[h].Humidity += s.Humidity * w
|
||
result[h].WindSpeed += s.WindSpeed * w
|
||
result[h].WindGust += s.WindGust * w
|
||
result[h].Prob += s.Prob * w
|
||
result[h].Pressure += s.Pressure * w
|
||
result[h].UV += s.UV * w
|
||
rad := s.WindDir * math.Pi / 180
|
||
windX += math.Cos(rad) * w
|
||
windY += math.Sin(rad) * w
|
||
}
|
||
if windX == 0 && windY == 0 {
|
||
result[h].WindDir = 0
|
||
} else {
|
||
dir := math.Atan2(windY, windX) * 180 / math.Pi
|
||
if dir < 0 {
|
||
dir += 360
|
||
}
|
||
result[h].WindDir = dir
|
||
}
|
||
result[h].Humidity = math.Round(result[h].Humidity)
|
||
result[h].Prob = math.Round(result[h].Prob)
|
||
result[h].UV = math.Round(result[h].UV)
|
||
}
|
||
return result
|
||
}
|
||
|
||
// fuseForecastsWith 使用与 matrix 对齐的有效权重进行融合
|
||
func fuseForecastsWith(matrix []providerMatrix, eff []float64) []forecastSample {
|
||
if len(eff) != len(matrix) {
|
||
return fuseForecasts(matrix, uniform(len(matrix)))
|
||
}
|
||
result := make([]forecastSample, 3)
|
||
for h := 0; h < 3; h++ {
|
||
var windX, windY float64
|
||
for pIdx, pm := range matrix {
|
||
if len(pm.Samples) <= h {
|
||
continue
|
||
}
|
||
s := pm.Samples[h]
|
||
w := eff[pIdx]
|
||
result[h].Rain += s.Rain * w
|
||
result[h].Temp += s.Temp * w
|
||
result[h].Humidity += s.Humidity * w
|
||
result[h].WindSpeed += s.WindSpeed * w
|
||
result[h].WindGust += s.WindGust * w
|
||
result[h].Prob += s.Prob * w
|
||
result[h].Pressure += s.Pressure * w
|
||
result[h].UV += s.UV * w
|
||
rad := s.WindDir * math.Pi / 180
|
||
windX += math.Cos(rad) * w
|
||
windY += math.Sin(rad) * w
|
||
}
|
||
if windX == 0 && windY == 0 {
|
||
result[h].WindDir = 0
|
||
} else {
|
||
dir := math.Atan2(windY, windX) * 180 / math.Pi
|
||
if dir < 0 {
|
||
dir += 360
|
||
}
|
||
result[h].WindDir = dir
|
||
}
|
||
result[h].Humidity = math.Round(result[h].Humidity)
|
||
result[h].Prob = math.Round(result[h].Prob)
|
||
result[h].UV = math.Round(result[h].UV)
|
||
}
|
||
return result
|
||
}
|
||
|
||
func uniform(n int) []float64 {
|
||
if n <= 0 {
|
||
return nil
|
||
}
|
||
w := make([]float64, n)
|
||
v := 1.0 / float64(n)
|
||
for i := range w {
|
||
w[i] = v
|
||
}
|
||
return w
|
||
}
|
||
|
||
// weightState: 维护三源与各二源对的基础权重
|
||
type weightState struct {
|
||
providersAll []string
|
||
baseTriad []float64 // [om, cy, im] 按 providersAll 对齐
|
||
basePairs map[string][]float64 // 保留字段但不再使用成对权重
|
||
lastPresent map[string]bool // 上一次有效参与的provider集合
|
||
lastEff map[string]float64 // 上一次按provider的有效权重(仅在场时更新)
|
||
}
|
||
|
||
func newWeightState(providersAll []string) *weightState {
|
||
ws := &weightState{providersAll: providersAll}
|
||
ws.baseTriad = append([]float64{}, defaultWeights...)
|
||
ws.basePairs = map[string][]float64{}
|
||
ws.lastPresent = map[string]bool{}
|
||
ws.lastEff = map[string]float64{}
|
||
return ws
|
||
}
|
||
|
||
func (ws *weightState) idx(p string) int {
|
||
for i, v := range ws.providersAll {
|
||
if v == p {
|
||
return i
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func pairKey(a, b string) string {
|
||
if a < b {
|
||
return a + "|" + b
|
||
}
|
||
return b + "|" + a
|
||
}
|
||
|
||
// providersList: 用于debug展示参与源
|
||
func providersList(matrix []providerMatrix) string {
|
||
names := make([]string, len(matrix))
|
||
for i, pm := range matrix {
|
||
names[i] = pm.Provider
|
||
}
|
||
return fmt.Sprintf("%v", names)
|
||
}
|
||
|
||
// formatWeights 将权重格式化为字符串,如 "[0.70,0.10,0.20]"
|
||
// 若 quantizeTenths=true,则先按 0.1 就近取整再输出两位小数
|
||
func formatWeights(ws []float64, quantizeTenths bool) string {
|
||
if len(ws) == 0 {
|
||
return "[]"
|
||
}
|
||
var parts []string
|
||
for _, v := range ws {
|
||
if quantizeTenths {
|
||
v = math.Round(v*10.0) / 10.0
|
||
}
|
||
parts = append(parts, fmt.Sprintf("%.2f", v))
|
||
}
|
||
return "[" + strings.Join(parts, ",") + "]"
|
||
}
|
||
|
||
// effectiveForMatrix: 返回与 matrix 顺序对齐的有效权重(两源/三源),在子集上归一化
|
||
func (ws *weightState) effectiveForMatrix(matrix []providerMatrix) ([]float64, bool) {
|
||
// 根据当前在场provider集合,对 triad 进行“缺失转移/新增分配”并输出有效权重。
|
||
n := len(matrix)
|
||
eff := make([]float64, n)
|
||
if n == 0 {
|
||
return eff, false
|
||
}
|
||
|
||
// 建立当前在场集合
|
||
present := make(map[string]bool, n)
|
||
for _, pm := range matrix {
|
||
present[pm.Provider] = true
|
||
}
|
||
|
||
// 识别缺失与新增
|
||
var missing []string
|
||
var added []string
|
||
if len(ws.lastPresent) == 0 {
|
||
// 首次:按照 providersAll 判断缺失(需要立即应用缺失转移)
|
||
for _, p := range ws.providersAll {
|
||
if !present[p] {
|
||
missing = append(missing, p)
|
||
}
|
||
}
|
||
} else {
|
||
for p := range ws.lastPresent {
|
||
if !present[p] {
|
||
missing = append(missing, p)
|
||
}
|
||
}
|
||
for p := range present {
|
||
if !ws.lastPresent[p] {
|
||
added = append(added, p)
|
||
}
|
||
}
|
||
}
|
||
if debugEnabled && (len(missing) > 0 || len(added) > 0) {
|
||
// 稳定顺序:按 providersAll 顺序输出 present/added/removed
|
||
presList := make([]string, 0, len(present))
|
||
for _, p := range ws.providersAll {
|
||
if present[p] {
|
||
presList = append(presList, p)
|
||
}
|
||
}
|
||
// added/missing 也按 providersAll 顺序
|
||
orderIdx := func(p string) int { return ws.idx(p) }
|
||
sort.Slice(added, func(i, j int) bool { return orderIdx(added[i]) < orderIdx(added[j]) })
|
||
sort.Slice(missing, func(i, j int) bool { return orderIdx(missing[i]) < orderIdx(missing[j]) })
|
||
log.Printf("sources %v added=%v removed=%v", presList, added, missing)
|
||
}
|
||
|
||
// 缺失:将缺失源的权重转移给当前在场中权重最大的源,缺失源置0
|
||
if len(missing) > 0 && len(present) > 0 {
|
||
// 找当前在场的最大权重源
|
||
maxP := ""
|
||
maxW := -1.0
|
||
for p := range present {
|
||
w := ws.baseTriad[ws.idx(p)]
|
||
if w > maxW {
|
||
maxW, maxP = w, p
|
||
}
|
||
}
|
||
gain := 0.0
|
||
for _, m := range missing {
|
||
w := ws.baseTriad[ws.idx(m)]
|
||
if w > 0 {
|
||
gain += w
|
||
ws.baseTriad[ws.idx(m)] = 0
|
||
}
|
||
}
|
||
if maxP != "" && gain > 0 {
|
||
ws.baseTriad[ws.idx(maxP)] += gain
|
||
if debugEnabled {
|
||
tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
|
||
log.Printf("removed %v -> %s +%.2f triad %s",
|
||
missing, maxP, gain, formatWeights(tri, true))
|
||
}
|
||
}
|
||
}
|
||
|
||
// 新增:从当前最大源挪出0.2给新加入的源(每个新增一次性处理)
|
||
for _, a := range added {
|
||
// 仅在此前为0时进行分配,避免重复分配
|
||
if ws.baseTriad[ws.idx(a)] <= 0 {
|
||
// 找在场的最大源(不包含自身)
|
||
maxP := ""
|
||
maxW := -1.0
|
||
for p := range present {
|
||
if p == a {
|
||
continue
|
||
}
|
||
w := ws.baseTriad[ws.idx(p)]
|
||
if w > maxW {
|
||
maxW, maxP = w, p
|
||
}
|
||
}
|
||
if maxP != "" && maxW > 0 {
|
||
delta := 0.2
|
||
if maxW < delta {
|
||
delta = maxW
|
||
}
|
||
ws.baseTriad[ws.idx(maxP)] -= delta
|
||
ws.baseTriad[ws.idx(a)] += delta
|
||
if debugEnabled {
|
||
tri := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
|
||
log.Printf("added %s from %s %.2f triad %s",
|
||
a, maxP, delta, formatWeights(tri, true))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 更新 lastPresent 为当前集合
|
||
ws.lastPresent = present
|
||
|
||
// 组装有效权重,按 matrix 顺序输出。此时 triad 在场分量应合计为1。
|
||
for i, pm := range matrix {
|
||
eff[i] = ws.baseTriad[ws.idx(pm.Provider)]
|
||
}
|
||
// 不在此处打印权重对比,由主循环根据“学习前/后”输出
|
||
// 更新上一次有效权重
|
||
for i, pm := range matrix {
|
||
ws.lastEff[pm.Provider] = eff[i]
|
||
}
|
||
return eff, false
|
||
}
|
||
|
||
// learn: 只在本集合中做 +0.1/-0.1,学习后在该集合内归一化,并写回对应的基础权重集
|
||
func (ws *weightState) learn(matrix []providerMatrix, actual float64) {
|
||
n := len(matrix)
|
||
if n <= 1 {
|
||
return
|
||
}
|
||
preds := extractProviderRainAt(matrix, 0)
|
||
if len(preds) != n {
|
||
return
|
||
}
|
||
// 找出当前在场集合中的最优与最差
|
||
best, worst := 0, 0
|
||
for i := range preds {
|
||
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
|
||
best = i
|
||
}
|
||
if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
|
||
worst = i
|
||
}
|
||
}
|
||
|
||
alpha := 0.1
|
||
pBest := matrix[best].Provider
|
||
pWorst := matrix[worst].Provider
|
||
|
||
// 仅在在场集合中调整,第三方可能为0(绝对不可信)且不参与两源学习
|
||
decCap := ws.baseTriad[ws.idx(pWorst)]
|
||
delta := alpha
|
||
if decCap < delta {
|
||
delta = decCap
|
||
}
|
||
if delta <= 0 {
|
||
return
|
||
}
|
||
prev := append([]float64{}, ws.baseTriad...)
|
||
ws.baseTriad[ws.idx(pWorst)] -= delta
|
||
ws.baseTriad[ws.idx(pBest)] += delta
|
||
if debugEnabled {
|
||
before := []float64{prev[ws.idx(providerOpenMeteo)], prev[ws.idx(providerCaiyun)], prev[ws.idx(providerImdroid)]}
|
||
after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
|
||
log.Printf("rain=%.3f best=%s worst=%s weights %s -> %s",
|
||
actual, pBest, pWorst, formatWeights(before, true), formatWeights(after, true))
|
||
}
|
||
}
|
||
|
||
// learnFromPreds: 使用外部提供的预测值(与 matrix 顺序对齐)进行学习。
|
||
func (ws *weightState) learnFromPreds(matrix []providerMatrix, preds []float64, valids []bool, actual float64) {
|
||
n := len(matrix)
|
||
if n == 0 || len(preds) != n || len(valids) != n {
|
||
return
|
||
}
|
||
// 仅在有效的、且当前权重>0 的来源中选择最优/最差
|
||
idxs := make([]int, 0, n)
|
||
for i := 0; i < n; i++ {
|
||
if !valids[i] {
|
||
continue
|
||
}
|
||
idxs = append(idxs, i)
|
||
}
|
||
if len(idxs) < 2 {
|
||
return
|
||
}
|
||
best, worst := idxs[0], idxs[0]
|
||
for _, i := range idxs {
|
||
if math.Abs(preds[i]-actual) < math.Abs(preds[best]-actual) {
|
||
best = i
|
||
}
|
||
if math.Abs(preds[i]-actual) > math.Abs(preds[worst]-actual) {
|
||
worst = i
|
||
}
|
||
}
|
||
alpha := 0.1
|
||
pBest := matrix[best].Provider
|
||
pWorst := matrix[worst].Provider
|
||
decCap := ws.baseTriad[ws.idx(pWorst)]
|
||
delta := alpha
|
||
if decCap < delta {
|
||
delta = decCap
|
||
}
|
||
if delta <= 0 {
|
||
return
|
||
}
|
||
// 日志:绝对误差详情(OM/CY/IM)和最优/最差
|
||
if debugEnabled {
|
||
name := func(p string) string { return p }
|
||
// 组装当前在场源的误差摘要
|
||
parts := make([]string, 0, len(matrix))
|
||
for i, pm := range matrix {
|
||
if !valids[i] || ws.baseTriad[ws.idx(pm.Provider)] <= 0 {
|
||
continue
|
||
}
|
||
parts = append(parts, fmt.Sprintf("%s |%.2f-%.3f|=%.3f", name(pm.Provider), preds[i], actual, math.Abs(preds[i]-actual)))
|
||
}
|
||
log.Printf("绝对误差:%s → best=%s,worst=%s。", strings.Join(parts, ","), name(pBest), name(pWorst))
|
||
}
|
||
ws.baseTriad[ws.idx(pWorst)] -= delta
|
||
ws.baseTriad[ws.idx(pBest)] += delta
|
||
if debugEnabled {
|
||
after := []float64{ws.baseTriad[ws.idx(providerOpenMeteo)], ws.baseTriad[ws.idx(providerCaiyun)], ws.baseTriad[ws.idx(providerImdroid)]}
|
||
log.Printf("按规则:best +0.1,worst −0.1 → 权重在下一轮将为 %s", formatWeights(after, true))
|
||
}
|
||
}
|
||
|
||
// probeProviderAtBucket: 调试辅助,打印该小时桶内每个provider的issued与是否有+1/+2/+3样本
|
||
func probeProviderAtBucket(db *sql.DB, stationID string, providers []string, bucket time.Time) {
|
||
for _, p := range providers {
|
||
issued, ok, err := resolveIssuedAtInBucket(db, stationID, p, bucket)
|
||
if err != nil {
|
||
log.Printf(" provider=%s 探测错误: %v", p, err)
|
||
continue
|
||
}
|
||
if !ok {
|
||
log.Printf(" provider=%s: 小时桶内无 issued", p)
|
||
continue
|
||
}
|
||
leads := presentLeads(db, stationID, p, issued)
|
||
log.Printf(" provider=%s: issued=%s leads=%v (count=%d)", p, issued.In(asiaShanghai).Format("2006-01-02 15:04:05"), leads, len(leads))
|
||
}
|
||
}
|
||
|
||
// presentLeads 返回该 issued 的预测中,存在的 lead 小时集合(1..3)
|
||
func presentLeads(db *sql.DB, stationID, provider string, issued time.Time) []int {
|
||
rows, err := db.Query(`SELECT forecast_time FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at=$3`, stationID, provider, issued)
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
defer rows.Close()
|
||
var leads []int
|
||
for rows.Next() {
|
||
var ft time.Time
|
||
if err := rows.Scan(&ft); err != nil {
|
||
continue
|
||
}
|
||
dh := ft.Sub(issued).Hours()
|
||
h := int(math.Ceil(dh - 1e-9))
|
||
if h >= 1 && h <= 3 {
|
||
leads = append(leads, h)
|
||
}
|
||
}
|
||
return leads
|
||
}
|
||
|
||
func isRainAllZero(samples []forecastSample) bool {
|
||
for _, s := range samples {
|
||
if math.Abs(s.Rain) > 1e-6 {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
func extractProviderRainAt(matrix []providerMatrix, hourIdx int) []float64 {
|
||
out := make([]float64, 0, len(matrix))
|
||
for _, pm := range matrix {
|
||
if len(pm.Samples) > hourIdx {
|
||
out = append(out, pm.Samples[hourIdx].Rain)
|
||
} else {
|
||
out = append(out, 0)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func adjustWeights(current []float64, preds []float64, actual float64) []float64 {
|
||
if len(current) != len(preds) || len(current) != 3 {
|
||
return current
|
||
}
|
||
diffs := make([]float64, len(preds))
|
||
for i := range preds {
|
||
diffs[i] = math.Abs(preds[i] - actual)
|
||
}
|
||
minIdx, maxIdx := 0, 0
|
||
for i, d := range diffs {
|
||
if d < diffs[minIdx] {
|
||
minIdx = i
|
||
}
|
||
if d > diffs[maxIdx] {
|
||
maxIdx = i
|
||
}
|
||
}
|
||
if minIdx != maxIdx {
|
||
current[minIdx] += 0.1
|
||
current[maxIdx] -= 0.1
|
||
}
|
||
var sum float64
|
||
for i := range current {
|
||
if current[i] < 0 {
|
||
current[i] = 0
|
||
}
|
||
sum += current[i]
|
||
}
|
||
if sum == 0 {
|
||
return append([]float64{}, defaultWeights...)
|
||
}
|
||
for i := range current {
|
||
current[i] /= sum
|
||
}
|
||
return current
|
||
}
|
||
|
||
func fetchActualHourlyRain(db *sql.DB, stationID string, start, end time.Time) (float64, bool, error) {
|
||
const q = `
|
||
SELECT SUM(rain_10m_mm_x1000)
|
||
FROM rs485_weather_10min
|
||
WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
|
||
var sum sql.NullInt64
|
||
err := db.QueryRow(q, stationID, start, end).Scan(&sum)
|
||
if err != nil {
|
||
return 0, false, err
|
||
}
|
||
if !sum.Valid {
|
||
return 0, false, nil
|
||
}
|
||
return float64(sum.Int64) / 1000.0, true, nil
|
||
}
|
||
|
||
type tileRec struct {
|
||
DT time.Time
|
||
Width int
|
||
Height int
|
||
West float64
|
||
South float64
|
||
East float64
|
||
North float64
|
||
ResDeg float64
|
||
Data []byte
|
||
}
|
||
|
||
type maskReport struct {
|
||
Scanned bool
|
||
WindOK bool
|
||
UsedAlias string
|
||
WindSpeed float64
|
||
WindDir float64
|
||
Tiles int
|
||
HitsByHour [3]int
|
||
}
|
||
|
||
func checkRadarMask(db *sql.DB, ctx maskContext, issued time.Time) (maskReport, error) {
|
||
rep := maskReport{}
|
||
windTime := issued.Add(-time.Hour)
|
||
// 优先用 station_id,其次别名
|
||
if spd, dir, ok, err := loadWindByAlias(db, ctx.id, windTime); err != nil {
|
||
return rep, err
|
||
} else if ok {
|
||
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd, dir, ctx.id
|
||
} else if spd2, dir2, ok2, err2 := loadWindByAlias(db, ctx.alias, windTime); err2 != nil {
|
||
return rep, err2
|
||
} else if ok2 {
|
||
rep.WindOK, rep.WindSpeed, rep.WindDir, rep.UsedAlias = true, spd2, dir2, ctx.alias
|
||
} else {
|
||
// 没有风场:不扫描,不应用掩码
|
||
return rep, nil
|
||
}
|
||
|
||
tiles, err := loadTiles(db, ctx, issued.Add(-time.Hour), issued)
|
||
if err != nil {
|
||
return rep, err
|
||
}
|
||
if len(tiles) == 0 {
|
||
// 无瓦片:不扫描,不应用掩码
|
||
return rep, nil
|
||
}
|
||
rep.Tiles = len(tiles)
|
||
|
||
rep.Scanned = true
|
||
const circleR = 8000.0
|
||
const halfAngle = 30.0
|
||
|
||
for _, t := range tiles {
|
||
vals, xs, ys, err := decodeTile(t)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
for r := 0; r < len(vals); r++ {
|
||
row := vals[r]
|
||
lat := ys[r]
|
||
for c := 0; c < len(row); c++ {
|
||
v := row[c]
|
||
if v == nil {
|
||
continue
|
||
}
|
||
dbz := *v
|
||
if dbz < 40 {
|
||
continue
|
||
}
|
||
lon := xs[c]
|
||
dist := haversine(ctx.lat, ctx.lon, lat, lon)
|
||
// +1h:圆 或 扇形(3h)
|
||
if dist <= circleR {
|
||
rep.HitsByHour[0]++
|
||
}
|
||
if rep.WindSpeed > 0 {
|
||
brg := bearingDeg(ctx.lat, ctx.lon, lat, lon)
|
||
if angDiff(brg, rep.WindDir) <= halfAngle {
|
||
if dist <= rep.WindSpeed*3*3600 {
|
||
rep.HitsByHour[0]++
|
||
}
|
||
// +2h:圆 或 扇形(2h)
|
||
if dist <= circleR {
|
||
rep.HitsByHour[1]++
|
||
}
|
||
if dist <= rep.WindSpeed*2*3600 {
|
||
rep.HitsByHour[1]++
|
||
}
|
||
// +3h:扇形(3h)
|
||
if dist <= rep.WindSpeed*3*3600 {
|
||
rep.HitsByHour[2]++
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
return rep, nil
|
||
}
|
||
|
||
func loadWindByAlias(db *sql.DB, alias string, target time.Time) (float64, float64, bool, error) {
|
||
const q = `
|
||
SELECT wind_speed, wind_direction
|
||
FROM radar_weather
|
||
WHERE alias=$1 AND dt <= $2
|
||
ORDER BY dt DESC
|
||
LIMIT 1`
|
||
var speed, dir sql.NullFloat64
|
||
err := db.QueryRow(q, alias, target).Scan(&speed, &dir)
|
||
if err == sql.ErrNoRows {
|
||
return 0, 0, false, nil
|
||
}
|
||
if err != nil {
|
||
return 0, 0, false, err
|
||
}
|
||
if !speed.Valid || !dir.Valid {
|
||
return 0, 0, false, nil
|
||
}
|
||
return speed.Float64, dir.Float64, true, nil
|
||
}
|
||
|
||
func loadTiles(db *sql.DB, ctx maskContext, from, to time.Time) ([]tileRec, error) {
|
||
const q = `
|
||
SELECT dt, width, height, west, south, east, north, res_deg, data
|
||
FROM radar_tiles
|
||
WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5
|
||
ORDER BY dt`
|
||
rows, err := db.Query(q, ctx.z, ctx.y, ctx.x, from, to)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
var list []tileRec
|
||
for rows.Next() {
|
||
var t tileRec
|
||
if err := rows.Scan(&t.DT, &t.Width, &t.Height, &t.West, &t.South, &t.East, &t.North, &t.ResDeg, &t.Data); err != nil {
|
||
return nil, err
|
||
}
|
||
list = append(list, t)
|
||
}
|
||
return list, nil
|
||
}
|
||
|
||
func decodeTile(t tileRec) ([][]*float64, []float64, []float64, error) {
|
||
w, h := t.Width, t.Height
|
||
if w <= 0 || h <= 0 {
|
||
return nil, nil, nil, fmt.Errorf("非法瓦片尺寸")
|
||
}
|
||
if len(t.Data) < w*h*2 {
|
||
return nil, nil, nil, fmt.Errorf("数据长度不足")
|
||
}
|
||
xs := make([]float64, w)
|
||
for c := 0; c < w; c++ {
|
||
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
|
||
}
|
||
ys := make([]float64, h)
|
||
for r := 0; r < h; r++ {
|
||
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
|
||
}
|
||
vals := make([][]*float64, h)
|
||
off := 0
|
||
for r := 0; r < h; r++ {
|
||
row := make([]*float64, w)
|
||
for c := 0; c < w; c++ {
|
||
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
|
||
off += 2
|
||
if v >= 32766 {
|
||
row[c] = nil
|
||
continue
|
||
}
|
||
dbz := float64(v) / 10.0
|
||
if dbz < 0 {
|
||
dbz = 0
|
||
} else if dbz > 75 {
|
||
dbz = 75
|
||
}
|
||
value := dbz
|
||
row[c] = &value
|
||
}
|
||
vals[r] = row
|
||
}
|
||
return vals, xs, ys, nil
|
||
}
|
||
|
||
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
|
||
const R = 6371000.0
|
||
dLat := toRad(lat2 - lat1)
|
||
dLon := toRad(lon2 - lon1)
|
||
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
|
||
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
|
||
return R * c
|
||
}
|
||
|
||
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
|
||
φ1 := toRad(lat1)
|
||
φ2 := toRad(lat2)
|
||
Δλ := toRad(lon2 - lon1)
|
||
y := math.Sin(Δλ) * math.Cos(φ2)
|
||
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
|
||
brg := toDeg(math.Atan2(y, x))
|
||
if brg < 0 {
|
||
brg += 360
|
||
}
|
||
return brg
|
||
}
|
||
|
||
func angDiff(a, b float64) float64 {
|
||
d := math.Mod(a-b+540, 360) - 180
|
||
if d < 0 {
|
||
d = -d
|
||
}
|
||
return math.Abs(d)
|
||
}
|
||
|
||
func toRad(d float64) float64 { return d * math.Pi / 180 }
|
||
func toDeg(r float64) float64 { return r * 180 / math.Pi }
|
||
|
||
func buildMaskContext(info stationInfo) (maskContext, bool) {
|
||
if !info.Lat.Valid || !info.Lon.Valid {
|
||
return maskContext{}, false
|
||
}
|
||
if !info.Z.Valid || !info.Y.Valid || !info.X.Valid {
|
||
return maskContext{}, false
|
||
}
|
||
alias := info.Alias
|
||
if strings.TrimSpace(alias) == "" {
|
||
alias = info.ID
|
||
}
|
||
return maskContext{
|
||
id: info.ID,
|
||
alias: alias,
|
||
lat: info.Lat.Float64,
|
||
lon: info.Lon.Float64,
|
||
z: int(info.Z.Int64),
|
||
y: int(info.Y.Int64),
|
||
x: int(info.X.Int64),
|
||
}, true
|
||
}
|
||
|
||
func buildSQL(records []fusedRecord) []string {
|
||
lines := make([]string, 0, len(records)*2)
|
||
header := "-- SQL 输出 (默认未执行)"
|
||
lines = append(lines, header)
|
||
for _, rec := range records {
|
||
sql := fmt.Sprintf(
|
||
"INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000, wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index)\nVALUES ('%s', '%s', '%s'::timestamptz, '%s'::timestamptz, %d, %d, %d, %d, %d, %d, %d, %d, %d)\nON CONFLICT (station_id, provider, issued_at, forecast_time)\nDO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000, temp_c_x100=EXCLUDED.temp_c_x100, humidity_pct=EXCLUDED.humidity_pct, wind_speed_ms_x1000=EXCLUDED.wind_speed_ms_x1000, wind_gust_ms_x1000=EXCLUDED.wind_gust_ms_x1000, wind_dir_deg=EXCLUDED.wind_dir_deg, precip_prob_pct=EXCLUDED.precip_prob_pct, pressure_hpa_x100=EXCLUDED.pressure_hpa_x100, uv_index=EXCLUDED.uv_index;",
|
||
rec.StationID,
|
||
outputProvider,
|
||
rec.IssuedAt.In(asiaShanghai).Format(time.RFC3339),
|
||
rec.ForecastTime.In(asiaShanghai).Format(time.RFC3339),
|
||
int(math.Round(rec.Sample.Rain*1000)),
|
||
int(math.Round(rec.Sample.Temp*100)),
|
||
int(math.Round(rec.Sample.Humidity)),
|
||
int(math.Round(rec.Sample.WindSpeed*1000)),
|
||
int(math.Round(rec.Sample.WindGust*1000)),
|
||
int(math.Round(rec.Sample.WindDir)),
|
||
int(math.Round(rec.Sample.Prob)),
|
||
int(math.Round(rec.Sample.Pressure*100)),
|
||
int(math.Round(rec.Sample.UV)),
|
||
)
|
||
lines = append(lines, sql, "")
|
||
}
|
||
return lines
|
||
}
|
||
|
||
func applySQL(db *sql.DB, records []fusedRecord) error {
|
||
tx, err := db.Begin()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
stmt, err := tx.Prepare(`
|
||
INSERT INTO forecast_hourly (
|
||
station_id, provider, issued_at, forecast_time,
|
||
rain_mm_x1000, temp_c_x100, humidity_pct, wind_speed_ms_x1000,
|
||
wind_gust_ms_x1000, wind_dir_deg, precip_prob_pct, pressure_hpa_x100, uv_index
|
||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
||
ON CONFLICT (station_id, provider, issued_at, forecast_time)
|
||
DO UPDATE SET
|
||
rain_mm_x1000 = EXCLUDED.rain_mm_x1000,
|
||
temp_c_x100 = EXCLUDED.temp_c_x100,
|
||
humidity_pct = EXCLUDED.humidity_pct,
|
||
wind_speed_ms_x1000 = EXCLUDED.wind_speed_ms_x1000,
|
||
wind_gust_ms_x1000 = EXCLUDED.wind_gust_ms_x1000,
|
||
wind_dir_deg = EXCLUDED.wind_dir_deg,
|
||
precip_prob_pct = EXCLUDED.precip_prob_pct,
|
||
pressure_hpa_x100 = EXCLUDED.pressure_hpa_x100,
|
||
uv_index = EXCLUDED.uv_index`)
|
||
if err != nil {
|
||
tx.Rollback()
|
||
return err
|
||
}
|
||
defer stmt.Close()
|
||
|
||
for _, rec := range records {
|
||
if _, err := stmt.Exec(
|
||
rec.StationID,
|
||
outputProvider,
|
||
rec.IssuedAt,
|
||
rec.ForecastTime,
|
||
int(math.Round(rec.Sample.Rain*1000)),
|
||
int(math.Round(rec.Sample.Temp*100)),
|
||
int(math.Round(rec.Sample.Humidity)),
|
||
int(math.Round(rec.Sample.WindSpeed*1000)),
|
||
int(math.Round(rec.Sample.WindGust*1000)),
|
||
int(math.Round(rec.Sample.WindDir)),
|
||
int(math.Round(rec.Sample.Prob)),
|
||
int(math.Round(rec.Sample.Pressure*100)),
|
||
int(math.Round(rec.Sample.UV)),
|
||
); err != nil {
|
||
tx.Rollback()
|
||
return err
|
||
}
|
||
}
|
||
return tx.Commit()
|
||
}
|