1、增加tcp推送应答的处理

This commit is contained in:
weidong 2024-02-05 23:04:56 +08:00
parent 922808548b
commit d2f42deb26
6 changed files with 105 additions and 15 deletions

View File

@ -26,14 +26,16 @@ public class TCPClient {
private EventLoopGroup group; private EventLoopGroup group;
private Channel channel; private Channel channel;
LocalDateTime connectTime = LocalDateTime.now(); LocalDateTime connectTime = LocalDateTime.now();
TCPListener listener;
public void start() { public void start() {
new Thread(this::connect, "forwarder tcp-client").start(); new Thread(this::connect, "forwarder tcp-client").start();
} }
public void init(String dest_addr, int dest_port) { public void init(String dest_addr, int dest_port, TCPListener listener) {
this.host = dest_addr; this.host = dest_addr;
this.port = dest_port; this.port = dest_port;
this.listener = listener;
//客户端需要一个事件循环组 //客户端需要一个事件循环组
group = new NioEventLoopGroup(); group = new NioEventLoopGroup();
@ -90,6 +92,9 @@ public class TCPClient {
logger.info("send to tcp:"+host+" succeed."); logger.info("send to tcp:"+host+" succeed.");
} else { } else {
logger.info("send to tcp:"+host+" failed."); logger.info("send to tcp:"+host+" failed.");
if(listener!=null){
listener.onMessage("failed");
}
} }
}); });
} }
@ -113,4 +118,10 @@ public class TCPClient {
} }
} }
public void onMessage(String msg){
if(listener!=null){
listener.onMessage(msg);
}
}
} }

View File

@ -3,5 +3,5 @@ package com.imdroid.beidou_fwd.service;
public interface TCPListener { public interface TCPListener {
void onConnected(); void onConnected();
void onDisconnect(); void onDisconnect();
void onSendResult(boolean isOK); void onMessage(String msg);
} }

View File

