Compare commits

...

5 Commits

Author SHA1 Message Date
274311a0f8 feat: rtksrv v1 2025-07-28 17:45:12 +08:00
b4d81245c5 feat: tcpcli v4 2025-07-28 17:20:26 +08:00
a403f8a3b8 feat: tcpcli v3 2025-07-28 17:06:41 +08:00
625019c503 feat: tcpcli v2 2025-07-28 17:02:02 +08:00
6e639b7a76 feat: tcpcli 2025-07-28 16:16:09 +08:00
10 changed files with 938 additions and 6 deletions

View File

@ -1,6 +1,7 @@
package com.imdroid.ntripproxy.executor;
import com.imdroid.ntripproxy.message.D331RtcmMessage;
import com.imdroid.ntripproxy.service.RtcmDistributor;
import com.imdroid.ntripproxy.service.UDPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -16,8 +17,13 @@ import org.springframework.stereotype.Component;
public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
UDPClient rtcmClient;
@Autowired
RtcmDistributor rtcmDistributor;
@Override
public Void execute(D331RtcmMessage message) {
String id = message.getId();
@ -25,11 +31,19 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
if (logger.isDebugEnabled()) {
logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen());
}
// 推送基站数据
// 提取RTCM数据
byte[] srcData = new byte[message.getSrcBuf().readableBytes()];
message.getSrcBuf().getBytes(0,srcData);
message.getSrcBuf().getBytes(0, srcData);
// 继续发送到原来的UDP客户端
rtcmClient.send(srcData);
// 只有已经创建了TCP服务器的设备才发送数据
if (rtcmDistributor.hasDeviceServer(id)) {
rtcmDistributor.sendRtcmData(id, srcData);
}
return null;
}

View File

