package tcp import ( "encoding/binary" "encoding/hex" "fmt" "net" "strings" "sync" "time" "go_rain_dtu/internal/dao" "go_rain_dtu/internal/model" "go_rain_dtu/pkg/logger" ) const ( queryCmd = "010301F400100408" // 读取寄存器命令 resetCmd = "0106600200005AB6" // 清除雨量统计命令 tcpPort = ":10004" // TCP服务器端口 ) type SensorComm struct { conn net.Conn dao *dao.SensorDAO address string mu sync.Mutex } // 创建新的传感器通信实例 func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm { return &SensorComm{ conn: conn, dao: dao, address: conn.RemoteAddr().String(), } } // 发送查询命令 func (s *SensorComm) sendQuery() error { s.mu.Lock() defer s.mu.Unlock() cmd, err := hex.DecodeString(queryCmd) if err != nil { logger.Logger.Printf("解析命令失败: %v", err) return err } _, err = s.conn.Write(cmd) if err != nil { logger.Logger.Printf("发送查询命令失败: %v", err) return err } logger.Logger.Printf("发送查询命令: %X", cmd) return nil } // 处理连接 func handleConnection(sensor *SensorComm) { defer sensor.Close() logger.Logger.Printf("新连接建立: %s", sensor.address) // 发送首次查询 if err := sensor.sendQuery(); err != nil { return } // 设置定时器,每5分钟查询一次 ticker := time.NewTicker(time.Minute * 5) defer ticker.Stop() // 读取数据的缓冲区 buffer := make([]byte, 1024) done := make(chan bool) // 设置tcp保持活动状态,防止连接断开 tcpConn, ok := sensor.conn.(*net.TCPConn) if ok { tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlivePeriod(30 * time.Second) } // 处理接收数据的goroutine go func() { for { // 设置读取超时 - 增加超时时间到10分钟,大于查询间隔 sensor.conn.SetReadDeadline(time.Now().Add(10 * time.Minute)) n, err := sensor.conn.Read(buffer) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { // 超时错误不一定意味着连接断开,继续等待 logger.Logger.Printf("读取超时: %v, 等待下一次查询", err) continue } // 其他错误才认为连接断开 logger.Logger.Printf("读取数据失败: %v", err) done <- true return } if n > 0 { // 记录原始数据 logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n]) // 只处理符合预期长度的响应(37字节) if n == 37 { if sensorData := sensor.handleData(buffer[:n]); sensorData != nil { logger.Logger.Printf("处理数据成功: %+v", sensorData) } } else { logger.Logger.Printf("数据长度不符合要求: %d != 37", n) } } } }() // 主循环 for { select { case <-ticker.C: logger.Logger.Printf("定时查询触发 [%s]", sensor.address) if err := sensor.sendQuery(); err != nil { return } case <-done: return } } } // 处理接收到的数据 func (s *SensorComm) handleData(data []byte) *model.SensorData { // 检查数据长度 if len(data) != 37 { logger.Logger.Printf("数据长度错误: %d != 37", len(data)) return nil } // 验证数据格式 // 1. 检查起始字节是否为 0x01 0x03 if data[0] != 0x01 || data[1] != 0x03 { logger.Logger.Printf("数据格式错误: 起始字节不是0x01 0x03") return nil } // 2. 检查数据长度字节(第3个字节)是否为0x20 if data[2] != 0x20 { logger.Logger.Printf("数据格式错误: 长度字节不是0x20") return nil } // 解析数据,从第4个字节开始是数据部分 sensorData := &model.SensorData{ Timestamp: time.Now(), WindSpeed: int(binary.BigEndian.Uint16(data[3:5])), WindForce: int(binary.BigEndian.Uint16(data[5:7])), WindDirection8: int(binary.BigEndian.Uint16(data[7:9])), WindDirection360: int(binary.BigEndian.Uint16(data[9:11])), Humidity: int(binary.BigEndian.Uint16(data[11:13])), Temperature: int(binary.BigEndian.Uint16(data[13:15])), AtmPressure: int(binary.BigEndian.Uint16(data[21:23])), SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])), } // 保存数据到数据库 if err := s.dao.Insert(sensorData); err != nil { logger.Logger.Printf("保存数据失败: %v", err) return nil } return sensorData } // 关闭连接 func (s *SensorComm) Close() { s.mu.Lock() defer s.mu.Unlock() if s.conn != nil { s.conn.Close() s.conn = nil } } // 启动TCP服务器 func StartTCPServer(dao *dao.SensorDAO) error { // 创建TCP监听器并设置TCP选项 addr, err := net.ResolveTCPAddr("tcp", tcpPort) if err != nil { return fmt.Errorf("解析TCP地址失败: %v", err) } listener, err := net.ListenTCP("tcp", addr) if err != nil { return fmt.Errorf("启动TCP服务器失败: %v", err) } defer listener.Close() logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort) var currentConn *SensorComm var mu sync.Mutex for { conn, err := listener.AcceptTCP() if err != nil { logger.Logger.Printf("接受连接失败: %v", err) continue } // 设置TCP连接选项 conn.SetKeepAlive(true) conn.SetKeepAlivePeriod(30 * time.Second) conn.SetLinger(0) // 立即关闭 mu.Lock() // 关闭旧连接 if currentConn != nil { currentConn.Close() } // 创建新连接 sensor := NewSensorComm(conn, dao) currentConn = sensor mu.Unlock() logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr()) // 处理连接 go handleConnection(sensor) } } // 辅助函数:将十六进制字符串转换为字节数组 func hexStringToBytes(s string) []byte { // 移除空格 s = strings.ReplaceAll(s, " ", "") // 确保字符串长度是偶数 if len(s)%2 != 0 { return nil } bytes := make([]byte, len(s)/2) for i := 0; i < len(s); i += 2 { // 转换高位 high, ok := hexCharToByte(s[i]) if !ok { return nil } // 转换低位 low, ok := hexCharToByte(s[i+1]) if !ok { return nil } // 组合高位和低位 bytes[i/2] = high<<4 | low } return bytes } // 辅助函数:将十六进制字符转换为字节 func hexCharToByte(c byte) (byte, bool) { switch { case '0' <= c && c <= '9': return c - '0', true case 'a' <= c && c <= 'f': return c - 'a' + 10, true case 'A' <= c && c <= 'F': return c - 'A' + 10, true } return 0, false }