diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java index 3802e700..1d654566 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java @@ -1,6 +1,7 @@ package com.imdroid.ntripproxy.executor; import com.imdroid.ntripproxy.message.D331RtcmMessage; +import com.imdroid.ntripproxy.service.RtcmDistributor; import com.imdroid.ntripproxy.service.UDPClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,8 +17,13 @@ import org.springframework.stereotype.Component; public class D331RtcmMessageExecutor implements Executor { private final Logger logger = LoggerFactory.getLogger(this.getClass()); + @Autowired UDPClient rtcmClient; + + @Autowired + RtcmDistributor rtcmDistributor; + @Override public Void execute(D331RtcmMessage message) { String id = message.getId(); @@ -25,11 +31,19 @@ public class D331RtcmMessageExecutor implements Executor if (logger.isDebugEnabled()) { logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen()); } - // 推送基站数据 + + // 提取RTCM数据 byte[] srcData = new byte[message.getSrcBuf().readableBytes()]; - message.getSrcBuf().getBytes(0,srcData); + message.getSrcBuf().getBytes(0, srcData); + + // 继续发送到原来的UDP客户端 rtcmClient.send(srcData); - + + // 只有已经创建了TCP服务器的设备才发送数据 + if (rtcmDistributor.hasDeviceServer(id)) { + rtcmDistributor.sendRtcmData(id, srcData); + } + return null; } diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java index aab3da20..732bf37e 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java @@ -3,8 +3,6 @@ package com.imdroid.ntripproxy.service; public class Ntrip2Channels { final private String localHost="127.0.0.1"; final private int localPort=9903; - final private String remoteHost="47.107.50.52"; - final private int remotePort=9903; public static final Ntrip2Channels INSTANCE = new Ntrip2Channels(); @@ -12,13 +10,10 @@ public class Ntrip2Channels { UDPClient remoteRtcm; private Ntrip2Channels() { localRtcm = new UDPClient(); - remoteRtcm = new UDPClient(); localRtcm.init(localHost, localPort); - remoteRtcm.init(remoteHost, remotePort); } public void send(byte[] data) { localRtcm.send(data); - remoteRtcm.send(data); } } diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java new file mode 100644 index 00000000..4eba2a4a --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmCommandLineRunner.java @@ -0,0 +1,111 @@ +package com.imdroid.ntripproxy.service; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.Scanner; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Component +public class RtcmCommandLineRunner implements CommandLineRunner { + + @Autowired + private RtcmDistributor rtcmDistributor; + + private static final int DEFAULT_BASE_PORT = 10000; + + @Override + public void run(String... args) { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> { + Scanner scanner = new Scanner(System.in); + System.out.println("RTCM TCP服务器命令行工具启动"); + System.out.println("输入 'help' 查看可用命令"); + + while (true) { + System.out.print("> "); + String command = scanner.nextLine().trim(); + + if ("exit".equalsIgnoreCase(command)) { + break; + } else if ("help".equalsIgnoreCase(command)) { + printHelp(); + } else if ("list".equalsIgnoreCase(command)) { + listDevices(); + } else if (command.startsWith("create ")) { + String[] parts = command.substring("create ".length()).trim().split("\\s+"); + if (parts.length == 1) { + createDeviceServer(parts[0], DEFAULT_BASE_PORT); + } else if (parts.length == 2) { + try { + int port = Integer.parseInt(parts[1]); + createDeviceServer(parts[0], port); + } catch (NumberFormatException e) { + System.out.println("端口必须是数字"); + } + } else { + System.out.println("格式错误,使用 'create [port]'"); + } + } else if (command.startsWith("remove ")) { + String deviceId = command.substring("remove ".length()).trim(); + removeDeviceServer(deviceId); + } else { + System.out.println("未知命令,输入 'help' 查看可用命令"); + } + } + + System.out.println("命令行工具已退出"); + }); + } + + private void printHelp() { + System.out.println("可用命令:"); + System.out.println(" help - 显示此帮助信息"); + System.out.println(" list - 列出所有设备及其TCP端口"); + System.out.println(" create [port] - 为指定设备ID创建TCP服务器,可选指定端口"); + System.out.println(" remove - 移除指定设备的TCP服务器"); + System.out.println(" exit - 退出命令行工具(不会停止服务)"); + } + + private void listDevices() { + Map devicePorts = rtcmDistributor.listDevicePorts(); + if (devicePorts.isEmpty()) { + System.out.println("当前没有设备TCP服务器"); + } else { + System.out.println("设备TCP服务器列表:"); + for (Map.Entry entry : devicePorts.entrySet()) { + System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue()); + } + } + } + + private void createDeviceServer(String deviceId, int defaultPort) { + if (deviceId == null || deviceId.isEmpty()) { + System.out.println("设备ID不能为空"); + return; + } + + int port = rtcmDistributor.createDeviceServer(deviceId, defaultPort); + System.out.println("为设备 " + deviceId + " 创建TCP服务器,端口: " + port); + System.out.println("可以在rtklib配置文件中使用以下设置:"); + System.out.println("inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)"); + System.out.println("inpstr1-path =127.0.0.1:" + port); + } + + private void removeDeviceServer(String deviceId) { + if (deviceId == null || deviceId.isEmpty()) { + System.out.println("设备ID不能为空"); + return; + } + + boolean removed = rtcmDistributor.removeDeviceServer(deviceId); + if (removed) { + System.out.println("已移除设备 " + deviceId + " 的TCP服务器"); + } else { + System.out.println("设备 " + deviceId + " 没有TCP服务器"); + } + } +} \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDistributor.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDistributor.java new file mode 100644 index 00000000..b9a20476 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmDistributor.java @@ -0,0 +1,152 @@ +package com.imdroid.ntripproxy.service; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class RtcmDistributor { + private static final Logger logger = LoggerFactory.getLogger(RtcmDistributor.class); + + // 设备ID与对应的TCP服务器映射 + private final Map deviceServers = new ConcurrentHashMap<>(); + + // 设备ID与对应的端口映射 + private final Map devicePorts = new ConcurrentHashMap<>(); + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + @PostConstruct + public void init() { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + logger.info("RtcmDistributor initialized"); + } + + // 为指定设备ID创建TCP服务器,返回分配的端口 + public synchronized int createDeviceServer(String deviceId, int port) { + if (deviceServers.containsKey(deviceId)) { + logger.info("Device {} already has a TCP server on port {}", deviceId, devicePorts.get(deviceId)); + return devicePorts.get(deviceId); + } + + DeviceTcpServer server = new DeviceTcpServer(deviceId, port); + deviceServers.put(deviceId, server); + devicePorts.put(deviceId, port); + + // 启动服务器 + server.start(bossGroup, workerGroup); + logger.info("Created TCP server for device {} on port {}", deviceId, port); + + return port; + } + + // 发送RTCM数据到特定设备的TCP服务器(如果存在) + public void sendRtcmData(String deviceId, byte[] data) { + DeviceTcpServer server = deviceServers.get(deviceId); + if (server != null) { + server.broadcastData(data); + if (logger.isDebugEnabled()) { + logger.debug("Sent RTCM data to device {} TCP server", deviceId); + } + } + } + + // 检查设备是否有TCP服务器 + public boolean hasDeviceServer(String deviceId) { + return deviceServers.containsKey(deviceId); + } + + // 删除设备的TCP服务器 + public boolean removeDeviceServer(String deviceId) { + DeviceTcpServer server = deviceServers.remove(deviceId); + if (server != null) { + devicePorts.remove(deviceId); + server.stop(); + logger.info("Removed TCP server for device {}", deviceId); + return true; + } + return false; + } + + // 列出所有设备及其端口 + public Map listDevicePorts() { + return new ConcurrentHashMap<>(devicePorts); + } + + // 内部类:每个设备对应一个TCP服务器 + public class DeviceTcpServer { + private final String deviceId; + private final int port; + private Channel serverChannel; + private final Map connectedClients = new ConcurrentHashMap<>(); + + public DeviceTcpServer(String deviceId, int port) { + this.deviceId = deviceId; + this.port = port; + } + + public void start(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new RtcmTcpHandler(DeviceTcpServer.this)); + } + }); + + try { + serverChannel = b.bind(port).sync().channel(); + logger.info("Device {} TCP server started on port {}", deviceId, port); + } catch (Exception e) { + logger.error("Failed to start TCP server for device {} on port {}", deviceId, port, e); + } + } + + public void stop() { + if (serverChannel != null) { + try { + serverChannel.close().sync(); + } catch (InterruptedException e) { + logger.error("Error closing server channel for device {}", deviceId, e); + } + } + } + + public void addClient(Channel channel) { + connectedClients.put(channel, true); + logger.info("New client connected to device {} server, total clients: {}", + deviceId, connectedClients.size()); + } + + public void removeClient(Channel channel) { + connectedClients.remove(channel); + logger.info("Client disconnected from device {} server, remaining clients: {}", + deviceId, connectedClients.size()); + } + + public void broadcastData(byte[] data) { + for (Channel channel : connectedClients.keySet()) { + if (channel.isActive()) { + channel.writeAndFlush(channel.alloc().buffer().writeBytes(data)); + } + } + } + } +} \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmTcpHandler.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmTcpHandler.java new file mode 100644 index 00000000..c1427e37 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmTcpHandler.java @@ -0,0 +1,40 @@ +package com.imdroid.ntripproxy.service; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ChannelHandler.Sharable +public class RtcmTcpHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(RtcmTcpHandler.class); + private final RtcmDistributor.DeviceTcpServer server; + + public RtcmTcpHandler(RtcmDistributor.DeviceTcpServer server) { + this.server = server; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + server.addClient(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + server.removeClient(ctx.channel()); + ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { + // 通常不需要处理从客户端接收的数据,因为我们只是将RTCM数据推送给客户端 + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("Exception in TCP handler", cause); + ctx.close(); + } +} \ No newline at end of file