@ -3,8 +3,12 @@ package com.imdroid.ntripproxy.service;
public class Ntrip2Channels {
final private String localHost="127.0.0.1";
final private int localPort=9903;
final private String remoteHost="47.107.50.52";
final private int remotePort=9903;
// 将远程主机改为本地端口改为12000
final private String remoteHost="127.0.0.1";
//final private String remoteHost="100.91.37.6";
//final private String remoteHost="47.107.50.52";
//final private String remoteHost="8.134.185.53";
final private int remotePort=12000;
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();

View File

@ -0,0 +1,190 @@
package com.imdroid.ntripproxy.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class RtcmCommandLineRunner implements CommandLineRunner {
@Autowired
private RtcmDistributor rtcmDistributor;
@Autowired
private UdpHandler udpHandler;
@Autowired
private RtcmUdpHandler rtcmUdpHandler;
private static final int DEFAULT_BASE_PORT = 10000;
@Override
public void run(String... args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("RTCM TCP服务器命令行工具启动");
System.out.println("输入 'help' 查看可用命令");
while (true) {
System.out.print("> ");
String command = scanner.nextLine().trim();
if ("exit".equalsIgnoreCase(command)) {
break;
} else if ("help".equalsIgnoreCase(command)) {
printHelp();
} else if ("list".equalsIgnoreCase(command)) {
listDevices();
} else if (command.startsWith("create ")) {
String[] parts = command.substring("create ".length()).trim().split("\\s+");
if (parts.length == 1) {
createDeviceServer(parts[0], DEFAULT_BASE_PORT);
} else if (parts.length == 2) {
try {
int port = Integer.parseInt(parts[1]);
createDeviceServer(parts[0], port);
} catch (NumberFormatException e) {
System.out.println("端口必须是数字");
}
} else {
System.out.println("格式错误,使用 'create <deviceId> [port]'");
}
} else if (command.startsWith("remove ")) {
String deviceId = command.substring("remove ".length()).trim();
removeDeviceServer(deviceId);
} else if ("startmonitor".equalsIgnoreCase(command)) {
startMonitor();
} else if ("stopmonitor".equalsIgnoreCase(command)) {
stopMonitor();
} else if (command.startsWith("monitordevice ")) {
String deviceId = command.substring("monitordevice ".length()).trim();
monitorDevice(deviceId);
} else if ("clearmonitor".equalsIgnoreCase(command)) {
clearMonitoredDevices();
} else if (command.startsWith("setdefault ")) {
String deviceId = command.substring("setdefault ".length()).trim();
setDefaultDevice(deviceId);
} else {
System.out.println("未知命令,输入 'help' 查看可用命令");
}
}
System.out.println("命令行工具已退出");
});
}
private void printHelp() {
System.out.println("可用命令:");
System.out.println(" help - 显示此帮助信息");
System.out.println(" list - 列出所有设备及其TCP端口");
System.out.println(" create <deviceId> [port] - 为指定设备ID创建TCP服务器可选指定端口");
System.out.println(" remove <deviceId> - 移除指定设备的TCP服务器");
System.out.println(" startmonitor - 开始监控UDP数据");
System.out.println(" stopmonitor - 停止UDP数据监控");
System.out.println(" monitordevice <deviceId> - 添加指定设备ID到监控列表");
System.out.println(" clearmonitor - 清空设备监控列表(监控所有设备)");
System.out.println(" setdefault <deviceId> - 设置默认设备ID9903端口的RTCM数据将转发给此设备");
System.out.println(" exit - 退出命令行工具(不会停止服务)");
}
private void listDevices() {
Map<String, Integer> devicePorts = rtcmDistributor.listDevicePorts();
if (devicePorts.isEmpty()) {
System.out.println("当前没有设备TCP服务器");
} else {
System.out.println("设备TCP服务器列表");
for (Map.Entry<String, Integer> entry : devicePorts.entrySet()) {
String defaultMark = "";
if (entry.getKey().equals(rtcmUdpHandler.getDefaultDeviceId())) {
defaultMark = " (默认设备)";
}
System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue() + defaultMark);
}
}
}
private void createDeviceServer(String deviceId, int defaultPort) {
if (deviceId == null || deviceId.isEmpty()) {
System.out.println("设备ID不能为空");
return;
}
int port = rtcmDistributor.createDeviceServer(deviceId, defaultPort);
System.out.println("为设备 " + deviceId + " 创建TCP服务器端口: " + port);
System.out.println("可以在rtklib配置文件中使用以下设置");
System.out.println("inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)");
System.out.println("inpstr1-path =127.0.0.1:" + port);
// 如果没有默认设备自动设置为默认设备
if (rtcmUdpHandler.getDefaultDeviceId() == null) {
setDefaultDevice(deviceId);
}
}
private void removeDeviceServer(String deviceId) {
if (deviceId == null || deviceId.isEmpty()) {
System.out.println("设备ID不能为空");
return;
}
boolean removed = rtcmDistributor.removeDeviceServer(deviceId);
if (removed) {
System.out.println("已移除设备 " + deviceId + " 的TCP服务器");
// 如果移除的是默认设备清除默认设备设置
if (deviceId.equals(rtcmUdpHandler.getDefaultDeviceId())) {
rtcmUdpHandler.setDefaultDeviceId(null);
System.out.println("已清除默认设备设置");
}
} else {
System.out.println("设备 " + deviceId + " 没有TCP服务器");
}
}
private void startMonitor() {
udpHandler.enableMonitoring(true);
System.out.println("UDP数据监控已启用");
}
private void stopMonitor() {
udpHandler.enableMonitoring(false);
System.out.println("UDP数据监控已禁用");
}
private void monitorDevice(String deviceId) {
if (deviceId == null || deviceId.isEmpty()) {
System.out.println("设备ID不能为空");
return;
}
udpHandler.addMonitoredDevice(deviceId);
System.out.println("已添加设备 " + deviceId + " 到监控列表");
}
private void clearMonitoredDevices() {
udpHandler.clearMonitoredDevices();
System.out.println("已清空设备监控列表,将监控所有设备");
}
private void setDefaultDevice(String deviceId) {
if (deviceId == null || deviceId.isEmpty()) {
System.out.println("设备ID不能为空");
return;
}
if (!rtcmDistributor.hasDeviceServer(deviceId)) {
System.out.println("设备 " + deviceId + " 没有TCP服务器请先创建");
return;
}
rtcmUdpHandler.setDefaultDeviceId(deviceId);
System.out.println("已将设备 " + deviceId + " 设置为默认设备");
System.out.println("所有从9903端口接收的RTCM数据将转发给此设备");
}
}

View File

