Compare commits
No commits in common. "6d276b1760c2c9cf18ffa3086d3b7ca8f5793bf2" and "19ab65dec6c4aa7ac7e92015548d7d7144555081" have entirely different histories.
6d276b1760
...
19ab65dec6
@ -1,279 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"encoding/csv"
|
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Minimal Caiyun hourly model focusing on required fields.
|
|
||||||
type caiyunHourly struct {
|
|
||||||
Status string `json:"status"`
|
|
||||||
Result struct {
|
|
||||||
Hourly struct {
|
|
||||||
Status string `json:"status"`
|
|
||||||
Temperature []valTime `json:"temperature"`
|
|
||||||
Humidity []valTime `json:"humidity"`
|
|
||||||
Visibility []valTime `json:"visibility"`
|
|
||||||
Dswrf []valTime `json:"dswrf"`
|
|
||||||
Pressure []valTime `json:"pressure"`
|
|
||||||
Wind []windTime `json:"wind"`
|
|
||||||
} `json:"hourly"`
|
|
||||||
} `json:"result"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type valTime struct {
|
|
||||||
Datetime string `json:"datetime"`
|
|
||||||
Value float64 `json:"value"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type windTime struct {
|
|
||||||
Datetime string `json:"datetime"`
|
|
||||||
Speed float64 `json:"speed"`
|
|
||||||
Direction float64 `json:"direction"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type row struct {
|
|
||||||
t time.Time
|
|
||||||
temperature *float64
|
|
||||||
humidity *float64
|
|
||||||
windSpeed *float64
|
|
||||||
windDir *float64
|
|
||||||
pressure *float64
|
|
||||||
visibility *float64
|
|
||||||
dswrf *float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var file string
|
|
||||||
var tz string
|
|
||||||
var mode string
|
|
||||||
var alias string
|
|
||||||
var lat float64
|
|
||||||
var lon float64
|
|
||||||
var sqlTable string
|
|
||||||
flag.StringVar(&file, "file", "", "Path to Caiyun hourly JSON; if empty, read from stdin")
|
|
||||||
flag.StringVar(&tz, "tz", "Asia/Shanghai", "Timezone for output timestamps")
|
|
||||||
flag.StringVar(&mode, "mode", "csv", "Output mode: csv | sql")
|
|
||||||
flag.StringVar(&alias, "alias", "", "Station alias for SQL output (required for mode=sql)")
|
|
||||||
flag.Float64Var(&lat, "lat", 0, "Latitude for SQL output")
|
|
||||||
flag.Float64Var(&lon, "lon", 0, "Longitude for SQL output")
|
|
||||||
flag.StringVar(&sqlTable, "sqltable", "radar_weather", "SQL table name for inserts")
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
var r io.Reader
|
|
||||||
if file == "" {
|
|
||||||
r = bufio.NewReader(os.Stdin)
|
|
||||||
} else {
|
|
||||||
f, err := os.Open(file)
|
|
||||||
if err != nil {
|
|
||||||
fatalf("open file: %v", err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
r = f
|
|
||||||
}
|
|
||||||
|
|
||||||
var payload caiyunHourly
|
|
||||||
dec := json.NewDecoder(r)
|
|
||||||
if err := dec.Decode(&payload); err != nil {
|
|
||||||
fatalf("decode json: %v", err)
|
|
||||||
}
|
|
||||||
if strings.ToLower(payload.Status) != "ok" && payload.Status != "" {
|
|
||||||
fatalf("top-level status not ok: %s", payload.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
loc, _ := time.LoadLocation(tz)
|
|
||||||
if loc == nil {
|
|
||||||
loc = time.FixedZone("CST", 8*3600)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge series by timestamp
|
|
||||||
rowsByTime := map[time.Time]*row{}
|
|
||||||
upsert := func(ts string) *row {
|
|
||||||
t, ok := parseTime(ts, loc)
|
|
||||||
if !ok {
|
|
||||||
fatalf("parse time failed: %s", ts)
|
|
||||||
}
|
|
||||||
if v, exists := rowsByTime[t]; exists {
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
nr := &row{t: t}
|
|
||||||
rowsByTime[t] = nr
|
|
||||||
return nr
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, v := range payload.Result.Hourly.Temperature {
|
|
||||||
rr := upsert(v.Datetime)
|
|
||||||
rr.temperature = ptr(v.Value)
|
|
||||||
}
|
|
||||||
for _, v := range payload.Result.Hourly.Humidity {
|
|
||||||
rr := upsert(v.Datetime)
|
|
||||||
rr.humidity = ptr(v.Value)
|
|
||||||
}
|
|
||||||
for _, v := range payload.Result.Hourly.Visibility {
|
|
||||||
rr := upsert(v.Datetime)
|
|
||||||
rr.visibility = ptr(v.Value)
|
|
||||||
}
|
|
||||||
for _, v := range payload.Result.Hourly.Dswrf {
|
|
||||||
rr := upsert(v.Datetime)
|
|
||||||
rr.dswrf = ptr(v.Value)
|
|
||||||
}
|
|
||||||
for _, v := range payload.Result.Hourly.Pressure {
|
|
||||||
rr := upsert(v.Datetime)
|
|
||||||
rr.pressure = ptr(v.Value)
|
|
||||||
}
|
|
||||||
for _, w := range payload.Result.Hourly.Wind {
|
|
||||||
rr := upsert(w.Datetime)
|
|
||||||
rr.windSpeed = ptr(w.Speed)
|
|
||||||
rr.windDir = ptr(w.Direction)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sort by time
|
|
||||||
times := make([]time.Time, 0, len(rowsByTime))
|
|
||||||
for t := range rowsByTime {
|
|
||||||
times = append(times, t)
|
|
||||||
}
|
|
||||||
sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) })
|
|
||||||
|
|
||||||
if mode == "sql" {
|
|
||||||
if alias == "" {
|
|
||||||
fatalf("-alias is required for mode=sql")
|
|
||||||
}
|
|
||||||
// Emit upserts into radar_weather; convert wind_speed km/h -> m/s, keep humidity as ratio (0..1)
|
|
||||||
fmt.Println("BEGIN;")
|
|
||||||
for _, t := range times {
|
|
||||||
rr := rowsByTime[t]
|
|
||||||
// wind speed conversion
|
|
||||||
var ws string
|
|
||||||
if rr.windSpeed != nil {
|
|
||||||
v := *rr.windSpeed / 3.6
|
|
||||||
ws = trimZeros(fmt.Sprintf("%.6f", v))
|
|
||||||
}
|
|
||||||
// Build SQL with NULLs where missing
|
|
||||||
q := fmt.Sprintf(
|
|
||||||
"INSERT INTO %s (alias, lat, lon, dt, temperature, humidity, cloudrate, visibility, dswrf, wind_speed, wind_direction, pressure) "+
|
|
||||||
"VALUES (%s, %s, %s, %s, %s, %s, NULL, %s, %s, %s, %s, %s) "+
|
|
||||||
"ON CONFLICT (alias, dt) DO UPDATE SET "+
|
|
||||||
"lat=EXCLUDED.lat, lon=EXCLUDED.lon, temperature=EXCLUDED.temperature, humidity=EXCLUDED.humidity, "+
|
|
||||||
"visibility=EXCLUDED.visibility, dswrf=EXCLUDED.dswrf, wind_speed=EXCLUDED.wind_speed, "+
|
|
||||||
"wind_direction=EXCLUDED.wind_direction, pressure=EXCLUDED.pressure;",
|
|
||||||
sqlTable,
|
|
||||||
sqlQuote(alias),
|
|
||||||
sqlNum(lat),
|
|
||||||
sqlNum(lon),
|
|
||||||
sqlTime(t),
|
|
||||||
sqlOpt(rr.temperature),
|
|
||||||
sqlOpt(rr.humidity),
|
|
||||||
sqlOpt(rr.visibility),
|
|
||||||
sqlOpt(rr.dswrf),
|
|
||||||
sqlStrOrNull(ws),
|
|
||||||
sqlOpt(rr.windDir),
|
|
||||||
sqlOpt(rr.pressure),
|
|
||||||
)
|
|
||||||
fmt.Println(q)
|
|
||||||
}
|
|
||||||
fmt.Println("COMMIT;")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// CSV output
|
|
||||||
w := csv.NewWriter(os.Stdout)
|
|
||||||
_ = w.Write([]string{"datetime", "temperature", "humidity", "wind_speed", "wind_direction", "pressure", "visibility", "dswrf"})
|
|
||||||
for _, t := range times {
|
|
||||||
rr := rowsByTime[t]
|
|
||||||
var ws string
|
|
||||||
if rr.windSpeed != nil {
|
|
||||||
v := *rr.windSpeed / 3.6
|
|
||||||
ws = trimZeros(fmt.Sprintf("%.6f", v))
|
|
||||||
}
|
|
||||||
rec := []string{
|
|
||||||
t.Format("2006-01-02 15:04:05"),
|
|
||||||
optf(rr.temperature),
|
|
||||||
optf(rr.humidity),
|
|
||||||
ws,
|
|
||||||
optf(rr.windDir),
|
|
||||||
optf(rr.pressure),
|
|
||||||
optf(rr.visibility),
|
|
||||||
optf(rr.dswrf),
|
|
||||||
}
|
|
||||||
_ = w.Write(rec)
|
|
||||||
}
|
|
||||||
w.Flush()
|
|
||||||
if err := w.Error(); err != nil {
|
|
||||||
fatalf("write csv: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func ptr(f float64) *float64 { return &f }
|
|
||||||
|
|
||||||
func optf(p *float64) string {
|
|
||||||
if p == nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
// Trim trailing zeros via fmt
|
|
||||||
return trimZeros(fmt.Sprintf("%.6f", *p))
|
|
||||||
}
|
|
||||||
|
|
||||||
func trimZeros(s string) string {
|
|
||||||
if !strings.Contains(s, ".") {
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
s = strings.TrimRight(s, "0")
|
|
||||||
s = strings.TrimRight(s, ".")
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseTime attempts RFC3339 and common Caiyun formats without seconds.
|
|
||||||
func parseTime(s string, loc *time.Location) (time.Time, bool) {
|
|
||||||
// Try RFC3339 first
|
|
||||||
if t, err := time.Parse(time.RFC3339, s); err == nil {
|
|
||||||
return t.In(loc), true
|
|
||||||
}
|
|
||||||
// Try without seconds, with offset, e.g. 2006-01-02T15:04+08:00
|
|
||||||
if t, err := time.Parse("2006-01-02T15:04-07:00", s); err == nil {
|
|
||||||
return t.In(loc), true
|
|
||||||
}
|
|
||||||
// Try without offset (assume loc)
|
|
||||||
if t, err := time.ParseInLocation("2006-01-02 15:04", s, loc); err == nil {
|
|
||||||
return t.In(loc), true
|
|
||||||
}
|
|
||||||
return time.Time{}, false
|
|
||||||
}
|
|
||||||
|
|
||||||
func fatalf(format string, args ...any) {
|
|
||||||
fmt.Fprintf(os.Stderr, format+"\n", args...)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func sqlQuote(s string) string {
|
|
||||||
return "'" + strings.ReplaceAll(s, "'", "''") + "'"
|
|
||||||
}
|
|
||||||
|
|
||||||
func sqlNum(f float64) string {
|
|
||||||
return trimZeros(fmt.Sprintf("%.8f", f))
|
|
||||||
}
|
|
||||||
|
|
||||||
func sqlTime(t time.Time) string {
|
|
||||||
return sqlQuote(t.Format("2006-01-02 15:04:05"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func sqlOpt(p *float64) string {
|
|
||||||
if p == nil {
|
|
||||||
return "NULL"
|
|
||||||
}
|
|
||||||
return trimZeros(fmt.Sprintf("%.6f", *p))
|
|
||||||
}
|
|
||||||
|
|
||||||
func sqlStrOrNull(s string) string {
|
|
||||||
if s == "" {
|
|
||||||
return "NULL"
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
@ -1,448 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"encoding/binary"
|
|
||||||
"encoding/csv"
|
|
||||||
"errors"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"math"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
"weatherstation/internal/database"
|
|
||||||
)
|
|
||||||
|
|
||||||
type stationInfo struct {
|
|
||||||
ID string
|
|
||||||
Alias string
|
|
||||||
Lat float64
|
|
||||||
Lon float64
|
|
||||||
Z int
|
|
||||||
Y int
|
|
||||||
X int
|
|
||||||
}
|
|
||||||
|
|
||||||
type tileRec struct {
|
|
||||||
DT time.Time
|
|
||||||
Width, Height int
|
|
||||||
West, South float64
|
|
||||||
East, North float64
|
|
||||||
ResDeg float64
|
|
||||||
Data []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var stationID string
|
|
||||||
var startStr string
|
|
||||||
var endStr string
|
|
||||||
var outPath string
|
|
||||||
var verbose bool
|
|
||||||
|
|
||||||
flag.StringVar(&stationID, "station_id", "", "站点ID(留空表示全部WH65LP且有经纬度的站)")
|
|
||||||
flag.StringVar(&startStr, "start", "", "起始时间(YYYY-MM-DD HH:MM:SS,CST)")
|
|
||||||
flag.StringVar(&endStr, "end", "", "结束时间(YYYY-MM-DD HH:MM:SS,CST)")
|
|
||||||
flag.StringVar(&outPath, "out", "radar_stats.csv", "输出CSV文件路径")
|
|
||||||
flag.BoolVar(&verbose, "info", false, "输出详细过程信息")
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" {
|
|
||||||
log.Fatalln("必须提供 --start 与 --end,格式 YYYY-MM-DD HH:MM:SS")
|
|
||||||
}
|
|
||||||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
|
||||||
if loc == nil {
|
|
||||||
loc = time.FixedZone("CST", 8*3600)
|
|
||||||
}
|
|
||||||
startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("解析 start 失败: %v", err)
|
|
||||||
}
|
|
||||||
endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("解析 end 失败: %v", err)
|
|
||||||
}
|
|
||||||
if !endT.After(startT) {
|
|
||||||
log.Fatalln("结束时间必须大于起始时间")
|
|
||||||
}
|
|
||||||
|
|
||||||
// 初始化数据库
|
|
||||||
_ = database.GetDB()
|
|
||||||
defer database.Close()
|
|
||||||
|
|
||||||
// 获取站点列表
|
|
||||||
stations, err := listStations(database.GetDB(), stationID)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("查询站点失败: %v", err)
|
|
||||||
}
|
|
||||||
if len(stations) == 0 {
|
|
||||||
log.Fatalln("没有符合条件的站点")
|
|
||||||
}
|
|
||||||
if verbose {
|
|
||||||
log.Printf("站点数量: %d", len(stations))
|
|
||||||
for _, s := range stations {
|
|
||||||
log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建CSV
|
|
||||||
f, err := os.Create(outPath)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("创建输出文件失败: %v", err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
w := csv.NewWriter(f)
|
|
||||||
defer w.Flush()
|
|
||||||
|
|
||||||
header := []string{
|
|
||||||
"station_id", "station_alias", "dt", "lat", "lon", "wind_speed_ms", "wind_dir_deg",
|
|
||||||
"sector_ge40_cnt", "sector_ge40_sum", "sector_ge30_cnt", "sector_ge30_sum",
|
|
||||||
"circle_ge40_cnt", "circle_ge40_sum", "circle_ge30_cnt", "circle_ge30_sum",
|
|
||||||
"rs485_rain_total_mm",
|
|
||||||
}
|
|
||||||
if err := w.Write(header); err != nil {
|
|
||||||
log.Fatalf("写入CSV表头失败: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
totalRows := 0
|
|
||||||
var totalTiles, skipNoZYX, skipNoWind, skipDecode int
|
|
||||||
for _, s := range stations {
|
|
||||||
if s.Z == 0 && s.Y == 0 && s.X == 0 {
|
|
||||||
log.Printf("跳过站点 %s(无z/y/x映射)", s.ID)
|
|
||||||
skipNoZYX++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
tiles, err := listTiles(ctx, database.GetDB(), s.Z, s.Y, s.X, startT, endT)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("查询瓦片失败 station=%s: %v", s.ID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
totalTiles += len(tiles)
|
|
||||||
if verbose {
|
|
||||||
log.Printf("站点 %s 瓦片数量: %d", s.ID, len(tiles))
|
|
||||||
}
|
|
||||||
if len(tiles) == 0 {
|
|
||||||
log.Printf("站点 %s 在范围内无瓦片", s.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, t := range tiles {
|
|
||||||
// 10分钟向下取整时间(bucket)
|
|
||||||
bucket := bucket10(t.DT, loc)
|
|
||||||
// NOTE: 按需改为用 station_id 匹配 radar_weather.alias
|
|
||||||
windSpeed, windDir, ok, err := loadWindAt(database.GetDB(), s.ID, bucket)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("读取风失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !ok { // 无风场:跳过该时次
|
|
||||||
skipNoWind++
|
|
||||||
if verbose {
|
|
||||||
log.Printf("跳过: %s 瓦片@%s(桶=%s)在 radar_weather(alias=%s) 无记录", s.ID, t.DT.In(loc).Format("2006-01-02 15:04:05"), bucket.Format("2006-01-02 15:04:05"), s.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解码 dBZ 网格
|
|
||||||
vals, xs, ys, err := decodeTile(t)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("解码瓦片失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err)
|
|
||||||
skipDecode++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 统计
|
|
||||||
sec40Cnt, sec40Sum, sec30Cnt, sec30Sum,
|
|
||||||
cir40Cnt, cir40Sum, cir30Cnt, cir30Sum := computeStats(vals, xs, ys, s.Lat, s.Lon, windSpeed, windDir)
|
|
||||||
|
|
||||||
// 最近一条累计雨量
|
|
||||||
rainTotal, rainOK := loadNearestRain(database.GetDB(), s.ID, t.DT)
|
|
||||||
if verbose {
|
|
||||||
log.Printf("写出: %s dt=%s wind=%.3f m/s %.1f° 扇形(>=40:%d/%.1f >=30:%d/%.1f) 圆形(>=40:%d/%.1f >=30:%d/%.1f) rain_total=%v(%.3f)",
|
|
||||||
s.ID,
|
|
||||||
t.DT.In(loc).Format("2006-01-02 15:04:05"),
|
|
||||||
windSpeed, windDir,
|
|
||||||
sec40Cnt, sec40Sum, sec30Cnt, sec30Sum,
|
|
||||||
cir40Cnt, cir40Sum, cir30Cnt, cir30Sum,
|
|
||||||
rainOK, rainTotal,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
rec := []string{
|
|
||||||
s.ID,
|
|
||||||
s.Alias,
|
|
||||||
t.DT.In(loc).Format("2006-01-02 15:04:05"),
|
|
||||||
fmt.Sprintf("%.6f", s.Lat),
|
|
||||||
fmt.Sprintf("%.6f", s.Lon),
|
|
||||||
fmt.Sprintf("%.3f", windSpeed),
|
|
||||||
fmt.Sprintf("%.2f", windDir),
|
|
||||||
fmt.Sprintf("%d", sec40Cnt),
|
|
||||||
fmt.Sprintf("%.1f", sec40Sum),
|
|
||||||
fmt.Sprintf("%d", sec30Cnt),
|
|
||||||
fmt.Sprintf("%.1f", sec30Sum),
|
|
||||||
fmt.Sprintf("%d", cir40Cnt),
|
|
||||||
fmt.Sprintf("%.1f", cir40Sum),
|
|
||||||
fmt.Sprintf("%d", cir30Cnt),
|
|
||||||
fmt.Sprintf("%.1f", cir30Sum),
|
|
||||||
fmt.Sprintf("%.3f", rainTotal),
|
|
||||||
}
|
|
||||||
if err := w.Write(rec); err != nil {
|
|
||||||
log.Printf("写入CSV失败: %v", err)
|
|
||||||
}
|
|
||||||
totalRows++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
w.Flush()
|
|
||||||
if err := w.Error(); err != nil {
|
|
||||||
log.Fatalf("写入CSV失败: %v", err)
|
|
||||||
}
|
|
||||||
if verbose {
|
|
||||||
log.Printf("汇总: 站点数=%d 瓦片总数=%d 跳过(无z/y/x)=%d 跳过(无风)=%d 跳过(解码失败)=%d", len(stations), totalTiles, skipNoZYX, skipNoWind, skipDecode)
|
|
||||||
}
|
|
||||||
log.Printf("完成,输出 %d 行到 %s", totalRows, outPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
func listStations(db *sql.DB, stationID string) ([]stationInfo, error) {
|
|
||||||
// 与前端一致:device_type='WH65LP' 且 lat/lon 非空且非零
|
|
||||||
if strings.TrimSpace(stationID) != "" {
|
|
||||||
const q = `
|
|
||||||
SELECT station_id,
|
|
||||||
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
|
|
||||||
latitude, longitude,
|
|
||||||
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
|
|
||||||
FROM stations
|
|
||||||
WHERE device_type='WH65LP' AND station_id=$1
|
|
||||||
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
|
||||||
AND latitude<>0 AND longitude<>0`
|
|
||||||
var s stationInfo
|
|
||||||
err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return []stationInfo{s}, nil
|
|
||||||
}
|
|
||||||
const qAll = `
|
|
||||||
SELECT station_id,
|
|
||||||
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
|
|
||||||
latitude, longitude,
|
|
||||||
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
|
|
||||||
FROM stations
|
|
||||||
WHERE device_type='WH65LP'
|
|
||||||
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
|
||||||
AND latitude<>0 AND longitude<>0
|
|
||||||
ORDER BY station_id`
|
|
||||||
rows, err := db.Query(qAll)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var out []stationInfo
|
|
||||||
for rows.Next() {
|
|
||||||
var s stationInfo
|
|
||||||
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil {
|
|
||||||
out = append(out, s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) {
|
|
||||||
const q = `
|
|
||||||
SELECT dt, width, height, west, south, east, north, res_deg, data
|
|
||||||
FROM radar_tiles
|
|
||||||
WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5
|
|
||||||
ORDER BY dt ASC`
|
|
||||||
rows, err := db.QueryContext(ctx, q, z, y, x, from, to)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var out []tileRec
|
|
||||||
for rows.Next() {
|
|
||||||
var r tileRec
|
|
||||||
if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil {
|
|
||||||
out = append(out, r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func bucket10(t time.Time, loc *time.Location) time.Time {
|
|
||||||
tt := t.In(loc)
|
|
||||||
m := (tt.Minute() / 10) * 10
|
|
||||||
return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// loadWindAt 以别名(alias)精确匹配 radar_weather;本导出按需传入 station_id 作为 alias 参数
|
|
||||||
func loadWindAt(db *sql.DB, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) {
|
|
||||||
const q = `
|
|
||||||
SELECT wind_speed, wind_direction
|
|
||||||
FROM radar_weather
|
|
||||||
WHERE alias=$1 AND dt=$2
|
|
||||||
LIMIT 1`
|
|
||||||
var s, d sql.NullFloat64
|
|
||||||
err = db.QueryRow(q, alias, dt).Scan(&s, &d)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return 0, 0, false, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, false, err
|
|
||||||
}
|
|
||||||
if !s.Valid || !d.Valid {
|
|
||||||
return 0, 0, false, nil
|
|
||||||
}
|
|
||||||
return s.Float64, d.Float64, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func loadNearestRain(db *sql.DB, stationID string, dt time.Time) (rainTotal float64, ok bool) {
|
|
||||||
// 取最近一条累计雨量(单位mm)。如不存在返回0,false
|
|
||||||
const q = `
|
|
||||||
SELECT rainfall
|
|
||||||
FROM rs485_weather_data
|
|
||||||
WHERE station_id=$1
|
|
||||||
ORDER BY ABS(EXTRACT(EPOCH FROM (timestamp - $2))) ASC
|
|
||||||
LIMIT 1`
|
|
||||||
var r sql.NullFloat64
|
|
||||||
if err := db.QueryRow(q, stationID, dt).Scan(&r); err != nil {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
if !r.Valid {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
return r.Float64, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) {
|
|
||||||
w, h := t.Width, t.Height
|
|
||||||
if w <= 0 || h <= 0 {
|
|
||||||
return nil, nil, nil, fmt.Errorf("非法尺寸")
|
|
||||||
}
|
|
||||||
if len(t.Data) < w*h*2 {
|
|
||||||
return nil, nil, nil, fmt.Errorf("数据长度不足")
|
|
||||||
}
|
|
||||||
xs = make([]float64, w)
|
|
||||||
for c := 0; c < w; c++ {
|
|
||||||
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
|
|
||||||
}
|
|
||||||
ys = make([]float64, h)
|
|
||||||
for r := 0; r < h; r++ {
|
|
||||||
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
|
|
||||||
}
|
|
||||||
vals = make([][]*float64, h)
|
|
||||||
off := 0
|
|
||||||
for r := 0; r < h; r++ {
|
|
||||||
row := make([]*float64, w)
|
|
||||||
for c := 0; c < w; c++ {
|
|
||||||
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
|
|
||||||
off += 2
|
|
||||||
if v >= 32766 {
|
|
||||||
row[c] = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dbz := float64(v) / 10.0
|
|
||||||
if dbz < 0 {
|
|
||||||
dbz = 0
|
|
||||||
} else if dbz > 75 {
|
|
||||||
dbz = 75
|
|
||||||
}
|
|
||||||
vv := dbz
|
|
||||||
row[c] = &vv
|
|
||||||
}
|
|
||||||
vals[r] = row
|
|
||||||
}
|
|
||||||
return vals, xs, ys, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func computeStats(vals [][]*float64, xs, ys []float64, stLat, stLon, windMS, windFromDeg float64) (
|
|
||||||
sec40Cnt int, sec40Sum float64, sec30Cnt int, sec30Sum float64,
|
|
||||||
cir40Cnt int, cir40Sum float64, cir30Cnt int, cir30Sum float64,
|
|
||||||
) {
|
|
||||||
h := len(vals)
|
|
||||||
if h == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
w := len(vals[0])
|
|
||||||
// 半径(米)与半角
|
|
||||||
halfAngle := 30.0
|
|
||||||
rangeM := windMS * 3 * 3600
|
|
||||||
circleR := 8000.0
|
|
||||||
|
|
||||||
for r := 0; r < h; r++ {
|
|
||||||
lat := ys[r]
|
|
||||||
row := vals[r]
|
|
||||||
for c := 0; c < w; c++ {
|
|
||||||
if row[c] == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dbz := *row[c]
|
|
||||||
lon := xs[c]
|
|
||||||
dist := haversine(stLat, stLon, lat, lon)
|
|
||||||
|
|
||||||
// 8km 圆
|
|
||||||
if dist <= circleR {
|
|
||||||
if dbz >= 40 {
|
|
||||||
cir40Cnt++
|
|
||||||
cir40Sum += dbz
|
|
||||||
}
|
|
||||||
if dbz >= 30 {
|
|
||||||
cir30Cnt++
|
|
||||||
cir30Sum += dbz
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 扇形(需同时满足距离与角度)
|
|
||||||
if dist <= rangeM {
|
|
||||||
brg := bearingDeg(stLat, stLon, lat, lon)
|
|
||||||
if angDiff(brg, windFromDeg) <= halfAngle {
|
|
||||||
if dbz >= 40 {
|
|
||||||
sec40Cnt++
|
|
||||||
sec40Sum += dbz
|
|
||||||
}
|
|
||||||
if dbz >= 30 {
|
|
||||||
sec30Cnt++
|
|
||||||
sec30Sum += dbz
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func toRad(d float64) float64 { return d * math.Pi / 180 }
|
|
||||||
func toDeg(r float64) float64 { return r * 180 / math.Pi }
|
|
||||||
|
|
||||||
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
|
|
||||||
const R = 6371000.0
|
|
||||||
dLat := toRad(lat2 - lat1)
|
|
||||||
dLon := toRad(lon2 - lon1)
|
|
||||||
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
|
|
||||||
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
|
|
||||||
return R * c
|
|
||||||
}
|
|
||||||
|
|
||||||
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
|
|
||||||
φ1 := toRad(lat1)
|
|
||||||
φ2 := toRad(lat2)
|
|
||||||
Δλ := toRad(lon2 - lon1)
|
|
||||||
y := math.Sin(Δλ) * math.Cos(φ2)
|
|
||||||
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
|
|
||||||
brg := toDeg(math.Atan2(y, x))
|
|
||||||
if brg < 0 {
|
|
||||||
brg += 360
|
|
||||||
}
|
|
||||||
return brg
|
|
||||||
}
|
|
||||||
|
|
||||||
func angDiff(a, b float64) float64 {
|
|
||||||
d := math.Mod(a-b+540, 360) - 180
|
|
||||||
if d < 0 {
|
|
||||||
d = -d
|
|
||||||
}
|
|
||||||
return math.Abs(d)
|
|
||||||
}
|
|
||||||
@ -1,553 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"encoding/binary"
|
|
||||||
"encoding/csv"
|
|
||||||
"errors"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"math"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"weatherstation/internal/database"
|
|
||||||
)
|
|
||||||
|
|
||||||
type stationInfo struct {
|
|
||||||
ID string
|
|
||||||
Alias string
|
|
||||||
Lat float64
|
|
||||||
Lon float64
|
|
||||||
Z int
|
|
||||||
Y int
|
|
||||||
X int
|
|
||||||
}
|
|
||||||
|
|
||||||
type tileRec struct {
|
|
||||||
DT time.Time
|
|
||||||
Width, Height int
|
|
||||||
West, South float64
|
|
||||||
East, North float64
|
|
||||||
ResDeg float64
|
|
||||||
Data []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var stationID string
|
|
||||||
var startStr string
|
|
||||||
var endStr string
|
|
||||||
var outPath string
|
|
||||||
var verbose bool
|
|
||||||
var useImdroid bool
|
|
||||||
|
|
||||||
flag.StringVar(&stationID, "station_id", "", "站点ID(留空表示全部符合条件的站)")
|
|
||||||
flag.StringVar(&startStr, "start", "", "起始时间(YYYY-MM-DD HH:MM:SS,CST,表示区间左端点)")
|
|
||||||
flag.StringVar(&endStr, "end", "", "结束时间(YYYY-MM-DD HH:MM:SS,CST,表示区间左端点,非包含)")
|
|
||||||
flag.StringVar(&outPath, "out", "radar_hourly_stats.csv", "输出CSV文件路径")
|
|
||||||
flag.BoolVar(&verbose, "info", false, "输出详细过程信息")
|
|
||||||
flag.BoolVar(&useImdroid, "use_imdroid", false, "输出 imdroid 预报(右端点)")
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" {
|
|
||||||
log.Fatalln("必须提供 --start 与 --end,格式 YYYY-MM-DD HH:MM:SS")
|
|
||||||
}
|
|
||||||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
|
||||||
if loc == nil {
|
|
||||||
loc = time.FixedZone("CST", 8*3600)
|
|
||||||
}
|
|
||||||
startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("解析 start 失败: %v", err)
|
|
||||||
}
|
|
||||||
endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("解析 end 失败: %v", err)
|
|
||||||
}
|
|
||||||
if !endT.After(startT) {
|
|
||||||
log.Fatalln("结束时间必须大于起始时间")
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = database.GetDB()
|
|
||||||
defer database.Close()
|
|
||||||
|
|
||||||
stations, err := listStations(database.GetDB(), stationID)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("查询站点失败: %v", err)
|
|
||||||
}
|
|
||||||
if len(stations) == 0 {
|
|
||||||
log.Fatalln("没有符合条件的站点")
|
|
||||||
}
|
|
||||||
if verbose {
|
|
||||||
log.Printf("站点数量: %d", len(stations))
|
|
||||||
for _, s := range stations {
|
|
||||||
log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := os.Create(outPath)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("创建输出文件失败: %v", err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
w := csv.NewWriter(f)
|
|
||||||
defer w.Flush()
|
|
||||||
|
|
||||||
header := []string{
|
|
||||||
"station_id",
|
|
||||||
"station_alias",
|
|
||||||
"hour_end",
|
|
||||||
"rain_actual_mm",
|
|
||||||
"wind_speed_ms",
|
|
||||||
"wind_dir_deg",
|
|
||||||
"openmeteo_rain_mm",
|
|
||||||
"openmeteo_issued",
|
|
||||||
"caiyun_rain_mm",
|
|
||||||
"caiyun_issued",
|
|
||||||
}
|
|
||||||
if useImdroid {
|
|
||||||
header = append(header, "imdroid_rain_mm", "imdroid_issued")
|
|
||||||
}
|
|
||||||
header = append(header, "radar_circle_max_dbz", "radar_sector_max_dbz")
|
|
||||||
if err := w.Write(header); err != nil {
|
|
||||||
log.Fatalf("写入CSV表头失败: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
totalRows := 0
|
|
||||||
hours := buildHourSlots(startT, endT)
|
|
||||||
|
|
||||||
for _, s := range stations {
|
|
||||||
if verbose {
|
|
||||||
log.Printf("处理站点 %s,共 %d 个小时区间", s.ID, len(hours))
|
|
||||||
}
|
|
||||||
for _, slot := range hours {
|
|
||||||
actual, windSpeed, windDir, hasObs, err := aggregateHourlyObs(ctx, database.GetDB(), s.ID, slot.from, slot.to)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("站点 %s 聚合观测失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
openRain, openIssued, hasOpen, err := findLatestForecast(ctx, database.GetDB(), s.ID, "open-meteo", slot.to)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("站点 %s 读取 open-meteo 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
|
|
||||||
}
|
|
||||||
caiyunRain, caiyunIssued, hasCaiyun, err := findLatestForecast(ctx, database.GetDB(), s.ID, "caiyun", slot.to)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("站点 %s 读取 caiyun 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
imdroidRain float64
|
|
||||||
imdroidIssued time.Time
|
|
||||||
hasImdroid bool
|
|
||||||
)
|
|
||||||
if useImdroid {
|
|
||||||
var errImdroid error
|
|
||||||
imdroidRain, imdroidIssued, hasImdroid, errImdroid = findLatestForecast(ctx, database.GetDB(), s.ID, "imdroid", slot.to)
|
|
||||||
if errImdroid != nil {
|
|
||||||
log.Printf("站点 %s 读取 imdroid 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), errImdroid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
circleMax, sectorMax, hasRadar, err := hourlyRadarMax(ctx, database.GetDB(), s, slot.from, slot.to, loc, verbose)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("站点 %s 统计雷达失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rec := []string{
|
|
||||||
s.ID,
|
|
||||||
s.Alias,
|
|
||||||
slot.to.Format("2006-01-02 15:04:05"),
|
|
||||||
formatFloat(actual, hasObs, 3),
|
|
||||||
formatFloat(windSpeed, hasObs && !math.IsNaN(windSpeed), 3),
|
|
||||||
formatFloat(windDir, hasObs && !math.IsNaN(windDir), 1),
|
|
||||||
formatFloat(openRain, hasOpen, 3),
|
|
||||||
formatTime(openIssued, hasOpen),
|
|
||||||
formatFloat(caiyunRain, hasCaiyun, 3),
|
|
||||||
formatTime(caiyunIssued, hasCaiyun),
|
|
||||||
}
|
|
||||||
if useImdroid {
|
|
||||||
rec = append(rec,
|
|
||||||
formatFloat(imdroidRain, hasImdroid, 3),
|
|
||||||
formatTime(imdroidIssued, hasImdroid),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
rec = append(rec,
|
|
||||||
formatFloat(circleMax, hasRadar && !math.IsNaN(circleMax), 1),
|
|
||||||
formatFloat(sectorMax, hasRadar && !math.IsNaN(sectorMax), 1),
|
|
||||||
)
|
|
||||||
if err := w.Write(rec); err != nil {
|
|
||||||
log.Printf("写入CSV失败: %v", err)
|
|
||||||
} else {
|
|
||||||
totalRows++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Flush()
|
|
||||||
if err := w.Error(); err != nil {
|
|
||||||
log.Fatalf("写入CSV失败: %v", err)
|
|
||||||
}
|
|
||||||
log.Printf("完成,输出 %d 行到 %s", totalRows, outPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
type hourSlot struct {
|
|
||||||
from time.Time
|
|
||||||
to time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
func buildHourSlots(from, to time.Time) []hourSlot {
|
|
||||||
var slots []hourSlot
|
|
||||||
cursor := from
|
|
||||||
for cursor.Before(to) {
|
|
||||||
end := cursor.Add(time.Hour)
|
|
||||||
if end.After(to) {
|
|
||||||
end = to
|
|
||||||
}
|
|
||||||
slots = append(slots, hourSlot{from: cursor, to: end})
|
|
||||||
cursor = end
|
|
||||||
}
|
|
||||||
return slots
|
|
||||||
}
|
|
||||||
|
|
||||||
func listStations(db *sql.DB, stationID string) ([]stationInfo, error) {
|
|
||||||
if strings.TrimSpace(stationID) != "" {
|
|
||||||
const q = `
|
|
||||||
SELECT station_id,
|
|
||||||
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
|
|
||||||
latitude, longitude,
|
|
||||||
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
|
|
||||||
FROM stations
|
|
||||||
WHERE station_id=$1
|
|
||||||
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
|
||||||
AND latitude<>0 AND longitude<>0
|
|
||||||
AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102`
|
|
||||||
var s stationInfo
|
|
||||||
err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return []stationInfo{s}, nil
|
|
||||||
}
|
|
||||||
const qAll = `
|
|
||||||
SELECT station_id,
|
|
||||||
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
|
|
||||||
latitude, longitude,
|
|
||||||
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
|
|
||||||
FROM stations
|
|
||||||
WHERE device_type='WH65LP'
|
|
||||||
AND latitude IS NOT NULL AND longitude IS NOT NULL
|
|
||||||
AND latitude<>0 AND longitude<>0
|
|
||||||
AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102
|
|
||||||
ORDER BY station_id`
|
|
||||||
rows, err := db.Query(qAll)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var out []stationInfo
|
|
||||||
for rows.Next() {
|
|
||||||
var s stationInfo
|
|
||||||
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil {
|
|
||||||
out = append(out, s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func aggregateHourlyObs(ctx context.Context, db *sql.DB, stationID string, from, to time.Time) (rain float64, windSpeed float64, windDir float64, ok bool, err error) {
|
|
||||||
const q = `
|
|
||||||
SELECT wind_speed_ms_x1000, wind_dir_deg, rain_10m_mm_x1000
|
|
||||||
FROM rs485_weather_10min
|
|
||||||
WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
|
|
||||||
rows, err := db.QueryContext(ctx, q, stationID, from, to)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, 0, false, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
var totalRain int64
|
|
||||||
var count int
|
|
||||||
var sumX, sumY float64
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
var ws sql.NullInt64
|
|
||||||
var wd sql.NullInt64
|
|
||||||
var rainX sql.NullInt64
|
|
||||||
if err := rows.Scan(&ws, &wd, &rainX); err != nil {
|
|
||||||
return 0, 0, 0, false, err
|
|
||||||
}
|
|
||||||
if rainX.Valid {
|
|
||||||
totalRain += rainX.Int64
|
|
||||||
}
|
|
||||||
if ws.Valid && wd.Valid {
|
|
||||||
speed := float64(ws.Int64) / 1000.0
|
|
||||||
dir := float64(wd.Int64)
|
|
||||||
rad := toRad(dir)
|
|
||||||
sumX += speed * math.Cos(rad)
|
|
||||||
sumY += speed * math.Sin(rad)
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := rows.Err(); err != nil {
|
|
||||||
return 0, 0, 0, false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
rain = float64(totalRain) / 1000.0
|
|
||||||
windSpeed = math.NaN()
|
|
||||||
windDir = math.NaN()
|
|
||||||
if count > 0 {
|
|
||||||
avgX := sumX / float64(count)
|
|
||||||
avgY := sumY / float64(count)
|
|
||||||
windSpeed = math.Hypot(avgX, avgY)
|
|
||||||
if windSpeed == 0 {
|
|
||||||
windDir = 0
|
|
||||||
} else {
|
|
||||||
dir := toDeg(math.Atan2(avgY, avgX))
|
|
||||||
if dir < 0 {
|
|
||||||
dir += 360
|
|
||||||
}
|
|
||||||
windDir = dir
|
|
||||||
}
|
|
||||||
ok = true
|
|
||||||
}
|
|
||||||
return rain, windSpeed, windDir, totalRain > 0 || count > 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func findLatestForecast(ctx context.Context, db *sql.DB, stationID, provider string, forecastTime time.Time) (rain float64, issued time.Time, ok bool, err error) {
|
|
||||||
const q = `
|
|
||||||
SELECT issued_at, rain_mm_x1000
|
|
||||||
FROM forecast_hourly
|
|
||||||
WHERE station_id=$1 AND provider=$2 AND forecast_time=$3
|
|
||||||
ORDER BY issued_at DESC
|
|
||||||
LIMIT 1`
|
|
||||||
var issuedAt time.Time
|
|
||||||
var rainX sql.NullInt64
|
|
||||||
err = db.QueryRowContext(ctx, q, stationID, provider, forecastTime).Scan(&issuedAt, &rainX)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return 0, time.Time{}, false, nil
|
|
||||||
}
|
|
||||||
return 0, time.Time{}, false, err
|
|
||||||
}
|
|
||||||
if !rainX.Valid {
|
|
||||||
return 0, issuedAt, true, nil
|
|
||||||
}
|
|
||||||
return float64(rainX.Int64) / 1000.0, issuedAt, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func hourlyRadarMax(ctx context.Context, db *sql.DB, s stationInfo, from, to time.Time, loc *time.Location, verbose bool) (circleMax float64, sectorMax float64, ok bool, err error) {
|
|
||||||
tiles, err := listTiles(ctx, db, s.Z, s.Y, s.X, from, to)
|
|
||||||
if err != nil {
|
|
||||||
return math.NaN(), math.NaN(), false, err
|
|
||||||
}
|
|
||||||
if len(tiles) == 0 {
|
|
||||||
return math.NaN(), math.NaN(), false, nil
|
|
||||||
}
|
|
||||||
circleMax = math.NaN()
|
|
||||||
sectorMax = math.NaN()
|
|
||||||
alias := strings.TrimSpace(s.Alias)
|
|
||||||
if alias == "" {
|
|
||||||
alias = s.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, t := range tiles {
|
|
||||||
bucket := bucket10(t.DT, loc)
|
|
||||||
windSpeed, windDir, windOK, err := loadWindAt(db, s.ID, alias, bucket)
|
|
||||||
if err != nil {
|
|
||||||
if verbose {
|
|
||||||
log.Printf("站点 %s 瓦片@%s 读取风失败: %v", s.ID, t.DT.Format(time.RFC3339), err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
vals, xs, ys, err := decodeTile(t)
|
|
||||||
if err != nil {
|
|
||||||
if verbose {
|
|
||||||
log.Printf("站点 %s 解码瓦片失败: %v", s.ID, err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for r := 0; r < len(vals); r++ {
|
|
||||||
row := vals[r]
|
|
||||||
lat := ys[r]
|
|
||||||
for c := 0; c < len(row); c++ {
|
|
||||||
v := row[c]
|
|
||||||
if v == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dbz := *v
|
|
||||||
lon := xs[c]
|
|
||||||
dist := haversine(s.Lat, s.Lon, lat, lon)
|
|
||||||
|
|
||||||
if dist <= 8000.0 {
|
|
||||||
if math.IsNaN(circleMax) || dbz > circleMax {
|
|
||||||
circleMax = dbz
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if windOK && windSpeed > 0 {
|
|
||||||
brg := bearingDeg(s.Lat, s.Lon, lat, lon)
|
|
||||||
if angDiff(brg, windDir) <= 30.0 && dist <= windSpeed*3*3600 {
|
|
||||||
if math.IsNaN(sectorMax) || dbz > sectorMax {
|
|
||||||
sectorMax = dbz
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return circleMax, sectorMax, !math.IsNaN(circleMax) || !math.IsNaN(sectorMax), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) {
|
|
||||||
const q = `
|
|
||||||
SELECT dt, width, height, west, south, east, north, res_deg, data
|
|
||||||
FROM radar_tiles
|
|
||||||
WHERE z=$1 AND y=$2 AND x=$3 AND dt >= $4 AND dt < $5
|
|
||||||
ORDER BY dt ASC`
|
|
||||||
rows, err := db.QueryContext(ctx, q, z, y, x, from, to)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var out []tileRec
|
|
||||||
for rows.Next() {
|
|
||||||
var r tileRec
|
|
||||||
if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil {
|
|
||||||
out = append(out, r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func bucket10(t time.Time, loc *time.Location) time.Time {
|
|
||||||
tt := t.In(loc)
|
|
||||||
m := (tt.Minute() / 10) * 10
|
|
||||||
return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc)
|
|
||||||
}
|
|
||||||
|
|
||||||
func loadWindAt(db *sql.DB, stationID, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) {
|
|
||||||
const q = `
|
|
||||||
SELECT wind_speed, wind_direction
|
|
||||||
FROM radar_weather
|
|
||||||
WHERE alias=$1 AND dt=$2
|
|
||||||
LIMIT 1`
|
|
||||||
tryAlias := func(a string) (float64, float64, bool, error) {
|
|
||||||
var s, d sql.NullFloat64
|
|
||||||
err := db.QueryRow(q, a, dt).Scan(&s, &d)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return 0, 0, false, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, false, err
|
|
||||||
}
|
|
||||||
if !s.Valid || !d.Valid {
|
|
||||||
return 0, 0, false, nil
|
|
||||||
}
|
|
||||||
return s.Float64, d.Float64, true, nil
|
|
||||||
}
|
|
||||||
if speed, dir, ok, err := tryAlias(stationID); err != nil {
|
|
||||||
return 0, 0, false, err
|
|
||||||
} else if ok {
|
|
||||||
return speed, dir, true, nil
|
|
||||||
}
|
|
||||||
return tryAlias(alias)
|
|
||||||
}
|
|
||||||
|
|
||||||
func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) {
|
|
||||||
w, h := t.Width, t.Height
|
|
||||||
if w <= 0 || h <= 0 {
|
|
||||||
return nil, nil, nil, fmt.Errorf("非法尺寸")
|
|
||||||
}
|
|
||||||
if len(t.Data) < w*h*2 {
|
|
||||||
return nil, nil, nil, fmt.Errorf("数据长度不足")
|
|
||||||
}
|
|
||||||
xs = make([]float64, w)
|
|
||||||
for c := 0; c < w; c++ {
|
|
||||||
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
|
|
||||||
}
|
|
||||||
ys = make([]float64, h)
|
|
||||||
for r := 0; r < h; r++ {
|
|
||||||
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
|
|
||||||
}
|
|
||||||
vals = make([][]*float64, h)
|
|
||||||
off := 0
|
|
||||||
for r := 0; r < h; r++ {
|
|
||||||
row := make([]*float64, w)
|
|
||||||
for c := 0; c < w; c++ {
|
|
||||||
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
|
|
||||||
off += 2
|
|
||||||
if v >= 32766 {
|
|
||||||
row[c] = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dbz := float64(v) / 10.0
|
|
||||||
if dbz < 0 {
|
|
||||||
dbz = 0
|
|
||||||
} else if dbz > 75 {
|
|
||||||
dbz = 75
|
|
||||||
}
|
|
||||||
value := dbz
|
|
||||||
row[c] = &value
|
|
||||||
}
|
|
||||||
vals[r] = row
|
|
||||||
}
|
|
||||||
return vals, xs, ys, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
|
|
||||||
const R = 6371000.0
|
|
||||||
dLat := toRad(lat2 - lat1)
|
|
||||||
dLon := toRad(lon2 - lon1)
|
|
||||||
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
|
|
||||||
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
|
|
||||||
return R * c
|
|
||||||
}
|
|
||||||
|
|
||||||
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
|
|
||||||
φ1 := toRad(lat1)
|
|
||||||
φ2 := toRad(lat2)
|
|
||||||
Δλ := toRad(lon2 - lon1)
|
|
||||||
y := math.Sin(Δλ) * math.Cos(φ2)
|
|
||||||
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
|
|
||||||
brg := toDeg(math.Atan2(y, x))
|
|
||||||
if brg < 0 {
|
|
||||||
brg += 360
|
|
||||||
}
|
|
||||||
return brg
|
|
||||||
}
|
|
||||||
|
|
||||||
func angDiff(a, b float64) float64 {
|
|
||||||
d := math.Mod(a-b+540, 360) - 180
|
|
||||||
if d < 0 {
|
|
||||||
d = -d
|
|
||||||
}
|
|
||||||
return math.Abs(d)
|
|
||||||
}
|
|
||||||
|
|
||||||
func toRad(d float64) float64 { return d * math.Pi / 180 }
|
|
||||||
func toDeg(r float64) float64 { return r * 180 / math.Pi }
|
|
||||||
|
|
||||||
func formatFloat(v float64, ok bool, digits int) string {
|
|
||||||
if !ok || math.IsNaN(v) {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
format := fmt.Sprintf("%%.%df", digits)
|
|
||||||
return fmt.Sprintf(format, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func formatTime(t time.Time, ok bool) string {
|
|
||||||
if !ok || t.IsZero() {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return t.Format("2006-01-02 15:04:05")
|
|
||||||
}
|
|
||||||
@ -1,227 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
dbpkg "weatherstation/internal/database"
|
|
||||||
"weatherstation/internal/rain"
|
|
||||||
)
|
|
||||||
|
|
||||||
// 简单的小时雨量(CMPA)按时间范围下载器:
|
|
||||||
// - 输入时间为北京时间(Asia/Shanghai)
|
|
||||||
// - 构造下载路径使用 UTC(本地整点 -8h)
|
|
||||||
// - 入库前通过 rain.StoreTileBytes 使用 URL 解析将 UTC 还原为北京时间并写库
|
|
||||||
// 用法示例:
|
|
||||||
//
|
|
||||||
// go run ./cmd/rainfetch --from "2025-10-07 09:00:00" --to "2025-10-07 11:00:00" \
|
|
||||||
// --tiles "7/40/102,7/40/104" --outdir rain_data
|
|
||||||
func main() {
|
|
||||||
var (
|
|
||||||
fromStr = flag.String("from", "", "起始时间(北京时间,YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD)")
|
|
||||||
toStr = flag.String("to", "", "结束时间(北京时间,YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD)")
|
|
||||||
tiles = flag.String("tiles", "7/40/102,7/40/104", "瓦片列表,逗号分隔,每项为 z/y/x,如 7/40/102")
|
|
||||||
outDir = flag.String("outdir", "rain_data", "保存目录(同时也会写入数据库)")
|
|
||||||
baseURL = flag.String("base", "https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%Y%m%d/%H/%M/{z}/{y}/{x}.bin", "下载基础URL模板(UTC路径时间)")
|
|
||||||
dryRun = flag.Bool("dry", false, "仅打印将要下载的URL与目标,不实际下载写库")
|
|
||||||
)
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if strings.TrimSpace(*fromStr) == "" || strings.TrimSpace(*toStr) == "" {
|
|
||||||
log.Fatalln("必须提供 --from 与 --to(北京时间)")
|
|
||||||
}
|
|
||||||
|
|
||||||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
|
||||||
if loc == nil {
|
|
||||||
loc = time.FixedZone("CST", 8*3600)
|
|
||||||
}
|
|
||||||
|
|
||||||
parseCST := func(s string) (time.Time, error) {
|
|
||||||
s = strings.TrimSpace(s)
|
|
||||||
var t time.Time
|
|
||||||
var err error
|
|
||||||
if len(s) == len("2006-01-02") {
|
|
||||||
// 日期:按 00:00:00 处理
|
|
||||||
if tm, e := time.ParseInLocation("2006-01-02", s, loc); e == nil {
|
|
||||||
t = tm
|
|
||||||
} else {
|
|
||||||
err = e
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
t, err = time.ParseInLocation("2006-01-02 15:04:05", s, loc)
|
|
||||||
}
|
|
||||||
return t, err
|
|
||||||
}
|
|
||||||
|
|
||||||
start, err1 := parseCST(*fromStr)
|
|
||||||
end, err2 := parseCST(*toStr)
|
|
||||||
if err1 != nil || err2 != nil {
|
|
||||||
log.Fatalf("解析时间失败: from=%v to=%v", err1, err2)
|
|
||||||
}
|
|
||||||
if end.Before(start) {
|
|
||||||
log.Fatalln("结束时间需不小于起始时间")
|
|
||||||
}
|
|
||||||
|
|
||||||
// 小时步进(包含端点):先对齐到小时
|
|
||||||
cur := start.Truncate(time.Hour)
|
|
||||||
end = end.Truncate(time.Hour)
|
|
||||||
|
|
||||||
// 解析 tiles 参数
|
|
||||||
type tcoord struct{ z, y, x int }
|
|
||||||
var tlist []tcoord
|
|
||||||
for _, part := range strings.Split(*tiles, ",") {
|
|
||||||
p := strings.TrimSpace(part)
|
|
||||||
if p == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var z, y, x int
|
|
||||||
if _, err := fmt.Sscanf(p, "%d/%d/%d", &z, &y, &x); err != nil {
|
|
||||||
log.Fatalf("无效的 tiles 项: %s", p)
|
|
||||||
}
|
|
||||||
tlist = append(tlist, tcoord{z, y, x})
|
|
||||||
}
|
|
||||||
if len(tlist) == 0 {
|
|
||||||
log.Fatalln("tiles 解析后为空")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := os.MkdirAll(*outDir, 0o755); err != nil {
|
|
||||||
log.Fatalf("创建输出目录失败: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
total := 0
|
|
||||||
success := 0
|
|
||||||
for !cur.After(end) {
|
|
||||||
// 本地整点(CST)→ UTC 路径时间
|
|
||||||
slotLocal := cur
|
|
||||||
slotUTC := slotLocal.Add(-8 * time.Hour).In(time.UTC)
|
|
||||||
dateStr := slotUTC.Format("20060102")
|
|
||||||
hh := slotUTC.Format("15")
|
|
||||||
mm := "00"
|
|
||||||
log.Printf("[rainfetch] 时次 local=%s, utc=%s", slotLocal.Format("2006-01-02 15:04"), slotUTC.Format("2006-01-02 15:04"))
|
|
||||||
|
|
||||||
for _, tc := range tlist {
|
|
||||||
total++
|
|
||||||
// 构造 URL
|
|
||||||
url := *baseURL
|
|
||||||
url = strings.ReplaceAll(url, "%Y%m%d", dateStr)
|
|
||||||
url = strings.ReplaceAll(url, "%H", hh)
|
|
||||||
url = strings.ReplaceAll(url, "%M", mm)
|
|
||||||
url = strings.ReplaceAll(url, "{z}", fmt.Sprintf("%d", tc.z))
|
|
||||||
url = strings.ReplaceAll(url, "{y}", fmt.Sprintf("%d", tc.y))
|
|
||||||
url = strings.ReplaceAll(url, "{x}", fmt.Sprintf("%d", tc.x))
|
|
||||||
|
|
||||||
fname := fmt.Sprintf("rain_z%d_y%d_x%d_%s.bin", tc.z, tc.y, tc.x, slotLocal.Format("20060102_1504"))
|
|
||||||
dest := filepath.Join(*outDir, fname)
|
|
||||||
|
|
||||||
if *dryRun {
|
|
||||||
log.Printf("[rainfetch] DRY url=%s -> %s", url, dest)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 若 DB 已有,则跳过
|
|
||||||
if ref, e := rain.ParseCMPATileURL(url); e == nil {
|
|
||||||
exists, e2 := databaseHas(ctx, ref.Product, ref.DT, tc.z, tc.y, tc.x)
|
|
||||||
if e2 == nil && exists {
|
|
||||||
log.Printf("[rainfetch] skip exists in DB z=%d y=%d x=%d dt=%s", tc.z, tc.y, tc.x, ref.DT.Format("2006-01-02 15:04"))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := httpDownloadTo(ctx, url, dest); err != nil {
|
|
||||||
log.Printf("[rainfetch] 下载失败 z=%d y=%d x=%d: %v", tc.z, tc.y, tc.x, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Printf("[rainfetch] 保存 %s", dest)
|
|
||||||
|
|
||||||
// 写库(使用 URL 解析 UTC → CST 后 upsert)
|
|
||||||
b, rerr := os.ReadFile(dest)
|
|
||||||
if rerr != nil {
|
|
||||||
log.Printf("[rainfetch] 读文件失败: %v", rerr)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := rain.StoreTileBytes(ctx, url, b); err != nil {
|
|
||||||
log.Printf("[rainfetch] 入库失败: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
success++
|
|
||||||
}
|
|
||||||
cur = cur.Add(1 * time.Hour)
|
|
||||||
}
|
|
||||||
log.Printf("[rainfetch] 完成:尝试 %d,成功 %d", total, success)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 轻量 DB 存在性检查(避免引入内部 database 包到该命令):
|
|
||||||
// 为了避免循环依赖,这里复制一份最小 SQL 调用;实际工程也可抽取共享函数。
|
|
||||||
// 但当前 repo 中 database.GetDB 在 internal/database 包内,雨量 API 直接使用它。
|
|
||||||
|
|
||||||
// 注意:为保持最小侵入,这里通过 rain.StoreTileBytes 完成入库;
|
|
||||||
// 仅在下载前进行“是否已存在”查询,避免重复下载。为此需要访问 internal/database。
|
|
||||||
|
|
||||||
func databaseHas(ctx context.Context, product string, dt time.Time, z, y, x int) (bool, error) {
|
|
||||||
const q = `SELECT 1 FROM rain_tiles WHERE product=$1 AND dt=$2 AND z=$3 AND y=$4 AND x=$5 LIMIT 1`
|
|
||||||
var one int
|
|
||||||
err := dbpkg.GetDB().QueryRowContext(ctx, q, product, dt, z, y, x).Scan(&one)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func httpDownloadTo(ctx context.Context, url, dest string) error {
|
|
||||||
client := &http.Client{Timeout: 20 * time.Second}
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("build request: %w", err)
|
|
||||||
}
|
|
||||||
req.Header.Set("Referer", "https://data.cma.cn/")
|
|
||||||
req.Header.Set("Origin", "https://data.cma.cn")
|
|
||||||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("http get: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return fmt.Errorf("unexpected status: %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
tmp := dest + ".part"
|
|
||||||
f, err := os.Create(tmp)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("create temp: %w", err)
|
|
||||||
}
|
|
||||||
_, copyErr := io.Copy(f, resp.Body)
|
|
||||||
closeErr := f.Close()
|
|
||||||
if copyErr != nil {
|
|
||||||
_ = os.Remove(tmp)
|
|
||||||
return fmt.Errorf("write body: %w", copyErr)
|
|
||||||
}
|
|
||||||
if closeErr != nil {
|
|
||||||
_ = os.Remove(tmp)
|
|
||||||
return fmt.Errorf("close temp: %w", closeErr)
|
|
||||||
}
|
|
||||||
if err := os.Rename(tmp, dest); err != nil {
|
|
||||||
// Cross-device fallback
|
|
||||||
data, rerr := os.ReadFile(tmp)
|
|
||||||
if rerr != nil {
|
|
||||||
return fmt.Errorf("read temp: %w", rerr)
|
|
||||||
}
|
|
||||||
if werr := os.WriteFile(dest, data, 0o644); werr != nil {
|
|
||||||
return fmt.Errorf("write final: %w", werr)
|
|
||||||
}
|
|
||||||
_ = os.Remove(tmp)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@ -1,28 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
"weatherstation/internal/rain"
|
|
||||||
"weatherstation/internal/server"
|
|
||||||
)
|
|
||||||
|
|
||||||
// service-rain: standalone CMPA hourly rain tile downloader
|
|
||||||
// - Uses internal/rain scheduler with defaults
|
|
||||||
// - Controlled by env vars in internal/rain (e.g., RAIN_ENABLED, RAIN_DIR, RAIN_BASE_URL)
|
|
||||||
func main() {
|
|
||||||
server.SetupLogger()
|
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
|
||||||
defer stop()
|
|
||||||
|
|
||||||
if err := rain.Start(ctx, rain.Options{StoreToDB: true}); err != nil {
|
|
||||||
log.Fatalf("service-rain start error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
<-ctx.Done()
|
|
||||||
log.Println("service-rain shutting down")
|
|
||||||
}
|
|
||||||
@ -1,235 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"encoding/csv"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"weatherstation/core/internal/data"
|
|
||||||
)
|
|
||||||
|
|
||||||
type actualHour struct {
|
|
||||||
HourEnd time.Time
|
|
||||||
TempC float64
|
|
||||||
HumidityPct float64
|
|
||||||
PressureHpa float64
|
|
||||||
WindSpeedMs float64
|
|
||||||
WindDirDeg float64
|
|
||||||
RainActualMM float64
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var stationID, startStr, endStr, providersCSV, outPath, tzName string
|
|
||||||
flag.StringVar(&stationID, "station", "", "站点ID,如 RS485-002A6E")
|
|
||||||
flag.StringVar(&startStr, "start", "", "开始时间,格式 2006-01-02 15:00")
|
|
||||||
flag.StringVar(&endStr, "end", "", "结束时间,格式 2006-01-02 15:00(开区间)")
|
|
||||||
flag.StringVar(&providersCSV, "providers", "caiyun,ec,wrf", "逗号分隔的预报源,默认 caiyun,ec,wrf")
|
|
||||||
flag.StringVar(&outPath, "out", "", "输出 CSV 文件路径;留空输出到 stdout")
|
|
||||||
flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai")
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if strings.TrimSpace(stationID) == "" || strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" {
|
|
||||||
log.Fatalf("用法: im_export_data --station RS485-XXXXXX --start '2024-08-01 00:00' --end '2024-08-02 00:00' [--providers caiyun,ec,wrf] [--out out.csv]")
|
|
||||||
}
|
|
||||||
|
|
||||||
loc, _ := time.LoadLocation(tzName)
|
|
||||||
if loc == nil {
|
|
||||||
loc = time.FixedZone("CST", 8*3600)
|
|
||||||
}
|
|
||||||
parse := func(s string) time.Time {
|
|
||||||
var t time.Time
|
|
||||||
var err error
|
|
||||||
for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} {
|
|
||||||
t, err = time.ParseInLocation(ly, s, loc)
|
|
||||||
if err == nil {
|
|
||||||
return t.Truncate(time.Hour)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.Fatalf("无法解析时间: %s", s)
|
|
||||||
return time.Time{}
|
|
||||||
}
|
|
||||||
start := parse(startStr)
|
|
||||||
end := parse(endStr)
|
|
||||||
if !end.After(start) {
|
|
||||||
log.Fatalf("end 必须大于 start")
|
|
||||||
}
|
|
||||||
|
|
||||||
providers := splitCSV(providersCSV)
|
|
||||||
if len(providers) == 0 {
|
|
||||||
providers = []string{"caiyun"}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prepare writer
|
|
||||||
var out *csv.Writer
|
|
||||||
var file *os.File
|
|
||||||
if strings.TrimSpace(outPath) != "" {
|
|
||||||
f, err := os.Create(outPath)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("打开输出文件失败: %v", err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
out = csv.NewWriter(f)
|
|
||||||
file = f
|
|
||||||
} else {
|
|
||||||
out = csv.NewWriter(os.Stdout)
|
|
||||||
}
|
|
||||||
defer out.Flush()
|
|
||||||
|
|
||||||
// Header
|
|
||||||
header := []string{"station_id", "hour_end", "temp_c", "humidity_pct", "wind_dir_deg", "wind_speed_ms", "pressure_hpa", "rain_actual_mm"}
|
|
||||||
for _, p := range providers {
|
|
||||||
header = append(header, fmt.Sprintf("%s_lead1_rain_mm", p))
|
|
||||||
header = append(header, fmt.Sprintf("%s_lead2_rain_mm", p))
|
|
||||||
header = append(header, fmt.Sprintf("%s_lead3_rain_mm", p))
|
|
||||||
}
|
|
||||||
if err := out.Write(header); err != nil {
|
|
||||||
log.Fatalf("写入 CSV 失败: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
rows, err := loadActualHourly(ctx, stationID, start, end)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("查询实况失败: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, row := range rows {
|
|
||||||
rec := []string{
|
|
||||||
stationID,
|
|
||||||
row.HourEnd.Format("2006-01-02 15:04:05"),
|
|
||||||
fmt.Sprintf("%.2f", row.TempC),
|
|
||||||
fmt.Sprintf("%.2f", row.HumidityPct),
|
|
||||||
fmt.Sprintf("%.2f", row.WindDirDeg),
|
|
||||||
fmt.Sprintf("%.3f", row.WindSpeedMs),
|
|
||||||
fmt.Sprintf("%.2f", row.PressureHpa),
|
|
||||||
fmt.Sprintf("%.3f", row.RainActualMM),
|
|
||||||
}
|
|
||||||
for _, p := range providers {
|
|
||||||
// For each lead 1..3, get rain for forecast_time = hour_end, latest issued_at for that lead
|
|
||||||
for lead := 1; lead <= 3; lead++ {
|
|
||||||
v, _ := loadProviderRainAt(ctx, stationID, p, row.HourEnd, lead)
|
|
||||||
if v < 0 {
|
|
||||||
rec = append(rec, "")
|
|
||||||
} else {
|
|
||||||
rec = append(rec, fmt.Sprintf("%.3f", v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := out.Write(rec); err != nil {
|
|
||||||
log.Fatalf("写入 CSV 失败: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
out.Flush()
|
|
||||||
if err := out.Error(); err != nil {
|
|
||||||
log.Fatalf("写入 CSV 错误: %v", err)
|
|
||||||
}
|
|
||||||
if file != nil {
|
|
||||||
log.Printf("导出完成: %s,共 %d 行", outPath, len(rows))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func splitCSV(s string) []string {
|
|
||||||
parts := strings.Split(s, ",")
|
|
||||||
out := make([]string, 0, len(parts))
|
|
||||||
for _, p := range parts {
|
|
||||||
p = strings.TrimSpace(p)
|
|
||||||
if p != "" {
|
|
||||||
out = append(out, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func loadActualHourly(ctx context.Context, stationID string, start, end time.Time) ([]actualHour, error) {
|
|
||||||
// Right-endpoint hourly aggregation from rs485_weather_10min
|
|
||||||
const q = `
|
|
||||||
WITH base AS (
|
|
||||||
SELECT * FROM rs485_weather_10min
|
|
||||||
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
|
|
||||||
), g AS (
|
|
||||||
SELECT date_trunc('hour', bucket_start) AS grp,
|
|
||||||
SUM(temp_c_x100 * sample_count)::bigint AS w_temp,
|
|
||||||
SUM(humidity_pct * sample_count)::bigint AS w_hum,
|
|
||||||
SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p,
|
|
||||||
SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar,
|
|
||||||
SUM(uv_index * sample_count)::bigint AS w_uv,
|
|
||||||
SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws,
|
|
||||||
MAX(wind_gust_ms_x1000) AS gust_max,
|
|
||||||
SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum,
|
|
||||||
SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum,
|
|
||||||
SUM(rain_10m_mm_x1000) AS rain_sum,
|
|
||||||
SUM(sample_count) AS n_sum
|
|
||||||
FROM base GROUP BY 1
|
|
||||||
)
|
|
||||||
SELECT grp + interval '1 hour' AS hour_end,
|
|
||||||
(w_temp/NULLIF(n_sum,0))/100.0 AS temp_c,
|
|
||||||
(w_hum/NULLIF(n_sum,0))::double precision AS humidity_pct,
|
|
||||||
(w_p/NULLIF(n_sum,0))/100.0 AS pressure_hpa,
|
|
||||||
(w_ws/NULLIF(n_sum,0))/1000.0 AS wind_speed_ms,
|
|
||||||
CASE WHEN sin_sum IS NULL OR cos_sum IS NULL THEN NULL
|
|
||||||
ELSE (
|
|
||||||
CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0
|
|
||||||
THEN degrees(atan2(sin_sum, cos_sum)) + 360
|
|
||||||
ELSE degrees(atan2(sin_sum, cos_sum)) END)
|
|
||||||
END AS wind_dir_deg,
|
|
||||||
(rain_sum/1000.0) AS rain_mm
|
|
||||||
FROM g
|
|
||||||
ORDER BY hour_end`
|
|
||||||
|
|
||||||
rows, err := data.DB().QueryContext(ctx, q, stationID, start, end)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
var out []actualHour
|
|
||||||
for rows.Next() {
|
|
||||||
var t time.Time
|
|
||||||
var ta, ua, pa, ws, dm, rain sql.NullFloat64
|
|
||||||
if err := rows.Scan(&t, &ta, &ua, &pa, &ws, &dm, &rain); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
out = append(out, actualHour{
|
|
||||||
HourEnd: t,
|
|
||||||
TempC: nullF(ta),
|
|
||||||
HumidityPct: nullF(ua),
|
|
||||||
PressureHpa: nullF(pa),
|
|
||||||
WindSpeedMs: nullF(ws),
|
|
||||||
WindDirDeg: nullF(dm),
|
|
||||||
RainActualMM: nullF(rain),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// loadProviderRainAt returns rain(mm) for a provider at forecast_time=t with fixed lead, picking latest issued_at.
|
|
||||||
func loadProviderRainAt(ctx context.Context, stationID, provider string, t time.Time, lead int) (float64, error) {
|
|
||||||
const q = `
|
|
||||||
SELECT COALESCE(rain_mm_x1000,0)::bigint
|
|
||||||
FROM (
|
|
||||||
SELECT rain_mm_x1000, issued_at,
|
|
||||||
CEIL(EXTRACT(EPOCH FROM ($3 - issued_at)) / 3600.0)::int AS lead_hours
|
|
||||||
FROM forecast_hourly
|
|
||||||
WHERE station_id=$1 AND provider=$2 AND forecast_time=$3
|
|
||||||
) x
|
|
||||||
WHERE lead_hours=$4
|
|
||||||
ORDER BY issued_at DESC
|
|
||||||
LIMIT 1`
|
|
||||||
var v int64
|
|
||||||
err := data.DB().QueryRowContext(ctx, q, stationID, provider, t, lead).Scan(&v)
|
|
||||||
if err != nil {
|
|
||||||
return -1, err
|
|
||||||
}
|
|
||||||
return float64(v) / 1000.0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func nullF(n sql.NullFloat64) float64 {
|
|
||||||
if n.Valid {
|
|
||||||
return n.Float64
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
@ -1,551 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"math"
|
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"weatherstation/core/internal/config"
|
|
||||||
"weatherstation/core/internal/data"
|
|
||||||
"weatherstation/core/internal/sms"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
providerForecast = "imdroid_mix"
|
|
||||||
alertTypeForecast = "forecast_3h_rain"
|
|
||||||
alertTypeActual30m = "actual_30m_rain"
|
|
||||||
alertTypeNeighbor30 = "actual_30m_neighbor"
|
|
||||||
levelRed = "red"
|
|
||||||
levelYellow = "yellow"
|
|
||||||
forecastRedMM = 8.0
|
|
||||||
forecastYellowMM = 4.0
|
|
||||||
actualRedMM = 4.0
|
|
||||||
actualYellowMM = 2.0
|
|
||||||
halfAngleDeg = 90.0
|
|
||||||
timeFormatShort = "2006-01-02 15:04"
|
|
||||||
defaultCheckTimeout = 20 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
flagOnce bool
|
|
||||||
flagTest bool
|
|
||||||
flagTestStIDs string
|
|
||||||
flagTestTime string
|
|
||||||
flagWhy bool
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
flag.BoolVar(&flagOnce, "once", false, "run checks once immediately (no scheduling)")
|
|
||||||
flag.BoolVar(&flagTest, "test", false, "force alerts regardless of thresholds (for dry-run)")
|
|
||||||
flag.StringVar(&flagTestStIDs, "station", "", "comma-separated station_id list for test mode")
|
|
||||||
flag.StringVar(&flagTestTime, "time", "", "test mode: specify end time (YYYY-MM-DD HH:MM:SS, CST)")
|
|
||||||
flag.BoolVar(&flagWhy, "why", false, "in test mode, log reasons when alert not triggered")
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
cfg := config.Load()
|
|
||||||
|
|
||||||
scli := mustInitSMS(cfg)
|
|
||||||
|
|
||||||
if flagOnce {
|
|
||||||
tick := time.Now()
|
|
||||||
runForecastCheck(scli, tick)
|
|
||||||
runActualCheck(scli, tick)
|
|
||||||
runNeighborActualCheck(scli, tick)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go alignAndRunHour10(func(tick time.Time) { runForecastCheck(scli, tick) })
|
|
||||||
go alignAndRunHalfHour(func(tick time.Time) { runActualCheck(scli, tick) })
|
|
||||||
go alignAndRunHalfHour(func(tick time.Time) { runNeighborActualCheck(scli, tick) })
|
|
||||||
|
|
||||||
select {}
|
|
||||||
}
|
|
||||||
|
|
||||||
func mustInitSMS(cfg config.Config) *sms.Client {
|
|
||||||
cli, err := sms.New(sms.Config{
|
|
||||||
AccessKeyID: strings.TrimSpace(cfg.SMS.AccessKeyID),
|
|
||||||
AccessKeySecret: strings.TrimSpace(cfg.SMS.AccessKeySecret),
|
|
||||||
SignName: strings.TrimSpace(cfg.SMS.SignName),
|
|
||||||
TemplateCode: strings.TrimSpace(cfg.SMS.TemplateCode),
|
|
||||||
Endpoint: strings.TrimSpace(cfg.SMS.Endpoint),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("sms: disabled (%v)", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return cli
|
|
||||||
}
|
|
||||||
|
|
||||||
func alignAndRunHour10(fn func(tick time.Time)) {
|
|
||||||
now := time.Now()
|
|
||||||
base := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 10, 0, 0, now.Location())
|
|
||||||
var next time.Time
|
|
||||||
if now.After(base) {
|
|
||||||
next = base.Add(time.Hour)
|
|
||||||
} else {
|
|
||||||
next = base
|
|
||||||
}
|
|
||||||
time.Sleep(time.Until(next))
|
|
||||||
for {
|
|
||||||
tick := time.Now().Truncate(time.Minute)
|
|
||||||
fn(tick)
|
|
||||||
time.Sleep(time.Hour)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func alignAndRunHalfHour(fn func(tick time.Time)) {
|
|
||||||
now := time.Now()
|
|
||||||
next := now.Truncate(30 * time.Minute).Add(30 * time.Minute)
|
|
||||||
time.Sleep(time.Until(next))
|
|
||||||
for {
|
|
||||||
tick := time.Now().Truncate(time.Minute)
|
|
||||||
fn(tick)
|
|
||||||
time.Sleep(30 * time.Minute)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func runForecastCheck(scli *sms.Client, tick time.Time) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultCheckTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
stations := listFixedStations(ctx)
|
|
||||||
stations = filterStations(stations, flagTestStIDs)
|
|
||||||
if len(stations) == 0 {
|
|
||||||
log.Printf("forecast: no stations after filter")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
recip := loadRecipients(ctx)
|
|
||||||
loc := mustShanghai()
|
|
||||||
now := tick.In(loc)
|
|
||||||
issued := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, loc)
|
|
||||||
next1 := issued.Add(time.Hour)
|
|
||||||
next3 := issued.Add(3 * time.Hour)
|
|
||||||
|
|
||||||
for _, st := range stations {
|
|
||||||
points, err := data.ForecastRainAtIssued(ctx, st.ID, providerForecast, issued)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("forecast: query station=%s err=%v", st.ID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var redMax, yellowMax int64
|
|
||||||
for _, p := range points {
|
|
||||||
if p.ForecastTime.Before(next1) || p.ForecastTime.After(next3) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
v := int64(p.RainMMx1000)
|
|
||||||
if v >= 8000 {
|
|
||||||
if v > redMax {
|
|
||||||
redMax = v
|
|
||||||
}
|
|
||||||
} else if v >= 4000 {
|
|
||||||
if v > yellowMax {
|
|
||||||
yellowMax = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
level := ""
|
|
||||||
value := 0.0
|
|
||||||
threshold := 0.0
|
|
||||||
if redMax > 0 {
|
|
||||||
level = levelRed
|
|
||||||
value = float64(redMax) / 1000.0
|
|
||||||
threshold = forecastRedMM
|
|
||||||
} else if yellowMax > 0 {
|
|
||||||
level = levelYellow
|
|
||||||
value = float64(yellowMax) / 1000.0
|
|
||||||
threshold = forecastYellowMM
|
|
||||||
}
|
|
||||||
if level == "" {
|
|
||||||
if !flagTest {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("forecast why: station=%s no threshold hit redMax=%d yellowMax=%d", st.ID, redMax, yellowMax)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
level = levelYellow
|
|
||||||
value = forecastYellowMM
|
|
||||||
threshold = forecastYellowMM
|
|
||||||
}
|
|
||||||
if flagTest {
|
|
||||||
msg := fmt.Sprintf("【测试】站点%s 强制触发未来3小时降水预警 level=%s", st.Name, levelLabel(level))
|
|
||||||
targetPhones := recip.forLevel(level)
|
|
||||||
if len(targetPhones) == 0 {
|
|
||||||
recordAlert(ctx, alertTypeForecast, st.ID, level, issued, msg, sql.NullString{})
|
|
||||||
} else {
|
|
||||||
sendToPhones(ctx, scli, st.Name, value, level, issued, targetPhones, msg, alertTypeForecast, st.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
msg := fmt.Sprintf("站点%s 未来3小时单小时最大降水 %.3fmm,达到%s阈值 %.1fmm,issued_at=%s", st.Name, value, levelLabel(level), threshold, issued.Format(timeFormatShort))
|
|
||||||
targetPhones := recip.forLevel(level)
|
|
||||||
if len(targetPhones) == 0 {
|
|
||||||
recordAlert(ctx, alertTypeForecast, st.ID, level, issued, msg, sql.NullString{})
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sendToPhones(ctx, scli, st.Name, value, level, issued, targetPhones, msg, alertTypeForecast, st.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func runActualCheck(scli *sms.Client, tick time.Time) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultCheckTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
stations := listFixedStations(ctx)
|
|
||||||
stations = filterStations(stations, flagTestStIDs)
|
|
||||||
if len(stations) == 0 {
|
|
||||||
log.Printf("actual: no stations after filter")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
recip := loadRecipients(ctx)
|
|
||||||
loc := mustShanghai()
|
|
||||||
end := tick.In(loc)
|
|
||||||
if flagTestTime != "" {
|
|
||||||
if t, err := time.ParseInLocation("2006-01-02 15:04:05", flagTestTime, loc); err == nil {
|
|
||||||
end = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
start := end.Add(-30 * time.Minute)
|
|
||||||
|
|
||||||
for _, st := range stations {
|
|
||||||
rain, ok, err := data.SumRainMM(ctx, st.ID, start, end)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("actual: sum station=%s err=%v", st.ID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("actual why: station=%s window=%s~%s rain_sum=%.3f ok=%v", st.ID, start.Format(timeFormatShort), end.Format(timeFormatShort), rain, ok)
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("actual why: station=%s no rain data", st.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
level := ""
|
|
||||||
threshold := 0.0
|
|
||||||
if rain >= actualRedMM {
|
|
||||||
level = levelRed
|
|
||||||
threshold = actualRedMM
|
|
||||||
} else if rain >= actualYellowMM {
|
|
||||||
level = levelYellow
|
|
||||||
threshold = actualYellowMM
|
|
||||||
}
|
|
||||||
if level == "" {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("actual why: station=%s rain=%.3f below threshold", st.ID, rain)
|
|
||||||
}
|
|
||||||
if !flagTest {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
level = levelYellow
|
|
||||||
threshold = actualYellowMM
|
|
||||||
rain = actualYellowMM
|
|
||||||
}
|
|
||||||
if flagTest {
|
|
||||||
msg := fmt.Sprintf("【测试】站点%s 强制触发30分钟降水预警 level=%s", st.Name, levelLabel(level))
|
|
||||||
targetPhones := recip.forLevel(level)
|
|
||||||
if len(targetPhones) == 0 {
|
|
||||||
recordAlert(ctx, alertTypeActual30m, st.ID, level, end, msg, sql.NullString{})
|
|
||||||
} else {
|
|
||||||
sendToPhones(ctx, scli, st.Name, rain, level, end, targetPhones, msg, alertTypeActual30m, st.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
msg := fmt.Sprintf("站点%s 过去30分钟降水 %.3fmm,达到%s阈值 %.1fmm,窗口 %s - %s", st.Name, rain, levelLabel(level), threshold, start.Format(timeFormatShort), end.Format(timeFormatShort))
|
|
||||||
targetPhones := recip.forLevel(level)
|
|
||||||
if len(targetPhones) == 0 {
|
|
||||||
recordAlert(ctx, alertTypeActual30m, st.ID, level, end, msg, sql.NullString{})
|
|
||||||
log.Printf("actual: triggered station=%s level=%s rain=%.3f (no phones)", st.ID, level, rain)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sendToPhones(ctx, scli, st.Name, rain, level, end, targetPhones, msg, alertTypeActual30m, st.ID)
|
|
||||||
log.Printf("actual: triggered station=%s level=%s rain=%.3f phones=%d", st.ID, level, rain, len(targetPhones))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func runNeighborActualCheck(scli *sms.Client, tick time.Time) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultCheckTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
allStations := listFixedStations(ctx)
|
|
||||||
centers := filterStations(allStations, flagTestStIDs)
|
|
||||||
if len(centers) == 0 {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: no stations after filter")
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
recip := loadRecipients(ctx)
|
|
||||||
loc := mustShanghai()
|
|
||||||
end := tick.In(loc)
|
|
||||||
if flagTestTime != "" {
|
|
||||||
if t, err := time.ParseInLocation("2006-01-02 15:04:05", flagTestTime, loc); err == nil {
|
|
||||||
end = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
start := end.Add(-30 * time.Minute)
|
|
||||||
|
|
||||||
for _, center := range centers {
|
|
||||||
if center.Latitude == 0 || center.Longitude == 0 {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: center %s missing lat/lon", center.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
wind, err := data.RadarWeatherNearest(center.Latitude, center.Longitude, end, 6*time.Hour)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("neighbor: wind query failed station=%s: %v", center.ID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if wind == nil || !wind.WindDirection.Valid || !wind.WindSpeed.Valid || wind.WindSpeed.Float64 <= 0.01 {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: center %s no wind data", center.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dir := wind.WindDirection.Float64
|
|
||||||
spd := wind.WindSpeed.Float64
|
|
||||||
radius := spd * 3600
|
|
||||||
for _, nb := range allStations {
|
|
||||||
if nb.ID == center.ID {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if nb.Latitude == 0 || nb.Longitude == 0 {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: neighbor %s missing lat/lon", nb.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dist := haversine(center.Latitude, center.Longitude, nb.Latitude, nb.Longitude)
|
|
||||||
brg := bearingDeg(center.Latitude, center.Longitude, nb.Latitude, nb.Longitude)
|
|
||||||
diff := angDiff(brg, dir)
|
|
||||||
inSector := dist <= radius && diff <= halfAngleDeg
|
|
||||||
if !inSector {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: center=%s neighbor=%s not in sector dist=%.1fm radius=%.1fm bearing=%.1f windFrom=%.1f diff=%.1f half=%.1f",
|
|
||||||
center.ID, nb.ID, dist, radius, brg, dir, diff, halfAngleDeg)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rain, ok, err := data.SumRainMM(ctx, nb.ID, start, end)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("neighbor: sum rain station=%s err=%v", nb.ID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: center=%s neighbor=%s window=%s~%s rain_sum=%.3f ok=%v", center.ID, nb.ID, start.Format(timeFormatShort), end.Format(timeFormatShort), rain, ok)
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: neighbor %s no rain data", nb.ID)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
level := ""
|
|
||||||
threshold := 0.0
|
|
||||||
if rain >= actualRedMM {
|
|
||||||
level = levelRed
|
|
||||||
threshold = actualRedMM
|
|
||||||
} else if rain >= actualYellowMM {
|
|
||||||
level = levelYellow
|
|
||||||
threshold = actualYellowMM
|
|
||||||
}
|
|
||||||
if level == "" {
|
|
||||||
if flagWhy {
|
|
||||||
log.Printf("neighbor why: neighbor %s rain=%.3f below threshold", nb.ID, rain)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
atype := alertTypeNeighbor30 + "_" + nb.ID
|
|
||||||
msg := fmt.Sprintf("站点%s 迎风扇区内站点%s 30分钟降水 %.3fmm,达到%s阈值 %.1fmm,窗口 %s - %s", center.Name, nb.Name, rain, levelLabel(level), threshold, start.Format(timeFormatShort), end.Format(timeFormatShort))
|
|
||||||
targetPhones := recip.forLevel(level)
|
|
||||||
if len(targetPhones) == 0 {
|
|
||||||
recordAlert(ctx, atype, center.ID, level, end, msg, sql.NullString{})
|
|
||||||
log.Printf("neighbor: center=%s neighbor=%s level=%s rain=%.3f (no phones)", center.ID, nb.ID, level, rain)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sendToPhones(ctx, scli, center.Name, rain, level, end, targetPhones, msg, atype, center.ID)
|
|
||||||
log.Printf("neighbor: center=%s neighbor=%s level=%s rain=%.3f phones=%d", center.ID, nb.ID, level, rain, len(targetPhones))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func recordAlert(ctx context.Context, alertType, stationID, level string, issuedAt time.Time, message string, phone sql.NullString) {
|
|
||||||
_, err := data.InsertAlert(ctx, data.AlertRecord{
|
|
||||||
AlertType: alertType,
|
|
||||||
StationID: stationID,
|
|
||||||
Level: level,
|
|
||||||
IssuedAt: issuedAt,
|
|
||||||
Message: message,
|
|
||||||
SMSPhone: phone,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("alert insert failed station=%s type=%s level=%s: %v", stationID, alertType, level, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type recipients struct {
|
|
||||||
red []string
|
|
||||||
yel []string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r recipients) forLevel(level string) []string {
|
|
||||||
if level == levelRed {
|
|
||||||
return r.red
|
|
||||||
}
|
|
||||||
if level == levelYellow {
|
|
||||||
return r.yel
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func loadRecipients(ctx context.Context) recipients {
|
|
||||||
list, err := data.ListEnabledSMSRecipients(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("sms: load recipients failed: %v", err)
|
|
||||||
return recipients{}
|
|
||||||
}
|
|
||||||
var res recipients
|
|
||||||
for _, r := range list {
|
|
||||||
if r.AlertLevel >= 1 {
|
|
||||||
res.red = append(res.red, r.Phone)
|
|
||||||
}
|
|
||||||
if r.AlertLevel >= 2 {
|
|
||||||
res.yel = append(res.yel, r.Phone)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
func sendToPhones(ctx context.Context, scli *sms.Client, stationName string, value float64, level string, issuedAt time.Time, phones []string, message string, alertType string, stationID string) {
|
|
||||||
if scli == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
name := ":" + stationName + ","
|
|
||||||
content := format3(value) + " mm"
|
|
||||||
alertText := "【大礼村】暴雨"
|
|
||||||
if level == levelRed {
|
|
||||||
alertText += "红色预警"
|
|
||||||
} else {
|
|
||||||
alertText += "黄色预警"
|
|
||||||
}
|
|
||||||
for _, ph := range phones {
|
|
||||||
if err := scli.Send(ctx, name, content, alertText, "", []string{ph}); err != nil {
|
|
||||||
log.Printf("sms: send failed phone=%s station=%s level=%s: %v", ph, stationID, level, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
recordAlert(ctx, alertType, stationID, level, issuedAt, message, sql.NullString{String: ph, Valid: true})
|
|
||||||
log.Printf("sms: sent phone=%s station=%s level=%s", ph, stationID, level)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func format3(v float64) string {
|
|
||||||
s := fmt.Sprintf("%.3f", v)
|
|
||||||
s = strings.TrimRight(s, "0")
|
|
||||||
s = strings.TrimRight(s, ".")
|
|
||||||
if s == "" {
|
|
||||||
return "0"
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func mustShanghai() *time.Location {
|
|
||||||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
|
||||||
if loc == nil {
|
|
||||||
loc = time.FixedZone("CST", 8*3600)
|
|
||||||
}
|
|
||||||
return loc
|
|
||||||
}
|
|
||||||
|
|
||||||
func levelLabel(level string) string {
|
|
||||||
if level == levelRed {
|
|
||||||
return "红色"
|
|
||||||
}
|
|
||||||
return "黄色"
|
|
||||||
}
|
|
||||||
|
|
||||||
func listFixedStations(ctx context.Context) []data.StationInfo {
|
|
||||||
ids := []string{"RS485-002964", "RS485-002A39", "RS485-0029CB"}
|
|
||||||
sts, err := data.ListStationsByIDs(ctx, ids)
|
|
||||||
if err != nil || len(sts) == 0 {
|
|
||||||
var out []data.StationInfo
|
|
||||||
for _, id := range ids {
|
|
||||||
out = append(out, data.StationInfo{ID: id, Name: id})
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
// 保证顺序按 ids
|
|
||||||
order := make(map[string]int)
|
|
||||||
for i, id := range ids {
|
|
||||||
order[id] = i
|
|
||||||
}
|
|
||||||
sort.Slice(sts, func(i, j int) bool { return order[sts[i].ID] < order[sts[j].ID] })
|
|
||||||
return sts
|
|
||||||
}
|
|
||||||
|
|
||||||
func filterStations(in []data.StationInfo, filter string) []data.StationInfo {
|
|
||||||
f := strings.TrimSpace(filter)
|
|
||||||
if f == "" {
|
|
||||||
return in
|
|
||||||
}
|
|
||||||
parts := strings.Split(f, ",")
|
|
||||||
m := make(map[string]struct{}, len(parts))
|
|
||||||
for _, p := range parts {
|
|
||||||
p = strings.TrimSpace(p)
|
|
||||||
if p != "" {
|
|
||||||
m[p] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(m) == 0 {
|
|
||||||
return in
|
|
||||||
}
|
|
||||||
var out []data.StationInfo
|
|
||||||
for _, st := range in {
|
|
||||||
if _, ok := m[st.ID]; ok {
|
|
||||||
out = append(out, st)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func toRad(d float64) float64 { return d * math.Pi / 180 }
|
|
||||||
func toDeg(r float64) float64 { return r * 180 / math.Pi }
|
|
||||||
|
|
||||||
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
|
|
||||||
const R = 6371000.0
|
|
||||||
dLat := toRad(lat2 - lat1)
|
|
||||||
dLon := toRad(lon2 - lon1)
|
|
||||||
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
|
|
||||||
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
|
|
||||||
return R * c
|
|
||||||
}
|
|
||||||
|
|
||||||
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
|
|
||||||
φ1 := toRad(lat1)
|
|
||||||
φ2 := toRad(lat2)
|
|
||||||
Δλ := toRad(lon2 - lon1)
|
|
||||||
y := math.Sin(Δλ) * math.Cos(φ2)
|
|
||||||
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
|
|
||||||
brg := toDeg(math.Atan2(y, x))
|
|
||||||
if brg < 0 {
|
|
||||||
brg += 360
|
|
||||||
}
|
|
||||||
return brg
|
|
||||||
}
|
|
||||||
|
|
||||||
func angDiff(a, b float64) float64 {
|
|
||||||
d := math.Mod(a-b+540, 360) - 180
|
|
||||||
if d < 0 {
|
|
||||||
d = -d
|
|
||||||
}
|
|
||||||
return math.Abs(d)
|
|
||||||
}
|
|
||||||
@ -422,7 +422,7 @@ export class AppComponent implements OnInit, AfterViewInit {
|
|||||||
} catch {}
|
} catch {}
|
||||||
|
|
||||||
if (windDir != null && windSpd != null && windSpd > 0.01) {
|
if (windDir != null && windSpd != null && windSpd > 0.01) {
|
||||||
const bearingTo = windDir;
|
const bearingTo = (windDir + 180) % 360; // 去向
|
||||||
const hours = 3;
|
const hours = 3;
|
||||||
const radius = windSpd * 3600 * hours; // m
|
const radius = windSpd * 3600 * hours; // m
|
||||||
const half = 25; // 半角
|
const half = 25; // 半角
|
||||||
|
|||||||
@ -1,41 +0,0 @@
|
|||||||
package data
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type AlertRecord struct {
|
|
||||||
AlertType string
|
|
||||||
StationID string
|
|
||||||
Level string
|
|
||||||
IssuedAt time.Time
|
|
||||||
Message string
|
|
||||||
SMSPhone sql.NullString
|
|
||||||
}
|
|
||||||
|
|
||||||
// InsertAlert writes one alert row; returns inserted id or 0 when skipped by conflict.
|
|
||||||
func InsertAlert(ctx context.Context, ar AlertRecord) (int64, error) {
|
|
||||||
const q = `
|
|
||||||
INSERT INTO alerts (alert_type, station_id, level, issued_at, message, sms_phone)
|
|
||||||
VALUES ($1,$2,$3,$4,$5,$6)
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
RETURNING id`
|
|
||||||
var id int64
|
|
||||||
err := DB().QueryRowContext(ctx, q,
|
|
||||||
ar.AlertType,
|
|
||||||
ar.StationID,
|
|
||||||
ar.Level,
|
|
||||||
ar.IssuedAt,
|
|
||||||
ar.Message,
|
|
||||||
ar.SMSPhone,
|
|
||||||
).Scan(&id)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
@ -19,17 +19,3 @@ func FetchActualHourlyRain(ctx context.Context, stationID string, start, end tim
|
|||||||
}
|
}
|
||||||
return float64(sum.Int64) / 1000.0, true, nil
|
return float64(sum.Int64) / 1000.0, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SumRainMM sums rain_10m_mm_x1000 over [start,end) and returns mm.
|
|
||||||
func SumRainMM(ctx context.Context, stationID string, start, end time.Time) (float64, bool, error) {
|
|
||||||
const q = `SELECT SUM(rain_10m_mm_x1000) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
|
|
||||||
var sum sql.NullInt64
|
|
||||||
err := DB().QueryRowContext(ctx, q, stationID, start, end).Scan(&sum)
|
|
||||||
if err != nil {
|
|
||||||
return 0, false, err
|
|
||||||
}
|
|
||||||
if !sum.Valid {
|
|
||||||
return 0, false, nil
|
|
||||||
}
|
|
||||||
return float64(sum.Int64) / 1000.0, true, nil
|
|
||||||
}
|
|
||||||
|
|||||||
@ -3,8 +3,6 @@ package data
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetStationName returns stations.name by station_id; empty string if not found/null.
|
// GetStationName returns stations.name by station_id; empty string if not found/null.
|
||||||
@ -23,88 +21,3 @@ func GetStationName(ctx context.Context, stationID string) (string, error) {
|
|||||||
}
|
}
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// StationInfo contains minimal fields for alert checks.
|
|
||||||
type StationInfo struct {
|
|
||||||
ID string
|
|
||||||
Name string
|
|
||||||
Location string
|
|
||||||
Latitude float64
|
|
||||||
Longitude float64
|
|
||||||
Altitude float64
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListEligibleStations returns WH65LP stations with required non-empty fields.
|
|
||||||
func ListEligibleStations(ctx context.Context) ([]StationInfo, error) {
|
|
||||||
const q = `
|
|
||||||
SELECT
|
|
||||||
station_id,
|
|
||||||
COALESCE(NULLIF(BTRIM(name), ''), station_id) AS name,
|
|
||||||
location,
|
|
||||||
latitude::float8,
|
|
||||||
longitude::float8,
|
|
||||||
altitude::float8
|
|
||||||
FROM stations
|
|
||||||
WHERE
|
|
||||||
device_type = 'WH65LP' AND
|
|
||||||
name IS NOT NULL AND BTRIM(name) <> '' AND
|
|
||||||
location IS NOT NULL AND BTRIM(location) <> '' AND
|
|
||||||
latitude IS NOT NULL AND latitude <> 0 AND
|
|
||||||
longitude IS NOT NULL AND longitude <> 0 AND
|
|
||||||
altitude IS NOT NULL AND altitude <> 0`
|
|
||||||
|
|
||||||
rows, err := DB().QueryContext(ctx, q)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
var out []StationInfo
|
|
||||||
for rows.Next() {
|
|
||||||
var st StationInfo
|
|
||||||
if err := rows.Scan(&st.ID, &st.Name, &st.Location, &st.Latitude, &st.Longitude, &st.Altitude); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
out = append(out, st)
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListStationsByIDs returns stations by a given id list (ignores missing ones).
|
|
||||||
func ListStationsByIDs(ctx context.Context, ids []string) ([]StationInfo, error) {
|
|
||||||
if len(ids) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
var placeholders []string
|
|
||||||
args := make([]interface{}, 0, len(ids))
|
|
||||||
for i, id := range ids {
|
|
||||||
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
|
|
||||||
args = append(args, id)
|
|
||||||
}
|
|
||||||
q := fmt.Sprintf(`
|
|
||||||
SELECT
|
|
||||||
station_id,
|
|
||||||
COALESCE(NULLIF(BTRIM(name), ''), station_id) AS name,
|
|
||||||
location,
|
|
||||||
latitude::float8,
|
|
||||||
longitude::float8,
|
|
||||||
altitude::float8
|
|
||||||
FROM stations
|
|
||||||
WHERE station_id IN (%s)`, strings.Join(placeholders, ","))
|
|
||||||
|
|
||||||
rows, err := DB().QueryContext(ctx, q, args...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
var out []StationInfo
|
|
||||||
for rows.Next() {
|
|
||||||
var st StationInfo
|
|
||||||
if err := rows.Scan(&st.ID, &st.Name, &st.Location, &st.Latitude, &st.Longitude, &st.Altitude); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
out = append(out, st)
|
|
||||||
}
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,7 +0,0 @@
|
|||||||
-- 02.sql: create simple users table
|
|
||||||
CREATE TABLE IF NOT EXISTS users (
|
|
||||||
username TEXT PRIMARY KEY,
|
|
||||||
password TEXT NOT NULL, -- bcrypt hash
|
|
||||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
||||||
);
|
|
||||||
|
|
||||||
@ -1,9 +0,0 @@
|
|||||||
-- 03.sql: recipients for SMS alerts
|
|
||||||
-- Table to store phone numbers, enabled flag, and alert level (1=red only, 2=yellow+red)
|
|
||||||
-- PostgreSQL has no native unsigned int; use integer with CHECK constraints.
|
|
||||||
CREATE TABLE IF NOT EXISTS sms_recipients (
|
|
||||||
phone TEXT PRIMARY KEY,
|
|
||||||
enabled BOOLEAN NOT NULL DEFAULT TRUE,
|
|
||||||
alert_level INTEGER NOT NULL DEFAULT 2 CHECK (alert_level >= 1)
|
|
||||||
);
|
|
||||||
|
|
||||||
@ -1,28 +0,0 @@
|
|||||||
-- 04_sms_recipients_seed_20251125.sql: add/update specific SMS recipients
|
|
||||||
-- Sets enabled = FALSE and alert_level = 1 for listed phone numbers.
|
|
||||||
-- Idempotent via ON CONFLICT on primary key (phone).
|
|
||||||
|
|
||||||
INSERT INTO sms_recipients (phone, enabled, alert_level) VALUES
|
|
||||||
('13114458208', FALSE, 1),
|
|
||||||
('13986807953', FALSE, 1),
|
|
||||||
('13207210509', FALSE, 1),
|
|
||||||
('13886680872', FALSE, 1),
|
|
||||||
('13477172662', FALSE, 1),
|
|
||||||
('13177094329', FALSE, 1),
|
|
||||||
('13165617999', FALSE, 1),
|
|
||||||
('13217179901', FALSE, 1),
|
|
||||||
('18571017120', FALSE, 1),
|
|
||||||
('18674205345', FALSE, 1),
|
|
||||||
('18871769640', FALSE, 1),
|
|
||||||
('15587930225', FALSE, 1),
|
|
||||||
('13545715958', FALSE, 1),
|
|
||||||
('15629386907', FALSE, 1),
|
|
||||||
('15971633321', FALSE, 1),
|
|
||||||
('15671074991', FALSE, 1),
|
|
||||||
('18727254175', FALSE, 1),
|
|
||||||
('13477108587', FALSE, 1),
|
|
||||||
('15897521649', FALSE, 1)
|
|
||||||
ON CONFLICT (phone) DO UPDATE SET
|
|
||||||
enabled = EXCLUDED.enabled,
|
|
||||||
alert_level = EXCLUDED.alert_level;
|
|
||||||
|
|
||||||
@ -1,23 +0,0 @@
|
|||||||
-- 05_alerts.sql: table for alerts/warnings
|
|
||||||
CREATE TABLE IF NOT EXISTS alerts (
|
|
||||||
id BIGSERIAL PRIMARY KEY,
|
|
||||||
alert_type TEXT NOT NULL,
|
|
||||||
station_id VARCHAR(50) NOT NULL,
|
|
||||||
level VARCHAR(10) NOT NULL CHECK (level IN ('yellow', 'red')),
|
|
||||||
issued_at TIMESTAMPTZ NOT NULL,
|
|
||||||
message TEXT,
|
|
||||||
sms_phone TEXT,
|
|
||||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
||||||
);
|
|
||||||
|
|
||||||
-- One record per alert event (no SMS stored)
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS alerts_uniq_event
|
|
||||||
ON alerts (alert_type, station_id, issued_at, level)
|
|
||||||
WHERE sms_phone IS NULL;
|
|
||||||
|
|
||||||
-- One record per phone recipient
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS alerts_uniq_phone
|
|
||||||
ON alerts (alert_type, station_id, issued_at, level, sms_phone)
|
|
||||||
WHERE sms_phone IS NOT NULL;
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS alerts_station_idx ON alerts (station_id);
|
|
||||||
Loading…
x
Reference in New Issue
Block a user