feat: tcpcli v4
This commit is contained in:
parent
a403f8a3b8
commit
b4d81245c5
@ -18,6 +18,9 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private UdpHandler udpHandler;
|
private UdpHandler udpHandler;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RtcmUdpHandler rtcmUdpHandler;
|
||||||
|
|
||||||
private static final int DEFAULT_BASE_PORT = 10000;
|
private static final int DEFAULT_BASE_PORT = 10000;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -64,6 +67,9 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
|
|||||||
monitorDevice(deviceId);
|
monitorDevice(deviceId);
|
||||||
} else if ("clearmonitor".equalsIgnoreCase(command)) {
|
} else if ("clearmonitor".equalsIgnoreCase(command)) {
|
||||||
clearMonitoredDevices();
|
clearMonitoredDevices();
|
||||||
|
} else if (command.startsWith("setdefault ")) {
|
||||||
|
String deviceId = command.substring("setdefault ".length()).trim();
|
||||||
|
setDefaultDevice(deviceId);
|
||||||
} else {
|
} else {
|
||||||
System.out.println("未知命令,输入 'help' 查看可用命令");
|
System.out.println("未知命令,输入 'help' 查看可用命令");
|
||||||
}
|
}
|
||||||
@ -83,6 +89,7 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
|
|||||||
System.out.println(" stopmonitor - 停止UDP数据监控");
|
System.out.println(" stopmonitor - 停止UDP数据监控");
|
||||||
System.out.println(" monitordevice <deviceId> - 添加指定设备ID到监控列表");
|
System.out.println(" monitordevice <deviceId> - 添加指定设备ID到监控列表");
|
||||||
System.out.println(" clearmonitor - 清空设备监控列表(监控所有设备)");
|
System.out.println(" clearmonitor - 清空设备监控列表(监控所有设备)");
|
||||||
|
System.out.println(" setdefault <deviceId> - 设置默认设备ID,9903端口的RTCM数据将转发给此设备");
|
||||||
System.out.println(" exit - 退出命令行工具(不会停止服务)");
|
System.out.println(" exit - 退出命令行工具(不会停止服务)");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,7 +100,11 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
|
|||||||
} else {
|
} else {
|
||||||
System.out.println("设备TCP服务器列表:");
|
System.out.println("设备TCP服务器列表:");
|
||||||
for (Map.Entry<String, Integer> entry : devicePorts.entrySet()) {
|
for (Map.Entry<String, Integer> entry : devicePorts.entrySet()) {
|
||||||
System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue());
|
String defaultMark = "";
|
||||||
|
if (entry.getKey().equals(rtcmUdpHandler.getDefaultDeviceId())) {
|
||||||
|
defaultMark = " (默认设备)";
|
||||||
|
}
|
||||||
|
System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue() + defaultMark);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -109,6 +120,11 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
|
|||||||
System.out.println("可以在rtklib配置文件中使用以下设置:");
|
System.out.println("可以在rtklib配置文件中使用以下设置:");
|
||||||
System.out.println("inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)");
|
System.out.println("inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)");
|
||||||
System.out.println("inpstr1-path =127.0.0.1:" + port);
|
System.out.println("inpstr1-path =127.0.0.1:" + port);
|
||||||
|
|
||||||
|
// 如果没有默认设备,自动设置为默认设备
|
||||||
|
if (rtcmUdpHandler.getDefaultDeviceId() == null) {
|
||||||
|
setDefaultDevice(deviceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeDeviceServer(String deviceId) {
|
private void removeDeviceServer(String deviceId) {
|
||||||
@ -120,6 +136,12 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
|
|||||||
boolean removed = rtcmDistributor.removeDeviceServer(deviceId);
|
boolean removed = rtcmDistributor.removeDeviceServer(deviceId);
|
||||||
if (removed) {
|
if (removed) {
|
||||||
System.out.println("已移除设备 " + deviceId + " 的TCP服务器");
|
System.out.println("已移除设备 " + deviceId + " 的TCP服务器");
|
||||||
|
|
||||||
|
// 如果移除的是默认设备,清除默认设备设置
|
||||||
|
if (deviceId.equals(rtcmUdpHandler.getDefaultDeviceId())) {
|
||||||
|
rtcmUdpHandler.setDefaultDeviceId(null);
|
||||||
|
System.out.println("已清除默认设备设置");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
System.out.println("设备 " + deviceId + " 没有TCP服务器");
|
System.out.println("设备 " + deviceId + " 没有TCP服务器");
|
||||||
}
|
}
|
||||||
@ -149,4 +171,20 @@ public class RtcmCommandLineRunner implements CommandLineRunner {
|
|||||||
udpHandler.clearMonitoredDevices();
|
udpHandler.clearMonitoredDevices();
|
||||||
System.out.println("已清空设备监控列表,将监控所有设备");
|
System.out.println("已清空设备监控列表,将监控所有设备");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setDefaultDevice(String deviceId) {
|
||||||
|
if (deviceId == null || deviceId.isEmpty()) {
|
||||||
|
System.out.println("设备ID不能为空");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!rtcmDistributor.hasDeviceServer(deviceId)) {
|
||||||
|
System.out.println("设备 " + deviceId + " 没有TCP服务器,请先创建");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
rtcmUdpHandler.setDefaultDeviceId(deviceId);
|
||||||
|
System.out.println("已将设备 " + deviceId + " 设置为默认设备");
|
||||||
|
System.out.println("所有从9903端口接收的RTCM数据将转发给此设备");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -15,6 +15,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class RtcmDistributor {
|
public class RtcmDistributor {
|
||||||
@ -26,6 +27,9 @@ public class RtcmDistributor {
|
|||||||
// 设备ID与对应的端口映射
|
// 设备ID与对应的端口映射
|
||||||
private final Map<String, Integer> devicePorts = new ConcurrentHashMap<>();
|
private final Map<String, Integer> devicePorts = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
// 数据统计
|
||||||
|
private final Map<String, AtomicLong> dataCounters = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private EventLoopGroup bossGroup;
|
private EventLoopGroup bossGroup;
|
||||||
private EventLoopGroup workerGroup;
|
private EventLoopGroup workerGroup;
|
||||||
|
|
||||||
@ -46,6 +50,7 @@ public class RtcmDistributor {
|
|||||||
DeviceTcpServer server = new DeviceTcpServer(deviceId, port);
|
DeviceTcpServer server = new DeviceTcpServer(deviceId, port);
|
||||||
deviceServers.put(deviceId, server);
|
deviceServers.put(deviceId, server);
|
||||||
devicePorts.put(deviceId, port);
|
devicePorts.put(deviceId, port);
|
||||||
|
dataCounters.put(deviceId, new AtomicLong(0));
|
||||||
|
|
||||||
// 启动服务器
|
// 启动服务器
|
||||||
server.start(bossGroup, workerGroup);
|
server.start(bossGroup, workerGroup);
|
||||||
@ -59,8 +64,22 @@ public class RtcmDistributor {
|
|||||||
DeviceTcpServer server = deviceServers.get(deviceId);
|
DeviceTcpServer server = deviceServers.get(deviceId);
|
||||||
if (server != null) {
|
if (server != null) {
|
||||||
server.broadcastData(data);
|
server.broadcastData(data);
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("Sent RTCM data to device {} TCP server", deviceId);
|
// 更新数据统计
|
||||||
|
AtomicLong counter = dataCounters.computeIfAbsent(deviceId, k -> new AtomicLong(0));
|
||||||
|
long totalBytes = counter.addAndGet(data.length);
|
||||||
|
|
||||||
|
logger.info("已将{}字节的RTCM数据发送到设备{}的TCP服务器,累计发送{}字节",
|
||||||
|
data.length, deviceId, totalBytes);
|
||||||
|
|
||||||
|
// 打印前20个字节的十六进制表示,帮助调试
|
||||||
|
if (data.length > 0 && logger.isDebugEnabled()) {
|
||||||
|
StringBuilder hexData = new StringBuilder();
|
||||||
|
int limit = Math.min(20, data.length);
|
||||||
|
for (int i = 0; i < limit; i++) {
|
||||||
|
hexData.append(String.format("%02X ", data[i] & 0xFF));
|
||||||
|
}
|
||||||
|
logger.debug("RTCM数据样本: {}", hexData.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -75,6 +94,7 @@ public class RtcmDistributor {
|
|||||||
DeviceTcpServer server = deviceServers.remove(deviceId);
|
DeviceTcpServer server = deviceServers.remove(deviceId);
|
||||||
if (server != null) {
|
if (server != null) {
|
||||||
devicePorts.remove(deviceId);
|
devicePorts.remove(deviceId);
|
||||||
|
dataCounters.remove(deviceId);
|
||||||
server.stop();
|
server.stop();
|
||||||
logger.info("Removed TCP server for device {}", deviceId);
|
logger.info("Removed TCP server for device {}", deviceId);
|
||||||
return true;
|
return true;
|
||||||
@ -87,6 +107,13 @@ public class RtcmDistributor {
|
|||||||
return new ConcurrentHashMap<>(devicePorts);
|
return new ConcurrentHashMap<>(devicePorts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取设备数据统计
|
||||||
|
public Map<String, Long> getDataStatistics() {
|
||||||
|
Map<String, Long> stats = new ConcurrentHashMap<>();
|
||||||
|
dataCounters.forEach((deviceId, counter) -> stats.put(deviceId, counter.get()));
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
|
|
||||||
// 内部类:每个设备对应一个TCP服务器
|
// 内部类:每个设备对应一个TCP服务器
|
||||||
public class DeviceTcpServer {
|
public class DeviceTcpServer {
|
||||||
private final String deviceId;
|
private final String deviceId;
|
||||||
@ -142,11 +169,18 @@ public class RtcmDistributor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void broadcastData(byte[] data) {
|
public void broadcastData(byte[] data) {
|
||||||
|
int activeClients = 0;
|
||||||
for (Channel channel : connectedClients.keySet()) {
|
for (Channel channel : connectedClients.keySet()) {
|
||||||
if (channel.isActive()) {
|
if (channel.isActive()) {
|
||||||
channel.writeAndFlush(channel.alloc().buffer().writeBytes(data));
|
channel.writeAndFlush(channel.alloc().buffer().writeBytes(data));
|
||||||
|
activeClients++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (activeClients > 0 && logger.isDebugEnabled()) {
|
||||||
|
logger.debug("已将{}字节的数据广播到设备{}的{}个客户端",
|
||||||
|
data.length, deviceId, activeClients);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -0,0 +1,83 @@
|
|||||||
|
package com.imdroid.ntripproxy.service;
|
||||||
|
|
||||||
|
import com.imdroid.common.util.DataTypeUtil;
|
||||||
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.channel.socket.DatagramPacket;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@ChannelHandler.Sharable
|
||||||
|
@Component
|
||||||
|
public class RtcmUdpHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RtcmDistributor rtcmDistributor;
|
||||||
|
|
||||||
|
// 默认设备ID,可以通过命令行设置
|
||||||
|
private AtomicReference<String> defaultDeviceId = new AtomicReference<>(null);
|
||||||
|
|
||||||
|
public void setDefaultDeviceId(String deviceId) {
|
||||||
|
defaultDeviceId.set(deviceId);
|
||||||
|
logger.info("设置默认设备ID为: {}", deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDefaultDeviceId() {
|
||||||
|
return defaultDeviceId.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
DatagramPacket packet = (DatagramPacket) msg;
|
||||||
|
try {
|
||||||
|
if (packet.content() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] data = new byte[packet.content().readableBytes()];
|
||||||
|
packet.content().getBytes(0, data);
|
||||||
|
|
||||||
|
String deviceId = defaultDeviceId.get();
|
||||||
|
if (deviceId == null || deviceId.isEmpty()) {
|
||||||
|
logger.info("收到RTCM数据,但未设置默认设备ID,无法转发");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果该设备有TCP服务器,则转发数据
|
||||||
|
if (rtcmDistributor.hasDeviceServer(deviceId)) {
|
||||||
|
rtcmDistributor.sendRtcmData(deviceId, data);
|
||||||
|
logger.info("已将{}字节的RTCM数据转发到设备{}的TCP服务器", data.length, deviceId);
|
||||||
|
|
||||||
|
// 打印前20个字节的十六进制表示,帮助调试
|
||||||
|
if (data.length > 0 && logger.isDebugEnabled()) {
|
||||||
|
StringBuilder hexData = new StringBuilder();
|
||||||
|
int limit = Math.min(20, data.length);
|
||||||
|
for (int i = 0; i < limit; i++) {
|
||||||
|
hexData.append(String.format("%02X ", data[i] & 0xFF));
|
||||||
|
}
|
||||||
|
logger.debug("RTCM数据样本: {}", hexData.toString());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.info("收到RTCM数据,但设备{}没有TCP服务器", deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("处理RTCM数据时出错: {}", e.toString());
|
||||||
|
} finally {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
logger.error("RTCM UDP处理器异常: {}", cause.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,49 @@
|
|||||||
|
package com.imdroid.ntripproxy.service;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.nio.NioDatagramChannel;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
import org.springframework.boot.ApplicationRunner;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class RtcmUdpServer implements ApplicationRunner {
|
||||||
|
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class);
|
||||||
|
private final int port = 9903; // 直接监听9903端口
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RtcmUdpHandler rtcmUdpHandler;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(ApplicationArguments args) throws Exception {
|
||||||
|
new Thread(this::start0, "rtcm-server").start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void start0() {
|
||||||
|
EventLoopGroup group = new NioEventLoopGroup();
|
||||||
|
Bootstrap bootstrap = new Bootstrap();
|
||||||
|
bootstrap.group(group)
|
||||||
|
.channel(NioDatagramChannel.class)
|
||||||
|
.option(ChannelOption.SO_REUSEADDR, true) // 允许端口重用
|
||||||
|
.option(ChannelOption.SO_SNDBUF, 1024*1024)
|
||||||
|
.option(ChannelOption.SO_RCVBUF, 1024*1024)
|
||||||
|
.handler(rtcmUdpHandler);
|
||||||
|
try {
|
||||||
|
ChannelFuture future = bootstrap.bind(port).sync().channel().closeFuture();
|
||||||
|
logger.info("RTCM UDP server started at port {}", port);
|
||||||
|
future.await();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Error starting RTCM UDP server at port {}", port, e);
|
||||||
|
} finally {
|
||||||
|
group.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user