1、优化推送逻辑

This commit is contained in:
weidong 2025-06-28 14:00:17 +08:00
parent eed05660ab
commit d62777bc91

View File

@ -62,7 +62,7 @@ public class Forwarder {
* @param fwdNameType推送设备名deviceid或fwdid或projectid-sector-name
* @param cycle发送周期
*/
void init(String fwdGroupId, String desc, Integer tenantId, byte fwdNameType, int cycle){
public void init(String fwdGroupId, String desc, Integer tenantId, byte fwdNameType, int cycle){
this.fwdGroupId = fwdGroupId;
this.description = desc;
this.tenantId = tenantId;
@ -90,10 +90,18 @@ public class Forwarder {
}
}
public void forwardCurrentGnss() {
if(forwardGnssRecords(LocalDateTime.now(), null)){
// 推送失败记录补发记录
ThreadManager.getScheduledThreadPool().schedule(() -> {
this.forwardHistoryGnss();
},100, TimeUnit.SECONDS);
}
}
/***
* 推送指定企业设备ID时间的GNSS记录查找指定时间前cycle分钟的有效数据推送
* @param sendTime要推送的记录的时间
* @param resendRecord重发记录
*/
private boolean forwardGnssRecords(LocalDateTime sendTime, ResendRecord resendRecord) {
@ -101,24 +109,45 @@ public class Forwarder {
String beginTime = sendTime.minusMinutes(fwdCycleMinutes).format(formatter);
// 查找属于指定推送组的设备列表
/* QueryWrapper<GnssDevice> queryWrapper = new QueryWrapper<>();
if(deviceId != null){
queryWrapper.eq("deviceid", deviceId);
}
queryWrapper.eq("fwd_group_id", fwdGroupId)
.or()
.eq("fwd_group_id2", fwdGroupId);
List<GnssDevice> gnssDeviceList = deviceMapper.selectList(queryWrapper);
*/
MPJQueryWrapper jquery = new MPJQueryWrapper<GnssDevice> ()
List<GnssDeviceJoin> gnssDeviceList;
MPJQueryWrapper jquery = null;
if(resendRecord!=null && resendRecord.getDeviceid()!=null){
jquery = new MPJQueryWrapper<GnssDevice> ()
.selectAll(GnssDevice.class)
.select("d.latitude as latitude")
.select("d.longitude as longitude")
.leftJoin("gnssstatus d on t.deviceid = d.deviceid")
.eq("fwd_group_id", fwdGroupId)
.and(warpper->warpper.eq("fwd_group_id", fwdGroupId)
.or()
.eq("fwd_group_id2", fwdGroupId);
List<GnssDeviceJoin> gnssDeviceList = deviceMapper.selectJoinList(GnssDeviceJoin.class, jquery);
.eq("fwd_group_id2", fwdGroupId)).
eq("deviceid", resendRecord.getDeviceid());
}
else if(resendRecord!=null && resendRecord.getProjectid()!=null){
jquery = new MPJQueryWrapper<GnssDevice> ()
.selectAll(GnssDevice.class)
.select("d.latitude as latitude")
.select("d.longitude as longitude")
.leftJoin("gnssstatus d on t.deviceid = d.deviceid")
.and(warpper->warpper.eq("fwd_group_id", fwdGroupId)
.or()
.eq("fwd_group_id2", fwdGroupId))
.and(warpper->warpper.eq("project_id",resendRecord.getProjectid())
.or()
.eq("project2_id",resendRecord.getProjectid()));
}
else{
jquery = new MPJQueryWrapper<GnssDevice> ()
.selectAll(GnssDevice.class)
.select("d.latitude as latitude")
.select("d.longitude as longitude")
.leftJoin("gnssstatus d on t.deviceid = d.deviceid")
.and(warpper->warpper.eq("fwd_group_id", fwdGroupId)
.or()
.eq("fwd_group_id2", fwdGroupId));
}
gnssDeviceList = deviceMapper.selectJoinList(GnssDeviceJoin.class, jquery);
logger.debug("candidate fwd devices {}", gnssDeviceList.size());
// 查询最近半小时的GNSS记录
QueryWrapper<GnssCalcData> gnssQueryWrapper = new QueryWrapper<>();
@ -131,7 +160,9 @@ public class Forwarder {
if(resendRecord != null && resendRecord.getDeviceid()!=null){
gnssQueryWrapper.eq("deviceid", resendRecord.getDeviceid());
}
List<GnssCalcData> locationRecords = gnssDataMapper.selectList(gnssQueryWrapper);
logger.debug("candidate fwd records {}", locationRecords.size());
if(locationRecords.size() == 0) return false;
// 构造按项目id分类的GNSS记录
@ -216,15 +247,15 @@ public class Forwarder {
fwdResult = false;
fwdRecord.setResult(FwdRecord.RESULT_FAILED);
// 新增重发记录
resendRecord = new ResendRecord();
resendRecord.setProjectid(projectId);
resendRecord.setTenantid(tenantId);
resendRecord.setCreatetime(LocalDateTime.now());
resendRecord.setStarttime(sendTime);
resendRecord.setEndtime(sendTime);
resendRecord.setFwd_group_id(fwdGroupId);
resendRecord.setState(ResendRecord.STATE_FWD_FAILED);
resendRecordMapper.insert(resendRecord);
ResendRecord resendRecord1 = new ResendRecord();
resendRecord1.setProjectid(projectId);
resendRecord1.setTenantid(tenantId);
resendRecord1.setCreatetime(LocalDateTime.now());
resendRecord1.setStarttime(sendTime);
resendRecord1.setEndtime(sendTime);
resendRecord1.setFwd_group_id(fwdGroupId);
resendRecord1.setState(ResendRecord.STATE_FWD_FAILED);
resendRecordMapper.insert(resendRecord1);
}
fwdRecordsMapper.insert(fwdRecord);
}
@ -239,14 +270,6 @@ public class Forwarder {
return fwdResult;
}
void forwardCurrentGnss() {
if(forwardGnssRecords(LocalDateTime.now(),null)) {
// 推送失败记录补发记录
ThreadManager.getScheduledThreadPool().schedule(() -> {
this.forwardHistoryGnss();
},100, TimeUnit.SECONDS);
}
}
void forwardHistoryGnss() {
// 1.从转发记录表里检索待补传记录时间表含设备Id时间段
@ -257,7 +280,7 @@ public class Forwarder {
queryWrapper.ge("createtime", LocalDateTime.now().minusDays(30));
List<ResendRecord> resendRecordsList = resendRecordMapper.selectList(queryWrapper);
if(resendRecordsList!=null){
if(resendRecordsList!=null && resendRecordsList.size()>0){
//修改状态
UpdateWrapper<ResendRecord> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("fwd_group_id",fwdGroupId);