From a35cb1a43113941da33804425e930350c0350ea5 Mon Sep 17 00:00:00 2001 From: yarnom Date: Tue, 29 Jul 2025 10:01:51 +0800 Subject: [PATCH] feat: update forward server --- .../executor/D331RtcmMessageExecutor.java | 41 ++++-- .../server/tcp/DeviceTcpPortManager.java | 138 ++++++++++++++++++ .../server/tcp/RtcmForwardServer.java | 111 ++++++++++++++ .../imdroid/sideslope/web/ApiController.java | 10 ++ 4 files changed, 286 insertions(+), 14 deletions(-) create mode 100644 sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java create mode 100644 sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java index 3cd53ee5..5737d77d 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java @@ -9,6 +9,7 @@ import com.imdroid.sideslope.bd.Gga; import com.imdroid.sideslope.bd.Rtcm1005; import com.imdroid.sideslope.message.D331RtcmMessage; import com.imdroid.sideslope.ntrip.UdpNtripServer; +import com.imdroid.sideslope.server.tcp.DeviceTcpPortManager; import com.imdroid.sideslope.service.Device; import com.imdroid.sideslope.service.DeviceService; import com.imdroid.sideslope.server.DeviceChannel; @@ -45,6 +46,8 @@ public class D331RtcmMessageExecutor implements Executor private DataPersistService dataPersistService; @Autowired UdpNtripServer ntripServer; + @Autowired + private DeviceTcpPortManager deviceTcpPortManager; // 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间 private final Map lastD300ForwardTimeMap = new ConcurrentHashMap<>(); @@ -275,22 +278,32 @@ public class D331RtcmMessageExecutor implements Executor return null; } - private void sendToNtrip(String mountpoint, String hexData) { + private void sendToNtrip(String deviceId, String hexData) { + // 首先检查设备是否启用TCP转发 + if (!deviceTcpPortManager.isDeviceEnabled(deviceId)) { + return; // 如果设备未启用,直接返回 + } + try { - - // 将原始字节转换为16进制字符串用于RTCM提取 - //String hexData = ByteUtil.bytesToHexString(rawData); - //System.out.println(hexData); - - // 提取RTCM数据并发送到NtripServer,使用设备ID作为挂载点 - Optional.ofNullable(RtcmGgaUtil.getRtcms(hexData)) - .ifPresent(rtcm -> { - //System.out.println("挂载点: " + mountpoint); - //System.out.println("RTCM数据: " + rtcm); - ntripServer.send(mountpoint, rtcm); - }); + // 提取RTCM数据并通过TCP转发 + List rtcmList = RtcmGgaUtil.getRtcms(hexData); + if (rtcmList != null && !rtcmList.isEmpty()) { + for (String rtcm : rtcmList) { + try { + byte[] data = ByteUtil.hexStringTobyte(rtcm); + deviceTcpPortManager.sendData(deviceId, data); + // 获取端口信息用于日志记录 + int port = deviceTcpPortManager.getOrCreatePort(deviceId); + if (port > 0) { // 只有当端口创建成功时才记录日志 + logger.debug("Forwarded RTCM data for device {} to TCP port {}", deviceId, port); + } + } catch (Exception e) { + logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage()); + } + } + } } catch (Exception e) { - logger.error("处理NTRIP数据失败, 挂载点: {}, 错误: {}", mountpoint, e.getMessage()); + logger.error("处理RTCM数据失败, 设备ID: {}, 错误: {}", deviceId, e.getMessage()); } } diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java new file mode 100644 index 00000000..79b4a294 --- /dev/null +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java @@ -0,0 +1,138 @@ +package com.imdroid.sideslope.server.tcp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class DeviceTcpPortManager { + private static final Logger logger = LoggerFactory.getLogger(DeviceTcpPortManager.class); + + @Value("${rtcm.port.start:20000}") + private int startPort; + + @Value("${rtcm.port.end:21000}") + private int endPort; + + @Value("${rtcm.forward.device.ids:}") + private String forwardDeviceIds; + + private final Map devicePortMap = new ConcurrentHashMap<>(); + private final Map portServerMap = new ConcurrentHashMap<>(); + private Set enabledDevices; + private int currentPort; + + public DeviceTcpPortManager() { + this.currentPort = startPort; + } + + @PostConstruct + public void init() { + // 初始化启用TCP转发的设备ID集合 + enabledDevices = new HashSet<>(); + if (forwardDeviceIds != null && !forwardDeviceIds.trim().isEmpty()) { + String[] ids = forwardDeviceIds.split(","); + enabledDevices.addAll(Arrays.asList(ids)); + logger.info("Enabled TCP forward for devices: {}", enabledDevices); + } + } + + public synchronized int getOrCreatePort(String deviceId) { + // 检查设备是否在启用列表中 + if (!enabledDevices.contains(deviceId)) { + return -1; // 返回-1表示该设备未启用TCP转发 + } + + return devicePortMap.computeIfAbsent(deviceId, id -> { + // 尝试获取配置的固定端口 + int configuredPort = getConfiguredPort(deviceId); + if (configuredPort > 0) { + try { + if (!portServerMap.containsKey(configuredPort)) { + RtcmForwardServer server = new RtcmForwardServer(configuredPort); + server.start(); + portServerMap.put(configuredPort, server); + logger.info("Created new TCP forward server for device {} on configured port {}", deviceId, configuredPort); + return configuredPort; + } + } catch (Exception e) { + logger.error("Failed to create TCP server on configured port {} for device {}", configuredPort, deviceId, e); + } + } + + // 如果没有配置固定端口或固定端口创建失败,使用动态端口 + while (currentPort <= endPort) { + if (!portServerMap.containsKey(currentPort)) { + try { + RtcmForwardServer server = new RtcmForwardServer(currentPort); + server.start(); + portServerMap.put(currentPort, server); + logger.info("Created new TCP forward server for device {} on port {}", deviceId, currentPort); + return currentPort++; + } catch (Exception e) { + logger.error("Failed to create TCP server on port {}, trying next port", currentPort, e); + currentPort++; + } + } + currentPort++; + } + throw new RuntimeException("No available ports"); + }); + } + + private int getConfiguredPort(String deviceId) { + try { + // 从Spring环境中获取设备特定的端口配置 + String portValue = System.getProperty("rtcm.device." + deviceId + ".port"); + if (portValue != null && !portValue.trim().isEmpty()) { + int port = Integer.parseInt(portValue); + if (port >= startPort && port <= endPort) { + return port; + } + } + } catch (Exception e) { + logger.warn("Failed to get configured port for device {}", deviceId, e); + } + return -1; + } + + public void sendData(String deviceId, byte[] data) { + // 只处理启用的设备 + if (!enabledDevices.contains(deviceId)) { + return; + } + + Integer port = devicePortMap.get(deviceId); + if (port != null) { + RtcmForwardServer server = portServerMap.get(port); + if (server != null) { + server.broadcast(data); + } + } + } + + public void removeDevice(String deviceId) { + Integer port = devicePortMap.remove(deviceId); + if (port != null) { + RtcmForwardServer server = portServerMap.remove(port); + if (server != null) { + server.stop(); + logger.info("Removed TCP forward server for device {} on port {}", deviceId, port); + } + } + } + + public boolean isDeviceEnabled(String deviceId) { + return enabledDevices.contains(deviceId); + } + + public Map getActiveDevicePorts() { + return new HashMap<>(devicePortMap); + } +} \ No newline at end of file diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java new file mode 100644 index 00000000..db353fb0 --- /dev/null +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java @@ -0,0 +1,111 @@ +package com.imdroid.sideslope.server.tcp; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.bytes.ByteArrayEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class RtcmForwardServer { + private static final Logger logger = LoggerFactory.getLogger(RtcmForwardServer.class); + + private final int port; + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + private Channel serverChannel; + private final ConcurrentMap connectedClients; + + public RtcmForwardServer(int port) { + this.port = port; + this.bossGroup = new NioEventLoopGroup(1); + this.workerGroup = new NioEventLoopGroup(); + this.connectedClients = new ConcurrentHashMap<>(); + } + + public void start() { + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new ByteArrayEncoder()); + p.addLast(new SimpleChannelInboundHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + connectedClients.put(ctx.channel(), true); + logger.info("Client connected to port {}: {}", port, ctx.channel().remoteAddress()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + connectedClients.remove(ctx.channel()); + logger.info("Client disconnected from port {}: {}", port, ctx.channel().remoteAddress()); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + // 我们不需要处理来自客户端的数据 + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("Error on port " + port, cause); + ctx.close(); + } + }); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + serverChannel = b.bind(port).sync().channel(); + logger.info("RTCM forward server started on port {}", port); + } catch (Exception e) { + logger.error("Failed to start RTCM forward server on port " + port, e); + throw new RuntimeException("Failed to start server", e); + } + } + + public void broadcast(byte[] data) { + if (data == null || data.length == 0) { + return; + } + + connectedClients.keySet().forEach(channel -> { + if (channel.isActive()) { + channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + logger.error("Failed to send data to client on port " + port, future.cause()); + future.channel().close(); + } + }); + } + }); + } + + public void stop() { + try { + if (serverChannel != null) { + serverChannel.close().sync(); + } + connectedClients.keySet().forEach(Channel::close); + connectedClients.clear(); + + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + + logger.info("RTCM forward server stopped on port {}", port); + } catch (Exception e) { + logger.error("Error stopping RTCM forward server on port " + port, e); + } + } +} \ No newline at end of file diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java index d0dc17bf..5b0d5666 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java @@ -4,6 +4,7 @@ import com.imdroid.common.util.DataTypeUtil; import com.imdroid.secapi.client.HttpResp; import com.imdroid.sideslope.calc.MultiLineGNSSCalcService; import com.imdroid.sideslope.calc.SingleLineGNSSCalcService; +import com.imdroid.sideslope.server.tcp.DeviceTcpPortManager; import com.imdroid.sideslope.service.Device; import com.imdroid.sideslope.service.LocalDeviceServiceImpl; import com.imdroid.sideslope.server.DeviceChannel; @@ -44,6 +45,8 @@ public class ApiController { MultiLineGNSSCalcService multiCalcService; @Autowired GroupParaService groupParaService; + @Autowired + private DeviceTcpPortManager deviceTcpPortManager; @PostMapping(value = "/config") public HttpResp config(String deviceId, String configuration) { @@ -153,6 +156,13 @@ public class ApiController { return resp; } + @GetMapping("/tcp/ports") + public Map getTcpPorts() { + Map result = new HashMap<>(); + result.put("enabled_devices", deviceTcpPortManager.getActiveDevicePorts()); + return result; + } + @PostMapping("/device_param_changed") public HttpResp deviceParamChanged(String deviceId, String oldParentId) { // 更新设备缓存