diff --git a/core/cmd/core-mqtt/main.go b/core/cmd/core-mqtt/main.go new file mode 100644 index 0000000..ffd55d4 --- /dev/null +++ b/core/cmd/core-mqtt/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +func main() { + broker := "wss://broker.emqx.io:8084/mqtt" + clientID := "Mqttx_07c4e9ed" + username := "1" + password := "1" + topic := "$dp" + + opts := mqtt.NewClientOptions() + opts.AddBroker(broker) + opts.SetClientID(clientID) + opts.SetUsername(username) + opts.SetPassword(password) + opts.SetProtocolVersion(4) + opts.SetKeepAlive(60 * time.Second) + opts.SetAutoReconnect(true) + opts.SetCleanSession(true) + opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) + + c := mqtt.NewClient(opts) + if t := c.Connect(); t.Wait() && t.Error() != nil { + fmt.Printf("connect error: %v\n", t.Error()) + os.Exit(1) + } + defer c.Disconnect(250) + + // 构造所需数据格式(仅设备与时间变更) + payload := map[string]any{ + "type": "hws", + "device": "Z866", + "Dm": 0.0001, + "Pa": 976.7, + "Rc": 0, + "Sm": 0.0001, + "Ta": 39, + "Ua": 26.6, + "time": time.Now().UnixMilli(), + } + b, _ := json.Marshal(payload) + if t := c.Publish(topic, 1, false, b); t.Wait() && t.Error() != nil { + fmt.Printf("publish error: %v\n", t.Error()) + os.Exit(2) + } + fmt.Println("published to", topic) +} diff --git a/core/cmd/service-mqtt-publisher/main.go b/core/cmd/service-mqtt-publisher/main.go new file mode 100644 index 0000000..499fecd --- /dev/null +++ b/core/cmd/service-mqtt-publisher/main.go @@ -0,0 +1,228 @@ +package main + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "log" + "math" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + + "weatherstation/core/internal/data" +) + +const ( + // 固定映射 + deviceID = "Z866" + stationID = "RS485-002A6E" + + // MQTT 测试配置(可替换为生产) + brokerURL = "wss://broker.emqx.io:8084/mqtt" + clientID = "core-publisher-Z866" + username = "1" + password = "1" + topic = "$dp" +) + +type hwsPayload struct { + Type string `json:"type"` + Device string `json:"device"` + Dm float64 `json:"Dm"` + Pa float64 `json:"Pa"` + Rc float64 `json:"Rc"` + Sm float64 `json:"Sm"` + Ta float64 `json:"Ta"` + Ua float64 `json:"Ua"` + Time int64 `json:"time"` +} + +func main() { + // 初始化 MQTT 客户端 + opts := mqtt.NewClientOptions().AddBroker(brokerURL) + opts.SetClientID(clientID) + opts.SetUsername(username) + opts.SetPassword(password) + opts.SetProtocolVersion(4) + opts.SetKeepAlive(60 * time.Second) + opts.SetAutoReconnect(true) + opts.SetCleanSession(true) + opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) + + cli := mqtt.NewClient(opts) + if tok := cli.Connect(); tok.Wait() && tok.Error() != nil { + log.Fatalf("MQTT 连接失败: %v", tok.Error()) + } + defer cli.Disconnect(250) + + // 5分钟发布任务 + go alignAndRun5m(func(tickEnd time.Time) { publishOnce(cli, tickEnd) }) + // 1小时+10分钟发布预测任务 + go alignAndRunHour10(func(tick time.Time) { publishPredict(cli, tick) }) + + select {} +} + +func alignAndRun5m(fn func(tickEnd time.Time)) { + now := time.Now() + next := now.Truncate(5 * time.Minute).Add(5 * time.Minute) + time.Sleep(time.Until(next)) + for { + tickEnd := time.Now().Truncate(5 * time.Minute) + fn(tickEnd) + time.Sleep(5 * time.Minute) + } +} + +func alignAndRunHour10(fn func(tick time.Time)) { + // 计算下一个 “整点+10分钟” + now := time.Now() + base := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 10, 0, 0, now.Location()) + var next time.Time + if now.After(base) { + next = base.Add(1 * time.Hour) + } else { + next = base + } + time.Sleep(time.Until(next)) + for { + tick := time.Now().Truncate(time.Minute) + fn(tick) + // 每小时执行一次 + time.Sleep(1 * time.Hour) + } +} + +func publishOnce(cli mqtt.Client, tickEnd time.Time) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + start := tickEnd.Add(-5 * time.Minute) + // 聚合窗口 + agg, err := data.WindowAverages(ctx, stationID, start, tickEnd) + if err != nil { + log.Printf("聚合失败: %v", err) + return + } + // 当天累计雨量 + rc, err := data.DailyRainSinceMidnight(ctx, stationID, tickEnd) + if err != nil { + log.Printf("降雨查询失败: %v", err) + return + } + + // 处理无效值:若无数据则用 0(<0.001 算无效) + dm := 0.0 + if agg.Dm.Valid { + dm = agg.Dm.Float64 + } + sm := 0.0 + if agg.Sm.Valid { + sm = agg.Sm.Float64 + } + ta := 0.0 + if agg.Ta.Valid { + ta = agg.Ta.Float64 + } + ua := 0.0 + if agg.Ua.Valid { + ua = agg.Ua.Float64 + } + pa := 0.0 + if agg.Pa.Valid { + pa = agg.Pa.Float64 + } + + // 四舍五入:风向取整数,其它保留两位小数 + round2 := func(v float64) float64 { return math.Round(v*100) / 100 } + dmInt := math.Round(dm) + + payload := hwsPayload{ + Type: "hws", + Device: deviceID, + Dm: dmInt, + Pa: round2(pa), + Rc: round2(rc), + Sm: round2(sm), + Ta: round2(ta), + Ua: round2(ua), + Time: tickEnd.UnixMilli(), + } + b, _ := json.Marshal(payload) + + tok := cli.Publish(topic, 1, false, b) + tok.Wait() + if tok.Error() != nil { + log.Printf("发布失败: %v", tok.Error()) + return + } + log.Printf("发布成功 %s: %s", topic, string(b)) +} + +// 预测发布:每小时+10分钟,从 forecast_hourly 按 issued_at=整点 选取数据 +func publishPredict(cli mqtt.Client, tick time.Time) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // 以 CST 解析 issued_at 整点 + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + now := tick.In(loc) + issued := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, loc) + + const forecastStation = "RS485-002A6E" + const provider = "imdroid_mix" + points, err := data.ForecastRainAtIssued(ctx, forecastStation, provider, issued) + if err != nil { + log.Printf("预测查询失败: %v", err) + return + } + if len(points) == 0 { + log.Printf("预测无数据 issued_at=%s", issued.Format("2006-01-02 15:04:05")) + return + } + + // 组装 payload + type predictItem struct { + PredictTime int64 `json:"predict_time"` + PredictRainfall string `json:"predict_rainfall"` + } + var items []predictItem + for _, p := range points { + rf := float64(p.RainMMx1000) / 1000.0 + items = append(items, predictItem{ + PredictTime: p.ForecastTime.UnixMilli(), + PredictRainfall: format3(rf), + }) + } + + payload := struct { + Type string `json:"type"` + Device string `json:"device"` + Time int64 `json:"time"` + Data []predictItem `json:"data"` + }{ + Type: "predict", + Device: deviceID, + Time: now.UnixMilli(), // 当前“整点+10分钟”的时间 + Data: items, + } + b, _ := json.Marshal(payload) + tok := cli.Publish(topic, 1, false, b) + tok.Wait() + if tok.Error() != nil { + log.Printf("发布预测失败: %v", tok.Error()) + return + } + log.Printf("发布预测成功 %s: issued_at=%s, items=%d", topic, issued.Format("2006-01-02 15:04:05"), len(items)) +} + +func format3(v float64) string { + // 保留三位小数(字符串形式) + s := fmt.Sprintf("%.3f", v) + return s +} diff --git a/core/internal/data/forecast.go b/core/internal/data/forecast.go new file mode 100644 index 0000000..752a675 --- /dev/null +++ b/core/internal/data/forecast.go @@ -0,0 +1,40 @@ +package data + +import ( + "context" + "database/sql" + "time" + "weatherstation/internal/database" +) + +type PredictPoint struct { + ForecastTime time.Time + RainMMx1000 int32 +} + +// ForecastRainAtIssued returns forecast hourly rows for a given station/provider at an exact issued_at time. +func ForecastRainAtIssued(ctx context.Context, stationID, provider string, issuedAt time.Time) ([]PredictPoint, error) { + const q = ` + SELECT forecast_time, rain_mm_x1000 + FROM forecast_hourly + WHERE station_id=$1 AND provider=$2 AND issued_at=$3 + ORDER BY forecast_time ASC` + rows, err := database.GetDB().QueryContext(ctx, q, stationID, provider, issuedAt) + if err != nil { + return nil, err + } + defer rows.Close() + var out []PredictPoint + for rows.Next() { + var p PredictPoint + var rain sql.NullInt32 + if err := rows.Scan(&p.ForecastTime, &rain); err != nil { + continue + } + if rain.Valid { + p.RainMMx1000 = rain.Int32 + } + out = append(out, p) + } + return out, nil +} diff --git a/core/internal/data/raw.go b/core/internal/data/raw.go new file mode 100644 index 0000000..60e0503 --- /dev/null +++ b/core/internal/data/raw.go @@ -0,0 +1,85 @@ +package data + +import ( + "context" + "database/sql" + "math" + "time" + "weatherstation/internal/database" +) + +type WindowAgg struct { + Ta sql.NullFloat64 + Ua sql.NullFloat64 + Pa sql.NullFloat64 + Sm sql.NullFloat64 + Dm sql.NullFloat64 +} + +// WindowAverages computes averages on rs485_weather_data over [start,end) window. +// Filters values < 0.001 as invalid. Wind direction averaged vectorially. +func WindowAverages(ctx context.Context, stationID string, start, end time.Time) (WindowAgg, error) { + const q = ` + SELECT + AVG(temperature) AS ta, + AVG(humidity) AS ua, + AVG(pressure) AS pa, + AVG(CASE WHEN wind_speed >= 0.001 THEN wind_speed END) AS sm, + DEGREES( + ATAN2( + AVG(CASE WHEN wind_speed >= 0.001 THEN SIN(RADIANS(wind_direction)) END), + AVG(CASE WHEN wind_speed >= 0.001 THEN COS(RADIANS(wind_direction)) END) + ) + ) AS dm + FROM rs485_weather_data + WHERE station_id = $1 AND timestamp >= $2 AND timestamp < $3` + var agg WindowAgg + err := database.GetDB().QueryRowContext(ctx, q, stationID, start, end).Scan( + &agg.Ta, &agg.Ua, &agg.Pa, &agg.Sm, &agg.Dm, + ) + if err != nil { + return WindowAgg{}, err + } + // Normalize Dm to [0,360) + if agg.Dm.Valid { + v := agg.Dm.Float64 + if v < 0 { + v += 360.0 + } + // handle NaN from no data + if math.IsNaN(v) { + agg.Dm.Valid = false + } else { + agg.Dm.Float64 = v + } + } + return agg, nil +} + +// DailyRainSinceMidnight computes current_day_rain = max(0, latest - baselineAtMidnight). +// If baseline is null, returns 0. +func DailyRainSinceMidnight(ctx context.Context, stationID string, now time.Time) (float64, error) { + // Midnight in Asia/Shanghai + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("CST", 8*3600) + } + dayStart := time.Date(now.In(loc).Year(), now.In(loc).Month(), now.In(loc).Day(), 0, 0, 0, 0, loc) + + var baseline sql.NullFloat64 + const qBase = `SELECT rainfall FROM rs485_weather_data WHERE station_id=$1 AND timestamp <= $2 ORDER BY timestamp DESC LIMIT 1` + _ = database.GetDB().QueryRowContext(ctx, qBase, stationID, dayStart).Scan(&baseline) + + var current sql.NullFloat64 + const qCur = `SELECT rainfall FROM rs485_weather_data WHERE station_id=$1 ORDER BY timestamp DESC LIMIT 1` + _ = database.GetDB().QueryRowContext(ctx, qCur, stationID).Scan(¤t) + + if !current.Valid || !baseline.Valid { + return 0, nil + } + v := current.Float64 - baseline.Float64 + if v < 0 { + v = 0 + } + return v, nil +}