Compare commits

..

2 Commits

Author SHA1 Message Date
6d276b1760 feat: 新增告警 2025-12-05 09:22:23 +08:00
2c2d5232f8 feat: 新增一些辅助的代码 2025-12-03 11:30:13 +08:00
15 changed files with 2531 additions and 1 deletions

279
cmd/caiyun_parse/main.go Normal file
View File

@ -0,0 +1,279 @@
package main
import (
"bufio"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"sort"
"strings"
"time"
)
// Minimal Caiyun hourly model focusing on required fields.
type caiyunHourly struct {
Status string `json:"status"`
Result struct {
Hourly struct {
Status string `json:"status"`
Temperature []valTime `json:"temperature"`
Humidity []valTime `json:"humidity"`
Visibility []valTime `json:"visibility"`
Dswrf []valTime `json:"dswrf"`
Pressure []valTime `json:"pressure"`
Wind []windTime `json:"wind"`
} `json:"hourly"`
} `json:"result"`
}
type valTime struct {
Datetime string `json:"datetime"`
Value float64 `json:"value"`
}
type windTime struct {
Datetime string `json:"datetime"`
Speed float64 `json:"speed"`
Direction float64 `json:"direction"`
}
type row struct {
t time.Time
temperature *float64
humidity *float64
windSpeed *float64
windDir *float64
pressure *float64
visibility *float64
dswrf *float64
}
func main() {
var file string
var tz string
var mode string
var alias string
var lat float64
var lon float64
var sqlTable string
flag.StringVar(&file, "file", "", "Path to Caiyun hourly JSON; if empty, read from stdin")
flag.StringVar(&tz, "tz", "Asia/Shanghai", "Timezone for output timestamps")
flag.StringVar(&mode, "mode", "csv", "Output mode: csv | sql")
flag.StringVar(&alias, "alias", "", "Station alias for SQL output (required for mode=sql)")
flag.Float64Var(&lat, "lat", 0, "Latitude for SQL output")
flag.Float64Var(&lon, "lon", 0, "Longitude for SQL output")
flag.StringVar(&sqlTable, "sqltable", "radar_weather", "SQL table name for inserts")
flag.Parse()
var r io.Reader
if file == "" {
r = bufio.NewReader(os.Stdin)
} else {
f, err := os.Open(file)
if err != nil {
fatalf("open file: %v", err)
}
defer f.Close()
r = f
}
var payload caiyunHourly
dec := json.NewDecoder(r)
if err := dec.Decode(&payload); err != nil {
fatalf("decode json: %v", err)
}
if strings.ToLower(payload.Status) != "ok" && payload.Status != "" {
fatalf("top-level status not ok: %s", payload.Status)
}
loc, _ := time.LoadLocation(tz)
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
// Merge series by timestamp
rowsByTime := map[time.Time]*row{}
upsert := func(ts string) *row {
t, ok := parseTime(ts, loc)
if !ok {
fatalf("parse time failed: %s", ts)
}
if v, exists := rowsByTime[t]; exists {
return v
}
nr := &row{t: t}
rowsByTime[t] = nr
return nr
}
for _, v := range payload.Result.Hourly.Temperature {
rr := upsert(v.Datetime)
rr.temperature = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Humidity {
rr := upsert(v.Datetime)
rr.humidity = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Visibility {
rr := upsert(v.Datetime)
rr.visibility = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Dswrf {
rr := upsert(v.Datetime)
rr.dswrf = ptr(v.Value)
}
for _, v := range payload.Result.Hourly.Pressure {
rr := upsert(v.Datetime)
rr.pressure = ptr(v.Value)
}
for _, w := range payload.Result.Hourly.Wind {
rr := upsert(w.Datetime)
rr.windSpeed = ptr(w.Speed)
rr.windDir = ptr(w.Direction)
}
// Sort by time
times := make([]time.Time, 0, len(rowsByTime))
for t := range rowsByTime {
times = append(times, t)
}
sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) })
if mode == "sql" {
if alias == "" {
fatalf("-alias is required for mode=sql")
}
// Emit upserts into radar_weather; convert wind_speed km/h -> m/s, keep humidity as ratio (0..1)
fmt.Println("BEGIN;")
for _, t := range times {
rr := rowsByTime[t]
// wind speed conversion
var ws string
if rr.windSpeed != nil {
v := *rr.windSpeed / 3.6
ws = trimZeros(fmt.Sprintf("%.6f", v))
}
// Build SQL with NULLs where missing
q := fmt.Sprintf(
"INSERT INTO %s (alias, lat, lon, dt, temperature, humidity, cloudrate, visibility, dswrf, wind_speed, wind_direction, pressure) "+
"VALUES (%s, %s, %s, %s, %s, %s, NULL, %s, %s, %s, %s, %s) "+
"ON CONFLICT (alias, dt) DO UPDATE SET "+
"lat=EXCLUDED.lat, lon=EXCLUDED.lon, temperature=EXCLUDED.temperature, humidity=EXCLUDED.humidity, "+
"visibility=EXCLUDED.visibility, dswrf=EXCLUDED.dswrf, wind_speed=EXCLUDED.wind_speed, "+
"wind_direction=EXCLUDED.wind_direction, pressure=EXCLUDED.pressure;",
sqlTable,
sqlQuote(alias),
sqlNum(lat),
sqlNum(lon),
sqlTime(t),
sqlOpt(rr.temperature),
sqlOpt(rr.humidity),
sqlOpt(rr.visibility),
sqlOpt(rr.dswrf),
sqlStrOrNull(ws),
sqlOpt(rr.windDir),
sqlOpt(rr.pressure),
)
fmt.Println(q)
}
fmt.Println("COMMIT;")
return
}
// CSV output
w := csv.NewWriter(os.Stdout)
_ = w.Write([]string{"datetime", "temperature", "humidity", "wind_speed", "wind_direction", "pressure", "visibility", "dswrf"})
for _, t := range times {
rr := rowsByTime[t]
var ws string
if rr.windSpeed != nil {
v := *rr.windSpeed / 3.6
ws = trimZeros(fmt.Sprintf("%.6f", v))
}
rec := []string{
t.Format("2006-01-02 15:04:05"),
optf(rr.temperature),
optf(rr.humidity),
ws,
optf(rr.windDir),
optf(rr.pressure),
optf(rr.visibility),
optf(rr.dswrf),
}
_ = w.Write(rec)
}
w.Flush()
if err := w.Error(); err != nil {
fatalf("write csv: %v", err)
}
}
func ptr(f float64) *float64 { return &f }
func optf(p *float64) string {
if p == nil {
return ""
}
// Trim trailing zeros via fmt
return trimZeros(fmt.Sprintf("%.6f", *p))
}
func trimZeros(s string) string {
if !strings.Contains(s, ".") {
return s
}
s = strings.TrimRight(s, "0")
s = strings.TrimRight(s, ".")
return s
}
// parseTime attempts RFC3339 and common Caiyun formats without seconds.
func parseTime(s string, loc *time.Location) (time.Time, bool) {
// Try RFC3339 first
if t, err := time.Parse(time.RFC3339, s); err == nil {
return t.In(loc), true
}
// Try without seconds, with offset, e.g. 2006-01-02T15:04+08:00
if t, err := time.Parse("2006-01-02T15:04-07:00", s); err == nil {
return t.In(loc), true
}
// Try without offset (assume loc)
if t, err := time.ParseInLocation("2006-01-02 15:04", s, loc); err == nil {
return t.In(loc), true
}
return time.Time{}, false
}
func fatalf(format string, args ...any) {
fmt.Fprintf(os.Stderr, format+"\n", args...)
os.Exit(1)
}
func sqlQuote(s string) string {
return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}
func sqlNum(f float64) string {
return trimZeros(fmt.Sprintf("%.8f", f))
}
func sqlTime(t time.Time) string {
return sqlQuote(t.Format("2006-01-02 15:04:05"))
}
func sqlOpt(p *float64) string {
if p == nil {
return "NULL"
}
return trimZeros(fmt.Sprintf("%.6f", *p))
}
func sqlStrOrNull(s string) string {
if s == "" {
return "NULL"
}
return s
}

View File