@ -1,12 +1,13 @@
package com.imdroid.beidou_fwd.service; package com.imdroid.beidou_fwd.service;
import com.imdroid.common.util.DataTypeUtil;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
/** /**
* @author Layton * @author Layton
* @date 2023/2/18 20:36 * @date 2023/2/18 20:36
@ -23,28 +24,28 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf buf) throws Exception { protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf buf) throws Exception {
String msg = buf.toString(Charset.defaultCharset());
tcpClient.onMessage(msg);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
byte[] data = new byte[buf.readableBytes()]; logger.debug("receive server message:" + msg);
buf.getBytes(0, data);
logger.debug("receive server message:" + DataTypeUtil.getHexString(data));
} }
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("xfz tcp channel active"); logger.info("tcp channel active");
tcpClient.onConnected(); tcpClient.onConnected();
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("xfz tcp channel inactive"); logger.info("tcp channel inactive");
tcpClient.onDisconnect(); tcpClient.onDisconnect();
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("XFZTcpMessageHandler error: {}", cause.toString()); logger.error("TcpMessageHandler error: {}", cause.toString());
ctx.close(); ctx.close();
} }

View File

@ -241,4 +241,4 @@ public class Forwarder {
fwdMapper.updateById(gnssGroupFwd); fwdMapper.updateById(gnssGroupFwd);
} }
} }
} }

View File

@ -2,6 +2,7 @@ package com.imdroid.beidou_fwd.task;
import com.imdroid.beidou_fwd.entity.XFZData; import com.imdroid.beidou_fwd.entity.XFZData;
import com.imdroid.beidou_fwd.service.TCPClient; import com.imdroid.beidou_fwd.service.TCPClient;
import com.imdroid.beidou_fwd.service.TCPListener;
import com.imdroid.common.util.GsonUtil; import com.imdroid.common.util.GsonUtil;
import com.imdroid.common.util.NumberUtils; import com.imdroid.common.util.NumberUtils;
import com.imdroid.secapi.dto.*; import com.imdroid.secapi.dto.*;
@ -36,11 +37,37 @@ public class GXXfzForwarder extends Forwarder{
final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
static class XFZTCPListener implements TCPListener{
public static final int STATE_NO_ACK = 0;
public static final int STATE_OK = 1;
public static final int STATE_FAILED = 2;
public int state = STATE_NO_ACK;
public void clear(){
state = STATE_NO_ACK;
}
@Override
public void onConnected() {
}
@Override
public void onDisconnect() {
}
@Override
public void onMessage(String msg) {
if(msg.contains("succeed")) state = STATE_OK;
else state = STATE_FAILED;
}
}
XFZTCPListener listener = new XFZTCPListener();
@PostConstruct @PostConstruct
void registerMe(){ void registerMe(){
init(FORWARDER_NAME, "TCP "+host+":"+port,1,false,30); init(FORWARDER_NAME, "TCP "+host+":"+port,1,false,30);
xfzTcpClient = new TCPClient(); xfzTcpClient = new TCPClient();
xfzTcpClient.init(host, port); xfzTcpClient.init(host, port,listener);
xfzTcpClient.start(); xfzTcpClient.start();
} }
@ -81,12 +108,25 @@ public class GXXfzForwarder extends Forwarder{
logger.info("project " + projectId + ": push calculation result to XFZ"); logger.info("project " + projectId + ": push calculation result to XFZ");
logger.info(json); logger.info(json);
try { try {
listener.clear();
xfzTcpClient.writeAndFlush(json); xfzTcpClient.writeAndFlush(json);
Thread.sleep(1000); //等待应答
if(!checkResult()) sendNum = 0;
} catch (Exception e1) { } catch (Exception e1) {
sendNum = 0;
e1.printStackTrace(); e1.printStackTrace();
} }
return sendNum; return sendNum;
} }
boolean checkResult() throws InterruptedException {
// 等待应答最多等1s
for(int i=0; i<10; i++){
Thread.sleep(100);
if(listener.state == XFZTCPListener.STATE_OK) return true;
else if(listener.state == XFZTCPListener.STATE_FAILED) return false;
}
return false;
}
} }

View File

@ -2,6 +2,7 @@ package com.imdroid.beidou_fwd.task;
import com.imdroid.beidou_fwd.entity.XFZData; import com.imdroid.beidou_fwd.entity.XFZData;
import com.imdroid.beidou_fwd.service.TCPClient; import com.imdroid.beidou_fwd.service.TCPClient;
import com.imdroid.beidou_fwd.service.TCPListener;
import com.imdroid.common.util.GsonUtil; import com.imdroid.common.util.GsonUtil;
import com.imdroid.common.util.NumberUtils; import com.imdroid.common.util.NumberUtils;
import com.imdroid.secapi.dto.GnssCalcData; import com.imdroid.secapi.dto.GnssCalcData;
@ -35,19 +36,44 @@ public class SaasForwarder extends Forwarder{
private TCPClient tcpClient; private TCPClient tcpClient;
final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
static class MyTCPListener implements TCPListener {
public static final int STATE_NO_ACK = 0;
public static final int STATE_OK = 1;
public static final int STATE_FAILED = 2;
public int state = STATE_NO_ACK;
public void clear(){
state = STATE_NO_ACK;
}
@Override
public void onConnected() {
}
@Override
public void onDisconnect() {
}
@Override
public void onMessage(String msg) {
if(msg.contains("succeed")) state = STATE_OK;
else state = STATE_FAILED;
}
}
MyTCPListener listener = new MyTCPListener();
@PostConstruct @PostConstruct
void registerMe(){ void registerMe(){
init(FORWARDER_NAME, "TCP "+host+":"+port,0,false,30); init(FORWARDER_NAME, "TCP "+host+":"+port,0,false,30);
tcpClient = new TCPClient(); tcpClient = new TCPClient();
tcpClient.init(host, port); tcpClient.init(host, port,listener);
tcpClient.start(); tcpClient.start();
} }
/** /**
* 每半小时转发GNSS解算结果 * 每半小时转发GNSS解算结果
*/ */
@Scheduled(cron = "0 20,50 * * * ?") // 每30分钟执行一次 @Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() { private void forwardGnss() {
logger.info("saas forwardGnss"); logger.info("saas forwardGnss");
forwardCurrentGnss(); forwardCurrentGnss();
@ -81,12 +107,24 @@ public class SaasForwarder extends Forwarder{
logger.info("project " + projectId + ": push calculation result to SAAS"); logger.info("project " + projectId + ": push calculation result to SAAS");
logger.info(json); logger.info(json);
try { try {
listener.clear();
tcpClient.writeAndFlush(json); tcpClient.writeAndFlush(json);
Thread.sleep(1000); if(!checkResult()) sendNum = 0;
} catch (Exception e1) { } catch (Exception e1) {
e1.printStackTrace(); e1.printStackTrace();
sendNum = 0;
} }
return sendNum; return sendNum;
} }
boolean checkResult() throws InterruptedException {
// 等待应答最多等1s
for(int i=0; i<10; i++){
Thread.sleep(100);
if(listener.state == GXXfzForwarder.XFZTCPListener.STATE_OK) return true;
else if(listener.state == GXXfzForwarder.XFZTCPListener.STATE_FAILED) return false;
}
return false;
}
} }