feat: rtksrv v1
This commit is contained in:
parent
b4d81245c5
commit
274311a0f8
@ -3,6 +3,12 @@ package com.imdroid.ntripproxy.service;
|
||||
public class Ntrip2Channels {
|
||||
final private String localHost="127.0.0.1";
|
||||
final private int localPort=9903;
|
||||
// 将远程主机改为本地,端口改为12000
|
||||
final private String remoteHost="127.0.0.1";
|
||||
//final private String remoteHost="100.91.37.6";
|
||||
//final private String remoteHost="47.107.50.52";
|
||||
//final private String remoteHost="8.134.185.53";
|
||||
final private int remotePort=12000;
|
||||
|
||||
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();
|
||||
|
||||
@ -10,10 +16,13 @@ public class Ntrip2Channels {
|
||||
UDPClient remoteRtcm;
|
||||
private Ntrip2Channels() {
|
||||
localRtcm = new UDPClient();
|
||||
remoteRtcm = new UDPClient();
|
||||
localRtcm.init(localHost, localPort);
|
||||
remoteRtcm.init(remoteHost, remotePort);
|
||||
}
|
||||
|
||||
public void send(byte[] data) {
|
||||
localRtcm.send(data);
|
||||
remoteRtcm.send(data);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,185 @@
|
||||
package com.imdroid.ntripproxy.service;
|
||||
|
||||
import com.imdroid.common.util.DataTypeUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* RTCM数据过滤服务器
|
||||
* 监听UDP 12000端口,接收RTCM数据
|
||||
* 开启TCP 12002端口,转发指定deviceId的RTCM数据
|
||||
*/
|
||||
@Component
|
||||
public class RtcmFilterServer implements ApplicationRunner {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RtcmFilterServer.class);
|
||||
|
||||
// 配置参数
|
||||
private static final int UDP_PORT = 12000;
|
||||
private static final int TCP_PORT = 12002;
|
||||
private static final String TARGET_DEVICE_ID = "3530795";
|
||||
private static final int BUFFER_SIZE = 4096;
|
||||
|
||||
// TCP客户端连接列表
|
||||
private final List<SocketChannel> tcpClients = new CopyOnWriteArrayList<>();
|
||||
|
||||
// 线程池
|
||||
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
// 启动UDP监听服务
|
||||
executorService.submit(this::startUdpServer);
|
||||
|
||||
// 启动TCP服务器
|
||||
executorService.submit(this::startTcpServer);
|
||||
|
||||
logger.info("RTCM过滤服务已启动 - UDP监听端口:{}, TCP服务端口:{}, 目标设备ID:{}",
|
||||
UDP_PORT, TCP_PORT, TARGET_DEVICE_ID);
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动UDP服务器监听12000端口
|
||||
*/
|
||||
private void startUdpServer() {
|
||||
try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) {
|
||||
logger.info("UDP服务已启动,监听端口: {}", UDP_PORT);
|
||||
byte[] buffer = new byte[BUFFER_SIZE];
|
||||
|
||||
while (true) {
|
||||
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
||||
socket.receive(packet);
|
||||
|
||||
// 解析数据包
|
||||
byte[] data = new byte[packet.getLength()];
|
||||
System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
|
||||
|
||||
// 提取设备ID
|
||||
String deviceId = extractDeviceId(data);
|
||||
|
||||
// 如果是目标设备ID,则转发到TCP客户端
|
||||
if (TARGET_DEVICE_ID.equals(deviceId)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("接收到目标设备 {} 的RTCM数据,长度: {}", deviceId, data.length);
|
||||
}
|
||||
forwardToTcpClients(data);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("UDP服务异常: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动TCP服务器监听12002端口
|
||||
*/
|
||||
private void startTcpServer() {
|
||||
try {
|
||||
Selector selector = Selector.open();
|
||||
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||
serverChannel.configureBlocking(false);
|
||||
serverChannel.socket().bind(new InetSocketAddress(TCP_PORT));
|
||||
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||
|
||||
logger.info("TCP服务已启动,监听端口: {}", TCP_PORT);
|
||||
|
||||
while (true) {
|
||||
selector.select();
|
||||
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
|
||||
|
||||
while (keys.hasNext()) {
|
||||
SelectionKey key = keys.next();
|
||||
keys.remove();
|
||||
|
||||
if (!key.isValid()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (key.isAcceptable()) {
|
||||
acceptConnection(selector, serverChannel);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("TCP服务异常: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 接受新的TCP客户端连接
|
||||
*/
|
||||
private void acceptConnection(Selector selector, ServerSocketChannel serverChannel) throws IOException {
|
||||
SocketChannel clientChannel = serverChannel.accept();
|
||||
clientChannel.configureBlocking(false);
|
||||
clientChannel.register(selector, SelectionKey.OP_READ);
|
||||
|
||||
tcpClients.add(clientChannel);
|
||||
logger.info("新的TCP客户端已连接: {}", clientChannel.getRemoteAddress());
|
||||
}
|
||||
|
||||
/**
|
||||
* 从数据包中提取设备ID
|
||||
*/
|
||||
private String extractDeviceId(byte[] data) {
|
||||
// 按照NtripMessage格式解析,设备ID从第4字节开始,4字节无符号整型
|
||||
if (data.length >= 8) {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(data, 4, 4);
|
||||
long deviceId = buffer.getInt() & 0xFFFFFFFFL; // 转为无符号整型
|
||||
return String.valueOf(deviceId);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
/**
|
||||
* 将数据转发到所有TCP客户端
|
||||
*/
|
||||
private void forwardToTcpClients(byte[] data) {
|
||||
if (tcpClients.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<SocketChannel> disconnectedClients = new ArrayList<>();
|
||||
|
||||
for (SocketChannel client : tcpClients) {
|
||||
try {
|
||||
if (client.isOpen()) {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(data);
|
||||
client.write(buffer);
|
||||
} else {
|
||||
disconnectedClients.add(client);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("向TCP客户端发送数据失败: {}", e.getMessage());
|
||||
disconnectedClients.add(client);
|
||||
}
|
||||
}
|
||||
|
||||
// 移除断开连接的客户端
|
||||
for (SocketChannel client : disconnectedClients) {
|
||||
tcpClients.remove(client);
|
||||
try {
|
||||
client.close();
|
||||
logger.info("已关闭断开连接的TCP客户端");
|
||||
} catch (IOException e) {
|
||||
logger.error("关闭TCP客户端异常: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user