feat: rtksrv v2 test

This commit is contained in:
yarnom 2025-07-28 18:49:53 +08:00
parent 0698640b26
commit 4a4def045a
2 changed files with 125 additions and 2 deletions

View File

@ -10,12 +10,13 @@ import com.imdroid.sideslope.bd.Rtcm1005;
import com.imdroid.sideslope.message.D331RtcmMessage; import com.imdroid.sideslope.message.D331RtcmMessage;
import com.imdroid.sideslope.ntrip.UdpNtripServer; import com.imdroid.sideslope.ntrip.UdpNtripServer;
import com.imdroid.sideslope.ntrip.RtcmUdpForwarder; import com.imdroid.sideslope.ntrip.RtcmUdpForwarder;
import com.imdroid.sideslope.service.Device;
import com.imdroid.sideslope.service.DeviceService;
import com.imdroid.sideslope.server.DeviceChannel; import com.imdroid.sideslope.server.DeviceChannel;
import com.imdroid.sideslope.server.OnlineChannels; import com.imdroid.sideslope.server.OnlineChannels;
import com.imdroid.sideslope.service.DataPersistService; import com.imdroid.sideslope.service.DataPersistService;
import com.imdroid.sideslope.bd.RtcmGgaUtil; import com.imdroid.sideslope.bd.RtcmGgaUtil;
import com.imdroid.sideslope.server.tcp.RtcmSpecificDeviceTcpServer;
import com.imdroid.sideslope.service.Device;
import com.imdroid.sideslope.service.DeviceService;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -48,6 +49,8 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
UdpNtripServer ntripServer; UdpNtripServer ntripServer;
@Autowired @Autowired
RtcmUdpForwarder rtcmUdpForwarder; RtcmUdpForwarder rtcmUdpForwarder;
@Autowired
RtcmSpecificDeviceTcpServer rtcmSpecificDeviceTcpServer;
// 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间 // 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>(); private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
@ -294,6 +297,16 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
// 同时转发到12001端口 // 同时转发到12001端口
rtcmUdpForwarder.forward(mountpoint, rtcm); rtcmUdpForwarder.forward(mountpoint, rtcm);
// 如果是特定设备的数据则通过TCP服务器转发
if (mountpoint.equals(rtcmSpecificDeviceTcpServer.getTargetDeviceId())) {
for (String rtcmHex : rtcm) {
byte[] rtcmData = ByteUtil.hexStringTobyte(rtcmHex);
rtcmSpecificDeviceTcpServer.broadcastRtcmData(rtcmData);
}
logger.debug("Forwarded RTCM data from device {} to TCP server on port {}",
mountpoint, rtcmSpecificDeviceTcpServer.getPort());
}
}); });
} catch (Exception e) { } catch (Exception e) {
logger.error("处理NTRIP数据失败, 挂载点: {}, 错误: {}", mountpoint, e.getMessage()); logger.error("处理NTRIP数据失败, 挂载点: {}, 错误: {}", mountpoint, e.getMessage());

View File

@ -0,0 +1,110 @@
package com.imdroid.sideslope.server.tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* 特定设备RTCM数据TCP服务器
*/
@Component
public class RtcmSpecificDeviceTcpServer implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(RtcmSpecificDeviceTcpServer.class);
@Value("${rtcm.specific.device.port:12002}")
private Integer port;
@Value("${rtcm.specific.device.id:3530795}")
private String targetDeviceId;
// 存储所有连接的客户端通道
private final CopyOnWriteArrayList<SocketChannel> clientChannels = new CopyOnWriteArrayList<>();
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(this::start0, "specific-device-tcp-server").start();
}
private void start0() {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加到客户端列表
clientChannels.add(ch);
// 当连接关闭时从列表中移除
ch.closeFuture().addListener(future -> clientChannels.remove(ch));
logger.info("New client connected: {}", ch.remoteAddress());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
logger.info("Specific device TCP server started on port {} for device {}", port, targetDeviceId);
future.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("Error starting Specific device TCP server at port {}", port, e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
/**
* 向所有连接的客户端发送RTCM数据
* @param rtcmData RTCM数据字节数组
*/
public void broadcastRtcmData(byte[] rtcmData) {
if (clientChannels.isEmpty()) {
return;
}
for (SocketChannel channel : clientChannels) {
if (channel.isActive()) {
channel.writeAndFlush(io.netty.buffer.Unpooled.wrappedBuffer(rtcmData));
}
}
logger.debug("Broadcasted RTCM data to {} clients, data length: {}", clientChannels.size(), rtcmData.length);
}
/**
* 获取目标设备ID
* @return 目标设备ID
*/
public String getTargetDeviceId() {
return targetDeviceId;
}
/**
* 获取服务器端口
* @return 服务器端口
*/
public Integer getPort() {
return port;
}
}