373 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package tcp
import (
"encoding/binary"
"encoding/hex"
"fmt"
"net"
"strings"
"sync"
"time"
"go_rain_dtu/internal/dao"
"go_rain_dtu/internal/model"
"go_rain_dtu/pkg/logger"
)
const (
queryCmd = "010301F400100408" // 读取寄存器命令
resetCmd = "0106600200005AB6" // 清除雨量统计命令
tcpPort = ":10004" // TCP服务器端口
)
// ConnectionStatus 保存TCP连接状态
var (
// 活跃连接映射表
activeConnections = struct {
conns map[string]*SensorComm // 使用IP:Port作为键
mu sync.RWMutex
}{
conns: make(map[string]*SensorComm),
}
)
// GetConnectionStatus 返回当前连接状态
func GetConnectionStatus() (bool, string, string) {
activeConnections.mu.RLock()
defer activeConnections.mu.RUnlock()
// 如果有任何活跃连接,返回第一个作为示例
// 在未来可以改进为返回一个完整的连接列表
for addr, conn := range activeConnections.conns {
if time.Since(conn.lastActivity) < time.Minute {
host, port, _ := net.SplitHostPort(addr)
return true, host, port
}
}
return false, "", ""
}
// 更新连接映射表
func addConnection(addr string, conn *SensorComm) {
activeConnections.mu.Lock()
defer activeConnections.mu.Unlock()
activeConnections.conns[addr] = conn
logger.Logger.Printf("添加新连接: %s, 当前连接数: %d", addr, len(activeConnections.conns))
}
// 从映射表中移除连接
func removeConnection(addr string) {
activeConnections.mu.Lock()
defer activeConnections.mu.Unlock()
delete(activeConnections.conns, addr)
logger.Logger.Printf("移除连接: %s, 当前连接数: %d", addr, len(activeConnections.conns))
}
// GetActiveConnectionCount 获取活跃连接数量
func GetActiveConnectionCount() int {
activeConnections.mu.RLock()
defer activeConnections.mu.RUnlock()
return len(activeConnections.conns)
}
type SensorComm struct {
conn net.Conn
dao *dao.SensorDAO
address string
lastActivity time.Time
mu sync.Mutex
}
// 创建新的传感器通信实例
func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm {
return &SensorComm{
conn: conn,
dao: dao,
address: conn.RemoteAddr().String(),
lastActivity: time.Now(),
}
}
// 发送查询命令
func (s *SensorComm) sendQuery() error {
s.mu.Lock()
defer s.mu.Unlock()
cmd, err := hex.DecodeString(queryCmd)
if err != nil {
logger.Logger.Printf("解析命令失败: %v", err)
return err
}
_, err = s.conn.Write(cmd)
if err != nil {
logger.Logger.Printf("发送查询命令失败: %v", err)
return err
}
s.lastActivity = time.Now()
logger.Logger.Printf("发送查询命令: %X", cmd)
return nil
}
// 处理连接
func handleConnection(sensor *SensorComm) {
defer sensor.Close()
logger.Logger.Printf("新连接建立: %s", sensor.address)
// 添加到活跃连接列表
addConnection(sensor.address, sensor)
// 发送首次查询
if err := sensor.sendQuery(); err != nil {
removeConnection(sensor.address)
return
}
// 设置定时器每5分钟查询一次
ticker := time.NewTicker(time.Minute * 5)
defer ticker.Stop()
// 读取数据的缓冲区
buffer := make([]byte, 1024)
done := make(chan bool)
// 设置tcp保持活动状态防止连接断开
tcpConn, ok := sensor.conn.(*net.TCPConn)
if ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(30 * time.Second)
}
// 处理接收数据的goroutine
go func() {
for {
// 设置读取超时 - 增加超时时间到10分钟大于查询间隔
sensor.conn.SetReadDeadline(time.Now().Add(10 * time.Minute))
n, err := sensor.conn.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// 超时错误不一定意味着连接断开,继续等待
logger.Logger.Printf("读取超时: %v, 等待下一次查询", err)
continue
}
// 其他错误才认为连接断开
logger.Logger.Printf("读取数据失败: %v", err)
done <- true
return
}
if n > 0 {
// 更新最后活动时间
sensor.lastActivity = time.Now()
// 记录原始数据
logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n])
// 只处理符合预期长度的响应37字节
if n == 37 {
if sensorData := sensor.handleData(buffer[:n]); sensorData != nil {
logger.Logger.Printf("处理数据成功: %+v", sensorData)
}
} else {
logger.Logger.Printf("数据长度不符合要求: %d != 37", n)
}
}
}
}()
// 主循环
for {
select {
case <-ticker.C:
logger.Logger.Printf("定时查询触发 [%s]", sensor.address)
if err := sensor.sendQuery(); err != nil {
removeConnection(sensor.address)
return
}
case <-done:
removeConnection(sensor.address)
return
}
}
}
// 处理接收到的数据
func (s *SensorComm) handleData(data []byte) *model.SensorData {
// 检查数据长度
if len(data) != 37 {
logger.Logger.Printf("数据长度错误: %d != 37", len(data))
return nil
}
// 验证数据格式
// 1. 检查起始字节是否为 0x01 0x03
if data[0] != 0x01 || data[1] != 0x03 {
logger.Logger.Printf("数据格式错误: 起始字节不是0x01 0x03")
return nil
}
// 2. 检查数据长度字节第3个字节是否为0x20
if data[2] != 0x20 {
logger.Logger.Printf("数据格式错误: 长度字节不是0x20")
return nil
}
// 解析数据从第4个字节开始是数据部分
sensorData := &model.SensorData{
Timestamp: time.Now(),
WindSpeed: int(binary.BigEndian.Uint16(data[3:5])), // 风速值*100
WindForce: int(binary.BigEndian.Uint16(data[5:7])), // 风力
WindDirection8: int(binary.BigEndian.Uint16(data[7:9])), // 8方位风向
WindDirection360: int(binary.BigEndian.Uint16(data[9:11])), // 360度风向
Humidity: int(binary.BigEndian.Uint16(data[11:13])), // 湿度*10
Temperature: int(binary.BigEndian.Uint16(data[13:15])), // 温度*10
AtmPressure: int(binary.BigEndian.Uint16(data[21:23])), // 大气压*10
SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])), // 太阳辐射
Rainfall: int(binary.BigEndian.Uint16(data[15:17])), // 光学雨量*10
}
// 记录解析后的传感器数据,展示实际物理量
logger.Logger.Printf("传感器数据: 风速=%.2f m/s, 风力=%d级, 风向=%d°, 湿度=%.1f%%, 温度=%.1f℃, 大气压=%.1f kPa, 太阳辐射=%d W/m², 光学雨量=%.1f mm",
float64(sensorData.WindSpeed)/100.0,
sensorData.WindForce,
sensorData.WindDirection360,
float64(sensorData.Humidity)/10.0,
float64(sensorData.Temperature)/10.0,
float64(sensorData.AtmPressure)/10.0,
sensorData.SolarRadiation,
float64(sensorData.Rainfall)/10.0)
// 保存数据到数据库
if err := s.dao.Insert(sensorData); err != nil {
logger.Logger.Printf("保存数据失败: %v", err)
return nil
}
return sensorData
}
// 关闭连接
func (s *SensorComm) Close() {
s.mu.Lock()
defer s.mu.Unlock()
if s.conn != nil {
s.conn.Close()
s.conn = nil
// 从活跃连接列表中移除
removeConnection(s.address)
}
}
// 启动TCP服务器
func StartTCPServer(dao *dao.SensorDAO) error {
// 创建TCP监听器并设置TCP选项
addr, err := net.ResolveTCPAddr("tcp", tcpPort)
if err != nil {
return fmt.Errorf("解析TCP地址失败: %v", err)
}
listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return fmt.Errorf("启动TCP服务器失败: %v", err)
}
defer listener.Close()
logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort)
for {
conn, err := listener.AcceptTCP()
if err != nil {
logger.Logger.Printf("接受连接失败: %v", err)
continue
}
// 设置TCP连接选项
conn.SetKeepAlive(true)
conn.SetKeepAlivePeriod(30 * time.Second)
conn.SetLinger(0) // 立即关闭
// 创建新连接
sensor := NewSensorComm(conn, dao)
logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr())
// 处理连接
go handleConnection(sensor)
}
}
// 辅助函数:将十六进制字符串转换为字节数组
func hexStringToBytes(s string) []byte {
// 移除空格
s = strings.ReplaceAll(s, " ", "")
// 确保字符串长度是偶数
if len(s)%2 != 0 {
return nil
}
bytes := make([]byte, len(s)/2)
for i := 0; i < len(s); i += 2 {
// 转换高位
high, ok := hexCharToByte(s[i])
if !ok {
return nil
}
// 转换低位
low, ok := hexCharToByte(s[i+1])
if !ok {
return nil
}
// 组合高位和低位
bytes[i/2] = high<<4 | low
}
return bytes
}
// 辅助函数:将十六进制字符转换为字节
func hexCharToByte(c byte) (byte, bool) {
switch {
case '0' <= c && c <= '9':
return c - '0', true
case 'a' <= c && c <= 'f':
return c - 'a' + 10, true
case 'A' <= c && c <= 'F':
return c - 'A' + 10, true
}
return 0, false
}
// TriggerManualQuery 手动触发向所有设备发送查询
func TriggerManualQuery() int {
activeConnections.mu.RLock()
defer activeConnections.mu.RUnlock()
count := 0
for addr, conn := range activeConnections.conns {
// 只有活跃的连接才发送查询
if time.Since(conn.lastActivity) < time.Minute*10 {
if err := conn.sendQuery(); err != nil {
logger.Logger.Printf("手动触发查询失败 [%s]: %v", addr, err)
} else {
logger.Logger.Printf("手动触发查询成功 [%s]", addr)
count++
}
}
}
return count
}