1、如果一个项目的设备数大于50,则拆包发送

2、广西交设、葛洲坝、广西路建继承同一个新发展类,基类不加Component注解
3、如果是对端强制中断TCP连接,则10秒后重连
This commit is contained in:
weidong 2025-07-19 14:23:34 +08:00
parent bedb2e3ef5
commit 6d170eaed0
6 changed files with 111 additions and 59 deletions

View File

@ -73,10 +73,10 @@ public class TCPClient {
} }
}, 5000, TimeUnit.MILLISECONDS); }, 5000, TimeUnit.MILLISECONDS);
} else { } else {
future.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark( /*future.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark(
1024 * 1024, // low 1024 * 1024, // low
4 *1024*1024 // high 4 *1024*1024 // high
)); ));*/
logger.info("{}:{} tcp client start success!",host,port); logger.info("{}:{} tcp client start success!",host,port);
} }
} }
@ -108,7 +108,7 @@ public class TCPClient {
connectTime = LocalDateTime.now(); connectTime = LocalDateTime.now();
} }
public void onDisconnect(){ public void onDisconnect(boolean isIdle){
if(connectTime.isBefore(LocalDateTime.now().minusMinutes(1))) { if(connectTime.isBefore(LocalDateTime.now().minusMinutes(1))) {
connect(); connect();
} }
@ -119,7 +119,7 @@ public class TCPClient {
} catch (Exception e) { } catch (Exception e) {
logger.error(e.toString()); logger.error(e.toString());
} }
},60, TimeUnit.SECONDS); },isIdle?60:10, TimeUnit.SECONDS);
} }
} }

View File

@ -40,13 +40,14 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("tcp channel inactive"); logger.info("tcp channel inactive");
tcpClient.onDisconnect(); tcpClient.onDisconnect(true);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("TcpMessageHandler error: {}", cause.toString()); logger.error("TcpMessageHandler error: {}", cause.toString());
ctx.close(); ctx.close();
tcpClient.onDisconnect(false);
} }
} }

View File

@ -0,0 +1,43 @@
package com.imdroid.beidou_fwd.task;
import com.imdroid.beidou_fwd.service.TCPClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Configuration
@EnableScheduling
public class GXJSForwarder extends GXXfzForwarder{
private final String FORWARDER_NAME = "广西新发展";
@Value("${xfz.server.host}")
private String host;
@Value("${xfz.server.port}")
private int port;
private boolean enabled=true;
@PostConstruct
void registerMe(){
init(FORWARDER_NAME, "TCP "+host+":"+port,1,FWD_DEVICE_ID,30);
xfzTcpClient = new TCPClient();
xfzTcpClient.init(host, port,listener);
if(!enabled) return;
xfzTcpClient.start();
}
/**
* 每半小时转发GNSS解算结果
*/
@Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() {
if(!enabled) return;
logger.debug("gxjs forwardGnss");
forwardCurrentGnss();
}
}

View File

@ -4,6 +4,7 @@ import com.imdroid.beidou_fwd.service.TCPClient;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -11,20 +12,31 @@ import javax.annotation.PostConstruct;
@Component @Component
@Configuration @Configuration
@EnableScheduling @EnableScheduling
public class GXXfz2Forwarder extends GXXfzForwarder{ public class GXLJForwarder extends GXXfzForwarder{
private String FORWARDER_NAME = "广西路建"; private final String FORWARDER_NAME = "广西路建";
@Value("${gxlj.server.host}") @Value("${gxlj.server.host}")
private String host; private String host;
@Value("${gxlj.server.port}") @Value("${gxlj.server.port}")
private int port; private int port;
private boolean enabled=true;
@PostConstruct @PostConstruct
@Override
void registerMe(){ void registerMe(){
init(FORWARDER_NAME, "TCP "+host+":"+port,6,FWD_DEVICE_ID,30); init(FORWARDER_NAME, "TCP "+host+":"+port,6,FWD_DEVICE_ID,30);
xfzTcpClient = new TCPClient(); xfzTcpClient = new TCPClient();
xfzTcpClient.init(host, port,listener); xfzTcpClient.init(host, port,listener);
if(!enabled) return;
xfzTcpClient.start(); xfzTcpClient.start();
} }
/**
* 每半小时转发GNSS解算结果
*/
@Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() {
if(!enabled) return;
logger.debug("gxlj forwardGnss");
forwardCurrentGnss();
}
} }

View File

