From 4a4def045a510e8e6d40fa45cd2bfd7d765ffde7 Mon Sep 17 00:00:00 2001 From: yarnom Date: Mon, 28 Jul 2025 18:49:53 +0800 Subject: [PATCH] feat: rtksrv v2 test --- .../executor/D331RtcmMessageExecutor.java | 17 ++- .../tcp/RtcmSpecificDeviceTcpServer.java | 110 ++++++++++++++++++ 2 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmSpecificDeviceTcpServer.java diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java index cc9f8272..65cea4a9 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java @@ -10,12 +10,13 @@ import com.imdroid.sideslope.bd.Rtcm1005; import com.imdroid.sideslope.message.D331RtcmMessage; import com.imdroid.sideslope.ntrip.UdpNtripServer; import com.imdroid.sideslope.ntrip.RtcmUdpForwarder; -import com.imdroid.sideslope.service.Device; -import com.imdroid.sideslope.service.DeviceService; import com.imdroid.sideslope.server.DeviceChannel; import com.imdroid.sideslope.server.OnlineChannels; import com.imdroid.sideslope.service.DataPersistService; import com.imdroid.sideslope.bd.RtcmGgaUtil; +import com.imdroid.sideslope.server.tcp.RtcmSpecificDeviceTcpServer; +import com.imdroid.sideslope.service.Device; +import com.imdroid.sideslope.service.DeviceService; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.slf4j.Logger; @@ -48,6 +49,8 @@ public class D331RtcmMessageExecutor implements Executor UdpNtripServer ntripServer; @Autowired RtcmUdpForwarder rtcmUdpForwarder; + @Autowired + RtcmSpecificDeviceTcpServer rtcmSpecificDeviceTcpServer; // 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间 private final Map lastD300ForwardTimeMap = new ConcurrentHashMap<>(); @@ -294,6 +297,16 @@ public class D331RtcmMessageExecutor implements Executor // 同时转发到12001端口 rtcmUdpForwarder.forward(mountpoint, rtcm); + + // 如果是特定设备的数据,则通过TCP服务器转发 + if (mountpoint.equals(rtcmSpecificDeviceTcpServer.getTargetDeviceId())) { + for (String rtcmHex : rtcm) { + byte[] rtcmData = ByteUtil.hexStringTobyte(rtcmHex); + rtcmSpecificDeviceTcpServer.broadcastRtcmData(rtcmData); + } + logger.debug("Forwarded RTCM data from device {} to TCP server on port {}", + mountpoint, rtcmSpecificDeviceTcpServer.getPort()); + } }); } catch (Exception e) { logger.error("处理NTRIP数据失败, 挂载点: {}, 错误: {}", mountpoint, e.getMessage()); diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmSpecificDeviceTcpServer.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmSpecificDeviceTcpServer.java new file mode 100644 index 00000000..a7efbce5 --- /dev/null +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmSpecificDeviceTcpServer.java @@ -0,0 +1,110 @@ +package com.imdroid.sideslope.server.tcp; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * 特定设备RTCM数据TCP服务器 + */ +@Component +public class RtcmSpecificDeviceTcpServer implements ApplicationRunner { + + private final Logger logger = LoggerFactory.getLogger(RtcmSpecificDeviceTcpServer.class); + + @Value("${rtcm.specific.device.port:12002}") + private Integer port; + + @Value("${rtcm.specific.device.id:3530795}") + private String targetDeviceId; + + // 存储所有连接的客户端通道 + private final CopyOnWriteArrayList clientChannels = new CopyOnWriteArrayList<>(); + + @Override + public void run(ApplicationArguments args) throws Exception { + new Thread(this::start0, "specific-device-tcp-server").start(); + } + + private void start0() { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // 添加到客户端列表 + clientChannels.add(ch); + + // 当连接关闭时,从列表中移除 + ch.closeFuture().addListener(future -> clientChannels.remove(ch)); + + logger.info("New client connected: {}", ch.remoteAddress()); + } + }); + + ChannelFuture future = bootstrap.bind(port).sync(); + logger.info("Specific device TCP server started on port {} for device {}", port, targetDeviceId); + + future.channel().closeFuture().sync(); + } catch (Exception e) { + logger.error("Error starting Specific device TCP server at port {}", port, e); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + /** + * 向所有连接的客户端发送RTCM数据 + * @param rtcmData RTCM数据字节数组 + */ + public void broadcastRtcmData(byte[] rtcmData) { + if (clientChannels.isEmpty()) { + return; + } + + for (SocketChannel channel : clientChannels) { + if (channel.isActive()) { + channel.writeAndFlush(io.netty.buffer.Unpooled.wrappedBuffer(rtcmData)); + } + } + + logger.debug("Broadcasted RTCM data to {} clients, data length: {}", clientChannels.size(), rtcmData.length); + } + + /** + * 获取目标设备ID + * @return 目标设备ID + */ + public String getTargetDeviceId() { + return targetDeviceId; + } + + /** + * 获取服务器端口 + * @return 服务器端口 + */ + public Integer getPort() { + return port; + } +} \ No newline at end of file