From b4d81245c5e1e7f5125e0856a962a19508caae87 Mon Sep 17 00:00:00 2001 From: yarnom Date: Mon, 28 Jul 2025 17:20:26 +0800 Subject: [PATCH] feat: tcpcli v4 --- .../service/RtcmCommandLineRunner.java | 40 ++++++++- .../ntripproxy/service/RtcmDistributor.java | 38 ++++++++- .../ntripproxy/service/RtcmUdpHandler.java | 83 +++++++++++++++++++ .../ntripproxy/service/RtcmUdpServer.java | 49 +++++++++++ 4 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpHandler.java create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpServer.java diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java index 6744ae96..02acefad 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java @@ -18,6 +18,9 @@ public class RtcmCommandLineRunner implements CommandLineRunner { @Autowired private UdpHandler udpHandler; + @Autowired + private RtcmUdpHandler rtcmUdpHandler; + private static final int DEFAULT_BASE_PORT = 10000; @Override @@ -64,6 +67,9 @@ public class RtcmCommandLineRunner implements CommandLineRunner { monitorDevice(deviceId); } else if ("clearmonitor".equalsIgnoreCase(command)) { clearMonitoredDevices(); + } else if (command.startsWith("setdefault ")) { + String deviceId = command.substring("setdefault ".length()).trim(); + setDefaultDevice(deviceId); } else { System.out.println("未知命令,输入 'help' 查看可用命令"); } @@ -83,6 +89,7 @@ public class RtcmCommandLineRunner implements CommandLineRunner { System.out.println(" stopmonitor - 停止UDP数据监控"); System.out.println(" monitordevice - 添加指定设备ID到监控列表"); System.out.println(" clearmonitor - 清空设备监控列表(监控所有设备)"); + System.out.println(" setdefault - 设置默认设备ID,9903端口的RTCM数据将转发给此设备"); System.out.println(" exit - 退出命令行工具(不会停止服务)"); } @@ -93,7 +100,11 @@ public class RtcmCommandLineRunner implements CommandLineRunner { } else { System.out.println("设备TCP服务器列表:"); for (Map.Entry entry : devicePorts.entrySet()) { - System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue()); + String defaultMark = ""; + if (entry.getKey().equals(rtcmUdpHandler.getDefaultDeviceId())) { + defaultMark = " (默认设备)"; + } + System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue() + defaultMark); } } } @@ -109,6 +120,11 @@ public class RtcmCommandLineRunner implements CommandLineRunner { System.out.println("可以在rtklib配置文件中使用以下设置:"); System.out.println("inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)"); System.out.println("inpstr1-path =127.0.0.1:" + port); + + // 如果没有默认设备,自动设置为默认设备 + if (rtcmUdpHandler.getDefaultDeviceId() == null) { + setDefaultDevice(deviceId); + } } private void removeDeviceServer(String deviceId) { @@ -120,6 +136,12 @@ public class RtcmCommandLineRunner implements CommandLineRunner { boolean removed = rtcmDistributor.removeDeviceServer(deviceId); if (removed) { System.out.println("已移除设备 " + deviceId + " 的TCP服务器"); + + // 如果移除的是默认设备,清除默认设备设置 + if (deviceId.equals(rtcmUdpHandler.getDefaultDeviceId())) { + rtcmUdpHandler.setDefaultDeviceId(null); + System.out.println("已清除默认设备设置"); + } } else { System.out.println("设备 " + deviceId + " 没有TCP服务器"); } @@ -149,4 +171,20 @@ public class RtcmCommandLineRunner implements CommandLineRunner { udpHandler.clearMonitoredDevices(); System.out.println("已清空设备监控列表,将监控所有设备"); } + + private void setDefaultDevice(String deviceId) { + if (deviceId == null || deviceId.isEmpty()) { + System.out.println("设备ID不能为空"); + return; + } + + if (!rtcmDistributor.hasDeviceServer(deviceId)) { + System.out.println("设备 " + deviceId + " 没有TCP服务器,请先创建"); + return; + } + + rtcmUdpHandler.setDefaultDeviceId(deviceId); + System.out.println("已将设备 " + deviceId + " 设置为默认设备"); + System.out.println("所有从9903端口接收的RTCM数据将转发给此设备"); + } } \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDistributor.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDistributor.java index b9a20476..b4b5ba33 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDistributor.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDistributor.java @@ -15,6 +15,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; @Service public class RtcmDistributor { @@ -26,6 +27,9 @@ public class RtcmDistributor { // 设备ID与对应的端口映射 private final Map devicePorts = new ConcurrentHashMap<>(); + // 数据统计 + private final Map dataCounters = new ConcurrentHashMap<>(); + private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; @@ -46,6 +50,7 @@ public class RtcmDistributor { DeviceTcpServer server = new DeviceTcpServer(deviceId, port); deviceServers.put(deviceId, server); devicePorts.put(deviceId, port); + dataCounters.put(deviceId, new AtomicLong(0)); // 启动服务器 server.start(bossGroup, workerGroup); @@ -59,8 +64,22 @@ public class RtcmDistributor { DeviceTcpServer server = deviceServers.get(deviceId); if (server != null) { server.broadcastData(data); - if (logger.isDebugEnabled()) { - logger.debug("Sent RTCM data to device {} TCP server", deviceId); + + // 更新数据统计 + AtomicLong counter = dataCounters.computeIfAbsent(deviceId, k -> new AtomicLong(0)); + long totalBytes = counter.addAndGet(data.length); + + logger.info("已将{}字节的RTCM数据发送到设备{}的TCP服务器,累计发送{}字节", + data.length, deviceId, totalBytes); + + // 打印前20个字节的十六进制表示,帮助调试 + if (data.length > 0 && logger.isDebugEnabled()) { + StringBuilder hexData = new StringBuilder(); + int limit = Math.min(20, data.length); + for (int i = 0; i < limit; i++) { + hexData.append(String.format("%02X ", data[i] & 0xFF)); + } + logger.debug("RTCM数据样本: {}", hexData.toString()); } } } @@ -75,6 +94,7 @@ public class RtcmDistributor { DeviceTcpServer server = deviceServers.remove(deviceId); if (server != null) { devicePorts.remove(deviceId); + dataCounters.remove(deviceId); server.stop(); logger.info("Removed TCP server for device {}", deviceId); return true; @@ -87,6 +107,13 @@ public class RtcmDistributor { return new ConcurrentHashMap<>(devicePorts); } + // 获取设备数据统计 + public Map getDataStatistics() { + Map stats = new ConcurrentHashMap<>(); + dataCounters.forEach((deviceId, counter) -> stats.put(deviceId, counter.get())); + return stats; + } + // 内部类:每个设备对应一个TCP服务器 public class DeviceTcpServer { private final String deviceId; @@ -142,11 +169,18 @@ public class RtcmDistributor { } public void broadcastData(byte[] data) { + int activeClients = 0; for (Channel channel : connectedClients.keySet()) { if (channel.isActive()) { channel.writeAndFlush(channel.alloc().buffer().writeBytes(data)); + activeClients++; } } + + if (activeClients > 0 && logger.isDebugEnabled()) { + logger.debug("已将{}字节的数据广播到设备{}的{}个客户端", + data.length, deviceId, activeClients); + } } } } \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpHandler.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpHandler.java new file mode 100644 index 00000000..90742ef9 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpHandler.java @@ -0,0 +1,83 @@ +package com.imdroid.ntripproxy.service; + +import com.imdroid.common.util.DataTypeUtil; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.ReferenceCountUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicReference; + +@ChannelHandler.Sharable +@Component +public class RtcmUdpHandler extends ChannelInboundHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class); + + @Autowired + private RtcmDistributor rtcmDistributor; + + // 默认设备ID,可以通过命令行设置 + private AtomicReference defaultDeviceId = new AtomicReference<>(null); + + public void setDefaultDeviceId(String deviceId) { + defaultDeviceId.set(deviceId); + logger.info("设置默认设备ID为: {}", deviceId); + } + + public String getDefaultDeviceId() { + return defaultDeviceId.get(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + DatagramPacket packet = (DatagramPacket) msg; + try { + if (packet.content() == null) { + return; + } + + byte[] data = new byte[packet.content().readableBytes()]; + packet.content().getBytes(0, data); + + String deviceId = defaultDeviceId.get(); + if (deviceId == null || deviceId.isEmpty()) { + logger.info("收到RTCM数据,但未设置默认设备ID,无法转发"); + return; + } + + // 如果该设备有TCP服务器,则转发数据 + if (rtcmDistributor.hasDeviceServer(deviceId)) { + rtcmDistributor.sendRtcmData(deviceId, data); + logger.info("已将{}字节的RTCM数据转发到设备{}的TCP服务器", data.length, deviceId); + + // 打印前20个字节的十六进制表示,帮助调试 + if (data.length > 0 && logger.isDebugEnabled()) { + StringBuilder hexData = new StringBuilder(); + int limit = Math.min(20, data.length); + for (int i = 0; i < limit; i++) { + hexData.append(String.format("%02X ", data[i] & 0xFF)); + } + logger.debug("RTCM数据样本: {}", hexData.toString()); + } + } else { + logger.info("收到RTCM数据,但设备{}没有TCP服务器", deviceId); + } + + } catch (Exception e) { + logger.error("处理RTCM数据时出错: {}", e.toString()); + } finally { + ReferenceCountUtil.release(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.error("RTCM UDP处理器异常: {}", cause.toString()); + } +} \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpServer.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpServer.java new file mode 100644 index 00000000..f63e49a0 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmUdpServer.java @@ -0,0 +1,49 @@ +package com.imdroid.ntripproxy.service; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioDatagramChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +@Component +public class RtcmUdpServer implements ApplicationRunner { + + private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class); + private final int port = 9903; // 直接监听9903端口 + + @Autowired + private RtcmUdpHandler rtcmUdpHandler; + + @Override + public void run(ApplicationArguments args) throws Exception { + new Thread(this::start0, "rtcm-server").start(); + } + + private void start0() { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioDatagramChannel.class) + .option(ChannelOption.SO_REUSEADDR, true) // 允许端口重用 + .option(ChannelOption.SO_SNDBUF, 1024*1024) + .option(ChannelOption.SO_RCVBUF, 1024*1024) + .handler(rtcmUdpHandler); + try { + ChannelFuture future = bootstrap.bind(port).sync().channel().closeFuture(); + logger.info("RTCM UDP server started at port {}", port); + future.await(); + } catch (Exception e) { + logger.error("Error starting RTCM UDP server at port {}", port, e); + } finally { + group.shutdownGracefully(); + } + } +} \ No newline at end of file