@ -0,0 +1,109 @@
package com.imdroid.ntripproxy.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
@Component
public class RtcmDataMonitor {
private static final Logger logger = LoggerFactory.getLogger(RtcmDataMonitor.class);
private static final int BUFFER_SIZE = 4096;
private DatagramSocket monitorSocket;
private boolean running = false;
public void startUdpMonitor(int port) {
if (running) {
logger.info("UDP监控已经在运行中");
return;
}
Thread monitorThread = new Thread(() -> {
try {
monitorSocket = new DatagramSocket(null);
monitorSocket.setReuseAddress(true);
monitorSocket.bind(new InetSocketAddress(port));
byte[] buffer = new byte[BUFFER_SIZE];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
running = true;
logger.info("开始监控UDP端口 {} 的数据", port);
while (running) {
try {
monitorSocket.receive(packet);
byte[] data = new byte[packet.getLength()];
System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
// 打印数据信息
String sender = packet.getAddress().getHostAddress() + ":" + packet.getPort();
printPacketInfo(sender, data);
} catch (IOException e) {
if (running) {
logger.error("接收UDP数据时出错", e);
}
}
}
} catch (IOException e) {
logger.error("创建UDP监控套接字失败", e);
} finally {
if (monitorSocket != null) {
monitorSocket.close();
}
running = false;
}
});
monitorThread.setDaemon(true);
monitorThread.start();
}
public void stopMonitor() {
running = false;
if (monitorSocket != null) {
monitorSocket.close();
}
logger.info("停止UDP数据监控");
}
private void printPacketInfo(String sender, byte[] data) {
StringBuilder hexDump = new StringBuilder();
int limit = Math.min(50, data.length);
for (int i = 0; i < limit; i++) {
hexDump.append(String.format("%02X ", data[i] & 0xFF));
if ((i + 1) % 16 == 0) {
hexDump.append("\n");
}
}
if (data.length > limit) {
hexDump.append("... (").append(data.length - limit).append(" more bytes)");
}
// 尝试解析设备ID (假设前4字节是header和len接下来4字节是设备ID)
String deviceId = "unknown";
if (data.length >= 8) {
try {
int pos = 4; // 跳过header和len
long id = ((data[pos] & 0xFFL) << 24) |
((data[pos+1] & 0xFFL) << 16) |
((data[pos+2] & 0xFFL) << 8) |
(data[pos+3] & 0xFFL);
deviceId = String.valueOf(id);
} catch (Exception e) {
// 解析失败保持unknown
}
}
logger.info("收到来自 {} 的UDP数据包长度: {} 字节可能的设备ID: {}\n数据内容:\n{}",
sender, data.length, deviceId, hexDump.toString());
}
}

View File