@ -0,0 +1,448 @@
package main
import (
"context"
"database/sql"
"encoding/binary"
"encoding/csv"
"errors"
"flag"
"fmt"
"log"
"math"
"os"
"strings"
"time"
"weatherstation/internal/database"
)
type stationInfo struct {
ID string
Alias string
Lat float64
Lon float64
Z int
Y int
X int
}
type tileRec struct {
DT time.Time
Width, Height int
West, South float64
East, North float64
ResDeg float64
Data []byte
}
func main() {
var stationID string
var startStr string
var endStr string
var outPath string
var verbose bool
flag.StringVar(&stationID, "station_id", "", "站点ID留空表示全部WH65LP且有经纬度的站")
flag.StringVar(&startStr, "start", "", "起始时间YYYY-MM-DD HH:MM:SSCST")
flag.StringVar(&endStr, "end", "", "结束时间YYYY-MM-DD HH:MM:SSCST")
flag.StringVar(&outPath, "out", "radar_stats.csv", "输出CSV文件路径")
flag.BoolVar(&verbose, "info", false, "输出详细过程信息")
flag.Parse()
if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" {
log.Fatalln("必须提供 --start 与 --end格式 YYYY-MM-DD HH:MM:SS")
}
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc)
if err != nil {
log.Fatalf("解析 start 失败: %v", err)
}
endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc)
if err != nil {
log.Fatalf("解析 end 失败: %v", err)
}
if !endT.After(startT) {
log.Fatalln("结束时间必须大于起始时间")
}
// 初始化数据库
_ = database.GetDB()
defer database.Close()
// 获取站点列表
stations, err := listStations(database.GetDB(), stationID)
if err != nil {
log.Fatalf("查询站点失败: %v", err)
}
if len(stations) == 0 {
log.Fatalln("没有符合条件的站点")
}
if verbose {
log.Printf("站点数量: %d", len(stations))
for _, s := range stations {
log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X)
}
}
// 创建CSV
f, err := os.Create(outPath)
if err != nil {
log.Fatalf("创建输出文件失败: %v", err)
}
defer f.Close()
w := csv.NewWriter(f)
defer w.Flush()
header := []string{
"station_id", "station_alias", "dt", "lat", "lon", "wind_speed_ms", "wind_dir_deg",
"sector_ge40_cnt", "sector_ge40_sum", "sector_ge30_cnt", "sector_ge30_sum",
"circle_ge40_cnt", "circle_ge40_sum", "circle_ge30_cnt", "circle_ge30_sum",
"rs485_rain_total_mm",
}
if err := w.Write(header); err != nil {
log.Fatalf("写入CSV表头失败: %v", err)
}
ctx := context.Background()
totalRows := 0
var totalTiles, skipNoZYX, skipNoWind, skipDecode int
for _, s := range stations {
if s.Z == 0 && s.Y == 0 && s.X == 0 {
log.Printf("跳过站点 %s无z/y/x映射", s.ID)
skipNoZYX++
continue
}
tiles, err := listTiles(ctx, database.GetDB(), s.Z, s.Y, s.X, startT, endT)
if err != nil {
log.Printf("查询瓦片失败 station=%s: %v", s.ID, err)
continue
}
totalTiles += len(tiles)
if verbose {
log.Printf("站点 %s 瓦片数量: %d", s.ID, len(tiles))
}
if len(tiles) == 0 {
log.Printf("站点 %s 在范围内无瓦片", s.ID)
}
for _, t := range tiles {
// 10分钟向下取整时间bucket
bucket := bucket10(t.DT, loc)
// NOTE: 按需改为用 station_id 匹配 radar_weather.alias
windSpeed, windDir, ok, err := loadWindAt(database.GetDB(), s.ID, bucket)
if err != nil {
log.Printf("读取风失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err)
continue
}
if !ok { // 无风场:跳过该时次
skipNoWind++
if verbose {
log.Printf("跳过: %s 瓦片@%s桶=%s在 radar_weather(alias=%s) 无记录", s.ID, t.DT.In(loc).Format("2006-01-02 15:04:05"), bucket.Format("2006-01-02 15:04:05"), s.ID)
}
continue
}
// 解码 dBZ 网格
vals, xs, ys, err := decodeTile(t)
if err != nil {
log.Printf("解码瓦片失败 %s @%s: %v", s.ID, t.DT.Format(time.RFC3339), err)
skipDecode++
continue
}
// 统计
sec40Cnt, sec40Sum, sec30Cnt, sec30Sum,
cir40Cnt, cir40Sum, cir30Cnt, cir30Sum := computeStats(vals, xs, ys, s.Lat, s.Lon, windSpeed, windDir)
// 最近一条累计雨量
rainTotal, rainOK := loadNearestRain(database.GetDB(), s.ID, t.DT)
if verbose {
log.Printf("写出: %s dt=%s wind=%.3f m/s %.1f° 扇形(>=40:%d/%.1f >=30:%d/%.1f) 圆形(>=40:%d/%.1f >=30:%d/%.1f) rain_total=%v(%.3f)",
s.ID,
t.DT.In(loc).Format("2006-01-02 15:04:05"),
windSpeed, windDir,
sec40Cnt, sec40Sum, sec30Cnt, sec30Sum,
cir40Cnt, cir40Sum, cir30Cnt, cir30Sum,
rainOK, rainTotal,
)
}
rec := []string{
s.ID,
s.Alias,
t.DT.In(loc).Format("2006-01-02 15:04:05"),
fmt.Sprintf("%.6f", s.Lat),
fmt.Sprintf("%.6f", s.Lon),
fmt.Sprintf("%.3f", windSpeed),
fmt.Sprintf("%.2f", windDir),
fmt.Sprintf("%d", sec40Cnt),
fmt.Sprintf("%.1f", sec40Sum),
fmt.Sprintf("%d", sec30Cnt),
fmt.Sprintf("%.1f", sec30Sum),
fmt.Sprintf("%d", cir40Cnt),
fmt.Sprintf("%.1f", cir40Sum),
fmt.Sprintf("%d", cir30Cnt),
fmt.Sprintf("%.1f", cir30Sum),
fmt.Sprintf("%.3f", rainTotal),
}
if err := w.Write(rec); err != nil {
log.Printf("写入CSV失败: %v", err)
}
totalRows++
}
}
w.Flush()
if err := w.Error(); err != nil {
log.Fatalf("写入CSV失败: %v", err)
}
if verbose {
log.Printf("汇总: 站点数=%d 瓦片总数=%d 跳过(无z/y/x)=%d 跳过(无风)=%d 跳过(解码失败)=%d", len(stations), totalTiles, skipNoZYX, skipNoWind, skipDecode)
}
log.Printf("完成,输出 %d 行到 %s", totalRows, outPath)
}
func listStations(db *sql.DB, stationID string) ([]stationInfo, error) {
// 与前端一致device_type='WH65LP' 且 lat/lon 非空且非零
if strings.TrimSpace(stationID) != "" {
const q = `
SELECT station_id,
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
latitude, longitude,
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
FROM stations
WHERE device_type='WH65LP' AND station_id=$1
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude<>0 AND longitude<>0`
var s stationInfo
err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return []stationInfo{s}, nil
}
const qAll = `
SELECT station_id,
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
latitude, longitude,
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
FROM stations
WHERE device_type='WH65LP'
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude<>0 AND longitude<>0
ORDER BY station_id`
rows, err := db.Query(qAll)
if err != nil {
return nil, err
}
defer rows.Close()
var out []stationInfo
for rows.Next() {
var s stationInfo
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil {
out = append(out, s)
}
}
return out, nil
}
func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) {
const q = `
SELECT dt, width, height, west, south, east, north, res_deg, data
FROM radar_tiles
WHERE z=$1 AND y=$2 AND x=$3 AND dt BETWEEN $4 AND $5
ORDER BY dt ASC`
rows, err := db.QueryContext(ctx, q, z, y, x, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
var out []tileRec
for rows.Next() {
var r tileRec
if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil {
out = append(out, r)
}
}
return out, nil
}
func bucket10(t time.Time, loc *time.Location) time.Time {
tt := t.In(loc)
m := (tt.Minute() / 10) * 10
return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc)
}
// loadWindAt 以别名(alias)精确匹配 radar_weather本导出按需传入 station_id 作为 alias 参数
func loadWindAt(db *sql.DB, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) {
const q = `
SELECT wind_speed, wind_direction
FROM radar_weather
WHERE alias=$1 AND dt=$2
LIMIT 1`
var s, d sql.NullFloat64
err = db.QueryRow(q, alias, dt).Scan(&s, &d)
if err == sql.ErrNoRows {
return 0, 0, false, nil
}
if err != nil {
return 0, 0, false, err
}
if !s.Valid || !d.Valid {
return 0, 0, false, nil
}
return s.Float64, d.Float64, true, nil
}
func loadNearestRain(db *sql.DB, stationID string, dt time.Time) (rainTotal float64, ok bool) {
// 取最近一条累计雨量单位mm。如不存在返回0,false
const q = `
SELECT rainfall
FROM rs485_weather_data
WHERE station_id=$1
ORDER BY ABS(EXTRACT(EPOCH FROM (timestamp - $2))) ASC
LIMIT 1`
var r sql.NullFloat64
if err := db.QueryRow(q, stationID, dt).Scan(&r); err != nil {
return 0, false
}
if !r.Valid {
return 0, false
}
return r.Float64, true
}
func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) {
w, h := t.Width, t.Height
if w <= 0 || h <= 0 {
return nil, nil, nil, fmt.Errorf("非法尺寸")
}
if len(t.Data) < w*h*2 {
return nil, nil, nil, fmt.Errorf("数据长度不足")
}
xs = make([]float64, w)
for c := 0; c < w; c++ {
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
}
ys = make([]float64, h)
for r := 0; r < h; r++ {
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
}
vals = make([][]*float64, h)
off := 0
for r := 0; r < h; r++ {
row := make([]*float64, w)
for c := 0; c < w; c++ {
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
off += 2
if v >= 32766 {
row[c] = nil
continue
}
dbz := float64(v) / 10.0
if dbz < 0 {
dbz = 0
} else if dbz > 75 {
dbz = 75
}
vv := dbz
row[c] = &vv
}
vals[r] = row
}
return vals, xs, ys, nil
}
func computeStats(vals [][]*float64, xs, ys []float64, stLat, stLon, windMS, windFromDeg float64) (
sec40Cnt int, sec40Sum float64, sec30Cnt int, sec30Sum float64,
cir40Cnt int, cir40Sum float64, cir30Cnt int, cir30Sum float64,
) {
h := len(vals)
if h == 0 {
return
}
w := len(vals[0])
// 半径(米)与半角
halfAngle := 30.0
rangeM := windMS * 3 * 3600
circleR := 8000.0
for r := 0; r < h; r++ {
lat := ys[r]
row := vals[r]
for c := 0; c < w; c++ {
if row[c] == nil {
continue
}
dbz := *row[c]
lon := xs[c]
dist := haversine(stLat, stLon, lat, lon)
// 8km 圆
if dist <= circleR {
if dbz >= 40 {
cir40Cnt++
cir40Sum += dbz
}
if dbz >= 30 {
cir30Cnt++
cir30Sum += dbz
}
}
// 扇形(需同时满足距离与角度)
if dist <= rangeM {
brg := bearingDeg(stLat, stLon, lat, lon)
if angDiff(brg, windFromDeg) <= halfAngle {
if dbz >= 40 {
sec40Cnt++
sec40Sum += dbz
}
if dbz >= 30 {
sec30Cnt++
sec30Sum += dbz
}
}
}
}
}
return
}
func toRad(d float64) float64 { return d * math.Pi / 180 }
func toDeg(r float64) float64 { return r * 180 / math.Pi }
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
const R = 6371000.0
dLat := toRad(lat2 - lat1)
dLon := toRad(lon2 - lon1)
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
return R * c
}
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
φ1 := toRad(lat1)
φ2 := toRad(lat2)
Δλ := toRad(lon2 - lon1)
y := math.Sin(Δλ) * math.Cos(φ2)
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
brg := toDeg(math.Atan2(y, x))
if brg < 0 {
brg += 360
}
return brg
}
func angDiff(a, b float64) float64 {
d := math.Mod(a-b+540, 360) - 180
if d < 0 {
d = -d
}
return math.Abs(d)
}

