320 lines
8.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"bufio"
"context"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"
"unicode/utf8"
"weatherstation/internal/config"
"weatherstation/internal/tools"
"weatherstation/model"
)
// UTF8Writer 包装一个io.Writer确保写入的数据是有效的UTF-8
type UTF8Writer struct {
w io.Writer
}
// NewUTF8Writer 创建一个新的UTF8Writer
func NewUTF8Writer(w io.Writer) *UTF8Writer {
return &UTF8Writer{w: w}
}
// Write 实现io.Writer接口
func (w *UTF8Writer) Write(p []byte) (n int, err error) {
if utf8.Valid(p) {
return w.w.Write(p)
}
s := string(p)
s = strings.ToValidUTF8(s, "")
return w.w.Write([]byte(s))
}
var (
logFile *os.File
logFileMutex sync.Mutex
currentLogDay int
)
// getLogBaseDir 返回日志目录:优先环境变量 LOG_DIR否则使用可执行文件所在目录下的 log 子目录
func getLogBaseDir() string {
if v := os.Getenv("LOG_DIR"); strings.TrimSpace(v) != "" {
return v
}
exe, _ := os.Executable()
exeDir := filepath.Dir(exe)
return filepath.Join(exeDir, "log")
}
// getLogFileName 获取当前日期的日志文件名
func getLogFileName() string {
currentTime := time.Now()
return filepath.Join(getLogBaseDir(), fmt.Sprintf("%s.log", currentTime.Format("2006-01-02")))
}
// openLogFile 打开日志文件
func openLogFile() (*os.File, error) {
logDir := getLogBaseDir()
if err := os.MkdirAll(logDir, 0o755); err != nil {
return nil, err
}
logFileName := getLogFileName()
return os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
}
// SetupLogger 设置日志系统
func SetupLogger() {
var err error
logFile, err = openLogFile()
if err != nil {
log.Fatalf("无法创建日志文件: %v", err)
}
currentLogDay = time.Now().Day()
bufferedWriter := bufio.NewWriter(logFile)
utf8Writer := NewUTF8Writer(bufferedWriter)
go func() {
for {
time.Sleep(1 * time.Second)
logFileMutex.Lock()
bufferedWriter.Flush()
now := time.Now()
if now.Day() != currentLogDay {
oldLogFile := logFile
logFile, err = openLogFile()
if err != nil {
log.Printf("无法创建新日志文件: %v", err)
} else {
oldLogFile.Close()
currentLogDay = now.Day()
bufferedWriter = bufio.NewWriter(logFile)
utf8Writer = NewUTF8Writer(bufferedWriter)
log.SetOutput(io.MultiWriter(os.Stdout, utf8Writer))
log.Println("日志文件已轮转")
}
}
logFileMutex.Unlock()
}
}()
multiWriter := io.MultiWriter(os.Stdout, utf8Writer)
log.SetOutput(multiWriter)
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
}
// StartUDPServer 启动UDP服务器
func StartUDPServer() error {
cfg := config.GetConfig()
// 初始化数据库连接
if err := model.InitDB(); err != nil {
return fmt.Errorf("初始化数据库失败: %v", err)
}
defer model.CloseDB()
addr := fmt.Sprintf(":%d", cfg.Server.UDPPort)
conn, err := net.ListenPacket("udp", addr)
if err != nil {
return fmt.Errorf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err)
}
defer conn.Close()
log.Printf("UDP服务器已启动监听端口 %d...", cfg.Server.UDPPort)
buffer := make([]byte, 2048)
// 后台定时每分钟回刷最近30分钟全站
go func() {
for {
now := time.Now()
from := now.Add(-30 * time.Minute)
ctx := context.Background()
if err := tools.RunBackfill10Min(ctx, tools.BackfillOptions{
FromTime: from,
ToTime: now,
WrapCycleMM: 0,
BucketMinutes: 10,
}); err != nil {
log.Printf("定时回填失败: %v", err)
}
time.Sleep(1 * time.Minute)
}
}()
// 说明:原有的 open-meteo/彩云/CMA 定时抓取已移除,避免与独立的 service-forecast 重复调度。
// 若需要启用预报抓取,请运行 `cmd/service-forecast` 服务。
// 说明融合任务已迁移至独立服务service-fusion
for {
n, addr, err := conn.ReadFrom(buffer)
if err != nil {
log.Printf("读取数据错误: %v", err)
continue
}
rawData := buffer[:n]
log.Printf("从 %s 接收到 %d 字节数据", addr.String(), n)
hexDump := hexDump(rawData)
log.Printf("原始码流(十六进制):\n%s", hexDump)
asciiDump := asciiDump(rawData)
log.Printf("ASCII码:\n%s", asciiDump)
if len(rawData) == 25 && rawData[0] == 0x24 {
handleRS485Data(rawData, addr, hexDump)
} else {
handleWiFiData(rawData, addr)
}
}
}
// handleRS485Data 处理RS485设备数据
func handleRS485Data(rawData []byte, addr net.Addr, hexDump string) {
log.Println("485 型气象站数据")
// 生成源码字符串(用于日志记录)
sourceHex := strings.ReplaceAll(strings.TrimSpace(hexDump), "\n", " ")
log.Printf("源码: %s", sourceHex)
// 解析RS485数据
protocol := model.NewProtocol(rawData)
rs485Protocol := model.NewRS485Protocol(rawData)
// 获取设备ID
idParts, err := protocol.GetCompleteID()
if err != nil {
log.Printf("获取设备ID失败: %v", err)
return
}
// 解析RS485数据
rs485Data, err := rs485Protocol.ParseRS485Data()
if err != nil {
log.Printf("解析RS485数据失败: %v", err)
return
}
// 添加设备ID和时间戳
rs485Data.DeviceID = idParts.Complete.Hex
rs485Data.ReceivedAt = time.Now()
rs485Data.RawDataHex = sourceHex
// 打印解析结果到日志
log.Println("=== RS485 ===")
log.Printf("设备ID: RS485-%s", rs485Data.DeviceID)
log.Printf("温度: %.2f°C", rs485Data.Temperature)
log.Printf("湿度: %.1f%%", rs485Data.Humidity)
log.Printf("风速: %.5f m/s", rs485Data.WindSpeed)
log.Printf("风向: %.1f°", rs485Data.WindDirection)
log.Printf("降雨量: %.3f mm", rs485Data.Rainfall)
log.Printf("光照: %.1f lux", rs485Data.Light)
log.Printf("紫外线: %.1f", rs485Data.UV)
log.Printf("气压: %.2f hPa", rs485Data.Pressure)
log.Printf("接收时间: %s", rs485Data.ReceivedAt.Format("2006-01-02 15:04:05"))
// 注册设备
stationID := fmt.Sprintf("RS485-%s", rs485Data.DeviceID)
model.RegisterDevice(stationID, addr)
log.Printf("设备 %s 已注册IP: %s", stationID, addr.String())
// 保存到数据库
err = model.SaveWeatherData(rs485Data, string(rawData))
if err != nil {
log.Printf("保存数据到数据库失败: %v", err)
} else {
log.Printf("数据已成功保存到数据库")
}
}
// handleWiFiData 处理WiFi设备数据
func handleWiFiData(rawData []byte, addr net.Addr) {
// 尝试解析WIFI数据
data, deviceType, err := model.ParseData(rawData)
if err != nil {
log.Printf("解析数据失败: %v", err)
return
}
log.Println("成功解析气象站数据:")
log.Printf("设备类型: %s", getDeviceTypeString(deviceType))
log.Println(data)
if deviceType == model.DeviceTypeWIFI {
if wifiData, ok := data.(*model.WeatherData); ok {
stationID := wifiData.StationID
if stationID != "" {
model.RegisterDevice(stationID, addr)
log.Printf("设备 %s 已注册IP: %s", stationID, addr.String())
} else {
log.Printf("警告: 收到的数据没有站点ID")
}
}
}
}
// getDeviceTypeString 获取设备类型字符串
func getDeviceTypeString(deviceType model.DeviceType) string {
switch deviceType {
case model.DeviceTypeWIFI:
return "WIFI"
case model.DeviceTypeRS485:
return "RS485"
default:
return "未知"
}
}
// hexDump 生成十六进制转储
func hexDump(data []byte) string {
var result strings.Builder
for i := 0; i < len(data); i += 16 {
end := i + 16
if end > len(data) {
end = len(data)
}
chunk := data[i:end]
hexStr := hex.EncodeToString(chunk)
for j := 0; j < len(hexStr); j += 2 {
if j+2 <= len(hexStr) {
result.WriteString(strings.ToUpper(hexStr[j : j+2]))
result.WriteString(" ")
}
}
result.WriteString("\n")
}
return result.String()
}
// asciiDump 生成ASCII转储
func asciiDump(data []byte) string {
var result strings.Builder
for i := 0; i < len(data); i += 64 {
end := i + 64
if end > len(data) {
end = len(data)
}
chunk := data[i:end]
for _, b := range chunk {
if b >= 32 && b <= 126 {
result.WriteByte(b)
} else {
result.WriteString(".")
}
}
result.WriteString("\n")
}
return result.String()
}