2025-11-05 16:47:38 +08:00

227 lines
5.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"weatherstation/core/internal/data"
)
const (
// 固定映射
deviceID = "Z866"
stationID = "RS485-002A6E"
// MQTT
brokerURL = "wss://broker.emqx.io:8084/mqtt"
clientID = "core-publisher-Z866"
username = "1"
password = "1"
topic = "$dp"
)
type hwsPayload struct {
Type string `json:"type"`
Device string `json:"device"`
Dm float64 `json:"Dm"`
Pa float64 `json:"Pa"`
Rc float64 `json:"Rc"`
Sm float64 `json:"Sm"`
Ta float64 `json:"Ta"`
Ua float64 `json:"Ua"`
Time int64 `json:"time"`
}
func main() {
// 初始化 MQTT 客户端
opts := mqtt.NewClientOptions().AddBroker(brokerURL)
opts.SetClientID(clientID)
opts.SetUsername(username)
opts.SetPassword(password)
opts.SetProtocolVersion(4)
opts.SetKeepAlive(60 * time.Second)
opts.SetAutoReconnect(true)
opts.SetCleanSession(true)
cli := mqtt.NewClient(opts)
if tok := cli.Connect(); tok.Wait() && tok.Error() != nil {
log.Fatalf("MQTT 连接失败: %v", tok.Error())
}
defer cli.Disconnect(250)
// 5分钟发布任务
go alignAndRun5m(func(tickEnd time.Time) { publishOnce(cli, tickEnd) })
// 1小时+10分钟发布预测任务
go alignAndRunHour10(func(tick time.Time) { publishPredict(cli, tick) })
select {}
}
func alignAndRun5m(fn func(tickEnd time.Time)) {
now := time.Now()
next := now.Truncate(5 * time.Minute).Add(5 * time.Minute)
time.Sleep(time.Until(next))
for {
tickEnd := time.Now().Truncate(5 * time.Minute)
fn(tickEnd)
time.Sleep(5 * time.Minute)
}
}
func alignAndRunHour10(fn func(tick time.Time)) {
// 计算下一个 “整点+10分钟”
now := time.Now()
base := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 10, 0, 0, now.Location())
var next time.Time
if now.After(base) {
next = base.Add(1 * time.Hour)
} else {
next = base
}
time.Sleep(time.Until(next))
for {
tick := time.Now().Truncate(time.Minute)
fn(tick)
// 每小时执行一次
time.Sleep(1 * time.Hour)
}
}
func publishOnce(cli mqtt.Client, tickEnd time.Time) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
start := tickEnd.Add(-5 * time.Minute)
// 聚合窗口
agg, err := data.WindowAverages(ctx, stationID, start, tickEnd)
if err != nil {
log.Printf("聚合失败: %v", err)
return
}
// 当天累计雨量
rc, err := data.DailyRainSinceMidnight(ctx, stationID, tickEnd)
if err != nil {
log.Printf("降雨查询失败: %v", err)
return
}
// 处理无效值:若无数据则用 0<0.001 算无效)
dm := 0.0
if agg.Dm.Valid {
dm = agg.Dm.Float64
}
sm := 0.0
if agg.Sm.Valid {
sm = agg.Sm.Float64
}
ta := 0.0
if agg.Ta.Valid {
ta = agg.Ta.Float64
}
ua := 0.0
if agg.Ua.Valid {
ua = agg.Ua.Float64
}
pa := 0.0
if agg.Pa.Valid {
pa = agg.Pa.Float64
}
// 四舍五入:风向取整数,其它保留两位小数
round2 := func(v float64) float64 { return math.Round(v*100) / 100 }
dmInt := math.Round(dm)
payload := hwsPayload{
Type: "hws",
Device: deviceID,
Dm: dmInt,
Pa: round2(pa),
Rc: round2(rc),
Sm: round2(sm),
Ta: round2(ta),
Ua: round2(ua),
Time: tickEnd.UnixMilli(),
}
b, _ := json.Marshal(payload)
tok := cli.Publish(topic, 1, false, b)
tok.Wait()
if tok.Error() != nil {
log.Printf("发布失败: %v", tok.Error())
return
}
log.Printf("发布成功 %s: %s", topic, string(b))
}
// 预测发布:每小时+10分钟从 forecast_hourly 按 issued_at=整点 选取数据
func publishPredict(cli mqtt.Client, tick time.Time) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 以 CST 解析 issued_at 整点
loc, _ := time.LoadLocation("Asia/Shanghai")
if loc == nil {
loc = time.FixedZone("CST", 8*3600)
}
now := tick.In(loc)
issued := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, loc)
const forecastStation = "RS485-002A6E"
const provider = "imdroid_mix"
points, err := data.ForecastRainAtIssued(ctx, forecastStation, provider, issued)
if err != nil {
log.Printf("预测查询失败: %v", err)
return
}
if len(points) == 0 {
log.Printf("预测无数据 issued_at=%s", issued.Format("2006-01-02 15:04:05"))
return
}
// 组装 payload
type predictItem struct {
PredictTime int64 `json:"predict_time"`
PredictRainfall string `json:"predict_rainfall"`
}
var items []predictItem
for _, p := range points {
rf := float64(p.RainMMx1000) / 1000.0
items = append(items, predictItem{
PredictTime: p.ForecastTime.UnixMilli(),
PredictRainfall: format3(rf),
})
}
payload := struct {
Type string `json:"type"`
Device string `json:"device"`
Time int64 `json:"time"`
Data []predictItem `json:"data"`
}{
Type: "predict",
Device: deviceID,
Time: now.UnixMilli(), // 当前“整点+10分钟”的时间
Data: items,
}
b, _ := json.Marshal(payload)
tok := cli.Publish(topic, 1, false, b)
tok.Wait()
if tok.Error() != nil {
log.Printf("发布预测失败: %v", tok.Error())
return
}
log.Printf("发布预测成功 %s: issued_at=%s, items=%d", topic, issued.Format("2006-01-02 15:04:05"), len(items))
}
func format3(v float64) string {
// 保留三位小数(字符串形式)
s := fmt.Sprintf("%.3f", v)
return s
}