Compare commits

...

1 Commits

Author SHA1 Message Date
01427e230e feat: rtksrv v1
# Conflicts:
#	sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/Ntrip2Channels.java
2025-07-28 17:52:55 +08:00
2 changed files with 191 additions and 2 deletions

View File

@ -3,8 +3,12 @@ package com.imdroid.ntripproxy.service;
public class Ntrip2Channels {
final private String localHost="127.0.0.1";
final private int localPort=9903;
final private String remoteHost="47.107.50.52";
final private int remotePort=9903;
// 将远程主机改为本地端口改为12000
final private String remoteHost="127.0.0.1";
//final private String remoteHost="100.91.37.6";
//final private String remoteHost="47.107.50.52";
//final private String remoteHost="8.134.185.53";
final private int remotePort=12000;
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();

View File

@ -0,0 +1,185 @@
package com.imdroid.ntripproxy.service;
import com.imdroid.common.util.DataTypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* RTCM数据过滤服务器
* 监听UDP 12000端口接收RTCM数据
* 开启TCP 12002端口转发指定deviceId的RTCM数据
*/
@Component
public class RtcmFilterServer implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(RtcmFilterServer.class);
// 配置参数
private static final int UDP_PORT = 12000;
private static final int TCP_PORT = 12002;
private static final String TARGET_DEVICE_ID = "3530795";
private static final int BUFFER_SIZE = 4096;
// TCP客户端连接列表
private final List<SocketChannel> tcpClients = new CopyOnWriteArrayList<>();
// 线程池
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
@Override
public void run(ApplicationArguments args) {
// 启动UDP监听服务
executorService.submit(this::startUdpServer);
// 启动TCP服务器
executorService.submit(this::startTcpServer);
logger.info("RTCM过滤服务已启动 - UDP监听端口:{}, TCP服务端口:{}, 目标设备ID:{}",
UDP_PORT, TCP_PORT, TARGET_DEVICE_ID);
}
/**
* 启动UDP服务器监听12000端口
*/
private void startUdpServer() {
try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) {
logger.info("UDP服务已启动监听端口: {}", UDP_PORT);
byte[] buffer = new byte[BUFFER_SIZE];
while (true) {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
// 解析数据包
byte[] data = new byte[packet.getLength()];
System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
// 提取设备ID
String deviceId = extractDeviceId(data);
// 如果是目标设备ID则转发到TCP客户端
if (TARGET_DEVICE_ID.equals(deviceId)) {
if (logger.isDebugEnabled()) {
logger.debug("接收到目标设备 {} 的RTCM数据长度: {}", deviceId, data.length);
}
forwardToTcpClients(data);
}
}
} catch (IOException e) {
logger.error("UDP服务异常: {}", e.getMessage(), e);
}
}
/**
* 启动TCP服务器监听12002端口
*/
private void startTcpServer() {
try {
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(TCP_PORT));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
logger.info("TCP服务已启动监听端口: {}", TCP_PORT);
while (true) {
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
acceptConnection(selector, serverChannel);
}
}
}
} catch (IOException e) {
logger.error("TCP服务异常: {}", e.getMessage(), e);
}
}
/**
* 接受新的TCP客户端连接
*/
private void acceptConnection(Selector selector, ServerSocketChannel serverChannel) throws IOException {
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
tcpClients.add(clientChannel);
logger.info("新的TCP客户端已连接: {}", clientChannel.getRemoteAddress());
}
/**
* 从数据包中提取设备ID
*/
private String extractDeviceId(byte[] data) {
// 按照NtripMessage格式解析设备ID从第4字节开始4字节无符号整型
if (data.length >= 8) {
ByteBuffer buffer = ByteBuffer.wrap(data, 4, 4);
long deviceId = buffer.getInt() & 0xFFFFFFFFL; // 转为无符号整型
return String.valueOf(deviceId);
}
return "";
}
/**
* 将数据转发到所有TCP客户端
*/
private void forwardToTcpClients(byte[] data) {
if (tcpClients.isEmpty()) {
return;
}
List<SocketChannel> disconnectedClients = new ArrayList<>();
for (SocketChannel client : tcpClients) {
try {
if (client.isOpen()) {
ByteBuffer buffer = ByteBuffer.wrap(data);
client.write(buffer);
} else {
disconnectedClients.add(client);
}
} catch (IOException e) {
logger.warn("向TCP客户端发送数据失败: {}", e.getMessage());
disconnectedClients.add(client);
}
}
// 移除断开连接的客户端
for (SocketChannel client : disconnectedClients) {
tcpClients.remove(client);
try {
client.close();
logger.info("已关闭断开连接的TCP客户端");
} catch (IOException e) {
logger.error("关闭TCP客户端异常: {}", e.getMessage());
}
}
}
}