1、优化金马平台推送是否成功的判断
This commit is contained in:
parent
191334ff54
commit
f70ef35096
@ -19,6 +19,7 @@ public class FwdRecord {
|
||||
public static final short STATE_UPLOADING = 2;
|
||||
public static final short STATE_UPLOAD_DONE = 3;
|
||||
public static final short STATE_FWD_DONE = 0;
|
||||
public static final short STATE_FWD_FAILED = -1;
|
||||
@TableId(value = "id", type = IdType.AUTO)
|
||||
Long id;
|
||||
Integer tenantid;
|
||||
|
||||
@ -26,14 +26,16 @@ public class TCPClient {
|
||||
private EventLoopGroup group;
|
||||
private Channel channel;
|
||||
ByteBuf sendBuffer;
|
||||
TCPListener listener;
|
||||
|
||||
public void start() {
|
||||
new Thread(this::connect, "xfz-tcp-client").start();
|
||||
}
|
||||
|
||||
public void init(String dest_addr, int dest_port) {
|
||||
host = dest_addr;
|
||||
port = dest_port;
|
||||
public void init(String dest_addr, int dest_port, TCPListener listener) {
|
||||
this.host = dest_addr;
|
||||
this.port = dest_port;
|
||||
this.listener = listener;
|
||||
|
||||
//客户端需要一个事件循环组
|
||||
group = new NioEventLoopGroup();
|
||||
@ -89,18 +91,25 @@ public class TCPClient {
|
||||
else connect();
|
||||
}
|
||||
|
||||
public void onConnectionActive(){
|
||||
public void onConnected(){
|
||||
listener.onConnected();
|
||||
flush();
|
||||
}
|
||||
|
||||
public void onDisconnect(){
|
||||
listener.onDisconnect();
|
||||
}
|
||||
|
||||
private void flush(){
|
||||
if(sendBuffer!=null && sendBuffer.readableBytes()>0){
|
||||
channel.writeAndFlush(sendBuffer).addListener(future -> {
|
||||
if (future.isSuccess()) {
|
||||
logger.info("send to xfz server succeed.");
|
||||
//sendBuffer.release(); // writeAndFlush后就释放了
|
||||
listener.onSendResult(true);
|
||||
} else {
|
||||
logger.info("send to xfz server failed.");
|
||||
listener.onSendResult(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -0,0 +1,7 @@
|
||||
package com.imdroid.beidou_fwd.service;
|
||||
|
||||
public interface TCPListener {
|
||||
void onConnected();
|
||||
void onDisconnect();
|
||||
void onSendResult(boolean isOK);
|
||||
}
|
||||
@ -33,12 +33,13 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
logger.info("xfz tcp channel active");
|
||||
tcpClient.onConnectionActive();
|
||||
tcpClient.onConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
logger.info("xfz tcp channel inactive");
|
||||
tcpClient.onDisconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -100,26 +100,37 @@ public class Forwarder {
|
||||
|
||||
// 按项目打包推送
|
||||
int totalSendNum = 0;
|
||||
boolean fwdFailed = false;
|
||||
for (Map.Entry<String, List<GnssCalcData>> entry: projects.entrySet()){
|
||||
int sendNum = send(entry.getKey(), entry.getValue());
|
||||
String projectId = entry.getKey();
|
||||
List<GnssCalcData> records = entry.getValue();
|
||||
if(records.size() == 0) continue;
|
||||
|
||||
int sendNum = send(projectId, records);
|
||||
|
||||
FwdRecord fwdRecord = new FwdRecord();
|
||||
fwdRecord.setProject_id(projectId);
|
||||
fwdRecord.setTenantid(tenantId);
|
||||
fwdRecord.setDevicenum((short) records.size());
|
||||
fwdRecord.setStarttime(nowTime);
|
||||
fwdRecord.setEndtime(nowTime);
|
||||
fwdRecord.setFwd_group_id(fwdGroupId);
|
||||
|
||||
if(sendNum > 0) {
|
||||
totalSendNum += sendNum;
|
||||
// 记录推送
|
||||
FwdRecord fwdRecord = new FwdRecord();
|
||||
fwdRecord.setProject_id(entry.getKey());
|
||||
fwdRecord.setTenantid(tenantId);
|
||||
fwdRecord.setDevicenum((short) entry.getValue().size());
|
||||
fwdRecord.setStarttime(nowTime);
|
||||
fwdRecord.setEndtime(nowTime);
|
||||
fwdRecord.setState(FwdRecord.STATE_FWD_DONE);
|
||||
fwdRecord.setFwd_group_id(fwdGroupId);
|
||||
fwdRecordsMapper.insert(fwdRecord);
|
||||
}
|
||||
else{
|
||||
fwdRecord.setState(FwdRecord.STATE_FWD_FAILED);
|
||||
fwdFailed = true;
|
||||
}
|
||||
fwdRecordsMapper.insert(fwdRecord);
|
||||
}
|
||||
|
||||
// 更新推送信息
|
||||
if(totalSendNum>0) updateFwd(totalSendNum, true);
|
||||
else if(fwdFailed) updateFwd(totalSendNum, false);
|
||||
|
||||
}
|
||||
|
||||
@ -130,6 +141,8 @@ public class Forwarder {
|
||||
queryWrapper.eq("state",FwdRecord.STATE_UPLOAD_DONE);
|
||||
List<FwdRecord> fwdRecordsList = fwdRecordsMapper.selectList(queryWrapper);
|
||||
// 2.检索这个这个时间段的解算结果,如果有数据则单个终端转发,标志记录为已补传
|
||||
int totalSendNum = 0;
|
||||
boolean fwdFailed = false;
|
||||
for(FwdRecord fwdRecord:fwdRecordsList){
|
||||
// 查找device
|
||||
QueryWrapper<GnssDevice> deviceQueryWrapper = new QueryWrapper<>();
|
||||
@ -148,12 +161,22 @@ public class Forwarder {
|
||||
calcDataQueryWrapper.isNotNull("rb562e");
|
||||
List<GnssCalcData> calcDataList = gnssDataMapper.selectList(calcDataQueryWrapper);
|
||||
// 推送记录
|
||||
if(sendBatch(device, fwdRecord.getProject_id(), calcDataList)>0) {
|
||||
int sendNum = sendBatch(device, fwdRecord.getProject_id(), calcDataList);
|
||||
if(sendNum > 0) {
|
||||
// 记录推送结果
|
||||
fwdRecord.setState(FwdRecord.STATE_FWD_DONE);
|
||||
totalSendNum += sendNum;
|
||||
}
|
||||
else{
|
||||
fwdRecord.setState(FwdRecord.STATE_FWD_FAILED);
|
||||
fwdFailed = true;
|
||||
}
|
||||
fwdRecordsMapper.updateById(fwdRecord);
|
||||
}
|
||||
}
|
||||
|
||||
// 更新推送信息
|
||||
if(totalSendNum>0) updateFwd(totalSendNum, true);
|
||||
else if(fwdFailed) updateFwd(totalSendNum, false);
|
||||
}
|
||||
|
||||
int sendBatch(GnssDevice device, String projectId, List<GnssCalcData> records){
|
||||
|
||||
@ -2,6 +2,7 @@ package com.imdroid.beidou_fwd.task;
|
||||
|
||||
import com.imdroid.beidou_fwd.entity.XFZData;
|
||||
import com.imdroid.beidou_fwd.service.TCPClient;
|
||||
import com.imdroid.beidou_fwd.service.TCPListener;
|
||||
import com.imdroid.common.util.GsonUtil;
|
||||
import com.imdroid.common.util.NumberUtils;
|
||||
import com.imdroid.secapi.dto.*;
|
||||
@ -35,11 +36,29 @@ public class GXXfzForwarder extends Forwarder{
|
||||
|
||||
final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
static class GXXfzTCPListener implements TCPListener{
|
||||
|
||||
@Override
|
||||
public void onConnected() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSendResult(boolean isOK) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
void registerMe(){
|
||||
init(FORWARDER_NAME, "TCP "+host+":"+port,1,false);
|
||||
xfzTcpClient = new TCPClient();
|
||||
xfzTcpClient.init(host, port);
|
||||
xfzTcpClient.init(host, port, new GXXfzTCPListener());
|
||||
xfzTcpClient.start();
|
||||
}
|
||||
|
||||
|
||||
@ -59,21 +59,16 @@ public class KingMaForwarder extends Forwarder{
|
||||
forwardHistoryGnss(FORWARDER_NAME);
|
||||
}
|
||||
|
||||
boolean updateToken(){
|
||||
boolean updateToken() throws Exception{
|
||||
String jsonLogin = "{\"username\":\""+login_user+"\"," +
|
||||
"\"password\":\""+login_pwd+"\"}";
|
||||
String result = HttpUtils.postJson(login_host, jsonLogin);
|
||||
if(result == null) return false;
|
||||
|
||||
String token = null;
|
||||
try {
|
||||
JSONObject obj = (JSONObject) JSONObject.parse(result);
|
||||
obj = obj.getJSONObject("data");
|
||||
token = obj.getString("token");
|
||||
}
|
||||
catch (Exception e){
|
||||
|
||||
}
|
||||
if(token == null) return false;
|
||||
|
||||
if(header == null){
|
||||
@ -86,12 +81,17 @@ public class KingMaForwarder extends Forwarder{
|
||||
@Override
|
||||
int send(String projectId, List<GnssCalcData> records){
|
||||
int sendNum = 0;
|
||||
if(records.size() == 0) return 0;
|
||||
|
||||
// 检查token是否过期
|
||||
LocalDateTime nowTime = LocalDateTime.now();
|
||||
if(header == null || nowTime.isAfter(lastTokenTime.plusMinutes(59))){
|
||||
if(!updateToken()){
|
||||
try {
|
||||
if (!updateToken()) {
|
||||
logger.info("update token failed!");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
catch (Exception e){
|
||||
logger.info("update token failed!");
|
||||
return 0;
|
||||
}
|
||||
@ -115,7 +115,10 @@ public class KingMaForwarder extends Forwarder{
|
||||
logger.info("project " + projectId + ": push calculation result to Kingma");
|
||||
logger.info(json);
|
||||
logger.info("result: "+result);
|
||||
return sendNum;
|
||||
JSONObject obj = (JSONObject) JSONObject.parse(result);
|
||||
String msg = obj.getString("message");
|
||||
if(msg.equals("Success")) return sendNum;
|
||||
else return 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -55,6 +55,8 @@
|
||||
<span>补传中</span>
|
||||
{{# } else if(d.state == 3){ }}
|
||||
<span>补传完成</span>
|
||||
{{# } else if(d.state == -1){ }}
|
||||
<span>推送失败</span>
|
||||
{{# } else { }}
|
||||
<span >等待</span>
|
||||
{{# } }}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user