From 625019c5035aa3fbb2e165754a9d3109df6d28d0 Mon Sep 17 00:00:00 2001 From: yarnom Date: Mon, 28 Jul 2025 17:02:02 +0800 Subject: [PATCH] feat: tcpcli v2 --- .../service/RtcmCommandLineRunner.java | 24 ++++ .../ntripproxy/service/RtcmDataMonitor.java | 109 ++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDataMonitor.java diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java index 4eba2a4a..bfbe1779 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java @@ -15,6 +15,9 @@ public class RtcmCommandLineRunner implements CommandLineRunner { @Autowired private RtcmDistributor rtcmDistributor; + @Autowired + private RtcmDataMonitor rtcmDataMonitor; + private static final int DEFAULT_BASE_PORT = 10000; @Override @@ -52,6 +55,15 @@ public class RtcmCommandLineRunner implements CommandLineRunner { } else if (command.startsWith("remove ")) { String deviceId = command.substring("remove ".length()).trim(); removeDeviceServer(deviceId); + } else if (command.startsWith("monitor ")) { + try { + int port = Integer.parseInt(command.substring("monitor ".length()).trim()); + startMonitor(port); + } catch (NumberFormatException e) { + System.out.println("端口必须是数字"); + } + } else if ("stopmonitor".equalsIgnoreCase(command)) { + stopMonitor(); } else { System.out.println("未知命令,输入 'help' 查看可用命令"); } @@ -67,6 +79,8 @@ public class RtcmCommandLineRunner implements CommandLineRunner { System.out.println(" list - 列出所有设备及其TCP端口"); System.out.println(" create [port] - 为指定设备ID创建TCP服务器,可选指定端口"); System.out.println(" remove - 移除指定设备的TCP服务器"); + System.out.println(" monitor - 监控指定UDP端口的RTCM数据"); + System.out.println(" stopmonitor - 停止UDP数据监控"); System.out.println(" exit - 退出命令行工具(不会停止服务)"); } @@ -108,4 +122,14 @@ public class RtcmCommandLineRunner implements CommandLineRunner { System.out.println("设备 " + deviceId + " 没有TCP服务器"); } } + + private void startMonitor(int port) { + System.out.println("开始监控UDP端口 " + port + " 的数据..."); + rtcmDataMonitor.startUdpMonitor(port); + } + + private void stopMonitor() { + rtcmDataMonitor.stopMonitor(); + System.out.println("已停止UDP数据监控"); + } } \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDataMonitor.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDataMonitor.java new file mode 100644 index 00000000..778d3e1f --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDataMonitor.java @@ -0,0 +1,109 @@ +package com.imdroid.ntripproxy.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; + +@Component +public class RtcmDataMonitor { + private static final Logger logger = LoggerFactory.getLogger(RtcmDataMonitor.class); + private static final int BUFFER_SIZE = 4096; + + private DatagramSocket monitorSocket; + private boolean running = false; + + public void startUdpMonitor(int port) { + if (running) { + logger.info("UDP监控已经在运行中"); + return; + } + + Thread monitorThread = new Thread(() -> { + try { + monitorSocket = new DatagramSocket(null); + monitorSocket.setReuseAddress(true); + monitorSocket.bind(new InetSocketAddress(port)); + + byte[] buffer = new byte[BUFFER_SIZE]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + + running = true; + logger.info("开始监控UDP端口 {} 的数据", port); + + while (running) { + try { + monitorSocket.receive(packet); + byte[] data = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); + + // 打印数据信息 + String sender = packet.getAddress().getHostAddress() + ":" + packet.getPort(); + printPacketInfo(sender, data); + + } catch (IOException e) { + if (running) { + logger.error("接收UDP数据时出错", e); + } + } + } + } catch (IOException e) { + logger.error("创建UDP监控套接字失败", e); + } finally { + if (monitorSocket != null) { + monitorSocket.close(); + } + running = false; + } + }); + + monitorThread.setDaemon(true); + monitorThread.start(); + } + + public void stopMonitor() { + running = false; + if (monitorSocket != null) { + monitorSocket.close(); + } + logger.info("停止UDP数据监控"); + } + + private void printPacketInfo(String sender, byte[] data) { + StringBuilder hexDump = new StringBuilder(); + int limit = Math.min(50, data.length); + + for (int i = 0; i < limit; i++) { + hexDump.append(String.format("%02X ", data[i] & 0xFF)); + if ((i + 1) % 16 == 0) { + hexDump.append("\n"); + } + } + + if (data.length > limit) { + hexDump.append("... (").append(data.length - limit).append(" more bytes)"); + } + + // 尝试解析设备ID (假设前4字节是header和len,接下来4字节是设备ID) + String deviceId = "unknown"; + if (data.length >= 8) { + try { + int pos = 4; // 跳过header和len + long id = ((data[pos] & 0xFFL) << 24) | + ((data[pos+1] & 0xFFL) << 16) | + ((data[pos+2] & 0xFFL) << 8) | + (data[pos+3] & 0xFFL); + deviceId = String.valueOf(id); + } catch (Exception e) { + // 解析失败,保持unknown + } + } + + logger.info("收到来自 {} 的UDP数据包,长度: {} 字节,可能的设备ID: {}\n数据内容:\n{}", + sender, data.length, deviceId, hexDump.toString()); + } +} \ No newline at end of file