diff --git a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPClient.java b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPClient.java index 12288963..84508d9f 100644 --- a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPClient.java +++ b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPClient.java @@ -26,14 +26,16 @@ public class TCPClient { private EventLoopGroup group; private Channel channel; LocalDateTime connectTime = LocalDateTime.now(); + TCPListener listener; public void start() { new Thread(this::connect, "forwarder tcp-client").start(); } - public void init(String dest_addr, int dest_port) { + public void init(String dest_addr, int dest_port, TCPListener listener) { this.host = dest_addr; this.port = dest_port; + this.listener = listener; //客户端需要一个事件循环组 group = new NioEventLoopGroup(); @@ -90,6 +92,9 @@ public class TCPClient { logger.info("send to tcp:"+host+" succeed."); } else { logger.info("send to tcp:"+host+" failed."); + if(listener!=null){ + listener.onMessage("failed"); + } } }); } @@ -113,4 +118,10 @@ public class TCPClient { } } + public void onMessage(String msg){ + if(listener!=null){ + listener.onMessage(msg); + } + } + } diff --git a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPListener.java b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPListener.java index 4b17a4d5..649bc4f5 100644 --- a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPListener.java +++ b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TCPListener.java @@ -3,5 +3,5 @@ package com.imdroid.beidou_fwd.service; public interface TCPListener { void onConnected(); void onDisconnect(); - void onSendResult(boolean isOK); + void onMessage(String msg); } diff --git a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TcpMessageHandler.java b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TcpMessageHandler.java index 549ffc17..f9ed7504 100644 --- a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TcpMessageHandler.java +++ b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/service/TcpMessageHandler.java @@ -1,12 +1,13 @@ package com.imdroid.beidou_fwd.service; -import com.imdroid.common.util.DataTypeUtil; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.Charset; + /** * @author Layton * @date 2023/2/18 20:36 @@ -23,28 +24,28 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf buf) throws Exception { + String msg = buf.toString(Charset.defaultCharset()); + tcpClient.onMessage(msg); if (logger.isDebugEnabled()) { - byte[] data = new byte[buf.readableBytes()]; - buf.getBytes(0, data); - logger.debug("receive server message:" + DataTypeUtil.getHexString(data)); + logger.debug("receive server message:" + msg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - logger.info("xfz tcp channel active"); + logger.info("tcp channel active"); tcpClient.onConnected(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - logger.info("xfz tcp channel inactive"); + logger.info("tcp channel inactive"); tcpClient.onDisconnect(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("XFZTcpMessageHandler error: {}", cause.toString()); + logger.error("TcpMessageHandler error: {}", cause.toString()); ctx.close(); } diff --git a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/Forwarder.java b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/Forwarder.java index e7fa292f..c6127577 100644 --- a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/Forwarder.java +++ b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/Forwarder.java @@ -241,4 +241,4 @@ public class Forwarder { fwdMapper.updateById(gnssGroupFwd); } } -} +} \ No newline at end of file diff --git a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/GXXfzForwarder.java b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/GXXfzForwarder.java index 95130a4f..75d0b182 100644 --- a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/GXXfzForwarder.java +++ b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/GXXfzForwarder.java @@ -2,6 +2,7 @@ package com.imdroid.beidou_fwd.task; import com.imdroid.beidou_fwd.entity.XFZData; import com.imdroid.beidou_fwd.service.TCPClient; +import com.imdroid.beidou_fwd.service.TCPListener; import com.imdroid.common.util.GsonUtil; import com.imdroid.common.util.NumberUtils; import com.imdroid.secapi.dto.*; @@ -36,11 +37,37 @@ public class GXXfzForwarder extends Forwarder{ final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + static class XFZTCPListener implements TCPListener{ + public static final int STATE_NO_ACK = 0; + public static final int STATE_OK = 1; + public static final int STATE_FAILED = 2; + public int state = STATE_NO_ACK; + + public void clear(){ + state = STATE_NO_ACK; + } + @Override + public void onConnected() { + + } + + @Override + public void onDisconnect() { + + } + + @Override + public void onMessage(String msg) { + if(msg.contains("succeed")) state = STATE_OK; + else state = STATE_FAILED; + } + } + XFZTCPListener listener = new XFZTCPListener(); @PostConstruct void registerMe(){ init(FORWARDER_NAME, "TCP "+host+":"+port,1,false,30); xfzTcpClient = new TCPClient(); - xfzTcpClient.init(host, port); + xfzTcpClient.init(host, port,listener); xfzTcpClient.start(); } @@ -81,12 +108,25 @@ public class GXXfzForwarder extends Forwarder{ logger.info("project " + projectId + ": push calculation result to XFZ"); logger.info(json); try { + listener.clear(); xfzTcpClient.writeAndFlush(json); - Thread.sleep(1000); + //等待应答 + if(!checkResult()) sendNum = 0; } catch (Exception e1) { + sendNum = 0; e1.printStackTrace(); } return sendNum; } + boolean checkResult() throws InterruptedException { + // 等待应答,最多等1s + for(int i=0; i<10; i++){ + Thread.sleep(100); + if(listener.state == XFZTCPListener.STATE_OK) return true; + else if(listener.state == XFZTCPListener.STATE_FAILED) return false; + } + return false; + } + } diff --git a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/SaasForwarder.java b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/SaasForwarder.java index 7da0ba74..1be99bc6 100644 --- a/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/SaasForwarder.java +++ b/sec-beidou-fwd/src/main/java/com/imdroid/beidou_fwd/task/SaasForwarder.java @@ -2,6 +2,7 @@ package com.imdroid.beidou_fwd.task; import com.imdroid.beidou_fwd.entity.XFZData; import com.imdroid.beidou_fwd.service.TCPClient; +import com.imdroid.beidou_fwd.service.TCPListener; import com.imdroid.common.util.GsonUtil; import com.imdroid.common.util.NumberUtils; import com.imdroid.secapi.dto.GnssCalcData; @@ -35,19 +36,44 @@ public class SaasForwarder extends Forwarder{ private TCPClient tcpClient; final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + static class MyTCPListener implements TCPListener { + public static final int STATE_NO_ACK = 0; + public static final int STATE_OK = 1; + public static final int STATE_FAILED = 2; + public int state = STATE_NO_ACK; + public void clear(){ + state = STATE_NO_ACK; + } + @Override + public void onConnected() { + + } + + @Override + public void onDisconnect() { + + } + + @Override + public void onMessage(String msg) { + if(msg.contains("succeed")) state = STATE_OK; + else state = STATE_FAILED; + } + } + MyTCPListener listener = new MyTCPListener(); @PostConstruct void registerMe(){ init(FORWARDER_NAME, "TCP "+host+":"+port,0,false,30); tcpClient = new TCPClient(); - tcpClient.init(host, port); + tcpClient.init(host, port,listener); tcpClient.start(); } /** * 每半小时转发GNSS解算结果 */ - @Scheduled(cron = "0 20,50 * * * ?") // 每30分钟执行一次 + @Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次 private void forwardGnss() { logger.info("saas forwardGnss"); forwardCurrentGnss(); @@ -81,12 +107,24 @@ public class SaasForwarder extends Forwarder{ logger.info("project " + projectId + ": push calculation result to SAAS"); logger.info(json); try { + listener.clear(); tcpClient.writeAndFlush(json); - Thread.sleep(1000); + if(!checkResult()) sendNum = 0; } catch (Exception e1) { e1.printStackTrace(); + sendNum = 0; } return sendNum; } + boolean checkResult() throws InterruptedException { + // 等待应答,最多等1s + for(int i=0; i<10; i++){ + Thread.sleep(100); + if(listener.state == GXXfzForwarder.XFZTCPListener.STATE_OK) return true; + else if(listener.state == GXXfzForwarder.XFZTCPListener.STATE_FAILED) return false; + } + return false; + } + }