@ -6,28 +6,13 @@ import com.imdroid.beidou_fwd.service.TCPListener;
import com.imdroid.common.util.GsonUtil; import com.imdroid.common.util.GsonUtil;
import com.imdroid.common.util.NumberUtils; import com.imdroid.common.util.NumberUtils;
import com.imdroid.secapi.dto.GnssCalcData; import com.imdroid.secapi.dto.GnssCalcData;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@Component
@Configuration
@EnableScheduling
public class GXXfzForwarder extends Forwarder{ public class GXXfzForwarder extends Forwarder{
private final String FORWARDER_NAME = "广西新发展";
@Value("${xfz.server.host}")
private String host;
@Value("${xfz.server.port}")
private int port;
TCPClient xfzTcpClient; TCPClient xfzTcpClient;
static class XFZTCPListener implements TCPListener{ static class XFZTCPListener implements TCPListener{
@ -56,31 +41,10 @@ public class GXXfzForwarder extends Forwarder{
} }
} }
XFZTCPListener listener = new XFZTCPListener(); XFZTCPListener listener = new XFZTCPListener();
@PostConstruct
void registerMe(){
init(FORWARDER_NAME, "TCP "+host+":"+port,1,FWD_DEVICE_ID,30);
xfzTcpClient = new TCPClient();
xfzTcpClient.init(host, port,listener);
xfzTcpClient.start();
}
/**
* 每半小时转发GNSS解算结果
*/
@Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() {
logger.debug("xfz forwardGnss");
forwardCurrentGnss();
}
/*
@Scheduled(cron = "0 0/10 * * * ?") // 每30分钟执行一次
private void checkDevice() {
//logger.debug("zny checkDevice");
checkOfflineDevice("2345053","2350106","2350124");
}*/
@Override @Override
int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime){ int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime){
int batchNum = 0;
int sendNum = 0; int sendNum = 0;
if(records.size() == 0) return 0; if(records.size() == 0) return 0;
@ -104,19 +68,40 @@ public class GXXfzForwarder extends Forwarder{
// 经纬度 // 经纬度
data.setDevLng(locationRecord.getR9250e()); data.setDevLng(locationRecord.getR9250e());
data.setDevLat(locationRecord.getR9250n()); data.setDevLat(locationRecord.getR9250n());
sendNum++; // 发送
batchNum++;
if(batchNum==50){
String json = "#" + GsonUtil.toJson(xfzTcpMessage) + "!";
logger.debug("project {}: forwad {} gnss records to {}}",projectId, dataList.size(),fwdGroupId);
logger.debug(json);
try {
listener.clear();
xfzTcpClient.writeAndFlush(json);
//等待应答
if(checkResult()) sendNum += batchNum;
} catch (Exception e1) {
logger.error(e1.toString());
}
batchNum = 0;
dataList.clear();
}
} }
String json = "#" + GsonUtil.toJson(xfzTcpMessage) + "!";
logger.debug("project " + projectId + ": push calculation result to XFZ"); if(batchNum>0){
logger.debug(json); String json = "#" + GsonUtil.toJson(xfzTcpMessage) + "!";
try { logger.debug("project {}: forwad {} gnss records to {}}",projectId, dataList.size(),fwdGroupId);
listener.clear(); logger.debug(json);
xfzTcpClient.writeAndFlush(json); try {
//等待应答 listener.clear();
if(!checkResult()) sendNum = 0; xfzTcpClient.writeAndFlush(json);
} catch (Exception e1) { //等待应答
sendNum = 0; if(checkResult()) sendNum += batchNum;
e1.printStackTrace(); } catch (Exception e1) {
logger.error(e1.toString());
}
dataList.clear();
} }
return sendNum; return sendNum;
} }

View File

@ -4,6 +4,7 @@ import com.imdroid.beidou_fwd.service.TCPClient;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -12,20 +13,30 @@ import javax.annotation.PostConstruct;
@Configuration @Configuration
@EnableScheduling @EnableScheduling
public class GZBForwarder extends GXXfzForwarder{ public class GZBForwarder extends GXXfzForwarder{
private String FORWARDER_NAME = "葛洲坝"; private final String FORWARDER_NAME = "葛洲坝";
@Value("${gzb.server.host}") @Value("${gzb.server.host}")
private String host; private String host;
@Value("${gzb.server.port}") @Value("${gzb.server.port}")
private int port; private int port;
private boolean enabled=true;
@PostConstruct @PostConstruct
@Override
void registerMe(){ void registerMe(){
init(FORWARDER_NAME, "TCP "+host+":"+port,1,FWD_DEVICE_ID,30); init(FORWARDER_NAME, "TCP "+host+":"+port,1,FWD_DEVICE_ID,30);
xfzTcpClient = new TCPClient(); xfzTcpClient = new TCPClient();
xfzTcpClient.init(host, port,listener); xfzTcpClient.init(host, port,listener);
if(!enabled) return;
xfzTcpClient.start(); xfzTcpClient.start();
} }
/**
* 每半小时转发GNSS解算结果
*/
@Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
private void forwardGnss() {
if(!enabled) return;
logger.debug("gzb forwardGnss");
forwardCurrentGnss();
}
} }