1、修改TCP推送的问题

This commit is contained in:
weidong 2024-01-31 07:56:59 +08:00
parent 1bb6946bf8
commit 3a7350f0f7
5 changed files with 37 additions and 51 deletions

View File

@ -1,5 +1,6 @@
package com.imdroid.beidou_fwd.service; package com.imdroid.beidou_fwd.service;
import com.imdroid.common.util.ThreadManager;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -11,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class TCPClient { public class TCPClient {
@ -25,17 +27,15 @@ public class TCPClient {
private Bootstrap bootstrap; private Bootstrap bootstrap;
private EventLoopGroup group; private EventLoopGroup group;
private Channel channel; private Channel channel;
ByteBuf sendBuffer; LocalDateTime connectTime = LocalDateTime.now();
TCPListener listener;
public void start() { public void start() {
new Thread(this::connect, "xfz-tcp-client").start(); new Thread(this::connect, "xfz-tcp-client").start();
} }
public void init(String dest_addr, int dest_port, TCPListener listener) { public void init(String dest_addr, int dest_port) {
this.host = dest_addr; this.host = dest_addr;
this.port = dest_port; this.port = dest_port;
this.listener = listener;
//客户端需要一个事件循环组 //客户端需要一个事件循环组
group = new NioEventLoopGroup(); group = new NioEventLoopGroup();
@ -85,33 +85,34 @@ public class TCPClient {
} }
public void writeAndFlush(String json) { public void writeAndFlush(String json) {
sendBuffer = Unpooled.buffer(); ByteBuf sendBuffer = Unpooled.buffer();
sendBuffer.writeBytes(json.getBytes(StandardCharsets.UTF_8)); sendBuffer.writeBytes(json.getBytes(StandardCharsets.UTF_8));
if(channel.isActive()) flush();
else connect();
}
public void onConnected(){
listener.onConnected();
flush();
}
public void onDisconnect(){
listener.onDisconnect();
}
private void flush(){
if(sendBuffer!=null && sendBuffer.readableBytes()>0){
channel.writeAndFlush(sendBuffer).addListener(future -> { channel.writeAndFlush(sendBuffer).addListener(future -> {
if (future.isSuccess()) { if (future.isSuccess()) {
logger.info("send to xfz server succeed."); logger.info("send to xfz server succeed.");
//sendBuffer.release(); // writeAndFlush后就释放了
listener.onSendResult(true);
} else { } else {
logger.info("send to xfz server failed."); logger.info("send to xfz server failed.");
listener.onSendResult(false);
} }
}); });
} }
public void onConnected(){
connectTime = LocalDateTime.now();
}
public void onDisconnect(){
if(connectTime.isBefore(LocalDateTime.now().minusMinutes(1))) {
connect();
}
else{
ThreadManager.getScheduledThreadPool().schedule(() -> {
try {
connect();
} catch (Exception e) {
logger.error(e.toString());
}
},60, TimeUnit.SECONDS);
} }
} }
}

View File

@ -2,7 +2,6 @@ package com.imdroid.beidou_fwd.task;
import com.imdroid.beidou_fwd.entity.XFZData; import com.imdroid.beidou_fwd.entity.XFZData;
import com.imdroid.beidou_fwd.service.TCPClient; import com.imdroid.beidou_fwd.service.TCPClient;
import com.imdroid.beidou_fwd.service.TCPListener;
import com.imdroid.common.util.GsonUtil; import com.imdroid.common.util.GsonUtil;
import com.imdroid.common.util.NumberUtils; import com.imdroid.common.util.NumberUtils;
import com.imdroid.secapi.dto.*; import com.imdroid.secapi.dto.*;
@ -36,41 +35,24 @@ public class GXXfzForwarder extends Forwarder{
final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
static class GXXfzTCPListener implements TCPListener{
@Override
public void onConnected() {
}
@Override
public void onDisconnect() {
}
@Override
public void onSendResult(boolean isOK) {
}
}
@PostConstruct @PostConstruct
void registerMe(){ void registerMe(){
init(FORWARDER_NAME, "TCP "+host+":"+port,1,false); init(FORWARDER_NAME, "TCP "+host+":"+port,1,false);
xfzTcpClient = new TCPClient(); xfzTcpClient = new TCPClient();
xfzTcpClient.init(host, port, new GXXfzTCPListener()); xfzTcpClient.init(host, port);
xfzTcpClient.start(); xfzTcpClient.start();
} }
/** /**
* 每半小时转发GNSS解算结果 * 每半小时转发GNSS解算结果
*/ */
@Scheduled(cron = "0 0,30 * * * ?") // 每30分钟执行一次 @Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() { private void forwardGnss() {
logger.info("xfz forwardGnss");
forwardCurrentGnss(FORWARDER_NAME); forwardCurrentGnss(FORWARDER_NAME);
} }
@Scheduled(cron = "0 38 * * * ?") // 每小时的40分钟执行一次 @Scheduled(cron = "0 40 * * * ?") // 每小时的40分钟执行一次
//@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次 //@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次
private void forwardHistoryGnss() { private void forwardHistoryGnss() {
forwardHistoryGnss(FORWARDER_NAME); forwardHistoryGnss(FORWARDER_NAME);

View File

@ -40,12 +40,13 @@ public class GZYForwarder extends Forwarder{
/** /**
* 每半小时转发GNSS解算结果 * 每半小时转发GNSS解算结果
*/ */
@Scheduled(cron = "0 10,40 * * * ?") // 每30分钟执行一次 @Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() { private void forwardGnss() {
logger.info("gzy UDP forwardGnss");
forwardCurrentGnss(FORWARDER_NAME); forwardCurrentGnss(FORWARDER_NAME);
} }
@Scheduled(cron = "0 48 * * * ?") // 每小时的40分钟执行一次 @Scheduled(cron = "0 40 * * * ?") // 每小时的40分钟执行一次
//@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次 //@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次
private void forwardHistoryGnss() { private void forwardHistoryGnss() {
forwardHistoryGnss(FORWARDER_NAME); forwardHistoryGnss(FORWARDER_NAME);

View File

@ -49,12 +49,13 @@ public class GZYMQTTForwarder extends Forwarder {
/** /**
* 每半小时转发GNSS解算结果 * 每半小时转发GNSS解算结果
*/ */
@Scheduled(cron = "0 10,40 * * * ?") // 每30分钟执行一次 @Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() { private void forwardGnss() {
logger.info("gzy mqtt forwardGnss");
forwardCurrentGnss(FORWARDER_NAME); forwardCurrentGnss(FORWARDER_NAME);
} }
@Scheduled(cron = "0 48 * * * ?") // 每小时的40分钟执行一次 @Scheduled(cron = "0 40 * * * ?") // 每小时的40分钟执行一次
//@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次 //@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次
private void forwardHistoryGnss() { private void forwardHistoryGnss() {
forwardHistoryGnss(FORWARDER_NAME); forwardHistoryGnss(FORWARDER_NAME);

View File

@ -48,12 +48,13 @@ public class KingMaForwarder extends Forwarder{
/** /**
* 每半小时转发GNSS解算结果 * 每半小时转发GNSS解算结果
*/ */
@Scheduled(cron = "0 20,50 * * * ?") // 每30分钟执行一次 @Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() { private void forwardGnss() {
logger.info("kingma forwardGnss");
forwardCurrentGnss(FORWARDER_NAME); forwardCurrentGnss(FORWARDER_NAME);
} }
@Scheduled(cron = "0 58 * * * ?") // 每小时的40分钟执行一次 @Scheduled(cron = "0 40 * * * ?") // 每小时的40分钟执行一次
//@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次 //@Scheduled(cron = "0 0/20 * * * ?") // 每20分钟执行一次
private void forwardHistoryGnss() { private void forwardHistoryGnss() {
forwardHistoryGnss(FORWARDER_NAME); forwardHistoryGnss(FORWARDER_NAME);