添加NRT平滑

This commit is contained in:
zms 2025-08-08 11:15:11 +08:00
parent a652a0d272
commit 1edea7f56d
4 changed files with 448 additions and 55 deletions

View File

@ -23,6 +23,8 @@ import java.sql.ResultSet;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Map; import java.util.Map;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@Component @Component
@ -43,6 +45,11 @@ public class TcpRtkServer implements CommandLineRunner {
// 存储最后一次保存的时间 // 存储最后一次保存的时间
private LocalDateTime lastSaveTime; private LocalDateTime lastSaveTime;
// 10分钟窗口聚合器线程安全
private final TenMinuteAggregator tenMinuteAggregator = new TenMinuteAggregator();
// 6小时ZTD滑动窗口存储每个10分钟窗口的平均ZTD
private final Deque<ZtdWindowMean> ztdHistory = new ConcurrentLinkedDeque<>();
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
new Thread(this::startServer, "TcpRtkServer-Main").start(); new Thread(this::startServer, "TcpRtkServer-Main").start();
@ -106,6 +113,9 @@ public class TcpRtkServer implements CommandLineRunner {
latestData.set(rtkData); latestData.set(rtkData);
logger.debug("Parsed $POS data: {}", rtkData); logger.debug("Parsed $POS data: {}", rtkData);
// 将样本加入10分钟窗口聚合
tenMinuteAggregator.addSampleSnapshot(rtkData);
} catch (Exception e) { } catch (Exception e) {
logger.error("Error parsing $POS data: {}", e.getMessage(), e); logger.error("Error parsing $POS data: {}", e.getMessage(), e);
} }
@ -126,6 +136,9 @@ public class TcpRtkServer implements CommandLineRunner {
latestData.set(rtkData); latestData.set(rtkData);
logger.debug("Updated $TROP data: ZTD={}", parts[5]); logger.debug("Updated $TROP data: ZTD={}", parts[5]);
// 将当前样本加入10分钟窗口聚合使用快照避免后续字段被覆盖
tenMinuteAggregator.addSampleSnapshot(rtkData);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Error parsing $TROP data: {}", e.getMessage(), e); logger.error("Error parsing $TROP data: {}", e.getMessage(), e);
@ -157,6 +170,8 @@ public class TcpRtkServer implements CommandLineRunner {
latestData.set(rtkData); latestData.set(rtkData);
logger.info("成功解析$TRPG数据: North={}, East={}", north, east); logger.info("成功解析$TRPG数据: North={}, East={}", north, east);
// 将样本加入10分钟窗口聚合
tenMinuteAggregator.addSampleSnapshot(rtkData);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
logger.error("$TRPG数值解析错误位置5和6: {}, {}", parts[5], parts[6], e); logger.error("$TRPG数值解析错误位置5和6: {}, {}", parts[5], parts[6], e);
} }
@ -172,6 +187,7 @@ public class TcpRtkServer implements CommandLineRunner {
rtkData.setZtdGradientEast(east); rtkData.setZtdGradientEast(east);
latestData.set(rtkData); latestData.set(rtkData);
logger.info("使用替代格式解析$TRPG数据: North={}, East={}", north, east); logger.info("使用替代格式解析$TRPG数据: North={}, East={}", north, east);
tenMinuteAggregator.addSampleSnapshot(rtkData);
} catch (Exception ex) { } catch (Exception ex) {
logger.error("尝试替代格式解析$TRPG失败", ex); logger.error("尝试替代格式解析$TRPG失败", ex);
} }
@ -271,47 +287,203 @@ public class TcpRtkServer implements CommandLineRunner {
} }
} }
// 每10分钟执行一次数据保存 // 每10分钟整点触发一次001020304050分
@Scheduled(fixedRate = 60000) // 600000毫秒 = 10分钟 @Scheduled(cron = "0 */10 * * * ?")
public void saveData() { public void saveData() {
try { try {
RtkData currentData = latestData.get(); // 触发时间整10分钟
if (currentData == null) { LocalDateTime boundaryTime = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES);
logger.warn("没有可用的RTK数据跳过本次保存");
// 从聚合器取出并计算10分钟均值
TenMinuteAggregator.Averages windowAvg = tenMinuteAggregator.drainAndCompute();
if (windowAvg == null || windowAvg.getSampleCount() == 0) {
logger.warn("10分钟窗口内无样本跳过保存");
return; return;
} }
LocalDateTime now = LocalDateTime.now(); // 更新6小时ZTD历史并计算平滑ZTD
if (windowAvg.getZtdMean() != null) {
// 计算PWV ztdHistory.addLast(new ZtdWindowMean(boundaryTime, windowAvg.getZtdMean()));
calculateAndPreparePWV();
// 检查是否需要保存距离上次保存超过10分钟
if (lastSaveTime == null ||
ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) {
// 创建新的RtkData对象复制所有属性
RtkData newData = new RtkData();
newData.setTimestamp(currentData.getTimestamp());
newData.setWeek(currentData.getWeek());
newData.setTimeOfWeek(currentData.getTimeOfWeek());
newData.setLatitude(currentData.getLatitude());
newData.setLongitude(currentData.getLongitude());
newData.setHeight(currentData.getHeight());
newData.setZtd(currentData.getZtd());
newData.setZtdGradientNorth(currentData.getZtdGradientNorth());
newData.setZtdGradientEast(currentData.getZtdGradientEast());
newData.setSatelliteInfo(currentData.getSatelliteInfo());
newData.setPwv(currentData.getPwv());
newData.setDeviceId(currentData.getDeviceId());
rtkDataRepository.save(newData);
lastSaveTime = now;
logger.info("成功保存RTK数据: 时间={}, ZTD={}, PWV={}",
newData.getTimestamp(), newData.getZtd(), newData.getPwv());
} }
// 移除6小时之前的数据
LocalDateTime sixHoursAgo = boundaryTime.minusHours(6);
while (!ztdHistory.isEmpty() && ztdHistory.peekFirst().time.isBefore(sixHoursAgo)) {
ztdHistory.pollFirst();
}
Double smoothedZtd = null;
if (!ztdHistory.isEmpty()) {
double sum = 0.0;
int count = 0;
for (ZtdWindowMean m : ztdHistory) {
if (m.ztdMean != null) {
sum += m.ztdMean;
count++;
}
}
if (count > 0) {
smoothedZtd = sum / count;
}
}
// 组装保存数据时间使用边界时间ZTD使用6小时平滑其他字段用10分钟均值
RtkData toSave = new RtkData();
toSave.setTimestamp(boundaryTime);
toSave.setLatitude(windowAvg.getLatitudeMean());
toSave.setLongitude(windowAvg.getLongitudeMean());
toSave.setHeight(windowAvg.getHeightMean());
Double finalZtd = smoothedZtd != null ? smoothedZtd : windowAvg.getZtdMean();
toSave.setZtd(finalZtd);
toSave.setZtdSmoothed6h(smoothedZtd);
toSave.setZtdGradientNorth(windowAvg.getGradientNorthMean());
toSave.setZtdGradientEast(windowAvg.getGradientEastMean());
toSave.setDeviceId(windowAvg.getLastDeviceId());
// 计算PWV基于当前窗口均值及最新气象数据ZTD采用平滑值
Map<String, Double> sensorData = getLatestSensorData();
if (sensorData != null) {
toSave.setSurfaceTemp(sensorData.get("temperature"));
toSave.setSurfacePressure(sensorData.get("pressure"));
}
calculatePWVForRecord(toSave);
rtkDataRepository.save(toSave);
lastSaveTime = boundaryTime;
logger.info("成功保存RTK数据(整点对齐): 时间={}, 样本数={}, ZTD(均值)={}, ZTD(6h平滑)={}, PWV={}",
toSave.getTimestamp(), windowAvg.getSampleCount(), windowAvg.getZtdMean(), toSave.getZtdSmoothed6h(), toSave.getPwv());
} catch (Exception e) { } catch (Exception e) {
logger.error("保存数据过程中发生错误: {}", e.getMessage(), e); logger.error("保存数据过程中发生错误: {}", e.getMessage(), e);
} }
} }
private void calculatePWVForRecord(RtkData record) {
try {
if (record.getZtd() == null) {
return;
}
if (record.getSurfaceTemp() == null || record.getSurfacePressure() == null) {
Map<String, Double> sensorData = getLatestSensorData();
if (sensorData != null) {
record.setSurfaceTemp(sensorData.get("temperature"));
record.setSurfacePressure(sensorData.get("pressure"));
}
}
PwvCalculator.calculatePWV(record);
} catch (Exception ex) {
logger.warn("PWV计算失败: {}", ex.getMessage());
}
}
private static class ZtdWindowMean {
private final LocalDateTime time;
private final Double ztdMean;
private ZtdWindowMean(LocalDateTime time, Double ztdMean) {
this.time = time;
this.ztdMean = ztdMean;
}
}
private static class TenMinuteAggregator {
private double latSum;
private int latCount;
private double lonSum;
private int lonCount;
private double heightSum;
private int heightCount;
private double ztdSum;
private int ztdCount;
private double gradNorthSum;
private int gradNorthCount;
private double gradEastSum;
private int gradEastCount;
private String lastDeviceId;
private int sampleCount;
synchronized void addSampleSnapshot(RtkData sample) {
// 仅在关键信息存在时累加
if (sample.getLatitude() != null) {
latSum += sample.getLatitude();
latCount++;
}
if (sample.getLongitude() != null) {
lonSum += sample.getLongitude();
lonCount++;
}
if (sample.getHeight() != null) {
heightSum += sample.getHeight();
heightCount++;
}
if (sample.getZtd() != null) {
ztdSum += sample.getZtd();
ztdCount++;
}
if (sample.getZtdGradientNorth() != null) {
gradNorthSum += sample.getZtdGradientNorth();
gradNorthCount++;
}
if (sample.getZtdGradientEast() != null) {
gradEastSum += sample.getZtdGradientEast();
gradEastCount++;
}
if (sample.getDeviceId() != null) {
lastDeviceId = sample.getDeviceId();
}
sampleCount++;
}
synchronized Averages drainAndCompute() {
if (sampleCount == 0) {
reset();
return new Averages(0, null, null, null, null, null, null, null);
}
Double latMean = latCount > 0 ? latSum / latCount : null;
Double lonMean = lonCount > 0 ? lonSum / lonCount : null;
Double heightMean = heightCount > 0 ? heightSum / heightCount : null;
Double ztdMean = ztdCount > 0 ? ztdSum / ztdCount : null;
Double gradNorthMean = gradNorthCount > 0 ? gradNorthSum / gradNorthCount : null;
Double gradEastMean = gradEastCount > 0 ? gradEastSum / gradEastCount : null;
int count = sampleCount;
String device = lastDeviceId;
reset();
return new Averages(count, latMean, lonMean, heightMean, ztdMean, gradNorthMean, gradEastMean, device);
}
private void reset() {
latSum = lonSum = heightSum = ztdSum = gradNorthSum = gradEastSum = 0.0;
latCount = lonCount = heightCount = ztdCount = gradNorthCount = gradEastCount = 0;
sampleCount = 0;
lastDeviceId = null;
}
static class Averages {
private final int sampleCount;
private final Double latitudeMean;
private final Double longitudeMean;
private final Double heightMean;
private final Double ztdMean;
private final Double gradientNorthMean;
private final Double gradientEastMean;
private final String lastDeviceId;
Averages(int sampleCount, Double latitudeMean, Double longitudeMean, Double heightMean,
Double ztdMean, Double gradientNorthMean, Double gradientEastMean, String lastDeviceId) {
this.sampleCount = sampleCount;
this.latitudeMean = latitudeMean;
this.longitudeMean = longitudeMean;
this.heightMean = heightMean;
this.ztdMean = ztdMean;
this.gradientNorthMean = gradientNorthMean;
this.gradientEastMean = gradientEastMean;
this.lastDeviceId = lastDeviceId;
}
int getSampleCount() { return sampleCount; }
Double getLatitudeMean() { return latitudeMean; }
Double getLongitudeMean() { return longitudeMean; }
Double getHeightMean() { return heightMean; }
Double getZtdMean() { return ztdMean; }
Double getGradientNorthMean() { return gradientNorthMean; }
Double getGradientEastMean() { return gradientEastMean; }
String getLastDeviceId() { return lastDeviceId; }
}
}
} }

