package tcp import ( "encoding/binary" "encoding/hex" "fmt" "net" "strings" "sync" "time" "go_rain_dtu/internal/dao" "go_rain_dtu/internal/model" "go_rain_dtu/pkg/logger" ) const ( queryCmd = "010301F400100408" // 读取寄存器命令 resetCmd = "0106600200005AB6" // 清除雨量统计命令 tcpPort = ":10004" // TCP服务器端口 ) // ConnectionStatus 保存TCP连接状态 var ( ConnectionStatus = struct { Connected bool IP string Port string LastSeen time.Time mu sync.RWMutex }{ Connected: false, IP: "", Port: "", } ) // GetConnectionStatus 返回当前连接状态 func GetConnectionStatus() (bool, string, string) { ConnectionStatus.mu.RLock() defer ConnectionStatus.mu.RUnlock() // 如果最后一次通信时间超过1分钟,认为连接已断开 if ConnectionStatus.Connected && time.Since(ConnectionStatus.LastSeen) > time.Minute { return false, ConnectionStatus.IP, ConnectionStatus.Port } return ConnectionStatus.Connected, ConnectionStatus.IP, ConnectionStatus.Port } // updateConnectionStatus 更新连接状态 func updateConnectionStatus(connected bool, addr string) { ConnectionStatus.mu.Lock() defer ConnectionStatus.mu.Unlock() ConnectionStatus.Connected = connected if connected && addr != "" { host, port, _ := net.SplitHostPort(addr) ConnectionStatus.IP = host ConnectionStatus.Port = port ConnectionStatus.LastSeen = time.Now() } else if !connected { // 如果断开连接,只更新状态,保留最后的IP和端口信息 ConnectionStatus.Connected = false } } type SensorComm struct { conn net.Conn dao *dao.SensorDAO address string mu sync.Mutex } // 创建新的传感器通信实例 func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm { return &SensorComm{ conn: conn, dao: dao, address: conn.RemoteAddr().String(), } } // 发送查询命令 func (s *SensorComm) sendQuery() error { s.mu.Lock() defer s.mu.Unlock() cmd, err := hex.DecodeString(queryCmd) if err != nil { logger.Logger.Printf("解析命令失败: %v", err) return err } _, err = s.conn.Write(cmd) if err != nil { logger.Logger.Printf("发送查询命令失败: %v", err) return err } logger.Logger.Printf("发送查询命令: %X", cmd) return nil } // 处理连接 func handleConnection(sensor *SensorComm) { defer sensor.Close() logger.Logger.Printf("新连接建立: %s", sensor.address) updateConnectionStatus(true, sensor.address) // 发送首次查询 if err := sensor.sendQuery(); err != nil { updateConnectionStatus(false, "") return } // 设置定时器,每5分钟查询一次 ticker := time.NewTicker(time.Minute * 5) defer ticker.Stop() // 读取数据的缓冲区 buffer := make([]byte, 1024) done := make(chan bool) // 设置tcp保持活动状态,防止连接断开 tcpConn, ok := sensor.conn.(*net.TCPConn) if ok { tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlivePeriod(30 * time.Second) } // 处理接收数据的goroutine go func() { for { // 设置读取超时 - 增加超时时间到10分钟,大于查询间隔 sensor.conn.SetReadDeadline(time.Now().Add(10 * time.Minute)) n, err := sensor.conn.Read(buffer) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { // 超时错误不一定意味着连接断开,继续等待 logger.Logger.Printf("读取超时: %v, 等待下一次查询", err) continue } // 其他错误才认为连接断开 logger.Logger.Printf("读取数据失败: %v", err) updateConnectionStatus(false, "") done <- true return } if n > 0 { // 更新最后通信时间 updateConnectionStatus(true, sensor.address) // 记录原始数据 logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n]) // 只处理符合预期长度的响应(37字节) if n == 37 { if sensorData := sensor.handleData(buffer[:n]); sensorData != nil { logger.Logger.Printf("处理数据成功: %+v", sensorData) } } else { logger.Logger.Printf("数据长度不符合要求: %d != 37", n) } } } }() // 主循环 for { select { case <-ticker.C: logger.Logger.Printf("定时查询触发 [%s]", sensor.address) if err := sensor.sendQuery(); err != nil { updateConnectionStatus(false, "") return } case <-done: updateConnectionStatus(false, "") return } } } // 处理接收到的数据 func (s *SensorComm) handleData(data []byte) *model.SensorData { // 检查数据长度 if len(data) != 37 { logger.Logger.Printf("数据长度错误: %d != 37", len(data)) return nil } // 验证数据格式 // 1. 检查起始字节是否为 0x01 0x03 if data[0] != 0x01 || data[1] != 0x03 { logger.Logger.Printf("数据格式错误: 起始字节不是0x01 0x03") return nil } // 2. 检查数据长度字节(第3个字节)是否为0x20 if data[2] != 0x20 { logger.Logger.Printf("数据格式错误: 长度字节不是0x20") return nil } // 解析数据,从第4个字节开始是数据部分 // 根据原始TCP服务器代码中的单位转换 windSpeed := int(binary.BigEndian.Uint16(data[3:5])) // 风速值*100 sensorData := &model.SensorData{ Timestamp: time.Now(), WindSpeed: windSpeed, // 原始值已经是*100的 WindForce: int(binary.BigEndian.Uint16(data[5:7])), WindDirection8: int(binary.BigEndian.Uint16(data[7:9])), WindDirection360: int(binary.BigEndian.Uint16(data[9:11])), Humidity: int(binary.BigEndian.Uint16(data[11:13])), // 湿度*10 Temperature: int(binary.BigEndian.Uint16(data[13:15])), // 温度*10 AtmPressure: int(binary.BigEndian.Uint16(data[21:23])), // 大气压*10 SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])), // 太阳辐射原始值 } // 记录解析后的传感器数据,展示实际物理量 logger.Logger.Printf("传感器数据: 风速=%.2f m/s, 风力=%d级, 风向=%d°, 湿度=%.1f%%, 温度=%.1f℃, 大气压=%.1f kPa, 太阳辐射=%d W/m²", float64(sensorData.WindSpeed)/100.0, sensorData.WindForce, sensorData.WindDirection360, float64(sensorData.Humidity)/10.0, float64(sensorData.Temperature)/10.0, float64(sensorData.AtmPressure)/10.0, sensorData.SolarRadiation) // 保存数据到数据库 if err := s.dao.Insert(sensorData); err != nil { logger.Logger.Printf("保存数据失败: %v", err) return nil } return sensorData } // 关闭连接 func (s *SensorComm) Close() { s.mu.Lock() defer s.mu.Unlock() if s.conn != nil { s.conn.Close() s.conn = nil updateConnectionStatus(false, "") } } // 启动TCP服务器 func StartTCPServer(dao *dao.SensorDAO) error { // 创建TCP监听器并设置TCP选项 addr, err := net.ResolveTCPAddr("tcp", tcpPort) if err != nil { return fmt.Errorf("解析TCP地址失败: %v", err) } listener, err := net.ListenTCP("tcp", addr) if err != nil { return fmt.Errorf("启动TCP服务器失败: %v", err) } defer listener.Close() logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort) var currentConn *SensorComm var mu sync.Mutex for { conn, err := listener.AcceptTCP() if err != nil { logger.Logger.Printf("接受连接失败: %v", err) continue } // 设置TCP连接选项 conn.SetKeepAlive(true) conn.SetKeepAlivePeriod(30 * time.Second) conn.SetLinger(0) // 立即关闭 mu.Lock() // 关闭旧连接 if currentConn != nil { currentConn.Close() } // 创建新连接 sensor := NewSensorComm(conn, dao) currentConn = sensor mu.Unlock() logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr()) // 处理连接 go handleConnection(sensor) } } // 辅助函数:将十六进制字符串转换为字节数组 func hexStringToBytes(s string) []byte { // 移除空格 s = strings.ReplaceAll(s, " ", "") // 确保字符串长度是偶数 if len(s)%2 != 0 { return nil } bytes := make([]byte, len(s)/2) for i := 0; i < len(s); i += 2 { // 转换高位 high, ok := hexCharToByte(s[i]) if !ok { return nil } // 转换低位 low, ok := hexCharToByte(s[i+1]) if !ok { return nil } // 组合高位和低位 bytes[i/2] = high<<4 | low } return bytes } // 辅助函数:将十六进制字符转换为字节 func hexCharToByte(c byte) (byte, bool) { switch { case '0' <= c && c <= '9': return c - '0', true case 'a' <= c && c <= 'f': return c - 'a' + 10, true case 'A' <= c && c <= 'F': return c - 'A' + 10, true } return 0, false }