1、优化推送

This commit is contained in:
weidong 2025-10-23 16:23:10 +08:00
parent 4fc3fb9617
commit b04ec862b2

View File

@ -15,10 +15,7 @@ import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class Forwarder {
@ -96,15 +93,10 @@ public class Forwarder {
/***
* 推送指定企业设备ID时间的GNSS记录查找指定时间前cycle分钟的有效数据推送
* @param sendTime要推送的记录的时间
* @param resendRecord重推送信息
*/
private boolean forwardGnssRecords(LocalDateTime sendTime, ResendRecord resendRecord) {
String endTime = sendTime.format(formatter);
String beginTime = sendTime.minusMinutes(fwdCycleMinutes).format(formatter);
// 查找属于指定推送组的设备列表
List<GnssDeviceJoin> gnssDeviceList;
private List<GnssDeviceJoin> getFwdDeviceList(ResendRecord resendRecord){
MPJQueryWrapper jquery = null;
if(resendRecord!=null && StringUtils.hasText(resendRecord.getDeviceid())){
@ -118,8 +110,9 @@ public class Forwarder {
.leftJoin("gnssstatus d on t.deviceid = d.deviceid")
.and(warpper->warpper.eq("fwd_group_id", fwdGroupId)
.or()
.eq("fwd_group_id2", fwdGroupId)).
eq("t.deviceid", resendRecord.getDeviceid());
.eq("fwd_group_id2", fwdGroupId))
.eq("t.deviceid", resendRecord.getDeviceid())
.eq("opmode",GnssDevice.OP_MODE_USE);
}
else if(resendRecord!=null && resendRecord.getProjectid()!=null){
jquery = new MPJQueryWrapper<GnssDevice> ()
@ -135,7 +128,8 @@ public class Forwarder {
.eq("fwd_group_id2", fwdGroupId))
.and(warpper->warpper.eq("project_id",resendRecord.getProjectid())
.or()
.eq("project2_id",resendRecord.getProjectid()));
.eq("project2_id",resendRecord.getProjectid()))
.eq("opmode",GnssDevice.OP_MODE_USE);
}
else{
jquery = new MPJQueryWrapper<GnssDevice> ()
@ -148,29 +142,58 @@ public class Forwarder {
.leftJoin("gnssstatus d on t.deviceid = d.deviceid")
.and(warpper->warpper.eq("fwd_group_id", fwdGroupId)
.or()
.eq("fwd_group_id2", fwdGroupId));
.eq("fwd_group_id2", fwdGroupId))
.eq("opmode",GnssDevice.OP_MODE_USE);
}
return deviceMapper.selectJoinList(GnssDeviceJoin.class, jquery);
}
gnssDeviceList = deviceMapper.selectJoinList(GnssDeviceJoin.class, jquery);
logger.debug("candidate fwd devices {}", gnssDeviceList.size());
// 查询最近半小时的GNSS记录
HashMap<String, GnssCalcData> getFwdGnssData(LocalDateTime sendTime,List<GnssDeviceJoin> gnssDeviceList, ResendRecord resendRecord){
String endTime = sendTime.format(formatter);
String beginTime = sendTime.minusMinutes(fwdCycleMinutes).format(formatter);
// 直接以设备列表作为查询条件查询语句很大一个单位可能有几千台设备
// 用设备列表关联的所有的tenantId来查询然后再筛选效率更高但可能包含了非推送的设备列表
HashSet<Integer> tenantIdSet = new HashSet<>();
for(GnssDeviceJoin d:gnssDeviceList){
tenantIdSet.add(d.getTenantid());
}
logger.debug("candidate fwd tenants {}", tenantIdSet.size());
// 查询最近一个推送周期的GNSS记录
QueryWrapper<GnssCalcData> gnssQueryWrapper = new QueryWrapper<>();
gnssQueryWrapper.ge("createtime",beginTime);
gnssQueryWrapper.le("createtime",endTime);
gnssQueryWrapper.orderByDesc("createtime");
gnssQueryWrapper.eq("enabled",true);
gnssQueryWrapper.eq("stabled",true);
gnssQueryWrapper.isNotNull("rpose");
if(resendRecord != null && StringUtils.hasText(resendRecord.getDeviceid())){
gnssQueryWrapper.eq("deviceid", resendRecord.getDeviceid());
}
else {
gnssQueryWrapper.and(wrapper -> {
int i=0;
for (Integer tenantId : tenantIdSet) {
if(i==0) wrapper.eq("tenantid", tenantId);
else wrapper.or().eq("tenantid", tenantId);
i++;
}
});
}
gnssQueryWrapper.orderByDesc("createtime");
List<GnssCalcData> locationRecords = gnssDataMapper.selectList(gnssQueryWrapper);
logger.debug("candidate fwd records {}", locationRecords.size());
if(locationRecords.size() == 0) return false;
List<GnssCalcData> gnssDataList = gnssDataMapper.selectList(gnssQueryWrapper);
// 同一台设备只保留最近的记录
HashMap<String, GnssCalcData> gnssDataMap = new HashMap<>();
for(GnssCalcData data:gnssDataList){
gnssDataMap.put(data.getDeviceid(),data);
}
return gnssDataMap;
}
// 构造按项目id分类的GNSS记录
ConcurrentHashMap<String, List<GnssCalcData>> projects = new ConcurrentHashMap<>();
HashMap<String, List<GnssCalcData>> createFwdDataByProject(
List<GnssDeviceJoin> gnssDeviceList,HashMap<String, GnssCalcData> gnssDataMap,ResendRecord resendRecord){
HashMap<String, List<GnssCalcData>> projects = new HashMap<>();
for(GnssDeviceJoin device:gnssDeviceList){
if(device.getOpmode() != GnssDevice.OP_MODE_USE) continue;
String projectId = device.getProject_id();
@ -191,8 +214,8 @@ public class Forwarder {
recordsToSend = new ArrayList<>();
projects.put(projectId,recordsToSend);
}
for(GnssCalcData record:locationRecords){
if(record.getDeviceid().equals(device.getDeviceid())) {
GnssCalcData record=gnssDataMap.get(device.getDeviceid());
if(record!=null) {
// 替换成推送用的名字和数值
if (device.getIpose() != null &&
device.getIposn() != null &&
@ -224,10 +247,24 @@ public class Forwarder {
record.setAuxd(device.getYaw());
recordsToSend.add(record);
break;
}
}
return projects;
}
private boolean forwardGnssRecords(LocalDateTime sendTime, ResendRecord resendRecord) {
// 查找属于指定推送组fwdGroupId的设备列表补推条件可能包含项目号或设备号
List<GnssDeviceJoin> gnssDeviceList = getFwdDeviceList(resendRecord);
logger.debug("candidate fwd devices {}", gnssDeviceList.size());
if(gnssDeviceList.size() == 0) return false;
//查找要推送的数据
HashMap<String, GnssCalcData> gnssDataMap = getFwdGnssData(sendTime, gnssDeviceList, resendRecord);
logger.debug("candidate fwd records {}", gnssDataMap.size());
if(gnssDataMap.size() == 0) return false;
// 构造按项目id分类的GNSS记录
HashMap<String, List<GnssCalcData>> projects =
createFwdDataByProject(gnssDeviceList,gnssDataMap,resendRecord);
// 按项目打包推送
totalSendNum = 0;