package tcp import ( "encoding/binary" "encoding/hex" "fmt" "net" "strings" "sync" "time" "go_rain_dtu/internal/dao" "go_rain_dtu/internal/model" "go_rain_dtu/pkg/logger" ) const ( queryCmd = "010301F400100408" // 读取寄存器命令 resetCmd = "0106600200005AB6" // 清除雨量统计命令 tcpPort = ":10004" // TCP服务器端口 ) // ConnectionStatus 保存TCP连接状态 var ( // 活跃连接映射表 activeConnections = struct { conns map[string]*SensorComm // 使用IP:Port作为键 mu sync.RWMutex }{ conns: make(map[string]*SensorComm), } ) // GetConnectionStatus 返回当前连接状态 func GetConnectionStatus() (bool, string, string) { activeConnections.mu.RLock() defer activeConnections.mu.RUnlock() // 如果有任何活跃连接,返回第一个作为示例 // 在未来可以改进为返回一个完整的连接列表 for addr, conn := range activeConnections.conns { if time.Since(conn.lastActivity) < time.Minute { host, port, _ := net.SplitHostPort(addr) return true, host, port } } return false, "", "" } // 更新连接映射表 func addConnection(addr string, conn *SensorComm) { activeConnections.mu.Lock() defer activeConnections.mu.Unlock() activeConnections.conns[addr] = conn logger.Logger.Printf("添加新连接: %s, 当前连接数: %d", addr, len(activeConnections.conns)) } // 从映射表中移除连接 func removeConnection(addr string) { activeConnections.mu.Lock() defer activeConnections.mu.Unlock() delete(activeConnections.conns, addr) logger.Logger.Printf("移除连接: %s, 当前连接数: %d", addr, len(activeConnections.conns)) } // GetActiveConnectionCount 获取活跃连接数量 func GetActiveConnectionCount() int { activeConnections.mu.RLock() defer activeConnections.mu.RUnlock() return len(activeConnections.conns) } type SensorComm struct { conn net.Conn dao *dao.SensorDAO address string lastActivity time.Time mu sync.Mutex } // 创建新的传感器通信实例 func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm { return &SensorComm{ conn: conn, dao: dao, address: conn.RemoteAddr().String(), lastActivity: time.Now(), } } // 发送查询命令 func (s *SensorComm) sendQuery() error { s.mu.Lock() defer s.mu.Unlock() cmd, err := hex.DecodeString(queryCmd) if err != nil { logger.Logger.Printf("解析命令失败: %v", err) return err } _, err = s.conn.Write(cmd) if err != nil { logger.Logger.Printf("发送查询命令失败: %v", err) return err } s.lastActivity = time.Now() logger.Logger.Printf("发送查询命令: %X", cmd) return nil } // 处理连接 func handleConnection(sensor *SensorComm) { defer sensor.Close() logger.Logger.Printf("新连接建立: %s", sensor.address) // 添加到活跃连接列表 addConnection(sensor.address, sensor) // 发送首次查询 if err := sensor.sendQuery(); err != nil { removeConnection(sensor.address) return } // 设置定时器,每5分钟查询一次 ticker := time.NewTicker(time.Minute * 5) defer ticker.Stop() // 读取数据的缓冲区 buffer := make([]byte, 1024) done := make(chan bool) // 设置tcp保持活动状态,防止连接断开 tcpConn, ok := sensor.conn.(*net.TCPConn) if ok { tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlivePeriod(30 * time.Second) } // 处理接收数据的goroutine go func() { for { // 设置读取超时 - 增加超时时间到10分钟,大于查询间隔 sensor.conn.SetReadDeadline(time.Now().Add(10 * time.Minute)) n, err := sensor.conn.Read(buffer) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { // 超时错误不一定意味着连接断开,继续等待 logger.Logger.Printf("读取超时: %v, 等待下一次查询", err) continue } // 其他错误才认为连接断开 logger.Logger.Printf("读取数据失败: %v", err) done <- true return } if n > 0 { // 更新最后活动时间 sensor.lastActivity = time.Now() // 记录原始数据 logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n]) // 只处理符合预期长度的响应(37字节) if n == 37 { if sensorData := sensor.handleData(buffer[:n]); sensorData != nil { logger.Logger.Printf("处理数据成功: %+v", sensorData) } } else { logger.Logger.Printf("数据长度不符合要求: %d != 37", n) } } } }() // 主循环 for { select { case <-ticker.C: logger.Logger.Printf("定时查询触发 [%s]", sensor.address) if err := sensor.sendQuery(); err != nil { removeConnection(sensor.address) return } case <-done: removeConnection(sensor.address) return } } } // 处理接收到的数据 func (s *SensorComm) handleData(data []byte) *model.SensorData { // 检查数据长度 if len(data) != 37 { logger.Logger.Printf("数据长度错误: %d != 37", len(data)) return nil } // 验证数据格式 // 1. 检查起始字节是否为 0x01 0x03 if data[0] != 0x01 || data[1] != 0x03 { logger.Logger.Printf("数据格式错误: 起始字节不是0x01 0x03") return nil } // 2. 检查数据长度字节(第3个字节)是否为0x20 if data[2] != 0x20 { logger.Logger.Printf("数据格式错误: 长度字节不是0x20") return nil } // 解析数据,从第4个字节开始是数据部分 sensorData := &model.SensorData{ Timestamp: time.Now(), WindSpeed: int(binary.BigEndian.Uint16(data[3:5])), // 风速值*100 WindForce: int(binary.BigEndian.Uint16(data[5:7])), // 风力 WindDirection8: int(binary.BigEndian.Uint16(data[7:9])), // 8方位风向 WindDirection360: int(binary.BigEndian.Uint16(data[9:11])), // 360度风向 Humidity: int(binary.BigEndian.Uint16(data[11:13])), // 湿度*10 Temperature: int(binary.BigEndian.Uint16(data[13:15])), // 温度*10 Noise: int(binary.BigEndian.Uint16(data[15:17])), // 噪声*10 PM25: int(binary.BigEndian.Uint16(data[17:19])), // PM2.5 PM10: int(binary.BigEndian.Uint16(data[19:21])), // PM10 AtmPressure: int(binary.BigEndian.Uint16(data[21:23])), // 大气压*10 Lux20WH: int(binary.BigEndian.Uint16(data[23:25])), // 20W Lux值(高16位) Lux20WL: int(binary.BigEndian.Uint16(data[25:27])), // 20W Lux值(低16位) Light20W: int(binary.BigEndian.Uint16(data[27:29])), // 20W光照 OpticalRain: int(binary.BigEndian.Uint16(data[29:31])), // 光学雨量*10 CompassAngle: int(binary.BigEndian.Uint16(data[31:33])), // 电子指南针角度*100 SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])), // 太阳辐射 } // 记录解析后的传感器数据,展示实际物理量 logger.Logger.Printf("传感器数据: 风速=%.2f m/s, 风力=%d级, 风向=%d°, 湿度=%.1f%%, 温度=%.1f℃, 噪声=%.1f dB, PM2.5=%d μg/m³, PM10=%d μg/m³, 大气压=%.1f kPa, 20W Lux值=%d/%d, 20W光照=%d 百Lux, 光学雨量=%.1f mm, 电子指南针角度=%.2f°, 太阳辐射=%d W/m²", float64(sensorData.WindSpeed)/100.0, sensorData.WindForce, sensorData.WindDirection360, float64(sensorData.Humidity)/10.0, float64(sensorData.Temperature)/10.0, float64(sensorData.Noise)/10.0, sensorData.PM25, sensorData.PM10, float64(sensorData.AtmPressure)/10.0, sensorData.Lux20WH, sensorData.Lux20WL, sensorData.Light20W, float64(sensorData.OpticalRain)/10.0, float64(sensorData.CompassAngle)/100.0, sensorData.SolarRadiation) // 保存数据到数据库 if err := s.dao.Insert(sensorData); err != nil { logger.Logger.Printf("保存数据失败: %v", err) return nil } return sensorData } // 关闭连接 func (s *SensorComm) Close() { s.mu.Lock() defer s.mu.Unlock() if s.conn != nil { s.conn.Close() s.conn = nil // 从活跃连接列表中移除 removeConnection(s.address) } } // 启动TCP服务器 func StartTCPServer(dao *dao.SensorDAO) error { // 创建TCP监听器并设置TCP选项 addr, err := net.ResolveTCPAddr("tcp", tcpPort) if err != nil { return fmt.Errorf("解析TCP地址失败: %v", err) } listener, err := net.ListenTCP("tcp", addr) if err != nil { return fmt.Errorf("启动TCP服务器失败: %v", err) } defer listener.Close() logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort) for { conn, err := listener.AcceptTCP() if err != nil { logger.Logger.Printf("接受连接失败: %v", err) continue } // 设置TCP连接选项 conn.SetKeepAlive(true) conn.SetKeepAlivePeriod(30 * time.Second) conn.SetLinger(0) // 立即关闭 // 创建新连接 sensor := NewSensorComm(conn, dao) logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr()) // 处理连接 go handleConnection(sensor) } } // 辅助函数:将十六进制字符串转换为字节数组 func hexStringToBytes(s string) []byte { // 移除空格 s = strings.ReplaceAll(s, " ", "") // 确保字符串长度是偶数 if len(s)%2 != 0 { return nil } bytes := make([]byte, len(s)/2) for i := 0; i < len(s); i += 2 { // 转换高位 high, ok := hexCharToByte(s[i]) if !ok { return nil } // 转换低位 low, ok := hexCharToByte(s[i+1]) if !ok { return nil } // 组合高位和低位 bytes[i/2] = high<<4 | low } return bytes } // 辅助函数:将十六进制字符转换为字节 func hexCharToByte(c byte) (byte, bool) { switch { case '0' <= c && c <= '9': return c - '0', true case 'a' <= c && c <= 'f': return c - 'a' + 10, true case 'A' <= c && c <= 'F': return c - 'A' + 10, true } return 0, false } // TriggerManualQuery 手动触发向所有设备发送查询 func TriggerManualQuery() int { activeConnections.mu.RLock() defer activeConnections.mu.RUnlock() count := 0 for addr, conn := range activeConnections.conns { // 只有活跃的连接才发送查询 if time.Since(conn.lastActivity) < time.Minute*10 { if err := conn.sendQuery(); err != nil { logger.Logger.Printf("手动触发查询失败 [%s]: %v", addr, err) } else { logger.Logger.Printf("手动触发查询成功 [%s]", addr) count++ } } } return count }