From a403f8a3b88e00e145a547ec66973b9bce2cb850 Mon Sep 17 00:00:00 2001 From: yarnom Date: Mon, 28 Jul 2025 17:06:41 +0800 Subject: [PATCH] feat: tcpcli v3 --- .../service/RtcmCommandLineRunner.java | 45 +++++++---- .../ntripproxy/service/UdpHandler.java | 74 ++++++++++++++++++- 2 files changed, 104 insertions(+), 15 deletions(-) diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java index bfbe1779..6744ae96 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java @@ -16,7 +16,7 @@ public class RtcmCommandLineRunner implements CommandLineRunner { private RtcmDistributor rtcmDistributor; @Autowired - private RtcmDataMonitor rtcmDataMonitor; + private UdpHandler udpHandler; private static final int DEFAULT_BASE_PORT = 10000; @@ -55,15 +55,15 @@ public class RtcmCommandLineRunner implements CommandLineRunner { } else if (command.startsWith("remove ")) { String deviceId = command.substring("remove ".length()).trim(); removeDeviceServer(deviceId); - } else if (command.startsWith("monitor ")) { - try { - int port = Integer.parseInt(command.substring("monitor ".length()).trim()); - startMonitor(port); - } catch (NumberFormatException e) { - System.out.println("端口必须是数字"); - } + } else if ("startmonitor".equalsIgnoreCase(command)) { + startMonitor(); } else if ("stopmonitor".equalsIgnoreCase(command)) { stopMonitor(); + } else if (command.startsWith("monitordevice ")) { + String deviceId = command.substring("monitordevice ".length()).trim(); + monitorDevice(deviceId); + } else if ("clearmonitor".equalsIgnoreCase(command)) { + clearMonitoredDevices(); } else { System.out.println("未知命令,输入 'help' 查看可用命令"); } @@ -79,8 +79,10 @@ public class RtcmCommandLineRunner implements CommandLineRunner { System.out.println(" list - 列出所有设备及其TCP端口"); System.out.println(" create [port] - 为指定设备ID创建TCP服务器,可选指定端口"); System.out.println(" remove - 移除指定设备的TCP服务器"); - System.out.println(" monitor - 监控指定UDP端口的RTCM数据"); + System.out.println(" startmonitor - 开始监控UDP数据"); System.out.println(" stopmonitor - 停止UDP数据监控"); + System.out.println(" monitordevice - 添加指定设备ID到监控列表"); + System.out.println(" clearmonitor - 清空设备监控列表(监控所有设备)"); System.out.println(" exit - 退出命令行工具(不会停止服务)"); } @@ -123,13 +125,28 @@ public class RtcmCommandLineRunner implements CommandLineRunner { } } - private void startMonitor(int port) { - System.out.println("开始监控UDP端口 " + port + " 的数据..."); - rtcmDataMonitor.startUdpMonitor(port); + private void startMonitor() { + udpHandler.enableMonitoring(true); + System.out.println("UDP数据监控已启用"); } private void stopMonitor() { - rtcmDataMonitor.stopMonitor(); - System.out.println("已停止UDP数据监控"); + udpHandler.enableMonitoring(false); + System.out.println("UDP数据监控已禁用"); + } + + private void monitorDevice(String deviceId) { + if (deviceId == null || deviceId.isEmpty()) { + System.out.println("设备ID不能为空"); + return; + } + + udpHandler.addMonitoredDevice(deviceId); + System.out.println("已添加设备 " + deviceId + " 到监控列表"); + } + + private void clearMonitoredDevices() { + udpHandler.clearMonitoredDevices(); + System.out.println("已清空设备监控列表,将监控所有设备"); } } \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UdpHandler.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UdpHandler.java index 3b53203c..f4f909f6 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UdpHandler.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UdpHandler.java @@ -14,6 +14,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + /** * @author Layton * @date 2023/2/13 11:47 @@ -23,6 +27,32 @@ import org.springframework.stereotype.Component; public class UdpHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(UdpHandler.class); + + // 数据监控标志 + private AtomicBoolean monitoringEnabled = new AtomicBoolean(false); + private Set monitoredDevices = new HashSet<>(); + + public void enableMonitoring(boolean enable) { + monitoringEnabled.set(enable); + if (enable) { + logger.info("UDP数据监控已启用"); + } else { + logger.info("UDP数据监控已禁用"); + monitoredDevices.clear(); + } + } + + public void addMonitoredDevice(String deviceId) { + if (deviceId != null && !deviceId.isEmpty()) { + monitoredDevices.add(deviceId); + logger.info("添加设备 {} 到监控列表", deviceId); + } + } + + public void clearMonitoredDevices() { + monitoredDevices.clear(); + logger.info("清空设备监控列表"); + } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { @@ -31,6 +61,49 @@ public class UdpHandler extends ChannelInboundHandlerAdapter { if (packet.content() == null) { return; } + + // 如果监控已启用,记录数据包信息 + if (monitoringEnabled.get()) { + byte[] data = new byte[packet.content().readableBytes()]; + packet.content().getBytes(0, data); + String sender = packet.sender().getAddress().getHostAddress() + ":" + packet.sender().getPort(); + + // 尝试解析设备ID (假设前4字节是header和len,接下来4字节是设备ID) + String deviceId = "unknown"; + if (data.length >= 8) { + try { + int pos = 4; // 跳过header和len + long id = ((data[pos] & 0xFFL) << 24) | + ((data[pos+1] & 0xFFL) << 16) | + ((data[pos+2] & 0xFFL) << 8) | + (data[pos+3] & 0xFFL); + deviceId = String.valueOf(id); + } catch (Exception e) { + // 解析失败,保持unknown + } + } + + // 如果没有指定监控设备或者当前设备在监控列表中 + if (monitoredDevices.isEmpty() || monitoredDevices.contains(deviceId)) { + StringBuilder hexDump = new StringBuilder(); + int limit = Math.min(50, data.length); + + for (int i = 0; i < limit; i++) { + hexDump.append(String.format("%02X ", data[i] & 0xFF)); + if ((i + 1) % 16 == 0) { + hexDump.append("\n"); + } + } + + if (data.length > limit) { + hexDump.append("... (").append(data.length - limit).append(" more bytes)"); + } + + logger.info("收到来自 {} 的UDP数据包,长度: {} 字节,设备ID: {}\n数据内容:\n{}", + sender, data.length, deviceId, hexDump.toString()); + } + } + if (logger.isDebugEnabled()) { byte[] data = new byte[packet.content().readableBytes()]; packet.content().getBytes(0, data); @@ -56,5 +129,4 @@ public class UdpHandler extends ChannelInboundHandlerAdapter { super.exceptionCaught(ctx, cause); logger.error("Exception caught: {}", cause.toString()); } - }