feat: update forward server

This commit is contained in:
yarnom 2025-07-29 15:08:36 +08:00
parent 006979df31
commit b25a26b2c6
3 changed files with 139 additions and 37 deletions

View File

@ -284,25 +284,39 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
private void sendToNtrip(String deviceId, String hexData) { private void sendToNtrip(String deviceId, String hexData) {
// 检查设备是否启用TCP转发 // 检查设备是否启用TCP转发
if (!rtcmPortConfigManager.isDeviceEnabled(deviceId)) { if (!rtcmPortConfigManager.isDeviceEnabled(deviceId)) {
logger.debug("Device {} not enabled for TCP forwarding", deviceId); logger.info("Device {} not enabled for TCP forwarding", deviceId);
return; return;
} }
try { try {
// 添加原始数据日志
logger.info("Processing hex data for device {}, data length: {}", deviceId, hexData.length());
if (logger.isDebugEnabled()) {
logger.debug("Raw hex data: {}", hexData);
}
// 提取RTCM数据并通过TCP转发 // 提取RTCM数据并通过TCP转发
List<String> rtcmList = RtcmGgaUtil.getRtcms(hexData); List<String> rtcmList = RtcmGgaUtil.getRtcms(hexData);
logger.info("Extracted {} RTCM messages for device {}", rtcmList != null ? rtcmList.size() : 0, deviceId);
if (rtcmList != null && !rtcmList.isEmpty()) { if (rtcmList != null && !rtcmList.isEmpty()) {
for (String rtcm : rtcmList) { for (String rtcm : rtcmList) {
try { try {
byte[] data = ByteUtil.hexStringTobyte(rtcm); byte[] data = ByteUtil.hexStringTobyte(rtcm);
logger.info("Forwarding RTCM data for device {}, data length: {}", deviceId, data.length);
if (logger.isDebugEnabled()) {
logger.debug("RTCM data: {}", rtcm);
}
deviceTcpPortManager.sendData(deviceId, data); deviceTcpPortManager.sendData(deviceId, data);
} catch (Exception e) { } catch (Exception e) {
logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage()); logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage(), e);
} }
} }
} else {
logger.info("No RTCM data found in message for device {}", deviceId);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("处理RTCM数据失败, 设备ID: {}, 错误: {}", deviceId, e.getMessage()); logger.error("处理RTCM数据失败, 设备ID: {}, 错误: {}", deviceId, e.getMessage(), e);
} }
} }

View File

@ -28,43 +28,71 @@ public class DeviceTcpPortManager {
} }
try { try {
logger.info("Creating new TCP forward server for device {} on port {}", deviceId, port);
RtcmForwardServer server = new RtcmForwardServer(port); RtcmForwardServer server = new RtcmForwardServer(port);
server.start(); server.start();
portServerMap.put(port, server); portServerMap.put(port, server);
devicePortMap.put(deviceId, port); devicePortMap.put(deviceId, port);
logger.info("Created new TCP forward server for device {} on port {}", deviceId, port); logger.info("Successfully created TCP forward server for device {} on port {}", deviceId, port);
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to create TCP server for device {} on port {}", deviceId, port, e); logger.error("Failed to create TCP server for device {} on port {}: {}", deviceId, port, e.getMessage(), e);
} }
} }
public void removeDevice(String deviceId) { public void removeDevice(String deviceId) {
Integer port = devicePortMap.remove(deviceId); Integer port = devicePortMap.remove(deviceId);
if (port != null) { if (port != null) {
logger.info("Removing TCP forward server for device {} on port {}", deviceId, port);
RtcmForwardServer server = portServerMap.remove(port); RtcmForwardServer server = portServerMap.remove(port);
if (server != null) { if (server != null) {
server.stop(); server.stop();
logger.info("Removed TCP forward server for device {} on port {}", deviceId, port); logger.info("Successfully removed TCP forward server for device {} on port {}", deviceId, port);
}
}
}
public void sendData(String deviceId, byte[] data) {
Integer port = devicePortMap.get(deviceId);
if (port != null) {
RtcmForwardServer server = portServerMap.get(port);
if (server != null) {
server.broadcast(data);
logger.debug("Forwarded {} bytes of data for device {} to port {}", data.length, deviceId, port);
} else { } else {
logger.warn("Server not found for device {} on port {}", deviceId, port); logger.warn("Server not found for device {} on port {}", deviceId, port);
} }
} else { } else {
logger.debug("No port mapping found for device {}", deviceId); logger.info("No port mapping found for device {} to remove", deviceId);
}
}
public void sendData(String deviceId, byte[] data) {
if (data == null || data.length == 0) {
logger.warn("Empty data received for device {}", deviceId);
return;
}
Integer port = devicePortMap.get(deviceId);
if (port != null) {
RtcmForwardServer server = portServerMap.get(port);
if (server != null) {
try {
server.broadcast(data);
logger.info("Successfully forwarded {} bytes of data for device {} to port {}",
data.length, deviceId, port);
} catch (Exception e) {
logger.error("Error broadcasting data for device {} on port {}: {}",
deviceId, port, e.getMessage(), e);
}
} else {
logger.error("Server not found for device {} on port {}", deviceId, port);
// 尝试重新创建服务器
logger.info("Attempting to recreate server for device {} on port {}", deviceId, port);
addDevice(deviceId, port);
}
} else {
logger.error("No port mapping found for device {}", deviceId);
} }
} }
public Map<String, Integer> getActiveDevicePorts() { public Map<String, Integer> getActiveDevicePorts() {
return new HashMap<>(devicePortMap); return new HashMap<>(devicePortMap);
} }
public boolean hasDevice(String deviceId) {
return devicePortMap.containsKey(deviceId);
}
public Integer getDevicePort(String deviceId) {
return devicePortMap.get(deviceId);
}
} }

