1、优化推送。如果一次推送超过30分钟,会造成重复推送

This commit is contained in:
weidong 2025-01-17 14:12:48 +08:00
parent e1e67b11a2
commit 3a29e6118a
3 changed files with 49 additions and 31 deletions

View File

@ -18,6 +18,7 @@ public class ResendRecord {
static public final short STATE_FWD_OK = 0; static public final short STATE_FWD_OK = 0;
static public final short STATE_FWD_FAILED = 1; static public final short STATE_FWD_FAILED = 1;
static public final short STATE_BREAK_POINT = 2; static public final short STATE_BREAK_POINT = 2;
static public final short STATE_FWDING = 3;
@TableId(value = "id", type = IdType.AUTO) @TableId(value = "id", type = IdType.AUTO)
Long id; Long id;
Integer tenantid; Integer tenantid;

View File

@ -1,6 +1,7 @@
package com.imdroid.beidou_fwd.task; package com.imdroid.beidou_fwd.task;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.github.yulichang.query.MPJQueryWrapper; import com.github.yulichang.query.MPJQueryWrapper;
import com.imdroid.beidou_fwd.entity.GZYMQTTData; import com.imdroid.beidou_fwd.entity.GZYMQTTData;
import com.imdroid.common.util.GsonUtil; import com.imdroid.common.util.GsonUtil;
@ -198,7 +199,8 @@ public class Forwarder {
int sendNum = send(projectId, records, sendTime); int sendNum = send(projectId, records, sendTime);
// 记录推送 // 非重发的才产生推送记录
if(resendRecord==null) {
FwdRecord fwdRecord = new FwdRecord(); FwdRecord fwdRecord = new FwdRecord();
fwdRecord.setProjectid(projectId); fwdRecord.setProjectid(projectId);
fwdRecord.setTenantid(tenantId); fwdRecord.setTenantid(tenantId);
@ -209,16 +211,10 @@ public class Forwarder {
if (sendNum > 0) { if (sendNum > 0) {
totalSendNum += sendNum; totalSendNum += sendNum;
fwdRecord.setResult(FwdRecord.RESULT_OK); fwdRecord.setResult(FwdRecord.RESULT_OK);
if(resendRecord != null){ } else {
resendRecord.setState(ResendRecord.STATE_FWD_OK);
resendRecordMapper.updateById(resendRecord);
}
}
else{
fwdResult = false; fwdResult = false;
fwdRecord.setResult(FwdRecord.RESULT_FAILED); fwdRecord.setResult(FwdRecord.RESULT_FAILED);
// 新增重发记录 // 新增重发记录
if(resendRecord == null) {
resendRecord = new ResendRecord(); resendRecord = new ResendRecord();
resendRecord.setProjectid(projectId); resendRecord.setProjectid(projectId);
resendRecord.setTenantid(tenantId); resendRecord.setTenantid(tenantId);
@ -229,17 +225,15 @@ public class Forwarder {
resendRecord.setState(ResendRecord.STATE_FWD_FAILED); resendRecord.setState(ResendRecord.STATE_FWD_FAILED);
resendRecordMapper.insert(resendRecord); resendRecordMapper.insert(resendRecord);
} }
else{
resendRecord.setState(ResendRecord.STATE_FWD_FAILED);
resendRecordMapper.updateById(resendRecord);
}
}
fwdRecordsMapper.insert(fwdRecord); fwdRecordsMapper.insert(fwdRecord);
} }
}
// 更新推送记录 // 更新推送记录
if(resendRecord==null) {
if (totalSendNum > 0) updateFwd(totalSendNum, true); if (totalSendNum > 0) updateFwd(totalSendNum, true);
else if (!fwdResult) updateFwd(totalSendNum, false); else if (!fwdResult) updateFwd(totalSendNum, false);
}
return fwdResult; return fwdResult;
} }
@ -258,10 +252,20 @@ public class Forwarder {
QueryWrapper<ResendRecord> queryWrapper = new QueryWrapper<>(); QueryWrapper<ResendRecord> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("fwd_group_id",fwdGroupId); queryWrapper.eq("fwd_group_id",fwdGroupId);
queryWrapper.ne("state",ResendRecord.STATE_FWD_OK); queryWrapper.ne("state",ResendRecord.STATE_FWD_OK);
queryWrapper.ne("state",ResendRecord.STATE_FWDING);
queryWrapper.ge("createtime", LocalDateTime.now().minusDays(30)); queryWrapper.ge("createtime", LocalDateTime.now().minusDays(30));
List<ResendRecord> resendRecordsList = resendRecordMapper.selectList(queryWrapper); List<ResendRecord> resendRecordsList = resendRecordMapper.selectList(queryWrapper);
if(resendRecordsList!=null){ if(resendRecordsList!=null){
logger.info("{} forward history records: {}",fwdGroupId, resendRecordsList.size()); //修改状态
UpdateWrapper<ResendRecord> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("fwd_group_id",fwdGroupId);
updateWrapper.ne("state",ResendRecord.STATE_FWD_OK);
updateWrapper.ne("state",ResendRecord.STATE_FWDING);
updateWrapper.ge("createtime", LocalDateTime.now().minusDays(30));
updateWrapper.set("state",ResendRecord.STATE_FWDING);
int updateNum = resendRecordMapper.update(null, updateWrapper);
logger.info("{} forward history records: {}, update {}",fwdGroupId, resendRecordsList.size(),updateNum);
// 2.检索这个这个时间段的解算结果如果有数据则单个终端转发标志记录为已补传 // 2.检索这个这个时间段的解算结果如果有数据则单个终端转发标志记录为已补传
for(ResendRecord record:resendRecordsList){ for(ResendRecord record:resendRecordsList){
if(record.getProjectid()!=null) if(record.getProjectid()!=null)
@ -273,10 +277,21 @@ public class Forwarder {
void forwardBatchGnssRecords(ResendRecord record) { void forwardBatchGnssRecords(ResendRecord record) {
LocalDateTime sendTime = record.getStarttime(); LocalDateTime sendTime = record.getStarttime();
int totalSend = 0;
while(sendTime.isBefore(record.getEndtime()) || sendTime.isEqual(record.getEndtime())){ while(sendTime.isBefore(record.getEndtime()) || sendTime.isEqual(record.getEndtime())){
forwardGnssRecords(sendTime,record); if(forwardGnssRecords(sendTime,record)) totalSend++;
sendTime = sendTime.plusMinutes(fwdCycleMinutes); sendTime = sendTime.plusMinutes(fwdCycleMinutes);
} }
if(totalSend>0) {
record.setState(ResendRecord.STATE_FWD_OK);
}
else{
record.setState(ResendRecord.STATE_FWD_FAILED);
}
resendRecordMapper.updateById(record);
} }
int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime) { int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime) {

View File

@ -128,8 +128,10 @@
<span class="layui-badge layui-bg-green">推送成功</span> <span class="layui-badge layui-bg-green">推送成功</span>
{{# } else if(d.state == 1){ }} {{# } else if(d.state == 1){ }}
<span class="layui-badge layui-bg-red">推送失败</span> <span class="layui-badge layui-bg-red">推送失败</span>
{{# } else if(d.state == 2){ }}
<span class="layui-badge layui-bg-red">断点补传</span>
{{# } else { }} {{# } else { }}
<span class="layui-badge layui-bg-orange">断点补传</span> <span class="layui-badge layui-bg-orange">推送中</span>
{{# } }} {{# } }}
</script> </script>