View File

@ -0,0 +1,553 @@
package main
import (
"context"
"database/sql"
"encoding/binary"
"encoding/csv"
"errors"
"flag"
"fmt"
"log"
"math"
"os"
"strings"
"time"
"weatherstation/internal/database"
)
type stationInfo struct {
ID string
Alias string
Lat float64
Lon float64
Z int
Y int
X int
}
type tileRec struct {
DT time.Time
Width, Height int
West, South float64
East, North float64
ResDeg float64
Data []byte
}
func main() {
var stationID string
var startStr string
var endStr string
var outPath string
var verbose bool
var useImdroid bool
flag.StringVar(&stationID, "station_id", "", "站点ID留空表示全部符合条件的站")
flag.StringVar(&startStr, "start", "", "起始时间YYYY-MM-DD HH:MM:SSCST表示区间左端点")
flag.StringVar(&endStr, "end", "", "结束时间YYYY-MM-DD HH:MM:SSCST表示区间左端点非包含")
flag.StringVar(&outPath, "out", "radar_hourly_stats.csv", "输出CSV文件路径")
flag.BoolVar(&verbose, "info", false, "输出详细过程信息")
flag.BoolVar(&useImdroid, "use_imdroid", false, "输出 imdroid 预报(右端点)")
flag.Parse()
if strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" {
log.Fatalln("必须提供 --start 与 --end格式 YYYY-MM-DD HH:MM:SS")
}
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
startT, err := time.ParseInLocation("2006-01-02 15:04:05", startStr, loc)
if err != nil {
log.Fatalf("解析 start 失败: %v", err)
}
endT, err := time.ParseInLocation("2006-01-02 15:04:05", endStr, loc)
if err != nil {
log.Fatalf("解析 end 失败: %v", err)
}
if !endT.After(startT) {
log.Fatalln("结束时间必须大于起始时间")
}
_ = database.GetDB()
defer database.Close()
stations, err := listStations(database.GetDB(), stationID)
if err != nil {
log.Fatalf("查询站点失败: %v", err)
}
if len(stations) == 0 {
log.Fatalln("没有符合条件的站点")
}
if verbose {
log.Printf("站点数量: %d", len(stations))
for _, s := range stations {
log.Printf("站点: id=%s alias=%s lat=%.5f lon=%.5f z/y/x=%d/%d/%d", s.ID, s.Alias, s.Lat, s.Lon, s.Z, s.Y, s.X)
}
}
f, err := os.Create(outPath)
if err != nil {
log.Fatalf("创建输出文件失败: %v", err)
}
defer f.Close()
w := csv.NewWriter(f)
defer w.Flush()
header := []string{
"station_id",
"station_alias",
"hour_end",
"rain_actual_mm",
"wind_speed_ms",
"wind_dir_deg",
"openmeteo_rain_mm",
"openmeteo_issued",
"caiyun_rain_mm",
"caiyun_issued",
}
if useImdroid {
header = append(header, "imdroid_rain_mm", "imdroid_issued")
}
header = append(header, "radar_circle_max_dbz", "radar_sector_max_dbz")
if err := w.Write(header); err != nil {
log.Fatalf("写入CSV表头失败: %v", err)
}
ctx := context.Background()
totalRows := 0
hours := buildHourSlots(startT, endT)
for _, s := range stations {
if verbose {
log.Printf("处理站点 %s共 %d 个小时区间", s.ID, len(hours))
}
for _, slot := range hours {
actual, windSpeed, windDir, hasObs, err := aggregateHourlyObs(ctx, database.GetDB(), s.ID, slot.from, slot.to)
if err != nil {
log.Printf("站点 %s 聚合观测失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
continue
}
openRain, openIssued, hasOpen, err := findLatestForecast(ctx, database.GetDB(), s.ID, "open-meteo", slot.to)
if err != nil {
log.Printf("站点 %s 读取 open-meteo 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
}
caiyunRain, caiyunIssued, hasCaiyun, err := findLatestForecast(ctx, database.GetDB(), s.ID, "caiyun", slot.to)
if err != nil {
log.Printf("站点 %s 读取 caiyun 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
}
var (
imdroidRain float64
imdroidIssued time.Time
hasImdroid bool
)
if useImdroid {
var errImdroid error
imdroidRain, imdroidIssued, hasImdroid, errImdroid = findLatestForecast(ctx, database.GetDB(), s.ID, "imdroid", slot.to)
if errImdroid != nil {
log.Printf("站点 %s 读取 imdroid 预报失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), errImdroid)
}
}
circleMax, sectorMax, hasRadar, err := hourlyRadarMax(ctx, database.GetDB(), s, slot.from, slot.to, loc, verbose)
if err != nil {
log.Printf("站点 %s 统计雷达失败 @%s: %v", s.ID, slot.to.Format(time.RFC3339), err)
}
rec := []string{
s.ID,
s.Alias,
slot.to.Format("2006-01-02 15:04:05"),
formatFloat(actual, hasObs, 3),
formatFloat(windSpeed, hasObs && !math.IsNaN(windSpeed), 3),
formatFloat(windDir, hasObs && !math.IsNaN(windDir), 1),
formatFloat(openRain, hasOpen, 3),
formatTime(openIssued, hasOpen),
formatFloat(caiyunRain, hasCaiyun, 3),
formatTime(caiyunIssued, hasCaiyun),
}
if useImdroid {
rec = append(rec,
formatFloat(imdroidRain, hasImdroid, 3),
formatTime(imdroidIssued, hasImdroid),
)
}
rec = append(rec,
formatFloat(circleMax, hasRadar && !math.IsNaN(circleMax), 1),
formatFloat(sectorMax, hasRadar && !math.IsNaN(sectorMax), 1),
)
if err := w.Write(rec); err != nil {
log.Printf("写入CSV失败: %v", err)
} else {
totalRows++
}
}
}
w.Flush()
if err := w.Error(); err != nil {
log.Fatalf("写入CSV失败: %v", err)
}
log.Printf("完成,输出 %d 行到 %s", totalRows, outPath)
}
type hourSlot struct {
from time.Time
to time.Time
}
func buildHourSlots(from, to time.Time) []hourSlot {
var slots []hourSlot
cursor := from
for cursor.Before(to) {
end := cursor.Add(time.Hour)
if end.After(to) {
end = to
}
slots = append(slots, hourSlot{from: cursor, to: end})
cursor = end
}
return slots
}
func listStations(db *sql.DB, stationID string) ([]stationInfo, error) {
if strings.TrimSpace(stationID) != "" {
const q = `
SELECT station_id,
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
latitude, longitude,
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
FROM stations
WHERE station_id=$1
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude<>0 AND longitude<>0
AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102`
var s stationInfo
err := db.QueryRow(q, stationID).Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
}
return []stationInfo{s}, nil
}
const qAll = `
SELECT station_id,
CASE WHEN COALESCE(station_alias,'')='' THEN station_id ELSE station_alias END AS alias,
latitude, longitude,
COALESCE(z,0), COALESCE(y,0), COALESCE(x,0)
FROM stations
WHERE device_type='WH65LP'
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude<>0 AND longitude<>0
AND COALESCE(z,0)=7 AND COALESCE(y,0)=40 AND COALESCE(x,0)=102
ORDER BY station_id`
rows, err := db.Query(qAll)
if err != nil {
return nil, err
}
defer rows.Close()
var out []stationInfo
for rows.Next() {
var s stationInfo
if err := rows.Scan(&s.ID, &s.Alias, &s.Lat, &s.Lon, &s.Z, &s.Y, &s.X); err == nil {
out = append(out, s)
}
}
return out, nil
}
func aggregateHourlyObs(ctx context.Context, db *sql.DB, stationID string, from, to time.Time) (rain float64, windSpeed float64, windDir float64, ok bool, err error) {
const q = `
SELECT wind_speed_ms_x1000, wind_dir_deg, rain_10m_mm_x1000
FROM rs485_weather_10min
WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
rows, err := db.QueryContext(ctx, q, stationID, from, to)
if err != nil {
return 0, 0, 0, false, err
}
defer rows.Close()
var totalRain int64
var count int
var sumX, sumY float64
for rows.Next() {
var ws sql.NullInt64
var wd sql.NullInt64
var rainX sql.NullInt64
if err := rows.Scan(&ws, &wd, &rainX); err != nil {
return 0, 0, 0, false, err
}
if rainX.Valid {
totalRain += rainX.Int64
}
if ws.Valid && wd.Valid {
speed := float64(ws.Int64) / 1000.0
dir := float64(wd.Int64)
rad := toRad(dir)
sumX += speed * math.Cos(rad)
sumY += speed * math.Sin(rad)
count++
}
}
if err := rows.Err(); err != nil {
return 0, 0, 0, false, err
}
rain = float64(totalRain) / 1000.0
windSpeed = math.NaN()
windDir = math.NaN()
if count > 0 {
avgX := sumX / float64(count)
avgY := sumY / float64(count)
windSpeed = math.Hypot(avgX, avgY)
if windSpeed == 0 {
windDir = 0
} else {
dir := toDeg(math.Atan2(avgY, avgX))
if dir < 0 {
dir += 360
}
windDir = dir
}
ok = true
}
return rain, windSpeed, windDir, totalRain > 0 || count > 0, nil
}
func findLatestForecast(ctx context.Context, db *sql.DB, stationID, provider string, forecastTime time.Time) (rain float64, issued time.Time, ok bool, err error) {
const q = `
SELECT issued_at, rain_mm_x1000
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND forecast_time=$3
ORDER BY issued_at DESC
LIMIT 1`
var issuedAt time.Time
var rainX sql.NullInt64
err = db.QueryRowContext(ctx, q, stationID, provider, forecastTime).Scan(&issuedAt, &rainX)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, time.Time{}, false, nil
}
return 0, time.Time{}, false, err
}
if !rainX.Valid {
return 0, issuedAt, true, nil
}
return float64(rainX.Int64) / 1000.0, issuedAt, true, nil
}
func hourlyRadarMax(ctx context.Context, db *sql.DB, s stationInfo, from, to time.Time, loc *time.Location, verbose bool) (circleMax float64, sectorMax float64, ok bool, err error) {
tiles, err := listTiles(ctx, db, s.Z, s.Y, s.X, from, to)
if err != nil {
return math.NaN(), math.NaN(), false, err
}
if len(tiles) == 0 {
return math.NaN(), math.NaN(), false, nil
}
circleMax = math.NaN()
sectorMax = math.NaN()
alias := strings.TrimSpace(s.Alias)
if alias == "" {
alias = s.ID
}
for _, t := range tiles {
bucket := bucket10(t.DT, loc)
windSpeed, windDir, windOK, err := loadWindAt(db, s.ID, alias, bucket)
if err != nil {
if verbose {
log.Printf("站点 %s 瓦片@%s 读取风失败: %v", s.ID, t.DT.Format(time.RFC3339), err)
}
continue
}
vals, xs, ys, err := decodeTile(t)
if err != nil {
if verbose {
log.Printf("站点 %s 解码瓦片失败: %v", s.ID, err)
}
continue
}
for r := 0; r < len(vals); r++ {
row := vals[r]
lat := ys[r]
for c := 0; c < len(row); c++ {
v := row[c]
if v == nil {
continue
}
dbz := *v
lon := xs[c]
dist := haversine(s.Lat, s.Lon, lat, lon)
if dist <= 8000.0 {
if math.IsNaN(circleMax) || dbz > circleMax {
circleMax = dbz
}
}
if windOK && windSpeed > 0 {
brg := bearingDeg(s.Lat, s.Lon, lat, lon)
if angDiff(brg, windDir) <= 30.0 && dist <= windSpeed*3*3600 {
if math.IsNaN(sectorMax) || dbz > sectorMax {
sectorMax = dbz
}
}
}
}
}
}
return circleMax, sectorMax, !math.IsNaN(circleMax) || !math.IsNaN(sectorMax), nil
}
func listTiles(ctx context.Context, db *sql.DB, z, y, x int, from, to time.Time) ([]tileRec, error) {
const q = `
SELECT dt, width, height, west, south, east, north, res_deg, data
FROM radar_tiles
WHERE z=$1 AND y=$2 AND x=$3 AND dt >= $4 AND dt < $5
ORDER BY dt ASC`
rows, err := db.QueryContext(ctx, q, z, y, x, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
var out []tileRec
for rows.Next() {
var r tileRec
if err := rows.Scan(&r.DT, &r.Width, &r.Height, &r.West, &r.South, &r.East, &r.North, &r.ResDeg, &r.Data); err == nil {
out = append(out, r)
}
}
return out, nil
}
func bucket10(t time.Time, loc *time.Location) time.Time {
tt := t.In(loc)
m := (tt.Minute() / 10) * 10
return time.Date(tt.Year(), tt.Month(), tt.Day(), tt.Hour(), m, 0, 0, loc)
}
func loadWindAt(db *sql.DB, stationID, alias string, dt time.Time) (speedMS float64, dirDeg float64, ok bool, err error) {
const q = `
SELECT wind_speed, wind_direction
FROM radar_weather
WHERE alias=$1 AND dt=$2
LIMIT 1`
tryAlias := func(a string) (float64, float64, bool, error) {
var s, d sql.NullFloat64
err := db.QueryRow(q, a, dt).Scan(&s, &d)
if err == sql.ErrNoRows {
return 0, 0, false, nil
}
if err != nil {
return 0, 0, false, err
}
if !s.Valid || !d.Valid {
return 0, 0, false, nil
}
return s.Float64, d.Float64, true, nil
}
if speed, dir, ok, err := tryAlias(stationID); err != nil {
return 0, 0, false, err
} else if ok {
return speed, dir, true, nil
}
return tryAlias(alias)
}
func decodeTile(t tileRec) (vals [][]*float64, xs []float64, ys []float64, err error) {
w, h := t.Width, t.Height
if w <= 0 || h <= 0 {
return nil, nil, nil, fmt.Errorf("非法尺寸")
}
if len(t.Data) < w*h*2 {
return nil, nil, nil, fmt.Errorf("数据长度不足")
}
xs = make([]float64, w)
for c := 0; c < w; c++ {
xs[c] = t.West + (float64(c)+0.5)*t.ResDeg
}
ys = make([]float64, h)
for r := 0; r < h; r++ {
ys[r] = t.South + (float64(r)+0.5)*t.ResDeg
}
vals = make([][]*float64, h)
off := 0
for r := 0; r < h; r++ {
row := make([]*float64, w)
for c := 0; c < w; c++ {
v := int16(binary.BigEndian.Uint16(t.Data[off : off+2]))
off += 2
if v >= 32766 {
row[c] = nil
continue
}
dbz := float64(v) / 10.0
if dbz < 0 {
dbz = 0
} else if dbz > 75 {
dbz = 75
}
value := dbz
row[c] = &value
}
vals[r] = row
}
return vals, xs, ys, nil
}
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
const R = 6371000.0
dLat := toRad(lat2 - lat1)
dLon := toRad(lon2 - lon1)
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
return R * c
}
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
φ1 := toRad(lat1)
φ2 := toRad(lat2)
Δλ := toRad(lon2 - lon1)
y := math.Sin(Δλ) * math.Cos(φ2)
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
brg := toDeg(math.Atan2(y, x))
if brg < 0 {
brg += 360
}
return brg
}
func angDiff(a, b float64) float64 {
d := math.Mod(a-b+540, 360) - 180
if d < 0 {
d = -d
}
return math.Abs(d)
}
func toRad(d float64) float64 { return d * math.Pi / 180 }
func toDeg(r float64) float64 { return r * 180 / math.Pi }
func formatFloat(v float64, ok bool, digits int) string {
if !ok || math.IsNaN(v) {
return ""
}
format := fmt.Sprintf("%%.%df", digits)
return fmt.Sprintf(format, v)
}
func formatTime(t time.Time, ok bool) string {
if !ok || t.IsZero() {
return ""
}
return t.Format("2006-01-02 15:04:05")
}

227
cmd/rainfetch/main.go Normal file
View File

@ -0,0 +1,227 @@
package main
import (
"context"
"database/sql"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
dbpkg "weatherstation/internal/database"
"weatherstation/internal/rain"
)
// 简单的小时雨量CMPA按时间范围下载器
// - 输入时间为北京时间Asia/Shanghai
// - 构造下载路径使用 UTC本地整点 -8h
// - 入库前通过 rain.StoreTileBytes 使用 URL 解析将 UTC 还原为北京时间并写库
// 用法示例:
//
// go run ./cmd/rainfetch --from "2025-10-07 09:00:00" --to "2025-10-07 11:00:00" \
// --tiles "7/40/102,7/40/104" --outdir rain_data
func main() {
var (
fromStr = flag.String("from", "", "起始时间北京时间YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD")
toStr = flag.String("to", "", "结束时间北京时间YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD")
tiles = flag.String("tiles", "7/40/102,7/40/104", "瓦片列表,逗号分隔,每项为 z/y/x如 7/40/102")
outDir = flag.String("outdir", "rain_data", "保存目录(同时也会写入数据库)")
baseURL = flag.String("base", "https://image.data.cma.cn/tiles/China/CMPA_RT_China_0P01_HOR-PRE_GISJPG_Tiles/%Y%m%d/%H/%M/{z}/{y}/{x}.bin", "下载基础URL模板UTC路径时间")
dryRun = flag.Bool("dry", false, "仅打印将要下载的URL与目标不实际下载写库")
)
flag.Parse()
if strings.TrimSpace(*fromStr) == "" || strings.TrimSpace(*toStr) == "" {
log.Fatalln("必须提供 --from 与 --to北京时间")
}
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
parseCST := func(s string) (time.Time, error) {
s = strings.TrimSpace(s)
var t time.Time
var err error
if len(s) == len("2006-01-02") {
// 日期:按 00:00:00 处理
if tm, e := time.ParseInLocation("2006-01-02", s, loc); e == nil {
t = tm
} else {
err = e
}
} else {
t, err = time.ParseInLocation("2006-01-02 15:04:05", s, loc)
}
return t, err
}
start, err1 := parseCST(*fromStr)
end, err2 := parseCST(*toStr)
if err1 != nil || err2 != nil {
log.Fatalf("解析时间失败: from=%v to=%v", err1, err2)
}
if end.Before(start) {
log.Fatalln("结束时间需不小于起始时间")
}
// 小时步进(包含端点):先对齐到小时
cur := start.Truncate(time.Hour)
end = end.Truncate(time.Hour)
// 解析 tiles 参数
type tcoord struct{ z, y, x int }
var tlist []tcoord
for _, part := range strings.Split(*tiles, ",") {
p := strings.TrimSpace(part)
if p == "" {
continue
}
var z, y, x int
if _, err := fmt.Sscanf(p, "%d/%d/%d", &z, &y, &x); err != nil {
log.Fatalf("无效的 tiles 项: %s", p)
}
tlist = append(tlist, tcoord{z, y, x})
}
if len(tlist) == 0 {
log.Fatalln("tiles 解析后为空")
}
if err := os.MkdirAll(*outDir, 0o755); err != nil {
log.Fatalf("创建输出目录失败: %v", err)
}
ctx := context.Background()
total := 0
success := 0
for !cur.After(end) {
// 本地整点CST→ UTC 路径时间
slotLocal := cur
slotUTC := slotLocal.Add(-8 * time.Hour).In(time.UTC)
dateStr := slotUTC.Format("20060102")
hh := slotUTC.Format("15")
mm := "00"
log.Printf("[rainfetch] 时次 local=%s, utc=%s", slotLocal.Format("2006-01-02 15:04"), slotUTC.Format("2006-01-02 15:04"))
for _, tc := range tlist {
total++
// 构造 URL
url := *baseURL
url = strings.ReplaceAll(url, "%Y%m%d", dateStr)
url = strings.ReplaceAll(url, "%H", hh)
url = strings.ReplaceAll(url, "%M", mm)
url = strings.ReplaceAll(url, "{z}", fmt.Sprintf("%d", tc.z))
url = strings.ReplaceAll(url, "{y}", fmt.Sprintf("%d", tc.y))
url = strings.ReplaceAll(url, "{x}", fmt.Sprintf("%d", tc.x))
fname := fmt.Sprintf("rain_z%d_y%d_x%d_%s.bin", tc.z, tc.y, tc.x, slotLocal.Format("20060102_1504"))
dest := filepath.Join(*outDir, fname)
if *dryRun {
log.Printf("[rainfetch] DRY url=%s -> %s", url, dest)
continue
}
// 若 DB 已有,则跳过
if ref, e := rain.ParseCMPATileURL(url); e == nil {
exists, e2 := databaseHas(ctx, ref.Product, ref.DT, tc.z, tc.y, tc.x)
if e2 == nil && exists {
log.Printf("[rainfetch] skip exists in DB z=%d y=%d x=%d dt=%s", tc.z, tc.y, tc.x, ref.DT.Format("2006-01-02 15:04"))
continue
}
}
if err := httpDownloadTo(ctx, url, dest); err != nil {
log.Printf("[rainfetch] 下载失败 z=%d y=%d x=%d: %v", tc.z, tc.y, tc.x, err)
continue
}
log.Printf("[rainfetch] 保存 %s", dest)
// 写库(使用 URL 解析 UTC → CST 后 upsert
b, rerr := os.ReadFile(dest)
if rerr != nil {
log.Printf("[rainfetch] 读文件失败: %v", rerr)
continue
}
if err := rain.StoreTileBytes(ctx, url, b); err != nil {
log.Printf("[rainfetch] 入库失败: %v", err)
continue
}
success++
}
cur = cur.Add(1 * time.Hour)
}
log.Printf("[rainfetch] 完成:尝试 %d成功 %d", total, success)
}
// 轻量 DB 存在性检查(避免引入内部 database 包到该命令):
// 为了避免循环依赖,这里复制一份最小 SQL 调用;实际工程也可抽取共享函数。
// 但当前 repo 中 database.GetDB 在 internal/database 包内,雨量 API 直接使用它。
// 注意:为保持最小侵入,这里通过 rain.StoreTileBytes 完成入库;
// 仅在下载前进行“是否已存在”查询,避免重复下载。为此需要访问 internal/database。
func databaseHas(ctx context.Context, product string, dt time.Time, z, y, x int) (bool, error) {
const q = `SELECT 1 FROM rain_tiles WHERE product=$1 AND dt=$2 AND z=$3 AND y=$4 AND x=$5 LIMIT 1`
var one int
err := dbpkg.GetDB().QueryRowContext(ctx, q, product, dt, z, y, x).Scan(&one)
if err == sql.ErrNoRows {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
func httpDownloadTo(ctx context.Context, url, dest string) error {
client := &http.Client{Timeout: 20 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("build request: %w", err)
}
req.Header.Set("Referer", "https://data.cma.cn/")
req.Header.Set("Origin", "https://data.cma.cn")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("http get: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
tmp := dest + ".part"
f, err := os.Create(tmp)
if err != nil {
return fmt.Errorf("create temp: %w", err)
}
_, copyErr := io.Copy(f, resp.Body)
closeErr := f.Close()
if copyErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("write body: %w", copyErr)
}
if closeErr != nil {
_ = os.Remove(tmp)
return fmt.Errorf("close temp: %w", closeErr)
}
if err := os.Rename(tmp, dest); err != nil {
// Cross-device fallback
data, rerr := os.ReadFile(tmp)
if rerr != nil {
return fmt.Errorf("read temp: %w", rerr)
}
if werr := os.WriteFile(dest, data, 0o644); werr != nil {
return fmt.Errorf("write final: %w", werr)
}
_ = os.Remove(tmp)
}
return nil
}

28
cmd/service-rain/main.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"weatherstation/internal/rain"
"weatherstation/internal/server"
)
// service-rain: standalone CMPA hourly rain tile downloader
// - Uses internal/rain scheduler with defaults
// - Controlled by env vars in internal/rain (e.g., RAIN_ENABLED, RAIN_DIR, RAIN_BASE_URL)
func main() {
server.SetupLogger()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if err := rain.Start(ctx, rain.Options{StoreToDB: true}); err != nil {
log.Fatalf("service-rain start error: %v", err)
}
<-ctx.Done()
log.Println("service-rain shutting down")
}

View File

@ -0,0 +1,235 @@
package main
import (
"context"
"database/sql"
"encoding/csv"
"flag"
"fmt"
"log"
"os"
"strings"
"time"
"weatherstation/core/internal/data"
)
type actualHour struct {
HourEnd time.Time
TempC float64
HumidityPct float64
PressureHpa float64
WindSpeedMs float64
WindDirDeg float64
RainActualMM float64
}
func main() {
var stationID, startStr, endStr, providersCSV, outPath, tzName string
flag.StringVar(&stationID, "station", "", "站点ID如 RS485-002A6E")
flag.StringVar(&startStr, "start", "", "开始时间,格式 2006-01-02 15:00")
flag.StringVar(&endStr, "end", "", "结束时间,格式 2006-01-02 15:00开区间")
flag.StringVar(&providersCSV, "providers", "caiyun,ec,wrf", "逗号分隔的预报源,默认 caiyun,ec,wrf")
flag.StringVar(&outPath, "out", "", "输出 CSV 文件路径;留空输出到 stdout")
flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai")
flag.Parse()
if strings.TrimSpace(stationID) == "" || strings.TrimSpace(startStr) == "" || strings.TrimSpace(endStr) == "" {
log.Fatalf("用法: im_export_data --station RS485-XXXXXX --start '2024-08-01 00:00' --end '2024-08-02 00:00' [--providers caiyun,ec,wrf] [--out out.csv]")
}
loc, _ := time.LoadLocation(tzName)
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
parse := func(s string) time.Time {
var t time.Time
var err error
for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} {
t, err = time.ParseInLocation(ly, s, loc)
if err == nil {
return t.Truncate(time.Hour)
}
}
log.Fatalf("无法解析时间: %s", s)
return time.Time{}
}
start := parse(startStr)
end := parse(endStr)
if !end.After(start) {
log.Fatalf("end 必须大于 start")
}
providers := splitCSV(providersCSV)
if len(providers) == 0 {
providers = []string{"caiyun"}
}
// Prepare writer
var out *csv.Writer
var file *os.File
if strings.TrimSpace(outPath) != "" {
f, err := os.Create(outPath)
if err != nil {
log.Fatalf("打开输出文件失败: %v", err)
}
defer f.Close()
out = csv.NewWriter(f)
file = f
} else {
out = csv.NewWriter(os.Stdout)
}
defer out.Flush()
// Header
header := []string{"station_id", "hour_end", "temp_c", "humidity_pct", "wind_dir_deg", "wind_speed_ms", "pressure_hpa", "rain_actual_mm"}
for _, p := range providers {
header = append(header, fmt.Sprintf("%s_lead1_rain_mm", p))
header = append(header, fmt.Sprintf("%s_lead2_rain_mm", p))
header = append(header, fmt.Sprintf("%s_lead3_rain_mm", p))
}
if err := out.Write(header); err != nil {
log.Fatalf("写入 CSV 失败: %v", err)
}
ctx := context.Background()
rows, err := loadActualHourly(ctx, stationID, start, end)
if err != nil {
log.Fatalf("查询实况失败: %v", err)
}
for _, row := range rows {
rec := []string{
stationID,
row.HourEnd.Format("2006-01-02 15:04:05"),
fmt.Sprintf("%.2f", row.TempC),
fmt.Sprintf("%.2f", row.HumidityPct),
fmt.Sprintf("%.2f", row.WindDirDeg),
fmt.Sprintf("%.3f", row.WindSpeedMs),
fmt.Sprintf("%.2f", row.PressureHpa),
fmt.Sprintf("%.3f", row.RainActualMM),
}
for _, p := range providers {
// For each lead 1..3, get rain for forecast_time = hour_end, latest issued_at for that lead
for lead := 1; lead <= 3; lead++ {
v, _ := loadProviderRainAt(ctx, stationID, p, row.HourEnd, lead)
if v < 0 {
rec = append(rec, "")
} else {
rec = append(rec, fmt.Sprintf("%.3f", v))
}
}
}
if err := out.Write(rec); err != nil {
log.Fatalf("写入 CSV 失败: %v", err)
}
}
out.Flush()
if err := out.Error(); err != nil {
log.Fatalf("写入 CSV 错误: %v", err)
}
if file != nil {
log.Printf("导出完成: %s共 %d 行", outPath, len(rows))
}
}
func splitCSV(s string) []string {
parts := strings.Split(s, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
out = append(out, p)
}
}
return out
}
func loadActualHourly(ctx context.Context, stationID string, start, end time.Time) ([]actualHour, error) {
// Right-endpoint hourly aggregation from rs485_weather_10min
const q = `
WITH base AS (
SELECT * FROM rs485_weather_10min
WHERE station_id = $1 AND bucket_start >= $2 AND bucket_start < $3
), g AS (
SELECT date_trunc('hour', bucket_start) AS grp,
SUM(temp_c_x100 * sample_count)::bigint AS w_temp,
SUM(humidity_pct * sample_count)::bigint AS w_hum,
SUM(pressure_hpa_x100 * sample_count)::bigint AS w_p,
SUM(solar_wm2_x100 * sample_count)::bigint AS w_solar,
SUM(uv_index * sample_count)::bigint AS w_uv,
SUM(wind_speed_ms_x1000 * sample_count)::bigint AS w_ws,
MAX(wind_gust_ms_x1000) AS gust_max,
SUM(sin(radians(wind_dir_deg)) * sample_count)::double precision AS sin_sum,
SUM(cos(radians(wind_dir_deg)) * sample_count)::double precision AS cos_sum,
SUM(rain_10m_mm_x1000) AS rain_sum,
SUM(sample_count) AS n_sum
FROM base GROUP BY 1
)
SELECT grp + interval '1 hour' AS hour_end,
(w_temp/NULLIF(n_sum,0))/100.0 AS temp_c,
(w_hum/NULLIF(n_sum,0))::double precision AS humidity_pct,
(w_p/NULLIF(n_sum,0))/100.0 AS pressure_hpa,
(w_ws/NULLIF(n_sum,0))/1000.0 AS wind_speed_ms,
CASE WHEN sin_sum IS NULL OR cos_sum IS NULL THEN NULL
ELSE (
CASE WHEN degrees(atan2(sin_sum, cos_sum)) < 0
THEN degrees(atan2(sin_sum, cos_sum)) + 360
ELSE degrees(atan2(sin_sum, cos_sum)) END)
END AS wind_dir_deg,
(rain_sum/1000.0) AS rain_mm
FROM g
ORDER BY hour_end`
rows, err := data.DB().QueryContext(ctx, q, stationID, start, end)
if err != nil {
return nil, err
}
defer rows.Close()
var out []actualHour
for rows.Next() {
var t time.Time
var ta, ua, pa, ws, dm, rain sql.NullFloat64
if err := rows.Scan(&t, &ta, &ua, &pa, &ws, &dm, &rain); err != nil {
continue
}
out = append(out, actualHour{
HourEnd: t,
TempC: nullF(ta),
HumidityPct: nullF(ua),
PressureHpa: nullF(pa),
WindSpeedMs: nullF(ws),
WindDirDeg: nullF(dm),
RainActualMM: nullF(rain),
})
}
return out, nil
}
// loadProviderRainAt returns rain(mm) for a provider at forecast_time=t with fixed lead, picking latest issued_at.
func loadProviderRainAt(ctx context.Context, stationID, provider string, t time.Time, lead int) (float64, error) {
const q = `
SELECT COALESCE(rain_mm_x1000,0)::bigint
FROM (
SELECT rain_mm_x1000, issued_at,
CEIL(EXTRACT(EPOCH FROM ($3 - issued_at)) / 3600.0)::int AS lead_hours
FROM forecast_hourly
WHERE station_id=$1 AND provider=$2 AND forecast_time=$3
) x
WHERE lead_hours=$4
ORDER BY issued_at DESC
LIMIT 1`
var v int64
err := data.DB().QueryRowContext(ctx, q, stationID, provider, t, lead).Scan(&v)
if err != nil {
return -1, err
}
return float64(v) / 1000.0, nil
}
func nullF(n sql.NullFloat64) float64 {
if n.Valid {
return n.Float64
}
return 0
}

View File

@ -0,0 +1,551 @@
package main
import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"math"
"sort"
"strings"
"time"
"weatherstation/core/internal/config"
"weatherstation/core/internal/data"
"weatherstation/core/internal/sms"
)
const (
providerForecast = "imdroid_mix"
alertTypeForecast = "forecast_3h_rain"
alertTypeActual30m = "actual_30m_rain"
alertTypeNeighbor30 = "actual_30m_neighbor"
levelRed = "red"
levelYellow = "yellow"
forecastRedMM = 8.0
forecastYellowMM = 4.0
actualRedMM = 4.0
actualYellowMM = 2.0
halfAngleDeg = 90.0
timeFormatShort = "2006-01-02 15:04"
defaultCheckTimeout = 20 * time.Second
)
var (
flagOnce bool
flagTest bool
flagTestStIDs string
flagTestTime string
flagWhy bool
)
func main() {
flag.BoolVar(&flagOnce, "once", false, "run checks once immediately (no scheduling)")
flag.BoolVar(&flagTest, "test", false, "force alerts regardless of thresholds (for dry-run)")
flag.StringVar(&flagTestStIDs, "station", "", "comma-separated station_id list for test mode")
flag.StringVar(&flagTestTime, "time", "", "test mode: specify end time (YYYY-MM-DD HH:MM:SS, CST)")
flag.BoolVar(&flagWhy, "why", false, "in test mode, log reasons when alert not triggered")
flag.Parse()
cfg := config.Load()
scli := mustInitSMS(cfg)
if flagOnce {
tick := time.Now()
runForecastCheck(scli, tick)
runActualCheck(scli, tick)
runNeighborActualCheck(scli, tick)
return
}
go alignAndRunHour10(func(tick time.Time) { runForecastCheck(scli, tick) })
go alignAndRunHalfHour(func(tick time.Time) { runActualCheck(scli, tick) })
go alignAndRunHalfHour(func(tick time.Time) { runNeighborActualCheck(scli, tick) })
select {}
}
func mustInitSMS(cfg config.Config) *sms.Client {
cli, err := sms.New(sms.Config{
AccessKeyID: strings.TrimSpace(cfg.SMS.AccessKeyID),
AccessKeySecret: strings.TrimSpace(cfg.SMS.AccessKeySecret),
SignName: strings.TrimSpace(cfg.SMS.SignName),
TemplateCode: strings.TrimSpace(cfg.SMS.TemplateCode),
Endpoint: strings.TrimSpace(cfg.SMS.Endpoint),
})
if err != nil {
log.Printf("sms: disabled (%v)", err)
return nil
}
return cli
}
func alignAndRunHour10(fn func(tick time.Time)) {
now := time.Now()
base := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 10, 0, 0, now.Location())
var next time.Time
if now.After(base) {
next = base.Add(time.Hour)
} else {
next = base
}
time.Sleep(time.Until(next))
for {
tick := time.Now().Truncate(time.Minute)
fn(tick)
time.Sleep(time.Hour)
}
}
func alignAndRunHalfHour(fn func(tick time.Time)) {
now := time.Now()
next := now.Truncate(30 * time.Minute).Add(30 * time.Minute)
time.Sleep(time.Until(next))
for {
tick := time.Now().Truncate(time.Minute)
fn(tick)
time.Sleep(30 * time.Minute)
}
}
func runForecastCheck(scli *sms.Client, tick time.Time) {
ctx, cancel := context.WithTimeout(context.Background(), defaultCheckTimeout)
defer cancel()
stations := listFixedStations(ctx)
stations = filterStations(stations, flagTestStIDs)
if len(stations) == 0 {
log.Printf("forecast: no stations after filter")
return
}
recip := loadRecipients(ctx)
loc := mustShanghai()
now := tick.In(loc)
issued := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, loc)
next1 := issued.Add(time.Hour)
next3 := issued.Add(3 * time.Hour)
for _, st := range stations {
points, err := data.ForecastRainAtIssued(ctx, st.ID, providerForecast, issued)
if err != nil {
log.Printf("forecast: query station=%s err=%v", st.ID, err)
continue
}
var redMax, yellowMax int64
for _, p := range points {
if p.ForecastTime.Before(next1) || p.ForecastTime.After(next3) {
continue
}
v := int64(p.RainMMx1000)
if v >= 8000 {
if v > redMax {
redMax = v
}
} else if v >= 4000 {
if v > yellowMax {
yellowMax = v
}
}
}
level := ""
value := 0.0
threshold := 0.0
if redMax > 0 {
level = levelRed
value = float64(redMax) / 1000.0
threshold = forecastRedMM
} else if yellowMax > 0 {
level = levelYellow
value = float64(yellowMax) / 1000.0
threshold = forecastYellowMM
}
if level == "" {
if !flagTest {
if flagWhy {
log.Printf("forecast why: station=%s no threshold hit redMax=%d yellowMax=%d", st.ID, redMax, yellowMax)
}
continue
}
level = levelYellow
value = forecastYellowMM
threshold = forecastYellowMM
}
if flagTest {
msg := fmt.Sprintf("【测试】站点%s 强制触发未来3小时降水预警 level=%s", st.Name, levelLabel(level))
targetPhones := recip.forLevel(level)
if len(targetPhones) == 0 {
recordAlert(ctx, alertTypeForecast, st.ID, level, issued, msg, sql.NullString{})
} else {
sendToPhones(ctx, scli, st.Name, value, level, issued, targetPhones, msg, alertTypeForecast, st.ID)
}
continue
}
msg := fmt.Sprintf("站点%s 未来3小时单小时最大降水 %.3fmm,达到%s阈值 %.1fmmissued_at=%s", st.Name, value, levelLabel(level), threshold, issued.Format(timeFormatShort))
targetPhones := recip.forLevel(level)
if len(targetPhones) == 0 {
recordAlert(ctx, alertTypeForecast, st.ID, level, issued, msg, sql.NullString{})
continue
}
sendToPhones(ctx, scli, st.Name, value, level, issued, targetPhones, msg, alertTypeForecast, st.ID)
}
}
func runActualCheck(scli *sms.Client, tick time.Time) {
ctx, cancel := context.WithTimeout(context.Background(), defaultCheckTimeout)
defer cancel()
stations := listFixedStations(ctx)
stations = filterStations(stations, flagTestStIDs)
if len(stations) == 0 {
log.Printf("actual: no stations after filter")
return
}
recip := loadRecipients(ctx)
loc := mustShanghai()
end := tick.In(loc)
if flagTestTime != "" {
if t, err := time.ParseInLocation("2006-01-02 15:04:05", flagTestTime, loc); err == nil {
end = t
}
}
start := end.Add(-30 * time.Minute)
for _, st := range stations {
rain, ok, err := data.SumRainMM(ctx, st.ID, start, end)
if err != nil {
log.Printf("actual: sum station=%s err=%v", st.ID, err)
continue
}
if flagWhy {
log.Printf("actual why: station=%s window=%s~%s rain_sum=%.3f ok=%v", st.ID, start.Format(timeFormatShort), end.Format(timeFormatShort), rain, ok)
}
if !ok {
if flagWhy {
log.Printf("actual why: station=%s no rain data", st.ID)
}
continue
}
level := ""
threshold := 0.0
if rain >= actualRedMM {
level = levelRed
threshold = actualRedMM
} else if rain >= actualYellowMM {
level = levelYellow
threshold = actualYellowMM
}
if level == "" {
if flagWhy {
log.Printf("actual why: station=%s rain=%.3f below threshold", st.ID, rain)
}
if !flagTest {
continue
}
level = levelYellow
threshold = actualYellowMM
rain = actualYellowMM
}
if flagTest {
msg := fmt.Sprintf("【测试】站点%s 强制触发30分钟降水预警 level=%s", st.Name, levelLabel(level))
targetPhones := recip.forLevel(level)
if len(targetPhones) == 0 {
recordAlert(ctx, alertTypeActual30m, st.ID, level, end, msg, sql.NullString{})
} else {
sendToPhones(ctx, scli, st.Name, rain, level, end, targetPhones, msg, alertTypeActual30m, st.ID)
}
continue
}
msg := fmt.Sprintf("站点%s 过去30分钟降水 %.3fmm,达到%s阈值 %.1fmm,窗口 %s - %s", st.Name, rain, levelLabel(level), threshold, start.Format(timeFormatShort), end.Format(timeFormatShort))
targetPhones := recip.forLevel(level)
if len(targetPhones) == 0 {
recordAlert(ctx, alertTypeActual30m, st.ID, level, end, msg, sql.NullString{})
log.Printf("actual: triggered station=%s level=%s rain=%.3f (no phones)", st.ID, level, rain)
continue
}
sendToPhones(ctx, scli, st.Name, rain, level, end, targetPhones, msg, alertTypeActual30m, st.ID)
log.Printf("actual: triggered station=%s level=%s rain=%.3f phones=%d", st.ID, level, rain, len(targetPhones))
}
}
func runNeighborActualCheck(scli *sms.Client, tick time.Time) {
ctx, cancel := context.WithTimeout(context.Background(), defaultCheckTimeout)
defer cancel()
allStations := listFixedStations(ctx)
centers := filterStations(allStations, flagTestStIDs)
if len(centers) == 0 {
if flagWhy {
log.Printf("neighbor why: no stations after filter")
}
return
}
recip := loadRecipients(ctx)
loc := mustShanghai()
end := tick.In(loc)
if flagTestTime != "" {
if t, err := time.ParseInLocation("2006-01-02 15:04:05", flagTestTime, loc); err == nil {
end = t
}
}
start := end.Add(-30 * time.Minute)
for _, center := range centers {
if center.Latitude == 0 || center.Longitude == 0 {
if flagWhy {
log.Printf("neighbor why: center %s missing lat/lon", center.ID)
}
continue
}
wind, err := data.RadarWeatherNearest(center.Latitude, center.Longitude, end, 6*time.Hour)
if err != nil {
log.Printf("neighbor: wind query failed station=%s: %v", center.ID, err)
continue
}
if wind == nil || !wind.WindDirection.Valid || !wind.WindSpeed.Valid || wind.WindSpeed.Float64 <= 0.01 {
if flagWhy {
log.Printf("neighbor why: center %s no wind data", center.ID)
}
continue
}
dir := wind.WindDirection.Float64
spd := wind.WindSpeed.Float64
radius := spd * 3600
for _, nb := range allStations {
if nb.ID == center.ID {
continue
}
if nb.Latitude == 0 || nb.Longitude == 0 {
if flagWhy {
log.Printf("neighbor why: neighbor %s missing lat/lon", nb.ID)
}
continue
}
dist := haversine(center.Latitude, center.Longitude, nb.Latitude, nb.Longitude)
brg := bearingDeg(center.Latitude, center.Longitude, nb.Latitude, nb.Longitude)
diff := angDiff(brg, dir)
inSector := dist <= radius && diff <= halfAngleDeg
if !inSector {
if flagWhy {
log.Printf("neighbor why: center=%s neighbor=%s not in sector dist=%.1fm radius=%.1fm bearing=%.1f windFrom=%.1f diff=%.1f half=%.1f",
center.ID, nb.ID, dist, radius, brg, dir, diff, halfAngleDeg)
}
continue
}
rain, ok, err := data.SumRainMM(ctx, nb.ID, start, end)
if err != nil {
log.Printf("neighbor: sum rain station=%s err=%v", nb.ID, err)
continue
}
if flagWhy {
log.Printf("neighbor why: center=%s neighbor=%s window=%s~%s rain_sum=%.3f ok=%v", center.ID, nb.ID, start.Format(timeFormatShort), end.Format(timeFormatShort), rain, ok)
}
if !ok {
if flagWhy {
log.Printf("neighbor why: neighbor %s no rain data", nb.ID)
}
continue
}
level := ""
threshold := 0.0
if rain >= actualRedMM {
level = levelRed
threshold = actualRedMM
} else if rain >= actualYellowMM {
level = levelYellow
threshold = actualYellowMM
}
if level == "" {
if flagWhy {
log.Printf("neighbor why: neighbor %s rain=%.3f below threshold", nb.ID, rain)
}
continue
}
atype := alertTypeNeighbor30 + "_" + nb.ID
msg := fmt.Sprintf("站点%s 迎风扇区内站点%s 30分钟降水 %.3fmm,达到%s阈值 %.1fmm,窗口 %s - %s", center.Name, nb.Name, rain, levelLabel(level), threshold, start.Format(timeFormatShort), end.Format(timeFormatShort))
targetPhones := recip.forLevel(level)
if len(targetPhones) == 0 {
recordAlert(ctx, atype, center.ID, level, end, msg, sql.NullString{})
log.Printf("neighbor: center=%s neighbor=%s level=%s rain=%.3f (no phones)", center.ID, nb.ID, level, rain)
continue
}
sendToPhones(ctx, scli, center.Name, rain, level, end, targetPhones, msg, atype, center.ID)
log.Printf("neighbor: center=%s neighbor=%s level=%s rain=%.3f phones=%d", center.ID, nb.ID, level, rain, len(targetPhones))
}
}
}
func recordAlert(ctx context.Context, alertType, stationID, level string, issuedAt time.Time, message string, phone sql.NullString) {
_, err := data.InsertAlert(ctx, data.AlertRecord{
AlertType: alertType,
StationID: stationID,
Level: level,
IssuedAt: issuedAt,
Message: message,
SMSPhone: phone,
})
if err != nil {
log.Printf("alert insert failed station=%s type=%s level=%s: %v", stationID, alertType, level, err)
}
}
type recipients struct {
red []string
yel []string
}
func (r recipients) forLevel(level string) []string {
if level == levelRed {
return r.red
}
if level == levelYellow {
return r.yel
}
return nil
}
func loadRecipients(ctx context.Context) recipients {
list, err := data.ListEnabledSMSRecipients(ctx)
if err != nil {
log.Printf("sms: load recipients failed: %v", err)
return recipients{}
}
var res recipients
for _, r := range list {
if r.AlertLevel >= 1 {
res.red = append(res.red, r.Phone)
}
if r.AlertLevel >= 2 {
res.yel = append(res.yel, r.Phone)
}
}
return res
}
func sendToPhones(ctx context.Context, scli *sms.Client, stationName string, value float64, level string, issuedAt time.Time, phones []string, message string, alertType string, stationID string) {
if scli == nil {
return
}
name := "" + stationName + ""
content := format3(value) + " mm"
alertText := "【大礼村】暴雨"
if level == levelRed {
alertText += "红色预警"
} else {
alertText += "黄色预警"
}
for _, ph := range phones {
if err := scli.Send(ctx, name, content, alertText, "", []string{ph}); err != nil {
log.Printf("sms: send failed phone=%s station=%s level=%s: %v", ph, stationID, level, err)
continue
}
recordAlert(ctx, alertType, stationID, level, issuedAt, message, sql.NullString{String: ph, Valid: true})
log.Printf("sms: sent phone=%s station=%s level=%s", ph, stationID, level)
}
}
func format3(v float64) string {
s := fmt.Sprintf("%.3f", v)
s = strings.TrimRight(s, "0")
s = strings.TrimRight(s, ".")
if s == "" {
return "0"
}
return s
}
func mustShanghai() *time.Location {
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
return loc
}
func levelLabel(level string) string {
if level == levelRed {
return "红色"
}
return "黄色"
}
func listFixedStations(ctx context.Context) []data.StationInfo {
ids := []string{"RS485-002964", "RS485-002A39", "RS485-0029CB"}
sts, err := data.ListStationsByIDs(ctx, ids)
if err != nil || len(sts) == 0 {
var out []data.StationInfo
for _, id := range ids {
out = append(out, data.StationInfo{ID: id, Name: id})
}
return out
}
// 保证顺序按 ids
order := make(map[string]int)
for i, id := range ids {
order[id] = i
}
sort.Slice(sts, func(i, j int) bool { return order[sts[i].ID] < order[sts[j].ID] })
return sts
}
func filterStations(in []data.StationInfo, filter string) []data.StationInfo {
f := strings.TrimSpace(filter)
if f == "" {
return in
}
parts := strings.Split(f, ",")
m := make(map[string]struct{}, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
m[p] = struct{}{}
}
}
if len(m) == 0 {
return in
}
var out []data.StationInfo
for _, st := range in {
if _, ok := m[st.ID]; ok {
out = append(out, st)
}
}
return out
}
func toRad(d float64) float64 { return d * math.Pi / 180 }
func toDeg(r float64) float64 { return r * 180 / math.Pi }
func haversine(lat1, lon1, lat2, lon2 float64) float64 {
const R = 6371000.0
dLat := toRad(lat2 - lat1)
dLon := toRad(lon2 - lon1)
a := math.Sin(dLat/2)*math.Sin(dLat/2) + math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
return R * c
}
func bearingDeg(lat1, lon1, lat2, lon2 float64) float64 {
φ1 := toRad(lat1)
φ2 := toRad(lat2)
Δλ := toRad(lon2 - lon1)
y := math.Sin(Δλ) * math.Cos(φ2)
x := math.Cos(φ1)*math.Sin(φ2) - math.Sin(φ1)*math.Cos(φ2)*math.Cos(Δλ)
brg := toDeg(math.Atan2(y, x))
if brg < 0 {
brg += 360
}
return brg
}
func angDiff(a, b float64) float64 {
d := math.Mod(a-b+540, 360) - 180
if d < 0 {
d = -d
}
return math.Abs(d)
}

View File

@ -422,7 +422,7 @@ export class AppComponent implements OnInit, AfterViewInit {
} catch {} } catch {}
if (windDir != null && windSpd != null && windSpd > 0.01) { if (windDir != null && windSpd != null && windSpd > 0.01) {
const bearingTo = (windDir + 180) % 360; // 去向 const bearingTo = windDir;
const hours = 3; const hours = 3;
const radius = windSpd * 3600 * hours; // m const radius = windSpd * 3600 * hours; // m
const half = 25; // 半角 const half = 25; // 半角

View File

@ -0,0 +1,41 @@
package data
import (
"context"
"database/sql"
"time"
)
type AlertRecord struct {
AlertType string
StationID string
Level string
IssuedAt time.Time
Message string
SMSPhone sql.NullString
}
// InsertAlert writes one alert row; returns inserted id or 0 when skipped by conflict.
func InsertAlert(ctx context.Context, ar AlertRecord) (int64, error) {
const q = `
INSERT INTO alerts (alert_type, station_id, level, issued_at, message, sms_phone)
VALUES ($1,$2,$3,$4,$5,$6)
ON CONFLICT DO NOTHING
RETURNING id`
var id int64
err := DB().QueryRowContext(ctx, q,
ar.AlertType,
ar.StationID,
ar.Level,
ar.IssuedAt,
ar.Message,
ar.SMSPhone,
).Scan(&id)
if err == sql.ErrNoRows {
return 0, nil
}
if err != nil {
return 0, err
}
return id, nil
}

View File

@ -19,3 +19,17 @@ func FetchActualHourlyRain(ctx context.Context, stationID string, start, end tim
} }
return float64(sum.Int64) / 1000.0, true, nil return float64(sum.Int64) / 1000.0, true, nil
} }
// SumRainMM sums rain_10m_mm_x1000 over [start,end) and returns mm.
func SumRainMM(ctx context.Context, stationID string, start, end time.Time) (float64, bool, error) {
const q = `SELECT SUM(rain_10m_mm_x1000) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3`
var sum sql.NullInt64
err := DB().QueryRowContext(ctx, q, stationID, start, end).Scan(&sum)
if err != nil {
return 0, false, err
}
if !sum.Valid {
return 0, false, nil
}
return float64(sum.Int64) / 1000.0, true, nil
}

View File

@ -3,6 +3,8 @@ package data
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"strings"
) )
// GetStationName returns stations.name by station_id; empty string if not found/null. // GetStationName returns stations.name by station_id; empty string if not found/null.
@ -21,3 +23,88 @@ func GetStationName(ctx context.Context, stationID string) (string, error) {
} }
return "", nil return "", nil
} }
// StationInfo contains minimal fields for alert checks.
type StationInfo struct {
ID string
Name string
Location string
Latitude float64
Longitude float64
Altitude float64
}
// ListEligibleStations returns WH65LP stations with required non-empty fields.
func ListEligibleStations(ctx context.Context) ([]StationInfo, error) {
const q = `
SELECT
station_id,
COALESCE(NULLIF(BTRIM(name), ''), station_id) AS name,
location,
latitude::float8,
longitude::float8,
altitude::float8
FROM stations
WHERE
device_type = 'WH65LP' AND
name IS NOT NULL AND BTRIM(name) <> '' AND
location IS NOT NULL AND BTRIM(location) <> '' AND
latitude IS NOT NULL AND latitude <> 0 AND
longitude IS NOT NULL AND longitude <> 0 AND
altitude IS NOT NULL AND altitude <> 0`
rows, err := DB().QueryContext(ctx, q)
if err != nil {
return nil, err
}
defer rows.Close()
var out []StationInfo
for rows.Next() {
var st StationInfo
if err := rows.Scan(&st.ID, &st.Name, &st.Location, &st.Latitude, &st.Longitude, &st.Altitude); err != nil {
continue
}
out = append(out, st)
}
return out, nil
}
// ListStationsByIDs returns stations by a given id list (ignores missing ones).
func ListStationsByIDs(ctx context.Context, ids []string) ([]StationInfo, error) {
if len(ids) == 0 {
return nil, nil
}
var placeholders []string
args := make([]interface{}, 0, len(ids))
for i, id := range ids {
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
args = append(args, id)
}
q := fmt.Sprintf(`
SELECT
station_id,
COALESCE(NULLIF(BTRIM(name), ''), station_id) AS name,
location,
latitude::float8,
longitude::float8,
altitude::float8
FROM stations
WHERE station_id IN (%s)`, strings.Join(placeholders, ","))
rows, err := DB().QueryContext(ctx, q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var out []StationInfo
for rows.Next() {
var st StationInfo
if err := rows.Scan(&st.ID, &st.Name, &st.Location, &st.Latitude, &st.Longitude, &st.Altitude); err != nil {
continue
}
out = append(out, st)
}
return out, nil
}

7
db/migrations/02.sql Normal file
View File

@ -0,0 +1,7 @@
-- 02.sql: create simple users table
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
password TEXT NOT NULL, -- bcrypt hash
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

9
db/migrations/03.sql Normal file
View File

@ -0,0 +1,9 @@
-- 03.sql: recipients for SMS alerts
-- Table to store phone numbers, enabled flag, and alert level (1=red only, 2=yellow+red)
-- PostgreSQL has no native unsigned int; use integer with CHECK constraints.
CREATE TABLE IF NOT EXISTS sms_recipients (
phone TEXT PRIMARY KEY,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
alert_level INTEGER NOT NULL DEFAULT 2 CHECK (alert_level >= 1)
);

View File

@ -0,0 +1,28 @@
-- 04_sms_recipients_seed_20251125.sql: add/update specific SMS recipients
-- Sets enabled = FALSE and alert_level = 1 for listed phone numbers.
-- Idempotent via ON CONFLICT on primary key (phone).
INSERT INTO sms_recipients (phone, enabled, alert_level) VALUES
('13114458208', FALSE, 1),
('13986807953', FALSE, 1),
('13207210509', FALSE, 1),
('13886680872', FALSE, 1),
('13477172662', FALSE, 1),
('13177094329', FALSE, 1),
('13165617999', FALSE, 1),
('13217179901', FALSE, 1),
('18571017120', FALSE, 1),
('18674205345', FALSE, 1),
('18871769640', FALSE, 1),
('15587930225', FALSE, 1),
('13545715958', FALSE, 1),
('15629386907', FALSE, 1),
('15971633321', FALSE, 1),
('15671074991', FALSE, 1),
('18727254175', FALSE, 1),
('13477108587', FALSE, 1),
('15897521649', FALSE, 1)
ON CONFLICT (phone) DO UPDATE SET
enabled = EXCLUDED.enabled,
alert_level = EXCLUDED.alert_level;

View File

@ -0,0 +1,23 @@
-- 05_alerts.sql: table for alerts/warnings
CREATE TABLE IF NOT EXISTS alerts (
id BIGSERIAL PRIMARY KEY,
alert_type TEXT NOT NULL,
station_id VARCHAR(50) NOT NULL,
level VARCHAR(10) NOT NULL CHECK (level IN ('yellow', 'red')),
issued_at TIMESTAMPTZ NOT NULL,
message TEXT,
sms_phone TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- One record per alert event (no SMS stored)
CREATE UNIQUE INDEX IF NOT EXISTS alerts_uniq_event
ON alerts (alert_type, station_id, issued_at, level)
WHERE sms_phone IS NULL;
-- One record per phone recipient
CREATE UNIQUE INDEX IF NOT EXISTS alerts_uniq_phone
ON alerts (alert_type, station_id, issued_at, level, sms_phone)
WHERE sms_phone IS NOT NULL;
CREATE INDEX IF NOT EXISTS alerts_station_idx ON alerts (station_id);