352 lines
8.6 KiB
Go
352 lines
8.6 KiB
Go
package tcp
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"encoding/hex"
|
||
"fmt"
|
||
"net"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"go_rain_dtu/internal/dao"
|
||
"go_rain_dtu/internal/model"
|
||
"go_rain_dtu/pkg/logger"
|
||
)
|
||
|
||
const (
|
||
queryCmd = "010301F400100408" // 读取寄存器命令
|
||
resetCmd = "0106600200005AB6" // 清除雨量统计命令
|
||
tcpPort = ":10004" // TCP服务器端口
|
||
)
|
||
|
||
// ConnectionStatus 保存TCP连接状态
|
||
var (
|
||
// 活跃连接映射表
|
||
activeConnections = struct {
|
||
conns map[string]*SensorComm // 使用IP:Port作为键
|
||
mu sync.RWMutex
|
||
}{
|
||
conns: make(map[string]*SensorComm),
|
||
}
|
||
)
|
||
|
||
// GetConnectionStatus 返回当前连接状态
|
||
func GetConnectionStatus() (bool, string, string) {
|
||
activeConnections.mu.RLock()
|
||
defer activeConnections.mu.RUnlock()
|
||
|
||
// 如果有任何活跃连接,返回第一个作为示例
|
||
// 在未来可以改进为返回一个完整的连接列表
|
||
for addr, conn := range activeConnections.conns {
|
||
if time.Since(conn.lastActivity) < time.Minute {
|
||
host, port, _ := net.SplitHostPort(addr)
|
||
return true, host, port
|
||
}
|
||
}
|
||
|
||
return false, "", ""
|
||
}
|
||
|
||
// 更新连接映射表
|
||
func addConnection(addr string, conn *SensorComm) {
|
||
activeConnections.mu.Lock()
|
||
defer activeConnections.mu.Unlock()
|
||
|
||
activeConnections.conns[addr] = conn
|
||
logger.Logger.Printf("添加新连接: %s, 当前连接数: %d", addr, len(activeConnections.conns))
|
||
}
|
||
|
||
// 从映射表中移除连接
|
||
func removeConnection(addr string) {
|
||
activeConnections.mu.Lock()
|
||
defer activeConnections.mu.Unlock()
|
||
|
||
delete(activeConnections.conns, addr)
|
||
logger.Logger.Printf("移除连接: %s, 当前连接数: %d", addr, len(activeConnections.conns))
|
||
}
|
||
|
||
// GetActiveConnectionCount 获取活跃连接数量
|
||
func GetActiveConnectionCount() int {
|
||
activeConnections.mu.RLock()
|
||
defer activeConnections.mu.RUnlock()
|
||
|
||
return len(activeConnections.conns)
|
||
}
|
||
|
||
type SensorComm struct {
|
||
conn net.Conn
|
||
dao *dao.SensorDAO
|
||
address string
|
||
lastActivity time.Time
|
||
mu sync.Mutex
|
||
}
|
||
|
||
// 创建新的传感器通信实例
|
||
func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm {
|
||
return &SensorComm{
|
||
conn: conn,
|
||
dao: dao,
|
||
address: conn.RemoteAddr().String(),
|
||
lastActivity: time.Now(),
|
||
}
|
||
}
|
||
|
||
// 发送查询命令
|
||
func (s *SensorComm) sendQuery() error {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
|
||
cmd, err := hex.DecodeString(queryCmd)
|
||
if err != nil {
|
||
logger.Logger.Printf("解析命令失败: %v", err)
|
||
return err
|
||
}
|
||
|
||
_, err = s.conn.Write(cmd)
|
||
if err != nil {
|
||
logger.Logger.Printf("发送查询命令失败: %v", err)
|
||
return err
|
||
}
|
||
|
||
s.lastActivity = time.Now()
|
||
logger.Logger.Printf("发送查询命令: %X", cmd)
|
||
return nil
|
||
}
|
||
|
||
// 处理连接
|
||
func handleConnection(sensor *SensorComm) {
|
||
defer sensor.Close()
|
||
|
||
logger.Logger.Printf("新连接建立: %s", sensor.address)
|
||
|
||
// 添加到活跃连接列表
|
||
addConnection(sensor.address, sensor)
|
||
|
||
// 发送首次查询
|
||
if err := sensor.sendQuery(); err != nil {
|
||
removeConnection(sensor.address)
|
||
return
|
||
}
|
||
|
||
// 设置定时器,每5分钟查询一次
|
||
ticker := time.NewTicker(time.Minute * 5)
|
||
defer ticker.Stop()
|
||
|
||
// 读取数据的缓冲区
|
||
buffer := make([]byte, 1024)
|
||
done := make(chan bool)
|
||
|
||
// 设置tcp保持活动状态,防止连接断开
|
||
tcpConn, ok := sensor.conn.(*net.TCPConn)
|
||
if ok {
|
||
tcpConn.SetKeepAlive(true)
|
||
tcpConn.SetKeepAlivePeriod(30 * time.Second)
|
||
}
|
||
|
||
// 处理接收数据的goroutine
|
||
go func() {
|
||
for {
|
||
// 设置读取超时 - 增加超时时间到10分钟,大于查询间隔
|
||
sensor.conn.SetReadDeadline(time.Now().Add(10 * time.Minute))
|
||
|
||
n, err := sensor.conn.Read(buffer)
|
||
if err != nil {
|
||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||
// 超时错误不一定意味着连接断开,继续等待
|
||
logger.Logger.Printf("读取超时: %v, 等待下一次查询", err)
|
||
continue
|
||
}
|
||
|
||
// 其他错误才认为连接断开
|
||
logger.Logger.Printf("读取数据失败: %v", err)
|
||
done <- true
|
||
return
|
||
}
|
||
|
||
if n > 0 {
|
||
// 更新最后活动时间
|
||
sensor.lastActivity = time.Now()
|
||
|
||
// 记录原始数据
|
||
logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n])
|
||
|
||
// 只处理符合预期长度的响应(37字节)
|
||
if n == 37 {
|
||
if sensorData := sensor.handleData(buffer[:n]); sensorData != nil {
|
||
logger.Logger.Printf("处理数据成功: %+v", sensorData)
|
||
}
|
||
} else {
|
||
logger.Logger.Printf("数据长度不符合要求: %d != 37", n)
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 主循环
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
logger.Logger.Printf("定时查询触发 [%s]", sensor.address)
|
||
if err := sensor.sendQuery(); err != nil {
|
||
removeConnection(sensor.address)
|
||
return
|
||
}
|
||
case <-done:
|
||
removeConnection(sensor.address)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// 处理接收到的数据
|
||
func (s *SensorComm) handleData(data []byte) *model.SensorData {
|
||
// 检查数据长度
|
||
if len(data) != 37 {
|
||
logger.Logger.Printf("数据长度错误: %d != 37", len(data))
|
||
return nil
|
||
}
|
||
|
||
// 验证数据格式
|
||
// 1. 检查起始字节是否为 0x01 0x03
|
||
if data[0] != 0x01 || data[1] != 0x03 {
|
||
logger.Logger.Printf("数据格式错误: 起始字节不是0x01 0x03")
|
||
return nil
|
||
}
|
||
|
||
// 2. 检查数据长度字节(第3个字节)是否为0x20
|
||
if data[2] != 0x20 {
|
||
logger.Logger.Printf("数据格式错误: 长度字节不是0x20")
|
||
return nil
|
||
}
|
||
|
||
// 解析数据,从第4个字节开始是数据部分
|
||
// 根据原始TCP服务器代码中的单位转换
|
||
windSpeed := int(binary.BigEndian.Uint16(data[3:5])) // 风速值*100
|
||
sensorData := &model.SensorData{
|
||
Timestamp: time.Now(),
|
||
WindSpeed: windSpeed, // 原始值已经是*100的
|
||
WindForce: int(binary.BigEndian.Uint16(data[5:7])),
|
||
WindDirection8: int(binary.BigEndian.Uint16(data[7:9])),
|
||
WindDirection360: int(binary.BigEndian.Uint16(data[9:11])),
|
||
Humidity: int(binary.BigEndian.Uint16(data[11:13])), // 湿度*10
|
||
Temperature: int(binary.BigEndian.Uint16(data[13:15])), // 温度*10
|
||
AtmPressure: int(binary.BigEndian.Uint16(data[21:23])), // 大气压*10
|
||
SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])), // 太阳辐射原始值
|
||
}
|
||
|
||
// 记录解析后的传感器数据,展示实际物理量
|
||
logger.Logger.Printf("传感器数据: 风速=%.2f m/s, 风力=%d级, 风向=%d°, 湿度=%.1f%%, 温度=%.1f℃, 大气压=%.1f kPa, 太阳辐射=%d W/m²",
|
||
float64(sensorData.WindSpeed)/100.0,
|
||
sensorData.WindForce,
|
||
sensorData.WindDirection360,
|
||
float64(sensorData.Humidity)/10.0,
|
||
float64(sensorData.Temperature)/10.0,
|
||
float64(sensorData.AtmPressure)/10.0,
|
||
sensorData.SolarRadiation)
|
||
|
||
// 保存数据到数据库
|
||
if err := s.dao.Insert(sensorData); err != nil {
|
||
logger.Logger.Printf("保存数据失败: %v", err)
|
||
return nil
|
||
}
|
||
|
||
return sensorData
|
||
}
|
||
|
||
// 关闭连接
|
||
func (s *SensorComm) Close() {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
|
||
if s.conn != nil {
|
||
s.conn.Close()
|
||
s.conn = nil
|
||
|
||
// 从活跃连接列表中移除
|
||
removeConnection(s.address)
|
||
}
|
||
}
|
||
|
||
// 启动TCP服务器
|
||
func StartTCPServer(dao *dao.SensorDAO) error {
|
||
// 创建TCP监听器并设置TCP选项
|
||
addr, err := net.ResolveTCPAddr("tcp", tcpPort)
|
||
if err != nil {
|
||
return fmt.Errorf("解析TCP地址失败: %v", err)
|
||
}
|
||
|
||
listener, err := net.ListenTCP("tcp", addr)
|
||
if err != nil {
|
||
return fmt.Errorf("启动TCP服务器失败: %v", err)
|
||
}
|
||
defer listener.Close()
|
||
|
||
logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort)
|
||
|
||
for {
|
||
conn, err := listener.AcceptTCP()
|
||
if err != nil {
|
||
logger.Logger.Printf("接受连接失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
// 设置TCP连接选项
|
||
conn.SetKeepAlive(true)
|
||
conn.SetKeepAlivePeriod(30 * time.Second)
|
||
conn.SetLinger(0) // 立即关闭
|
||
|
||
// 创建新连接
|
||
sensor := NewSensorComm(conn, dao)
|
||
|
||
logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr())
|
||
|
||
// 处理连接
|
||
go handleConnection(sensor)
|
||
}
|
||
}
|
||
|
||
// 辅助函数:将十六进制字符串转换为字节数组
|
||
func hexStringToBytes(s string) []byte {
|
||
// 移除空格
|
||
s = strings.ReplaceAll(s, " ", "")
|
||
|
||
// 确保字符串长度是偶数
|
||
if len(s)%2 != 0 {
|
||
return nil
|
||
}
|
||
|
||
bytes := make([]byte, len(s)/2)
|
||
for i := 0; i < len(s); i += 2 {
|
||
// 转换高位
|
||
high, ok := hexCharToByte(s[i])
|
||
if !ok {
|
||
return nil
|
||
}
|
||
|
||
// 转换低位
|
||
low, ok := hexCharToByte(s[i+1])
|
||
if !ok {
|
||
return nil
|
||
}
|
||
|
||
// 组合高位和低位
|
||
bytes[i/2] = high<<4 | low
|
||
}
|
||
|
||
return bytes
|
||
}
|
||
|
||
// 辅助函数:将十六进制字符转换为字节
|
||
func hexCharToByte(c byte) (byte, bool) {
|
||
switch {
|
||
case '0' <= c && c <= '9':
|
||
return c - '0', true
|
||
case 'a' <= c && c <= 'f':
|
||
return c - 'a' + 10, true
|
||
case 'A' <= c && c <= 'F':
|
||
return c - 'A' + 10, true
|
||
}
|
||
return 0, false
|
||
}
|