View File

@ -33,6 +33,10 @@ public class RtkData {
@Column(name = "ztd") @Column(name = "ztd")
private Double ztd; private Double ztd;
// 6小时滑动平均的ZTD
@Column(name = "ztd_smoothed_6h")
private Double ztdSmoothed6h;
@Column(name = "ztd_gradient_north") @Column(name = "ztd_gradient_north")
private Double ztdGradientNorth; private Double ztdGradientNorth;

View File

@ -12,6 +12,8 @@ import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -20,6 +22,8 @@ public class RtkWebSocketHandler extends TextWebSocketHandler {
private static RtkDataRepository rtkDataRepository; private static RtkDataRepository rtkDataRepository;
private static final AtomicReference<RtkData> latestData = new AtomicReference<>(); private static final AtomicReference<RtkData> latestData = new AtomicReference<>();
private static LocalDateTime lastSaveTime; private static LocalDateTime lastSaveTime;
private static final TenMinuteAggregator tenMinuteAggregator = new TenMinuteAggregator();
private static final Deque<ZtdWindowMean> ztdHistory = new ConcurrentLinkedDeque<>();
@Autowired @Autowired
public void setRtkDataRepository(RtkDataRepository repository) { public void setRtkDataRepository(RtkDataRepository repository) {
@ -66,6 +70,7 @@ public class RtkWebSocketHandler extends TextWebSocketHandler {
rtkData.setLongitude(Double.parseDouble(parts[5])); rtkData.setLongitude(Double.parseDouble(parts[5]));
rtkData.setHeight(Double.parseDouble(parts[6])); rtkData.setHeight(Double.parseDouble(parts[6]));
latestData.set(rtkData); latestData.set(rtkData);
tenMinuteAggregator.addSampleSnapshot(rtkData);
} }
private void parseTropData(String message) { private void parseTropData(String message) {
@ -74,6 +79,7 @@ public class RtkWebSocketHandler extends TextWebSocketHandler {
String[] parts = message.split(","); String[] parts = message.split(",");
rtkData.setZtd(Double.parseDouble(parts[5])); rtkData.setZtd(Double.parseDouble(parts[5]));
latestData.set(rtkData); latestData.set(rtkData);
tenMinuteAggregator.addSampleSnapshot(rtkData);
} }
} }
@ -84,6 +90,7 @@ public class RtkWebSocketHandler extends TextWebSocketHandler {
rtkData.setZtdGradientNorth(Double.parseDouble(parts[5])); rtkData.setZtdGradientNorth(Double.parseDouble(parts[5]));
rtkData.setZtdGradientEast(Double.parseDouble(parts[6])); rtkData.setZtdGradientEast(Double.parseDouble(parts[6]));
latestData.set(rtkData); latestData.set(rtkData);
tenMinuteAggregator.addSampleSnapshot(rtkData);
} }
} }
@ -95,16 +102,120 @@ public class RtkWebSocketHandler extends TextWebSocketHandler {
} }
} }
@Scheduled(fixedRate = 600000) // 对齐整10分钟保存00,10,20,30,40,50
@Scheduled(cron = "0 */10 * * * ?")
public void saveData() { public void saveData() {
RtkData currentData = latestData.get(); LocalDateTime boundaryTime = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES);
if (currentData != null) { TenMinuteAggregator.Averages avg = tenMinuteAggregator.drainAndCompute();
LocalDateTime now = LocalDateTime.now(); if (avg == null || avg.getSampleCount() == 0) {
if (lastSaveTime == null || ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) { logger.warn("10分钟窗口内无样本跳过保存");
rtkDataRepository.save(currentData); return;
lastSaveTime = now; }
logger.info("Saved RTK data at: {}", now); if (avg.getZtdMean() != null) {
} ztdHistory.addLast(new ZtdWindowMean(boundaryTime, avg.getZtdMean()));
}
LocalDateTime sixHoursAgo = boundaryTime.minusHours(6);
while (!ztdHistory.isEmpty() && ztdHistory.peekFirst().time.isBefore(sixHoursAgo)) {
ztdHistory.pollFirst();
}
Double smoothedZtd = null;
if (!ztdHistory.isEmpty()) {
double sum = 0.0;
int count = 0;
for (ZtdWindowMean m : ztdHistory) {
if (m.ztdMean != null) { sum += m.ztdMean; count++; }
}
if (count > 0) smoothedZtd = sum / count;
}
RtkData toSave = new RtkData();
toSave.setTimestamp(boundaryTime);
toSave.setLatitude(avg.getLatitudeMean());
toSave.setLongitude(avg.getLongitudeMean());
toSave.setHeight(avg.getHeightMean());
Double finalZtd = smoothedZtd != null ? smoothedZtd : avg.getZtdMean();
toSave.setZtd(finalZtd);
toSave.setZtdSmoothed6h(smoothedZtd);
toSave.setZtdGradientNorth(avg.getGradientNorthMean());
toSave.setZtdGradientEast(avg.getGradientEastMean());
rtkDataRepository.save(toSave);
lastSaveTime = boundaryTime;
logger.info("Saved RTK data at aligned boundary with 10-min mean and 6h smoothed ZTD: {}", boundaryTime);
}
private static class ZtdWindowMean {
private final LocalDateTime time;
private final Double ztdMean;
private ZtdWindowMean(LocalDateTime time, Double ztdMean) {
this.time = time;
this.ztdMean = ztdMean;
}
}
private static class TenMinuteAggregator {
private double latSum; private int latCount;
private double lonSum; private int lonCount;
private double heightSum; private int heightCount;
private double ztdSum; private int ztdCount;
private double gradNorthSum; private int gradNorthCount;
private double gradEastSum; private int gradEastCount;
private int sampleCount;
synchronized void addSampleSnapshot(RtkData sample) {
if (sample.getLatitude() != null) { latSum += sample.getLatitude(); latCount++; }
if (sample.getLongitude() != null) { lonSum += sample.getLongitude(); lonCount++; }
if (sample.getHeight() != null) { heightSum += sample.getHeight(); heightCount++; }
if (sample.getZtd() != null) { ztdSum += sample.getZtd(); ztdCount++; }
if (sample.getZtdGradientNorth() != null) { gradNorthSum += sample.getZtdGradientNorth(); gradNorthCount++; }
if (sample.getZtdGradientEast() != null) { gradEastSum += sample.getZtdGradientEast(); gradEastCount++; }
sampleCount++;
}
synchronized Averages drainAndCompute() {
if (sampleCount == 0) { reset(); return new Averages(0, null, null, null, null, null, null); }
Double latMean = latCount > 0 ? latSum / latCount : null;
Double lonMean = lonCount > 0 ? lonSum / lonCount : null;
Double heightMean = heightCount > 0 ? heightSum / heightCount : null;
Double ztdMean = ztdCount > 0 ? ztdSum / ztdCount : null;
Double gradNorthMean = gradNorthCount > 0 ? gradNorthSum / gradNorthCount : null;
Double gradEastMean = gradEastCount > 0 ? gradEastSum / gradEastCount : null;
int count = sampleCount;
reset();
return new Averages(count, latMean, lonMean, heightMean, ztdMean, gradNorthMean, gradEastMean);
}
private void reset() {
latSum = lonSum = heightSum = ztdSum = gradNorthSum = gradEastSum = 0.0;
latCount = lonCount = heightCount = ztdCount = gradNorthCount = gradEastCount = 0;
sampleCount = 0;
}
static class Averages {
private final int sampleCount;
private final Double latitudeMean;
private final Double longitudeMean;
private final Double heightMean;
private final Double ztdMean;
private final Double gradientNorthMean;
private final Double gradientEastMean;
Averages(int sampleCount, Double latitudeMean, Double longitudeMean, Double heightMean,
Double ztdMean, Double gradientNorthMean, Double gradientEastMean) {
this.sampleCount = sampleCount;
this.latitudeMean = latitudeMean;
this.longitudeMean = longitudeMean;
this.heightMean = heightMean;
this.ztdMean = ztdMean;
this.gradientNorthMean = gradientNorthMean;
this.gradientEastMean = gradientEastMean;
}
int getSampleCount() { return sampleCount; }
Double getLatitudeMean() { return latitudeMean; }
Double getLongitudeMean() { return longitudeMean; }
Double getHeightMean() { return heightMean; }
Double getZtdMean() { return ztdMean; }
Double getGradientNorthMean() { return gradientNorthMean; }
Double getGradientEastMean() { return gradientEastMean; }
} }
} }
} }

