feat: tcpcli

This commit is contained in:
yarnom 2025-07-28 16:16:09 +08:00
parent b6629696d6
commit 6e639b7a76
5 changed files with 320 additions and 8 deletions

View File

@ -1,6 +1,7 @@
package com.imdroid.ntripproxy.executor; package com.imdroid.ntripproxy.executor;
import com.imdroid.ntripproxy.message.D331RtcmMessage; import com.imdroid.ntripproxy.message.D331RtcmMessage;
import com.imdroid.ntripproxy.service.RtcmDistributor;
import com.imdroid.ntripproxy.service.UDPClient; import com.imdroid.ntripproxy.service.UDPClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -16,8 +17,13 @@ import org.springframework.stereotype.Component;
public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void> { public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired @Autowired
UDPClient rtcmClient; UDPClient rtcmClient;
@Autowired
RtcmDistributor rtcmDistributor;
@Override @Override
public Void execute(D331RtcmMessage message) { public Void execute(D331RtcmMessage message) {
String id = message.getId(); String id = message.getId();
@ -25,11 +31,19 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen()); logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen());
} }
// 推送基站数据
// 提取RTCM数据
byte[] srcData = new byte[message.getSrcBuf().readableBytes()]; byte[] srcData = new byte[message.getSrcBuf().readableBytes()];
message.getSrcBuf().getBytes(0, srcData); message.getSrcBuf().getBytes(0, srcData);
// 继续发送到原来的UDP客户端
rtcmClient.send(srcData); rtcmClient.send(srcData);
// 只有已经创建了TCP服务器的设备才发送数据
if (rtcmDistributor.hasDeviceServer(id)) {
rtcmDistributor.sendRtcmData(id, srcData);
}
return null; return null;
} }

View File

@ -3,8 +3,6 @@ package com.imdroid.ntripproxy.service;
public class Ntrip2Channels { public class Ntrip2Channels {
final private String localHost="127.0.0.1"; final private String localHost="127.0.0.1";
final private int localPort=9903; final private int localPort=9903;
final private String remoteHost="47.107.50.52";
final private int remotePort=9903;
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels(); public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();
@ -12,13 +10,10 @@ public class Ntrip2Channels {
UDPClient remoteRtcm; UDPClient remoteRtcm;
private Ntrip2Channels() { private Ntrip2Channels() {
localRtcm = new UDPClient(); localRtcm = new UDPClient();
remoteRtcm = new UDPClient();
localRtcm.init(localHost, localPort); localRtcm.init(localHost, localPort);
remoteRtcm.init(remoteHost, remotePort);
} }
public void send(byte[] data) { public void send(byte[] data) {
localRtcm.send(data); localRtcm.send(data);
remoteRtcm.send(data);
} }
} }

View File

@ -0,0 +1,111 @@
package com.imdroid.ntripproxy.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class RtcmCommandLineRunner implements CommandLineRunner {
@Autowired
private RtcmDistributor rtcmDistributor;
private static final int DEFAULT_BASE_PORT = 10000;
@Override
public void run(String... args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("RTCM TCP服务器命令行工具启动");
System.out.println("输入 'help' 查看可用命令");
while (true) {
System.out.print("> ");
String command = scanner.nextLine().trim();
if ("exit".equalsIgnoreCase(command)) {
break;
} else if ("help".equalsIgnoreCase(command)) {
printHelp();
} else if ("list".equalsIgnoreCase(command)) {
listDevices();
} else if (command.startsWith("create ")) {
String[] parts = command.substring("create ".length()).trim().split("\\s+");
if (parts.length == 1) {
createDeviceServer(parts[0], DEFAULT_BASE_PORT);
} else if (parts.length == 2) {
try {
int port = Integer.parseInt(parts[1]);
createDeviceServer(parts[0], port);
} catch (NumberFormatException e) {
System.out.println("端口必须是数字");
}
} else {
System.out.println("格式错误,使用 'create <deviceId> [port]'");
}
} else if (command.startsWith("remove ")) {
String deviceId = command.substring("remove ".length()).trim();
removeDeviceServer(deviceId);
} else {
System.out.println("未知命令,输入 'help' 查看可用命令");
}
}
System.out.println("命令行工具已退出");
});
}
private void printHelp() {
System.out.println("可用命令:");
System.out.println(" help - 显示此帮助信息");
System.out.println(" list - 列出所有设备及其TCP端口");
System.out.println(" create <deviceId> [port] - 为指定设备ID创建TCP服务器可选指定端口");
System.out.println(" remove <deviceId> - 移除指定设备的TCP服务器");
System.out.println(" exit - 退出命令行工具(不会停止服务)");
}
private void listDevices() {
Map<String, Integer> devicePorts = rtcmDistributor.listDevicePorts();
if (devicePorts.isEmpty()) {
System.out.println("当前没有设备TCP服务器");
} else {
System.out.println("设备TCP服务器列表");
for (Map.Entry<String, Integer> entry : devicePorts.entrySet()) {
System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue());
}
}
}
private void createDeviceServer(String deviceId, int defaultPort) {
if (deviceId == null || deviceId.isEmpty()) {
System.out.println("设备ID不能为空");
return;
}
int port = rtcmDistributor.createDeviceServer(deviceId, defaultPort);
System.out.println("为设备 " + deviceId + " 创建TCP服务器端口: " + port);
System.out.println("可以在rtklib配置文件中使用以下设置");
System.out.println("inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)");
System.out.println("inpstr1-path =127.0.0.1:" + port);
}
private void removeDeviceServer(String deviceId) {
if (deviceId == null || deviceId.isEmpty()) {
System.out.println("设备ID不能为空");
return;
}
boolean removed = rtcmDistributor.removeDeviceServer(deviceId);
if (removed) {
System.out.println("已移除设备 " + deviceId + " 的TCP服务器");
} else {
System.out.println("设备 " + deviceId + " 没有TCP服务器");
}
}
}

