diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java index 732bf37e..8ebc3f05 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java @@ -3,6 +3,12 @@ package com.imdroid.ntripproxy.service; public class Ntrip2Channels { final private String localHost="127.0.0.1"; final private int localPort=9903; + // 将远程主机改为本地,端口改为12000 + final private String remoteHost="127.0.0.1"; + //final private String remoteHost="100.91.37.6"; + //final private String remoteHost="47.107.50.52"; + //final private String remoteHost="8.134.185.53"; + final private int remotePort=12000; public static final Ntrip2Channels INSTANCE = new Ntrip2Channels(); @@ -10,10 +16,13 @@ public class Ntrip2Channels { UDPClient remoteRtcm; private Ntrip2Channels() { localRtcm = new UDPClient(); + remoteRtcm = new UDPClient(); localRtcm.init(localHost, localPort); + remoteRtcm.init(remoteHost, remotePort); } public void send(byte[] data) { localRtcm.send(data); + remoteRtcm.send(data); } } diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmFilterServer.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmFilterServer.java new file mode 100644 index 00000000..2cf8d696 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmFilterServer.java @@ -0,0 +1,185 @@ +package com.imdroid.ntripproxy.service; + +import com.imdroid.common.util.DataTypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * RTCM数据过滤服务器 + * 监听UDP 12000端口,接收RTCM数据 + * 开启TCP 12002端口,转发指定deviceId的RTCM数据 + */ +@Component +public class RtcmFilterServer implements ApplicationRunner { + + private static final Logger logger = LoggerFactory.getLogger(RtcmFilterServer.class); + + // 配置参数 + private static final int UDP_PORT = 12000; + private static final int TCP_PORT = 12002; + private static final String TARGET_DEVICE_ID = "3530795"; + private static final int BUFFER_SIZE = 4096; + + // TCP客户端连接列表 + private final List tcpClients = new CopyOnWriteArrayList<>(); + + // 线程池 + private final ExecutorService executorService = Executors.newFixedThreadPool(2); + + @Override + public void run(ApplicationArguments args) { + // 启动UDP监听服务 + executorService.submit(this::startUdpServer); + + // 启动TCP服务器 + executorService.submit(this::startTcpServer); + + logger.info("RTCM过滤服务已启动 - UDP监听端口:{}, TCP服务端口:{}, 目标设备ID:{}", + UDP_PORT, TCP_PORT, TARGET_DEVICE_ID); + } + + /** + * 启动UDP服务器监听12000端口 + */ + private void startUdpServer() { + try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) { + logger.info("UDP服务已启动,监听端口: {}", UDP_PORT); + byte[] buffer = new byte[BUFFER_SIZE]; + + while (true) { + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + socket.receive(packet); + + // 解析数据包 + byte[] data = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength()); + + // 提取设备ID + String deviceId = extractDeviceId(data); + + // 如果是目标设备ID,则转发到TCP客户端 + if (TARGET_DEVICE_ID.equals(deviceId)) { + if (logger.isDebugEnabled()) { + logger.debug("接收到目标设备 {} 的RTCM数据,长度: {}", deviceId, data.length); + } + forwardToTcpClients(data); + } + } + } catch (IOException e) { + logger.error("UDP服务异常: {}", e.getMessage(), e); + } + } + + /** + * 启动TCP服务器监听12002端口 + */ + private void startTcpServer() { + try { + Selector selector = Selector.open(); + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + serverChannel.socket().bind(new InetSocketAddress(TCP_PORT)); + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + + logger.info("TCP服务已启动,监听端口: {}", TCP_PORT); + + while (true) { + selector.select(); + Iterator keys = selector.selectedKeys().iterator(); + + while (keys.hasNext()) { + SelectionKey key = keys.next(); + keys.remove(); + + if (!key.isValid()) { + continue; + } + + if (key.isAcceptable()) { + acceptConnection(selector, serverChannel); + } + } + } + } catch (IOException e) { + logger.error("TCP服务异常: {}", e.getMessage(), e); + } + } + + /** + * 接受新的TCP客户端连接 + */ + private void acceptConnection(Selector selector, ServerSocketChannel serverChannel) throws IOException { + SocketChannel clientChannel = serverChannel.accept(); + clientChannel.configureBlocking(false); + clientChannel.register(selector, SelectionKey.OP_READ); + + tcpClients.add(clientChannel); + logger.info("新的TCP客户端已连接: {}", clientChannel.getRemoteAddress()); + } + + /** + * 从数据包中提取设备ID + */ + private String extractDeviceId(byte[] data) { + // 按照NtripMessage格式解析,设备ID从第4字节开始,4字节无符号整型 + if (data.length >= 8) { + ByteBuffer buffer = ByteBuffer.wrap(data, 4, 4); + long deviceId = buffer.getInt() & 0xFFFFFFFFL; // 转为无符号整型 + return String.valueOf(deviceId); + } + return ""; + } + + /** + * 将数据转发到所有TCP客户端 + */ + private void forwardToTcpClients(byte[] data) { + if (tcpClients.isEmpty()) { + return; + } + + List disconnectedClients = new ArrayList<>(); + + for (SocketChannel client : tcpClients) { + try { + if (client.isOpen()) { + ByteBuffer buffer = ByteBuffer.wrap(data); + client.write(buffer); + } else { + disconnectedClients.add(client); + } + } catch (IOException e) { + logger.warn("向TCP客户端发送数据失败: {}", e.getMessage()); + disconnectedClients.add(client); + } + } + + // 移除断开连接的客户端 + for (SocketChannel client : disconnectedClients) { + tcpClients.remove(client); + try { + client.close(); + logger.info("已关闭断开连接的TCP客户端"); + } catch (IOException e) { + logger.error("关闭TCP客户端异常: {}", e.getMessage()); + } + } + } +} \ No newline at end of file