diff --git a/internal/handler/sensor.go b/internal/handler/sensor.go index 002b6fb..d5c86a2 100644 --- a/internal/handler/sensor.go +++ b/internal/handler/sensor.go @@ -73,10 +73,12 @@ func (h *SensorHandler) GetConnectionStatus(w http.ResponseWriter, r *http.Reque Connected bool `json:"connected"` IP string `json:"ip"` Port string `json:"port"` + Count int `json:"count"` // 添加连接数量 }{ Connected: connected, IP: ip, Port: port, + Count: tcp.GetActiveConnectionCount(), // 获取活跃连接数 } w.Header().Set("Content-Type", "application/json") @@ -85,3 +87,44 @@ func (h *SensorHandler) GetConnectionStatus(w http.ResponseWriter, r *http.Reque http.Error(w, "服务器内部错误", http.StatusInternalServerError) } } + +// GetLatestData 获取最新的聚合数据 +func (h *SensorHandler) GetLatestData(w http.ResponseWriter, r *http.Request) { + interval := r.URL.Query().Get("interval") + if interval == "" { + interval = "5min" // 默认使用更细粒度的时间间隔 + } + + // 默认查询最近1小时的数据 + endTime := time.Now() + startTime := endTime.Add(-1 * time.Hour) + + // 如果提供了时间范围,则使用提供的范围 + if startStr := r.URL.Query().Get("start"); startStr != "" { + if t, err := time.Parse(time.RFC3339, startStr); err == nil { + startTime = t + } + } + if endStr := r.URL.Query().Get("end"); endStr != "" { + if t, err := time.Parse(time.RFC3339, endStr); err == nil { + endTime = t + } + } + + // 查询数据库获取最新数据 + data, err := h.dao.GetAggregatedData(startTime, endTime, interval) + if err != nil { + logger.Logger.Printf("获取最新聚合数据失败: %v", err) + http.Error(w, "服务器内部错误", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + encoder := json.NewEncoder(w) + encoder.SetIndent("", " ") + if err := encoder.Encode(data); err != nil { + logger.Logger.Printf("JSON编码失败: %v", err) + http.Error(w, "服务器内部错误", http.StatusInternalServerError) + return + } +} diff --git a/internal/tcp/sensor.go b/internal/tcp/sensor.go index a8c63b1..a98fc14 100644 --- a/internal/tcp/sensor.go +++ b/internal/tcp/sensor.go @@ -22,62 +22,73 @@ const ( // ConnectionStatus 保存TCP连接状态 var ( - ConnectionStatus = struct { - Connected bool - IP string - Port string - LastSeen time.Time - mu sync.RWMutex + // 活跃连接映射表 + activeConnections = struct { + conns map[string]*SensorComm // 使用IP:Port作为键 + mu sync.RWMutex }{ - Connected: false, - IP: "", - Port: "", + conns: make(map[string]*SensorComm), } ) // GetConnectionStatus 返回当前连接状态 func GetConnectionStatus() (bool, string, string) { - ConnectionStatus.mu.RLock() - defer ConnectionStatus.mu.RUnlock() + activeConnections.mu.RLock() + defer activeConnections.mu.RUnlock() - // 如果最后一次通信时间超过1分钟,认为连接已断开 - if ConnectionStatus.Connected && time.Since(ConnectionStatus.LastSeen) > time.Minute { - return false, ConnectionStatus.IP, ConnectionStatus.Port + // 如果有任何活跃连接,返回第一个作为示例 + // 在未来可以改进为返回一个完整的连接列表 + for addr, conn := range activeConnections.conns { + if time.Since(conn.lastActivity) < time.Minute { + host, port, _ := net.SplitHostPort(addr) + return true, host, port + } } - return ConnectionStatus.Connected, ConnectionStatus.IP, ConnectionStatus.Port + return false, "", "" } -// updateConnectionStatus 更新连接状态 -func updateConnectionStatus(connected bool, addr string) { - ConnectionStatus.mu.Lock() - defer ConnectionStatus.mu.Unlock() +// 更新连接映射表 +func addConnection(addr string, conn *SensorComm) { + activeConnections.mu.Lock() + defer activeConnections.mu.Unlock() - ConnectionStatus.Connected = connected - if connected && addr != "" { - host, port, _ := net.SplitHostPort(addr) - ConnectionStatus.IP = host - ConnectionStatus.Port = port - ConnectionStatus.LastSeen = time.Now() - } else if !connected { - // 如果断开连接,只更新状态,保留最后的IP和端口信息 - ConnectionStatus.Connected = false - } + activeConnections.conns[addr] = conn + logger.Logger.Printf("添加新连接: %s, 当前连接数: %d", addr, len(activeConnections.conns)) +} + +// 从映射表中移除连接 +func removeConnection(addr string) { + activeConnections.mu.Lock() + defer activeConnections.mu.Unlock() + + delete(activeConnections.conns, addr) + logger.Logger.Printf("移除连接: %s, 当前连接数: %d", addr, len(activeConnections.conns)) +} + +// 获取活跃连接数量 +func getActiveConnectionCount() int { + activeConnections.mu.RLock() + defer activeConnections.mu.RUnlock() + + return len(activeConnections.conns) } type SensorComm struct { - conn net.Conn - dao *dao.SensorDAO - address string - mu sync.Mutex + conn net.Conn + dao *dao.SensorDAO + address string + lastActivity time.Time + mu sync.Mutex } // 创建新的传感器通信实例 func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm { return &SensorComm{ - conn: conn, - dao: dao, - address: conn.RemoteAddr().String(), + conn: conn, + dao: dao, + address: conn.RemoteAddr().String(), + lastActivity: time.Now(), } } @@ -98,6 +109,7 @@ func (s *SensorComm) sendQuery() error { return err } + s.lastActivity = time.Now() logger.Logger.Printf("发送查询命令: %X", cmd) return nil } @@ -107,11 +119,13 @@ func handleConnection(sensor *SensorComm) { defer sensor.Close() logger.Logger.Printf("新连接建立: %s", sensor.address) - updateConnectionStatus(true, sensor.address) + + // 添加到活跃连接列表 + addConnection(sensor.address, sensor) // 发送首次查询 if err := sensor.sendQuery(); err != nil { - updateConnectionStatus(false, "") + removeConnection(sensor.address) return } @@ -146,14 +160,13 @@ func handleConnection(sensor *SensorComm) { // 其他错误才认为连接断开 logger.Logger.Printf("读取数据失败: %v", err) - updateConnectionStatus(false, "") done <- true return } if n > 0 { - // 更新最后通信时间 - updateConnectionStatus(true, sensor.address) + // 更新最后活动时间 + sensor.lastActivity = time.Now() // 记录原始数据 logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n]) @@ -176,11 +189,11 @@ func handleConnection(sensor *SensorComm) { case <-ticker.C: logger.Logger.Printf("定时查询触发 [%s]", sensor.address) if err := sensor.sendQuery(); err != nil { - updateConnectionStatus(false, "") + removeConnection(sensor.address) return } case <-done: - updateConnectionStatus(false, "") + removeConnection(sensor.address) return } } @@ -249,7 +262,9 @@ func (s *SensorComm) Close() { if s.conn != nil { s.conn.Close() s.conn = nil - updateConnectionStatus(false, "") + + // 从活跃连接列表中移除 + removeConnection(s.address) } } @@ -269,9 +284,6 @@ func StartTCPServer(dao *dao.SensorDAO) error { logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort) - var currentConn *SensorComm - var mu sync.Mutex - for { conn, err := listener.AcceptTCP() if err != nil { @@ -284,16 +296,8 @@ func StartTCPServer(dao *dao.SensorDAO) error { conn.SetKeepAlivePeriod(30 * time.Second) conn.SetLinger(0) // 立即关闭 - mu.Lock() - // 关闭旧连接 - if currentConn != nil { - currentConn.Close() - } - // 创建新连接 sensor := NewSensorComm(conn, dao) - currentConn = sensor - mu.Unlock() logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr()) diff --git a/main.go b/main.go index 44c78c9..6df05a0 100644 --- a/main.go +++ b/main.go @@ -52,6 +52,7 @@ func main() { // 设置路由 http.HandleFunc("/api/data", sensorHandler.GetAggregatedData) + http.HandleFunc("/api/latest", sensorHandler.GetLatestData) http.HandleFunc("/api/status", sensorHandler.GetConnectionStatus) http.HandleFunc("/", sensorHandler.ServeStatic) diff --git a/static/index.html b/static/index.html index f2065ee..366e65f 100644 --- a/static/index.html +++ b/static/index.html @@ -146,7 +146,8 @@