From 4ffc6e732da471fd59e7c6c9278c809075bcfd04 Mon Sep 17 00:00:00 2001 From: weidong Date: Thu, 6 Jun 2024 14:33:15 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=A2=9E=E5=8A=A0ntrip=20proxy?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/D331RtcmMessageExecutor.java | 40 +++++++++++++++++++ .../executor/D341GgaMessageExecutor.java | 25 ++++++++---- .../ntripproxy/executor/MessageParser.java | 1 + .../ntripproxy/message/D331RtcmMessage.java | 19 +++++++++ .../ntripproxy/message/NtripMessage.java | 3 +- .../ntripproxy/service/NtripClient.java | 20 ++++++++++ .../ntripproxy/service/RtcmClient.java | 20 ++++++++++ .../imdroid/ntripproxy/service/UDPClient.java | 25 +++++------- 8 files changed, 129 insertions(+), 24 deletions(-) create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/D331RtcmMessage.java create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/NtripClient.java create mode 100644 sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java new file mode 100644 index 00000000..194a7828 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D331RtcmMessageExecutor.java @@ -0,0 +1,40 @@ +package com.imdroid.ntripproxy.executor; + +import com.imdroid.ntripproxy.message.D331RtcmMessage; +import com.imdroid.ntripproxy.service.UDPClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +/** + * @author Layton + * @date 2023/2/2 20:49 + */ +@Component +public class D331RtcmMessageExecutor implements Executor { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + @Autowired + UDPClient rtcmClient; + @Override + public Void execute(D331RtcmMessage message) { + String id = message.getId(); + // 补齐tenantId + if (logger.isDebugEnabled()) { + logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen()); + } + // 推送基站数据 + byte[] srcData = new byte[message.getSrcBuf().readableBytes()]; + message.getSrcBuf().getBytes(0,srcData); + rtcmClient.sendMessage(srcData); + + return null; + } + + @Override + public Class getMessageType() { + return D331RtcmMessage.class; + } +} diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D341GgaMessageExecutor.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D341GgaMessageExecutor.java index 4b58a570..6d2fccab 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D341GgaMessageExecutor.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/D341GgaMessageExecutor.java @@ -1,6 +1,5 @@ package com.imdroid.ntripproxy.executor; -import com.imdroid.common.util.ThreadManager; import com.imdroid.ntripproxy.message.D341GgaMessage; import com.imdroid.ntripproxy.service.UDPClient; import org.slf4j.Logger; @@ -19,18 +18,30 @@ public class D341GgaMessageExecutor implements Executor { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired - UDPClient udpClient; + UDPClient ntripClient; + @Autowired + UDPClient rtcmClient; + @Override public Void execute(D341GgaMessage message) { if (logger.isDebugEnabled()) { logger.debug("receive d341 message of device: "+message.getId()+", seq:"+message.getSeq()+", len:"+message.getLen()); } // 发给ntrip server - ThreadManager.getFixedThreadPool().submit(() -> { - byte[] data = new byte[message.getTransBuf().readableBytes()]; - message.getTransBuf().getBytes(0,data); - udpClient.sendMessage(data); - }); + try { + byte[] transData = new byte[message.getTransBuf().readableBytes()]; + message.getTransBuf().getBytes(0, transData); + byte[] srcData = new byte[message.getSrcBuf().readableBytes()]; + message.getSrcBuf().getBytes(0, srcData); + + // send to ntrip server + ntripClient.sendMessage(transData); + // send to rtcm server + rtcmClient.sendMessage(srcData); + } + catch (Exception e){ + logger.error("Exception {}",e); + } return null; } diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/MessageParser.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/MessageParser.java index ae4289b2..9c31965a 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/MessageParser.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/executor/MessageParser.java @@ -21,6 +21,7 @@ public class MessageParser { } static { + types.put((short)0xd331, D331RtcmMessage.class); types.put((short)0xd333, D333RtcmMessage.class); types.put((short)0xd341, D341GgaMessage.class); } diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/D331RtcmMessage.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/D331RtcmMessage.java new file mode 100644 index 00000000..83472d54 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/D331RtcmMessage.java @@ -0,0 +1,19 @@ +package com.imdroid.ntripproxy.message; + +import io.netty.buffer.ByteBuf; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author Layton + * @date 2023/2/2 20:50 + */ +@Data +@EqualsAndHashCode(callSuper=true) +public class D331RtcmMessage extends NtripMessage { + @Override + public void decodeBody(ByteBuf src) { + fromDevice = true; + } + +} diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/NtripMessage.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/NtripMessage.java index 80cf7099..af1a4fff 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/NtripMessage.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/message/NtripMessage.java @@ -21,6 +21,7 @@ public abstract class NtripMessage { boolean fromDevice = false; + ByteBuf srcBuf; ByteBuf transBuf; public void decode(ByteBuf src) { @@ -33,7 +34,7 @@ public abstract class NtripMessage { this.seq = this.len >> 11; this.len = this.len & 0x7FF; this.id = String.valueOf(src.getUnsignedInt(pos)); - + srcBuf = src; } decodeBody(src); } diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/NtripClient.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/NtripClient.java new file mode 100644 index 00000000..ace3d69c --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/NtripClient.java @@ -0,0 +1,20 @@ +package com.imdroid.ntripproxy.service; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +@Service +public class NtripClient extends UDPClient{ + @Value("${ntrip.server.host}") + private String ntripHost; + + @Value("${ntrip.server.port}") + private int ntripPort; + + @PostConstruct + void init(){ + super.init(ntripHost, ntripPort); + } +} diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java new file mode 100644 index 00000000..87145e41 --- /dev/null +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/RtcmClient.java @@ -0,0 +1,20 @@ +package com.imdroid.ntripproxy.service; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +@Service +public class RtcmClient extends UDPClient{ + @Value("${rtcm.server.host}") + private String ntripHost; + + @Value("${rtcm.server.port}") + private int ntripPort; + + @PostConstruct + void init(){ + super.init(ntripHost, ntripPort); + } +} diff --git a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UDPClient.java b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UDPClient.java index 21a5d54f..5f333d72 100644 --- a/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UDPClient.java +++ b/sec-ntrip-proxy/src/main/java/com/imdroid/ntripproxy/service/UDPClient.java @@ -1,17 +1,11 @@ package com.imdroid.ntripproxy.service; -import com.imdroid.common.util.DataTypeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; -@Service public class UDPClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -19,18 +13,17 @@ public class UDPClient { private InetAddress inetAddress; - @Value("${ntrip.server.host}") private String host; - @Value("${ntrip.server.port}") private int port; - @PostConstruct - public void init() { + public void init(String host, int port) { + this.host = host; + this.port = port; try { - logger.info("UDP client init "+host + ":" + port); + logger.info("UDP client init "+this.host + ":" + this.port); this.socket = new DatagramSocket(); - this.inetAddress = InetAddress.getByName(host); + this.inetAddress = InetAddress.getByName(this.host); } catch (Exception e) { logger.error("初始化udp客户端失败:", e); } @@ -38,13 +31,13 @@ public class UDPClient { public void sendMessage(byte[] data) { try { - if (logger.isDebugEnabled()) { - logger.debug("推送gga到NTRIP Server:{}", DataTypeUtil.getHexString(data)); - } + /*if (logger.isDebugEnabled()) { + logger.debug("推送udp {}: {}", port, DataTypeUtil.getHexString(data)); + }*/ DatagramPacket packet = new DatagramPacket(data, data.length, inetAddress, port); socket.send(packet); } catch (Exception e) { - logger.error("推送ntrip异常:", e); + logger.error("推送异常:", e); } } }