Compare commits
No commits in common. "006979df31b7201aae346bdecd292eac0e81ccc3" and "a35cb1a43113941da33804425e930350c0350ea5" have entirely different histories.
006979df31
...
a35cb1a431
@ -6,7 +6,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||||||
import org.springframework.boot.autoconfigure.domain.EntityScan;
|
import org.springframework.boot.autoconfigure.domain.EntityScan;
|
||||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||||
import org.springframework.context.annotation.ComponentScan;
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -18,7 +17,6 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
|||||||
@ComponentScan({"com.imdroid.*"})
|
@ComponentScan({"com.imdroid.*"})
|
||||||
@EntityScan({"com.imdroid.*"})
|
@EntityScan({"com.imdroid.*"})
|
||||||
@EnableFeignClients(basePackages = "com.imdroid.*")
|
@EnableFeignClients(basePackages = "com.imdroid.*")
|
||||||
@EnableScheduling
|
|
||||||
public class SideSlopeRtcmApp {
|
public class SideSlopeRtcmApp {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
|||||||
@ -1,123 +0,0 @@
|
|||||||
package com.imdroid.sideslope.config;
|
|
||||||
|
|
||||||
import com.imdroid.sideslope.server.tcp.DeviceTcpPortManager;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class RtcmPortConfigManager {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RtcmPortConfigManager.class);
|
|
||||||
|
|
||||||
@Value("${rtcm.config.file:/root/beidou/config/rtcm_port}")
|
|
||||||
private String configFilePath;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private DeviceTcpPortManager deviceTcpPortManager;
|
|
||||||
|
|
||||||
// 当前配置的设备和端口映射
|
|
||||||
private final Map<String, Integer> currentConfig = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
// 确保配置目录存在
|
|
||||||
File configFile = new File(configFilePath);
|
|
||||||
if (!configFile.getParentFile().exists()) {
|
|
||||||
configFile.getParentFile().mkdirs();
|
|
||||||
}
|
|
||||||
// 初始加载配置
|
|
||||||
loadConfig();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Scheduled(fixedRate = 60000) // 每60秒执行一次
|
|
||||||
public void scheduledLoadConfig() {
|
|
||||||
loadConfig();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void loadConfig() {
|
|
||||||
Map<String, Integer> newConfig = new HashMap<>();
|
|
||||||
File configFile = new File(configFilePath);
|
|
||||||
|
|
||||||
if (!configFile.exists()) {
|
|
||||||
logger.info("Config file not found: {}, will create when needed", configFilePath);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try (BufferedReader reader = new BufferedReader(new FileReader(configFile))) {
|
|
||||||
String line;
|
|
||||||
while ((line = reader.readLine()) != null) {
|
|
||||||
line = line.trim();
|
|
||||||
if (line.isEmpty() || line.startsWith("#")) {
|
|
||||||
continue; // 跳过空行和注释
|
|
||||||
}
|
|
||||||
|
|
||||||
String[] parts = line.split("\\s+");
|
|
||||||
if (parts.length >= 2) {
|
|
||||||
try {
|
|
||||||
String deviceId = parts[0];
|
|
||||||
int port = Integer.parseInt(parts[1]);
|
|
||||||
newConfig.put(deviceId, port);
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
logger.warn("Invalid port number in config file: {}", line);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("Error reading config file: {}", configFilePath, e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 找出需要移除的设备
|
|
||||||
Set<String> devicesToRemove = new HashSet<>(currentConfig.keySet());
|
|
||||||
devicesToRemove.removeAll(newConfig.keySet());
|
|
||||||
|
|
||||||
// 找出新增或更新的设备
|
|
||||||
Map<String, Integer> devicesToUpdate = new HashMap<>(newConfig);
|
|
||||||
|
|
||||||
// 移除不再需要的设备
|
|
||||||
for (String deviceId : devicesToRemove) {
|
|
||||||
deviceTcpPortManager.removeDevice(deviceId);
|
|
||||||
logger.info("Removed device {} from TCP port manager", deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新或添加新的设备配置
|
|
||||||
for (Map.Entry<String, Integer> entry : devicesToUpdate.entrySet()) {
|
|
||||||
String deviceId = entry.getKey();
|
|
||||||
int port = entry.getValue();
|
|
||||||
|
|
||||||
// 如果端口发生变化,需要重新创建
|
|
||||||
Integer currentPort = currentConfig.get(deviceId);
|
|
||||||
if (currentPort == null || !currentPort.equals(port)) {
|
|
||||||
if (currentPort != null) {
|
|
||||||
deviceTcpPortManager.removeDevice(deviceId);
|
|
||||||
}
|
|
||||||
deviceTcpPortManager.addDevice(deviceId, port);
|
|
||||||
logger.info("Updated device {} to use port {}", deviceId, port);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新当前配置
|
|
||||||
currentConfig.clear();
|
|
||||||
currentConfig.putAll(newConfig);
|
|
||||||
logger.info("Config updated, current devices: {}", currentConfig);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isDeviceEnabled(String deviceId) {
|
|
||||||
return currentConfig.containsKey(deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getDevicePort(String deviceId) {
|
|
||||||
return currentConfig.get(deviceId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -16,7 +16,6 @@ import com.imdroid.sideslope.server.DeviceChannel;
|
|||||||
import com.imdroid.sideslope.server.OnlineChannels;
|
import com.imdroid.sideslope.server.OnlineChannels;
|
||||||
import com.imdroid.sideslope.service.DataPersistService;
|
import com.imdroid.sideslope.service.DataPersistService;
|
||||||
import com.imdroid.sideslope.bd.RtcmGgaUtil;
|
import com.imdroid.sideslope.bd.RtcmGgaUtil;
|
||||||
import com.imdroid.sideslope.config.RtcmPortConfigManager;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -49,8 +48,6 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
UdpNtripServer ntripServer;
|
UdpNtripServer ntripServer;
|
||||||
@Autowired
|
@Autowired
|
||||||
private DeviceTcpPortManager deviceTcpPortManager;
|
private DeviceTcpPortManager deviceTcpPortManager;
|
||||||
@Autowired
|
|
||||||
private RtcmPortConfigManager rtcmPortConfigManager;
|
|
||||||
|
|
||||||
// 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间
|
// 添加一个成员变量用于追踪每个测站最后一次转发D300数据的时间
|
||||||
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
|
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
|
||||||
@ -282,10 +279,9 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void sendToNtrip(String deviceId, String hexData) {
|
private void sendToNtrip(String deviceId, String hexData) {
|
||||||
// 检查设备是否启用TCP转发
|
// 首先检查设备是否启用TCP转发
|
||||||
if (!rtcmPortConfigManager.isDeviceEnabled(deviceId)) {
|
if (!deviceTcpPortManager.isDeviceEnabled(deviceId)) {
|
||||||
logger.debug("Device {} not enabled for TCP forwarding", deviceId);
|
return; // 如果设备未启用,直接返回
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -296,6 +292,11 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
try {
|
try {
|
||||||
byte[] data = ByteUtil.hexStringTobyte(rtcm);
|
byte[] data = ByteUtil.hexStringTobyte(rtcm);
|
||||||
deviceTcpPortManager.sendData(deviceId, data);
|
deviceTcpPortManager.sendData(deviceId, data);
|
||||||
|
// 获取端口信息用于日志记录
|
||||||
|
int port = deviceTcpPortManager.getOrCreatePort(deviceId);
|
||||||
|
if (port > 0) { // 只有当端口创建成功时才记录日志
|
||||||
|
logger.debug("Forwarded RTCM data for device {} to TCP port {}", deviceId, port);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage());
|
logger.error("Error forwarding RTCM data for device {}: {}", deviceId, e.getMessage());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,39 +2,118 @@ package com.imdroid.sideslope.server.tcp;
|
|||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import javax.annotation.PostConstruct;
|
||||||
import java.util.Map;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class DeviceTcpPortManager {
|
public class DeviceTcpPortManager {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(DeviceTcpPortManager.class);
|
private static final Logger logger = LoggerFactory.getLogger(DeviceTcpPortManager.class);
|
||||||
|
|
||||||
|
@Value("${rtcm.port.start:20000}")
|
||||||
|
private int startPort;
|
||||||
|
|
||||||
|
@Value("${rtcm.port.end:21000}")
|
||||||
|
private int endPort;
|
||||||
|
|
||||||
|
@Value("${rtcm.forward.device.ids:}")
|
||||||
|
private String forwardDeviceIds;
|
||||||
|
|
||||||
private final Map<String, Integer> devicePortMap = new ConcurrentHashMap<>();
|
private final Map<String, Integer> devicePortMap = new ConcurrentHashMap<>();
|
||||||
private final Map<Integer, RtcmForwardServer> portServerMap = new ConcurrentHashMap<>();
|
private final Map<Integer, RtcmForwardServer> portServerMap = new ConcurrentHashMap<>();
|
||||||
|
private Set<String> enabledDevices;
|
||||||
|
private int currentPort;
|
||||||
|
|
||||||
public synchronized void addDevice(String deviceId, int port) {
|
public DeviceTcpPortManager() {
|
||||||
if (port <= 0) {
|
this.currentPort = startPort;
|
||||||
logger.error("Invalid port {} for device {}", port, deviceId);
|
}
|
||||||
return;
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
// 初始化启用TCP转发的设备ID集合
|
||||||
|
enabledDevices = new HashSet<>();
|
||||||
|
if (forwardDeviceIds != null && !forwardDeviceIds.trim().isEmpty()) {
|
||||||
|
String[] ids = forwardDeviceIds.split(",");
|
||||||
|
enabledDevices.addAll(Arrays.asList(ids));
|
||||||
|
logger.info("Enabled TCP forward for devices: {}", enabledDevices);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized int getOrCreatePort(String deviceId) {
|
||||||
|
// 检查设备是否在启用列表中
|
||||||
|
if (!enabledDevices.contains(deviceId)) {
|
||||||
|
return -1; // 返回-1表示该设备未启用TCP转发
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查端口是否已被其他设备使用
|
return devicePortMap.computeIfAbsent(deviceId, id -> {
|
||||||
if (portServerMap.containsKey(port)) {
|
// 尝试获取配置的固定端口
|
||||||
logger.error("Port {} is already in use", port);
|
int configuredPort = getConfiguredPort(deviceId);
|
||||||
return;
|
if (configuredPort > 0) {
|
||||||
}
|
try {
|
||||||
|
if (!portServerMap.containsKey(configuredPort)) {
|
||||||
|
RtcmForwardServer server = new RtcmForwardServer(configuredPort);
|
||||||
|
server.start();
|
||||||
|
portServerMap.put(configuredPort, server);
|
||||||
|
logger.info("Created new TCP forward server for device {} on configured port {}", deviceId, configuredPort);
|
||||||
|
return configuredPort;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to create TCP server on configured port {} for device {}", configuredPort, deviceId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果没有配置固定端口或固定端口创建失败,使用动态端口
|
||||||
|
while (currentPort <= endPort) {
|
||||||
|
if (!portServerMap.containsKey(currentPort)) {
|
||||||
|
try {
|
||||||
|
RtcmForwardServer server = new RtcmForwardServer(currentPort);
|
||||||
|
server.start();
|
||||||
|
portServerMap.put(currentPort, server);
|
||||||
|
logger.info("Created new TCP forward server for device {} on port {}", deviceId, currentPort);
|
||||||
|
return currentPort++;
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to create TCP server on port {}, trying next port", currentPort, e);
|
||||||
|
currentPort++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
currentPort++;
|
||||||
|
}
|
||||||
|
throw new RuntimeException("No available ports");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getConfiguredPort(String deviceId) {
|
||||||
try {
|
try {
|
||||||
RtcmForwardServer server = new RtcmForwardServer(port);
|
// 从Spring环境中获取设备特定的端口配置
|
||||||
server.start();
|
String portValue = System.getProperty("rtcm.device." + deviceId + ".port");
|
||||||
portServerMap.put(port, server);
|
if (portValue != null && !portValue.trim().isEmpty()) {
|
||||||
devicePortMap.put(deviceId, port);
|
int port = Integer.parseInt(portValue);
|
||||||
logger.info("Created new TCP forward server for device {} on port {}", deviceId, port);
|
if (port >= startPort && port <= endPort) {
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Failed to create TCP server for device {} on port {}", deviceId, port, e);
|
logger.warn("Failed to get configured port for device {}", deviceId, e);
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendData(String deviceId, byte[] data) {
|
||||||
|
// 只处理启用的设备
|
||||||
|
if (!enabledDevices.contains(deviceId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer port = devicePortMap.get(deviceId);
|
||||||
|
if (port != null) {
|
||||||
|
RtcmForwardServer server = portServerMap.get(port);
|
||||||
|
if (server != null) {
|
||||||
|
server.broadcast(data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,19 +128,8 @@ public class DeviceTcpPortManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendData(String deviceId, byte[] data) {
|
public boolean isDeviceEnabled(String deviceId) {
|
||||||
Integer port = devicePortMap.get(deviceId);
|
return enabledDevices.contains(deviceId);
|
||||||
if (port != null) {
|
|
||||||
RtcmForwardServer server = portServerMap.get(port);
|
|
||||||
if (server != null) {
|
|
||||||
server.broadcast(data);
|
|
||||||
logger.debug("Forwarded {} bytes of data for device {} to port {}", data.length, deviceId, port);
|
|
||||||
} else {
|
|
||||||
logger.warn("Server not found for device {} on port {}", deviceId, port);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.debug("No port mapping found for device {}", deviceId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Integer> getActiveDevicePorts() {
|
public Map<String, Integer> getActiveDevicePorts() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user