feat: update forward server
This commit is contained in:
parent
e852b7c640
commit
a35cb1a431
@ -9,6 +9,7 @@ import com.imdroid.sideslope.bd.Gga;
|
|||||||
import com.imdroid.sideslope.bd.Rtcm1005;
|
import com.imdroid.sideslope.bd.Rtcm1005;
|
||||||
import com.imdroid.sideslope.message.D331RtcmMessage;
|
import com.imdroid.sideslope.message.D331RtcmMessage;
|
||||||
import com.imdroid.sideslope.ntrip.UdpNtripServer;
|
import com.imdroid.sideslope.ntrip.UdpNtripServer;
|
||||||
|
import com.imdroid.sideslope.server.tcp.DeviceTcpPortManager;
|
||||||
import com.imdroid.sideslope.service.Device;
|
import com.imdroid.sideslope.service.Device;
|
||||||
import com.imdroid.sideslope.service.DeviceService;
|
import com.imdroid.sideslope.service.DeviceService;
|
||||||
import com.imdroid.sideslope.server.DeviceChannel;
|
import com.imdroid.sideslope.server.DeviceChannel;
|
||||||
@ -45,6 +46,8 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
private DataPersistService dataPersistService;
|
private DataPersistService dataPersistService;
|
||||||
@Autowired
|
@Autowired
|
||||||
UdpNtripServer ntripServer;
|
UdpNtripServer ntripServer;
|
||||||
|
@Autowired
|
||||||
|
private DeviceTcpPortManager deviceTcpPortManager;
|
||||||
|
|
||||||
// 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间
|
// 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间
|
||||||
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
|
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
|
||||||
@ -275,22 +278,32 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendToNtrip(String mountpoint, String hexData) {
|
private void sendToNtrip(String deviceId, String hexData) {
|
||||||
|
// 首先检查设备是否启用TCP转发
|
||||||
|
if (!deviceTcpPortManager.isDeviceEnabled(deviceId)) {
|
||||||
|
return; // 如果设备未启用,直接返回
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// 提取RTCM数据并通过TCP转发
|
||||||
// 将原始字节转换为16进制字符串用于RTCM提取
|
List<String> rtcmList = RtcmGgaUtil.getRtcms(hexData);
|
||||||
//String hexData = ByteUtil.bytesToHexString(rawData);
|
if (rtcmList != null && !rtcmList.isEmpty()) {
|
||||||
//System.out.println(hexData);
|
for (String rtcm : rtcmList) {
|
||||||
|
try {
|
||||||
// 提取RTCM数据并发送到NtripServer,使用设备ID作为挂载点
|
byte[] data = ByteUtil.hexStringTobyte(rtcm);
|
||||||
Optional.ofNullable(RtcmGgaUtil.getRtcms(hexData))
|
deviceTcpPortManager.sendData(deviceId, data);
|
||||||
.ifPresent(rtcm -> {
|
// 获取端口信息用于日志记录
|
||||||
//System.out.println("挂载点: " + mountpoint);
|
int port = deviceTcpPortManager.getOrCreatePort(deviceId);
|
||||||
//System.out.println("RTCM数据: " + rtcm);
|
if (port > 0) { // 只有当端口创建成功时才记录日志
|
||||||
ntripServer.send(mountpoint, rtcm);
|
logger.debug("Forwarded RTCM data for device {} to TCP port {}", deviceId, port);
|
||||||
});
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("处理NTRIP数据失败, 挂载点: {}, 错误: {}", mountpoint, e.getMessage());
|
logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("处理RTCM数据失败, 设备ID: {}, 错误: {}", deviceId, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,138 @@
|
|||||||
|
package com.imdroid.sideslope.server.tcp;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class DeviceTcpPortManager {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(DeviceTcpPortManager.class);
|
||||||
|
|
||||||
|
@Value("${rtcm.port.start:20000}")
|
||||||
|
private int startPort;
|
||||||
|
|
||||||
|
@Value("${rtcm.port.end:21000}")
|
||||||
|
private int endPort;
|
||||||
|
|
||||||
|
@Value("${rtcm.forward.device.ids:}")
|
||||||
|
private String forwardDeviceIds;
|
||||||
|
|
||||||
|
private final Map<String, Integer> devicePortMap = new ConcurrentHashMap<>();
|
||||||
|
private final Map<Integer, RtcmForwardServer> portServerMap = new ConcurrentHashMap<>();
|
||||||
|
private Set<String> enabledDevices;
|
||||||
|
private int currentPort;
|
||||||
|
|
||||||
|
public DeviceTcpPortManager() {
|
||||||
|
this.currentPort = startPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
// 初始化启用TCP转发的设备ID集合
|
||||||
|
enabledDevices = new HashSet<>();
|
||||||
|
if (forwardDeviceIds != null && !forwardDeviceIds.trim().isEmpty()) {
|
||||||
|
String[] ids = forwardDeviceIds.split(",");
|
||||||
|
enabledDevices.addAll(Arrays.asList(ids));
|
||||||
|
logger.info("Enabled TCP forward for devices: {}", enabledDevices);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized int getOrCreatePort(String deviceId) {
|
||||||
|
// 检查设备是否在启用列表中
|
||||||
|
if (!enabledDevices.contains(deviceId)) {
|
||||||
|
return -1; // 返回-1表示该设备未启用TCP转发
|
||||||
|
}
|
||||||
|
|
||||||
|
return devicePortMap.computeIfAbsent(deviceId, id -> {
|
||||||
|
// 尝试获取配置的固定端口
|
||||||
|
int configuredPort = getConfiguredPort(deviceId);
|
||||||
|
if (configuredPort > 0) {
|
||||||
|
try {
|
||||||
|
if (!portServerMap.containsKey(configuredPort)) {
|
||||||
|
RtcmForwardServer server = new RtcmForwardServer(configuredPort);
|
||||||
|
server.start();
|
||||||
|
portServerMap.put(configuredPort, server);
|
||||||
|
logger.info("Created new TCP forward server for device {} on configured port {}", deviceId, configuredPort);
|
||||||
|
return configuredPort;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to create TCP server on configured port {} for device {}", configuredPort, deviceId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果没有配置固定端口或固定端口创建失败,使用动态端口
|
||||||
|
while (currentPort <= endPort) {
|
||||||
|
if (!portServerMap.containsKey(currentPort)) {
|
||||||
|
try {
|
||||||
|
RtcmForwardServer server = new RtcmForwardServer(currentPort);
|
||||||
|
server.start();
|
||||||
|
portServerMap.put(currentPort, server);
|
||||||
|
logger.info("Created new TCP forward server for device {} on port {}", deviceId, currentPort);
|
||||||
|
return currentPort++;
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to create TCP server on port {}, trying next port", currentPort, e);
|
||||||
|
currentPort++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
currentPort++;
|
||||||
|
}
|
||||||
|
throw new RuntimeException("No available ports");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getConfiguredPort(String deviceId) {
|
||||||
|
try {
|
||||||
|
// 从Spring环境中获取设备特定的端口配置
|
||||||
|
String portValue = System.getProperty("rtcm.device." + deviceId + ".port");
|
||||||
|
if (portValue != null && !portValue.trim().isEmpty()) {
|
||||||
|
int port = Integer.parseInt(portValue);
|
||||||
|
if (port >= startPort && port <= endPort) {
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("Failed to get configured port for device {}", deviceId, e);
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendData(String deviceId, byte[] data) {
|
||||||
|
// 只处理启用的设备
|
||||||
|
if (!enabledDevices.contains(deviceId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer port = devicePortMap.get(deviceId);
|
||||||
|
if (port != null) {
|
||||||
|
RtcmForwardServer server = portServerMap.get(port);
|
||||||
|
if (server != null) {
|
||||||
|
server.broadcast(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeDevice(String deviceId) {
|
||||||
|
Integer port = devicePortMap.remove(deviceId);
|
||||||
|
if (port != null) {
|
||||||
|
RtcmForwardServer server = portServerMap.remove(port);
|
||||||
|
if (server != null) {
|
||||||
|
server.stop();
|
||||||
|
logger.info("Removed TCP forward server for device {} on port {}", deviceId, port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDeviceEnabled(String deviceId) {
|
||||||
|
return enabledDevices.contains(deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Integer> getActiveDevicePorts() {
|
||||||
|
return new HashMap<>(devicePortMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,111 @@
|
|||||||
|
package com.imdroid.sideslope.server.tcp;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.channel.*;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
|
import io.netty.handler.codec.bytes.ByteArrayEncoder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
public class RtcmForwardServer {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(RtcmForwardServer.class);
|
||||||
|
|
||||||
|
private final int port;
|
||||||
|
private final EventLoopGroup bossGroup;
|
||||||
|
private final EventLoopGroup workerGroup;
|
||||||
|
private Channel serverChannel;
|
||||||
|
private final ConcurrentMap<Channel, Boolean> connectedClients;
|
||||||
|
|
||||||
|
public RtcmForwardServer(int port) {
|
||||||
|
this.port = port;
|
||||||
|
this.bossGroup = new NioEventLoopGroup(1);
|
||||||
|
this.workerGroup = new NioEventLoopGroup();
|
||||||
|
this.connectedClients = new ConcurrentHashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
try {
|
||||||
|
ServerBootstrap b = new ServerBootstrap();
|
||||||
|
b.group(bossGroup, workerGroup)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) {
|
||||||
|
ChannelPipeline p = ch.pipeline();
|
||||||
|
p.addLast(new ByteArrayEncoder());
|
||||||
|
p.addLast(new SimpleChannelInboundHandler<Object>() {
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) {
|
||||||
|
connectedClients.put(ctx.channel(), true);
|
||||||
|
logger.info("Client connected to port {}: {}", port, ctx.channel().remoteAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) {
|
||||||
|
connectedClients.remove(ctx.channel());
|
||||||
|
logger.info("Client disconnected from port {}: {}", port, ctx.channel().remoteAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
// 我们不需要处理来自客户端的数据
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
logger.error("Error on port " + port, cause);
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.option(ChannelOption.SO_BACKLOG, 128)
|
||||||
|
.childOption(ChannelOption.SO_KEEPALIVE, true);
|
||||||
|
|
||||||
|
serverChannel = b.bind(port).sync().channel();
|
||||||
|
logger.info("RTCM forward server started on port {}", port);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to start RTCM forward server on port " + port, e);
|
||||||
|
throw new RuntimeException("Failed to start server", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void broadcast(byte[] data) {
|
||||||
|
if (data == null || data.length == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
connectedClients.keySet().forEach(channel -> {
|
||||||
|
if (channel.isActive()) {
|
||||||
|
channel.writeAndFlush(data).addListener((ChannelFutureListener) future -> {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
logger.error("Failed to send data to client on port " + port, future.cause());
|
||||||
|
future.channel().close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
try {
|
||||||
|
if (serverChannel != null) {
|
||||||
|
serverChannel.close().sync();
|
||||||
|
}
|
||||||
|
connectedClients.keySet().forEach(Channel::close);
|
||||||
|
connectedClients.clear();
|
||||||
|
|
||||||
|
bossGroup.shutdownGracefully();
|
||||||
|
workerGroup.shutdownGracefully();
|
||||||
|
|
||||||
|
logger.info("RTCM forward server stopped on port {}", port);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Error stopping RTCM forward server on port " + port, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -4,6 +4,7 @@ import com.imdroid.common.util.DataTypeUtil;
|
|||||||
import com.imdroid.secapi.client.HttpResp;
|
import com.imdroid.secapi.client.HttpResp;
|
||||||
import com.imdroid.sideslope.calc.MultiLineGNSSCalcService;
|
import com.imdroid.sideslope.calc.MultiLineGNSSCalcService;
|
||||||
import com.imdroid.sideslope.calc.SingleLineGNSSCalcService;
|
import com.imdroid.sideslope.calc.SingleLineGNSSCalcService;
|
||||||
|
import com.imdroid.sideslope.server.tcp.DeviceTcpPortManager;
|
||||||
import com.imdroid.sideslope.service.Device;
|
import com.imdroid.sideslope.service.Device;
|
||||||
import com.imdroid.sideslope.service.LocalDeviceServiceImpl;
|
import com.imdroid.sideslope.service.LocalDeviceServiceImpl;
|
||||||
import com.imdroid.sideslope.server.DeviceChannel;
|
import com.imdroid.sideslope.server.DeviceChannel;
|
||||||
@ -44,6 +45,8 @@ public class ApiController {
|
|||||||
MultiLineGNSSCalcService multiCalcService;
|
MultiLineGNSSCalcService multiCalcService;
|
||||||
@Autowired
|
@Autowired
|
||||||
GroupParaService groupParaService;
|
GroupParaService groupParaService;
|
||||||
|
@Autowired
|
||||||
|
private DeviceTcpPortManager deviceTcpPortManager;
|
||||||
|
|
||||||
@PostMapping(value = "/config")
|
@PostMapping(value = "/config")
|
||||||
public HttpResp config(String deviceId, String configuration) {
|
public HttpResp config(String deviceId, String configuration) {
|
||||||
@ -153,6 +156,13 @@ public class ApiController {
|
|||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GetMapping("/tcp/ports")
|
||||||
|
public Map<String, Object> getTcpPorts() {
|
||||||
|
Map<String, Object> result = new HashMap<>();
|
||||||
|
result.put("enabled_devices", deviceTcpPortManager.getActiveDevicePorts());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
@PostMapping("/device_param_changed")
|
@PostMapping("/device_param_changed")
|
||||||
public HttpResp deviceParamChanged(String deviceId, String oldParentId) {
|
public HttpResp deviceParamChanged(String deviceId, String oldParentId) {
|
||||||
// 更新设备缓存
|
// 更新设备缓存
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user