feat: update forward server
This commit is contained in:
parent
a35cb1a431
commit
bd9a6e575e
@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||||||
import org.springframework.boot.autoconfigure.domain.EntityScan;
|
import org.springframework.boot.autoconfigure.domain.EntityScan;
|
||||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||||
import org.springframework.context.annotation.ComponentScan;
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -17,6 +18,7 @@ import org.springframework.context.annotation.ComponentScan;
|
|||||||
@ComponentScan({"com.imdroid.*"})
|
@ComponentScan({"com.imdroid.*"})
|
||||||
@EntityScan({"com.imdroid.*"})
|
@EntityScan({"com.imdroid.*"})
|
||||||
@EnableFeignClients(basePackages = "com.imdroid.*")
|
@EnableFeignClients(basePackages = "com.imdroid.*")
|
||||||
|
@EnableScheduling
|
||||||
public class SideSlopeRtcmApp {
|
public class SideSlopeRtcmApp {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
|||||||
@ -16,6 +16,7 @@ import com.imdroid.sideslope.server.DeviceChannel;
|
|||||||
import com.imdroid.sideslope.server.OnlineChannels;
|
import com.imdroid.sideslope.server.OnlineChannels;
|
||||||
import com.imdroid.sideslope.service.DataPersistService;
|
import com.imdroid.sideslope.service.DataPersistService;
|
||||||
import com.imdroid.sideslope.bd.RtcmGgaUtil;
|
import com.imdroid.sideslope.bd.RtcmGgaUtil;
|
||||||
|
import com.imdroid.sideslope.config.RtcmPortConfigManager;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -48,6 +49,8 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
UdpNtripServer ntripServer;
|
UdpNtripServer ntripServer;
|
||||||
@Autowired
|
@Autowired
|
||||||
private DeviceTcpPortManager deviceTcpPortManager;
|
private DeviceTcpPortManager deviceTcpPortManager;
|
||||||
|
@Autowired
|
||||||
|
private RtcmPortConfigManager rtcmPortConfigManager;
|
||||||
|
|
||||||
// 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间
|
// 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间
|
||||||
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
|
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
|
||||||
@ -279,9 +282,10 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void sendToNtrip(String deviceId, String hexData) {
|
private void sendToNtrip(String deviceId, String hexData) {
|
||||||
// 首先检查设备是否启用TCP转发
|
// 检查设备是否启用TCP转发
|
||||||
if (!deviceTcpPortManager.isDeviceEnabled(deviceId)) {
|
if (!rtcmPortConfigManager.isDeviceEnabled(deviceId)) {
|
||||||
return; // 如果设备未启用,直接返回
|
logger.debug("Device {} not enabled for TCP forwarding", deviceId);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -292,11 +296,6 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
try {
|
try {
|
||||||
byte[] data = ByteUtil.hexStringTobyte(rtcm);
|
byte[] data = ByteUtil.hexStringTobyte(rtcm);
|
||||||
deviceTcpPortManager.sendData(deviceId, data);
|
deviceTcpPortManager.sendData(deviceId, data);
|
||||||
// 获取端口信息用于日志记录
|
|
||||||
int port = deviceTcpPortManager.getOrCreatePort(deviceId);
|
|
||||||
if (port > 0) { // 只有当端口创建成功时才记录日志
|
|
||||||
logger.debug("Forwarded RTCM data for device {} to TCP port {}", deviceId, port);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage());
|
logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,118 +2,39 @@ package com.imdroid.sideslope.server.tcp;
|
|||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import java.util.HashMap;
|
||||||
import java.util.*;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class DeviceTcpPortManager {
|
public class DeviceTcpPortManager {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DeviceTcpPortManager.class);
|
private static final Logger logger = LoggerFactory.getLogger(DeviceTcpPortManager.class);
|
||||||
|
|
||||||
@Value("${rtcm.port.start:20000}")
|
|
||||||
private int startPort;
|
|
||||||
|
|
||||||
@Value("${rtcm.port.end:21000}")
|
|
||||||
private int endPort;
|
|
||||||
|
|
||||||
@Value("${rtcm.forward.device.ids:}")
|
|
||||||
private String forwardDeviceIds;
|
|
||||||
|
|
||||||
private final Map<String, Integer> devicePortMap = new ConcurrentHashMap<>();
|
private final Map<String, Integer> devicePortMap = new ConcurrentHashMap<>();
|
||||||
private final Map<Integer, RtcmForwardServer> portServerMap = new ConcurrentHashMap<>();
|
private final Map<Integer, RtcmForwardServer> portServerMap = new ConcurrentHashMap<>();
|
||||||
private Set<String> enabledDevices;
|
|
||||||
private int currentPort;
|
|
||||||
|
|
||||||
public DeviceTcpPortManager() {
|
public synchronized void addDevice(String deviceId, int port) {
|
||||||
this.currentPort = startPort;
|
if (port <= 0) {
|
||||||
}
|
logger.error("Invalid port {} for device {}", port, deviceId);
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
// 初始化启用TCP转发的设备ID集合
|
|
||||||
enabledDevices = new HashSet<>();
|
|
||||||
if (forwardDeviceIds != null && !forwardDeviceIds.trim().isEmpty()) {
|
|
||||||
String[] ids = forwardDeviceIds.split(",");
|
|
||||||
enabledDevices.addAll(Arrays.asList(ids));
|
|
||||||
logger.info("Enabled TCP forward for devices: {}", enabledDevices);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized int getOrCreatePort(String deviceId) {
|
|
||||||
// 检查设备是否在启用列表中
|
|
||||||
if (!enabledDevices.contains(deviceId)) {
|
|
||||||
return -1; // 返回-1表示该设备未启用TCP转发
|
|
||||||
}
|
|
||||||
|
|
||||||
return devicePortMap.computeIfAbsent(deviceId, id -> {
|
|
||||||
// 尝试获取配置的固定端口
|
|
||||||
int configuredPort = getConfiguredPort(deviceId);
|
|
||||||
if (configuredPort > 0) {
|
|
||||||
try {
|
|
||||||
if (!portServerMap.containsKey(configuredPort)) {
|
|
||||||
RtcmForwardServer server = new RtcmForwardServer(configuredPort);
|
|
||||||
server.start();
|
|
||||||
portServerMap.put(configuredPort, server);
|
|
||||||
logger.info("Created new TCP forward server for device {} on configured port {}", deviceId, configuredPort);
|
|
||||||
return configuredPort;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Failed to create TCP server on configured port {} for device {}", configuredPort, deviceId, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果没有配置固定端口或固定端口创建失败,使用动态端口
|
|
||||||
while (currentPort <= endPort) {
|
|
||||||
if (!portServerMap.containsKey(currentPort)) {
|
|
||||||
try {
|
|
||||||
RtcmForwardServer server = new RtcmForwardServer(currentPort);
|
|
||||||
server.start();
|
|
||||||
portServerMap.put(currentPort, server);
|
|
||||||
logger.info("Created new TCP forward server for device {} on port {}", deviceId, currentPort);
|
|
||||||
return currentPort++;
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Failed to create TCP server on port {}, trying next port", currentPort, e);
|
|
||||||
currentPort++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
currentPort++;
|
|
||||||
}
|
|
||||||
throw new RuntimeException("No available ports");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private int getConfiguredPort(String deviceId) {
|
|
||||||
try {
|
|
||||||
// 从Spring环境中获取设备特定的端口配置
|
|
||||||
String portValue = System.getProperty("rtcm.device." + deviceId + ".port");
|
|
||||||
if (portValue != null && !portValue.trim().isEmpty()) {
|
|
||||||
int port = Integer.parseInt(portValue);
|
|
||||||
if (port >= startPort && port <= endPort) {
|
|
||||||
return port;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("Failed to get configured port for device {}", deviceId, e);
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void sendData(String deviceId, byte[] data) {
|
|
||||||
// 只处理启用的设备
|
|
||||||
if (!enabledDevices.contains(deviceId)) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Integer port = devicePortMap.get(deviceId);
|
// 检查端口是否已被其他设备使用
|
||||||
if (port != null) {
|
if (portServerMap.containsKey(port)) {
|
||||||
RtcmForwardServer server = portServerMap.get(port);
|
logger.error("Port {} is already in use", port);
|
||||||
if (server != null) {
|
return;
|
||||||
server.broadcast(data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
RtcmForwardServer server = new RtcmForwardServer(port);
|
||||||
|
server.start();
|
||||||
|
portServerMap.put(port, server);
|
||||||
|
devicePortMap.put(deviceId, port);
|
||||||
|
logger.info("Created new TCP forward server for device {} on port {}", deviceId, port);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to create TCP server for device {} on port {}", deviceId, port, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,8 +49,19 @@ public class DeviceTcpPortManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isDeviceEnabled(String deviceId) {
|
public void sendData(String deviceId, byte[] data) {
|
||||||
return enabledDevices.contains(deviceId);
|
Integer port = devicePortMap.get(deviceId);
|
||||||
|
if (port != null) {
|
||||||
|
RtcmForwardServer server = portServerMap.get(port);
|
||||||
|
if (server != null) {
|
||||||
|
server.broadcast(data);
|
||||||
|
logger.debug("Forwarded {} bytes of data for device {} to port {}", data.length, deviceId, port);
|
||||||
|
} else {
|
||||||
|
logger.warn("Server not found for device {} on port {}", deviceId, port);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.debug("No port mapping found for device {}", deviceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Integer> getActiveDevicePorts() {
|
public Map<String, Integer> getActiveDevicePorts() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user