优化推送历史数据

This commit is contained in:
weidong 2024-02-04 17:24:43 +08:00
parent 877c107625
commit 440d8aebb2
6 changed files with 69 additions and 95 deletions

View File

@ -61,34 +61,46 @@ public class Forwarder {
} }
} }
void forwardCurrentGnss(String fwdGroupId) { /***
LocalDateTime nowTime = LocalDateTime.now(); * 推送指定推送组设备ID时间的GNSS记录查找指定时间前半小时的有效数据推送
ConcurrentHashMap<String, List<GnssCalcData>> projects = new ConcurrentHashMap<>(); * @param fwdGroupId推送组
* @param deviceId设备ID如果没有指定则推送全部
// 转发数据 * @param sendTime要推送的记录的时间
String sendAfterTime = nowTime.minusMinutes(30).format(formatter); */
void forwardGnssRecords(String fwdGroupId, String deviceId, LocalDateTime sendTime) {
String endTime = sendTime.format(formatter);
String beginTime = sendTime.minusMinutes(30).format(formatter);
// 查找属于指定推送组的设备列表
QueryWrapper<GnssDevice> queryWrapper = new QueryWrapper<>(); QueryWrapper<GnssDevice> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("fwd_group_id", fwdGroupId) queryWrapper.eq("fwd_group_id", fwdGroupId)
.or() .or()
.eq("fwd_group_id2", fwdGroupId); .eq("fwd_group_id2", fwdGroupId);
if(deviceId != null){
queryWrapper.eq("deviceid", deviceId);
}
List<GnssDevice> gnssDeviceList = deviceMapper.selectList(queryWrapper); List<GnssDevice> gnssDeviceList = deviceMapper.selectList(queryWrapper);
List<GnssCalcData> recordsToSend;
// 查询最近半小时的记录 // 查询最近半小时的GNSS记录
QueryWrapper<GnssCalcData> gnssQueryWrapper = new QueryWrapper<>(); QueryWrapper<GnssCalcData> gnssQueryWrapper = new QueryWrapper<>();
gnssQueryWrapper.ge("createtime",sendAfterTime); gnssQueryWrapper.ge("createtime",beginTime);
gnssQueryWrapper.le("createtime",endTime);
gnssQueryWrapper.orderByDesc("createtime"); gnssQueryWrapper.orderByDesc("createtime");
gnssQueryWrapper.eq("enabled",true); gnssQueryWrapper.eq("enabled",true);
gnssQueryWrapper.isNotNull("rpose"); gnssQueryWrapper.isNotNull("rpose");
if(deviceId != null){
gnssQueryWrapper.eq("deviceid", deviceId);
}
List<GnssCalcData> locationRecords = gnssDataMapper.selectList(gnssQueryWrapper); List<GnssCalcData> locationRecords = gnssDataMapper.selectList(gnssQueryWrapper);
// 构造按项目id分类的GNSS记录
ConcurrentHashMap<String, List<GnssCalcData>> projects = new ConcurrentHashMap<>();
for(GnssDevice device:gnssDeviceList){ for(GnssDevice device:gnssDeviceList){
if(device.getOpmode() != GnssDevice.OP_MODE_USE) continue; if(device.getOpmode() != GnssDevice.OP_MODE_USE) continue;
String projectId = device.getProject_id(); String projectId = device.getProject_id();
if(projectId == null) continue; if(projectId == null) continue;
recordsToSend = projects.get(projectId); List<GnssCalcData> recordsToSend = projects.get(projectId);
if(recordsToSend == null){ if(recordsToSend == null){
recordsToSend = new ArrayList<>(); recordsToSend = new ArrayList<>();
projects.put(projectId,recordsToSend); projects.put(projectId,recordsToSend);
@ -121,14 +133,14 @@ public class Forwarder {
List<GnssCalcData> records = entry.getValue(); List<GnssCalcData> records = entry.getValue();
if(records.size() == 0) continue; if(records.size() == 0) continue;
int sendNum = send(projectId, records); int sendNum = send(projectId, records, sendTime);
FwdRecord fwdRecord = new FwdRecord(); FwdRecord fwdRecord = new FwdRecord();
fwdRecord.setProject_id(projectId); fwdRecord.setProject_id(projectId);
fwdRecord.setTenantid(tenantId); fwdRecord.setTenantid(tenantId);
fwdRecord.setDevicenum((short) records.size()); fwdRecord.setDevicenum((short) records.size());
fwdRecord.setStarttime(nowTime); fwdRecord.setStarttime(sendTime);
fwdRecord.setEndtime(nowTime); fwdRecord.setEndtime(sendTime);
fwdRecord.setFwd_group_id(fwdGroupId); fwdRecord.setFwd_group_id(fwdGroupId);
if(sendNum > 0) { if(sendNum > 0) {
@ -151,8 +163,24 @@ public class Forwarder {
if(totalSendNum>0) updateFwd(totalSendNum, true); if(totalSendNum>0) updateFwd(totalSendNum, true);
else if(fwdFailed) updateFwd(totalSendNum, false); else if(fwdFailed) updateFwd(totalSendNum, false);
}); });
}
void forwardCurrentGnss(String fwdGroupId) {
forwardGnssRecords(fwdGroupId,null,LocalDateTime.now());
}
void forwardBatchGnssRecords(String fwdGroupId, String deviceId, LocalDateTime beginTime, LocalDateTime endTime) {
LocalDateTime sendTime = beginTime;
while(sendTime.isBefore(endTime)){
forwardGnssRecords(fwdGroupId,deviceId,sendTime);
sendTime = sendTime.plusMinutes(30);
try {
Thread.sleep(500);
}
catch (Exception e){
}
}
} }
void forwardHistoryGnss(String fwdGroupId) { void forwardHistoryGnss(String fwdGroupId) {
@ -162,80 +190,15 @@ public class Forwarder {
queryWrapper.eq("state",FwdRecord.STATE_UPLOAD_DONE); queryWrapper.eq("state",FwdRecord.STATE_UPLOAD_DONE);
List<FwdRecord> fwdRecordsList = fwdRecordsMapper.selectList(queryWrapper); List<FwdRecord> fwdRecordsList = fwdRecordsMapper.selectList(queryWrapper);
// 2.检索这个这个时间段的解算结果如果有数据则单个终端转发标志记录为已补传 // 2.检索这个这个时间段的解算结果如果有数据则单个终端转发标志记录为已补传
totalSendNum = 0;
fwdFailed = false;
List<FwdRecord> fwdRecordList = new ArrayList<>();
for(FwdRecord fwdRecord:fwdRecordsList){ for(FwdRecord fwdRecord:fwdRecordsList){
// 查找device forwardBatchGnssRecords(fwdRecord.getFwd_group_id(), fwdRecord.getDeviceid(),
QueryWrapper<GnssDevice> deviceQueryWrapper = new QueryWrapper<>(); fwdRecord.getStarttime(), fwdRecord.getEndtime());
deviceQueryWrapper.eq("fwd_group_id", fwdGroupId);
deviceQueryWrapper.eq("deviceid", fwdRecord.getDeviceid());
GnssDevice device = deviceMapper.selectOne(deviceQueryWrapper);
if(device == null) continue;
// 查找位置记录
QueryWrapper<GnssCalcData> calcDataQueryWrapper = new QueryWrapper<>();
calcDataQueryWrapper.eq("deviceid", fwdRecord.getDeviceid());
calcDataQueryWrapper.ge("createtime", fwdRecord.getStarttime());
calcDataQueryWrapper.le("createtime", fwdRecord.getEndtime());
calcDataQueryWrapper.orderByAsc("createtime");
calcDataQueryWrapper.eq("enabled",true);
calcDataQueryWrapper.isNotNull("rpose");
List<GnssCalcData> calcDataList = gnssDataMapper.selectList(calcDataQueryWrapper);
// 推送记录
int sendNum = sendBatch(device, fwdRecord.getProject_id(), calcDataList);
if(sendNum > 0) {
// 记录推送结果
fwdRecord.setState(FwdRecord.STATE_FWD_DONE); fwdRecord.setState(FwdRecord.STATE_FWD_DONE);
totalSendNum += sendNum; fwdRecordsMapper.updateById(fwdRecord);
}
else{
fwdRecord.setState(FwdRecord.STATE_FWD_FAILED);
fwdFailed = true;
}
fwdRecordList.add(fwdRecord);
}
// 更新推送信息
ThreadManager.getFixedThreadPool().submit(() -> {
for(FwdRecord record:fwdRecordList){
fwdRecordsMapper.updateById(record);
}
if(totalSendNum>0) updateFwd(totalSendNum, true);
else if(fwdFailed) updateFwd(totalSendNum, false);
});
}
int sendBatch(GnssDevice device, String projectId, List<GnssCalcData> records){
if(records.size() == 0) return 0;
LocalDateTime lastTime = records.get(0).getCreatetime();
for(GnssCalcData calcData:records){
if(calcData.getCreatetime().isAfter(lastTime.plusMinutes(28))){
// 推送
lastTime = calcData.getCreatetime();
// 替换推送名和值
if(useFwdId && device.getFwddeviceid()!=null && device.getFwddeviceid().trim().length()>0) {
calcData.setDeviceid(device.getFwddeviceid());
}
if(device.getIpose()!=null &&
device.getIposn()!=null &&
device.getIposd()!=null){
calcData.setRpose(calcData.getRpose()-device.getIpose());
calcData.setRposn(calcData.getRposn()-device.getIposn());
calcData.setRposd(calcData.getRposd()-device.getIposd());
}
}
else{
calcData.setEnabled(false);//借用来表示不推送不会保存到数据库
} }
} }
return send(projectId, records); int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime) {
}
int send(String projectId, List<GnssCalcData> records) {
return 0; return 0;
} }

View File

@ -14,6 +14,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -59,7 +60,7 @@ public class GXXfzForwarder extends Forwarder{
} }
@Override @Override
int send(String projectId, List<GnssCalcData> records){ int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime){
int sendNum = 0; int sendNum = 0;
if(records.size() == 0) return 0; if(records.size() == 0) return 0;

View File

@ -14,6 +14,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
@Component @Component
@ -53,7 +54,7 @@ public class GZYForwarder extends Forwarder{
} }
@Override @Override
int send(String projectId, List<GnssCalcData> records){ int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime){
int sendNum = 0; int sendNum = 0;
if(records.size() == 0) return 0; if(records.size() == 0) return 0;

View File

@ -16,6 +16,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
@Component @Component
@ -62,7 +63,7 @@ public class GZYMQTTForwarder extends Forwarder {
} }
@Override @Override
int send(String projectId, List<GnssCalcData> records) { int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime) {
int sendNum = 0; int sendNum = 0;
for (GnssCalcData locationRecord : records) { for (GnssCalcData locationRecord : records) {

View File

@ -82,7 +82,7 @@ public class KingMaForwarder extends Forwarder{
} }
@Override @Override
int send(String projectId, List<GnssCalcData> records){ int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime){
int sendNum = 0; int sendNum = 0;
// 检查token是否过期 // 检查token是否过期

View File

@ -51,10 +51,10 @@ public class ZNYForwarder extends Forwarder{
} }
@Override @Override
int send(String projectId, List<GnssCalcData> records){ int send(String projectId, List<GnssCalcData> records, LocalDateTime sentTime){
int sendNum = 0; int sendNum = 0;
// 单位 // 单位
DefoData defoData = new DefoData(projectId, LocalDateTime.now().format(dateFormatter), records.size()); DefoData defoData = new DefoData(projectId, sentTime.format(dateFormatter), records.size());
for(GnssCalcData locationRecord:records) { for(GnssCalcData locationRecord:records) {
double n = NumberUtils.scale(locationRecord.getRposn() * 0.001, 5); double n = NumberUtils.scale(locationRecord.getRposn() * 0.001, 5);
double e = NumberUtils.scale(locationRecord.getRpose() * 0.001, 5); double e = NumberUtils.scale(locationRecord.getRpose() * 0.001, 5);
@ -66,6 +66,9 @@ public class ZNYForwarder extends Forwarder{
} }
String json = GsonUtil.toJson(defoData); String json = GsonUtil.toJson(defoData);
logger.info("发送数据到武汉中南设计院平台:{}", json); logger.info("发送数据到武汉中南设计院平台:{}", json);
//return sendNum;
try {
String result = HttpUtils.postJson(data_host, json); String result = HttpUtils.postJson(data_host, json);
logger.info("发送数据到武汉中南设计院平台返回结果:{}", result); logger.info("发送数据到武汉中南设计院平台返回结果:{}", result);
JSONObject obj = (JSONObject) JSONObject.parse(result); JSONObject obj = (JSONObject) JSONObject.parse(result);
@ -73,5 +76,10 @@ public class ZNYForwarder extends Forwarder{
if (msg.contains("成功")) return sendNum; if (msg.contains("成功")) return sendNum;
else return 0; else return 0;
} }
catch (Exception e){
logger.error(e.toString());
return 0;
}
}
} }