Compare commits
1 Commits
feature/tc
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
| 01427e230e |
@ -1,7 +1,6 @@
|
|||||||
package com.imdroid.ntripproxy.executor;
|
package com.imdroid.ntripproxy.executor;
|
||||||
|
|
||||||
import com.imdroid.ntripproxy.message.D331RtcmMessage;
|
import com.imdroid.ntripproxy.message.D331RtcmMessage;
|
||||||
import com.imdroid.ntripproxy.service.RtcmDistributor;
|
|
||||||
import com.imdroid.ntripproxy.service.UDPClient;
|
import com.imdroid.ntripproxy.service.UDPClient;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -17,13 +16,8 @@ import org.springframework.stereotype.Component;
|
|||||||
public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void> {
|
public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void> {
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
UDPClient rtcmClient;
|
UDPClient rtcmClient;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
RtcmDistributor rtcmDistributor;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void execute(D331RtcmMessage message) {
|
public Void execute(D331RtcmMessage message) {
|
||||||
String id = message.getId();
|
String id = message.getId();
|
||||||
@ -31,19 +25,11 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
|
|||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen());
|
logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen());
|
||||||
}
|
}
|
||||||
|
// 推送基站数据
|
||||||
// 提取RTCM数据
|
|
||||||
byte[] srcData = new byte[message.getSrcBuf().readableBytes()];
|
byte[] srcData = new byte[message.getSrcBuf().readableBytes()];
|
||||||
message.getSrcBuf().getBytes(0, srcData);
|
message.getSrcBuf().getBytes(0,srcData);
|
||||||
|
|
||||||
// 继续发送到原来的UDP客户端
|
|
||||||
rtcmClient.send(srcData);
|
rtcmClient.send(srcData);
|
||||||
|
|
||||||
// 只有已经创建了TCP服务器的设备才发送数据
|
|
||||||
if (rtcmDistributor.hasDeviceServer(id)) {
|
|
||||||
rtcmDistributor.sendRtcmData(id, srcData);
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -3,6 +3,12 @@ package com.imdroid.ntripproxy.service;
|
|||||||
public class Ntrip2Channels {
|
public class Ntrip2Channels {
|
||||||
final private String localHost="127.0.0.1";
|
final private String localHost="127.0.0.1";
|
||||||
final private int localPort=9903;
|
final private int localPort=9903;
|
||||||
|
// 将远程主机改为本地,端口改为12000
|
||||||
|
final private String remoteHost="127.0.0.1";
|
||||||
|
//final private String remoteHost="100.91.37.6";
|
||||||
|
//final private String remoteHost="47.107.50.52";
|
||||||
|
//final private String remoteHost="8.134.185.53";
|
||||||
|
final private int remotePort=12000;
|
||||||
|
|
||||||
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();
|
public static final Ntrip2Channels INSTANCE = new Ntrip2Channels();
|
||||||
|
|
||||||
@ -10,10 +16,13 @@ public class Ntrip2Channels {
|
|||||||
UDPClient remoteRtcm;
|
UDPClient remoteRtcm;
|
||||||
private Ntrip2Channels() {
|
private Ntrip2Channels() {
|
||||||
localRtcm = new UDPClient();
|
localRtcm = new UDPClient();
|
||||||
|
remoteRtcm = new UDPClient();
|
||||||
localRtcm.init(localHost, localPort);
|
localRtcm.init(localHost, localPort);
|
||||||
|
remoteRtcm.init(remoteHost, remotePort);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void send(byte[] data) {
|
public void send(byte[] data) {
|
||||||
localRtcm.send(data);
|
localRtcm.send(data);
|
||||||
|
remoteRtcm.send(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,190 +0,0 @@
|
|||||||
package com.imdroid.ntripproxy.service;
|
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.CommandLineRunner;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Scanner;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class RtcmCommandLineRunner implements CommandLineRunner {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RtcmDistributor rtcmDistributor;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private UdpHandler udpHandler;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RtcmUdpHandler rtcmUdpHandler;
|
|
||||||
|
|
||||||
private static final int DEFAULT_BASE_PORT = 10000;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(String... args) {
|
|
||||||
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
|
||||||
executorService.submit(() -> {
|
|
||||||
Scanner scanner = new Scanner(System.in);
|
|
||||||
System.out.println("RTCM TCP服务器命令行工具启动");
|
|
||||||
System.out.println("输入 'help' 查看可用命令");
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
System.out.print("> ");
|
|
||||||
String command = scanner.nextLine().trim();
|
|
||||||
|
|
||||||
if ("exit".equalsIgnoreCase(command)) {
|
|
||||||
break;
|
|
||||||
} else if ("help".equalsIgnoreCase(command)) {
|
|
||||||
printHelp();
|
|
||||||
} else if ("list".equalsIgnoreCase(command)) {
|
|
||||||
listDevices();
|
|
||||||
} else if (command.startsWith("create ")) {
|
|
||||||
String[] parts = command.substring("create ".length()).trim().split("\\s+");
|
|
||||||
if (parts.length == 1) {
|
|
||||||
createDeviceServer(parts[0], DEFAULT_BASE_PORT);
|
|
||||||
} else if (parts.length == 2) {
|
|
||||||
try {
|
|
||||||
int port = Integer.parseInt(parts[1]);
|
|
||||||
createDeviceServer(parts[0], port);
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
System.out.println("端口必须是数字");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
System.out.println("格式错误,使用 'create <deviceId> [port]'");
|
|
||||||
}
|
|
||||||
} else if (command.startsWith("remove ")) {
|
|
||||||
String deviceId = command.substring("remove ".length()).trim();
|
|
||||||
removeDeviceServer(deviceId);
|
|
||||||
} else if ("startmonitor".equalsIgnoreCase(command)) {
|
|
||||||
startMonitor();
|
|
||||||
} else if ("stopmonitor".equalsIgnoreCase(command)) {
|
|
||||||
stopMonitor();
|
|
||||||
} else if (command.startsWith("monitordevice ")) {
|
|
||||||
String deviceId = command.substring("monitordevice ".length()).trim();
|
|
||||||
monitorDevice(deviceId);
|
|
||||||
} else if ("clearmonitor".equalsIgnoreCase(command)) {
|
|
||||||
clearMonitoredDevices();
|
|
||||||
} else if (command.startsWith("setdefault ")) {
|
|
||||||
String deviceId = command.substring("setdefault ".length()).trim();
|
|
||||||
setDefaultDevice(deviceId);
|
|
||||||
} else {
|
|
||||||
System.out.println("未知命令,输入 'help' 查看可用命令");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
System.out.println("命令行工具已退出");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printHelp() {
|
|
||||||
System.out.println("可用命令:");
|
|
||||||
System.out.println(" help - 显示此帮助信息");
|
|
||||||
System.out.println(" list - 列出所有设备及其TCP端口");
|
|
||||||
System.out.println(" create <deviceId> [port] - 为指定设备ID创建TCP服务器,可选指定端口");
|
|
||||||
System.out.println(" remove <deviceId> - 移除指定设备的TCP服务器");
|
|
||||||
System.out.println(" startmonitor - 开始监控UDP数据");
|
|
||||||
System.out.println(" stopmonitor - 停止UDP数据监控");
|
|
||||||
System.out.println(" monitordevice <deviceId> - 添加指定设备ID到监控列表");
|
|
||||||
System.out.println(" clearmonitor - 清空设备监控列表(监控所有设备)");
|
|
||||||
System.out.println(" setdefault <deviceId> - 设置默认设备ID,9903端口的RTCM数据将转发给此设备");
|
|
||||||
System.out.println(" exit - 退出命令行工具(不会停止服务)");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void listDevices() {
|
|
||||||
Map<String, Integer> devicePorts = rtcmDistributor.listDevicePorts();
|
|
||||||
if (devicePorts.isEmpty()) {
|
|
||||||
System.out.println("当前没有设备TCP服务器");
|
|
||||||
} else {
|
|
||||||
System.out.println("设备TCP服务器列表:");
|
|
||||||
for (Map.Entry<String, Integer> entry : devicePorts.entrySet()) {
|
|
||||||
String defaultMark = "";
|
|
||||||
if (entry.getKey().equals(rtcmUdpHandler.getDefaultDeviceId())) {
|
|
||||||
defaultMark = " (默认设备)";
|
|
||||||
}
|
|
||||||
System.out.println(" 设备ID: " + entry.getKey() + ", TCP端口: " + entry.getValue() + defaultMark);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createDeviceServer(String deviceId, int defaultPort) {
|
|
||||||
if (deviceId == null || deviceId.isEmpty()) {
|
|
||||||
System.out.println("设备ID不能为空");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int port = rtcmDistributor.createDeviceServer(deviceId, defaultPort);
|
|
||||||
System.out.println("为设备 " + deviceId + " 创建TCP服务器,端口: " + port);
|
|
||||||
System.out.println("可以在rtklib配置文件中使用以下设置:");
|
|
||||||
System.out.println("inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)");
|
|
||||||
System.out.println("inpstr1-path =127.0.0.1:" + port);
|
|
||||||
|
|
||||||
// 如果没有默认设备,自动设置为默认设备
|
|
||||||
if (rtcmUdpHandler.getDefaultDeviceId() == null) {
|
|
||||||
setDefaultDevice(deviceId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void removeDeviceServer(String deviceId) {
|
|
||||||
if (deviceId == null || deviceId.isEmpty()) {
|
|
||||||
System.out.println("设备ID不能为空");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean removed = rtcmDistributor.removeDeviceServer(deviceId);
|
|
||||||
if (removed) {
|
|
||||||
System.out.println("已移除设备 " + deviceId + " 的TCP服务器");
|
|
||||||
|
|
||||||
// 如果移除的是默认设备,清除默认设备设置
|
|
||||||
if (deviceId.equals(rtcmUdpHandler.getDefaultDeviceId())) {
|
|
||||||
rtcmUdpHandler.setDefaultDeviceId(null);
|
|
||||||
System.out.println("已清除默认设备设置");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
System.out.println("设备 " + deviceId + " 没有TCP服务器");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void startMonitor() {
|
|
||||||
udpHandler.enableMonitoring(true);
|
|
||||||
System.out.println("UDP数据监控已启用");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void stopMonitor() {
|
|
||||||
udpHandler.enableMonitoring(false);
|
|
||||||
System.out.println("UDP数据监控已禁用");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void monitorDevice(String deviceId) {
|
|
||||||
if (deviceId == null || deviceId.isEmpty()) {
|
|
||||||
System.out.println("设备ID不能为空");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
udpHandler.addMonitoredDevice(deviceId);
|
|
||||||
System.out.println("已添加设备 " + deviceId + " 到监控列表");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void clearMonitoredDevices() {
|
|
||||||
udpHandler.clearMonitoredDevices();
|
|
||||||
System.out.println("已清空设备监控列表,将监控所有设备");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setDefaultDevice(String deviceId) {
|
|
||||||
if (deviceId == null || deviceId.isEmpty()) {
|
|
||||||
System.out.println("设备ID不能为空");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!rtcmDistributor.hasDeviceServer(deviceId)) {
|
|
||||||
System.out.println("设备 " + deviceId + " 没有TCP服务器,请先创建");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
rtcmUdpHandler.setDefaultDeviceId(deviceId);
|
|
||||||
System.out.println("已将设备 " + deviceId + " 设置为默认设备");
|
|
||||||
System.out.println("所有从9903端口接收的RTCM数据将转发给此设备");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,109 +0,0 @@
|
|||||||
package com.imdroid.ntripproxy.service;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.DatagramPacket;
|
|
||||||
import java.net.DatagramSocket;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class RtcmDataMonitor {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RtcmDataMonitor.class);
|
|
||||||
private static final int BUFFER_SIZE = 4096;
|
|
||||||
|
|
||||||
private DatagramSocket monitorSocket;
|
|
||||||
private boolean running = false;
|
|
||||||
|
|
||||||
public void startUdpMonitor(int port) {
|
|
||||||
if (running) {
|
|
||||||
logger.info("UDP监控已经在运行中");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread monitorThread = new Thread(() -> {
|
|
||||||
try {
|
|
||||||
monitorSocket = new DatagramSocket(null);
|
|
||||||
monitorSocket.setReuseAddress(true);
|
|
||||||
monitorSocket.bind(new InetSocketAddress(port));
|
|
||||||
|
|
||||||
byte[] buffer = new byte[BUFFER_SIZE];
|
|
||||||
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
|
||||||
|
|
||||||
running = true;
|
|
||||||
logger.info("开始监控UDP端口 {} 的数据", port);
|
|
||||||
|
|
||||||
while (running) {
|
|
||||||
try {
|
|
||||||
monitorSocket.receive(packet);
|
|
||||||
byte[] data = new byte[packet.getLength()];
|
|
||||||
System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
|
|
||||||
|
|
||||||
// 打印数据信息
|
|
||||||
String sender = packet.getAddress().getHostAddress() + ":" + packet.getPort();
|
|
||||||
printPacketInfo(sender, data);
|
|
||||||
|
|
||||||
} catch (IOException e) {
|
|
||||||
if (running) {
|
|
||||||
logger.error("接收UDP数据时出错", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("创建UDP监控套接字失败", e);
|
|
||||||
} finally {
|
|
||||||
if (monitorSocket != null) {
|
|
||||||
monitorSocket.close();
|
|
||||||
}
|
|
||||||
running = false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
monitorThread.setDaemon(true);
|
|
||||||
monitorThread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stopMonitor() {
|
|
||||||
running = false;
|
|
||||||
if (monitorSocket != null) {
|
|
||||||
monitorSocket.close();
|
|
||||||
}
|
|
||||||
logger.info("停止UDP数据监控");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void printPacketInfo(String sender, byte[] data) {
|
|
||||||
StringBuilder hexDump = new StringBuilder();
|
|
||||||
int limit = Math.min(50, data.length);
|
|
||||||
|
|
||||||
for (int i = 0; i < limit; i++) {
|
|
||||||
hexDump.append(String.format("%02X ", data[i] & 0xFF));
|
|
||||||
if ((i + 1) % 16 == 0) {
|
|
||||||
hexDump.append("\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.length > limit) {
|
|
||||||
hexDump.append("... (").append(data.length - limit).append(" more bytes)");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 尝试解析设备ID (假设前4字节是header和len,接下来4字节是设备ID)
|
|
||||||
String deviceId = "unknown";
|
|
||||||
if (data.length >= 8) {
|
|
||||||
try {
|
|
||||||
int pos = 4; // 跳过header和len
|
|
||||||
long id = ((data[pos] & 0xFFL) << 24) |
|
|
||||||
((data[pos+1] & 0xFFL) << 16) |
|
|
||||||
((data[pos+2] & 0xFFL) << 8) |
|
|
||||||
(data[pos+3] & 0xFFL);
|
|
||||||
deviceId = String.valueOf(id);
|
|
||||||
} catch (Exception e) {
|
|
||||||
// 解析失败,保持unknown
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("收到来自 {} 的UDP数据包,长度: {} 字节,可能的设备ID: {}\n数据内容:\n{}",
|
|
||||||
sender, data.length, deviceId, hexDump.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,186 +0,0 @@
|
|||||||
package com.imdroid.ntripproxy.service;
|
|
||||||
|
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
|
||||||
import io.netty.channel.Channel;
|
|
||||||
import io.netty.channel.ChannelInitializer;
|
|
||||||
import io.netty.channel.ChannelPipeline;
|
|
||||||
import io.netty.channel.EventLoopGroup;
|
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
|
||||||
import io.netty.channel.socket.SocketChannel;
|
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class RtcmDistributor {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RtcmDistributor.class);
|
|
||||||
|
|
||||||
// 设备ID与对应的TCP服务器映射
|
|
||||||
private final Map<String, DeviceTcpServer> deviceServers = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
// 设备ID与对应的端口映射
|
|
||||||
private final Map<String, Integer> devicePorts = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
// 数据统计
|
|
||||||
private final Map<String, AtomicLong> dataCounters = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
private EventLoopGroup bossGroup;
|
|
||||||
private EventLoopGroup workerGroup;
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
bossGroup = new NioEventLoopGroup(1);
|
|
||||||
workerGroup = new NioEventLoopGroup();
|
|
||||||
logger.info("RtcmDistributor initialized");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 为指定设备ID创建TCP服务器,返回分配的端口
|
|
||||||
public synchronized int createDeviceServer(String deviceId, int port) {
|
|
||||||
if (deviceServers.containsKey(deviceId)) {
|
|
||||||
logger.info("Device {} already has a TCP server on port {}", deviceId, devicePorts.get(deviceId));
|
|
||||||
return devicePorts.get(deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
DeviceTcpServer server = new DeviceTcpServer(deviceId, port);
|
|
||||||
deviceServers.put(deviceId, server);
|
|
||||||
devicePorts.put(deviceId, port);
|
|
||||||
dataCounters.put(deviceId, new AtomicLong(0));
|
|
||||||
|
|
||||||
// 启动服务器
|
|
||||||
server.start(bossGroup, workerGroup);
|
|
||||||
logger.info("Created TCP server for device {} on port {}", deviceId, port);
|
|
||||||
|
|
||||||
return port;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 发送RTCM数据到特定设备的TCP服务器(如果存在)
|
|
||||||
public void sendRtcmData(String deviceId, byte[] data) {
|
|
||||||
DeviceTcpServer server = deviceServers.get(deviceId);
|
|
||||||
if (server != null) {
|
|
||||||
server.broadcastData(data);
|
|
||||||
|
|
||||||
// 更新数据统计
|
|
||||||
AtomicLong counter = dataCounters.computeIfAbsent(deviceId, k -> new AtomicLong(0));
|
|
||||||
long totalBytes = counter.addAndGet(data.length);
|
|
||||||
|
|
||||||
logger.info("已将{}字节的RTCM数据发送到设备{}的TCP服务器,累计发送{}字节",
|
|
||||||
data.length, deviceId, totalBytes);
|
|
||||||
|
|
||||||
// 打印前20个字节的十六进制表示,帮助调试
|
|
||||||
if (data.length > 0 && logger.isDebugEnabled()) {
|
|
||||||
StringBuilder hexData = new StringBuilder();
|
|
||||||
int limit = Math.min(20, data.length);
|
|
||||||
for (int i = 0; i < limit; i++) {
|
|
||||||
hexData.append(String.format("%02X ", data[i] & 0xFF));
|
|
||||||
}
|
|
||||||
logger.debug("RTCM数据样本: {}", hexData.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查设备是否有TCP服务器
|
|
||||||
public boolean hasDeviceServer(String deviceId) {
|
|
||||||
return deviceServers.containsKey(deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除设备的TCP服务器
|
|
||||||
public boolean removeDeviceServer(String deviceId) {
|
|
||||||
DeviceTcpServer server = deviceServers.remove(deviceId);
|
|
||||||
if (server != null) {
|
|
||||||
devicePorts.remove(deviceId);
|
|
||||||
dataCounters.remove(deviceId);
|
|
||||||
server.stop();
|
|
||||||
logger.info("Removed TCP server for device {}", deviceId);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 列出所有设备及其端口
|
|
||||||
public Map<String, Integer> listDevicePorts() {
|
|
||||||
return new ConcurrentHashMap<>(devicePorts);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取设备数据统计
|
|
||||||
public Map<String, Long> getDataStatistics() {
|
|
||||||
Map<String, Long> stats = new ConcurrentHashMap<>();
|
|
||||||
dataCounters.forEach((deviceId, counter) -> stats.put(deviceId, counter.get()));
|
|
||||||
return stats;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 内部类:每个设备对应一个TCP服务器
|
|
||||||
public class DeviceTcpServer {
|
|
||||||
private final String deviceId;
|
|
||||||
private final int port;
|
|
||||||
private Channel serverChannel;
|
|
||||||
private final Map<Channel, Boolean> connectedClients = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
public DeviceTcpServer(String deviceId, int port) {
|
|
||||||
this.deviceId = deviceId;
|
|
||||||
this.port = port;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
|
|
||||||
ServerBootstrap b = new ServerBootstrap();
|
|
||||||
b.group(bossGroup, workerGroup)
|
|
||||||
.channel(NioServerSocketChannel.class)
|
|
||||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
|
||||||
@Override
|
|
||||||
protected void initChannel(SocketChannel ch) {
|
|
||||||
ChannelPipeline p = ch.pipeline();
|
|
||||||
p.addLast(new RtcmTcpHandler(DeviceTcpServer.this));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
serverChannel = b.bind(port).sync().channel();
|
|
||||||
logger.info("Device {} TCP server started on port {}", deviceId, port);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Failed to start TCP server for device {} on port {}", deviceId, port, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop() {
|
|
||||||
if (serverChannel != null) {
|
|
||||||
try {
|
|
||||||
serverChannel.close().sync();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
logger.error("Error closing server channel for device {}", deviceId, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addClient(Channel channel) {
|
|
||||||
connectedClients.put(channel, true);
|
|
||||||
logger.info("New client connected to device {} server, total clients: {}",
|
|
||||||
deviceId, connectedClients.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeClient(Channel channel) {
|
|
||||||
connectedClients.remove(channel);
|
|
||||||
logger.info("Client disconnected from device {} server, remaining clients: {}",
|
|
||||||
deviceId, connectedClients.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void broadcastData(byte[] data) {
|
|
||||||
int activeClients = 0;
|
|
||||||
for (Channel channel : connectedClients.keySet()) {
|
|
||||||
if (channel.isActive()) {
|
|
||||||
channel.writeAndFlush(channel.alloc().buffer().writeBytes(data));
|
|
||||||
activeClients++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (activeClients > 0 && logger.isDebugEnabled()) {
|
|
||||||
logger.debug("已将{}字节的数据广播到设备{}的{}个客户端",
|
|
||||||
data.length, deviceId, activeClients);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -0,0 +1,185 @@
|
|||||||
|
package com.imdroid.ntripproxy.service;
|
||||||
|
|
||||||
|
import com.imdroid.common.util.DataTypeUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.boot.ApplicationArguments;
|
||||||
|
import org.springframework.boot.ApplicationRunner;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.*;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.channels.Selector;
|
||||||
|
import java.nio.channels.ServerSocketChannel;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RTCM数据过滤服务器
|
||||||
|
* 监听UDP 12000端口,接收RTCM数据
|
||||||
|
* 开启TCP 12002端口,转发指定deviceId的RTCM数据
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class RtcmFilterServer implements ApplicationRunner {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(RtcmFilterServer.class);
|
||||||
|
|
||||||
|
// 配置参数
|
||||||
|
private static final int UDP_PORT = 12000;
|
||||||
|
private static final int TCP_PORT = 12002;
|
||||||
|
private static final String TARGET_DEVICE_ID = "3530795";
|
||||||
|
private static final int BUFFER_SIZE = 4096;
|
||||||
|
|
||||||
|
// TCP客户端连接列表
|
||||||
|
private final List<SocketChannel> tcpClients = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
// 线程池
|
||||||
|
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(ApplicationArguments args) {
|
||||||
|
// 启动UDP监听服务
|
||||||
|
executorService.submit(this::startUdpServer);
|
||||||
|
|
||||||
|
// 启动TCP服务器
|
||||||
|
executorService.submit(this::startTcpServer);
|
||||||
|
|
||||||
|
logger.info("RTCM过滤服务已启动 - UDP监听端口:{}, TCP服务端口:{}, 目标设备ID:{}",
|
||||||
|
UDP_PORT, TCP_PORT, TARGET_DEVICE_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动UDP服务器监听12000端口
|
||||||
|
*/
|
||||||
|
private void startUdpServer() {
|
||||||
|
try (DatagramSocket socket = new DatagramSocket(UDP_PORT)) {
|
||||||
|
logger.info("UDP服务已启动,监听端口: {}", UDP_PORT);
|
||||||
|
byte[] buffer = new byte[BUFFER_SIZE];
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
|
||||||
|
socket.receive(packet);
|
||||||
|
|
||||||
|
// 解析数据包
|
||||||
|
byte[] data = new byte[packet.getLength()];
|
||||||
|
System.arraycopy(packet.getData(), packet.getOffset(), data, 0, packet.getLength());
|
||||||
|
|
||||||
|
// 提取设备ID
|
||||||
|
String deviceId = extractDeviceId(data);
|
||||||
|
|
||||||
|
// 如果是目标设备ID,则转发到TCP客户端
|
||||||
|
if (TARGET_DEVICE_ID.equals(deviceId)) {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("接收到目标设备 {} 的RTCM数据,长度: {}", deviceId, data.length);
|
||||||
|
}
|
||||||
|
forwardToTcpClients(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("UDP服务异常: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动TCP服务器监听12002端口
|
||||||
|
*/
|
||||||
|
private void startTcpServer() {
|
||||||
|
try {
|
||||||
|
Selector selector = Selector.open();
|
||||||
|
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||||
|
serverChannel.configureBlocking(false);
|
||||||
|
serverChannel.socket().bind(new InetSocketAddress(TCP_PORT));
|
||||||
|
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||||
|
|
||||||
|
logger.info("TCP服务已启动,监听端口: {}", TCP_PORT);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
selector.select();
|
||||||
|
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
|
||||||
|
|
||||||
|
while (keys.hasNext()) {
|
||||||
|
SelectionKey key = keys.next();
|
||||||
|
keys.remove();
|
||||||
|
|
||||||
|
if (!key.isValid()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (key.isAcceptable()) {
|
||||||
|
acceptConnection(selector, serverChannel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("TCP服务异常: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 接受新的TCP客户端连接
|
||||||
|
*/
|
||||||
|
private void acceptConnection(Selector selector, ServerSocketChannel serverChannel) throws IOException {
|
||||||
|
SocketChannel clientChannel = serverChannel.accept();
|
||||||
|
clientChannel.configureBlocking(false);
|
||||||
|
clientChannel.register(selector, SelectionKey.OP_READ);
|
||||||
|
|
||||||
|
tcpClients.add(clientChannel);
|
||||||
|
logger.info("新的TCP客户端已连接: {}", clientChannel.getRemoteAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从数据包中提取设备ID
|
||||||
|
*/
|
||||||
|
private String extractDeviceId(byte[] data) {
|
||||||
|
// 按照NtripMessage格式解析,设备ID从第4字节开始,4字节无符号整型
|
||||||
|
if (data.length >= 8) {
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(data, 4, 4);
|
||||||
|
long deviceId = buffer.getInt() & 0xFFFFFFFFL; // 转为无符号整型
|
||||||
|
return String.valueOf(deviceId);
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将数据转发到所有TCP客户端
|
||||||
|
*/
|
||||||
|
private void forwardToTcpClients(byte[] data) {
|
||||||
|
if (tcpClients.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SocketChannel> disconnectedClients = new ArrayList<>();
|
||||||
|
|
||||||
|
for (SocketChannel client : tcpClients) {
|
||||||
|
try {
|
||||||
|
if (client.isOpen()) {
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(data);
|
||||||
|
client.write(buffer);
|
||||||
|
} else {
|
||||||
|
disconnectedClients.add(client);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("向TCP客户端发送数据失败: {}", e.getMessage());
|
||||||
|
disconnectedClients.add(client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 移除断开连接的客户端
|
||||||
|
for (SocketChannel client : disconnectedClients) {
|
||||||
|
tcpClients.remove(client);
|
||||||
|
try {
|
||||||
|
client.close();
|
||||||
|
logger.info("已关闭断开连接的TCP客户端");
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("关闭TCP客户端异常: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,40 +0,0 @@
|
|||||||
package com.imdroid.ntripproxy.service;
|
|
||||||
|
|
||||||
import io.netty.channel.ChannelHandler;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
@ChannelHandler.Sharable
|
|
||||||
public class RtcmTcpHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RtcmTcpHandler.class);
|
|
||||||
private final RtcmDistributor.DeviceTcpServer server;
|
|
||||||
|
|
||||||
public RtcmTcpHandler(RtcmDistributor.DeviceTcpServer server) {
|
|
||||||
this.server = server;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelActive(ChannelHandlerContext ctx) {
|
|
||||||
server.addClient(ctx.channel());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelInactive(ChannelHandlerContext ctx) {
|
|
||||||
server.removeClient(ctx.channel());
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
|
|
||||||
// 通常不需要处理从客户端接收的数据,因为我们只是将RTCM数据推送给客户端
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|
||||||
logger.error("Exception in TCP handler", cause);
|
|
||||||
ctx.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,83 +0,0 @@
|
|||||||
package com.imdroid.ntripproxy.service;
|
|
||||||
|
|
||||||
import com.imdroid.common.util.DataTypeUtil;
|
|
||||||
import io.netty.channel.ChannelHandler;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
||||||
import io.netty.channel.socket.DatagramPacket;
|
|
||||||
import io.netty.util.ReferenceCountUtil;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
@ChannelHandler.Sharable
|
|
||||||
@Component
|
|
||||||
public class RtcmUdpHandler extends ChannelInboundHandlerAdapter {
|
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RtcmDistributor rtcmDistributor;
|
|
||||||
|
|
||||||
// 默认设备ID,可以通过命令行设置
|
|
||||||
private AtomicReference<String> defaultDeviceId = new AtomicReference<>(null);
|
|
||||||
|
|
||||||
public void setDefaultDeviceId(String deviceId) {
|
|
||||||
defaultDeviceId.set(deviceId);
|
|
||||||
logger.info("设置默认设备ID为: {}", deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDefaultDeviceId() {
|
|
||||||
return defaultDeviceId.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
DatagramPacket packet = (DatagramPacket) msg;
|
|
||||||
try {
|
|
||||||
if (packet.content() == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] data = new byte[packet.content().readableBytes()];
|
|
||||||
packet.content().getBytes(0, data);
|
|
||||||
|
|
||||||
String deviceId = defaultDeviceId.get();
|
|
||||||
if (deviceId == null || deviceId.isEmpty()) {
|
|
||||||
logger.info("收到RTCM数据,但未设置默认设备ID,无法转发");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果该设备有TCP服务器,则转发数据
|
|
||||||
if (rtcmDistributor.hasDeviceServer(deviceId)) {
|
|
||||||
rtcmDistributor.sendRtcmData(deviceId, data);
|
|
||||||
logger.info("已将{}字节的RTCM数据转发到设备{}的TCP服务器", data.length, deviceId);
|
|
||||||
|
|
||||||
// 打印前20个字节的十六进制表示,帮助调试
|
|
||||||
if (data.length > 0 && logger.isDebugEnabled()) {
|
|
||||||
StringBuilder hexData = new StringBuilder();
|
|
||||||
int limit = Math.min(20, data.length);
|
|
||||||
for (int i = 0; i < limit; i++) {
|
|
||||||
hexData.append(String.format("%02X ", data[i] & 0xFF));
|
|
||||||
}
|
|
||||||
logger.debug("RTCM数据样本: {}", hexData.toString());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.info("收到RTCM数据,但设备{}没有TCP服务器", deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("处理RTCM数据时出错: {}", e.toString());
|
|
||||||
} finally {
|
|
||||||
ReferenceCountUtil.release(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
||||||
logger.error("RTCM UDP处理器异常: {}", cause.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,49 +0,0 @@
|
|||||||
package com.imdroid.ntripproxy.service;
|
|
||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
|
||||||
import io.netty.channel.ChannelFuture;
|
|
||||||
import io.netty.channel.ChannelOption;
|
|
||||||
import io.netty.channel.EventLoopGroup;
|
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
|
||||||
import io.netty.channel.socket.nio.NioDatagramChannel;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.ApplicationArguments;
|
|
||||||
import org.springframework.boot.ApplicationRunner;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
public class RtcmUdpServer implements ApplicationRunner {
|
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class);
|
|
||||||
private final int port = 9903; // 直接监听9903端口
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RtcmUdpHandler rtcmUdpHandler;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(ApplicationArguments args) throws Exception {
|
|
||||||
new Thread(this::start0, "rtcm-server").start();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void start0() {
|
|
||||||
EventLoopGroup group = new NioEventLoopGroup();
|
|
||||||
Bootstrap bootstrap = new Bootstrap();
|
|
||||||
bootstrap.group(group)
|
|
||||||
.channel(NioDatagramChannel.class)
|
|
||||||
.option(ChannelOption.SO_REUSEADDR, true) // 允许端口重用
|
|
||||||
.option(ChannelOption.SO_SNDBUF, 1024*1024)
|
|
||||||
.option(ChannelOption.SO_RCVBUF, 1024*1024)
|
|
||||||
.handler(rtcmUdpHandler);
|
|
||||||
try {
|
|
||||||
ChannelFuture future = bootstrap.bind(port).sync().channel().closeFuture();
|
|
||||||
logger.info("RTCM UDP server started at port {}", port);
|
|
||||||
future.await();
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Error starting RTCM UDP server at port {}", port, e);
|
|
||||||
} finally {
|
|
||||||
group.shutdownGracefully();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -14,10 +14,6 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Layton
|
* @author Layton
|
||||||
* @date 2023/2/13 11:47
|
* @date 2023/2/13 11:47
|
||||||
@ -27,32 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||||||
public class UdpHandler extends ChannelInboundHandlerAdapter {
|
public class UdpHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(UdpHandler.class);
|
private static final Logger logger = LoggerFactory.getLogger(UdpHandler.class);
|
||||||
|
|
||||||
// 数据监控标志
|
|
||||||
private AtomicBoolean monitoringEnabled = new AtomicBoolean(false);
|
|
||||||
private Set<String> monitoredDevices = new HashSet<>();
|
|
||||||
|
|
||||||
public void enableMonitoring(boolean enable) {
|
|
||||||
monitoringEnabled.set(enable);
|
|
||||||
if (enable) {
|
|
||||||
logger.info("UDP数据监控已启用");
|
|
||||||
} else {
|
|
||||||
logger.info("UDP数据监控已禁用");
|
|
||||||
monitoredDevices.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addMonitoredDevice(String deviceId) {
|
|
||||||
if (deviceId != null && !deviceId.isEmpty()) {
|
|
||||||
monitoredDevices.add(deviceId);
|
|
||||||
logger.info("添加设备 {} 到监控列表", deviceId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void clearMonitoredDevices() {
|
|
||||||
monitoredDevices.clear();
|
|
||||||
logger.info("清空设备监控列表");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
@ -61,49 +31,6 @@ public class UdpHandler extends ChannelInboundHandlerAdapter {
|
|||||||
if (packet.content() == null) {
|
if (packet.content() == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 如果监控已启用,记录数据包信息
|
|
||||||
if (monitoringEnabled.get()) {
|
|
||||||
byte[] data = new byte[packet.content().readableBytes()];
|
|
||||||
packet.content().getBytes(0, data);
|
|
||||||
String sender = packet.sender().getAddress().getHostAddress() + ":" + packet.sender().getPort();
|
|
||||||
|
|
||||||
// 尝试解析设备ID (假设前4字节是header和len,接下来4字节是设备ID)
|
|
||||||
String deviceId = "unknown";
|
|
||||||
if (data.length >= 8) {
|
|
||||||
try {
|
|
||||||
int pos = 4; // 跳过header和len
|
|
||||||
long id = ((data[pos] & 0xFFL) << 24) |
|
|
||||||
((data[pos+1] & 0xFFL) << 16) |
|
|
||||||
((data[pos+2] & 0xFFL) << 8) |
|
|
||||||
(data[pos+3] & 0xFFL);
|
|
||||||
deviceId = String.valueOf(id);
|
|
||||||
} catch (Exception e) {
|
|
||||||
// 解析失败,保持unknown
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果没有指定监控设备或者当前设备在监控列表中
|
|
||||||
if (monitoredDevices.isEmpty() || monitoredDevices.contains(deviceId)) {
|
|
||||||
StringBuilder hexDump = new StringBuilder();
|
|
||||||
int limit = Math.min(50, data.length);
|
|
||||||
|
|
||||||
for (int i = 0; i < limit; i++) {
|
|
||||||
hexDump.append(String.format("%02X ", data[i] & 0xFF));
|
|
||||||
if ((i + 1) % 16 == 0) {
|
|
||||||
hexDump.append("\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.length > limit) {
|
|
||||||
hexDump.append("... (").append(data.length - limit).append(" more bytes)");
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("收到来自 {} 的UDP数据包,长度: {} 字节,设备ID: {}\n数据内容:\n{}",
|
|
||||||
sender, data.length, deviceId, hexDump.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
byte[] data = new byte[packet.content().readableBytes()];
|
byte[] data = new byte[packet.content().readableBytes()];
|
||||||
packet.content().getBytes(0, data);
|
packet.content().getBytes(0, data);
|
||||||
@ -129,4 +56,5 @@ public class UdpHandler extends ChannelInboundHandlerAdapter {
|
|||||||
super.exceptionCaught(ctx, cause);
|
super.exceptionCaught(ctx, cause);
|
||||||
logger.error("Exception caught: {}", cause.toString());
|
logger.error("Exception caught: {}", cause.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user