feat: tcpcli v3

This commit is contained in:
yarnom 2025-07-28 17:06:41 +08:00
parent 625019c503
commit a403f8a3b8
2 changed files with 104 additions and 15 deletions

View File

@ -16,7 +16,7 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
private RtcmDistributor rtcmDistributor; private RtcmDistributor rtcmDistributor;
@Autowired @Autowired
private RtcmDataMonitor rtcmDataMonitor; private UdpHandler udpHandler;
private static final int DEFAULT_BASE_PORT = 10000; private static final int DEFAULT_BASE_PORT = 10000;
@ -55,15 +55,15 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
} else if (command.startsWith("remove ")) { } else if (command.startsWith("remove ")) {
String deviceId = command.substring("remove ".length()).trim(); String deviceId = command.substring("remove ".length()).trim();
removeDeviceServer(deviceId); removeDeviceServer(deviceId);
} else if (command.startsWith("monitor ")) { } else if ("startmonitor".equalsIgnoreCase(command)) {
try { startMonitor();
int port = Integer.parseInt(command.substring("monitor ".length()).trim());
startMonitor(port);
} catch (NumberFormatException e) {
System.out.println("端口必须是数字");
}
} else if ("stopmonitor".equalsIgnoreCase(command)) { } else if ("stopmonitor".equalsIgnoreCase(command)) {
stopMonitor(); stopMonitor();
} else if (command.startsWith("monitordevice ")) {
String deviceId = command.substring("monitordevice ".length()).trim();
monitorDevice(deviceId);
} else if ("clearmonitor".equalsIgnoreCase(command)) {
clearMonitoredDevices();
} else { } else {
System.out.println("未知命令,输入 'help' 查看可用命令"); System.out.println("未知命令,输入 'help' 查看可用命令");
} }
@ -79,8 +79,10 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
System.out.println(" list - 列出所有设备及其TCP端口"); System.out.println(" list - 列出所有设备及其TCP端口");
System.out.println(" create <deviceId> [port] - 为指定设备ID创建TCP服务器可选指定端口"); System.out.println(" create <deviceId> [port] - 为指定设备ID创建TCP服务器可选指定端口");
System.out.println(" remove <deviceId> - 移除指定设备的TCP服务器"); System.out.println(" remove <deviceId> - 移除指定设备的TCP服务器");
System.out.println(" monitor <port> - 监控指定UDP端口的RTCM数据"); System.out.println(" startmonitor - 开始监控UDP数据");
System.out.println(" stopmonitor - 停止UDP数据监控"); System.out.println(" stopmonitor - 停止UDP数据监控");
System.out.println(" monitordevice <deviceId> - 添加指定设备ID到监控列表");
System.out.println(" clearmonitor - 清空设备监控列表(监控所有设备)");
System.out.println(" exit - 退出命令行工具(不会停止服务)"); System.out.println(" exit - 退出命令行工具(不会停止服务)");
} }
@ -123,13 +125,28 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
} }
} }
private void startMonitor(int port) { private void startMonitor() {
System.out.println("开始监控UDP端口 " + port + " 的数据..."); udpHandler.enableMonitoring(true);
rtcmDataMonitor.startUdpMonitor(port); System.out.println("UDP数据监控已启用");
} }
private void stopMonitor() { private void stopMonitor() {
rtcmDataMonitor.stopMonitor(); udpHandler.enableMonitoring(false);
System.out.println("已停止UDP数据监控"); System.out.println("UDP数据监控已禁用");
}
private void monitorDevice(String deviceId) {
if (deviceId == null || deviceId.isEmpty()) {
System.out.println("设备ID不能为空");
return;
}
udpHandler.addMonitoredDevice(deviceId);
System.out.println("已添加设备 " + deviceId + " 到监控列表");
}
private void clearMonitoredDevices() {
udpHandler.clearMonitoredDevices();
System.out.println("已清空设备监控列表,将监控所有设备");
} }
} }

View File

@ -14,6 +14,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* @author Layton * @author Layton
* @date 2023/2/13 11:47 * @date 2023/2/13 11:47
@ -23,6 +27,32 @@ import org.springframework.stereotype.Component;
public class UdpHandler extends ChannelInboundHandlerAdapter { public class UdpHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(UdpHandler.class); private static final Logger logger = LoggerFactory.getLogger(UdpHandler.class);
// 数据监控标志
private AtomicBoolean monitoringEnabled = new AtomicBoolean(false);
private Set<String> monitoredDevices = new HashSet<>();
public void enableMonitoring(boolean enable) {
monitoringEnabled.set(enable);
if (enable) {
logger.info("UDP数据监控已启用");
} else {
logger.info("UDP数据监控已禁用");
monitoredDevices.clear();
}
}
public void addMonitoredDevice(String deviceId) {
if (deviceId != null && !deviceId.isEmpty()) {
monitoredDevices.add(deviceId);
logger.info("添加设备 {} 到监控列表", deviceId);
}
}
public void clearMonitoredDevices() {
monitoredDevices.clear();
logger.info("清空设备监控列表");
}
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@ -31,6 +61,49 @@ public class UdpHandler extends ChannelInboundHandlerAdapter {
if (packet.content() == null) { if (packet.content() == null) {
return; return;
} }
// 如果监控已启用记录数据包信息
if (monitoringEnabled.get()) {
byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data);
String sender = packet.sender().getAddress().getHostAddress() + ":" + packet.sender().getPort();
// 尝试解析设备ID (假设前4字节是header和len接下来4字节是设备ID)
String deviceId = "unknown";
if (data.length >= 8) {
try {
int pos = 4; // 跳过header和len
long id = ((data[pos] & 0xFFL) << 24) |
((data[pos+1] & 0xFFL) << 16) |
((data[pos+2] & 0xFFL) << 8) |
(data[pos+3] & 0xFFL);
deviceId = String.valueOf(id);
} catch (Exception e) {
// 解析失败保持unknown
}
}
// 如果没有指定监控设备或者当前设备在监控列表中
if (monitoredDevices.isEmpty() || monitoredDevices.contains(deviceId)) {
StringBuilder hexDump = new StringBuilder();
int limit = Math.min(50, data.length);
for (int i = 0; i < limit; i++) {
hexDump.append(String.format("%02X ", data[i] & 0xFF));
if ((i + 1) % 16 == 0) {
hexDump.append("\n");
}
}
if (data.length > limit) {
hexDump.append("... (").append(data.length - limit).append(" more bytes)");
}
logger.info("收到来自 {} 的UDP数据包长度: {} 字节设备ID: {}\n数据内容:\n{}",
sender, data.length, deviceId, hexDump.toString());
}
}
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
byte[] data = new byte[packet.content().readableBytes()]; byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data); packet.content().getBytes(0, data);
@ -56,5 +129,4 @@ public class UdpHandler extends ChannelInboundHandlerAdapter {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
logger.error("Exception caught: {}", cause.toString()); logger.error("Exception caught: {}", cause.toString());
} }
} }