@ -0,0 +1,186 @@
package com.imdroid.ntripproxy.service;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@Service
public class RtcmDistributor {
private static final Logger logger = LoggerFactory.getLogger(RtcmDistributor.class);
// 设备ID与对应的TCP服务器映射
private final Map<String, DeviceTcpServer> deviceServers = new ConcurrentHashMap<>();
// 设备ID与对应的端口映射
private final Map<String, Integer> devicePorts = new ConcurrentHashMap<>();
// 数据统计
private final Map<String, AtomicLong> dataCounters = new ConcurrentHashMap<>();
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@PostConstruct
public void init() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
logger.info("RtcmDistributor initialized");
}
// 为指定设备ID创建TCP服务器返回分配的端口
public synchronized int createDeviceServer(String deviceId, int port) {
if (deviceServers.containsKey(deviceId)) {
logger.info("Device {} already has a TCP server on port {}", deviceId, devicePorts.get(deviceId));
return devicePorts.get(deviceId);
}
DeviceTcpServer server = new DeviceTcpServer(deviceId, port);
deviceServers.put(deviceId, server);
devicePorts.put(deviceId, port);
dataCounters.put(deviceId, new AtomicLong(0));
// 启动服务器
server.start(bossGroup, workerGroup);
logger.info("Created TCP server for device {} on port {}", deviceId, port);
return port;
}
// 发送RTCM数据到特定设备的TCP服务器如果存在
public void sendRtcmData(String deviceId, byte[] data) {
DeviceTcpServer server = deviceServers.get(deviceId);
if (server != null) {
server.broadcastData(data);
// 更新数据统计
AtomicLong counter = dataCounters.computeIfAbsent(deviceId, k -> new AtomicLong(0));
long totalBytes = counter.addAndGet(data.length);
logger.info("已将{}字节的RTCM数据发送到设备{}的TCP服务器累计发送{}字节",
data.length, deviceId, totalBytes);
// 打印前20个字节的十六进制表示帮助调试
if (data.length > 0 && logger.isDebugEnabled()) {
StringBuilder hexData = new StringBuilder();
int limit = Math.min(20, data.length);
for (int i = 0; i < limit; i++) {
hexData.append(String.format("%02X ", data[i] & 0xFF));
}
logger.debug("RTCM数据样本: {}", hexData.toString());
}
}
}
// 检查设备是否有TCP服务器
public boolean hasDeviceServer(String deviceId) {
return deviceServers.containsKey(deviceId);
}
// 删除设备的TCP服务器
public boolean removeDeviceServer(String deviceId) {
DeviceTcpServer server = deviceServers.remove(deviceId);
if (server != null) {
devicePorts.remove(deviceId);
dataCounters.remove(deviceId);
server.stop();
logger.info("Removed TCP server for device {}", deviceId);
return true;
}
return false;
}
// 列出所有设备及其端口
public Map<String, Integer> listDevicePorts() {
return new ConcurrentHashMap<>(devicePorts);
}
// 获取设备数据统计
public Map<String, Long> getDataStatistics() {
Map<String, Long> stats = new ConcurrentHashMap<>();
dataCounters.forEach((deviceId, counter) -> stats.put(deviceId, counter.get()));
return stats;
}
// 内部类每个设备对应一个TCP服务器
public class DeviceTcpServer {
private final String deviceId;
private final int port;
private Channel serverChannel;
private final Map<Channel, Boolean> connectedClients = new ConcurrentHashMap<>();
public DeviceTcpServer(String deviceId, int port) {
this.deviceId = deviceId;
this.port = port;
}
public void start(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new RtcmTcpHandler(DeviceTcpServer.this));
}
});
try {
serverChannel = b.bind(port).sync().channel();
logger.info("Device {} TCP server started on port {}", deviceId, port);
} catch (Exception e) {
logger.error("Failed to start TCP server for device {} on port {}", deviceId, port, e);
}
}
public void stop() {
if (serverChannel != null) {
try {
serverChannel.close().sync();
} catch (InterruptedException e) {
logger.error("Error closing server channel for device {}", deviceId, e);
}
}
}
public void addClient(Channel channel) {
connectedClients.put(channel, true);
logger.info("New client connected to device {} server, total clients: {}",
deviceId, connectedClients.size());
}
public void removeClient(Channel channel) {
connectedClients.remove(channel);
logger.info("Client disconnected from device {} server, remaining clients: {}",
deviceId, connectedClients.size());
}
public void broadcastData(byte[] data) {
int activeClients = 0;
for (Channel channel : connectedClients.keySet()) {
if (channel.isActive()) {
channel.writeAndFlush(channel.alloc().buffer().writeBytes(data));
activeClients++;
}
}
if (activeClients > 0 && logger.isDebugEnabled()) {
logger.debug("已将{}字节的数据广播到设备{}的{}个客户端",
data.length, deviceId, activeClients);
}
}
}
}

View File

