diff --git a/ntrip_load-main/src/main/java/com/rtklib/TcpRtkServer.java b/ntrip_load-main/src/main/java/com/rtklib/TcpRtkServer.java index 6d779e0..019d861 100644 --- a/ntrip_load-main/src/main/java/com/rtklib/TcpRtkServer.java +++ b/ntrip_load-main/src/main/java/com/rtklib/TcpRtkServer.java @@ -23,6 +23,8 @@ import java.sql.ResultSet; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; import java.util.Map; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicReference; @Component @@ -43,6 +45,11 @@ public class TcpRtkServer implements CommandLineRunner { // 存储最后一次保存的时间 private LocalDateTime lastSaveTime; + // 10分钟窗口聚合器(线程安全) + private final TenMinuteAggregator tenMinuteAggregator = new TenMinuteAggregator(); + // 6小时ZTD滑动窗口(存储每个10分钟窗口的平均ZTD) + private final Deque ztdHistory = new ConcurrentLinkedDeque<>(); + @Override public void run(String... args) throws Exception { new Thread(this::startServer, "TcpRtkServer-Main").start(); @@ -106,6 +113,9 @@ public class TcpRtkServer implements CommandLineRunner { latestData.set(rtkData); logger.debug("Parsed $POS data: {}", rtkData); + + // 将样本加入10分钟窗口聚合 + tenMinuteAggregator.addSampleSnapshot(rtkData); } catch (Exception e) { logger.error("Error parsing $POS data: {}", e.getMessage(), e); } @@ -126,6 +136,9 @@ public class TcpRtkServer implements CommandLineRunner { latestData.set(rtkData); logger.debug("Updated $TROP data: ZTD={}", parts[5]); + + // 将当前样本加入10分钟窗口聚合(使用快照,避免后续字段被覆盖) + tenMinuteAggregator.addSampleSnapshot(rtkData); } } catch (Exception e) { logger.error("Error parsing $TROP data: {}", e.getMessage(), e); @@ -157,6 +170,8 @@ public class TcpRtkServer implements CommandLineRunner { latestData.set(rtkData); logger.info("成功解析$TRPG数据: North={}, East={}", north, east); + // 将样本加入10分钟窗口聚合 + tenMinuteAggregator.addSampleSnapshot(rtkData); } catch (NumberFormatException e) { logger.error("$TRPG数值解析错误,位置5和6: {}, {}", parts[5], parts[6], e); } @@ -172,6 +187,7 @@ public class TcpRtkServer implements CommandLineRunner { rtkData.setZtdGradientEast(east); latestData.set(rtkData); logger.info("使用替代格式解析$TRPG数据: North={}, East={}", north, east); + tenMinuteAggregator.addSampleSnapshot(rtkData); } catch (Exception ex) { logger.error("尝试替代格式解析$TRPG失败", ex); } @@ -271,47 +287,203 @@ public class TcpRtkServer implements CommandLineRunner { } } - // 每10分钟执行一次数据保存 - @Scheduled(fixedRate = 60000) // 600000毫秒 = 10分钟 + // 每10分钟整点触发一次(00、10、20、30、40、50分) + @Scheduled(cron = "0 */10 * * * ?") public void saveData() { try { - RtkData currentData = latestData.get(); - if (currentData == null) { - logger.warn("没有可用的RTK数据,跳过本次保存"); + // 触发时间(整10分钟) + LocalDateTime boundaryTime = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES); + + // 从聚合器取出并计算10分钟均值 + TenMinuteAggregator.Averages windowAvg = tenMinuteAggregator.drainAndCompute(); + if (windowAvg == null || windowAvg.getSampleCount() == 0) { + logger.warn("10分钟窗口内无样本,跳过保存"); return; } - - LocalDateTime now = LocalDateTime.now(); - - // 计算PWV - calculateAndPreparePWV(); - - // 检查是否需要保存(距离上次保存超过10分钟) - if (lastSaveTime == null || - ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) { - - // 创建新的RtkData对象,复制所有属性 - RtkData newData = new RtkData(); - newData.setTimestamp(currentData.getTimestamp()); - newData.setWeek(currentData.getWeek()); - newData.setTimeOfWeek(currentData.getTimeOfWeek()); - newData.setLatitude(currentData.getLatitude()); - newData.setLongitude(currentData.getLongitude()); - newData.setHeight(currentData.getHeight()); - newData.setZtd(currentData.getZtd()); - newData.setZtdGradientNorth(currentData.getZtdGradientNorth()); - newData.setZtdGradientEast(currentData.getZtdGradientEast()); - newData.setSatelliteInfo(currentData.getSatelliteInfo()); - newData.setPwv(currentData.getPwv()); - newData.setDeviceId(currentData.getDeviceId()); - - rtkDataRepository.save(newData); - lastSaveTime = now; - logger.info("成功保存RTK数据: 时间={}, ZTD={}, PWV={}", - newData.getTimestamp(), newData.getZtd(), newData.getPwv()); + + // 更新6小时ZTD历史并计算平滑ZTD + if (windowAvg.getZtdMean() != null) { + ztdHistory.addLast(new ZtdWindowMean(boundaryTime, windowAvg.getZtdMean())); } + // 移除6小时之前的数据 + LocalDateTime sixHoursAgo = boundaryTime.minusHours(6); + while (!ztdHistory.isEmpty() && ztdHistory.peekFirst().time.isBefore(sixHoursAgo)) { + ztdHistory.pollFirst(); + } + Double smoothedZtd = null; + if (!ztdHistory.isEmpty()) { + double sum = 0.0; + int count = 0; + for (ZtdWindowMean m : ztdHistory) { + if (m.ztdMean != null) { + sum += m.ztdMean; + count++; + } + } + if (count > 0) { + smoothedZtd = sum / count; + } + } + + // 组装保存数据:时间使用边界时间;ZTD使用6小时平滑;其他字段用10分钟均值 + RtkData toSave = new RtkData(); + toSave.setTimestamp(boundaryTime); + toSave.setLatitude(windowAvg.getLatitudeMean()); + toSave.setLongitude(windowAvg.getLongitudeMean()); + toSave.setHeight(windowAvg.getHeightMean()); + Double finalZtd = smoothedZtd != null ? smoothedZtd : windowAvg.getZtdMean(); + toSave.setZtd(finalZtd); + toSave.setZtdSmoothed6h(smoothedZtd); + toSave.setZtdGradientNorth(windowAvg.getGradientNorthMean()); + toSave.setZtdGradientEast(windowAvg.getGradientEastMean()); + toSave.setDeviceId(windowAvg.getLastDeviceId()); + + // 计算PWV(基于当前窗口均值及最新气象数据;ZTD采用平滑值) + Map sensorData = getLatestSensorData(); + if (sensorData != null) { + toSave.setSurfaceTemp(sensorData.get("temperature")); + toSave.setSurfacePressure(sensorData.get("pressure")); + } + calculatePWVForRecord(toSave); + + rtkDataRepository.save(toSave); + lastSaveTime = boundaryTime; + logger.info("成功保存RTK数据(整点对齐): 时间={}, 样本数={}, ZTD(均值)={}, ZTD(6h平滑)={}, PWV={}", + toSave.getTimestamp(), windowAvg.getSampleCount(), windowAvg.getZtdMean(), toSave.getZtdSmoothed6h(), toSave.getPwv()); } catch (Exception e) { logger.error("保存数据过程中发生错误: {}", e.getMessage(), e); } } + + private void calculatePWVForRecord(RtkData record) { + try { + if (record.getZtd() == null) { + return; + } + if (record.getSurfaceTemp() == null || record.getSurfacePressure() == null) { + Map sensorData = getLatestSensorData(); + if (sensorData != null) { + record.setSurfaceTemp(sensorData.get("temperature")); + record.setSurfacePressure(sensorData.get("pressure")); + } + } + PwvCalculator.calculatePWV(record); + } catch (Exception ex) { + logger.warn("PWV计算失败: {}", ex.getMessage()); + } + } + + private static class ZtdWindowMean { + private final LocalDateTime time; + private final Double ztdMean; + private ZtdWindowMean(LocalDateTime time, Double ztdMean) { + this.time = time; + this.ztdMean = ztdMean; + } + } + + private static class TenMinuteAggregator { + private double latSum; + private int latCount; + private double lonSum; + private int lonCount; + private double heightSum; + private int heightCount; + private double ztdSum; + private int ztdCount; + private double gradNorthSum; + private int gradNorthCount; + private double gradEastSum; + private int gradEastCount; + private String lastDeviceId; + private int sampleCount; + + synchronized void addSampleSnapshot(RtkData sample) { + // 仅在关键信息存在时累加 + if (sample.getLatitude() != null) { + latSum += sample.getLatitude(); + latCount++; + } + if (sample.getLongitude() != null) { + lonSum += sample.getLongitude(); + lonCount++; + } + if (sample.getHeight() != null) { + heightSum += sample.getHeight(); + heightCount++; + } + if (sample.getZtd() != null) { + ztdSum += sample.getZtd(); + ztdCount++; + } + if (sample.getZtdGradientNorth() != null) { + gradNorthSum += sample.getZtdGradientNorth(); + gradNorthCount++; + } + if (sample.getZtdGradientEast() != null) { + gradEastSum += sample.getZtdGradientEast(); + gradEastCount++; + } + if (sample.getDeviceId() != null) { + lastDeviceId = sample.getDeviceId(); + } + sampleCount++; + } + + synchronized Averages drainAndCompute() { + if (sampleCount == 0) { + reset(); + return new Averages(0, null, null, null, null, null, null, null); + } + Double latMean = latCount > 0 ? latSum / latCount : null; + Double lonMean = lonCount > 0 ? lonSum / lonCount : null; + Double heightMean = heightCount > 0 ? heightSum / heightCount : null; + Double ztdMean = ztdCount > 0 ? ztdSum / ztdCount : null; + Double gradNorthMean = gradNorthCount > 0 ? gradNorthSum / gradNorthCount : null; + Double gradEastMean = gradEastCount > 0 ? gradEastSum / gradEastCount : null; + int count = sampleCount; + String device = lastDeviceId; + reset(); + return new Averages(count, latMean, lonMean, heightMean, ztdMean, gradNorthMean, gradEastMean, device); + } + + private void reset() { + latSum = lonSum = heightSum = ztdSum = gradNorthSum = gradEastSum = 0.0; + latCount = lonCount = heightCount = ztdCount = gradNorthCount = gradEastCount = 0; + sampleCount = 0; + lastDeviceId = null; + } + + static class Averages { + private final int sampleCount; + private final Double latitudeMean; + private final Double longitudeMean; + private final Double heightMean; + private final Double ztdMean; + private final Double gradientNorthMean; + private final Double gradientEastMean; + private final String lastDeviceId; + + Averages(int sampleCount, Double latitudeMean, Double longitudeMean, Double heightMean, + Double ztdMean, Double gradientNorthMean, Double gradientEastMean, String lastDeviceId) { + this.sampleCount = sampleCount; + this.latitudeMean = latitudeMean; + this.longitudeMean = longitudeMean; + this.heightMean = heightMean; + this.ztdMean = ztdMean; + this.gradientNorthMean = gradientNorthMean; + this.gradientEastMean = gradientEastMean; + this.lastDeviceId = lastDeviceId; + } + + int getSampleCount() { return sampleCount; } + Double getLatitudeMean() { return latitudeMean; } + Double getLongitudeMean() { return longitudeMean; } + Double getHeightMean() { return heightMean; } + Double getZtdMean() { return ztdMean; } + Double getGradientNorthMean() { return gradientNorthMean; } + Double getGradientEastMean() { return gradientEastMean; } + String getLastDeviceId() { return lastDeviceId; } + } + } } diff --git a/ntrip_load-main/src/main/java/com/rtklib/entity/RtkData.java b/ntrip_load-main/src/main/java/com/rtklib/entity/RtkData.java index ee8c1fe..2307d34 100644 --- a/ntrip_load-main/src/main/java/com/rtklib/entity/RtkData.java +++ b/ntrip_load-main/src/main/java/com/rtklib/entity/RtkData.java @@ -33,6 +33,10 @@ public class RtkData { @Column(name = "ztd") private Double ztd; + // 6小时滑动平均的ZTD + @Column(name = "ztd_smoothed_6h") + private Double ztdSmoothed6h; + @Column(name = "ztd_gradient_north") private Double ztdGradientNorth; diff --git a/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketHandler.java b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketHandler.java index 7028b48..2c32257 100644 --- a/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketHandler.java +++ b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketHandler.java @@ -12,6 +12,8 @@ import org.springframework.web.socket.handler.TextWebSocketHandler; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicReference; @@ -20,6 +22,8 @@ public class RtkWebSocketHandler extends TextWebSocketHandler { private static RtkDataRepository rtkDataRepository; private static final AtomicReference latestData = new AtomicReference<>(); private static LocalDateTime lastSaveTime; + private static final TenMinuteAggregator tenMinuteAggregator = new TenMinuteAggregator(); + private static final Deque ztdHistory = new ConcurrentLinkedDeque<>(); @Autowired public void setRtkDataRepository(RtkDataRepository repository) { @@ -66,6 +70,7 @@ public class RtkWebSocketHandler extends TextWebSocketHandler { rtkData.setLongitude(Double.parseDouble(parts[5])); rtkData.setHeight(Double.parseDouble(parts[6])); latestData.set(rtkData); + tenMinuteAggregator.addSampleSnapshot(rtkData); } private void parseTropData(String message) { @@ -74,6 +79,7 @@ public class RtkWebSocketHandler extends TextWebSocketHandler { String[] parts = message.split(","); rtkData.setZtd(Double.parseDouble(parts[5])); latestData.set(rtkData); + tenMinuteAggregator.addSampleSnapshot(rtkData); } } @@ -84,6 +90,7 @@ public class RtkWebSocketHandler extends TextWebSocketHandler { rtkData.setZtdGradientNorth(Double.parseDouble(parts[5])); rtkData.setZtdGradientEast(Double.parseDouble(parts[6])); latestData.set(rtkData); + tenMinuteAggregator.addSampleSnapshot(rtkData); } } @@ -95,16 +102,120 @@ public class RtkWebSocketHandler extends TextWebSocketHandler { } } - @Scheduled(fixedRate = 600000) + // 对齐整10分钟保存(00,10,20,30,40,50) + @Scheduled(cron = "0 */10 * * * ?") public void saveData() { - RtkData currentData = latestData.get(); - if (currentData != null) { - LocalDateTime now = LocalDateTime.now(); - if (lastSaveTime == null || ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) { - rtkDataRepository.save(currentData); - lastSaveTime = now; - logger.info("Saved RTK data at: {}", now); + LocalDateTime boundaryTime = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES); + TenMinuteAggregator.Averages avg = tenMinuteAggregator.drainAndCompute(); + if (avg == null || avg.getSampleCount() == 0) { + logger.warn("10分钟窗口内无样本,跳过保存"); + return; + } + if (avg.getZtdMean() != null) { + ztdHistory.addLast(new ZtdWindowMean(boundaryTime, avg.getZtdMean())); + } + LocalDateTime sixHoursAgo = boundaryTime.minusHours(6); + while (!ztdHistory.isEmpty() && ztdHistory.peekFirst().time.isBefore(sixHoursAgo)) { + ztdHistory.pollFirst(); + } + Double smoothedZtd = null; + if (!ztdHistory.isEmpty()) { + double sum = 0.0; + int count = 0; + for (ZtdWindowMean m : ztdHistory) { + if (m.ztdMean != null) { sum += m.ztdMean; count++; } } + if (count > 0) smoothedZtd = sum / count; + } + + RtkData toSave = new RtkData(); + toSave.setTimestamp(boundaryTime); + toSave.setLatitude(avg.getLatitudeMean()); + toSave.setLongitude(avg.getLongitudeMean()); + toSave.setHeight(avg.getHeightMean()); + Double finalZtd = smoothedZtd != null ? smoothedZtd : avg.getZtdMean(); + toSave.setZtd(finalZtd); + toSave.setZtdSmoothed6h(smoothedZtd); + toSave.setZtdGradientNorth(avg.getGradientNorthMean()); + toSave.setZtdGradientEast(avg.getGradientEastMean()); + + rtkDataRepository.save(toSave); + lastSaveTime = boundaryTime; + logger.info("Saved RTK data at aligned boundary with 10-min mean and 6h smoothed ZTD: {}", boundaryTime); + } + + private static class ZtdWindowMean { + private final LocalDateTime time; + private final Double ztdMean; + private ZtdWindowMean(LocalDateTime time, Double ztdMean) { + this.time = time; + this.ztdMean = ztdMean; + } + } + + private static class TenMinuteAggregator { + private double latSum; private int latCount; + private double lonSum; private int lonCount; + private double heightSum; private int heightCount; + private double ztdSum; private int ztdCount; + private double gradNorthSum; private int gradNorthCount; + private double gradEastSum; private int gradEastCount; + private int sampleCount; + + synchronized void addSampleSnapshot(RtkData sample) { + if (sample.getLatitude() != null) { latSum += sample.getLatitude(); latCount++; } + if (sample.getLongitude() != null) { lonSum += sample.getLongitude(); lonCount++; } + if (sample.getHeight() != null) { heightSum += sample.getHeight(); heightCount++; } + if (sample.getZtd() != null) { ztdSum += sample.getZtd(); ztdCount++; } + if (sample.getZtdGradientNorth() != null) { gradNorthSum += sample.getZtdGradientNorth(); gradNorthCount++; } + if (sample.getZtdGradientEast() != null) { gradEastSum += sample.getZtdGradientEast(); gradEastCount++; } + sampleCount++; + } + + synchronized Averages drainAndCompute() { + if (sampleCount == 0) { reset(); return new Averages(0, null, null, null, null, null, null); } + Double latMean = latCount > 0 ? latSum / latCount : null; + Double lonMean = lonCount > 0 ? lonSum / lonCount : null; + Double heightMean = heightCount > 0 ? heightSum / heightCount : null; + Double ztdMean = ztdCount > 0 ? ztdSum / ztdCount : null; + Double gradNorthMean = gradNorthCount > 0 ? gradNorthSum / gradNorthCount : null; + Double gradEastMean = gradEastCount > 0 ? gradEastSum / gradEastCount : null; + int count = sampleCount; + reset(); + return new Averages(count, latMean, lonMean, heightMean, ztdMean, gradNorthMean, gradEastMean); + } + + private void reset() { + latSum = lonSum = heightSum = ztdSum = gradNorthSum = gradEastSum = 0.0; + latCount = lonCount = heightCount = ztdCount = gradNorthCount = gradEastCount = 0; + sampleCount = 0; + } + + static class Averages { + private final int sampleCount; + private final Double latitudeMean; + private final Double longitudeMean; + private final Double heightMean; + private final Double ztdMean; + private final Double gradientNorthMean; + private final Double gradientEastMean; + Averages(int sampleCount, Double latitudeMean, Double longitudeMean, Double heightMean, + Double ztdMean, Double gradientNorthMean, Double gradientEastMean) { + this.sampleCount = sampleCount; + this.latitudeMean = latitudeMean; + this.longitudeMean = longitudeMean; + this.heightMean = heightMean; + this.ztdMean = ztdMean; + this.gradientNorthMean = gradientNorthMean; + this.gradientEastMean = gradientEastMean; + } + int getSampleCount() { return sampleCount; } + Double getLatitudeMean() { return latitudeMean; } + Double getLongitudeMean() { return longitudeMean; } + Double getHeightMean() { return heightMean; } + Double getZtdMean() { return ztdMean; } + Double getGradientNorthMean() { return gradientNorthMean; } + Double getGradientEastMean() { return gradientEastMean; } } } } \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketServer.java b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketServer.java index 2f41bcd..1e87a2c 100644 --- a/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketServer.java +++ b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketServer.java @@ -17,8 +17,10 @@ import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; +import java.util.Deque; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicReference; @Component @@ -32,6 +34,8 @@ public class RtkWebSocketServer { private static final AtomicReference latestData = new AtomicReference<>(); // 存储最后一次保存的时间 private static LocalDateTime lastSaveTime; + private static final TenMinuteAggregator tenMinuteAggregator = new TenMinuteAggregator(); + private static final Deque ztdHistory = new ConcurrentLinkedDeque<>(); @Autowired public void setRtkDataRepository(RtkDataRepository repository) { @@ -87,6 +91,7 @@ public class RtkWebSocketServer { rtkData.setHeight(Double.parseDouble(parts[6])); latestData.set(rtkData); + tenMinuteAggregator.addSampleSnapshot(rtkData); } private void parseTropData(String message) { @@ -95,6 +100,7 @@ public class RtkWebSocketServer { String[] parts = message.split(","); rtkData.setZtd(Double.parseDouble(parts[5])); latestData.set(rtkData); + tenMinuteAggregator.addSampleSnapshot(rtkData); } } @@ -105,6 +111,7 @@ public class RtkWebSocketServer { rtkData.setZtdGradientNorth(Double.parseDouble(parts[5])); rtkData.setZtdGradientEast(Double.parseDouble(parts[6])); latestData.set(rtkData); + tenMinuteAggregator.addSampleSnapshot(rtkData); } } @@ -116,21 +123,120 @@ public class RtkWebSocketServer { } } - // 每10分钟执行一次数据保存 - @Scheduled(fixedRate = 600000) // 600000毫秒 = 10分钟 + // 对齐整10分钟保存(00,10,20,30,40,50) + @Scheduled(cron = "0 */10 * * * ?") public void saveData() { - RtkData currentData = latestData.get(); - if (currentData != null) { - LocalDateTime now = LocalDateTime.now(); - - // 检查是否需要保存(距离上次保存超过10分钟) - if (lastSaveTime == null || - ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) { - - rtkDataRepository.save(currentData); - lastSaveTime = now; - logger.info("Saved RTK data at: {}", now); + LocalDateTime boundaryTime = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES); + TenMinuteAggregator.Averages avg = tenMinuteAggregator.drainAndCompute(); + if (avg == null || avg.getSampleCount() == 0) { + logger.warn("10分钟窗口内无样本,跳过保存"); + return; + } + if (avg.getZtdMean() != null) { + ztdHistory.addLast(new ZtdWindowMean(boundaryTime, avg.getZtdMean())); + } + LocalDateTime sixHoursAgo = boundaryTime.minusHours(6); + while (!ztdHistory.isEmpty() && ztdHistory.peekFirst().time.isBefore(sixHoursAgo)) { + ztdHistory.pollFirst(); + } + Double smoothedZtd = null; + if (!ztdHistory.isEmpty()) { + double sum = 0.0; + int count = 0; + for (ZtdWindowMean m : ztdHistory) { + if (m.ztdMean != null) { sum += m.ztdMean; count++; } } + if (count > 0) smoothedZtd = sum / count; + } + + RtkData toSave = new RtkData(); + toSave.setTimestamp(boundaryTime); + toSave.setLatitude(avg.getLatitudeMean()); + toSave.setLongitude(avg.getLongitudeMean()); + toSave.setHeight(avg.getHeightMean()); + Double finalZtd = smoothedZtd != null ? smoothedZtd : avg.getZtdMean(); + toSave.setZtd(finalZtd); + toSave.setZtdSmoothed6h(smoothedZtd); + toSave.setZtdGradientNorth(avg.getGradientNorthMean()); + toSave.setZtdGradientEast(avg.getGradientEastMean()); + + rtkDataRepository.save(toSave); + lastSaveTime = boundaryTime; + logger.info("Saved RTK data at aligned boundary with 10-min mean and 6h smoothed ZTD: {}", boundaryTime); + } + + private static class ZtdWindowMean { + private final LocalDateTime time; + private final Double ztdMean; + private ZtdWindowMean(LocalDateTime time, Double ztdMean) { + this.time = time; + this.ztdMean = ztdMean; + } + } + + private static class TenMinuteAggregator { + private double latSum; private int latCount; + private double lonSum; private int lonCount; + private double heightSum; private int heightCount; + private double ztdSum; private int ztdCount; + private double gradNorthSum; private int gradNorthCount; + private double gradEastSum; private int gradEastCount; + private int sampleCount; + + synchronized void addSampleSnapshot(RtkData sample) { + if (sample.getLatitude() != null) { latSum += sample.getLatitude(); latCount++; } + if (sample.getLongitude() != null) { lonSum += sample.getLongitude(); lonCount++; } + if (sample.getHeight() != null) { heightSum += sample.getHeight(); heightCount++; } + if (sample.getZtd() != null) { ztdSum += sample.getZtd(); ztdCount++; } + if (sample.getZtdGradientNorth() != null) { gradNorthSum += sample.getZtdGradientNorth(); gradNorthCount++; } + if (sample.getZtdGradientEast() != null) { gradEastSum += sample.getZtdGradientEast(); gradEastCount++; } + sampleCount++; + } + + synchronized Averages drainAndCompute() { + if (sampleCount == 0) { reset(); return new Averages(0, null, null, null, null, null, null); } + Double latMean = latCount > 0 ? latSum / latCount : null; + Double lonMean = lonCount > 0 ? lonSum / lonCount : null; + Double heightMean = heightCount > 0 ? heightSum / heightCount : null; + Double ztdMean = ztdCount > 0 ? ztdSum / ztdCount : null; + Double gradNorthMean = gradNorthCount > 0 ? gradNorthSum / gradNorthCount : null; + Double gradEastMean = gradEastCount > 0 ? gradEastSum / gradEastCount : null; + int count = sampleCount; + reset(); + return new Averages(count, latMean, lonMean, heightMean, ztdMean, gradNorthMean, gradEastMean); + } + + private void reset() { + latSum = lonSum = heightSum = ztdSum = gradNorthSum = gradEastSum = 0.0; + latCount = lonCount = heightCount = ztdCount = gradNorthCount = gradEastCount = 0; + sampleCount = 0; + } + + static class Averages { + private final int sampleCount; + private final Double latitudeMean; + private final Double longitudeMean; + private final Double heightMean; + private final Double ztdMean; + private final Double gradientNorthMean; + private final Double gradientEastMean; + Averages(int sampleCount, Double latitudeMean, Double longitudeMean, Double heightMean, + Double ztdMean, Double gradientNorthMean, Double gradientEastMean) { + this.sampleCount = sampleCount; + this.latitudeMean = latitudeMean; + this.longitudeMean = longitudeMean; + this.heightMean = heightMean; + this.ztdMean = ztdMean; + this.gradientNorthMean = gradientNorthMean; + this.gradientEastMean = gradientEastMean; + } + int getSampleCount() { return sampleCount; } + Double getLatitudeMean() { return latitudeMean; } + Double getLongitudeMean() { return longitudeMean; } + Double getHeightMean() { return heightMean; } + Double getZtdMean() { return ztdMean; } + Double getGradientNorthMean() { return gradientNorthMean; } + Double getGradientEastMean() { return gradientEastMean; } } } } \ No newline at end of file