diff --git a/angle.go b/angle.go index 532b82d..9d2dc95 100644 --- a/angle.go +++ b/angle.go @@ -116,8 +116,21 @@ func parseBatchData(data string) ([]SensorReading, error) { // 保存批量传感器数据到数据库 func SaveBatchSensorData(readings []SensorReading) error { for _, reading := range readings { - // 使用现有的SaveSensorData函数保存每个传感器的数据 - if err := SaveSensorData(reading.SensorID, reading.X, reading.Y, reading.Z, reading.Temperature); err != nil { + deviceID := 0 + if idx := strings.Index(reading.SerialNumber, "-"); idx > 0 { + idPart := reading.SerialNumber[:idx] + if v, err := strconv.Atoi(idPart); err == nil { + deviceID = v + } + } + + if deviceID > 0 { + if err := EnsureDeviceExists(deviceID); err != nil { + return fmt.Errorf("确保设备 %d 存在失败: %v", deviceID, err) + } + } + + if err := SaveSensorData(reading.SensorID, reading.X, reading.Y, reading.Z, reading.Temperature, deviceID); err != nil { return fmt.Errorf("保存传感器 %d 数据失败: %v", reading.SensorID, err) } } diff --git a/clients.go b/clients.go new file mode 100644 index 0000000..b330aac --- /dev/null +++ b/clients.go @@ -0,0 +1,147 @@ +package main + +import ( + "fmt" + "net" + "sync" + "time" +) + +// 客户端信息结构 +type ClientInfo struct { + IP string // IP地址 + Port string // 端口 + LastSeen time.Time // 最后活跃时间 + IsConnected bool // 是否当前连接 +} + +// 客户端列表(使用互斥锁保护的映射) +var ( + clientsMutex sync.Mutex + clients = make(map[string]*ClientInfo) +) + +// addClient 添加客户端 +func addClient(addr string) { + clientsMutex.Lock() + defer clientsMutex.Unlock() + + host, port, err := net.SplitHostPort(addr) + if err != nil { + Logger.Printf("解析客户端地址失败 %s: %v", addr, err) + host = addr + port = "unknown" + } + + clients[addr] = &ClientInfo{ + IP: host, + Port: port, + LastSeen: time.Now(), + IsConnected: true, + } + + Logger.Printf("添加新客户端: %s", addr) +} + +// updateClientLastSeen 更新客户端最后活跃时间 +func updateClientLastSeen(addr string) { + clientsMutex.Lock() + defer clientsMutex.Unlock() + + if client, exists := clients[addr]; exists { + client.LastSeen = time.Now() + } +} + +// removeClient 移除客户端(标记断开) +func removeClient(addr string) { + clientsMutex.Lock() + defer clientsMutex.Unlock() + + if client, exists := clients[addr]; exists { + client.IsConnected = false + Logger.Printf("客户端断开连接: %s", addr) + } +} + +// getAllClients 获取所有客户端信息 +func getAllClients() []map[string]interface{} { + clientsMutex.Lock() + defer clientsMutex.Unlock() + + now := time.Now() + result := make([]map[string]interface{}, 0, len(clients)) + + for addr, client := range clients { + lastSeenDuration := now.Sub(client.LastSeen) + + // 清理24小时前的记录 + if lastSeenDuration > 24*time.Hour { + delete(clients, addr) + continue + } + + // 连接状态判断:当前连接且2小时内活跃为在线 + isOnline := client.IsConnected && lastSeenDuration < 2*time.Hour + var connectionStatus string + if isOnline { + connectionStatus = "保持连接" + } else if client.IsConnected { + connectionStatus = "连接超时" + } else { + connectionStatus = "已断开" + } + + result = append(result, map[string]interface{}{ + "address": addr, + "ip": client.IP, + "port": client.Port, + "lastSeen": client.LastSeen, + "isOnline": isOnline, + "connectionStatus": connectionStatus, + "lastSeenFormatted": formatDuration(lastSeenDuration), + }) + } + + return result +} + +// formatDuration 格式化持续时间为友好的字符串 +func formatDuration(d time.Duration) string { + if d < time.Minute { + return "刚刚" + } else if d < time.Hour { + return fmt.Sprintf("%d分钟前", int(d.Minutes())) + } else if d < 24*time.Hour { + hours := int(d.Hours()) + minutes := int(d.Minutes()) % 60 + if minutes == 0 { + return fmt.Sprintf("%d小时前", hours) + } else { + return fmt.Sprintf("%d小时%d分钟前", hours, minutes) + } + } else { + return fmt.Sprintf("%d天前", int(d.Hours()/24)) + } +} + +// startClientCleanup 启动清理过期客户端的goroutine +func startClientCleanup() { + go func() { + for { + time.Sleep(1 * time.Hour) // 每小时检查一次 + + clientsMutex.Lock() + now := time.Now() + + for addr, client := range clients { + if now.Sub(client.LastSeen) > 24*time.Hour { + delete(clients, addr) + Logger.Printf("移除过期客户端: %s", addr) + } + } + + clientsMutex.Unlock() + } + }() +} diff --git a/db.go b/db.go index f1e8f15..7485765 100644 --- a/db.go +++ b/db.go @@ -14,9 +14,9 @@ const SCALING_FACTOR = 1000 // 浮点数到整数的转换因子 // 初始化数据库连接 func InitDB() error { - username := "root" + username := "remote" password := "root" - host := "localhost" + host := "8.134.185.53" port := "3306" dbName := "probe_db" @@ -48,14 +48,32 @@ func CloseDB() { } } -// 保存传感器数据 - 将浮点值转换为整数存储,添加温度支持 -func SaveSensorData(sensorID int, x, y, z, temperature float64) error { +// EnsureDeviceExists 确保设备存在,不存在则创建为默认配置 +func EnsureDeviceExists(deviceID int) error { + if deviceID <= 0 { + return nil + } + _, err := db.Exec( + "INSERT INTO devices (id, forward_enable, host, port) VALUES (?, 0, NULL, NULL) ON DUPLICATE KEY UPDATE id = id", + deviceID, + ) + return err +} + +// 保存传感器数据 - 将浮点值转换为整数存储,添加温度与设备ID支持 +func SaveSensorData(sensorID int, x, y, z, temperature float64, deviceID int) error { xInt := int(x * SCALING_FACTOR) yInt := int(y * SCALING_FACTOR) zInt := int(z * SCALING_FACTOR) tempInt := int(temperature * SCALING_FACTOR) - query := `INSERT INTO sensor_data (sensor_id, x_value, y_value, z_value, temperature) VALUES (?, ?, ?, ?, ?)` + if deviceID > 0 { + query := `INSERT INTO sensor_data (sensor_id, x_value, y_value, z_value, temperature, device_id) VALUES (?, ?, ?, ?, ?, ?)` + _, err := db.Exec(query, sensorID, xInt, yInt, zInt, tempInt, deviceID) + return err + } + + query := `INSERT INTO sensor_data (sensor_id, x_value, y_value, z_value, temperature, device_id) VALUES (?, ?, ?, ?, ?, NULL)` _, err := db.Exec(query, sensorID, xInt, yInt, zInt, tempInt) return err } @@ -205,3 +223,27 @@ type SensorData struct { Temperature float64 `json:"temperature"` Timestamp time.Time `json:"timestamp"` } + +// Device 表映射 +type Device struct { + ID int + ForwardEnable bool + Host sql.NullString + Port sql.NullInt64 + RegCodeHex sql.NullString +} + +// GetDevice 获取设备配置 +func GetDevice(deviceID int) (*Device, error) { + row := db.QueryRow(`SELECT id, COALESCE(forward_enable, 0) as forward_enable, host, port, reg_code_hex FROM devices WHERE id = ?`, deviceID) + var d Device + var fe int + if err := row.Scan(&d.ID, &fe, &d.Host, &d.Port, &d.RegCodeHex); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + d.ForwardEnable = fe != 0 + return &d, nil +} diff --git a/forwarder.go b/forwarder.go new file mode 100644 index 0000000..9b5590e --- /dev/null +++ b/forwarder.go @@ -0,0 +1,63 @@ +package main + +import ( + "encoding/hex" + "fmt" + "net" + "time" +) + +// ExtractDeviceIDFromBatchRaw 从批量原始数据中提取设备ID(序列号形如 1513343-01) +func ExtractDeviceIDFromBatchRaw(data string) int { + for i := 0; i < len(data); i++ { + if data[i] >= '0' && data[i] <= '9' { + j := i + for j < len(data) && data[j] >= '0' && data[j] <= '9' { + j++ + } + if j < len(data) && data[j] == '-' { + var v int + for k := i; k < j; k++ { + v = v*10 + int(data[k]-'0') + } + return v + } + } + } + return 0 +} + +// ForwardRawData 将原始数据转发到设备配置的 TCP 目标 +func ForwardRawData(deviceID int, raw string) error { + if deviceID <= 0 { + return nil + } + dev, err := GetDevice(deviceID) + if err != nil || dev == nil { + return err + } + if !dev.ForwardEnable { + return nil + } + if !dev.Host.Valid || dev.Host.String == "" || !dev.Port.Valid || dev.Port.Int64 <= 0 { + return nil + } + + address := fmt.Sprintf("%s:%d", dev.Host.String, dev.Port.Int64) + conn, err := net.DialTimeout("tcp", address, 5*time.Second) + if err != nil { + return err + } + defer conn.Close() + + if dev.RegCodeHex.Valid && dev.RegCodeHex.String != "" { + bytes, decErr := hex.DecodeString(dev.RegCodeHex.String) + if decErr == nil { + _, _ = conn.Write(bytes) + time.Sleep(1 * time.Second) + } + } + + _, err = conn.Write([]byte(raw)) + return err +} diff --git a/http_server.go b/http_server.go index df06491..3b7261e 100644 --- a/http_server.go +++ b/http_server.go @@ -20,7 +20,6 @@ func StartHTTPServer(address string) error { http.HandleFunc("/api/latest", handleGetLatest) http.HandleFunc("/api/sensors", handleGetSensors) http.HandleFunc("/api/clients", handleGetClients) - http.HandleFunc("/api/trigger-query", handleTriggerQuery) fmt.Printf("HTTP服务器已启动,正在监听 %s\n", address) return http.ListenAndServe(address, nil) } @@ -184,30 +183,6 @@ func handleGetClients(w http.ResponseWriter, r *http.Request) { } } -// 处理手动触发查询指令的API -func handleTriggerQuery(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - - if r.Method != "POST" { - http.Error(w, "只支持POST请求", http.StatusMethodNotAllowed) - return - } - - // 触发向所有在线客户端发送查询指令 - sent := triggerManualQuery() - - response := map[string]interface{}{ - "success": true, - "message": fmt.Sprintf("已向%d个客户端发送查询指令", sent), - "sent_count": sent, - } - - if err := json.NewEncoder(w).Encode(response); err != nil { - log.Printf("错误: 编码响应JSON失败: %v", err) - http.Error(w, "编码JSON失败:"+err.Error(), http.StatusInternalServerError) - } -} - // 处理获取最新传感器数据的API func handleGetLatest(w http.ResponseWriter, r *http.Request) { log.Printf("接收到获取最新数据请求") diff --git a/main.go b/main.go index bd3db93..f865bf2 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,6 @@ import ( ) func main() { - serverType := flag.String("server", "tcp", "服务器类型: tcp 或 udp") - tcpPort := flag.String("tcpport", "10002", "TCP服务器端口") udpPort := flag.String("udpport", "10002", "UDP服务器端口") httpPort := flag.String("httpport", "10001", "HTTP服务器端口") flag.Parse() @@ -26,24 +24,14 @@ func main() { } defer CloseDB() - var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - if *serverType == "tcp" { - Logger.Printf("使用TCP服务器模式") - if err := StartTCPServer(":" + *tcpPort); err != nil { - log.Fatalf("TCP服务器启动失败: %v", err) - } - } else if *serverType == "udp" { - Logger.Printf("使用UDP服务器模式") - if err := StartUDPServer(":" + *udpPort); err != nil { - log.Fatalf("UDP服务器启动失败: %v", err) - } - } else { - log.Fatalf("未知的服务器类型: %s", *serverType) + Logger.Printf("使用UDP服务器模式") + if err := StartUDPServer(":" + *udpPort); err != nil { + log.Fatalf("UDP服务器启动失败: %v", err) } }() @@ -55,15 +43,9 @@ func main() { } }() - if *serverType == "tcp" { - fmt.Println("服务器已启动成功") - fmt.Printf("- HTTP接口: http://localhost:%s\n", *httpPort) - fmt.Printf("- TCP接口: localhost:%s\n", *tcpPort) - } else { - fmt.Println("服务器已启动成功") - fmt.Printf("- HTTP接口: http://localhost:%s\n", *httpPort) - fmt.Printf("- UDP接口: localhost:%s\n", *udpPort) - } + fmt.Println("服务器已启动成功") + fmt.Printf("- HTTP接口: http://localhost:%s\n", *httpPort) + fmt.Printf("- UDP接口: localhost:%s\n", *udpPort) wg.Wait() } diff --git a/parser.go b/parser.go new file mode 100644 index 0000000..1326e11 --- /dev/null +++ b/parser.go @@ -0,0 +1,106 @@ +package main + +import ( + "fmt" + "regexp" + "strconv" + "strings" +) + +// parseData 使用正则表达式解析传感器数据,支持新格式 #{1602301014-01,1,1,28.4,-6.884,1.540}! +func parseData(data string) (int, float64, float64, float64, float64, error) { + // 尝试解析新格式: #{1602301014-01,1,1,28.4,-6.884,1.540}! + newPattern := regexp.MustCompile(`#\{[^,]+-(\d+),\d+,(\d+),([-]?\d+\.\d+),([-]?\d+\.\d+),([-]?\d+\.\d+)\}!`) + matches := newPattern.FindStringSubmatch(data) + + if len(matches) == 6 { + // 新格式解析 + sensorID, err := strconv.Atoi(matches[2]) // 使用传感器地址编号 + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err) + } + + temperature, err := strconv.ParseFloat(strings.TrimSpace(matches[3]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析温度值失败: %v", err) + } + + x, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err) + } + + y, err := strconv.ParseFloat(strings.TrimSpace(matches[5]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err) + } + + z := 0.0 // 新格式没有Z值,设为0 + + return sensorID, x, y, z, temperature, nil + } + + // 尝试解析无包装符号的格式: 1602301014-01,1,1,31.1,-6.781,1.542 + plainPattern := regexp.MustCompile(`([^,]+)-(\d+),\d+,(\d+),([-]?\d+\.\d+),([-]?\d+\.\d+),([-]?\d+\.\d+)`) + matches = plainPattern.FindStringSubmatch(data) + + if len(matches) == 7 { + // 无包装符号格式解析 + sensorID, err := strconv.Atoi(matches[3]) // 使用传感器地址编号 + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err) + } + + temperature, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析温度值失败: %v", err) + } + + x, err := strconv.ParseFloat(strings.TrimSpace(matches[5]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err) + } + + y, err := strconv.ParseFloat(strings.TrimSpace(matches[6]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err) + } + + z := 0.0 // 这种格式没有Z值,设为0 + + return sensorID, x, y, z, temperature, nil + } + + // 尝试解析旧格式: 1:1.000, 2.000, 3.000 + oldPattern := regexp.MustCompile(`(\d+):([-]?\d+\.\d+),\s*([-]?\d+\.\d+),\s*([-]?\d+\.\d+)`) + matches = oldPattern.FindStringSubmatch(data) + + if len(matches) == 5 { + // 旧格式解析 + sensorID, err := strconv.Atoi(matches[1]) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err) + } + + x, err := strconv.ParseFloat(strings.TrimSpace(matches[2]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err) + } + + y, err := strconv.ParseFloat(strings.TrimSpace(matches[3]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err) + } + + z, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64) + if err != nil { + return 0, 0, 0, 0, 0, fmt.Errorf("解析Z值失败: %v", err) + } + + temperature := 0.0 // 旧格式没有温度值,设为0 + + return sensorID, x, y, z, temperature, nil + } + + return 0, 0, 0, 0, 0, fmt.Errorf("数据格式不正确: %s", data) +} diff --git a/tcp_server.go b/tcp_server.go deleted file mode 100644 index 3408989..0000000 --- a/tcp_server.go +++ /dev/null @@ -1,436 +0,0 @@ -package main - -import ( - "fmt" - "io" - "net" - "regexp" - "strconv" - "strings" - "sync" - "time" -) - -// 客户端信息结构 -type ClientInfo struct { - IP string // IP地址 - Port string // 端口 - LastSeen time.Time // 最后活跃时间 - IsConnected bool // 是否当前连接 -} - -// 客户端列表(使用互斥锁保护的映射) -var ( - clientsMutex sync.Mutex - clients = make(map[string]*ClientInfo) - clientConns = make(map[string]net.Conn) // 存储客户端连接 -) - -// StartTCPServer 启动TCP服务器 -func StartTCPServer(address string) error { - listener, err := net.Listen("tcp", address) - if err != nil { - return err - } - - startClientCleanup() - - Logger.Printf("TCP服务器已启动,正在监听 %s\n", address) - - for { - conn, err := listener.Accept() - if err != nil { - Logger.Printf("接受连接失败: %v", err) - continue - } - - go handleConnection(conn) - } -} - -// handleConnection 处理客户端连接 -func handleConnection(conn net.Conn) { - defer conn.Close() - - remoteAddr := conn.RemoteAddr().String() - Logger.Printf("新的客户端连接: %s", remoteAddr) - - addClient(remoteAddr) - - // 存储连接以便手动查询使用 - clientsMutex.Lock() - clientConns[remoteAddr] = conn - clientsMutex.Unlock() - - // 注释掉自动发送指令 - 设备刚连接时立即发送一次查询指令 - go func() { - time.Sleep(2 * time.Second) // 等待2秒让连接稳定 - command := "@1602301014A*0!\n" - if _, err := conn.Write([]byte(command)); err != nil { - Logger.Printf("发送连接后查询指令到客户端 %s 失败: %v", remoteAddr, err) - } else { - TCPDataLogger.Printf("发送连接后查询指令到客户端 %s: %s", remoteAddr, strings.TrimSpace(command)) - } - }() - - // 启动定时发送指令的goroutine(30分钟间隔) - go sendPeriodicCommand(conn, remoteAddr) - - buffer := make([]byte, 1024) - - for { - n, err := conn.Read(buffer) - if err != nil { - if err != io.EOF { - Logger.Printf("从客户端读取失败 %s: %v", remoteAddr, err) - } else { - Logger.Printf("客户端断开连接 %s", remoteAddr) - } - removeClient(remoteAddr) - // 清理连接映射 - clientsMutex.Lock() - delete(clientConns, remoteAddr) - clientsMutex.Unlock() - break - } - - rawData := string(buffer[:n]) - TCPDataLogger.Printf("从客户端 %s 接收到原始数据: %s", remoteAddr, rawData) - - // 注释掉心跳包响应 - 检查是否为心跳包 JML610 - // if strings.Contains(rawData, "JML610") { - // TCPDataLogger.Printf("收到心跳包从客户端 %s: %s", remoteAddr, rawData) - // - // // 立即发送查询指令 - // command := "@1602301014A*0!\n" - // if _, err := conn.Write([]byte(command)); err != nil { - // Logger.Printf("响应心跳包发送查询指令到客户端 %s 失败: %v", remoteAddr, err) - // } else { - // TCPDataLogger.Printf("响应心跳包发送查询指令到客户端 %s: %s", remoteAddr, strings.TrimSpace(command)) - // } - // - // updateClientLastSeen(remoteAddr) - // continue // 跳过数据解析,继续监听 - // } - - // 检查是否为心跳包 JML610(仅记录,不发送查询指令) - if strings.Contains(rawData, "JML610") { - TCPDataLogger.Printf("收到心跳包从客户端 %s: %s", remoteAddr, rawData) - updateClientLastSeen(remoteAddr) - - // 发送OK响应 - resp := "OK\n" - if _, err := conn.Write([]byte(resp)); err != nil { - Logger.Printf("发送响应到客户端 %s 失败: %v", remoteAddr, err) - removeClient(remoteAddr) - // 清理连接映射 - clientsMutex.Lock() - delete(clientConns, remoteAddr) - clientsMutex.Unlock() - break - } - continue // 跳过数据解析,继续监听 - } - - sensorID, x, y, z, temperature, err := parseData(rawData) - - if err == nil { - TCPDataLogger.Printf("解析成功 - 客户端: %s, 传感器ID: %d, 值: X=%.3f, Y=%.3f, Z=%.3f, 温度=%.1f°C", - remoteAddr, sensorID, x, y, z, temperature) - - if err := SaveSensorData(sensorID, x, y, z, temperature); err != nil { - Logger.Printf("保存传感器数据失败: %v", err) - } - } else { - TCPDataLogger.Printf("无法解析从客户端 %s 接收到的数据: %s, 错误: %v", remoteAddr, rawData, err) - } - - resp := "OK\n" - if _, err := conn.Write([]byte(resp)); err != nil { - Logger.Printf("发送响应到客户端 %s 失败: %v", remoteAddr, err) - removeClient(remoteAddr) - // 清理连接映射 - clientsMutex.Lock() - delete(clientConns, remoteAddr) - clientsMutex.Unlock() - break - } - - updateClientLastSeen(remoteAddr) - } -} - -// sendPeriodicCommand 每30分钟发送一次查询指令 -func sendPeriodicCommand(conn net.Conn, remoteAddr string) { - ticker := time.NewTicker(30 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - command := "@1602301014A*0!\n" - if _, err := conn.Write([]byte(command)); err != nil { - Logger.Printf("发送定时指令到客户端 %s 失败: %v", remoteAddr, err) - return // 连接断开,退出goroutine - } - TCPDataLogger.Printf("发送定时指令到客户端 %s: %s", remoteAddr, strings.TrimSpace(command)) - } - } -} - -// parseData 使用正则表达式解析传感器数据,支持新格式 #{1602301014-01,1,1,28.4,-6.884,1.540}! -func parseData(data string) (int, float64, float64, float64, float64, error) { - // 尝试解析新格式: #{1602301014-01,1,1,28.4,-6.884,1.540}! - newPattern := regexp.MustCompile(`#\{[^,]+-(\d+),\d+,(\d+),([-]?\d+\.\d+),([-]?\d+\.\d+),([-]?\d+\.\d+)\}!`) - matches := newPattern.FindStringSubmatch(data) - - if len(matches) == 6 { - // 新格式解析 - sensorID, err := strconv.Atoi(matches[2]) // 使用传感器地址编号 - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err) - } - - temperature, err := strconv.ParseFloat(strings.TrimSpace(matches[3]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析温度值失败: %v", err) - } - - x, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err) - } - - y, err := strconv.ParseFloat(strings.TrimSpace(matches[5]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err) - } - - z := 0.0 // 新格式没有Z值,设为0 - - return sensorID, x, y, z, temperature, nil - } - - // 尝试解析无包装符号的格式: 1602301014-01,1,1,31.1,-6.781,1.542 - plainPattern := regexp.MustCompile(`([^,]+)-(\d+),\d+,(\d+),([-]?\d+\.\d+),([-]?\d+\.\d+),([-]?\d+\.\d+)`) - matches = plainPattern.FindStringSubmatch(data) - - if len(matches) == 7 { - // 无包装符号格式解析 - sensorID, err := strconv.Atoi(matches[3]) // 使用传感器地址编号 - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err) - } - - temperature, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析温度值失败: %v", err) - } - - x, err := strconv.ParseFloat(strings.TrimSpace(matches[5]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err) - } - - y, err := strconv.ParseFloat(strings.TrimSpace(matches[6]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err) - } - - z := 0.0 // 这种格式没有Z值,设为0 - - return sensorID, x, y, z, temperature, nil - } - - // 尝试解析旧格式: 1:1.000, 2.000, 3.000 - oldPattern := regexp.MustCompile(`(\d+):([-]?\d+\.\d+),\s*([-]?\d+\.\d+),\s*([-]?\d+\.\d+)`) - matches = oldPattern.FindStringSubmatch(data) - - if len(matches) == 5 { - // 旧格式解析 - sensorID, err := strconv.Atoi(matches[1]) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析传感器ID失败: %v", err) - } - - x, err := strconv.ParseFloat(strings.TrimSpace(matches[2]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析X值失败: %v", err) - } - - y, err := strconv.ParseFloat(strings.TrimSpace(matches[3]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析Y值失败: %v", err) - } - - z, err := strconv.ParseFloat(strings.TrimSpace(matches[4]), 64) - if err != nil { - return 0, 0, 0, 0, 0, fmt.Errorf("解析Z值失败: %v", err) - } - - temperature := 0.0 // 旧格式没有温度值,设为0 - - return sensorID, x, y, z, temperature, nil - } - - return 0, 0, 0, 0, 0, fmt.Errorf("数据格式不正确: %s", data) -} - -// addClient 添加客户端 -func addClient(addr string) { - clientsMutex.Lock() - defer clientsMutex.Unlock() - - host, port, err := net.SplitHostPort(addr) - if err != nil { - Logger.Printf("解析客户端地址失败 %s: %v", addr, err) - host = addr - port = "unknown" - } - - clients[addr] = &ClientInfo{ - IP: host, - Port: port, - LastSeen: time.Now(), - IsConnected: true, - } - - Logger.Printf("添加新客户端: %s", addr) -} - -// updateClientLastSeen 更新客户端最后活跃时间 -func updateClientLastSeen(addr string) { - clientsMutex.Lock() - defer clientsMutex.Unlock() - - if client, exists := clients[addr]; exists { - client.LastSeen = time.Now() - } -} - -// removeClient 移除客户端 -func removeClient(addr string) { - clientsMutex.Lock() - defer clientsMutex.Unlock() - - if client, exists := clients[addr]; exists { - // 标记为断开连接,不更新LastSeen时间 - client.IsConnected = false - Logger.Printf("客户端断开连接: %s", addr) - } -} - -// getAllClients 获取所有客户端信息 -func getAllClients() []map[string]interface{} { - clientsMutex.Lock() - defer clientsMutex.Unlock() - - now := time.Now() - result := make([]map[string]interface{}, 0, len(clients)) - - for addr, client := range clients { - lastSeenDuration := now.Sub(client.LastSeen) - - // 清理24小时前的记录 - if lastSeenDuration > 24*time.Hour { - delete(clients, addr) - continue - } - - // 连接状态判断:当前连接且2小时内活跃为在线 - isOnline := client.IsConnected && lastSeenDuration < 2*time.Hour - var connectionStatus string - if isOnline { - connectionStatus = "保持连接" - } else if client.IsConnected { - connectionStatus = "连接超时" - } else { - connectionStatus = "已断开" - } - - result = append(result, map[string]interface{}{ - "address": addr, - "ip": client.IP, - "port": client.Port, - "lastSeen": client.LastSeen, - "isOnline": isOnline, - "connectionStatus": connectionStatus, - "lastSeenFormatted": formatDuration(lastSeenDuration), - }) - } - - return result -} - -// formatDuration 格式化持续时间为友好的字符串 -func formatDuration(d time.Duration) string { - if d < time.Minute { - return "刚刚" - } else if d < time.Hour { - return fmt.Sprintf("%d分钟前", int(d.Minutes())) - } else if d < 24*time.Hour { - hours := int(d.Hours()) - minutes := int(d.Minutes()) % 60 - if minutes == 0 { - return fmt.Sprintf("%d小时前", hours) - } else { - return fmt.Sprintf("%d小时%d分钟前", hours, minutes) - } - } else { - return fmt.Sprintf("%d天前", int(d.Hours()/24)) - } -} - -// startClientCleanup 启动清理过期客户端的goroutine -func startClientCleanup() { - go func() { - for { - time.Sleep(1 * time.Hour) // 每小时检查一次 - - clientsMutex.Lock() - now := time.Now() - - for addr, client := range clients { - if now.Sub(client.LastSeen) > 24*time.Hour { - delete(clients, addr) - delete(clientConns, addr) // 同时清理连接 - Logger.Printf("移除过期客户端: %s", addr) - } - } - - clientsMutex.Unlock() - } - }() -} - -// triggerManualQuery 手动触发向所有在线客户端发送查询指令 -func triggerManualQuery() int { - clientsMutex.Lock() - defer clientsMutex.Unlock() - - sentCount := 0 - command := "@1602301014A*0!\n" - - for addr, client := range clients { - // 检查客户端是否在线(连接状态且2小时内活跃) - if client.IsConnected && time.Since(client.LastSeen) < 2*time.Hour { - if conn, exists := clientConns[addr]; exists { - if _, err := conn.Write([]byte(command)); err != nil { - Logger.Printf("手动发送查询指令到客户端 %s 失败: %v", addr, err) - // 连接可能已断开,从映射中移除并标记为断开 - delete(clientConns, addr) - client.IsConnected = false - } else { - TCPDataLogger.Printf("手动发送查询指令到客户端 %s: %s", addr, strings.TrimSpace(command)) - sentCount++ - } - } else { - // 没有连接记录,标记为断开 - client.IsConnected = false - } - } - } - - Logger.Printf("手动查询指令已发送到 %d 个客户端", sentCount) - return sentCount -} diff --git a/udp_server.go b/udp_server.go index 63797f3..9b30c72 100644 --- a/udp_server.go +++ b/udp_server.go @@ -3,6 +3,8 @@ package main import ( "fmt" "net" + "regexp" + "strconv" ) // StartUDPServer 启动UDP服务器 @@ -55,6 +57,15 @@ func handleUDPPacket(conn *net.UDPConn, addr *net.UDPAddr, data []byte) { Logger.Printf("保存批量传感器数据失败: %v", err) } + // 尝试提取设备ID并透传原始数据 + if deviceID := ExtractDeviceIDFromBatchRaw(rawData); deviceID > 0 { + go func(id int, payload string) { + if fErr := ForwardRawData(id, payload); fErr != nil { + Logger.Printf("转发设备 %d 数据失败: %v", id, fErr) + } + }(deviceID, rawData) + } + // 记录第一个传感器的数据(用于兼容现有日志格式) firstSensor := readings[0] TCPDataLogger.Printf("解析成功 - UDP客户端: %s, 传感器ID: %d, 值: X=%.3f, Y=%.3f, Z=%.3f, 温度=%.1f°C", @@ -67,9 +78,20 @@ func handleUDPPacket(conn *net.UDPConn, addr *net.UDPAddr, data []byte) { TCPDataLogger.Printf("解析成功 - UDP客户端: %s, 传感器ID: %d, 值: X=%.3f, Y=%.3f, Z=%.3f, 温度=%.1f°C", remoteAddr, sensorID, x, y, z, temperature) - if err := SaveSensorData(sensorID, x, y, z, temperature); err != nil { + if err := SaveSensorData(sensorID, x, y, z, temperature, 0); err != nil { Logger.Printf("保存传感器数据失败: %v", err) } + + // 从原始字符串尝试提取设备ID并透传 + if m := regexp.MustCompile(`([0-9]+)-\d+`).FindStringSubmatch(rawData); len(m) == 2 { + if id, convErr := strconv.Atoi(m[1]); convErr == nil && id > 0 { + go func(id int, payload string) { + if fErr := ForwardRawData(id, payload); fErr != nil { + Logger.Printf("转发设备 %d 数据失败: %v", id, fErr) + } + }(id, rawData) + } + } } else { TCPDataLogger.Printf("无法解析从UDP客户端 %s 接收到的数据: %s, 错误: %v", remoteAddr, rawData, parseErr) }