1、增加ntrip proxy服务

This commit is contained in:
weidong 2024-06-06 14:33:15 +08:00
parent fe8eb2e07d
commit 4ffc6e732d
8 changed files with 129 additions and 24 deletions

View File

@ -0,0 +1,40 @@
package com.imdroid.ntripproxy.executor;
import com.imdroid.ntripproxy.message.D331RtcmMessage;
import com.imdroid.ntripproxy.service.UDPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author Layton
* @date 2023/2/2 20:49
*/
@Component
public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
UDPClient rtcmClient;
@Override
public Void execute(D331RtcmMessage message) {
String id = message.getId();
// 补齐tenantId
if (logger.isDebugEnabled()) {
logger.debug("receive d331 message of device: "+id+", seq:"+message.getSeq()+", len:"+message.getLen());
}
// 推送基站数据
byte[] srcData = new byte[message.getSrcBuf().readableBytes()];
message.getSrcBuf().getBytes(0,srcData);
rtcmClient.sendMessage(srcData);
return null;
}
@Override
public Class<?> getMessageType() {
return D331RtcmMessage.class;
}
}

View File

@ -1,6 +1,5 @@
package com.imdroid.ntripproxy.executor; package com.imdroid.ntripproxy.executor;
import com.imdroid.common.util.ThreadManager;
import com.imdroid.ntripproxy.message.D341GgaMessage; import com.imdroid.ntripproxy.message.D341GgaMessage;
import com.imdroid.ntripproxy.service.UDPClient; import com.imdroid.ntripproxy.service.UDPClient;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -19,18 +18,30 @@ public class D341GgaMessageExecutor implements Executor<D341GgaMessage, Void> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired @Autowired
UDPClient udpClient; UDPClient ntripClient;
@Autowired
UDPClient rtcmClient;
@Override @Override
public Void execute(D341GgaMessage message) { public Void execute(D341GgaMessage message) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("receive d341 message of device: "+message.getId()+", seq:"+message.getSeq()+", len:"+message.getLen()); logger.debug("receive d341 message of device: "+message.getId()+", seq:"+message.getSeq()+", len:"+message.getLen());
} }
// 发给ntrip server // 发给ntrip server
ThreadManager.getFixedThreadPool().submit(() -> { try {
byte[] data = new byte[message.getTransBuf().readableBytes()]; byte[] transData = new byte[message.getTransBuf().readableBytes()];
message.getTransBuf().getBytes(0,data); message.getTransBuf().getBytes(0, transData);
udpClient.sendMessage(data); byte[] srcData = new byte[message.getSrcBuf().readableBytes()];
}); message.getSrcBuf().getBytes(0, srcData);
// send to ntrip server
ntripClient.sendMessage(transData);
// send to rtcm server
rtcmClient.sendMessage(srcData);
}
catch (Exception e){
logger.error("Exception {}",e);
}
return null; return null;
} }

View File

@ -21,6 +21,7 @@ public class MessageParser {
} }
static { static {
types.put((short)0xd331, D331RtcmMessage.class);
types.put((short)0xd333, D333RtcmMessage.class); types.put((short)0xd333, D333RtcmMessage.class);
types.put((short)0xd341, D341GgaMessage.class); types.put((short)0xd341, D341GgaMessage.class);
} }

View File

@ -0,0 +1,19 @@
package com.imdroid.ntripproxy.message;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author Layton
* @date 2023/2/2 20:50
*/
@Data
@EqualsAndHashCode(callSuper=true)
public class D331RtcmMessage extends NtripMessage {
@Override
public void decodeBody(ByteBuf src) {
fromDevice = true;
}
}

View File

@ -21,6 +21,7 @@ public abstract class NtripMessage {
boolean fromDevice = false; boolean fromDevice = false;
ByteBuf srcBuf;
ByteBuf transBuf; ByteBuf transBuf;
public void decode(ByteBuf src) { public void decode(ByteBuf src) {
@ -33,7 +34,7 @@ public abstract class NtripMessage {
this.seq = this.len >> 11; this.seq = this.len >> 11;
this.len = this.len & 0x7FF; this.len = this.len & 0x7FF;
this.id = String.valueOf(src.getUnsignedInt(pos)); this.id = String.valueOf(src.getUnsignedInt(pos));
srcBuf = src;
} }
decodeBody(src); decodeBody(src);
} }

View File

@ -0,0 +1,20 @@
package com.imdroid.ntripproxy.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class NtripClient extends UDPClient{
@Value("${ntrip.server.host}")
private String ntripHost;
@Value("${ntrip.server.port}")
private int ntripPort;
@PostConstruct
void init(){
super.init(ntripHost, ntripPort);
}
}

View File

@ -0,0 +1,20 @@
package com.imdroid.ntripproxy.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class RtcmClient extends UDPClient{
@Value("${rtcm.server.host}")
private String ntripHost;
@Value("${rtcm.server.port}")
private int ntripPort;
@PostConstruct
void init(){
super.init(ntripHost, ntripPort);
}
}

View File

@ -1,17 +1,11 @@
package com.imdroid.ntripproxy.service; package com.imdroid.ntripproxy.service;
import com.imdroid.common.util.DataTypeUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
@Service
public class UDPClient { public class UDPClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
@ -19,18 +13,17 @@ public class UDPClient {
private InetAddress inetAddress; private InetAddress inetAddress;
@Value("${ntrip.server.host}")
private String host; private String host;
@Value("${ntrip.server.port}")
private int port; private int port;
@PostConstruct public void init(String host, int port) {
public void init() { this.host = host;
this.port = port;
try { try {
logger.info("UDP client init "+host + ":" + port); logger.info("UDP client init "+this.host + ":" + this.port);
this.socket = new DatagramSocket(); this.socket = new DatagramSocket();
this.inetAddress = InetAddress.getByName(host); this.inetAddress = InetAddress.getByName(this.host);
} catch (Exception e) { } catch (Exception e) {
logger.error("初始化udp客户端失败", e); logger.error("初始化udp客户端失败", e);
} }
@ -38,13 +31,13 @@ public class UDPClient {
public void sendMessage(byte[] data) { public void sendMessage(byte[] data) {
try { try {
if (logger.isDebugEnabled()) { /*if (logger.isDebugEnabled()) {
logger.debug("推送gga到NTRIP Server:{}", DataTypeUtil.getHexString(data)); logger.debug("推送udp {}: {}", port, DataTypeUtil.getHexString(data));
} }*/
DatagramPacket packet = new DatagramPacket(data, data.length, inetAddress, port); DatagramPacket packet = new DatagramPacket(data, data.length, inetAddress, port);
socket.send(packet); socket.send(packet);
} catch (Exception e) { } catch (Exception e) {
logger.error("推送ntrip异常:", e); logger.error("推送异常:", e);
} }
} }
} }