@ -0,0 +1,185 @@
package com.imdroid.ntripproxy.service;
import com.imdroid.common.util.DataTypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* RTCM数据过滤服务器
* 监听UDP 12000端口接收RTCM数据
* 开启TCP 12002端口转发指定deviceId的RTCM数据
*/
@Component
public class RtcmFilterServer implements ApplicationRunner {
private static final Logger logger = LoggerFactory.getLogger(RtcmFilterServer.class);
// 配置参数
private static final int UDP_PORT = 12000;
private static final int TCP_PORT = 12002;
private static final String TARGET_DEVICE_ID = "3530795";
private static final int BUFFER_SIZE = 4096;
// TCP客户端连接列表
private final List<SocketChannel> tcpClients = new CopyOnWriteArrayList<>();
// 线程池
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
@Override
public void run(ApplicationArguments args) {
// 启动UDP监听服务
executorService.submit(this::startUdpServer);
// 启动TCP服务器
executorService.submit(this::startTcpServer);
logger.info("RTCM过滤服务已启动 - UDP监听端口:{}, TCP服务端口:{}, 目标设备ID:{}",
UDP_PORT, TCP_PORT, TARGET_DEVICE_ID);
}
/**
* 启动UDP服务器监听12000端口
*/
private void startUdpServer() {
try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) {
logger.info("UDP服务已启动监听端口: {}", UDP_PORT);
byte[] buffer = new byte[BUFFER_SIZE];
while (true) {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
// 解析数据包
byte[] data = new byte[packet.getLength()];
System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
// 提取设备ID
String deviceId = extractDeviceId(data);
// 如果是目标设备ID则转发到TCP客户端
if (TARGET_DEVICE_ID.equals(deviceId)) {
if (logger.isDebugEnabled()) {
logger.debug("接收到目标设备 {} 的RTCM数据长度: {}", deviceId, data.length);
}
forwardToTcpClients(data);
}
}
} catch (IOException e) {
logger.error("UDP服务异常: {}", e.getMessage(), e);
}
}
/**
* 启动TCP服务器监听12002端口
*/
private void startTcpServer() {
try {
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(TCP_PORT));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
logger.info("TCP服务已启动监听端口: {}", TCP_PORT);
while (true) {
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
acceptConnection(selector, serverChannel);
}
}
}
} catch (IOException e) {
logger.error("TCP服务异常: {}", e.getMessage(), e);
}
}
/**
* 接受新的TCP客户端连接
*/
private void acceptConnection(Selector selector, ServerSocketChannel serverChannel) throws IOException {
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
tcpClients.add(clientChannel);
logger.info("新的TCP客户端已连接: {}", clientChannel.getRemoteAddress());
}
/**
* 从数据包中提取设备ID
*/
private String extractDeviceId(byte[] data) {
// 按照NtripMessage格式解析设备ID从第4字节开始4字节无符号整型
if (data.length >= 8) {
ByteBuffer buffer = ByteBuffer.wrap(data, 4, 4);
long deviceId = buffer.getInt() & 0xFFFFFFFFL; // 转为无符号整型
return String.valueOf(deviceId);
}
return "";
}
/**
* 将数据转发到所有TCP客户端
*/
private void forwardToTcpClients(byte[] data) {
if (tcpClients.isEmpty()) {
return;
}
List<SocketChannel> disconnectedClients = new ArrayList<>();
for (SocketChannel client : tcpClients) {
try {
if (client.isOpen()) {
ByteBuffer buffer = ByteBuffer.wrap(data);
client.write(buffer);
} else {
disconnectedClients.add(client);
}
} catch (IOException e) {
logger.warn("向TCP客户端发送数据失败: {}", e.getMessage());
disconnectedClients.add(client);
}
}
// 移除断开连接的客户端
for (SocketChannel client : disconnectedClients) {
tcpClients.remove(client);
try {
client.close();
logger.info("已关闭断开连接的TCP客户端");
} catch (IOException e) {
logger.error("关闭TCP客户端异常: {}", e.getMessage());
}
}
}
}

View File

@ -0,0 +1,40 @@
package com.imdroid.ntripproxy.service;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ChannelHandler.Sharable
public class RtcmTcpHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(RtcmTcpHandler.class);
private final RtcmDistributor.DeviceTcpServer server;
public RtcmTcpHandler(RtcmDistributor.DeviceTcpServer server) {
this.server = server;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
server.addClient(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
server.removeClient(ctx.channel());
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// 通常不需要处理从客户端接收的数据因为我们只是将RTCM数据推送给客户端
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("Exception in TCP handler", cause);
ctx.close();
}
}

View File

@ -0,0 +1,83 @@
package com.imdroid.ntripproxy.service;
import com.imdroid.common.util.DataTypeUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicReference;
@ChannelHandler.Sharable
@Component
public class RtcmUdpHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class);
@Autowired
private RtcmDistributor rtcmDistributor;
// 默认设备ID可以通过命令行设置
private AtomicReference<String> defaultDeviceId = new AtomicReference<>(null);
public void setDefaultDeviceId(String deviceId) {
defaultDeviceId.set(deviceId);
logger.info("设置默认设备ID为: {}", deviceId);
}
public String getDefaultDeviceId() {
return defaultDeviceId.get();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
try {
if (packet.content() == null) {
return;
}
byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data);
String deviceId = defaultDeviceId.get();
if (deviceId == null || deviceId.isEmpty()) {
logger.info("收到RTCM数据但未设置默认设备ID无法转发");
return;
}
// 如果该设备有TCP服务器则转发数据
if (rtcmDistributor.hasDeviceServer(deviceId)) {
rtcmDistributor.sendRtcmData(deviceId, data);
logger.info("已将{}字节的RTCM数据转发到设备{}的TCP服务器", data.length, deviceId);
// 打印前20个字节的十六进制表示帮助调试
if (data.length > 0 && logger.isDebugEnabled()) {
StringBuilder hexData = new StringBuilder();
int limit = Math.min(20, data.length);
for (int i = 0; i < limit; i++) {
hexData.append(String.format("%02X ", data[i] & 0xFF));
}
logger.debug("RTCM数据样本: {}", hexData.toString());
}
} else {
logger.info("收到RTCM数据但设备{}没有TCP服务器", deviceId);
}
} catch (Exception e) {
logger.error("处理RTCM数据时出错: {}", e.toString());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("RTCM UDP处理器异常: {}", cause.toString());
}
}