View File

@ -0,0 +1,152 @@
package com.imdroid.ntripproxy.service;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class RtcmDistributor {
private static final Logger logger = LoggerFactory.getLogger(RtcmDistributor.class);
// 设备ID与对应的TCP服务器映射
private final Map<String, DeviceTcpServer> deviceServers = new ConcurrentHashMap<>();
// 设备ID与对应的端口映射
private final Map<String, Integer> devicePorts = new ConcurrentHashMap<>();
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@PostConstruct
public void init() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
logger.info("RtcmDistributor initialized");
}
// 为指定设备ID创建TCP服务器返回分配的端口
public synchronized int createDeviceServer(String deviceId, int port) {
if (deviceServers.containsKey(deviceId)) {
logger.info("Device {} already has a TCP server on port {}", deviceId, devicePorts.get(deviceId));
return devicePorts.get(deviceId);
}
DeviceTcpServer server = new DeviceTcpServer(deviceId, port);
deviceServers.put(deviceId, server);
devicePorts.put(deviceId, port);
// 启动服务器
server.start(bossGroup, workerGroup);
logger.info("Created TCP server for device {} on port {}", deviceId, port);
return port;
}
// 发送RTCM数据到特定设备的TCP服务器如果存在
public void sendRtcmData(String deviceId, byte[] data) {
DeviceTcpServer server = deviceServers.get(deviceId);
if (server != null) {
server.broadcastData(data);
if (logger.isDebugEnabled()) {
logger.debug("Sent RTCM data to device {} TCP server", deviceId);
}
}
}
// 检查设备是否有TCP服务器
public boolean hasDeviceServer(String deviceId) {
return deviceServers.containsKey(deviceId);
}
// 删除设备的TCP服务器
public boolean removeDeviceServer(String deviceId) {
DeviceTcpServer server = deviceServers.remove(deviceId);
if (server != null) {
devicePorts.remove(deviceId);
server.stop();
logger.info("Removed TCP server for device {}", deviceId);
return true;
}
return false;
}
// 列出所有设备及其端口
public Map<String, Integer> listDevicePorts() {
return new ConcurrentHashMap<>(devicePorts);
}
// 内部类每个设备对应一个TCP服务器
public class DeviceTcpServer {
private final String deviceId;
private final int port;
private Channel serverChannel;
private final Map<Channel, Boolean> connectedClients = new ConcurrentHashMap<>();
public DeviceTcpServer(String deviceId, int port) {
this.deviceId = deviceId;
this.port = port;
}
public void start(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new RtcmTcpHandler(DeviceTcpServer.this));
}
});
try {
serverChannel = b.bind(port).sync().channel();
logger.info("Device {} TCP server started on port {}", deviceId, port);
} catch (Exception e) {
logger.error("Failed to start TCP server for device {} on port {}", deviceId, port, e);
}
}
public void stop() {
if (serverChannel != null) {
try {
serverChannel.close().sync();
} catch (InterruptedException e) {
logger.error("Error closing server channel for device {}", deviceId, e);
}
}
}
public void addClient(Channel channel) {
connectedClients.put(channel, true);
logger.info("New client connected to device {} server, total clients: {}",
deviceId, connectedClients.size());
}
public void removeClient(Channel channel) {
connectedClients.remove(channel);
logger.info("Client disconnected from device {} server, remaining clients: {}",
deviceId, connectedClients.size());
}
public void broadcastData(byte[] data) {
for (Channel channel : connectedClients.keySet()) {
if (channel.isActive()) {
channel.writeAndFlush(channel.alloc().buffer().writeBytes(data));
}
}
}
}
}

View File

@ -0,0 +1,40 @@
package com.imdroid.ntripproxy.service;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ChannelHandler.Sharable
public class RtcmTcpHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(RtcmTcpHandler.class);
private final RtcmDistributor.DeviceTcpServer server;
public RtcmTcpHandler(RtcmDistributor.DeviceTcpServer server) {
this.server = server;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
server.addClient(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
server.removeClient(ctx.channel());
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// 通常不需要处理从客户端接收的数据因为我们只是将RTCM数据推送给客户端
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("Exception in TCP handler", cause);
ctx.close();
}
}