Compare commits
1 Commits
feature/rt
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
| 01427e230e |
@ -3,8 +3,12 @@ package com.imdroid.ntripproxy.service;
|
|||||||
public class Ntrip2Channels {
|
public class Ntrip2Channels {
|
||||||
final private String localHost="127.0.0.1";
|
final private String localHost="127.0.0.1";
|
||||||
final private int localPort=9903;
|
final private int localPort=9903;
|
||||||
final private String remoteHost="47.107.50.52";
|
// 将远程主机改为本地,端口改为12000
|
||||||
final private int remotePort=9903;
|
final private String remoteHost="127.0.0.1";
|
||||||
|
//final private String remoteHost="100.91.37.6";
|
||||||
|
//final private String remoteHost="47.107.50.52";
|
||||||
|
//final private String remoteHost="8.134.185.53";
|
||||||
|
final private int remotePort=12000;
|
||||||
|
|
||||||
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();
|
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,185 @@
|
|||||||
|
package com.imdroid.ntripproxy.service;
|
||||||
|
|
||||||
|
import com.imdroid.common.util.DataTypeUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
import org.springframework.boot.ApplicationRunner;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.*;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.channels.Selector;
|
||||||
|
import java.nio.channels.ServerSocketChannel;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RTCM数据过滤服务器
|
||||||
|
* 监听UDP 12000端口,接收RTCM数据
|
||||||
|
* 开启TCP 12002端口,转发指定deviceId的RTCM数据
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class RtcmFilterServer implements ApplicationRunner {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(RtcmFilterServer.class);
|
||||||
|
|
||||||
|
// 配置参数
|
||||||
|
private static final int UDP_PORT = 12000;
|
||||||
|
private static final int TCP_PORT = 12002;
|
||||||
|
private static final String TARGET_DEVICE_ID = "3530795";
|
||||||
|
private static final int BUFFER_SIZE = 4096;
|
||||||
|
|
||||||
|
// TCP客户端连接列表
|
||||||
|
private final List<SocketChannel> tcpClients = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
// 线程池
|
||||||
|
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(ApplicationArguments args) {
|
||||||
|
// 启动UDP监听服务
|
||||||
|
executorService.submit(this::startUdpServer);
|
||||||
|
|
||||||
|
// 启动TCP服务器
|
||||||
|
executorService.submit(this::startTcpServer);
|
||||||
|
|
||||||
|
logger.info("RTCM过滤服务已启动 - UDP监听端口:{}, TCP服务端口:{}, 目标设备ID:{}",
|
||||||
|
UDP_PORT, TCP_PORT, TARGET_DEVICE_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动UDP服务器监听12000端口
|
||||||
|
*/
|
||||||
|
private void startUdpServer() {
|
||||||
|
try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) {
|
||||||
|
logger.info("UDP服务已启动,监听端口: {}", UDP_PORT);
|
||||||
|
byte[] buffer = new byte[BUFFER_SIZE];
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
||||||
|
socket.receive(packet);
|
||||||
|
|
||||||
|
// 解析数据包
|
||||||
|
byte[] data = new byte[packet.getLength()];
|
||||||
|
System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
|
||||||
|
|
||||||
|
// 提取设备ID
|
||||||
|
String deviceId = extractDeviceId(data);
|
||||||
|
|
||||||
|
// 如果是目标设备ID,则转发到TCP客户端
|
||||||
|
if (TARGET_DEVICE_ID.equals(deviceId)) {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("接收到目标设备 {} 的RTCM数据,长度: {}", deviceId, data.length);
|
||||||
|
}
|
||||||
|
forwardToTcpClients(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("UDP服务异常: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动TCP服务器监听12002端口
|
||||||
|
*/
|
||||||
|
private void startTcpServer() {
|
||||||
|
try {
|
||||||
|
Selector selector = Selector.open();
|
||||||
|
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||||
|
serverChannel.configureBlocking(false);
|
||||||
|
serverChannel.socket().bind(new InetSocketAddress(TCP_PORT));
|
||||||
|
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||||
|
|
||||||
|
logger.info("TCP服务已启动,监听端口: {}", TCP_PORT);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
selector.select();
|
||||||
|
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
|
||||||
|
|
||||||
|
while (keys.hasNext()) {
|
||||||
|
SelectionKey key = keys.next();
|
||||||
|
keys.remove();
|
||||||
|
|
||||||
|
if (!key.isValid()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (key.isAcceptable()) {
|
||||||
|
acceptConnection(selector, serverChannel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("TCP服务异常: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 接受新的TCP客户端连接
|
||||||
|
*/
|
||||||
|
private void acceptConnection(Selector selector, ServerSocketChannel serverChannel) throws IOException {
|
||||||
|
SocketChannel clientChannel = serverChannel.accept();
|
||||||
|
clientChannel.configureBlocking(false);
|
||||||
|
clientChannel.register(selector, SelectionKey.OP_READ);
|
||||||
|
|
||||||
|
tcpClients.add(clientChannel);
|
||||||
|
logger.info("新的TCP客户端已连接: {}", clientChannel.getRemoteAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从数据包中提取设备ID
|
||||||
|
*/
|
||||||
|
private String extractDeviceId(byte[] data) {
|
||||||
|
// 按照NtripMessage格式解析,设备ID从第4字节开始,4字节无符号整型
|
||||||
|
if (data.length >= 8) {
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(data, 4, 4);
|
||||||
|
long deviceId = buffer.getInt() & 0xFFFFFFFFL; // 转为无符号整型
|
||||||
|
return String.valueOf(deviceId);
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将数据转发到所有TCP客户端
|
||||||
|
*/
|
||||||
|
private void forwardToTcpClients(byte[] data) {
|
||||||
|
if (tcpClients.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SocketChannel> disconnectedClients = new ArrayList<>();
|
||||||
|
|
||||||
|
for (SocketChannel client : tcpClients) {
|
||||||
|
try {
|
||||||
|
if (client.isOpen()) {
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(data);
|
||||||
|
client.write(buffer);
|
||||||
|
} else {
|
||||||
|
disconnectedClients.add(client);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("向TCP客户端发送数据失败: {}", e.getMessage());
|
||||||
|
disconnectedClients.add(client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 移除断开连接的客户端
|
||||||
|
for (SocketChannel client : disconnectedClients) {
|
||||||
|
tcpClients.remove(client);
|
||||||
|
try {
|
||||||
|
client.close();
|
||||||
|
logger.info("已关闭断开连接的TCP客户端");
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("关闭TCP客户端异常: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user