From bd9a6e575e41e27c56d1668c7ab5ec9997102a5c Mon Sep 17 00:00:00 2001 From: yarnom Date: Tue, 29 Jul 2025 14:55:58 +0800 Subject: [PATCH] feat: update forward server --- .../imdroid/sideslope/SideSlopeRtcmApp.java | 2 + .../executor/D331RtcmMessageExecutor.java | 15 +- .../server/tcp/DeviceTcpPortManager.java | 142 +++++------------- 3 files changed, 46 insertions(+), 113 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/SideSlopeRtcmApp.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/SideSlopeRtcmApp.java index bbc5fb97..548bc4b5 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/SideSlopeRtcmApp.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/SideSlopeRtcmApp.java @@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.ComponentScan; +import org.springframework.scheduling.annotation.EnableScheduling; /** @@ -17,6 +18,7 @@ import org.springframework.context.annotation.ComponentScan; @ComponentScan({"com.imdroid.*"}) @EntityScan({"com.imdroid.*"}) @EnableFeignClients(basePackages = "com.imdroid.*") +@EnableScheduling public class SideSlopeRtcmApp { public static void main(String[] args) { diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java index 5737d77d..26459e83 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java @@ -16,6 +16,7 @@ import com.imdroid.sideslope.server.DeviceChannel; import com.imdroid.sideslope.server.OnlineChannels; import com.imdroid.sideslope.service.DataPersistService; import com.imdroid.sideslope.bd.RtcmGgaUtil; +import com.imdroid.sideslope.config.RtcmPortConfigManager; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.slf4j.Logger; @@ -48,6 +49,8 @@ public class D331RtcmMessageExecutor implements Executor UdpNtripServer ntripServer; @Autowired private DeviceTcpPortManager deviceTcpPortManager; + @Autowired + private RtcmPortConfigManager rtcmPortConfigManager; // 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间 private final Map lastD300ForwardTimeMap = new ConcurrentHashMap<>(); @@ -279,9 +282,10 @@ public class D331RtcmMessageExecutor implements Executor } private void sendToNtrip(String deviceId, String hexData) { - // 首先检查设备是否启用TCP转发 - if (!deviceTcpPortManager.isDeviceEnabled(deviceId)) { - return; // 如果设备未启用,直接返回 + // 检查设备是否启用TCP转发 + if (!rtcmPortConfigManager.isDeviceEnabled(deviceId)) { + logger.debug("Device {} not enabled for TCP forwarding", deviceId); + return; } try { @@ -292,11 +296,6 @@ public class D331RtcmMessageExecutor implements Executor try { byte[] data = ByteUtil.hexStringTobyte(rtcm); deviceTcpPortManager.sendData(deviceId, data); - // 获取端口信息用于日志记录 - int port = deviceTcpPortManager.getOrCreatePort(deviceId); - if (port > 0) { // 只有当端口创建成功时才记录日志 - logger.debug("Forwarded RTCM data for device {} to TCP port {}", deviceId, port); - } } catch (Exception e) { logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage()); } diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java index 79b4a294..f6ca04df 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java @@ -2,121 +2,42 @@ package com.imdroid.sideslope.server.tcp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import java.util.*; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Service public class DeviceTcpPortManager { private static final Logger logger = LoggerFactory.getLogger(DeviceTcpPortManager.class); - @Value("${rtcm.port.start:20000}") - private int startPort; - - @Value("${rtcm.port.end:21000}") - private int endPort; - - @Value("${rtcm.forward.device.ids:}") - private String forwardDeviceIds; - private final Map devicePortMap = new ConcurrentHashMap<>(); private final Map portServerMap = new ConcurrentHashMap<>(); - private Set enabledDevices; - private int currentPort; - - public DeviceTcpPortManager() { - this.currentPort = startPort; - } - - @PostConstruct - public void init() { - // 初始化启用TCP转发的设备ID集合 - enabledDevices = new HashSet<>(); - if (forwardDeviceIds != null && !forwardDeviceIds.trim().isEmpty()) { - String[] ids = forwardDeviceIds.split(","); - enabledDevices.addAll(Arrays.asList(ids)); - logger.info("Enabled TCP forward for devices: {}", enabledDevices); - } - } - - public synchronized int getOrCreatePort(String deviceId) { - // 检查设备是否在启用列表中 - if (!enabledDevices.contains(deviceId)) { - return -1; // 返回-1表示该设备未启用TCP转发 - } - - return devicePortMap.computeIfAbsent(deviceId, id -> { - // 尝试获取配置的固定端口 - int configuredPort = getConfiguredPort(deviceId); - if (configuredPort > 0) { - try { - if (!portServerMap.containsKey(configuredPort)) { - RtcmForwardServer server = new RtcmForwardServer(configuredPort); - server.start(); - portServerMap.put(configuredPort, server); - logger.info("Created new TCP forward server for device {} on configured port {}", deviceId, configuredPort); - return configuredPort; - } - } catch (Exception e) { - logger.error("Failed to create TCP server on configured port {} for device {}", configuredPort, deviceId, e); - } - } - - // 如果没有配置固定端口或固定端口创建失败,使用动态端口 - while (currentPort <= endPort) { - if (!portServerMap.containsKey(currentPort)) { - try { - RtcmForwardServer server = new RtcmForwardServer(currentPort); - server.start(); - portServerMap.put(currentPort, server); - logger.info("Created new TCP forward server for device {} on port {}", deviceId, currentPort); - return currentPort++; - } catch (Exception e) { - logger.error("Failed to create TCP server on port {}, trying next port", currentPort, e); - currentPort++; - } - } - currentPort++; - } - throw new RuntimeException("No available ports"); - }); - } - - private int getConfiguredPort(String deviceId) { - try { - // 从Spring环境中获取设备特定的端口配置 - String portValue = System.getProperty("rtcm.device." + deviceId + ".port"); - if (portValue != null && !portValue.trim().isEmpty()) { - int port = Integer.parseInt(portValue); - if (port >= startPort && port <= endPort) { - return port; - } - } - } catch (Exception e) { - logger.warn("Failed to get configured port for device {}", deviceId, e); - } - return -1; - } - - public void sendData(String deviceId, byte[] data) { - // 只处理启用的设备 - if (!enabledDevices.contains(deviceId)) { + + public synchronized void addDevice(String deviceId, int port) { + if (port <= 0) { + logger.error("Invalid port {} for device {}", port, deviceId); return; } - - Integer port = devicePortMap.get(deviceId); - if (port != null) { - RtcmForwardServer server = portServerMap.get(port); - if (server != null) { - server.broadcast(data); - } + + // 检查端口是否已被其他设备使用 + if (portServerMap.containsKey(port)) { + logger.error("Port {} is already in use", port); + return; + } + + try { + RtcmForwardServer server = new RtcmForwardServer(port); + server.start(); + portServerMap.put(port, server); + devicePortMap.put(deviceId, port); + logger.info("Created new TCP forward server for device {} on port {}", deviceId, port); + } catch (Exception e) { + logger.error("Failed to create TCP server for device {} on port {}", deviceId, port, e); } } - + public void removeDevice(String deviceId) { Integer port = devicePortMap.remove(deviceId); if (port != null) { @@ -127,11 +48,22 @@ public class DeviceTcpPortManager { } } } - - public boolean isDeviceEnabled(String deviceId) { - return enabledDevices.contains(deviceId); + + public void sendData(String deviceId, byte[] data) { + Integer port = devicePortMap.get(deviceId); + if (port != null) { + RtcmForwardServer server = portServerMap.get(port); + if (server != null) { + server.broadcast(data); + logger.debug("Forwarded {} bytes of data for device {} to port {}", data.length, deviceId, port); + } else { + logger.warn("Server not found for device {} on port {}", deviceId, port); + } + } else { + logger.debug("No port mapping found for device {}", deviceId); + } } - + public Map getActiveDevicePorts() { return new HashMap<>(devicePortMap); }