View File

@ -0,0 +1,49 @@
package com.imdroid.ntripproxy.service;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class RtcmUdpServer implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class);
private final int port = 9903; // 直接监听9903端口
@Autowired
private RtcmUdpHandler rtcmUdpHandler;
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(this::start0, "rtcm-server").start();
}
private void start0() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_REUSEADDR, true) // 允许端口重用
.option(ChannelOption.SO_SNDBUF, 1024*1024)
.option(ChannelOption.SO_RCVBUF, 1024*1024)
.handler(rtcmUdpHandler);
try {
ChannelFuture future = bootstrap.bind(port).sync().channel().closeFuture();
logger.info("RTCM UDP server started at port {}", port);
future.await();
} catch (Exception e) {
logger.error("Error starting RTCM UDP server at port {}", port, e);
} finally {
group.shutdownGracefully();
}
}
}

View File

@ -14,6 +14,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Layton
* @date 2023/2/13 11:47
@ -23,6 +27,32 @@ import org.springframework.stereotype.Component;
public class UdpHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(UdpHandler.class);
// 数据监控标志
private AtomicBoolean monitoringEnabled = new AtomicBoolean(false);
private Set<String> monitoredDevices = new HashSet<>();
public void enableMonitoring(boolean enable) {
monitoringEnabled.set(enable);
if (enable) {
logger.info("UDP数据监控已启用");
} else {
logger.info("UDP数据监控已禁用");
monitoredDevices.clear();
}
}
public void addMonitoredDevice(String deviceId) {
if (deviceId != null && !deviceId.isEmpty()) {
monitoredDevices.add(deviceId);
logger.info("添加设备 {} 到监控列表", deviceId);
}
}
public void clearMonitoredDevices() {
monitoredDevices.clear();
logger.info("清空设备监控列表");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@ -31,6 +61,49 @@ public class UdpHandler extends ChannelInboundHandlerAdapter {
if (packet.content() == null) {
return;
}
// 如果监控已启用记录数据包信息
if (monitoringEnabled.get()) {
byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data);
String sender = packet.sender().getAddress().getHostAddress() + ":" + packet.sender().getPort();
// 尝试解析设备ID (假设前4字节是header和len接下来4字节是设备ID)
String deviceId = "unknown";
if (data.length >= 8) {
try {
int pos = 4; // 跳过header和len
long id = ((data[pos] & 0xFFL) << 24) |
((data[pos+1] & 0xFFL) << 16) |
((data[pos+2] & 0xFFL) << 8) |
(data[pos+3] & 0xFFL);
deviceId = String.valueOf(id);
} catch (Exception e) {
// 解析失败保持unknown
}
}
// 如果没有指定监控设备或者当前设备在监控列表中
if (monitoredDevices.isEmpty() || monitoredDevices.contains(deviceId)) {
StringBuilder hexDump = new StringBuilder();
int limit = Math.min(50, data.length);
for (int i = 0; i < limit; i++) {
hexDump.append(String.format("%02X ", data[i] & 0xFF));
if ((i + 1) % 16 == 0) {
hexDump.append("\n");
}
}
if (data.length > limit) {
hexDump.append("... (").append(data.length - limit).append(" more bytes)");
}
logger.info("收到来自 {} 的UDP数据包长度: {} 字节设备ID: {}\n数据内容:\n{}",
sender, data.length, deviceId, hexDump.toString());
}
}
if (logger.isDebugEnabled()) {
byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data);
@ -56,5 +129,4 @@ public class UdpHandler extends ChannelInboundHandlerAdapter {
super.exceptionCaught(ctx, cause);
logger.error("Exception caught: {}", cause.toString());
}
}