feat: Add heartbeat

This commit is contained in:
yarnom 2025-07-24 16:36:29 +08:00
parent fb169d4d0c
commit 0342df581b
4 changed files with 126 additions and 32 deletions

View File

@ -8,3 +8,11 @@ database:
password: "yourpassword" password: "yourpassword"
dbname: "weatherdb" dbname: "weatherdb"
sslmode: "disable" sslmode: "disable"
heartbeat:
interval: 5
message: "Hello"
device_check:
interval: 5
message: "Hello"

View File

@ -21,9 +21,21 @@ type DatabaseConfig struct {
SSLMode string `yaml:"sslmode"` SSLMode string `yaml:"sslmode"`
} }
type HeartbeatConfig struct {
Interval int `yaml:"interval"`
Message string `yaml:"message"`
}
type DeviceCheckConfig struct {
Interval int `yaml:"interval"`
Message string `yaml:"message"`
}
type Config struct { type Config struct {
Server ServerConfig `yaml:"server"` Server ServerConfig `yaml:"server"`
Database DatabaseConfig `yaml:"database"` Database DatabaseConfig `yaml:"database"`
Heartbeat HeartbeatConfig `yaml:"heartbeat"`
DeviceCheck DeviceCheckConfig `yaml:"device_check"`
} }
var ( var (

88
main.go
View File

@ -15,6 +15,8 @@ import (
"weatherstation/config" "weatherstation/config"
"weatherstation/model" "weatherstation/model"
"github.com/gin-gonic/gin"
) )
type UTF8Writer struct { type UTF8Writer struct {
@ -29,9 +31,8 @@ func (w *UTF8Writer) Write(p []byte) (n int, err error) {
if utf8.Valid(p) { if utf8.Valid(p) {
return w.w.Write(p) return w.w.Write(p)
} }
s := string(p) s := string(p)
s = strings.ToValidUTF8(s, "<EFBFBD>") s = strings.ToValidUTF8(s, "")
return w.w.Write([]byte(s)) return w.w.Write([]byte(s))
} }
@ -40,78 +41,64 @@ func setupLogger() {
if _, err := os.Stat(logDir); os.IsNotExist(err) { if _, err := os.Stat(logDir); os.IsNotExist(err) {
os.MkdirAll(logDir, 0755) os.MkdirAll(logDir, 0755)
} }
currentTime := time.Now() currentTime := time.Now()
logFileName := filepath.Join(logDir, fmt.Sprintf("%s.log", currentTime.Format("2006-01-02"))) logFileName := filepath.Join(logDir, fmt.Sprintf("%s.log", currentTime.Format("2006-01-02")))
logFile, err := os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) logFile, err := os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil { if err != nil {
log.Fatalf("无法创建日志文件: %v", err) log.Fatalf("无法创建日志文件: %v", err)
} }
bufferedWriter := bufio.NewWriter(logFile) bufferedWriter := bufio.NewWriter(logFile)
utf8Writer := NewUTF8Writer(bufferedWriter) utf8Writer := NewUTF8Writer(bufferedWriter)
go func() { go func() {
for { for {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
bufferedWriter.Flush() bufferedWriter.Flush()
} }
}() }()
multiWriter := io.MultiWriter(os.Stdout, utf8Writer) multiWriter := io.MultiWriter(os.Stdout, utf8Writer)
log.SetOutput(multiWriter) log.SetOutput(multiWriter)
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
} }
func main() { func startUDP() {
setupLogger()
cfg := config.GetConfig() cfg := config.GetConfig()
err := model.InitDB() err := model.InitDB()
if err != nil { if err != nil {
log.Fatalf("初始化数据库失败: %v", err) log.Fatalf("初始化数据库失败: %v", err)
} }
defer model.CloseDB() defer model.CloseDB()
addr := fmt.Sprintf(":%d", cfg.Server.UDPPort) addr := fmt.Sprintf(":%d", cfg.Server.UDPPort)
conn, err := net.ListenPacket("udp", addr) conn, err := net.ListenPacket("udp", addr)
if err != nil { if err != nil {
log.Fatalf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err) log.Fatalf("无法监听UDP端口 %d: %v", cfg.Server.UDPPort, err)
} }
defer conn.Close() defer conn.Close()
log.Printf("UDP服务器已启动监听端口 %d...", cfg.Server.UDPPort) log.Printf("UDP服务器已启动监听端口 %d...", cfg.Server.UDPPort)
buffer := make([]byte, 2048) buffer := make([]byte, 2048)
for { for {
n, addr, err := conn.ReadFrom(buffer) n, addr, err := conn.ReadFrom(buffer)
if err != nil { if err != nil {
log.Printf("读取数据错误: %v", err) log.Printf("读取数据错误: %v", err)
continue continue
} }
rawData := buffer[:n] rawData := buffer[:n]
data := string(rawData) data := string(rawData)
log.Printf("从 %s 接收到 %d 字节数据", addr.String(), n) log.Printf("从 %s 接收到 %d 字节数据", addr.String(), n)
weatherData, err := model.ParseWeatherData(data) weatherData, err := model.ParseWeatherData(data)
if err != nil { if err != nil {
log.Printf("解析数据失败: %v", err) log.Printf("解析数据失败: %v", err)
hexDump := hexDump(rawData) hexDump := hexDump(rawData)
log.Printf("原始码流(十六进制):\n%s", hexDump) log.Printf("原始码流(十六进制):\n%s", hexDump)
asciiDump := asciiDump(rawData) asciiDump := asciiDump(rawData)
log.Printf("ASCII码:\n%s", asciiDump) log.Printf("ASCII码:\n%s", asciiDump)
continue continue
} }
log.Println("成功解析气象站数据:") log.Println("成功解析气象站数据:")
log.Println(weatherData) log.Println(weatherData)
// 注册设备
model.RegisterDevice(weatherData.StationID, addr)
err = model.SaveWeatherData(weatherData, data) err = model.SaveWeatherData(weatherData, data)
if err != nil { if err != nil {
log.Printf("保存数据到数据库失败: %v", err) log.Printf("保存数据到数据库失败: %v", err)
@ -121,40 +108,85 @@ func main() {
} }
} }
func startDeviceCheck() {
cfg := config.GetConfig()
ticker := time.NewTicker(time.Duration(cfg.DeviceCheck.Interval) * time.Minute)
defer ticker.Stop()
for range ticker.C {
devices := model.GetOnlineDevices()
log.Printf("当前在线设备数: %d", len(devices))
for _, device := range devices {
sendUDPMessage(device.IP, cfg.DeviceCheck.Message)
}
}
}
func sendUDPMessage(ip string, message string) {
addr, err := net.ResolveUDPAddr("udp", ip+":10006")
if err != nil {
log.Printf("解析UDP地址失败: %v", err)
return
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
log.Printf("连接UDP失败: %v", err)
return
}
defer conn.Close()
_, err = conn.Write([]byte(message))
if err != nil {
log.Printf("发送UDP消息失败: %v", err)
return
}
log.Printf("成功向 %s 发送消息: %s", ip, message)
}
func main() {
setupLogger()
go startUDP()
go startDeviceCheck()
r := gin.Default()
r.LoadHTMLGlob("templates/*")
r.Static("/static", "static")
r.GET("/", func(c *gin.Context) {
c.HTML(200, "index.html", gin.H{})
})
r.Run(":10007")
}
func hexDump(data []byte) string { func hexDump(data []byte) string {
var result strings.Builder var result strings.Builder
for i := 0; i < len(data); i += 16 { for i := 0; i < len(data); i += 16 {
end := i + 16 end := i + 16
if end > len(data) { if end > len(data) {
end = len(data) end = len(data)
} }
chunk := data[i:end] chunk := data[i:end]
hexStr := hex.EncodeToString(chunk) hexStr := hex.EncodeToString(chunk)
for j := 0; j < len(hexStr); j += 2 { for j := 0; j < len(hexStr); j += 2 {
if j+2 <= len(hexStr) { if j+2 <= len(hexStr) {
result.WriteString(strings.ToUpper(hexStr[j : j+2])) result.WriteString(strings.ToUpper(hexStr[j : j+2]))
result.WriteString(" ") result.WriteString(" ")
} }
} }
result.WriteString("\n") result.WriteString("\n")
} }
return result.String() return result.String()
} }
func asciiDump(data []byte) string { func asciiDump(data []byte) string {
var result strings.Builder var result strings.Builder
for i := 0; i < len(data); i += 64 { for i := 0; i < len(data); i += 64 {
end := i + 64 end := i + 64
if end > len(data) { if end > len(data) {
end = len(data) end = len(data)
} }
chunk := data[i:end] chunk := data[i:end]
for _, b := range chunk { for _, b := range chunk {
if b >= 32 && b <= 126 { if b >= 32 && b <= 126 {
@ -163,9 +195,7 @@ func asciiDump(data []byte) string {
result.WriteString(".") result.WriteString(".")
} }
} }
result.WriteString("\n") result.WriteString("\n")
} }
return result.String() return result.String()
} }

44
model/device.go Normal file
View File

@ -0,0 +1,44 @@
package model
import (
"net"
"sync"
"time"
)
type Device struct {
IP string
LastSeen time.Time
StationID string
}
var (
devices = make(map[string]*Device)
deviceMutex sync.RWMutex
)
func RegisterDevice(stationID string, addr net.Addr) {
ip, _, _ := net.SplitHostPort(addr.String())
deviceMutex.Lock()
defer deviceMutex.Unlock()
devices[stationID] = &Device{
IP: ip,
LastSeen: time.Now(),
StationID: stationID,
}
}
func GetOnlineDevices() []*Device {
deviceMutex.RLock()
defer deviceMutex.RUnlock()
result := make([]*Device, 0, len(devices))
for _, device := range devices {
if time.Since(device.LastSeen) < 10*time.Minute {
result = append(result, device)
}
}
return result
}