diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java index 26459e83..c4d6f89d 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java @@ -284,25 +284,39 @@ public class D331RtcmMessageExecutor implements Executor private void sendToNtrip(String deviceId, String hexData) { // 检查设备是否启用TCP转发 if (!rtcmPortConfigManager.isDeviceEnabled(deviceId)) { - logger.debug("Device {} not enabled for TCP forwarding", deviceId); + logger.info("Device {} not enabled for TCP forwarding", deviceId); return; } try { + // 添加原始数据日志 + logger.info("Processing hex data for device {}, data length: {}", deviceId, hexData.length()); + if (logger.isDebugEnabled()) { + logger.debug("Raw hex data: {}", hexData); + } + // 提取RTCM数据并通过TCP转发 List rtcmList = RtcmGgaUtil.getRtcms(hexData); + logger.info("Extracted {} RTCM messages for device {}", rtcmList != null ? rtcmList.size() : 0, deviceId); + if (rtcmList != null && !rtcmList.isEmpty()) { for (String rtcm : rtcmList) { try { byte[] data = ByteUtil.hexStringTobyte(rtcm); + logger.info("Forwarding RTCM data for device {}, data length: {}", deviceId, data.length); + if (logger.isDebugEnabled()) { + logger.debug("RTCM data: {}", rtcm); + } deviceTcpPortManager.sendData(deviceId, data); } catch (Exception e) { - logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage()); + logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage(), e); } } + } else { + logger.info("No RTCM data found in message for device {}", deviceId); } } catch (Exception e) { - logger.error("处理RTCM数据失败, 设备ID: {}, 错误: {}", deviceId, e.getMessage()); + logger.error("处理RTCM数据失败, 设备ID: {}, 错误: {}", deviceId, e.getMessage(), e); } } diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java index f6ca04df..eb9803b0 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/DeviceTcpPortManager.java @@ -28,43 +28,71 @@ public class DeviceTcpPortManager { } try { + logger.info("Creating new TCP forward server for device {} on port {}", deviceId, port); RtcmForwardServer server = new RtcmForwardServer(port); server.start(); portServerMap.put(port, server); devicePortMap.put(deviceId, port); - logger.info("Created new TCP forward server for device {} on port {}", deviceId, port); + logger.info("Successfully created TCP forward server for device {} on port {}", deviceId, port); } catch (Exception e) { - logger.error("Failed to create TCP server for device {} on port {}", deviceId, port, e); + logger.error("Failed to create TCP server for device {} on port {}: {}", deviceId, port, e.getMessage(), e); } } public void removeDevice(String deviceId) { Integer port = devicePortMap.remove(deviceId); if (port != null) { + logger.info("Removing TCP forward server for device {} on port {}", deviceId, port); RtcmForwardServer server = portServerMap.remove(port); if (server != null) { server.stop(); - logger.info("Removed TCP forward server for device {} on port {}", deviceId, port); - } - } - } - - public void sendData(String deviceId, byte[] data) { - Integer port = devicePortMap.get(deviceId); - if (port != null) { - RtcmForwardServer server = portServerMap.get(port); - if (server != null) { - server.broadcast(data); - logger.debug("Forwarded {} bytes of data for device {} to port {}", data.length, deviceId, port); + logger.info("Successfully removed TCP forward server for device {} on port {}", deviceId, port); } else { logger.warn("Server not found for device {} on port {}", deviceId, port); } } else { - logger.debug("No port mapping found for device {}", deviceId); + logger.info("No port mapping found for device {} to remove", deviceId); + } + } + + public void sendData(String deviceId, byte[] data) { + if (data == null || data.length == 0) { + logger.warn("Empty data received for device {}", deviceId); + return; + } + + Integer port = devicePortMap.get(deviceId); + if (port != null) { + RtcmForwardServer server = portServerMap.get(port); + if (server != null) { + try { + server.broadcast(data); + logger.info("Successfully forwarded {} bytes of data for device {} to port {}", + data.length, deviceId, port); + } catch (Exception e) { + logger.error("Error broadcasting data for device {} on port {}: {}", + deviceId, port, e.getMessage(), e); + } + } else { + logger.error("Server not found for device {} on port {}", deviceId, port); + // 尝试重新创建服务器 + logger.info("Attempting to recreate server for device {} on port {}", deviceId, port); + addDevice(deviceId, port); + } + } else { + logger.error("No port mapping found for device {}", deviceId); } } public Map getActiveDevicePorts() { return new HashMap<>(devicePortMap); } + + public boolean hasDevice(String deviceId) { + return devicePortMap.containsKey(deviceId); + } + + public Integer getDevicePort(String deviceId) { + return devicePortMap.get(deviceId); + } } \ No newline at end of file diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java index db353fb0..2011125d 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmForwardServer.java @@ -20,6 +20,7 @@ public class RtcmForwardServer { private final EventLoopGroup workerGroup; private Channel serverChannel; private final ConcurrentMap connectedClients; + private volatile boolean isRunning = false; public RtcmForwardServer(int port) { this.port = port; @@ -28,14 +29,21 @@ public class RtcmForwardServer { this.connectedClients = new ConcurrentHashMap<>(); } - public void start() { + public synchronized void start() { + if (isRunning) { + logger.warn("Server on port {} is already running", port); + return; + } + try { + logger.info("Starting RTCM forward server on port {}", port); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { + logger.info("New client connection on port {}: {}", port, ch.remoteAddress()); ChannelPipeline p = ch.pipeline(); p.addLast(new ByteArrayEncoder()); p.addLast(new SimpleChannelInboundHandler() { @@ -54,11 +62,12 @@ public class RtcmForwardServer { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { // 我们不需要处理来自客户端的数据 + logger.debug("Received data from client on port {}: {}", port, ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - logger.error("Error on port " + port, cause); + logger.error("Error on port {} for client {}: {}", port, ctx.channel().remoteAddress(), cause.getMessage(), cause); ctx.close(); } }); @@ -68,44 +77,95 @@ public class RtcmForwardServer { .childOption(ChannelOption.SO_KEEPALIVE, true); serverChannel = b.bind(port).sync().channel(); - logger.info("RTCM forward server started on port {}", port); + isRunning = true; + logger.info("RTCM forward server successfully started on port {}", port); } catch (Exception e) { - logger.error("Failed to start RTCM forward server on port " + port, e); - throw new RuntimeException("Failed to start server", e); + logger.error("Failed to start RTCM forward server on port {}: {}", port, e.getMessage(), e); + throw new RuntimeException("Failed to start server on port " + port, e); } } public void broadcast(byte[] data) { - if (data == null || data.length == 0) { + if (!isRunning) { + logger.error("Cannot broadcast data - server on port {} is not running", port); return; } - - connectedClients.keySet().forEach(channel -> { + + if (data == null || data.length == 0) { + logger.warn("Attempted to broadcast empty data on port {}", port); + return; + } + + int activeClients = 0; + int successfulBroadcasts = 0; + + for (Channel channel : connectedClients.keySet()) { if (channel.isActive()) { - channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> { - if (!future.isSuccess()) { - logger.error("Failed to send data to client on port " + port, future.cause()); - future.channel().close(); - } - }); + activeClients++; + try { + channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.debug("Successfully sent {} bytes to client {} on port {}", + data.length, future.channel().remoteAddress(), port); + } else { + logger.error("Failed to send data to client {} on port {}: {}", + future.channel().remoteAddress(), port, future.cause().getMessage()); + future.channel().close(); + } + }); + successfulBroadcasts++; + } catch (Exception e) { + logger.error("Error broadcasting to client on port {}: {}", port, e.getMessage(), e); + } } - }); + } + + logger.info("Broadcast {} bytes to {}/{} clients on port {}", + data.length, successfulBroadcasts, activeClients, port); } - public void stop() { + public synchronized void stop() { + if (!isRunning) { + logger.warn("Server on port {} is already stopped", port); + return; + } + + logger.info("Stopping RTCM forward server on port {}", port); try { if (serverChannel != null) { serverChannel.close().sync(); } - connectedClients.keySet().forEach(Channel::close); + + // 关闭所有客户端连接 + int closedConnections = 0; + for (Channel channel : connectedClients.keySet()) { + try { + channel.close().sync(); + closedConnections++; + } catch (Exception e) { + logger.warn("Error closing client connection on port {}: {}", port, e.getMessage()); + } + } connectedClients.clear(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); - logger.info("RTCM forward server stopped on port {}", port); + isRunning = false; + logger.info("Successfully stopped RTCM forward server on port {}, closed {} client connections", + port, closedConnections); } catch (Exception e) { - logger.error("Error stopping RTCM forward server on port " + port, e); + logger.error("Error stopping RTCM forward server on port {}: {}", port, e.getMessage(), e); } } + + public boolean isRunning() { + return isRunning; + } + + public int getActiveClientCount() { + return (int) connectedClients.keySet().stream() + .filter(Channel::isActive) + .count(); + } } \ No newline at end of file