211 lines
4.3 KiB
Go
211 lines
4.3 KiB
Go
package tcp
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"fmt"
|
||
"net"
|
||
"sync"
|
||
"time"
|
||
|
||
"go_rain_dtu/internal/dao"
|
||
"go_rain_dtu/internal/model"
|
||
"go_rain_dtu/pkg/logger"
|
||
)
|
||
|
||
// Sensor command strings (space-separated hexadecimal bytes, decoded with
// hexStringToBytes before being written to the connection) and the address the
// TCP server listens on.
const (
	queryCmd = "01 03 01 F4 00 10 04 08" // read-registers command; sent by sendQuery
	resetCmd = "01 06 60 02 00 5A B6 31" // clear-rainfall-statistics command
	tcpPort  = ":10004"                  // TCP server port, passed to net.Listen
)
|
||
|
||
// SensorComm holds the state of one sensor TCP connection: the connection
// itself, the DAO used to store readings, and the peer address.
type SensorComm struct {
	conn    net.Conn       // connection to the sensor; closed by Close
	dao     *dao.SensorDAO // used by handleData to insert parsed readings
	address string         // conn.RemoteAddr().String(), captured by NewSensorComm
	mu      sync.Mutex     // held by sendQuery and Close while they use conn
}
|
||
|
||
// 创建新的传感器通信实例
|
||
func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm {
|
||
return &SensorComm{
|
||
conn: conn,
|
||
dao: dao,
|
||
address: conn.RemoteAddr().String(),
|
||
}
|
||
}
|
||
|
||
// 发送查询命令
|
||
func (s *SensorComm) sendQuery() error {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
|
||
cmd := hexStringToBytes(queryCmd)
|
||
_, err := s.conn.Write(cmd)
|
||
if err != nil {
|
||
logger.Logger.Printf("发送查询命令失败: %v", err)
|
||
return err
|
||
}
|
||
|
||
logger.Logger.Printf("发送查询命令: %X", cmd)
|
||
return nil
|
||
}
|
||
|
||
// 处理接收到的数据
|
||
func (s *SensorComm) handleData(data []byte) *model.SensorData {
|
||
if len(data) < 37 {
|
||
logger.Logger.Printf("数据长度不足: %d", len(data))
|
||
return nil
|
||
}
|
||
|
||
// 解析数据,从第4个字节开始是数据部分
|
||
sensorData := &model.SensorData{
|
||
Timestamp: time.Now(),
|
||
WindSpeed: int(binary.BigEndian.Uint16(data[3:5])),
|
||
WindForce: int(binary.BigEndian.Uint16(data[5:7])),
|
||
WindDirection8: int(binary.BigEndian.Uint16(data[7:9])),
|
||
WindDirection360: int(binary.BigEndian.Uint16(data[9:11])),
|
||
Humidity: int(binary.BigEndian.Uint16(data[11:13])),
|
||
Temperature: int(binary.BigEndian.Uint16(data[13:15])),
|
||
AtmPressure: int(binary.BigEndian.Uint16(data[21:23])),
|
||
SolarRadiation: int(binary.BigEndian.Uint16(data[33:35])),
|
||
}
|
||
|
||
// 保存数据到数据库
|
||
if err := s.dao.Insert(sensorData); err != nil {
|
||
logger.Logger.Printf("保存数据失败: %v", err)
|
||
return nil
|
||
}
|
||
|
||
return sensorData
|
||
}
|
||
|
||
// 关闭连接
|
||
func (s *SensorComm) Close() {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
|
||
if s.conn != nil {
|
||
s.conn.Close()
|
||
s.conn = nil
|
||
}
|
||
}
|
||
|
||
// 启动TCP服务器
|
||
func StartTCPServer(dao *dao.SensorDAO) error {
|
||
listener, err := net.Listen("tcp", tcpPort)
|
||
if err != nil {
|
||
return fmt.Errorf("启动TCP服务器失败: %v", err)
|
||
}
|
||
defer listener.Close()
|
||
|
||
logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort)
|
||
|
||
var currentConn *SensorComm
|
||
var mu sync.Mutex
|
||
|
||
for {
|
||
conn, err := listener.Accept()
|
||
if err != nil {
|
||
logger.Logger.Printf("接受连接失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
mu.Lock()
|
||
// 关闭旧连接
|
||
if currentConn != nil {
|
||
currentConn.Close()
|
||
}
|
||
|
||
// 创建新连接
|
||
sensor := NewSensorComm(conn, dao)
|
||
currentConn = sensor
|
||
mu.Unlock()
|
||
|
||
logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr())
|
||
|
||
// 处理连接
|
||
go handleConnection(sensor)
|
||
}
|
||
}
|
||
|
||
// 处理连接
|
||
func handleConnection(sensor *SensorComm) {
|
||
defer sensor.Close()
|
||
|
||
// 发送首次查询
|
||
if err := sensor.sendQuery(); err != nil {
|
||
return
|
||
}
|
||
|
||
// 设置定时器,每分钟查询一次
|
||
ticker := time.NewTicker(time.Minute)
|
||
defer ticker.Stop()
|
||
|
||
// 读取数据的缓冲区
|
||
buffer := make([]byte, 1024)
|
||
|
||
for {
|
||
// 设置读取超时
|
||
sensor.conn.SetReadDeadline(time.Now().Add(time.Second * 30))
|
||
|
||
n, err := sensor.conn.Read(buffer)
|
||
if err != nil {
|
||
logger.Logger.Printf("读取数据失败: %v", err)
|
||
return
|
||
}
|
||
|
||
if n > 0 {
|
||
logger.Logger.Printf("接收数据: %X", buffer[:n])
|
||
if sensorData := sensor.handleData(buffer[:n]); sensorData != nil {
|
||
logger.Logger.Printf("处理数据成功: %+v", sensorData)
|
||
}
|
||
}
|
||
|
||
select {
|
||
case <-ticker.C:
|
||
if err := sensor.sendQuery(); err != nil {
|
||
return
|
||
}
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
// hexStringToBytes decodes the hexadecimal digits in s into bytes.
//
// Every character that is not a hex digit (spaces and any other separator) is
// skipped. Digits are paired in the order they appear: the first digit of a
// pair becomes the high nibble and the second the low nibble. If s contains an
// odd number of digits, the last digit yields a final byte whose low nibble is
// zero. An input without any hex digit returns a nil slice.
//
// The high/low position is tracked separately from the string index, so
// separators do not shift the pairing ("01 03" decodes to 0x01 0x03) and a
// leading separator cannot cause an out-of-range access.
func hexStringToBytes(s string) []byte {
	var out []byte
	highNibble := true // whether the next digit starts a new byte

	for i := 0; i < len(s); i++ {
		nibble, ok := hexCharToByte(s[i])
		if !ok {
			// Separator or other non-hex character: ignore it.
			continue
		}

		if highNibble {
			out = append(out, nibble<<4)
		} else {
			out[len(out)-1] |= nibble
		}
		highNibble = !highNibble
	}

	return out
}

// hexCharToByte converts one ASCII hexadecimal digit (0-9, a-f, A-F) to its
// numeric value. The second result is false, and the value 0, when c is not a
// hex digit.
func hexCharToByte(c byte) (byte, bool) {
	switch {
	case '0' <= c && c <= '9':
		return c - '0', true
	case 'a' <= c && c <= 'f':
		return c - 'a' + 10, true
	case 'A' <= c && c <= 'F':
		return c - 'A' + 10, true
	}
	return 0, false
}
|