View File

@ -17,8 +17,10 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Deque;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@Component @Component
@ -32,6 +34,8 @@ public class RtkWebSocketServer {
private static final AtomicReference<RtkData> latestData = new AtomicReference<>(); private static final AtomicReference<RtkData> latestData = new AtomicReference<>();
// 存储最后一次保存的时间 // 存储最后一次保存的时间
private static LocalDateTime lastSaveTime; private static LocalDateTime lastSaveTime;
private static final TenMinuteAggregator tenMinuteAggregator = new TenMinuteAggregator();
private static final Deque<ZtdWindowMean> ztdHistory = new ConcurrentLinkedDeque<>();
@Autowired @Autowired
public void setRtkDataRepository(RtkDataRepository repository) { public void setRtkDataRepository(RtkDataRepository repository) {
@ -87,6 +91,7 @@ public class RtkWebSocketServer {
rtkData.setHeight(Double.parseDouble(parts[6])); rtkData.setHeight(Double.parseDouble(parts[6]));
latestData.set(rtkData); latestData.set(rtkData);
tenMinuteAggregator.addSampleSnapshot(rtkData);
} }
private void parseTropData(String message) { private void parseTropData(String message) {
@ -95,6 +100,7 @@ public class RtkWebSocketServer {
String[] parts = message.split(","); String[] parts = message.split(",");
rtkData.setZtd(Double.parseDouble(parts[5])); rtkData.setZtd(Double.parseDouble(parts[5]));
latestData.set(rtkData); latestData.set(rtkData);
tenMinuteAggregator.addSampleSnapshot(rtkData);
} }
} }
@ -105,6 +111,7 @@ public class RtkWebSocketServer {
rtkData.setZtdGradientNorth(Double.parseDouble(parts[5])); rtkData.setZtdGradientNorth(Double.parseDouble(parts[5]));
rtkData.setZtdGradientEast(Double.parseDouble(parts[6])); rtkData.setZtdGradientEast(Double.parseDouble(parts[6]));
latestData.set(rtkData); latestData.set(rtkData);
tenMinuteAggregator.addSampleSnapshot(rtkData);
} }
} }
@ -116,21 +123,120 @@ public class RtkWebSocketServer {
} }
} }
// 每10分钟执行一次数据保存 // 对齐整10分钟保存00,10,20,30,40,50
@Scheduled(fixedRate = 600000) // 600000毫秒 = 10分钟 @Scheduled(cron = "0 */10 * * * ?")
public void saveData() { public void saveData() {
RtkData currentData = latestData.get(); LocalDateTime boundaryTime = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES);
if (currentData != null) { TenMinuteAggregator.Averages avg = tenMinuteAggregator.drainAndCompute();
LocalDateTime now = LocalDateTime.now(); if (avg == null || avg.getSampleCount() == 0) {
logger.warn("10分钟窗口内无样本跳过保存");
return;
}
if (avg.getZtdMean() != null) {
ztdHistory.addLast(new ZtdWindowMean(boundaryTime, avg.getZtdMean()));
}
LocalDateTime sixHoursAgo = boundaryTime.minusHours(6);
while (!ztdHistory.isEmpty() && ztdHistory.peekFirst().time.isBefore(sixHoursAgo)) {
ztdHistory.pollFirst();
}
Double smoothedZtd = null;
if (!ztdHistory.isEmpty()) {
double sum = 0.0;
int count = 0;
for (ZtdWindowMean m : ztdHistory) {
if (m.ztdMean != null) { sum += m.ztdMean; count++; }
}
if (count > 0) smoothedZtd = sum / count;
}
// 检查是否需要保存距离上次保存超过10分钟 RtkData toSave = new RtkData();
if (lastSaveTime == null || toSave.setTimestamp(boundaryTime);
ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) { toSave.setLatitude(avg.getLatitudeMean());
toSave.setLongitude(avg.getLongitudeMean());
toSave.setHeight(avg.getHeightMean());
Double finalZtd = smoothedZtd != null ? smoothedZtd : avg.getZtdMean();
toSave.setZtd(finalZtd);
toSave.setZtdSmoothed6h(smoothedZtd);
toSave.setZtdGradientNorth(avg.getGradientNorthMean());
toSave.setZtdGradientEast(avg.getGradientEastMean());
rtkDataRepository.save(currentData); rtkDataRepository.save(toSave);
lastSaveTime = now; lastSaveTime = boundaryTime;
logger.info("Saved RTK data at: {}", now); logger.info("Saved RTK data at aligned boundary with 10-min mean and 6h smoothed ZTD: {}", boundaryTime);
} }
private static class ZtdWindowMean {
private final LocalDateTime time;
private final Double ztdMean;
private ZtdWindowMean(LocalDateTime time, Double ztdMean) {
this.time = time;
this.ztdMean = ztdMean;
}
}
private static class TenMinuteAggregator {
private double latSum; private int latCount;
private double lonSum; private int lonCount;
private double heightSum; private int heightCount;
private double ztdSum; private int ztdCount;
private double gradNorthSum; private int gradNorthCount;
private double gradEastSum; private int gradEastCount;
private int sampleCount;
synchronized void addSampleSnapshot(RtkData sample) {
if (sample.getLatitude() != null) { latSum += sample.getLatitude(); latCount++; }
if (sample.getLongitude() != null) { lonSum += sample.getLongitude(); lonCount++; }
if (sample.getHeight() != null) { heightSum += sample.getHeight(); heightCount++; }
if (sample.getZtd() != null) { ztdSum += sample.getZtd(); ztdCount++; }
if (sample.getZtdGradientNorth() != null) { gradNorthSum += sample.getZtdGradientNorth(); gradNorthCount++; }
if (sample.getZtdGradientEast() != null) { gradEastSum += sample.getZtdGradientEast(); gradEastCount++; }
sampleCount++;
}
synchronized Averages drainAndCompute() {
if (sampleCount == 0) { reset(); return new Averages(0, null, null, null, null, null, null); }
Double latMean = latCount > 0 ? latSum / latCount : null;
Double lonMean = lonCount > 0 ? lonSum / lonCount : null;
Double heightMean = heightCount > 0 ? heightSum / heightCount : null;
Double ztdMean = ztdCount > 0 ? ztdSum / ztdCount : null;
Double gradNorthMean = gradNorthCount > 0 ? gradNorthSum / gradNorthCount : null;
Double gradEastMean = gradEastCount > 0 ? gradEastSum / gradEastCount : null;
int count = sampleCount;
reset();
return new Averages(count, latMean, lonMean, heightMean, ztdMean, gradNorthMean, gradEastMean);
}
private void reset() {
latSum = lonSum = heightSum = ztdSum = gradNorthSum = gradEastSum = 0.0;
latCount = lonCount = heightCount = ztdCount = gradNorthCount = gradEastCount = 0;
sampleCount = 0;
}
static class Averages {
private final int sampleCount;
private final Double latitudeMean;
private final Double longitudeMean;
private final Double heightMean;
private final Double ztdMean;
private final Double gradientNorthMean;
private final Double gradientEastMean;
Averages(int sampleCount, Double latitudeMean, Double longitudeMean, Double heightMean,
Double ztdMean, Double gradientNorthMean, Double gradientEastMean) {
this.sampleCount = sampleCount;
this.latitudeMean = latitudeMean;
this.longitudeMean = longitudeMean;
this.heightMean = heightMean;
this.ztdMean = ztdMean;
this.gradientNorthMean = gradientNorthMean;
this.gradientEastMean = gradientEastMean;
}
int getSampleCount() { return sampleCount; }
Double getLatitudeMean() { return latitudeMean; }
Double getLongitudeMean() { return longitudeMean; }
Double getHeightMean() { return heightMean; }
Double getZtdMean() { return ztdMean; }
Double getGradientNorthMean() { return gradientNorthMean; }
Double getGradientEastMean() { return gradientEastMean; }
} }
} }
} }