From 36766ead45cc13dc6bbbe0379853a549b2d56e31 Mon Sep 17 00:00:00 2001 From: fengyarnom Date: Thu, 15 May 2025 18:25:17 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E6=94=B9TCP=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E9=80=BB=E8=BE=91=201.=20=E4=BF=AE=E6=94=B9TCP?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E9=80=BB=E8=BE=91=202.=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=9C=80=E6=96=B0=E6=95=B0=E6=8D=AE=E7=9A=84?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/handler/sensor.go | 43 ++++++++++++++ internal/tcp/sensor.go | 114 +++++++++++++++++++------------------ main.go | 1 + static/index.html | 41 +++++++++++-- 4 files changed, 140 insertions(+), 59 deletions(-) diff --git a/internal/handler/sensor.go b/internal/handler/sensor.go index 002b6fb..d5c86a2 100644 --- a/internal/handler/sensor.go +++ b/internal/handler/sensor.go @@ -73,10 +73,12 @@ func (h *SensorHandler) GetConnectionStatus(w http.ResponseWriter, r *http.Reque Connected bool `json:"connected"` IP string `json:"ip"` Port string `json:"port"` + Count int `json:"count"` // 添加连接数量 }{ Connected: connected, IP: ip, Port: port, + Count: tcp.GetActiveConnectionCount(), // 获取活跃连接数 } w.Header().Set("Content-Type", "application/json") @@ -85,3 +87,44 @@ func (h *SensorHandler) GetConnectionStatus(w http.ResponseWriter, r *http.Reque http.Error(w, "服务器内部错误", http.StatusInternalServerError) } } + +// GetLatestData 获取最新的聚合数据 +func (h *SensorHandler) GetLatestData(w http.ResponseWriter, r *http.Request) { + interval := r.URL.Query().Get("interval") + if interval == "" { + interval = "5min" // 默认使用更细粒度的时间间隔 + } + + // 默认查询最近1小时的数据 + endTime := time.Now() + startTime := endTime.Add(-1 * time.Hour) + + // 如果提供了时间范围,则使用提供的范围 + if startStr := r.URL.Query().Get("start"); startStr != "" { + if t, err := time.Parse(time.RFC3339, startStr); err == nil { + startTime = t + } + } + if endStr := r.URL.Query().Get("end"); endStr != "" { + if t, err := time.Parse(time.RFC3339, endStr); err == nil { + endTime = t + } + } + + // 查询数据库获取最新数据 + data, err := h.dao.GetAggregatedData(startTime, endTime, interval) + if err != nil { + logger.Logger.Printf("获取最新聚合数据失败: %v", err) + http.Error(w, "服务器内部错误", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + encoder := json.NewEncoder(w) + encoder.SetIndent("", " ") + if err := encoder.Encode(data); err != nil { + logger.Logger.Printf("JSON编码失败: %v", err) + http.Error(w, "服务器内部错误", http.StatusInternalServerError) + return + } +} diff --git a/internal/tcp/sensor.go b/internal/tcp/sensor.go index a8c63b1..a98fc14 100644 --- a/internal/tcp/sensor.go +++ b/internal/tcp/sensor.go @@ -22,62 +22,73 @@ const ( // ConnectionStatus 保存TCP连接状态 var ( - ConnectionStatus = struct { - Connected bool - IP string - Port string - LastSeen time.Time - mu sync.RWMutex + // 活跃连接映射表 + activeConnections = struct { + conns map[string]*SensorComm // 使用IP:Port作为键 + mu sync.RWMutex }{ - Connected: false, - IP: "", - Port: "", + conns: make(map[string]*SensorComm), } ) // GetConnectionStatus 返回当前连接状态 func GetConnectionStatus() (bool, string, string) { - ConnectionStatus.mu.RLock() - defer ConnectionStatus.mu.RUnlock() + activeConnections.mu.RLock() + defer activeConnections.mu.RUnlock() - // 如果最后一次通信时间超过1分钟,认为连接已断开 - if ConnectionStatus.Connected && time.Since(ConnectionStatus.LastSeen) > time.Minute { - return false, ConnectionStatus.IP, ConnectionStatus.Port + // 如果有任何活跃连接,返回第一个作为示例 + // 在未来可以改进为返回一个完整的连接列表 + for addr, conn := range activeConnections.conns { + if time.Since(conn.lastActivity) < time.Minute { + host, port, _ := net.SplitHostPort(addr) + return true, host, port + } } - return ConnectionStatus.Connected, ConnectionStatus.IP, ConnectionStatus.Port + return false, "", "" } -// updateConnectionStatus 更新连接状态 -func updateConnectionStatus(connected bool, addr string) { - ConnectionStatus.mu.Lock() - defer ConnectionStatus.mu.Unlock() +// 更新连接映射表 +func addConnection(addr string, conn *SensorComm) { + activeConnections.mu.Lock() + defer activeConnections.mu.Unlock() - ConnectionStatus.Connected = connected - if connected && addr != "" { - host, port, _ := net.SplitHostPort(addr) - ConnectionStatus.IP = host - ConnectionStatus.Port = port - ConnectionStatus.LastSeen = time.Now() - } else if !connected { - // 如果断开连接,只更新状态,保留最后的IP和端口信息 - ConnectionStatus.Connected = false - } + activeConnections.conns[addr] = conn + logger.Logger.Printf("添加新连接: %s, 当前连接数: %d", addr, len(activeConnections.conns)) +} + +// 从映射表中移除连接 +func removeConnection(addr string) { + activeConnections.mu.Lock() + defer activeConnections.mu.Unlock() + + delete(activeConnections.conns, addr) + logger.Logger.Printf("移除连接: %s, 当前连接数: %d", addr, len(activeConnections.conns)) +} + +// 获取活跃连接数量 +func getActiveConnectionCount() int { + activeConnections.mu.RLock() + defer activeConnections.mu.RUnlock() + + return len(activeConnections.conns) } type SensorComm struct { - conn net.Conn - dao *dao.SensorDAO - address string - mu sync.Mutex + conn net.Conn + dao *dao.SensorDAO + address string + lastActivity time.Time + mu sync.Mutex } // 创建新的传感器通信实例 func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm { return &SensorComm{ - conn: conn, - dao: dao, - address: conn.RemoteAddr().String(), + conn: conn, + dao: dao, + address: conn.RemoteAddr().String(), + lastActivity: time.Now(), } } @@ -98,6 +109,7 @@ func (s *SensorComm) sendQuery() error { return err } + s.lastActivity = time.Now() logger.Logger.Printf("发送查询命令: %X", cmd) return nil } @@ -107,11 +119,13 @@ func handleConnection(sensor *SensorComm) { defer sensor.Close() logger.Logger.Printf("新连接建立: %s", sensor.address) - updateConnectionStatus(true, sensor.address) + + // 添加到活跃连接列表 + addConnection(sensor.address, sensor) // 发送首次查询 if err := sensor.sendQuery(); err != nil { - updateConnectionStatus(false, "") + removeConnection(sensor.address) return } @@ -146,14 +160,13 @@ func handleConnection(sensor *SensorComm) { // 其他错误才认为连接断开 logger.Logger.Printf("读取数据失败: %v", err) - updateConnectionStatus(false, "") done <- true return } if n > 0 { - // 更新最后通信时间 - updateConnectionStatus(true, sensor.address) + // 更新最后活动时间 + sensor.lastActivity = time.Now() // 记录原始数据 logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n]) @@ -176,11 +189,11 @@ func handleConnection(sensor *SensorComm) { case <-ticker.C: logger.Logger.Printf("定时查询触发 [%s]", sensor.address) if err := sensor.sendQuery(); err != nil { - updateConnectionStatus(false, "") + removeConnection(sensor.address) return } case <-done: - updateConnectionStatus(false, "") + removeConnection(sensor.address) return } } @@ -249,7 +262,9 @@ func (s *SensorComm) Close() { if s.conn != nil { s.conn.Close() s.conn = nil - updateConnectionStatus(false, "") + + // 从活跃连接列表中移除 + removeConnection(s.address) } } @@ -269,9 +284,6 @@ func StartTCPServer(dao *dao.SensorDAO) error { logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort) - var currentConn *SensorComm - var mu sync.Mutex - for { conn, err := listener.AcceptTCP() if err != nil { @@ -284,16 +296,8 @@ func StartTCPServer(dao *dao.SensorDAO) error { conn.SetKeepAlivePeriod(30 * time.Second) conn.SetLinger(0) // 立即关闭 - mu.Lock() - // 关闭旧连接 - if currentConn != nil { - currentConn.Close() - } - // 创建新连接 sensor := NewSensorComm(conn, dao) - currentConn = sensor - mu.Unlock() logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr()) diff --git a/main.go b/main.go index 44c78c9..6df05a0 100644 --- a/main.go +++ b/main.go @@ -52,6 +52,7 @@ func main() { // 设置路由 http.HandleFunc("/api/data", sensorHandler.GetAggregatedData) + http.HandleFunc("/api/latest", sensorHandler.GetLatestData) http.HandleFunc("/api/status", sensorHandler.GetConnectionStatus) http.HandleFunc("/", sensorHandler.ServeStatic) diff --git a/static/index.html b/static/index.html index f2065ee..366e65f 100644 --- a/static/index.html +++ b/static/index.html @@ -146,7 +146,8 @@
- + +
@@ -185,7 +186,11 @@ const statusElem = document.getElementById('connectionStatus'); if (data.connected) { statusElem.style.backgroundColor = 'green'; - statusElem.textContent = `已连接: ${data.ip}:${data.port}`; + if (data.count > 1) { + statusElem.textContent = `已连接: ${data.ip}:${data.port} (共${data.count}个设备)`; + } else { + statusElem.textContent = `已连接: ${data.ip}:${data.port}`; + } } else { statusElem.style.backgroundColor = 'red'; statusElem.textContent = '未连接'; @@ -224,7 +229,7 @@ return `${year}-${month}-${day}T${hours}:${minutes}`; } - // 查询数据 + // 查询历史数据 function queryData() { const interval = document.getElementById('interval').value; const startDate = document.getElementById('startDate').value; @@ -246,6 +251,34 @@ }); } + // 查询最新数据 + function queryLatestData() { + const interval = document.getElementById('interval').value; + + // 计算最近1小时的时间范围 + const endTime = new Date(); + const startTime = new Date(endTime.getTime() - (1 * 60 * 60 * 1000)); // 1小时前 + + // 确保时间格式符合后端要求 + const startDateTime = startTime.toISOString(); + const endDateTime = endTime.toISOString(); + + fetch(`/api/latest?interval=${interval}&start=${startDateTime}&end=${endDateTime}`) + .then(response => response.json()) + .then(data => { + updateChart(data); + updateTable(data); + + // 自动更新日期选择器为最近查询的时间范围 + document.getElementById('startDate').value = formatDateTime(startTime); + document.getElementById('endDate').value = formatDateTime(endTime); + }) + .catch(error => { + console.error('Error:', error); + alert('获取最新数据失败,请检查网络连接'); + }); + } + // 更新图表 function updateChart(data) { const ctx = document.getElementById('mainChart').getContext('2d'); @@ -424,7 +457,7 @@ // 页面加载完成后初始化 document.addEventListener('DOMContentLoaded', function() { initDatePickers(); - queryData(); + queryLatestData(); // 初始加载最新数据 // 每30秒检查一次连接状态 checkConnectionStatus();