fix:修改TCP连接逻辑

1. 修改TCP连接逻辑
2. 新增查询最新数据的接口
This commit is contained in:
fengyarnom 2025-05-15 18:25:17 +08:00
parent 7fe577eedd
commit 36766ead45
4 changed files with 140 additions and 59 deletions

View File

@ -73,10 +73,12 @@ func (h *SensorHandler) GetConnectionStatus(w http.ResponseWriter, r *http.Reque
Connected bool `json:"connected"` Connected bool `json:"connected"`
IP string `json:"ip"` IP string `json:"ip"`
Port string `json:"port"` Port string `json:"port"`
Count int `json:"count"` // 添加连接数量
}{ }{
Connected: connected, Connected: connected,
IP: ip, IP: ip,
Port: port, Port: port,
Count: tcp.GetActiveConnectionCount(), // 获取活跃连接数
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -85,3 +87,44 @@ func (h *SensorHandler) GetConnectionStatus(w http.ResponseWriter, r *http.Reque
http.Error(w, "服务器内部错误", http.StatusInternalServerError) http.Error(w, "服务器内部错误", http.StatusInternalServerError)
} }
} }
// GetLatestData 获取最新的聚合数据
func (h *SensorHandler) GetLatestData(w http.ResponseWriter, r *http.Request) {
interval := r.URL.Query().Get("interval")
if interval == "" {
interval = "5min" // 默认使用更细粒度的时间间隔
}
// 默认查询最近1小时的数据
endTime := time.Now()
startTime := endTime.Add(-1 * time.Hour)
// 如果提供了时间范围,则使用提供的范围
if startStr := r.URL.Query().Get("start"); startStr != "" {
if t, err := time.Parse(time.RFC3339, startStr); err == nil {
startTime = t
}
}
if endStr := r.URL.Query().Get("end"); endStr != "" {
if t, err := time.Parse(time.RFC3339, endStr); err == nil {
endTime = t
}
}
// 查询数据库获取最新数据
data, err := h.dao.GetAggregatedData(startTime, endTime, interval)
if err != nil {
logger.Logger.Printf("获取最新聚合数据失败: %v", err)
http.Error(w, "服务器内部错误", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w)
encoder.SetIndent("", " ")
if err := encoder.Encode(data); err != nil {
logger.Logger.Printf("JSON编码失败: %v", err)
http.Error(w, "服务器内部错误", http.StatusInternalServerError)
return
}
}

View File

@ -22,53 +22,63 @@ const (
// ConnectionStatus 保存TCP连接状态 // ConnectionStatus 保存TCP连接状态
var ( var (
ConnectionStatus = struct { // 活跃连接映射表
Connected bool activeConnections = struct {
IP string conns map[string]*SensorComm // 使用IP:Port作为键
Port string
LastSeen time.Time
mu sync.RWMutex mu sync.RWMutex
}{ }{
Connected: false, conns: make(map[string]*SensorComm),
IP: "",
Port: "",
} }
) )
// GetConnectionStatus 返回当前连接状态 // GetConnectionStatus 返回当前连接状态
func GetConnectionStatus() (bool, string, string) { func GetConnectionStatus() (bool, string, string) {
ConnectionStatus.mu.RLock() activeConnections.mu.RLock()
defer ConnectionStatus.mu.RUnlock() defer activeConnections.mu.RUnlock()
// 如果最后一次通信时间超过1分钟认为连接已断开 // 如果有任何活跃连接,返回第一个作为示例
if ConnectionStatus.Connected && time.Since(ConnectionStatus.LastSeen) > time.Minute { // 在未来可以改进为返回一个完整的连接列表
return false, ConnectionStatus.IP, ConnectionStatus.Port for addr, conn := range activeConnections.conns {
} if time.Since(conn.lastActivity) < time.Minute {
return ConnectionStatus.Connected, ConnectionStatus.IP, ConnectionStatus.Port
}
// updateConnectionStatus 更新连接状态
func updateConnectionStatus(connected bool, addr string) {
ConnectionStatus.mu.Lock()
defer ConnectionStatus.mu.Unlock()
ConnectionStatus.Connected = connected
if connected && addr != "" {
host, port, _ := net.SplitHostPort(addr) host, port, _ := net.SplitHostPort(addr)
ConnectionStatus.IP = host return true, host, port
ConnectionStatus.Port = port
ConnectionStatus.LastSeen = time.Now()
} else if !connected {
// 如果断开连接只更新状态保留最后的IP和端口信息
ConnectionStatus.Connected = false
} }
} }
return false, "", ""
}
// 更新连接映射表
func addConnection(addr string, conn *SensorComm) {
activeConnections.mu.Lock()
defer activeConnections.mu.Unlock()
activeConnections.conns[addr] = conn
logger.Logger.Printf("添加新连接: %s, 当前连接数: %d", addr, len(activeConnections.conns))
}
// 从映射表中移除连接
func removeConnection(addr string) {
activeConnections.mu.Lock()
defer activeConnections.mu.Unlock()
delete(activeConnections.conns, addr)
logger.Logger.Printf("移除连接: %s, 当前连接数: %d", addr, len(activeConnections.conns))
}
// 获取活跃连接数量
func getActiveConnectionCount() int {
activeConnections.mu.RLock()
defer activeConnections.mu.RUnlock()
return len(activeConnections.conns)
}
type SensorComm struct { type SensorComm struct {
conn net.Conn conn net.Conn
dao *dao.SensorDAO dao *dao.SensorDAO
address string address string
lastActivity time.Time
mu sync.Mutex mu sync.Mutex
} }
@ -78,6 +88,7 @@ func NewSensorComm(conn net.Conn, dao *dao.SensorDAO) *SensorComm {
conn: conn, conn: conn,
dao: dao, dao: dao,
address: conn.RemoteAddr().String(), address: conn.RemoteAddr().String(),
lastActivity: time.Now(),
} }
} }
@ -98,6 +109,7 @@ func (s *SensorComm) sendQuery() error {
return err return err
} }
s.lastActivity = time.Now()
logger.Logger.Printf("发送查询命令: %X", cmd) logger.Logger.Printf("发送查询命令: %X", cmd)
return nil return nil
} }
@ -107,11 +119,13 @@ func handleConnection(sensor *SensorComm) {
defer sensor.Close() defer sensor.Close()
logger.Logger.Printf("新连接建立: %s", sensor.address) logger.Logger.Printf("新连接建立: %s", sensor.address)
updateConnectionStatus(true, sensor.address)
// 添加到活跃连接列表
addConnection(sensor.address, sensor)
// 发送首次查询 // 发送首次查询
if err := sensor.sendQuery(); err != nil { if err := sensor.sendQuery(); err != nil {
updateConnectionStatus(false, "") removeConnection(sensor.address)
return return
} }
@ -146,14 +160,13 @@ func handleConnection(sensor *SensorComm) {
// 其他错误才认为连接断开 // 其他错误才认为连接断开
logger.Logger.Printf("读取数据失败: %v", err) logger.Logger.Printf("读取数据失败: %v", err)
updateConnectionStatus(false, "")
done <- true done <- true
return return
} }
if n > 0 { if n > 0 {
// 更新最后通信时间 // 更新最后活动时间
updateConnectionStatus(true, sensor.address) sensor.lastActivity = time.Now()
// 记录原始数据 // 记录原始数据
logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n]) logger.Logger.Printf("接收到原始数据 [%s]: % X", sensor.address, buffer[:n])
@ -176,11 +189,11 @@ func handleConnection(sensor *SensorComm) {
case <-ticker.C: case <-ticker.C:
logger.Logger.Printf("定时查询触发 [%s]", sensor.address) logger.Logger.Printf("定时查询触发 [%s]", sensor.address)
if err := sensor.sendQuery(); err != nil { if err := sensor.sendQuery(); err != nil {
updateConnectionStatus(false, "") removeConnection(sensor.address)
return return
} }
case <-done: case <-done:
updateConnectionStatus(false, "") removeConnection(sensor.address)
return return
} }
} }
@ -249,7 +262,9 @@ func (s *SensorComm) Close() {
if s.conn != nil { if s.conn != nil {
s.conn.Close() s.conn.Close()
s.conn = nil s.conn = nil
updateConnectionStatus(false, "")
// 从活跃连接列表中移除
removeConnection(s.address)
} }
} }
@ -269,9 +284,6 @@ func StartTCPServer(dao *dao.SensorDAO) error {
logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort) logger.Logger.Printf("TCP服务器启动在端口%s", tcpPort)
var currentConn *SensorComm
var mu sync.Mutex
for { for {
conn, err := listener.AcceptTCP() conn, err := listener.AcceptTCP()
if err != nil { if err != nil {
@ -284,16 +296,8 @@ func StartTCPServer(dao *dao.SensorDAO) error {
conn.SetKeepAlivePeriod(30 * time.Second) conn.SetKeepAlivePeriod(30 * time.Second)
conn.SetLinger(0) // 立即关闭 conn.SetLinger(0) // 立即关闭
mu.Lock()
// 关闭旧连接
if currentConn != nil {
currentConn.Close()
}
// 创建新连接 // 创建新连接
sensor := NewSensorComm(conn, dao) sensor := NewSensorComm(conn, dao)
currentConn = sensor
mu.Unlock()
logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr()) logger.Logger.Printf("新连接建立: %s", conn.RemoteAddr())

View File

@ -52,6 +52,7 @@ func main() {
// 设置路由 // 设置路由
http.HandleFunc("/api/data", sensorHandler.GetAggregatedData) http.HandleFunc("/api/data", sensorHandler.GetAggregatedData)
http.HandleFunc("/api/latest", sensorHandler.GetLatestData)
http.HandleFunc("/api/status", sensorHandler.GetConnectionStatus) http.HandleFunc("/api/status", sensorHandler.GetConnectionStatus)
http.HandleFunc("/", sensorHandler.ServeStatic) http.HandleFunc("/", sensorHandler.ServeStatic)

View File

@ -146,7 +146,8 @@
<div class="control-group"> <div class="control-group">
<button onclick="queryData()">查询</button> <button onclick="queryData()">查询历史数据</button>
<button onclick="queryLatestData()">查询最新数据</button>
<button onclick="exportData()">导出数据</button> <button onclick="exportData()">导出数据</button>
</div> </div>
</div> </div>
@ -185,7 +186,11 @@
const statusElem = document.getElementById('connectionStatus'); const statusElem = document.getElementById('connectionStatus');
if (data.connected) { if (data.connected) {
statusElem.style.backgroundColor = 'green'; statusElem.style.backgroundColor = 'green';
if (data.count > 1) {
statusElem.textContent = `已连接: ${data.ip}:${data.port} (共${data.count}个设备)`;
} else {
statusElem.textContent = `已连接: ${data.ip}:${data.port}`; statusElem.textContent = `已连接: ${data.ip}:${data.port}`;
}
} else { } else {
statusElem.style.backgroundColor = 'red'; statusElem.style.backgroundColor = 'red';
statusElem.textContent = '未连接'; statusElem.textContent = '未连接';
@ -224,7 +229,7 @@
return `${year}-${month}-${day}T${hours}:${minutes}`; return `${year}-${month}-${day}T${hours}:${minutes}`;
} }
// 查询数据 // 查询历史数据
function queryData() { function queryData() {
const interval = document.getElementById('interval').value; const interval = document.getElementById('interval').value;
const startDate = document.getElementById('startDate').value; const startDate = document.getElementById('startDate').value;
@ -246,6 +251,34 @@
}); });
} }
// 查询最新数据
function queryLatestData() {
const interval = document.getElementById('interval').value;
// 计算最近1小时的时间范围
const endTime = new Date();
const startTime = new Date(endTime.getTime() - (1 * 60 * 60 * 1000)); // 1小时前
// 确保时间格式符合后端要求
const startDateTime = startTime.toISOString();
const endDateTime = endTime.toISOString();
fetch(`/api/latest?interval=${interval}&start=${startDateTime}&end=${endDateTime}`)
.then(response => response.json())
.then(data => {
updateChart(data);
updateTable(data);
// 自动更新日期选择器为最近查询的时间范围
document.getElementById('startDate').value = formatDateTime(startTime);
document.getElementById('endDate').value = formatDateTime(endTime);
})
.catch(error => {
console.error('Error:', error);
alert('获取最新数据失败,请检查网络连接');
});
}
// 更新图表 // 更新图表
function updateChart(data) { function updateChart(data) {
const ctx = document.getElementById('mainChart').getContext('2d'); const ctx = document.getElementById('mainChart').getContext('2d');
@ -424,7 +457,7 @@
// 页面加载完成后初始化 // 页面加载完成后初始化
document.addEventListener('DOMContentLoaded', function() { document.addEventListener('DOMContentLoaded', function() {
initDatePickers(); initDatePickers();
queryData(); queryLatestData(); // 初始加载最新数据
// 每30秒检查一次连接状态 // 每30秒检查一次连接状态
checkConnectionStatus(); checkConnectionStatus();