2025-05-15 16:27:40 +08:00

256 lines
5.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package tcp
import (
"encoding/binary"
"encoding/hex"
"fmt"
"net"
"strings"
"sync"
"time"
"go_rain_dtu/internal/dao"
"go_rain_dtu/internal/model"
"go_rain_dtu/pkg/logger"
)
const (
queryCmd = "010301F400100408" // 读取寄存器命令
resetCmd = "0106600200005AB6" // 清除雨量统计命令
tcpPort = ":10004" // TCP服务器端口
)
type SensorComm struct {
conn net.Conn
dao *dao.SensorDAO
address string
mu sync.Mutex
}
// 创建新的传感器通信实例
func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm {
return &SensorComm{
conn: conn,
dao: dao,
address: conn.RemoteAddr().String(),
}
}
// 发送查询命令
func (s *SensorComm) sendQuery() error {
s.mu.Lock()
defer s.mu.Unlock()
cmd, err := hex.DecodeString(queryCmd)
if err != nil {
logger.Logger.Printf("解析命令失败: %v", err)
return err
}
_, err = s.conn.Write(cmd)
if err != nil {
logger.Logger.Printf("发送查询命令失败: %v", err)
return err
}
logger.Logger.Printf("发送查询命令: %X", cmd)
return nil
}
// 处理连接
func handleConnection(sensor *SensorComm) {
defer sensor.Close()
logger.Logger.Printf("新连接建立: %s", sensor.address)
// 发送首次查询
if err := sensor.sendQuery(); err != nil {
return
}
// 设置定时器,每分钟查询一次
ticker := time.NewTicker(time.Minute * 5)
defer ticker.Stop()
// 读取数据的缓冲区
buffer := make([]byte, 1024)
done := make(chan bool)
// 处理接收数据的goroutine
go func() {
for {
// 设置读取超时
sensor.conn.SetReadDeadline(time.Now().Add(time.Second * 30))
n, err := sensor.conn.Read(buffer)
if err != nil {
logger.Logger.Printf("读取数据失败: %v", err)
done <- true
return
}
if n > 0 {
// 记录原始数据
logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n])
// 只处理符合预期长度的响应37字节
if n == 37 {
if sensorData := sensor.handleData(buffer[:n]); sensorData != nil {
logger.Logger.Printf("处理数据成功: %+v", sensorData)
}
} else {
logger.Logger.Printf("数据长度不符合要求: %d != 37", n)
}
}
}
}()
// 主循环
for {
select {
case <-ticker.C:
if err := sensor.sendQuery(); err != nil {
return
}
case <-done:
return
}
}
}
// 处理接收到的数据
func (s *SensorComm) handleData(data []byte) *model.SensorData {
// 检查数据长度
if len(data) != 37 {
logger.Logger.Printf("数据长度错误: %d != 37", len(data))
return nil
}
// 验证数据格式
// 1. 检查起始字节是否为 0x01 0x03
if data[0] != 0x01 || data[1] != 0x03 {
logger.Logger.Printf("数据格式错误: 起始字节不是0x01 0x03")
return nil
}
// 2. 检查数据长度字节第3个字节是否为0x20
if data[2] != 0x20 {
logger.Logger.Printf("数据格式错误: 长度字节不是0x20")
return nil
}
// 解析数据从第4个字节开始是数据部分
sensorData := &model.SensorData{
Timestamp: time.Now(),
WindSpeed: int(binary.BigEndian.Uint16(data[3:5])),
WindForce: int(binary.BigEndian.Uint16(data[5:7])),
WindDirection8: int(binary.BigEndian.Uint16(data[7:9])),
WindDirection360: int(binary.BigEndian.Uint16(data[9:11])),
Humidity: int(binary.BigEndian.Uint16(data[11:13])),
Temperature: int(binary.BigEndian.Uint16(data[13:15])),
AtmPressure: int(binary.BigEndian.Uint16(data[21:23])),
SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])),
}
// 保存数据到数据库
if err := s.dao.Insert(sensorData); err != nil {
logger.Logger.Printf("保存数据失败: %v", err)
return nil
}
return sensorData
}
// 关闭连接
func (s *SensorComm) Close() {
s.mu.Lock()
defer s.mu.Unlock()
if s.conn != nil {
s.conn.Close()
s.conn = nil
}
}
// 启动TCP服务器
func StartTCPServer(dao *dao.SensorDAO) error {
listener, err := net.Listen("tcp", tcpPort)
if err != nil {
return fmt.Errorf("启动TCP服务器失败: %v", err)
}
defer listener.Close()
logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort)
var currentConn *SensorComm
var mu sync.Mutex
for {
conn, err := listener.Accept()
if err != nil {
logger.Logger.Printf("接受连接失败: %v", err)
continue
}
mu.Lock()
// 关闭旧连接
if currentConn != nil {
currentConn.Close()
}
// 创建新连接
sensor := NewSensorComm(conn, dao)
currentConn = sensor
mu.Unlock()
logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr())
// 处理连接
go handleConnection(sensor)
}
}
// 辅助函数:将十六进制字符串转换为字节数组
func hexStringToBytes(s string) []byte {
// 移除空格
s = strings.ReplaceAll(s, " ", "")
// 确保字符串长度是偶数
if len(s)%2 != 0 {
return nil
}
bytes := make([]byte, len(s)/2)
for i := 0; i < len(s); i += 2 {
// 转换高位
high, ok := hexCharToByte(s[i])
if !ok {
return nil
}
// 转换低位
low, ok := hexCharToByte(s[i+1])
if !ok {
return nil
}
// 组合高位和低位
bytes[i/2] = high<<4 | low
}
return bytes
}
// 辅助函数:将十六进制字符转换为字节
func hexCharToByte(c byte) (byte, bool) {
switch {
case '0' <= c && c <= '9':
return c - '0', true
case 'a' <= c && c <= 'f':
return c - 'a' + 10, true
case 'A' <= c && c <= 'F':
return c - 'A' + 10, true
}
return 0, false
}