View File

@ -20,6 +20,7 @@ public class RtcmForwardServer {
private final EventLoopGroup workerGroup; private final EventLoopGroup workerGroup;
private Channel serverChannel; private Channel serverChannel;
private final ConcurrentMap<Channel, Boolean> connectedClients; private final ConcurrentMap<Channel, Boolean> connectedClients;
private volatile boolean isRunning = false;
public RtcmForwardServer(int port) { public RtcmForwardServer(int port) {
this.port = port; this.port = port;
@ -28,14 +29,21 @@ public class RtcmForwardServer {
this.connectedClients = new ConcurrentHashMap<>(); this.connectedClients = new ConcurrentHashMap<>();
} }
public void start() { public synchronized void start() {
if (isRunning) {
logger.warn("Server on port {} is already running", port);
return;
}
try { try {
logger.info("Starting RTCM forward server on port {}", port);
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
logger.info("New client connection on port {}: {}", port, ch.remoteAddress());
ChannelPipeline p = ch.pipeline(); ChannelPipeline p = ch.pipeline();
p.addLast(new ByteArrayEncoder()); p.addLast(new ByteArrayEncoder());
p.addLast(new SimpleChannelInboundHandler<Object>() { p.addLast(new SimpleChannelInboundHandler<Object>() {
@ -54,11 +62,12 @@ public class RtcmForwardServer {
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) { protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
// 我们不需要处理来自客户端的数据 // 我们不需要处理来自客户端的数据
logger.debug("Received data from client on port {}: {}", port, ctx.channel().remoteAddress());
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("Error on port " + port, cause); logger.error("Error on port {} for client {}: {}", port, ctx.channel().remoteAddress(), cause.getMessage(), cause);
ctx.close(); ctx.close();
} }
}); });
@ -68,44 +77,95 @@ public class RtcmForwardServer {
.childOption(ChannelOption.SO_KEEPALIVE, true); .childOption(ChannelOption.SO_KEEPALIVE, true);
serverChannel = b.bind(port).sync().channel(); serverChannel = b.bind(port).sync().channel();
logger.info("RTCM forward server started on port {}", port); isRunning = true;
logger.info("RTCM forward server successfully started on port {}", port);
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to start RTCM forward server on port " + port, e); logger.error("Failed to start RTCM forward server on port {}: {}", port, e.getMessage(), e);
throw new RuntimeException("Failed to start server", e); throw new RuntimeException("Failed to start server on port " + port, e);
} }
} }
public void broadcast(byte[] data) { public void broadcast(byte[] data) {
if (data == null || data.length == 0) { if (!isRunning) {
logger.error("Cannot broadcast data - server on port {} is not running", port);
return; return;
} }
connectedClients.keySet().forEach(channel -> { if (data == null || data.length == 0) {
logger.warn("Attempted to broadcast empty data on port {}", port);
return;
}
int activeClients = 0;
int successfulBroadcasts = 0;
for (Channel channel : connectedClients.keySet()) {
if (channel.isActive()) { if (channel.isActive()) {
channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> { activeClients++;
if (!future.isSuccess()) { try {
logger.error("Failed to send data to client on port " + port, future.cause()); channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> {
future.channel().close(); if (future.isSuccess()) {
} logger.debug("Successfully sent {} bytes to client {} on port {}",
}); data.length, future.channel().remoteAddress(), port);
} else {
logger.error("Failed to send data to client {} on port {}: {}",
future.channel().remoteAddress(), port, future.cause().getMessage());
future.channel().close();
}
});
successfulBroadcasts++;
} catch (Exception e) {
logger.error("Error broadcasting to client on port {}: {}", port, e.getMessage(), e);
}
} }
}); }
logger.info("Broadcast {} bytes to {}/{} clients on port {}",
data.length, successfulBroadcasts, activeClients, port);
} }
public void stop() { public synchronized void stop() {
if (!isRunning) {
logger.warn("Server on port {} is already stopped", port);
return;
}
logger.info("Stopping RTCM forward server on port {}", port);
try { try {
if (serverChannel != null) { if (serverChannel != null) {
serverChannel.close().sync(); serverChannel.close().sync();
} }
connectedClients.keySet().forEach(Channel::close);
// 关闭所有客户端连接
int closedConnections = 0;
for (Channel channel : connectedClients.keySet()) {
try {
channel.close().sync();
closedConnections++;
} catch (Exception e) {
logger.warn("Error closing client connection on port {}: {}", port, e.getMessage());
}
}
connectedClients.clear(); connectedClients.clear();
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
logger.info("RTCM forward server stopped on port {}", port); isRunning = false;
logger.info("Successfully stopped RTCM forward server on port {}, closed {} client connections",
port, closedConnections);
} catch (Exception e) { } catch (Exception e) {
logger.error("Error stopping RTCM forward server on port " + port, e); logger.error("Error stopping RTCM forward server on port {}: {}", port, e.getMessage(), e);
} }
} }
public boolean isRunning() {
return isRunning;
}
public int getActiveClientCount() {
return (int) connectedClients.keySet().stream()
.filter(Channel::isActive)
.count();
}
} }