angle_dtu/tcp_server.go
2025-05-30 19:02:58 +08:00

406 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"fmt"
"io"
"net"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
// 客户端信息结构
type ClientInfo struct {
IP string // IP地址
Port string // 端口
LastSeen time.Time // 最后活跃时间
IsConnected bool // 是否当前连接
}
// 客户端列表(使用互斥锁保护的映射)
var (
clientsMutex sync.Mutex
clients = make(map[string]*ClientInfo)
clientConns = make(map[string]net.Conn) // 存储客户端连接
)
// StartTCPServer 启动TCP服务器
func StartTCPServer(address string) error {
listener, err := net.Listen("tcp", address)
if err != nil {
return err
}
startClientCleanup()
Logger.Printf("TCP服务器已启动正在监听 %s\n", address)
for {
conn, err := listener.Accept()
if err != nil {
Logger.Printf("接受连接失败: %v", err)
continue
}
go handleConnection(conn)
}
}
// handleConnection 处理客户端连接
func handleConnection(conn net.Conn) {
defer conn.Close()
remoteAddr := conn.RemoteAddr().String()
Logger.Printf("新的客户端连接: %s", remoteAddr)
addClient(remoteAddr)
// 存储连接以便手动查询使用
clientsMutex.Lock()
clientConns[remoteAddr] = conn
clientsMutex.Unlock()
// 注释掉自动发送指令 - 设备刚连接时立即发送一次查询指令
go func() {
time.Sleep(2 * time.Second) // 等待2秒让连接稳定
command := "@1602301014A*0!\n"
if _, err := conn.Write([]byte(command)); err != nil {
Logger.Printf("发送连接后查询指令到客户端 %s 失败: %v", remoteAddr, err)
} else {
TCPDataLogger.Printf("发送连接后查询指令到客户端 %s: %s", remoteAddr, strings.TrimSpace(command))
}
}()
// 启动定时发送指令的goroutine30分钟间隔
go sendPeriodicCommand(conn, remoteAddr)
buffer := make([]byte, 1024)
for {
n, err := conn.Read(buffer)
if err != nil {
if err != io.EOF {
Logger.Printf("从客户端读取失败 %s: %v", remoteAddr, err)
} else {
Logger.Printf("客户端断开连接 %s", remoteAddr)
}
removeClient(remoteAddr)
// 清理连接映射
clientsMutex.Lock()
delete(clientConns, remoteAddr)
clientsMutex.Unlock()
break
}
rawData := string(buffer[:n])
TCPDataLogger.Printf("从客户端 %s 接收到原始数据: %s", remoteAddr, rawData)
// 注释掉心跳包响应 - 检查是否为心跳包 JML610
// if strings.Contains(rawData, "JML610") {
// TCPDataLogger.Printf("收到心跳包从客户端 %s: %s", remoteAddr, rawData)
//
// // 立即发送查询指令
// command := "@1602301014A*0!\n"
// if _, err := conn.Write([]byte(command)); err != nil {
// Logger.Printf("响应心跳包发送查询指令到客户端 %s 失败: %v", remoteAddr, err)
// } else {
// TCPDataLogger.Printf("响应心跳包发送查询指令到客户端 %s: %s", remoteAddr, strings.TrimSpace(command))
// }
//
// updateClientLastSeen(remoteAddr)
// continue // 跳过数据解析,继续监听
// }
// 检查是否为心跳包 JML610仅记录不发送查询指令
if strings.Contains(rawData, "JML610") {
TCPDataLogger.Printf("收到心跳包从客户端 %s: %s", remoteAddr, rawData)
updateClientLastSeen(remoteAddr)
// 发送OK响应
resp := "OK\n"
if _, err := conn.Write([]byte(resp)); err != nil {
Logger.Printf("发送响应到客户端 %s 失败: %v", remoteAddr, err)
removeClient(remoteAddr)
// 清理连接映射
clientsMutex.Lock()
delete(clientConns, remoteAddr)
clientsMutex.Unlock()
break
}
continue // 跳过数据解析,继续监听
}
sensorID, x, y, z, temperature, err := parseData(rawData)
if err == nil {
TCPDataLogger.Printf("解析成功 - 客户端: %s, 传感器ID: %d, 值: X=%.3f, Y=%.3f, Z=%.3f, 温度=%.1f°C",
remoteAddr, sensorID, x, y, z, temperature)
if err := SaveSensorData(sensorID, x, y, z, temperature); err != nil {
Logger.Printf("保存传感器数据失败: %v", err)
}
} else {
TCPDataLogger.Printf("无法解析从客户端 %s 接收到的数据: %s, 错误: %v", remoteAddr, rawData, err)
}
resp := "OK\n"
if _, err := conn.Write([]byte(resp)); err != nil {
Logger.Printf("发送响应到客户端 %s 失败: %v", remoteAddr, err)
removeClient(remoteAddr)
// 清理连接映射
clientsMutex.Lock()
delete(clientConns, remoteAddr)
clientsMutex.Unlock()
break
}
updateClientLastSeen(remoteAddr)
}
}
// sendPeriodicCommand 每30分钟发送一次查询指令
func sendPeriodicCommand(conn net.Conn, remoteAddr string) {
ticker := time.NewTicker(30 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
command := "@1602301014A*0!\n"
if _, err := conn.Write([]byte(command)); err != nil {
Logger.Printf("发送定时指令到客户端 %s 失败: %v", remoteAddr, err)
return // 连接断开退出goroutine
}
TCPDataLogger.Printf("发送定时指令到客户端 %s: %s", remoteAddr, strings.TrimSpace(command))
}
}
}
// parseData 使用正则表达式解析传感器数据,支持新格式 #{1602301014-01,1,1,28.4,-6.884,1.540}!
func parseData(data string) (int, float64, float64, float64, float64, error) {
// 尝试解析新格式: #{1602301014-01,1,1,28.4,-6.884,1.540}!
newPattern := regexp.MustCompile(`#\{[^,]+-(\d+),\d+,(\d+),([-]?\d+\.\d+),([-]?\d+\.\d+),([-]?\d+\.\d+)\}!`)
matches := newPattern.FindStringSubmatch(data)
if len(matches) == 6 {
// 新格式解析
sensorID, err := strconv.Atoi(matches[2]) // 使用传感器地址编号
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err)
}
temperature, err := strconv.ParseFloat(strings.TrimSpace(matches[3]), 64)
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析温度值失败: %v", err)
}
x, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64)
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err)
}
y, err := strconv.ParseFloat(strings.TrimSpace(matches[5]), 64)
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err)
}
z := 0.0 // 新格式没有Z值设为0
return sensorID, x, y, z, temperature, nil
}
// 尝试解析旧格式: 1:1.000, 2.000, 3.000
oldPattern := regexp.MustCompile(`(\d+):([-]?\d+\.\d+),\s*([-]?\d+\.\d+),\s*([-]?\d+\.\d+)`)
matches = oldPattern.FindStringSubmatch(data)
if len(matches) == 5 {
// 旧格式解析
sensorID, err := strconv.Atoi(matches[1])
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err)
}
x, err := strconv.ParseFloat(strings.TrimSpace(matches[2]), 64)
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err)
}
y, err := strconv.ParseFloat(strings.TrimSpace(matches[3]), 64)
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err)
}
z, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64)
if err != nil {
return 0, 0, 0, 0, 0, fmt.Errorf("解析Z值失败: %v", err)
}
temperature := 0.0 // 旧格式没有温度值设为0
return sensorID, x, y, z, temperature, nil
}
return 0, 0, 0, 0, 0, fmt.Errorf("数据格式不正确: %s", data)
}
// addClient 添加客户端
func addClient(addr string) {
clientsMutex.Lock()
defer clientsMutex.Unlock()
host, port, err := net.SplitHostPort(addr)
if err != nil {
Logger.Printf("解析客户端地址失败 %s: %v", addr, err)
host = addr
port = "unknown"
}
clients[addr] = &ClientInfo{
IP: host,
Port: port,
LastSeen: time.Now(),
IsConnected: true,
}
Logger.Printf("添加新客户端: %s", addr)
}
// updateClientLastSeen 更新客户端最后活跃时间
func updateClientLastSeen(addr string) {
clientsMutex.Lock()
defer clientsMutex.Unlock()
if client, exists := clients[addr]; exists {
client.LastSeen = time.Now()
}
}
// removeClient 移除客户端
func removeClient(addr string) {
clientsMutex.Lock()
defer clientsMutex.Unlock()
if client, exists := clients[addr]; exists {
// 标记为断开连接不更新LastSeen时间
client.IsConnected = false
Logger.Printf("客户端断开连接: %s", addr)
}
}
// getAllClients 获取所有客户端信息
func getAllClients() []map[string]interface{} {
clientsMutex.Lock()
defer clientsMutex.Unlock()
now := time.Now()
result := make([]map[string]interface{}, 0, len(clients))
for addr, client := range clients {
lastSeenDuration := now.Sub(client.LastSeen)
// 清理24小时前的记录
if lastSeenDuration > 24*time.Hour {
delete(clients, addr)
continue
}
// 连接状态判断当前连接且2小时内活跃为在线
isOnline := client.IsConnected && lastSeenDuration < 2*time.Hour
var connectionStatus string
if isOnline {
connectionStatus = "保持连接"
} else if client.IsConnected {
connectionStatus = "连接超时"
} else {
connectionStatus = "已断开"
}
result = append(result, map[string]interface{}{
"address": addr,
"ip": client.IP,
"port": client.Port,
"lastSeen": client.LastSeen,
"isOnline": isOnline,
"connectionStatus": connectionStatus,
"lastSeenFormatted": formatDuration(lastSeenDuration),
})
}
return result
}
// formatDuration 格式化持续时间为友好的字符串
func formatDuration(d time.Duration) string {
if d < time.Minute {
return "刚刚"
} else if d < time.Hour {
return fmt.Sprintf("%d分钟前", int(d.Minutes()))
} else if d < 24*time.Hour {
hours := int(d.Hours())
minutes := int(d.Minutes()) % 60
if minutes == 0 {
return fmt.Sprintf("%d小时前", hours)
} else {
return fmt.Sprintf("%d小时%d分钟前", hours, minutes)
}
} else {
return fmt.Sprintf("%d天前", int(d.Hours()/24))
}
}
// startClientCleanup 启动清理过期客户端的goroutine
func startClientCleanup() {
go func() {
for {
time.Sleep(1 * time.Hour) // 每小时检查一次
clientsMutex.Lock()
now := time.Now()
for addr, client := range clients {
if now.Sub(client.LastSeen) > 24*time.Hour {
delete(clients, addr)
delete(clientConns, addr) // 同时清理连接
Logger.Printf("移除过期客户端: %s", addr)
}
}
clientsMutex.Unlock()
}
}()
}
// triggerManualQuery 手动触发向所有在线客户端发送查询指令
func triggerManualQuery() int {
clientsMutex.Lock()
defer clientsMutex.Unlock()
sentCount := 0
command := "@1602301014A*0!\n"
for addr, client := range clients {
// 检查客户端是否在线连接状态且2小时内活跃
if client.IsConnected && time.Since(client.LastSeen) < 2*time.Hour {
if conn, exists := clientConns[addr]; exists {
if _, err := conn.Write([]byte(command)); err != nil {
Logger.Printf("手动发送查询指令到客户端 %s 失败: %v", addr, err)
// 连接可能已断开,从映射中移除并标记为断开
delete(clientConns, addr)
client.IsConnected = false
} else {
TCPDataLogger.Printf("手动发送查询指令到客户端 %s: %s", addr, strings.TrimSpace(command))
sentCount++
}
} else {
// 没有连接记录,标记为断开
client.IsConnected = false
}
}
}
Logger.Printf("手动查询指令已发送到 %d 个客户端", sentCount)
return sentCount
}