From 2d159953842abe29b9a0b228f5d18dc4189c27b9 Mon Sep 17 00:00:00 2001 From: yarnom Date: Mon, 28 Jul 2025 18:22:21 +0800 Subject: [PATCH] feat: rtksrv v1 test --- .../controller/DeviceController.java | 173 +++++++++++++ .../ntripproxy/service/DeviceManager.java | 180 ++++++++++++++ .../ntripproxy/service/RtcmClient.java | 8 +- .../ntripproxy/service/RtcmFilterServer.java | 233 ++++++++++++++++++ 4 files changed, 589 insertions(+), 5 deletions(-) create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/controller/DeviceController.java create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/DeviceManager.java create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmFilterServer.java diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/controller/DeviceController.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/controller/DeviceController.java new file mode 100644 index 00000000..17607e34 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/controller/DeviceController.java @@ -0,0 +1,173 @@ +package com.imdroid.ntripproxy.controller; + +import com.imdroid.ntripproxy.service.DeviceManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 设备管理命令行控制器 + * 提供命令行接口管理设备描述 + */ +@Component +public class DeviceController implements ApplicationRunner { + + private static final Logger logger = LoggerFactory.getLogger(DeviceController.class); + + @Autowired + private DeviceManager deviceManager; + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Override + public void run(ApplicationArguments args) { + // 启动命令行交互线程 + executor.submit(this::startCommandLineInterface); + + logger.info("设备管理命令行控制器已启动,输入 'help' 查看可用命令"); + } + + /** + * 启动命令行交互界面 + */ + private void startCommandLineInterface() { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) { + logger.info("设备管理命令行已启动,输入 'help' 查看帮助"); + + String line; + while ((line = reader.readLine()) != null) { + if (line.trim().isEmpty()) { + continue; + } + + String[] parts = line.trim().split("\\s+", 2); + String command = parts[0].toLowerCase(); + + try { + switch (command) { + case "help": + printHelp(); + break; + case "list": + listDevices(); + break; + case "desc": + if (parts.length < 2) { + System.out.println("错误: 缺少参数,格式: desc "); + break; + } + String[] descParts = parts[1].split("\\s+", 2); + if (descParts.length < 2) { + System.out.println("错误: 缺少描述,格式: desc "); + break; + } + setDeviceDescription(descParts[0], descParts[1]); + break; + case "get": + if (parts.length < 2) { + System.out.println("错误: 缺少设备ID,格式: get "); + break; + } + getDeviceInfo(parts[1]); + break; + case "exit": + System.out.println("退出命令行界面"); + return; + default: + System.out.println("未知命令: " + command); + printHelp(); + break; + } + } catch (Exception e) { + System.out.println("命令执行出错: " + e.getMessage()); + } + } + } catch (Exception e) { + logger.error("命令行界面异常: {}", e.getMessage(), e); + } + } + + /** + * 打印帮助信息 + */ + private void printHelp() { + System.out.println("可用命令:"); + System.out.println(" help - 显示帮助信息"); + System.out.println(" list - 列出所有设备"); + System.out.println(" desc - 设置设备描述"); + System.out.println(" get - 获取设备信息"); + System.out.println(" exit - 退出命令行界面"); + } + + /** + * 列出所有设备 + */ + private void listDevices() { + Map devicePorts = deviceManager.getAllDevicePorts(); + + if (devicePorts.isEmpty()) { + System.out.println("当前没有注册的设备"); + return; + } + + System.out.println("设备列表:"); + System.out.println("------------------------------------------------------"); + System.out.printf("%-15s %-10s %-30s%n", "设备ID", "端口", "描述"); + System.out.println("------------------------------------------------------"); + + for (Map.Entry entry : devicePorts.entrySet()) { + String deviceId = entry.getKey(); + int port = entry.getValue(); + String description = deviceManager.getDeviceDescription(deviceId); + + System.out.printf("%-15s %-10d %-30s%n", deviceId, port, description); + } + + System.out.println("------------------------------------------------------"); + System.out.println("共 " + devicePorts.size() + " 个设备"); + } + + /** + * 设置设备描述 + */ + private void setDeviceDescription(String deviceId, String description) { + int port = deviceManager.getDevicePort(deviceId); + + if (port <= 0) { + System.out.println("警告: 设备 " + deviceId + " 尚未注册或分配端口"); + System.out.println("描述将被保存,但只有当设备连接后才会显示在设备列表中"); + } + + deviceManager.registerDeviceDescription(deviceId, description); + System.out.println("已为设备 " + deviceId + " 设置描述: " + description); + } + + /** + * 获取设备信息 + */ + private void getDeviceInfo(String deviceId) { + int port = deviceManager.getDevicePort(deviceId); + + if (port <= 0) { + System.out.println("设备 " + deviceId + " 未找到或未分配端口"); + return; + } + + String description = deviceManager.getDeviceDescription(deviceId); + + System.out.println("设备信息:"); + System.out.println(" 设备ID: " + deviceId); + System.out.println(" 端口: " + port); + System.out.println(" 描述: " + description); + System.out.println(" 连接命令: rtkrcv -t 127.0.0.1:" + port); + } +} \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/DeviceManager.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/DeviceManager.java new file mode 100644 index 00000000..40eb0b99 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/DeviceManager.java @@ -0,0 +1,180 @@ +package com.imdroid.ntripproxy.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.io.*; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 设备管理器,管理设备端口映射并提供API查询 + */ +@Component +@RestController +@RequestMapping("/gnss/device") +public class DeviceManager { + + private static final Logger logger = LoggerFactory.getLogger(DeviceManager.class); + + // 设备端口映射 + private final Map devicePorts = new ConcurrentHashMap<>(); + + // 设备描述映射 + private final Map deviceDescriptions = new ConcurrentHashMap<>(); + + // 设备描述文件 + private static final String DEVICE_DESCRIPTIONS_FILE = "device_descriptions.properties"; + + /** + * 初始化,从文件加载设备描述 + */ + @PostConstruct + public void init() { + loadDeviceDescriptions(); + } + + /** + * 服务关闭前,保存设备描述到文件 + */ + @PreDestroy + public void destroy() { + saveDeviceDescriptions(); + } + + /** + * 从文件加载设备描述 + */ + private void loadDeviceDescriptions() { + File file = new File(DEVICE_DESCRIPTIONS_FILE); + if (!file.exists()) { + logger.info("设备描述文件不存在,将在首次保存时创建"); + return; + } + + try (InputStream input = new FileInputStream(file)) { + Properties prop = new Properties(); + prop.load(input); + + for (String key : prop.stringPropertyNames()) { + deviceDescriptions.put(key, prop.getProperty(key)); + } + + logger.info("已从文件加载 {} 个设备描述", deviceDescriptions.size()); + } catch (IOException e) { + logger.error("加载设备描述文件失败: {}", e.getMessage(), e); + } + } + + /** + * 保存设备描述到文件 + */ + public void saveDeviceDescriptions() { + try (OutputStream output = new FileOutputStream(DEVICE_DESCRIPTIONS_FILE)) { + Properties prop = new Properties(); + + for (Map.Entry entry : deviceDescriptions.entrySet()) { + prop.setProperty(entry.getKey(), entry.getValue()); + } + + prop.store(output, "Device Descriptions"); + logger.info("已保存 {} 个设备描述到文件", deviceDescriptions.size()); + } catch (IOException e) { + logger.error("保存设备描述文件失败: {}", e.getMessage(), e); + } + } + + /** + * 注册设备端口 + */ + public void registerDevicePort(String deviceId, int port) { + devicePorts.put(deviceId, port); + logger.info("注册设备端口映射: {} -> {}", deviceId, port); + } + + /** + * 注册设备描述 + */ + public void registerDeviceDescription(String deviceId, String description) { + deviceDescriptions.put(deviceId, description); + // 每次更新描述时保存文件 + saveDeviceDescriptions(); + } + + /** + * 获取设备端口 + */ + public int getDevicePort(String deviceId) { + return devicePorts.getOrDefault(deviceId, -1); + } + + /** + * 获取设备描述 + */ + public String getDeviceDescription(String deviceId) { + return deviceDescriptions.getOrDefault(deviceId, "未知设备"); + } + + /** + * 获取所有设备端口映射 + */ + public Map getAllDevicePorts() { + return new HashMap<>(devicePorts); + } + + /** + * API: 获取所有设备端口映射 + */ + @GetMapping("/ports") + public Map getDevicePorts() { + Map result = new HashMap<>(); + Map> devices = new HashMap<>(); + + for (Map.Entry entry : devicePorts.entrySet()) { + String deviceId = entry.getKey(); + int port = entry.getValue(); + String description = getDeviceDescription(deviceId); + + Map deviceInfo = new HashMap<>(); + deviceInfo.put("port", port); + deviceInfo.put("description", description); + + devices.put(deviceId, deviceInfo); + } + + result.put("devices", devices); + result.put("count", devices.size()); + + return result; + } + + /** + * API: 获取特定设备的端口 + */ + @GetMapping("/port/{deviceId}") + public Map getDevicePortById(@PathVariable String deviceId) { + Map result = new HashMap<>(); + + int port = getDevicePort(deviceId); + if (port > 0) { + result.put("deviceId", deviceId); + result.put("port", port); + result.put("description", getDeviceDescription(deviceId)); + result.put("success", true); + } else { + result.put("success", false); + result.put("message", "设备未找到或未分配端口"); + } + + return result; + } +} \ No newline at end of file diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java index 87145e41..d8c574ed 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java @@ -7,11 +7,9 @@ import javax.annotation.PostConstruct; @Service public class RtcmClient extends UDPClient{ - @Value("${rtcm.server.host}") - private String ntripHost; - - @Value("${rtcm.server.port}") - private int ntripPort; + // 修改为固定的本地地址和12000端口,确保RTCM数据转发到我们的筛选服务 + private String ntripHost = "127.0.0.1"; + private int ntripPort = 12000; @PostConstruct void init(){ diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmFilterServer.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmFilterServer.java new file mode 100644 index 00000000..d47fff6a --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmFilterServer.java @@ -0,0 +1,233 @@ +package com.imdroid.ntripproxy.service; + +import com.imdroid.common.util.DataTypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.*; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +/** + * RTCM数据筛选服务器 + * 监听UDP 12000端口,接收RTCM数据 + * 为每个deviceId创建独立的TCP服务器,供rtkrcv连接 + */ +@Component +public class RtcmFilterServer implements ApplicationRunner { + + private static final Logger logger = LoggerFactory.getLogger(RtcmFilterServer.class); + + // 配置参数 + private static final int UDP_PORT = 12000; + private static final int BASE_TCP_PORT = 12001; // TCP端口起始号 + private static final int BUFFER_SIZE = 4096; + + // 设备TCP服务器映射 + private final Map deviceServers = new ConcurrentHashMap<>(); + + // 线程池 + private final ExecutorService executorService = Executors.newCachedThreadPool(); + + // 设备管理器 + @Autowired + private DeviceManager deviceManager; + + @Override + public void run(ApplicationArguments args) { + // 启动UDP监听服务 + executorService.submit(this::startUdpServer); + + logger.info("RTCM筛选服务已启动 - UDP监听端口:{}, TCP服务端口范围:{}-{}", + UDP_PORT, BASE_TCP_PORT, BASE_TCP_PORT + 999); + } + + /** + * 启动UDP服务器监听12000端口 + */ + private void startUdpServer() { + try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) { + logger.info("UDP服务已启动,监听端口: {}", UDP_PORT); + byte[] buffer = new byte[BUFFER_SIZE]; + + while (true) { + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + socket.receive(packet); + + // 解析数据包 + byte[] data = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); + + // 提取设备ID + String deviceId = extractDeviceId(data); + + if (logger.isDebugEnabled()) { + logger.debug("接收到设备 {} 的RTCM数据,长度: {}", deviceId, data.length); + } + + // 获取或创建设备的TCP服务器 + DeviceTcpServer server = getOrCreateTcpServer(deviceId); + + // 转发数据到TCP服务器 + server.sendData(data); + } + } catch (IOException e) { + logger.error("UDP服务异常: {}", e.getMessage(), e); + } + } + + /** + * 从数据包中提取设备ID + */ + private String extractDeviceId(byte[] data) { + // 按照NtripMessage格式解析,设备ID从第4字节开始,4字节无符号整型 + if (data.length >= 8) { + ByteBuffer buffer = ByteBuffer.wrap(data, 4, 4); + long deviceId = buffer.getInt() & 0xFFFFFFFFL; // 转为无符号整型 + return String.valueOf(deviceId); + } + return "unknown"; + } + + /** + * 获取或创建设备的TCP服务器 + */ + private DeviceTcpServer getOrCreateTcpServer(String deviceId) { + return deviceServers.computeIfAbsent(deviceId, id -> { + int port = allocatePort(id); + DeviceTcpServer server = new DeviceTcpServer(id, port); + server.start(); + + // 注册设备端口到设备管理器 + deviceManager.registerDevicePort(id, port); + + return server; + }); + } + + /** + * 为设备分配TCP端口 + */ + private int allocatePort(String deviceId) { + // 特殊设备ID处理,如果是3530795,则固定使用12002端口 + if ("3530795".equals(deviceId)) { + return 12002; + } + + // 其他设备使用哈希分配端口 + return BASE_TCP_PORT + (Math.abs(deviceId.hashCode()) % 1000); + } + + /** + * 设备TCP服务器,为每个设备创建独立的TCP服务器 + */ + private class DeviceTcpServer { + private final String deviceId; + private final int port; + private ServerSocket serverSocket; + private final List clients = new CopyOnWriteArrayList<>(); + private boolean running = false; + + public DeviceTcpServer(String deviceId, int port) { + this.deviceId = deviceId; + this.port = port; + } + + public void start() { + if (running) { + return; + } + + running = true; + executorService.submit(this::runServer); + } + + private void runServer() { + try { + serverSocket = new ServerSocket(port); + logger.info("设备 {} 的TCP服务器已启动,端口: {}", deviceId, port); + + // 接受客户端连接 + while (running && !Thread.currentThread().isInterrupted()) { + try { + Socket client = serverSocket.accept(); + clients.add(client); + logger.info("设备 {} 的新客户端已连接: {}", deviceId, client.getRemoteSocketAddress()); + } catch (IOException e) { + if (!serverSocket.isClosed()) { + logger.error("接受客户端连接失败: {}", e.getMessage()); + } + } + } + } catch (IOException e) { + logger.error("设备 {} 的TCP服务器启动失败: {}", deviceId, e.getMessage()); + } + } + + public void sendData(byte[] data) { + if (clients.isEmpty()) { + return; + } + + List disconnectedClients = new ArrayList<>(); + + for (Socket client : clients) { + try { + if (client.isConnected() && !client.isClosed()) { + OutputStream out = client.getOutputStream(); + out.write(data); + out.flush(); + } else { + disconnectedClients.add(client); + } + } catch (IOException e) { + logger.warn("向设备 {} 的TCP客户端发送数据失败: {}", deviceId, e.getMessage()); + disconnectedClients.add(client); + } + } + + // 移除断开连接的客户端 + for (Socket client : disconnectedClients) { + clients.remove(client); + try { + client.close(); + logger.info("已关闭设备 {} 的断开连接的TCP客户端", deviceId); + } catch (IOException e) { + logger.error("关闭TCP客户端异常: {}", e.getMessage()); + } + } + } + + public void stop() { + running = false; + + // 关闭所有客户端连接 + for (Socket client : clients) { + try { + client.close(); + } catch (IOException e) { + logger.error("关闭客户端连接失败: {}", e.getMessage()); + } + } + clients.clear(); + + // 关闭服务器 + if (serverSocket != null && !serverSocket.isClosed()) { + try { + serverSocket.close(); + } catch (IOException e) { + logger.error("关闭TCP服务器失败: {}", e.getMessage()); + } + } + } + } +} \ No newline at end of file