From 376a36df2244025476e385f38c2c9e68e8caf352 Mon Sep 17 00:00:00 2001 From: yarnom Date: Mon, 1 Dec 2025 09:50:58 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E6=94=AF=E6=8C=81=20V5=20?= =?UTF-8?q?=E5=92=8C=20V6=20=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/cmd/radar_hour_export/main.go | 399 +++++++++++++++++++++++++++++ core/cmd/sms-send/main.go | 345 +++++++++++++++++++++++-- core/cmd/v5-export/main.go | 291 +++++++++++++++++++++ core/cmd/v5-model/main.go | 271 ++++++++++++++++++++ core/cmd/v6-export/main.go | 288 +++++++++++++++++++++ core/cmd/v6-model/main.go | 261 +++++++++++++++++++ core/internal/data/forecast.go | 46 ++++ core/internal/data/rain.go | 21 ++ core/internal/data/sms.go | 31 +++ core/internal/data/station.go | 23 ++ core/internal/sms/sms.go | 7 +- 11 files changed, 1963 insertions(+), 20 deletions(-) create mode 100644 core/cmd/radar_hour_export/main.go create mode 100644 core/cmd/v5-export/main.go create mode 100644 core/cmd/v5-model/main.go create mode 100644 core/cmd/v6-export/main.go create mode 100644 core/cmd/v6-model/main.go create mode 100644 core/internal/data/rain.go create mode 100644 core/internal/data/sms.go create mode 100644 core/internal/data/station.go diff --git a/core/cmd/radar_hour_export/main.go b/core/cmd/radar_hour_export/main.go new file mode 100644 index 0000000..b407b1f --- /dev/null +++ b/core/cmd/radar_hour_export/main.go @@ -0,0 +1,399 @@ +package main + +import ( + "database/sql" + "encoding/binary" + "encoding/csv" + "flag" + "fmt" + "log" + "math" + "os" + "strings" + "time" + + "weatherstation/core/internal/data" +) + +type station struct { + ID string + Lat float64 + Lon float64 +} + +func main() { + var stationArg string + var timeArg string + var startArg string + var endArg string + var outPath string + var zoom int + flag.StringVar(&stationArg, "station", "", "station id (e.g., RS485-XXXXXX or hex XXXXXX)") + flag.StringVar(&timeArg, "time", "", "[deprecated] right-endpoint time 'YYYY-MM-DD HH:MM:SS' (e.g., 2024-08-01 17:00:00)") + flag.StringVar(&startArg, "start", "", "range start time 'YYYY-MM-DD HH:MM:SS'") + flag.StringVar(&endArg, "end", "", "range end time 'YYYY-MM-DD HH:MM:SS'") + flag.StringVar(&outPath, "out", "", "output CSV file (default stdout)") + flag.IntVar(&zoom, "z", 7, "radar tile zoom level (default 7)") + flag.Parse() + + if strings.TrimSpace(stationArg) == "" || (strings.TrimSpace(timeArg) == "" && (strings.TrimSpace(startArg) == "" || strings.TrimSpace(endArg) == "")) { + log.Fatalf("usage: radar_hour_export -station RS485-XXXXXX (-time 'YYYY-MM-DD HH:MM:SS' | -start 'YYYY-MM-DD HH:MM:SS' -end 'YYYY-MM-DD HH:MM:SS') [-out file.csv] [-z 7]") + } + + st, err := resolveStation(stationArg) + if err != nil { + log.Fatal(err) + } + if st == nil { + log.Fatalf("station not found: %s", stationArg) + } + + // parse time(s) in Asia/Shanghai + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + var tStart, tEnd time.Time + if strings.TrimSpace(timeArg) != "" { + // legacy single-hour mode: time is right-endpoint, range = (t-1h, t] + tEnd, err = time.ParseInLocation("2006-01-02 15:04:05", timeArg, loc) + if err != nil { + log.Fatalf("invalid time: %v", err) + } + tStart = tEnd.Add(-1 * time.Hour) + } else { + var err1, err2 error + tStart, err1 = time.ParseInLocation("2006-01-02 15:04:05", startArg, loc) + tEnd, err2 = time.ParseInLocation("2006-01-02 15:04:05", endArg, loc) + if err1 != nil || err2 != nil { + log.Fatalf("invalid start/end time") + } + if !tEnd.After(tStart) { + log.Fatalf("end must be after start") + } + } + + y, x, _, err := pickTileAt(st.Lat, st.Lon, zoom) + if err != nil { + log.Fatal(err) + } + if y < 0 || x < 0 { + log.Fatalf("no radar tile covering station at z=%d", zoom) + } + + // collect tile times within (tStart, tEnd] + times, err := tileTimesInRange(zoom, y, x, tStart, tEnd) + if err != nil { + log.Fatal(err) + } + if len(times) == 0 { + log.Printf("no radar tiles in hour window for y=%d x=%d", y, x) + } + + // rainfall will be computed per-hour-end for each tile time + + // CSV writer + var w *csv.Writer + var f *os.File + if outPath == "" { + w = csv.NewWriter(os.Stdout) + } else { + f, err = os.Create(outPath) + if err != nil { + log.Fatal(err) + } + defer f.Close() + w = csv.NewWriter(f) + } + defer w.Flush() + // header + _ = w.Write([]string{"time", "rain_mm", "wind_dir", "wind_speed", "ge30", "ge35", "ge40"}) + + // cache rainfall per hour-end to avoid repeated queries + rainCache := make(map[time.Time]float64) + + for _, dt := range times { + // wind from radar_weather nearest to station at this dt + wd, ws := nearestWind(st.Lat, st.Lon, dt) + // load tile values at dt for the chosen y/x tile + vals, meta, err := loadRadarTile(zoom, y, x, dt) + if err != nil { + log.Printf("tile load failed at %v: %v", dt, err) + continue + } + // right-endpoint hour for rainfall: (hourEnd-1h, hourEnd] + hourEnd := rightEndpointHour(dt.In(loc)) + rainMM := rainCache[hourEnd] + if _, ok := rainCache[hourEnd]; !ok { + rmm, _ := hourRain(st.ID, hourEnd.Add(-1*time.Hour), hourEnd) + rainCache[hourEnd] = rmm + rainMM = rmm + } + // build sector polygon using wind (fallback to circle-only if no wind) + ge30, ge35, ge40 := 0, 0, 0 + if wd != nil && ws != nil && *ws > 0.01 { + poly := sectorPolygon(st.Lat, st.Lon, *wd, *ws, 3*time.Hour) + ge30, ge35, ge40 = countInPolygon(vals, meta, poly) + } else { + // fallback to 8km circle approximated as polygon + poly := circlePolygon(st.Lat, st.Lon, 8000) + ge30, ge35, ge40 = countInPolygon(vals, meta, poly) + } + wdStr, wsStr := "", "" + if wd != nil { + wdStr = fmt.Sprintf("%.0f", *wd) + } + if ws != nil { + wsStr = fmt.Sprintf("%.1f", *ws) + } + rec := []string{dt.In(loc).Format("2006-01-02 15:04:05"), fmt.Sprintf("%.3f", rainMM), wdStr, wsStr, fmt.Sprintf("%d", ge30), fmt.Sprintf("%d", ge35), fmt.Sprintf("%d", ge40)} + if err := w.Write(rec); err != nil { + log.Printf("csv write failed: %v", err) + } + } +} + +func resolveStation(arg string) (*station, error) { + id := strings.TrimSpace(arg) + if !strings.HasPrefix(strings.ToUpper(id), "RS485-") { + // treat as hex suffix + hex := strings.ToUpper(strings.TrimSpace(id)) + hex = filterHex(hex) + if len(hex) > 6 { + hex = hex[len(hex)-6:] + } + id = "RS485-" + hex + } + rows, err := data.DB().Query(`SELECT station_id, latitude, longitude FROM stations WHERE station_id = $1`, id) + if err != nil { + return nil, err + } + defer rows.Close() + if rows.Next() { + var s station + if err := rows.Scan(&s.ID, &s.Lat, &s.Lon); err != nil { + return nil, err + } + return &s, nil + } + return nil, nil +} + +func filterHex(s string) string { + var b strings.Builder + for i := 0; i < len(s); i++ { + c := s[i] + if (c >= '0' && c <= '9') || (c >= 'A' && c <= 'F') { + b.WriteByte(c) + } + } + return b.String() +} + +type tileMeta struct { + West, South, East, North float64 + W, H int +} + +func pickTileAt(lat, lon float64, z int) (int, int, tileMeta, error) { + const q = `SELECT y,x,west,south,east,north FROM radar_tiles WHERE z=$1 AND $2 BETWEEN south AND north AND $3 BETWEEN west AND east ORDER BY dt DESC LIMIT 1` + var y, x int + var m tileMeta + err := data.DB().QueryRow(q, z, lat, lon).Scan(&y, &x, &m.West, &m.South, &m.East, &m.North) + if err == sql.ErrNoRows { + return -1, -1, m, nil + } + return y, x, m, err +} + +func tileTimesInRange(z, y, x int, start, end time.Time) ([]time.Time, error) { + const q = `SELECT dt FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt > $4 AND dt <= $5 ORDER BY dt` + rows, err := data.DB().Query(q, z, y, x, start, end) + if err != nil { + return nil, err + } + defer rows.Close() + var ts []time.Time + for rows.Next() { + var t time.Time + if err := rows.Scan(&t); err == nil { + ts = append(ts, t) + } + } + return ts, nil +} + +func hourRain(stationID string, start, end time.Time) (float64, error) { + const q = `SELECT COALESCE(SUM(rain_10m_mm_x1000)/1000.0,0) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` + var mm float64 + err := data.DB().QueryRow(q, stationID, start, end).Scan(&mm) + return mm, err +} + +type radarTile struct { + DT time.Time + Z, Y, X int + W, H int + West, South, East, North float64 + Res float64 + Data []byte +} + +func loadRadarTile(z, y, x int, dt time.Time) ([][]*float64, tileMeta, error) { + const q = `SELECT dt,z,y,x,width,height,west,south,east,north,res_deg,data FROM radar_tiles WHERE z=$1 AND y=$2 AND x=$3 AND dt=$4 LIMIT 1` + var r radarTile + row := data.DB().QueryRow(q, z, y, x, dt) + if err := row.Scan(&r.DT, &r.Z, &r.Y, &r.X, &r.W, &r.H, &r.West, &r.South, &r.East, &r.North, &r.Res, &r.Data); err != nil { + return nil, tileMeta{}, err + } + w, h := r.W, r.H + vals := make([][]*float64, h) + off := 0 + for row := 0; row < h; row++ { + rowVals := make([]*float64, w) + for col := 0; col < w; col++ { + v := int16(binary.BigEndian.Uint16(r.Data[off : off+2])) + off += 2 + if v >= 32766 { + rowVals[col] = nil + continue + } + dbz := float64(v) / 10.0 + if dbz < 0 { + dbz = 0 + } else if dbz > 75 { + dbz = 75 + } + vv := dbz + rowVals[col] = &vv + } + vals[row] = rowVals + } + return vals, tileMeta{West: r.West, South: r.South, East: r.East, North: r.North, W: r.W, H: r.H}, nil +} + +func nearestWind(lat, lon float64, dt time.Time) (*float64, *float64) { + // reuse server's nearest logic via data layer + rw, err := data.RadarWeatherNearest(lat, lon, dt, 6*time.Hour) + if err != nil || rw == nil { + return nil, nil + } + var dir *float64 + var spd *float64 + if rw.WindDirection.Valid { + d := rw.WindDirection.Float64 // normalize + d = math.Mod(d, 360) + if d < 0 { + d += 360 + } + dir = &d + } + if rw.WindSpeed.Valid { + v := rw.WindSpeed.Float64 + spd = &v + } + return dir, spd +} + +// great-circle naive meter->degree approximation +func sectorPolygon(lat, lon, windFromDeg, windSpeedMS float64, dur time.Duration) [][2]float64 { + // convert to downwind (to-direction) + bearingTo := math.Mod(windFromDeg+180, 360) + radius := windSpeedMS * dur.Seconds() // meters + // meters per degree + latRad := lat * math.Pi / 180 + mPerDegLat := 111320.0 + mPerDegLon := 111320.0 * math.Cos(latRad) + half := 25.0 // degrees + var poly [][2]float64 + poly = append(poly, [2]float64{lon, lat}) + for a := -half; a <= half+1e-6; a += 2.5 { + ang := (bearingTo + a) * math.Pi / 180 + dx := radius * math.Sin(ang) + dy := radius * math.Cos(ang) + dlon := dx / mPerDegLon + dlat := dy / mPerDegLat + poly = append(poly, [2]float64{lon + dlon, lat + dlat}) + } + poly = append(poly, [2]float64{lon, lat}) + return poly +} + +func circlePolygon(lat, lon float64, radiusM float64) [][2]float64 { + latRad := lat * math.Pi / 180 + mPerDegLat := 111320.0 + mPerDegLon := 111320.0 * math.Cos(latRad) + var poly [][2]float64 + for a := 0.0; a <= 360.0; a += 6.0 { + ang := a * math.Pi / 180 + dx := radiusM * math.Cos(ang) + dy := radiusM * math.Sin(ang) + poly = append(poly, [2]float64{lon + dx/mPerDegLon, lat + dy/mPerDegLat}) + } + poly = append(poly, [2]float64{poly[0][0], poly[0][1]}) + return poly +} + +func countInPolygon(vals [][]*float64, meta tileMeta, poly [][2]float64) (int, int, int) { + if vals == nil || len(vals) == 0 { + return 0, 0, 0 + } + w, h := meta.W, meta.H + dlon := (meta.East - meta.West) / float64(w) + dlat := (meta.North - meta.South) / float64(h) + inPoly := func(x, y float64) bool { + inside := false + n := len(poly) + for i, j := 0, n-1; i < n; j, i = i, i+1 { + xi, yi := poly[i][0], poly[i][1] + xj, yj := poly[j][0], poly[j][1] + inter := ((yi > y) != (yj > y)) && (x < (xj-xi)*(y-yi)/((yj-yi)+1e-12)+xi) + if inter { + inside = !inside + } + } + return inside + } + c30, c35, c40 := 0, 0, 0 + for row := 0; row < h; row++ { + lat := meta.South + (float64(row)+0.5)*dlat + vr := vals[row] + if vr == nil { + continue + } + for col := 0; col < w; col++ { + v := vr[col] + if v == nil { + continue + } + vv := *v + if vv < 30.0 { + continue + } + lon := meta.West + (float64(col)+0.5)*dlon + if !inPoly(lon, lat) { + continue + } + if vv >= 30 { + c30++ + } + if vv >= 35 { + c35++ + } + if vv >= 40 { + c40++ + } + } + } + return c30, c35, c40 +} + +// rightEndpointHour returns the right endpoint hour for a dt, meaning: +// if dt is exactly at :00, return dt truncated to hour; otherwise, return next hour. +func rightEndpointHour(dt time.Time) time.Time { + t := dt.Truncate(time.Hour) + if dt.Equal(t) { + return t + } + return t.Add(time.Hour) +} diff --git a/core/cmd/sms-send/main.go b/core/cmd/sms-send/main.go index 6072d5d..5979708 100644 --- a/core/cmd/sms-send/main.go +++ b/core/cmd/sms-send/main.go @@ -3,10 +3,13 @@ package main import ( "context" "flag" + "fmt" "log" "strings" + "time" "weatherstation/core/internal/config" + "weatherstation/core/internal/data" "weatherstation/core/internal/sms" ) @@ -14,11 +17,27 @@ func main() { // Usage: // CORE_SMS_AK, CORE_SMS_SK, CORE_SMS_SIGN, CORE_SMS_TPL, optional CORE_SMS_ENDPOINT // go run ./core/cmd/sms-send --to 17308264374 --msg "Hello Yarnom" --name device-ids --time 2025-01-01 12:00 + // go run ./core/cmd/sms-send -> hourly check mode (first 10 minutes of each hour) var to, msg, name, tm string + var once bool + var testMode bool + var testLevel int + // test2: manual station+rain, auto decide level and send + var test2 bool + var station string + var rain float64 flag.StringVar(&to, "to", "", "comma-separated phone numbers") - flag.StringVar(&msg, "msg", "", "message content") + flag.StringVar(&msg, "msg", "", "message content (for ${content}, recommend numeric value)") flag.StringVar(&name, "name", "", "device IDs/name field for template") - flag.StringVar(&tm, "time", "", "time field for template") + flag.StringVar(&tm, "time", "", "time field for template (unused if empty)") + var alert string + flag.StringVar(&alert, "alert", "", "alert text for ${alert}") + flag.BoolVar(&once, "once", false, "run one check immediately (auto mode)") + flag.BoolVar(&testMode, "test", false, "run in test mode (ignore thresholds)") + flag.IntVar(&testLevel, "level", 1, "test target alert level (1=大雨-only, 2=中雨+大雨)") + flag.BoolVar(&test2, "test2", false, "manual test by station+rain; decide yellow/red and send to recipients by alert level") + flag.StringVar(&station, "station", "", "station name for template ${name}") + flag.Float64Var(&rain, "rain", 0, "rainfall in mm (single hour)") flag.Parse() cfg := config.Load() @@ -33,22 +52,314 @@ func main() { log.Fatal(err) } - if to == "" { - log.Fatal("missing --to") - } - if msg == "" { - log.Fatal("missing --msg") - } - if tm == "" { - tm = "" - } - if name == "" { - name = "" + // Manual send mode when --to and --msg are provided + if to != "" && msg != "" { + if tm == "" { + tm = "" + } + if name == "" { + name = "" + } + // Manual mode: allow --alert (recommended for new template) + phones := strings.Split(to, ",") + if err := scli.Send(context.Background(), name, msg, alert, tm, phones); err != nil { + log.Fatal(err) + } + log.Println("sms: sent OK") + return } - phones := strings.Split(to, ",") - if err := scli.Send(context.Background(), name, msg, tm, phones); err != nil { - log.Fatal(err) + // Test mode: ignore thresholds, send to recipients of given level, append (测试) + if testMode { + runTestCheck(scli, testLevel) + return } - log.Println("sms: sent OK") + + // Test2 mode: user-provided station name and rain (mm); do not read forecast DB + if test2 { + runTest2(scli, station, rain) + return + } + + // Auto mode: 每小时的第一个10分钟启动 + checkFn := func(tick time.Time) { runHourlyCheck(scli, tick) } + if once { + checkFn(time.Now()) + return + } + alignAndRunHour10(checkFn) +} + +// alignAndRunHour10 runs fn at the first 10 minutes of each hour. +func alignAndRunHour10(fn func(tick time.Time)) { + now := time.Now() + base := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 10, 0, 0, now.Location()) + var next time.Time + if now.After(base) { + next = base.Add(1 * time.Hour) + } else { + next = base + } + time.Sleep(time.Until(next)) + for { + tick := time.Now().Truncate(time.Minute) + fn(tick) + time.Sleep(1 * time.Hour) + } +} + +func runHourlyCheck(scli *sms.Client, tick time.Time) { + // 固定 provider 和站点 + provider := "imdroid_mix" + stationIDs := []string{"RS485-0029CB", "RS485-002A39", "RS485-002964"} + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // 读取收件人(enabled) + recipients, err := data.ListEnabledSMSRecipients(ctx) + if err != nil { + log.Printf("sms: load recipients failed: %v", err) + return + } + if len(recipients) == 0 { + log.Printf("sms: no enabled recipients, skip") + return + } + // alert_level: 1=大雨 only, 2=中雨+大雨 + var heavyPhones, moderatePhones []string + for _, r := range recipients { + if r.AlertLevel >= 1 { + heavyPhones = append(heavyPhones, r.Phone) + } + if r.AlertLevel >= 2 { + moderatePhones = append(moderatePhones, r.Phone) + } + } + if len(heavyPhones) == 0 && len(moderatePhones) == 0 { + log.Printf("sms: no recipients by level, skip") + return + } + + // 以 CST 解析 issued_at 整点 + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + now := tick.In(loc) + issued := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, loc) + + // 三小时窗口:hour+1, hour+2, hour+3 + next1 := issued.Add(1 * time.Hour) + next3 := issued.Add(3 * time.Hour) + + // 遍历站点,计算未来三小时单小时阈值(红>黄) + for _, sid := range stationIDs { + points, err := data.ForecastRainAtIssued(ctx, sid, provider, issued) + if err != nil { + log.Printf("sms: forecast query failed station=%s: %v", sid, err) + continue + } + stName, err := data.GetStationName(ctx, sid) + if err != nil { + stName = "" + } + if strings.TrimSpace(stName) == "" { + stName = sid + } + var redMaxX1000 int64 + var yellowMaxX1000 int64 + for _, p := range points { + if !p.ForecastTime.Before(next1) && !p.ForecastTime.After(next3) { + v := int64(p.RainMMx1000) + if v >= 8000 { + if v > redMaxX1000 { + redMaxX1000 = v + } + } else if v >= 4000 { + if v > yellowMaxX1000 { + yellowMaxX1000 = v + } + } + } + } + // 判定阈值(单小时):任一>=8红色;否则任一[4,8)黄色;否则不发 + if redMaxX1000 > 0 { + if len(heavyPhones) > 0 { + // 模板参数格式:time: "YYYY-MM-DD HH:MM," name: ":<站点名称>," content: " mm(大雨)" + name := ":" + stName + "," + // 新模板字段:content=数值, alert=固定文案, time可为空(此处仍传带逗号的时间字符串以兼容) + content := format3(float64(redMaxX1000)/1000.0) + " mm" + alert := "【大礼村】暴雨红色预警" + tm := "" // ${time} 不用了 + if err := scli.Send(ctx, name, content, alert, tm, heavyPhones); err != nil { + log.Printf("sms: send heavy failed station=%s: %v", sid, err) + } else { + log.Printf("sms: sent HEAVY (红色) station=%s max=%.3fmm to=%d", sid, float64(redMaxX1000)/1000.0, len(heavyPhones)) + } + } + } else if yellowMaxX1000 > 0 { + if len(moderatePhones) > 0 { + name := ":" + stName + "," + content := format3(float64(yellowMaxX1000)/1000.0) + " mm" + alert := "【大礼村】暴雨黄色预警" + tm := "" + if err := scli.Send(ctx, name, content, alert, tm, moderatePhones); err != nil { + log.Printf("sms: send moderate failed station=%s: %v", sid, err) + } else { + log.Printf("sms: sent MODERATE (黄色) station=%s max=%.3fmm to=%d", sid, float64(yellowMaxX1000)/1000.0, len(moderatePhones)) + } + } + } else { + log.Printf("sms: no alert station=%s", sid) + } + } +} + +// content now only carries numeric rain value; helpers removed. + +func format3(v float64) string { + s := fmt.Sprintf("%.3f", v) + s = strings.TrimRight(s, "0") + s = strings.TrimRight(s, ".") + if s == "" { + return "0" + } + return s +} + +// runTestCheck sends messages regardless of thresholds to recipients at given alert level. +func runTestCheck(scli *sms.Client, level int) { + provider := "imdroid_mix" + stationIDs := []string{"RS485-0029CB", "RS485-002A39", "RS485-002964"} + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // Load recipients (enabled) and filter by exact level + recipients, err := data.ListEnabledSMSRecipients(ctx) + if err != nil { + log.Printf("sms test: load recipients failed: %v", err) + return + } + var phones []string + for _, r := range recipients { + if r.AlertLevel == level { + phones = append(phones, r.Phone) + } + } + if len(phones) == 0 { + log.Printf("sms test: no recipients at level=%d", level) + return + } + + // 时间与窗口 + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + now := time.Now().In(loc) + issued := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, loc) + next1 := issued.Add(1 * time.Hour) + next3 := issued.Add(3 * time.Hour) + + // Iterate stations + for _, sid := range stationIDs { + points, err := data.ForecastRainAtIssued(ctx, sid, provider, issued) + if err != nil { + log.Printf("sms test: forecast query failed station=%s: %v", sid, err) + continue + } + stName, err := data.GetStationName(ctx, sid) + if err != nil { + stName = "" + } + if strings.TrimSpace(stName) == "" { + stName = sid + } + var sumX1000 int64 + for _, p := range points { + if !p.ForecastTime.Before(next1) && !p.ForecastTime.After(next3) { + sumX1000 += int64(p.RainMMx1000) + } + } + name := ":" + stName + "," + // Test mode: content=数值; alert=红色预警 + (测试) + content := format3(float64(sumX1000)/1000.0) + " mm" + alert := "【大礼村】暴雨红色预警(测试)" + tm := "" + if err := scli.Send(ctx, name, content, alert, tm, phones); err != nil { + log.Printf("sms test: send failed station=%s: %v", sid, err) + } else { + log.Printf("sms test: sent station=%s sum=%.3fmm level=%d to=%d", sid, float64(sumX1000)/1000.0, level, len(phones)) + } + } +} + +// runTest2 evaluates the provided rainfall and sends to recipients by alert level: +// - red (>=8mm): send to level>=1 (both 1 and 2) +// - yellow ([4,8)mm): send to level>=2 only +// No DB read for forecast; only loads recipients list. +func runTest2(scli *sms.Client, station string, rain float64) { + if strings.TrimSpace(station) == "" { + log.Printf("sms test2: station name required; use --station") + return + } + if rain < 0 { + log.Printf("sms test2: rain must be >= 0") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + recipients, err := data.ListEnabledSMSRecipients(ctx) + if err != nil { + log.Printf("sms test2: load recipients failed: %v", err) + return + } + var heavyPhones, moderatePhones []string + for _, r := range recipients { + if r.AlertLevel >= 1 { + heavyPhones = append(heavyPhones, r.Phone) + } + if r.AlertLevel >= 2 { + moderatePhones = append(moderatePhones, r.Phone) + } + } + if len(heavyPhones) == 0 && len(moderatePhones) == 0 { + log.Printf("sms test2: no recipients, skip") + return + } + + // Decide level by rain (mm) + name := ":" + strings.TrimSpace(station) + "," + content := format3(rain) + " mm" + if rain >= 8.0 { + if len(heavyPhones) == 0 { + log.Printf("sms test2: red alert but no level>=1 recipients") + return + } + alert := "【大礼村】暴雨红色预警" + if err := scli.Send(ctx, name, content, alert, "", heavyPhones); err != nil { + log.Printf("sms test2: send RED failed: %v", err) + } else { + log.Printf("sms test2: sent RED station=%s rain=%.3fmm to=%d", station, rain, len(heavyPhones)) + } + return + } + if rain >= 4.0 { + if len(moderatePhones) == 0 { + log.Printf("sms test2: yellow alert but no level>=2 recipients") + return + } + alert := "【大礼村】暴雨黄色预警" + if err := scli.Send(ctx, name, content, alert, "", moderatePhones); err != nil { + log.Printf("sms test2: send YELLOW failed: %v", err) + } else { + log.Printf("sms test2: sent YELLOW station=%s rain=%.3fmm to=%d", station, rain, len(moderatePhones)) + } + return + } + log.Printf("sms test2: rain %.3fmm below yellow threshold, no alert", rain) } diff --git a/core/cmd/v5-export/main.go b/core/cmd/v5-export/main.go new file mode 100644 index 0000000..63e94e5 --- /dev/null +++ b/core/cmd/v5-export/main.go @@ -0,0 +1,291 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + "time" + + "weatherstation/core/internal/data" +) + +type v5Item struct { + FT time.Time + Rain float64 +} + +type v5Result struct { + Station string + Issued time.Time + Base [3]float64 + Actual float64 + Prev [3]float64 + Out [3]float64 + SQLRows []string + Skipped bool + SkipReason string +} + +func main() { + var ( + stationsCSV string + startStr string + endStr string + sqlOut string + logOut string + tzName string + baseProvider string + outProvider string + ) + + flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表,例如: RS485-000001,RS485-000002") + flag.StringVar(&startStr, "start", "", "开始时间,格式: 2006-01-02 15:04 或 2006-01-02(按整点对齐)") + flag.StringVar(&endStr, "end", "", "结束时间,格式: 2006-01-02 15:04 或 2006-01-02(不包含该时刻)") + flag.StringVar(&sqlOut, "sql", "v5_output.sql", "输出 SQL 文件路径") + flag.StringVar(&logOut, "log", "v5_output.log", "输出日志文件路径") + flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai") + flag.StringVar(&baseProvider, "base", "imdroid_mix", "基础预报源 provider") + flag.StringVar(&outProvider, "out", "imdroid_V5", "输出预报源 provider") + flag.Parse() + + if stationsCSV == "" || startStr == "" || endStr == "" { + fmt.Println("用法示例: v5-export --stations RS485-002A6E --start '2024-08-01 00:00' --end '2024-08-02 00:00' --sql out.sql --log out.log") + os.Exit(2) + } + + // logger: stdout + file + if err := os.MkdirAll(filepath.Dir(sqlOut), 0755); err != nil && filepath.Dir(sqlOut) != "." { + log.Fatalf("create sql dir: %v", err) + } + if err := os.MkdirAll(filepath.Dir(logOut), 0755); err != nil && filepath.Dir(logOut) != "." { + log.Fatalf("create log dir: %v", err) + } + lf, err := os.Create(logOut) + if err != nil { + log.Fatalf("open log file: %v", err) + } + defer lf.Close() + mw := io.MultiWriter(os.Stdout, lf) + logger := log.New(mw, "", log.LstdFlags) + + sf, err := os.Create(sqlOut) + if err != nil { + logger.Fatalf("open sql file: %v", err) + } + defer sf.Close() + + loc, _ := time.LoadLocation(tzName) + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + parseTime := func(s string) (time.Time, error) { + layouts := []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} + var lastErr error + for _, ly := range layouts { + t, err := time.ParseInLocation(ly, s, loc) + if err == nil { + return t, nil + } + lastErr = err + } + return time.Time{}, lastErr + } + start, err := parseTime(startStr) + if err != nil { + logger.Fatalf("parse start: %v", err) + } + end, err := parseTime(endStr) + if err != nil { + logger.Fatalf("parse end: %v", err) + } + start = start.Truncate(time.Hour) + end = end.Truncate(time.Hour) + if !end.After(start) { + logger.Fatalf("end 必须大于 start") + } + + stations := splitStations(stationsCSV) + ctx := context.Background() + + // 写文件头 + fmt.Fprintf(sf, "-- V5 Export generated at %s\n", time.Now().Format(time.RFC3339)) + fmt.Fprintf(sf, "BEGIN;\n") + + for _, st := range stations { + logger.Printf("处理站点 %s: %s → %s", st, start.Format("2006-01-02 15:04"), end.Format("2006-01-02 15:04")) + for t := start; t.Before(end); t = t.Add(1 * time.Hour) { + res := computeV5(ctx, st, t, baseProvider, outProvider, loc) + if res.Skipped { + logger.Printf("skip station=%s issued=%s: %s", st, t.Format("2006-01-02 15:04"), res.SkipReason) + continue + } + // 日志 + logger.Printf("V5 station=%s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f]", + st, t.Format("2006-01-02 15:04"), + res.Base[0], res.Base[1], res.Base[2], res.Actual, + res.Prev[0], res.Prev[1], res.Prev[2], + res.Out[0], res.Out[1], res.Out[2]) + // SQL + for _, row := range res.SQLRows { + fmt.Fprintln(sf, row) + } + } + } + fmt.Fprintf(sf, "COMMIT;\n") + logger.Printf("完成,SQL 已写入: %s ,日志: %s", sqlOut, logOut) +} + +func splitStations(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} + +func computeV5(ctx context.Context, stationID string, issued time.Time, baseProvider, outProvider string, loc *time.Location) v5Result { + res := v5Result{Station: stationID, Issued: issued} + + // base issued in this bucket + baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued) + if err != nil || !ok { + res.Skipped, res.SkipReason = true, fmt.Sprintf("base issued missing: %v ok=%v", err, ok) + return res + } + basePoints, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, baseIssued) + if err != nil || len(basePoints) < 3 { + res.Skipped, res.SkipReason = true, fmt.Sprintf("base points insufficient: %v len=%d", err, len(basePoints)) + return res + } + + // targets times + ft1 := issued.Add(1 * time.Hour) + ft2 := issued.Add(2 * time.Hour) + ft3 := issued.Add(3 * time.Hour) + base1 := pickRain(basePoints, ft1) + base2 := pickRain(basePoints, ft2) + base3 := pickRain(basePoints, ft3) + res.Base = [3]float64{base1, base2, base3} + + // actual just-finished hour + actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued) + if err != nil || !okA { + res.Skipped, res.SkipReason = true, fmt.Sprintf("actual missing: %v ok=%v", err, okA) + return res + } + res.Actual = actual + + // previous preds aligned to same validation time (ft1) + p1, e1 := pickPrevPredict(ctx, stationID, baseProvider, issued.Add(-1*time.Hour), 1, ft1) + if e1 != nil { + res.Skipped, res.SkipReason = true, e1.Error() + return res + } + p2, e2 := pickPrevPredict(ctx, stationID, baseProvider, issued.Add(-2*time.Hour), 2, ft1) + if e2 != nil { + res.Skipped, res.SkipReason = true, e2.Error() + return res + } + p3, e3 := pickPrevPredict(ctx, stationID, baseProvider, issued.Add(-3*time.Hour), 3, ft1) + if e3 != nil { + res.Skipped, res.SkipReason = true, e3.Error() + return res + } + res.Prev = [3]float64{p1, p2, p3} + + r1 := actual - p1 + r2 := actual - p2 + r3 := actual - p3 + // Baseline-fallback if negative for all leads + cand1 := base1 + 1.0*r1 + cand2 := base2 + 0.5*r2 + cand3 := base3 + (1.0/3.0)*r3 + var out1, out2, out3 float64 + if cand1 < 0 { + out1 = base1 + } else { + out1 = cand1 + } + if cand2 < 0 { + out2 = base2 + } else { + out2 = cand2 + } + if cand3 < 0 { + out3 = base3 + } else { + out3 = cand3 + } + res.Out = [3]float64{out1, out2, out3} + + rows := make([]string, 0, 3) + rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft1, toX1000(out1))) + rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft2, toX1000(out2))) + rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft3, toX1000(out3))) + res.SQLRows = rows + return res +} + +func pickPrevPredict(ctx context.Context, stationID, provider string, prevBucket time.Time, lead int, validFT time.Time) (float64, error) { + iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, provider, prevBucket) + if err != nil || !ok { + return 0, fmt.Errorf("prev issued missing bucket=%s: %v ok=%v", prevBucket.Format("2006-01-02 15:04"), err, ok) + } + pts, err := data.ForecastRainAtIssued(ctx, stationID, provider, iss) + if err != nil || len(pts) < lead { + return 0, fmt.Errorf("prev points insufficient lead=%d: %v len=%d", lead, err, len(pts)) + } + if v := pickRain(pts, validFT); v >= 0 { + return v, nil + } + switch lead { + case 1: + return toMM(pts[0].RainMMx1000), nil + case 2: + if len(pts) >= 2 { + return toMM(pts[1].RainMMx1000), nil + } + case 3: + if len(pts) >= 3 { + return toMM(pts[2].RainMMx1000), nil + } + } + return 0, fmt.Errorf("prev choose failed lead=%d", lead) +} + +func pickRain(points []data.PredictPoint, ft time.Time) float64 { + for _, p := range points { + if p.ForecastTime.Equal(ft) { + return toMM(p.RainMMx1000) + } + } + return -1 +} + +func toMM(vx1000 int32) float64 { return float64(vx1000) / 1000.0 } +func toX1000(mm float64) int32 { return int32(mm*1000 + 0.5) } +func clamp0(v float64) float64 { + if v < 0 { + return 0 + } + return v +} + +func insertRainSQL(stationID, provider string, issued, ft time.Time, rainX1000 int32) string { + // 使用 RFC3339 和 Postgres timestamptz 解析兼容的格式 + return fmt.Sprintf( + "INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000) VALUES ('%s','%s','%s','%s',%d) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000;", + escapeSQL(stationID), provider, issued.Format(time.RFC3339), ft.Format(time.RFC3339), rainX1000, + ) +} + +func escapeSQL(s string) string { return strings.ReplaceAll(s, "'", "''") } diff --git a/core/cmd/v5-model/main.go b/core/cmd/v5-model/main.go new file mode 100644 index 0000000..c75196d --- /dev/null +++ b/core/cmd/v5-model/main.go @@ -0,0 +1,271 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "strings" + "time" + + "weatherstation/core/internal/data" +) + +const ( + baseProvider = "imdroid_mix" + outProvider = "imdroid_V5" +) + +func main() { + var stationsCSV, issuedStr, startStr, endStr, tzName string + flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表;为空则自动扫描有基线的站点") + flag.StringVar(&issuedStr, "issued", "", "指定 issued 时间(整点),格式: 2006-01-02 15:00;为空用当前整点") + flag.StringVar(&startStr, "start", "", "开始时间(整点),格式: 2006-01-02 15:00;与 --end 一起使用,end 为开区间") + flag.StringVar(&endStr, "end", "", "结束时间(整点),格式: 2006-01-02 15:00;与 --start 一起使用,end 为开区间") + flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai") + flag.Parse() + + ctx := context.Background() + loc, _ := time.LoadLocation(tzName) + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + parse := func(s string) (time.Time, error) { + var t time.Time + var err error + for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} { + t, err = time.ParseInLocation(ly, s, loc) + if err == nil { + return t.Truncate(time.Hour), nil + } + } + return time.Time{}, err + } + + // Determine mode: single issued or range + if strings.TrimSpace(startStr) != "" && strings.TrimSpace(endStr) != "" { + start, err := parse(startStr) + if err != nil { + log.Fatalf("无法解析 start: %v", err) + } + end, err := parse(endStr) + if err != nil { + log.Fatalf("无法解析 end: %v", err) + } + if !end.After(start) { + log.Fatalf("end 必须大于 start") + } + for t := start; t.Before(end); t = t.Add(time.Hour) { + var stations []string + if strings.TrimSpace(stationsCSV) != "" { + stations = splitStations(stationsCSV) + } else { + var err error + stations, err = listStationsWithBase(ctx, baseProvider, t) + if err != nil { + log.Fatalf("list stations failed: %v", err) + } + } + if len(stations) == 0 { + log.Printf("no stations to process for issued=%s", t.Format("2006-01-02 15:04:05")) + continue + } + for _, st := range stations { + if err := runForStation(ctx, st, t); err != nil { + log.Printf("V5 station=%s issued=%s error: %v", st, t.Format("2006-01-02 15:04:05"), err) + } + } + } + return + } + + // Single issued + var issued time.Time + if strings.TrimSpace(issuedStr) != "" { + var err error + issued, err = parse(issuedStr) + if err != nil || issued.IsZero() { + log.Fatalf("无法解析 issued: %v", err) + } + } else { + issued = time.Now().In(loc).Truncate(time.Hour) + } + var stations []string + if strings.TrimSpace(stationsCSV) != "" { + stations = splitStations(stationsCSV) + } else { + var err error + stations, err = listStationsWithBase(ctx, baseProvider, issued) + if err != nil { + log.Fatalf("list stations failed: %v", err) + } + } + if len(stations) == 0 { + log.Printf("no stations to process for issued=%s", issued.Format("2006-01-02 15:04:05")) + return + } + for _, st := range stations { + if err := runForStation(ctx, st, issued); err != nil { + log.Printf("V5 station=%s error: %v", st, err) + } + } +} + +func listStationsWithBase(ctx context.Context, provider string, issued time.Time) ([]string, error) { + const q = ` + SELECT DISTINCT station_id + FROM forecast_hourly + WHERE provider=$1 AND issued_at >= $2 AND issued_at < $2 + interval '1 hour'` + rows, err := data.DB().QueryContext(ctx, q, provider, issued) + if err != nil { + return nil, err + } + defer rows.Close() + var out []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err == nil { + out = append(out, id) + } + } + return out, nil +} + +func splitStations(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} + +func runForStation(ctx context.Context, stationID string, issued time.Time) error { + // 解析当前 issued 桶内的基础源发布时间(取最后一条) + baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued) + if err != nil || !ok { + return fmt.Errorf("resolve base issued failed: %v ok=%v", err, ok) + } + basePoints, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, baseIssued) + if err != nil || len(basePoints) < 3 { + return fmt.Errorf("load base points failed: %v len=%d", err, len(basePoints)) + } + + // 取有效时间 + ft1 := issued.Add(1 * time.Hour) + ft2 := issued.Add(2 * time.Hour) + ft3 := issued.Add(3 * time.Hour) + + base1, base2, base3 := pickRain(basePoints, ft1), pickRain(basePoints, ft2), pickRain(basePoints, ft3) + + // 计算三个 horizon 的偏差: + // r1 = 实况[issued-1,issued) - (issued-1 的 +1) + // r2 = 实况[issued-1,issued) - (issued-2 的 +2) + // r3 = 实况[issued-1,issued) - (issued-3 的 +3) + actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued) + if err != nil || !okA { + return fmt.Errorf("actual not ready: %v ok=%v", err, okA) + } + + p1, err := pickPrevPredict(ctx, stationID, issued.Add(-1*time.Hour), 1, issued) + if err != nil { + return err + } + p2, err := pickPrevPredict(ctx, stationID, issued.Add(-2*time.Hour), 2, issued) + if err != nil { + return err + } + p3, err := pickPrevPredict(ctx, stationID, issued.Add(-3*time.Hour), 3, issued) + if err != nil { + return err + } + + r1 := actual - p1 + r2 := actual - p2 + r3 := actual - p3 + + // Apply baseline-fallback if negative for all leads + cand1 := base1 + 1.0*r1 + cand2 := base2 + 0.5*r2 + cand3 := base3 + (1.0/3.0)*r3 + var out1, out2, out3 float64 + if cand1 < 0 { + out1 = base1 + } else { + out1 = cand1 + } + if cand2 < 0 { + out2 = base2 + } else { + out2 = cand2 + } + if cand3 < 0 { + out3 = base3 + } else { + out3 = cand3 + } + + items := []data.UpsertRainItem{ + {ForecastTime: ft1, RainMMx1000: toX1000(out1)}, + {ForecastTime: ft2, RainMMx1000: toX1000(out2)}, + {ForecastTime: ft3, RainMMx1000: toX1000(out3)}, + } + if err := data.UpsertForecastRain(ctx, stationID, outProvider, issued, items); err != nil { + return err + } + log.Printf("V5 %s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f]", + stationID, issued.Format("2006-01-02 15:04:05"), + base1, base2, base3, actual, p1, p2, p3, out1, out2, out3, + ) + return nil +} + +func pickPrevPredict(ctx context.Context, stationID string, prevBucket time.Time, lead int, validFT time.Time) (float64, error) { + iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, prevBucket) + if err != nil || !ok { + return 0, fmt.Errorf("resolve prev issued fail bucket=%s: %v ok=%v", prevBucket, err, ok) + } + pts, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, iss) + if err != nil || len(pts) < lead { + return 0, fmt.Errorf("load prev points fail lead=%d: %v len=%d", lead, err, len(pts)) + } + // 直接按 validFT 精确匹配(容错:若不存在则按 lead 取第 lead 个) + if v := pickRain(pts, validFT); v >= 0 { + return v, nil + } + switch lead { + case 1: + return toMM(pts[0].RainMMx1000), nil + case 2: + if len(pts) >= 2 { + return toMM(pts[1].RainMMx1000), nil + } + case 3: + if len(pts) >= 3 { + return toMM(pts[2].RainMMx1000), nil + } + } + return 0, fmt.Errorf("insufficient points for lead=%d", lead) +} + +func pickRain(points []data.PredictPoint, ft time.Time) float64 { + for _, p := range points { + if p.ForecastTime.Equal(ft) { + return toMM(p.RainMMx1000) + } + } + return -1 +} + +func toMM(vx1000 int32) float64 { return float64(vx1000) / 1000.0 } +func toX1000(mm float64) int32 { return int32(mm*1000 + 0.5) } +func clamp0(v float64) float64 { + if v < 0 { + return 0 + } + return v +} diff --git a/core/cmd/v6-export/main.go b/core/cmd/v6-export/main.go new file mode 100644 index 0000000..f933014 --- /dev/null +++ b/core/cmd/v6-export/main.go @@ -0,0 +1,288 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + "time" + + "weatherstation/core/internal/data" +) + +// V6 导出工具: +// - 以 imdroid_mix 为基线 b_t(+k) +// - 残差 e_t(+k) 优先使用“上一轮 V6 的预测误差”,冷启动时退回使用 mix 的历史预测误差 +// - out(+1) = max(0, base1 + 1.0*e1) +// - out(+2) = max(0, base2 + 0.5*e2) +// - out(+3) = max(0, base3 + (1/3)*e3) +// - 仅生成 SQL 与日志,不写库 + +const ( + baseProvider = "imdroid_mix" + outProvider = "imdroid_V6" +) + +type v6Out struct { + FT time.Time + Rain float64 +} + +func main() { + var stationsCSV, startStr, endStr, sqlOut, logOut, tzName string + flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表,例如: RS485-000001,RS485-000002") + flag.StringVar(&startStr, "start", "", "开始时间,格式: 2006-01-02 15:00 或 2006-01-02(按整点对齐)") + flag.StringVar(&endStr, "end", "", "结束时间,格式: 2006-01-02 15:00 或 2006-01-02(不包含该时刻)") + flag.StringVar(&sqlOut, "sql", "v6_output.sql", "输出 SQL 文件路径") + flag.StringVar(&logOut, "log", "v6_output.log", "输出日志文件路径") + flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai") + flag.Parse() + + if stationsCSV == "" || startStr == "" || endStr == "" { + fmt.Println("用法: v6-export --stations RS485-XXXXXX --start '2024-08-01 00:00' --end '2024-08-02 00:00' --sql out.sql --log out.log") + os.Exit(2) + } + + if err := os.MkdirAll(filepath.Dir(sqlOut), 0755); err != nil && filepath.Dir(sqlOut) != "." { + log.Fatalf("create sql dir: %v", err) + } + if err := os.MkdirAll(filepath.Dir(logOut), 0755); err != nil && filepath.Dir(logOut) != "." { + log.Fatalf("create log dir: %v", err) + } + lf, err := os.Create(logOut) + if err != nil { + log.Fatalf("open log file: %v", err) + } + defer lf.Close() + logger := log.New(io.MultiWriter(os.Stdout, lf), "", log.LstdFlags) + + sf, err := os.Create(sqlOut) + if err != nil { + logger.Fatalf("open sql file: %v", err) + } + defer sf.Close() + fmt.Fprintf(sf, "-- V6 Export generated at %s\nBEGIN;\n", time.Now().Format(time.RFC3339)) + + loc, _ := time.LoadLocation(tzName) + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + parse := func(s string) (time.Time, error) { + for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} { + if t, err := time.ParseInLocation(ly, s, loc); err == nil { + return t, nil + } + } + return time.Time{}, fmt.Errorf("invalid time: %s", s) + } + start, err := parse(startStr) + if err != nil { + logger.Fatalf("parse start: %v", err) + } + end, err := parse(endStr) + if err != nil { + logger.Fatalf("parse end: %v", err) + } + start = start.Truncate(time.Hour) + end = end.Truncate(time.Hour) + if !end.After(start) { + logger.Fatalf("end 必须大于 start") + } + + stations := splitStations(stationsCSV) + ctx := context.Background() + + for _, st := range stations { + logger.Printf("V6 导出 站点=%s 窗口=%s→%s", st, start.Format("2006-01-02 15:04"), end.Format("2006-01-02 15:04")) + // 维护一个“按验证时刻”的 V6 预测缓存:key=forecast_time,value=预测雨量 + v6AtTime := make(map[time.Time]float64) + + for t := start; t.Before(end); t = t.Add(time.Hour) { + res := computeV6AtHour(ctx, st, t, v6AtTime, logger) + if res.skipped { + logger.Printf("skip station=%s issued=%s: %s", st, t.Format("2006-01-02 15:04"), res.reason) + continue + } + // 写 SQL + for _, row := range res.sqlRows { + fmt.Fprintln(sf, row) + } + // 更新缓存:将本次的 +1/+2/+3 结果写入对应的验证时刻键 + v6AtTime[t.Add(1*time.Hour)] = res.out[0] + v6AtTime[t.Add(2*time.Hour)] = res.out[1] + v6AtTime[t.Add(3*time.Hour)] = res.out[2] + + logger.Printf("V6 %s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f] src=[%s,%s,%s]", + st, t.Format("2006-01-02 15:04"), res.base[0], res.base[1], res.base[2], res.actual, + res.prev[0], res.prev[1], res.prev[2], res.out[0], res.out[1], res.out[2], + res.src[0], res.src[1], res.src[2]) + } + } + fmt.Fprintln(sf, "COMMIT;") + logger.Printf("完成,SQL: %s 日志: %s", sqlOut, logOut) +} + +type v6Result struct { + base [3]float64 + prev [3]float64 + src [3]string // 使用的前一预测来源:V6 或 mix + out [3]float64 + actual float64 + sqlRows []string + skipped bool + reason string +} + +func computeV6AtHour(ctx context.Context, stationID string, issued time.Time, v6AtTime map[time.Time]float64, logger *log.Logger) v6Result { + var res v6Result + + // 读取基线:当期小时桶内 mix 最新 issued 的 +1/+2/+3 + baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, issued) + if err != nil || !ok { + res.skipped, res.reason = true, fmt.Sprintf("base issued missing: %v ok=%v", err, ok) + return res + } + pts, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, baseIssued) + if err != nil || len(pts) < 3 { + res.skipped, res.reason = true, fmt.Sprintf("base points insufficient: %v len=%d", err, len(pts)) + return res + } + ft1, ft2, ft3 := issued.Add(time.Hour), issued.Add(2*time.Hour), issued.Add(3*time.Hour) + base1, base2, base3 := pickRain(pts, ft1), pickRain(pts, ft2), pickRain(pts, ft3) + res.base = [3]float64{base1, base2, base3} + + // 实况:刚结束一小时 [t-1,t) + actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued) + if err != nil || !okA { + res.skipped, res.reason = true, fmt.Sprintf("actual missing: %v ok=%v", err, okA) + return res + } + res.actual = actual + + // 前一预测(优先 V6 缓存,否则退回 mix 历史) + // +1:需要 (t-1) 发布、验证时刻 t 的预测值 + vPrev1, src1, ok1 := prevForValidation(ctx, stationID, issued, 1, v6AtTime) + vPrev2, src2, ok2 := prevForValidation(ctx, stationID, issued, 2, v6AtTime) + vPrev3, src3, ok3 := prevForValidation(ctx, stationID, issued, 3, v6AtTime) + if !(ok1 && ok2 && ok3) { + // 若冷启动,允许个别 lead 不可用时跳过;也可以只输出可用的 lead,这里采取全量可用才输出 + res.skipped, res.reason = true, fmt.Sprintf("prev missing leads: h1=%v h2=%v h3=%v", ok1, ok2, ok3) + return res + } + res.prev = [3]float64{vPrev1, vPrev2, vPrev3} + res.src = [3]string{src1, src2, src3} + + // 残差与输出 + e1 := actual - vPrev1 + e2 := actual - vPrev2 + e3 := actual - vPrev3 + cand1 := base1 + 1.0*e1 + cand2 := base2 + 0.5*e2 + cand3 := base3 + (1.0/3.0)*e3 + var out1, out2, out3 float64 + if cand1 < 0 { + out1 = base1 + } else { + out1 = cand1 + } + if cand2 < 0 { + out2 = base2 + } else { + out2 = cand2 + } + if cand3 < 0 { + out3 = base3 + } else { + out3 = cand3 + } + res.out = [3]float64{out1, out2, out3} + + // 生成 SQL(仅雨量 upsert) + rows := make([]string, 0, 3) + rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft1, toX1000(out1))) + rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft2, toX1000(out2))) + rows = append(rows, insertRainSQL(stationID, outProvider, issued, ft3, toX1000(out3))) + res.sqlRows = rows + return res +} + +// prevForValidation 返回用于“验证时刻=issued+0h”的上一预测:优先使用 V6 的缓存;如无则退回 mix 的历史。 +func prevForValidation(ctx context.Context, stationID string, issued time.Time, lead int, v6AtTime map[time.Time]float64) (float64, string, bool) { + // 需要的验证时刻 + vt := issued // 验证在 t + // 先看 V6 缓存:我们在前面会把 V6 的结果按 forecast_time 存入 map + if v, ok := v6AtTime[vt]; ok { + return v, "V6", true + } + // 否则退回 mix 历史:在 (t-lead) 的小时桶内,取最新 issued 的 +lead + prevBucket := issued.Add(-time.Duration(lead) * time.Hour) + iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, baseProvider, prevBucket) + if err != nil || !ok { + return 0, "", false + } + pts, err := data.ForecastRainAtIssued(ctx, stationID, baseProvider, iss) + if err != nil || len(pts) < lead { + return 0, "", false + } + // 直接用验证时刻 vt 精确匹配 + if v := pickRain(pts, vt); v >= 0 { + return v, baseProvider, true + } + // 或退回位置索引 + switch lead { + case 1: + return toMM(pts[0].RainMMx1000), baseProvider, true + case 2: + if len(pts) >= 2 { + return toMM(pts[1].RainMMx1000), baseProvider, true + } + case 3: + if len(pts) >= 3 { + return toMM(pts[2].RainMMx1000), baseProvider, true + } + } + return 0, "", false +} + +func splitStations(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} + +func pickRain(points []data.PredictPoint, ft time.Time) float64 { + for _, p := range points { + if p.ForecastTime.Equal(ft) { + return toMM(p.RainMMx1000) + } + } + return -1 +} + +func toMM(vx1000 int32) float64 { return float64(vx1000) / 1000.0 } +func toX1000(mm float64) int32 { return int32(mm*1000 + 0.5) } +func clamp0(v float64) float64 { + if v < 0 { + return 0 + } + return v +} + +func insertRainSQL(stationID, provider string, issued, ft time.Time, rainX1000 int32) string { + return fmt.Sprintf( + "INSERT INTO forecast_hourly (station_id, provider, issued_at, forecast_time, rain_mm_x1000) VALUES ('%s','%s','%s','%s',%d) ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET rain_mm_x1000=EXCLUDED.rain_mm_x1000;", + escapeSQL(stationID), provider, issued.Format(time.RFC3339), ft.Format(time.RFC3339), rainX1000, + ) +} + +func escapeSQL(s string) string { return strings.ReplaceAll(s, "'", "''") } diff --git a/core/cmd/v6-model/main.go b/core/cmd/v6-model/main.go new file mode 100644 index 0000000..5f311f1 --- /dev/null +++ b/core/cmd/v6-model/main.go @@ -0,0 +1,261 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "strings" + "time" + + "weatherstation/core/internal/data" +) + +const ( + v6BaseProvider = "imdroid_mix" + v6OutProvider = "imdroid_V6" +) + +func main() { + var stationsCSV, issuedStr, startStr, endStr, tzName string + flag.StringVar(&stationsCSV, "stations", "", "逗号分隔的 station_id 列表;为空则自动扫描有基线的站点") + flag.StringVar(&issuedStr, "issued", "", "指定 issued 时间(整点),格式: 2006-01-02 15:00;为空用当前整点") + flag.StringVar(&startStr, "start", "", "开始时间(整点),格式: 2006-01-02 15:00;与 --end 一起使用,end 为开区间") + flag.StringVar(&endStr, "end", "", "结束时间(整点),格式: 2006-01-02 15:00;与 --start 一起使用,end 为开区间") + flag.StringVar(&tzName, "tz", "Asia/Shanghai", "时区,例如 Asia/Shanghai") + flag.Parse() + + ctx := context.Background() + loc, _ := time.LoadLocation(tzName) + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + + parse := func(s string) (time.Time, error) { + var t time.Time + var err error + for _, ly := range []string{"2006-01-02 15:04", "2006-01-02 15", "2006-01-02"} { + t, err = time.ParseInLocation(ly, s, loc) + if err == nil { + return t.Truncate(time.Hour), nil + } + } + return time.Time{}, err + } + + if strings.TrimSpace(startStr) != "" && strings.TrimSpace(endStr) != "" { + start, err := parse(startStr) + if err != nil { + log.Fatalf("无法解析 start: %v", err) + } + end, err := parse(endStr) + if err != nil { + log.Fatalf("无法解析 end: %v", err) + } + if !end.After(start) { + log.Fatalf("end 必须大于 start") + } + for t := start; t.Before(end); t = t.Add(time.Hour) { + var stations []string + if strings.TrimSpace(stationsCSV) != "" { + stations = splitStations(stationsCSV) + } else { + var err error + stations, err = listStationsWithBase(ctx, v6BaseProvider, t) + if err != nil { + log.Fatalf("list stations failed: %v", err) + } + } + if len(stations) == 0 { + log.Printf("no stations to process for issued=%s", t.Format("2006-01-02 15:04:05")) + continue + } + for _, st := range stations { + if err := runV6ForStation(ctx, st, t); err != nil { + log.Printf("V6 station=%s issued=%s error: %v", st, t.Format("2006-01-02 15:04:05"), err) + } + } + } + return + } + + var issued time.Time + if strings.TrimSpace(issuedStr) != "" { + var err error + issued, err = parse(issuedStr) + if err != nil || issued.IsZero() { + log.Fatalf("无法解析 issued: %v", err) + } + } else { + issued = time.Now().In(loc).Truncate(time.Hour) + } + var stations []string + if strings.TrimSpace(stationsCSV) != "" { + stations = splitStations(stationsCSV) + } else { + var err error + stations, err = listStationsWithBase(ctx, v6BaseProvider, issued) + if err != nil { + log.Fatalf("list stations failed: %v", err) + } + } + if len(stations) == 0 { + log.Printf("no stations to process for issued=%s", issued.Format("2006-01-02 15:04:05")) + return + } + for _, st := range stations { + if err := runV6ForStation(ctx, st, issued); err != nil { + log.Printf("V6 station=%s error: %v", st, err) + } + } +} + +func splitStations(s string) []string { + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} + +// listStationsWithBase 与 v5 共用逻辑,通过 forecast_hourly 检索该 issued 桶内有基线的站点 +func listStationsWithBase(ctx context.Context, provider string, issued time.Time) ([]string, error) { + const q = `SELECT DISTINCT station_id FROM forecast_hourly WHERE provider=$1 AND issued_at >= $2 AND issued_at < $2 + interval '1 hour'` + rows, err := data.DB().QueryContext(ctx, q, provider, issued) + if err != nil { + return nil, err + } + defer rows.Close() + var out []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err == nil { + out = append(out, id) + } + } + return out, nil +} + +func runV6ForStation(ctx context.Context, stationID string, issued time.Time) error { + // 基线:当期小时桶内 mix 最新 issued + baseIssued, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, v6BaseProvider, issued) + if err != nil || !ok { + return fmt.Errorf("base issued missing: %v ok=%v", err, ok) + } + pts, err := data.ForecastRainAtIssued(ctx, stationID, v6BaseProvider, baseIssued) + if err != nil || len(pts) < 3 { + return fmt.Errorf("base points insufficient: %v len=%d", err, len(pts)) + } + ft1, ft2, ft3 := issued.Add(time.Hour), issued.Add(2*time.Hour), issued.Add(3*time.Hour) + base1, base2, base3 := pickRain(pts, ft1), pickRain(pts, ft2), pickRain(pts, ft3) + + // 实况 + actual, okA, err := data.FetchActualHourlyRain(ctx, stationID, issued.Add(-time.Hour), issued) + if err != nil || !okA { + return fmt.Errorf("actual missing: %v ok=%v", err, okA) + } + + // 残差:优先 V6 历史,否则回退 mix 历史 + vPrev1, ok1 := prevV6OrMix(ctx, stationID, issued, 1) + vPrev2, ok2 := prevV6OrMix(ctx, stationID, issued, 2) + vPrev3, ok3 := prevV6OrMix(ctx, stationID, issued, 3) + if !(ok1 && ok2 && ok3) { + return fmt.Errorf("prev missing leads: h1=%v h2=%v h3=%v", ok1, ok2, ok3) + } + + e1 := actual - vPrev1 + e2 := actual - vPrev2 + e3 := actual - vPrev3 + cand1 := base1 + 1.0*e1 + cand2 := base2 + 0.5*e2 + cand3 := base3 + (1.0/3.0)*e3 + var out1, out2, out3 float64 + if cand1 < 0 { + out1 = base1 + } else { + out1 = cand1 + } + if cand2 < 0 { + out2 = base2 + } else { + out2 = cand2 + } + if cand3 < 0 { + out3 = base3 + } else { + out3 = cand3 + } + + items := []data.UpsertRainItem{ + {ForecastTime: ft1, RainMMx1000: toX1000(out1)}, + {ForecastTime: ft2, RainMMx1000: toX1000(out2)}, + {ForecastTime: ft3, RainMMx1000: toX1000(out3)}, + } + if err := data.UpsertForecastRain(ctx, stationID, v6OutProvider, issued, items); err != nil { + return err + } + log.Printf("V6 %s issued=%s base=[%.3f,%.3f,%.3f] actual=%.3f prev=[%.3f,%.3f,%.3f] out=[%.3f,%.3f,%.3f]", + stationID, issued.Format("2006-01-02 15:04:05"), base1, base2, base3, actual, vPrev1, vPrev2, vPrev3, out1, out2, out3) + return nil +} + +func prevV6OrMix(ctx context.Context, stationID string, issued time.Time, lead int) (float64, bool) { + // 验证时刻 + vt := issued + // 先找 V6 历史:在 (t-lead) 桶内找 v6 的 issued,取 +lead @ vt + if iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, v6OutProvider, issued.Add(-time.Duration(lead)*time.Hour)); err == nil && ok { + if pts, err := data.ForecastRainAtIssued(ctx, stationID, v6OutProvider, iss); err == nil && len(pts) >= lead { + if v := pickRain(pts, vt); v >= 0 { + return v, true + } + switch lead { + case 1: + return toMM(pts[0].RainMMx1000), true + case 2: + if len(pts) >= 2 { + return toMM(pts[1].RainMMx1000), true + } + case 3: + if len(pts) >= 3 { + return toMM(pts[2].RainMMx1000), true + } + } + } + } + // 退回 mix 历史 + if iss, ok, err := data.ResolveIssuedAtInBucket(ctx, stationID, v6BaseProvider, issued.Add(-time.Duration(lead)*time.Hour)); err == nil && ok { + if pts, err := data.ForecastRainAtIssued(ctx, stationID, v6BaseProvider, iss); err == nil && len(pts) >= lead { + if v := pickRain(pts, vt); v >= 0 { + return v, true + } + switch lead { + case 1: + return toMM(pts[0].RainMMx1000), true + case 2: + if len(pts) >= 2 { + return toMM(pts[1].RainMMx1000), true + } + case 3: + if len(pts) >= 3 { + return toMM(pts[2].RainMMx1000), true + } + } + } + } + return 0, false +} + +func pickRain(points []data.PredictPoint, ft time.Time) float64 { + for _, p := range points { + if p.ForecastTime.Equal(ft) { + return toMM(p.RainMMx1000) + } + } + return -1 +} +func toMM(vx1000 int32) float64 { return float64(vx1000) / 1000.0 } +func toX1000(mm float64) int32 { return int32(mm*1000 + 0.5) } diff --git a/core/internal/data/forecast.go b/core/internal/data/forecast.go index b567777..2ea3c67 100644 --- a/core/internal/data/forecast.go +++ b/core/internal/data/forecast.go @@ -37,3 +37,49 @@ func ForecastRainAtIssued(ctx context.Context, stationID, provider string, issue } return out, nil } + +// ResolveIssuedAtInBucket finds the latest issued_at in [bucket, bucket+1h) for station/provider. +func ResolveIssuedAtInBucket(ctx context.Context, stationID, provider string, bucketHour time.Time) (time.Time, bool, error) { + const q = `SELECT issued_at FROM forecast_hourly WHERE station_id=$1 AND provider=$2 AND issued_at >= $3 AND issued_at < $3 + interval '1 hour' ORDER BY issued_at DESC LIMIT 1` + var t time.Time + err := DB().QueryRowContext(ctx, q, stationID, provider, bucketHour).Scan(&t) + if err == sql.ErrNoRows { + return time.Time{}, false, nil + } + if err != nil { + return time.Time{}, false, err + } + return t, true, nil +} + +// UpsertForecastRain writes rain-only rows for a provider at issued_at, upserting by key. +// Only the rain_mm_x1000 column is set/updated; other columns remain NULL or unchanged. +type UpsertRainItem struct { + ForecastTime time.Time + RainMMx1000 int32 +} + +func UpsertForecastRain(ctx context.Context, stationID, provider string, issuedAt time.Time, items []UpsertRainItem) error { + if len(items) == 0 { + return nil + } + const q = ` + INSERT INTO forecast_hourly ( + station_id, provider, issued_at, forecast_time, rain_mm_x1000 + ) VALUES ($1,$2,$3,$4,$5) + ON CONFLICT (station_id, provider, issued_at, forecast_time) DO UPDATE SET + rain_mm_x1000 = EXCLUDED.rain_mm_x1000` + tx, err := DB().BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } + defer func() { + _ = tx.Rollback() + }() + for _, it := range items { + if _, err := tx.ExecContext(ctx, q, stationID, provider, issuedAt, it.ForecastTime, it.RainMMx1000); err != nil { + return err + } + } + return tx.Commit() +} diff --git a/core/internal/data/rain.go b/core/internal/data/rain.go new file mode 100644 index 0000000..a843523 --- /dev/null +++ b/core/internal/data/rain.go @@ -0,0 +1,21 @@ +package data + +import ( + "context" + "database/sql" + "time" +) + +// FetchActualHourlyRain sums rs485_weather_10min.rain_10m_mm_x1000 over [start,end) and returns mm. +func FetchActualHourlyRain(ctx context.Context, stationID string, start, end time.Time) (float64, bool, error) { + const q = `SELECT SUM(rain_10m_mm_x1000) FROM rs485_weather_10min WHERE station_id=$1 AND bucket_start >= $2 AND bucket_start < $3` + var sum sql.NullInt64 + err := DB().QueryRowContext(ctx, q, stationID, start, end).Scan(&sum) + if err != nil { + return 0, false, err + } + if !sum.Valid { + return 0, false, nil + } + return float64(sum.Int64) / 1000.0, true, nil +} diff --git a/core/internal/data/sms.go b/core/internal/data/sms.go new file mode 100644 index 0000000..e959c50 --- /dev/null +++ b/core/internal/data/sms.go @@ -0,0 +1,31 @@ +package data + +import ( + "context" +) + +// SMSRecipient models an entry in sms_recipients. +type SMSRecipient struct { + Phone string + Enabled bool + AlertLevel int +} + +// ListEnabledSMSRecipients returns all enabled recipients. +func ListEnabledSMSRecipients(ctx context.Context) ([]SMSRecipient, error) { + const q = `SELECT phone, enabled, alert_level FROM sms_recipients WHERE enabled = TRUE` + rows, err := DB().QueryContext(ctx, q) + if err != nil { + return nil, err + } + defer rows.Close() + var out []SMSRecipient + for rows.Next() { + var r SMSRecipient + if err := rows.Scan(&r.Phone, &r.Enabled, &r.AlertLevel); err != nil { + continue + } + out = append(out, r) + } + return out, nil +} diff --git a/core/internal/data/station.go b/core/internal/data/station.go new file mode 100644 index 0000000..ee0a3da --- /dev/null +++ b/core/internal/data/station.go @@ -0,0 +1,23 @@ +package data + +import ( + "context" + "database/sql" +) + +// GetStationName returns stations.name by station_id; empty string if not found/null. +func GetStationName(ctx context.Context, stationID string) (string, error) { + const q = `SELECT COALESCE(name, '') FROM stations WHERE station_id = $1` + var name sql.NullString + err := DB().QueryRowContext(ctx, q, stationID).Scan(&name) + if err != nil { + if err == sql.ErrNoRows { + return "", nil + } + return "", err + } + if name.Valid { + return name.String, nil + } + return "", nil +} diff --git a/core/internal/sms/sms.go b/core/internal/sms/sms.go index d3614f0..ed8b9e7 100644 --- a/core/internal/sms/sms.go +++ b/core/internal/sms/sms.go @@ -55,15 +55,16 @@ type TemplateData struct { Time string `json:"time"` Name string `json:"name"` Content string `json:"content"` + Alert string `json:"alert"` } // Send sends the template message to one or more phone numbers. -// deviceIDs/name/content/msgTime follow the existing Java contract. -func (c *Client) Send(ctx context.Context, deviceIDs, content, msgTime string, phones []string) error { +// name/content/alert/msgTime map to template ${name}, ${content}, ${alert}, ${time}. +func (c *Client) Send(ctx context.Context, name, content, alert, msgTime string, phones []string) error { if len(phones) == 0 { return errors.New("sms: empty phone list") } - payload := TemplateData{Time: msgTime, Name: deviceIDs, Content: content} + payload := TemplateData{Time: msgTime, Name: name, Content: content, Alert: alert} b, _ := json.Marshal(payload) param := string(b)