package main import ( "context" "crypto/tls" "encoding/json" "fmt" "log" "math" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "weatherstation/core/internal/data" ) const ( // 固定映射 deviceID = "Z866" stationID = "RS485-002A6E" // MQTT 测试配置(可替换为生产) brokerURL = "wss://broker.emqx.io:8084/mqtt" clientID = "core-publisher-Z866" username = "1" password = "1" topic = "$dp" ) type hwsPayload struct { Type string `json:"type"` Device string `json:"device"` Dm float64 `json:"Dm"` Pa float64 `json:"Pa"` Rc float64 `json:"Rc"` Sm float64 `json:"Sm"` Ta float64 `json:"Ta"` Ua float64 `json:"Ua"` Time int64 `json:"time"` } func main() { // 初始化 MQTT 客户端 opts := mqtt.NewClientOptions().AddBroker(brokerURL) opts.SetClientID(clientID) opts.SetUsername(username) opts.SetPassword(password) opts.SetProtocolVersion(4) opts.SetKeepAlive(60 * time.Second) opts.SetAutoReconnect(true) opts.SetCleanSession(true) opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) cli := mqtt.NewClient(opts) if tok := cli.Connect(); tok.Wait() && tok.Error() != nil { log.Fatalf("MQTT 连接失败: %v", tok.Error()) } defer cli.Disconnect(250) // 5分钟发布任务 go alignAndRun5m(func(tickEnd time.Time) { publishOnce(cli, tickEnd) }) // 1小时+10分钟发布预测任务 go alignAndRunHour10(func(tick time.Time) { publishPredict(cli, tick) }) select {} } func alignAndRun5m(fn func(tickEnd time.Time)) { now := time.Now() next := now.Truncate(5 * time.Minute).Add(5 * time.Minute) time.Sleep(time.Until(next)) for { tickEnd := time.Now().Truncate(5 * time.Minute) fn(tickEnd) time.Sleep(5 * time.Minute) } } func alignAndRunHour10(fn func(tick time.Time)) { // 计算下一个 “整点+10分钟” now := time.Now() base := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 10, 0, 0, now.Location()) var next time.Time if now.After(base) { next = base.Add(1 * time.Hour) } else { next = base } time.Sleep(time.Until(next)) for { tick := time.Now().Truncate(time.Minute) fn(tick) // 每小时执行一次 time.Sleep(1 * time.Hour) } } func publishOnce(cli mqtt.Client, tickEnd time.Time) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() start := tickEnd.Add(-5 * time.Minute) // 聚合窗口 agg, err := data.WindowAverages(ctx, stationID, start, tickEnd) if err != nil { log.Printf("聚合失败: %v", err) return } // 当天累计雨量 rc, err := data.DailyRainSinceMidnight(ctx, stationID, tickEnd) if err != nil { log.Printf("降雨查询失败: %v", err) return } // 处理无效值:若无数据则用 0(<0.001 算无效) dm := 0.0 if agg.Dm.Valid { dm = agg.Dm.Float64 } sm := 0.0 if agg.Sm.Valid { sm = agg.Sm.Float64 } ta := 0.0 if agg.Ta.Valid { ta = agg.Ta.Float64 } ua := 0.0 if agg.Ua.Valid { ua = agg.Ua.Float64 } pa := 0.0 if agg.Pa.Valid { pa = agg.Pa.Float64 } // 四舍五入:风向取整数,其它保留两位小数 round2 := func(v float64) float64 { return math.Round(v*100) / 100 } dmInt := math.Round(dm) payload := hwsPayload{ Type: "hws", Device: deviceID, Dm: dmInt, Pa: round2(pa), Rc: round2(rc), Sm: round2(sm), Ta: round2(ta), Ua: round2(ua), Time: tickEnd.UnixMilli(), } b, _ := json.Marshal(payload) tok := cli.Publish(topic, 1, false, b) tok.Wait() if tok.Error() != nil { log.Printf("发布失败: %v", tok.Error()) return } log.Printf("发布成功 %s: %s", topic, string(b)) } // 预测发布:每小时+10分钟,从 forecast_hourly 按 issued_at=整点 选取数据 func publishPredict(cli mqtt.Client, tick time.Time) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 以 CST 解析 issued_at 整点 loc, _ := time.LoadLocation("Asia/Shanghai") if loc == nil { loc = time.FixedZone("CST", 8*3600) } now := tick.In(loc) issued := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, loc) const forecastStation = "RS485-002A6E" const provider = "imdroid_mix" points, err := data.ForecastRainAtIssued(ctx, forecastStation, provider, issued) if err != nil { log.Printf("预测查询失败: %v", err) return } if len(points) == 0 { log.Printf("预测无数据 issued_at=%s", issued.Format("2006-01-02 15:04:05")) return } // 组装 payload type predictItem struct { PredictTime int64 `json:"predict_time"` PredictRainfall string `json:"predict_rainfall"` } var items []predictItem for _, p := range points { rf := float64(p.RainMMx1000) / 1000.0 items = append(items, predictItem{ PredictTime: p.ForecastTime.UnixMilli(), PredictRainfall: format3(rf), }) } payload := struct { Type string `json:"type"` Device string `json:"device"` Time int64 `json:"time"` Data []predictItem `json:"data"` }{ Type: "predict", Device: deviceID, Time: now.UnixMilli(), // 当前“整点+10分钟”的时间 Data: items, } b, _ := json.Marshal(payload) tok := cli.Publish(topic, 1, false, b) tok.Wait() if tok.Error() != nil { log.Printf("发布预测失败: %v", tok.Error()) return } log.Printf("发布预测成功 %s: issued_at=%s, items=%d", topic, issued.Format("2006-01-02 15:04:05"), len(items)) } func format3(v float64) string { // 保留三位小数(字符串形式) s := fmt.Sprintf("%.3f", v) return s }