From 9913b8a7326aa88a6045e36851ef84ddeeda3c0e Mon Sep 17 00:00:00 2001 From: zms Date: Tue, 8 Jul 2025 17:20:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E5=9F=BA=E7=AB=99=E5=88=87?= =?UTF-8?q?=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/D331RtcmMessageExecutor.java | 929 +++++++++++++----- 1 file changed, 677 insertions(+), 252 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java index fc7b30d6..2753fe6f 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java @@ -21,14 +21,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.scheduling.annotation.Scheduled; +import javax.annotation.PostConstruct; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.time.Duration; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; - /** * @author Layton * @date 2023/2/2 20:49 @@ -38,7 +37,14 @@ public class D331RtcmMessageExecutor implements Executor private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private static final Map deviceBackupStatus = new ConcurrentHashMap<>(); + // 基站状态管理 + private static final Map baseStationStatusMap = new ConcurrentHashMap<>(); + + // 测站当前使用的基站映射 + private static final Map deviceCurrentBaseMap = new ConcurrentHashMap<>(); + + // 基站切换监听器列表 + private static final List switchListeners = new ArrayList<>(); @Autowired private DeviceService deviceService; @@ -53,33 +59,124 @@ public class D331RtcmMessageExecutor implements Executor private final Map lastD300ForwardTimeMap = new ConcurrentHashMap<>(); private static final long D300_FORWARD_INTERVAL = 5000; // 5秒,单位毫秒 + private static final Map deviceLastTime = new ConcurrentHashMap<>(); + private static final long LOG_INTERVAL = 10 * 60 * 1000L; + + // 基站在线检查间隔 - 修改为秒 + private static final long BASE_STATION_CHECK_INTERVAL = 30; // 30秒 + private static final long BASE_STATION_OFFLINE_TIMEOUT = 5 * 60; // 5分钟,单位:秒 + + /** + * 基站状态枚举 + */ + public enum BaseStationStatusEnum { + ONLINE, + OFFLINE, + SWITCHING + } + + /** + * 基站状态信息 + */ + public static class BaseStationStatus { + private String baseStationId; + private BaseStationStatusEnum status; + private LocalDateTime lastActiveTime; + private LocalDateTime statusChangeTime; + private Set servingDevices; // 当前服务的测站列表 + + public BaseStationStatus(String baseStationId) { + this.baseStationId = baseStationId; + this.status = BaseStationStatusEnum.OFFLINE; + this.lastActiveTime = LocalDateTime.now(); + this.statusChangeTime = LocalDateTime.now(); + this.servingDevices = ConcurrentHashMap.newKeySet(); + } + + // Getters and Setters + public String getBaseStationId() { return baseStationId; } + public BaseStationStatusEnum getStatus() { return status; } + public void setStatus(BaseStationStatusEnum status) { + if (this.status != status) { + this.status = status; + this.statusChangeTime = LocalDateTime.now(); + } + } + public LocalDateTime getLastActiveTime() { return lastActiveTime; } + public void setLastActiveTime(LocalDateTime lastActiveTime) { this.lastActiveTime = lastActiveTime; } + public LocalDateTime getStatusChangeTime() { return statusChangeTime; } + public Set getServingDevices() { return servingDevices; } + } + + /** + * 基站切换监听器接口 + */ + public interface BaseStationSwitchListener { + void onBaseStationSwitch(String deviceId, String fromBaseId, String toBaseId, String reason); + void onBaseStationStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus); + } + + /** + * 默认的基站切换监听器实现 + */ + private class DefaultBaseStationSwitchListener implements BaseStationSwitchListener { + @Override + public void onBaseStationSwitch(String deviceId, String fromBaseId, String toBaseId, String reason) { + logger.info("Base station switch - Device: {}, From: {} To: {}, Reason: {}", + deviceId, fromBaseId, toBaseId, reason); + + ThreadManager.getFixedThreadPool().submit(() -> { + try { + notifyBaseStationSwitch(deviceId, fromBaseId, toBaseId, reason); + } catch (Exception e) { + logger.error("Failed to send base station switch notification", e); + } + }); + } + + @Override + public void onBaseStationStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus) { + logger.info("Base station status change - BaseStation: {}, From: {} To: {}", + baseStationId, oldStatus, newStatus); + + ThreadManager.getFixedThreadPool().submit(() -> { + try { + notifyBaseStationStatusChange(baseStationId, oldStatus, newStatus); + } catch (Exception e) { + logger.error("Failed to send base station status notification", e); + } + }); + } + } + + // 初始化默认监听器 + @PostConstruct + public void init() { + switchListeners.add(new DefaultBaseStationSwitchListener()); + logger.info("D331RtcmMessageExecutor initialized"); + } + @Override public Void execute(D331RtcmMessage message) { String id = message.getId(); + + // 更新基站状态 + updateBaseStationStatus(id); + // 补齐tenantId Device deviceBs = deviceService.findByDeviceId(id); if(deviceBs == null || deviceBs.getOpMode() == GnssDevice.OP_MODE_UNUSE || deviceBs.getOpMode() == GnssDevice.OP_MODE_CHECK) { - logger.warn("基站 {} 不存在或未启用 (opMode: {})", - id, deviceBs != null ? deviceBs.getOpMode() : "null"); return null; } // 推送基站数据 - if(deviceBs.getOpMode() == GnssDevice.OP_MODE_USE || deviceBs.getOpMode() == null) { + if(deviceBs.getOpMode() == GnssDevice.OP_MODE_USE) { byte[] forwardBytes = message.getSrcData(); + // 获取使用该基站(包括作为主基站和备选基站)的所有测站 List primaryDevices = deviceService.findByParentId(id); List backupDevices = deviceService.findByParentId1(id); - // 添加调试日志 - logger.info("基站 {} 状态: opMode={}, lastRxTime={}, d3xxCount={}", - id, deviceBs.getOpMode(), deviceBs.getLastRxTime(), deviceBs.getD3xxCount()); - - if (!primaryDevices.isEmpty() || !backupDevices.isEmpty()) { - logger.debug("基站 {} 关联的主站设备数: {}, 备站设备数: {}", - id, primaryDevices.size(), backupDevices.size()); - } - // 合并两个列表 List allDevices = new ArrayList<>(); allDevices.addAll(primaryDevices); @@ -87,69 +184,35 @@ public class D331RtcmMessageExecutor implements Executor DeviceChannel deviceChannel = null; for (Device device : allDevices) { +<<<<<<< Updated upstream if (device.getOpMode() != GnssDevice.OP_MODE_USE) continue; if ((device.getModel()==GnssDevice.MODEL_G510) && (device.getFixedNum()>100) && (device.getGnssSampleRate()>1) && (deviceBs.getD3xxCount()%device.getGnssSampleRate()) != 0) { //if(!UBXUtil.has1005(forwardBytes)) continue; //1005必推 +======= + if (device.getOpMode() != GnssDevice.OP_MODE_USE) { +>>>>>>> Stashed changes continue; } String deviceId = device.getDeviceId(); - // 检查该设备是否应该接收此基站的数据 - String primaryBaseId = device.getParentId(); - String backupBaseId = device.getParentId1(); + // 智能基站选择和切换 + String selectedBaseId = selectOptimalBaseStation(device); - // 添加调试日志 - logger.info("测站 {} 配置信息: 主基站={}, 备用基站={}, 当前数据来自基站={}", - deviceId, primaryBaseId, backupBaseId, id); - - // 如果当前基站是该设备的备选基站,需要检查主基站是否离线 - if (id.equals(backupBaseId)) { - Device primaryBase = deviceService.findByDeviceId(primaryBaseId); - // 添加调试日志 - if (primaryBase != null) { - logger.info("测站 {} 的主基站 {} 状态: lastRxTime={}, opMode={}", - deviceId, primaryBaseId, - primaryBase.getLastRxTime(), - primaryBase.getOpMode()); - } else { - logger.warn("测站 {} 的主基站 {} 未找到或已删除", deviceId, primaryBaseId); - } - - // 如果主基站仍然在线,则跳过备选基站的数据 - if (primaryBase != null && isBaseStationOnline(primaryBase)) { - if(deviceBackupStatus.remove(deviceId) != null){ - logger.info("设备 {} 从备用基站 {} 切换回主基站 {}", - deviceId, id, primaryBaseId); - } - logger.debug("主基站在线,跳过备用基站数据"); - continue; - } else { - String hexPrimaryBase = String.format("%06x",Integer.parseInt(primaryBaseId)); - if(deviceBackupStatus.putIfAbsent(deviceId,true) == null){ - logger.info("设备 {} 从主基站 {} 切换到备用基站 {} (原因: {})", - deviceId, primaryBaseId, id, - primaryBase == null ? "主基站不存在" : "主基站离线"); - } - - byte[] modifyData = forwardBytes.clone(); - modifyData[5] = (byte) Integer.parseInt(hexPrimaryBase.substring(0,2),16); - modifyData[6] = (byte) Integer.parseInt(hexPrimaryBase.substring(2,4),16); - modifyData[7] = (byte) Integer.parseInt(hexPrimaryBase.substring(4,6),16); - - forwardBytes = modifyData; - logger.debug("使用备用基站数据,但保持主基站ID: {}", primaryBaseId); - } - } else if (id.equals(primaryBaseId)) { - // 如果是主基站,确保清除备用状态 - if(deviceBackupStatus.remove(deviceId) != null) { - logger.info("设备 {} 恢复使用主基站 {}", deviceId, id); - } + if (selectedBaseId == null || !selectedBaseId.equals(id)) { + continue; // 当前基站不是该设备的最优选择 } - // 添加日志显示当前测站使用的基站 - logger.info("测站 {} 当前使用的基站: {}", deviceId, id); + // 验证兼容性(双重检查) + if (!isBaseStationCompatibleWithDevice(selectedBaseId, device)) { + logger.warn("Base station {} selected but not compatible with device {}, skipping", + selectedBaseId, deviceId); + continue; + } + + // 获取要转发的数据(可能需要修改基站ID) + byte[] deviceForwardBytes = prepareForwardData(forwardBytes, device, selectedBaseId); // 获取设备通道并发送数据 if(device.getDataChannelType() == Device.CHANNEL_TYPE_UDP) { @@ -158,168 +221,360 @@ public class D331RtcmMessageExecutor implements Executor deviceChannel = OnlineChannels.INSTANCE.getConfigChannel(deviceId); } - // 读取数据库中model字段,判断基站类型 - Short baseStationModel = deviceBs.getModel(); - // 如果model为null,使用默认值0 - if (baseStationModel == null) { - baseStationModel = 0; - logger.warn("Base station model is null for device: {}, using default value 0", id); - } - - if (baseStationModel == 1) { - // 基站类型为1,正常执行 - if(deviceChannel != null && deviceChannel.isOnline()) { - if (logger.isDebugEnabled()) { - logger.debug("forward d331 rtcm from {} to device {}", id, deviceId); - } - ByteBuf buf = Unpooled.buffer(); - buf.writeBytes(forwardBytes); - deviceChannel.writeAndFlush(buf); - } - } else if (baseStationModel == 0) { - //logger.info("Base station model is 0 for device: {}", deviceId); - - Short deviceModel = device.getModel(); - // 如果model为null,使用默认值0 - if (deviceModel == null) { - deviceModel = 0; - //logger.warn("Device model is null for device: {}, using default value 0", deviceId); - } - - if(deviceModel == 0){ - // 测站类型为0,正常执行 - if(deviceId.startsWith("2307")){ - // 处理2307型号的测站 - forwardBytes[2] = (byte) (forwardBytes[2] & 0x07);//兼容不带序号的测站 - } - // 对所有测站类型为0的设备执行转发 - if(deviceChannel != null && deviceChannel.isOnline()) { - if (logger.isDebugEnabled()) { - logger.debug("forward d331 rtcm from {} to device {}", id, deviceId); - } - ByteBuf buf = Unpooled.buffer(); - buf.writeBytes(forwardBytes); - deviceChannel.writeAndFlush(buf); - } - } - else if(deviceModel == 1){ - //logger.info("Device model is 1 for device: {}", deviceId); - - if(deviceChannel != null && deviceChannel.isOnline()) { - //logger.info("Device channel is online for device: {}", deviceId); - - ByteBuf buf = Unpooled.buffer(); - buf.writeBytes(forwardBytes); - - // 检查是否满足10秒转发间隔,只有满足条件时才添加D300字符串 - long currentTime = System.currentTimeMillis(); - Long lastForwardTime = lastD300ForwardTimeMap.getOrDefault(deviceId, 0L); - - if(currentTime - lastForwardTime >= D300_FORWARD_INTERVAL) { - //logger.info("Adding D300 string for device: {}", deviceId); - - // 获取当前buf中的数据 - byte[] originalData = buf.array(); - String originalHex = ByteUtil.bytesToHexString(originalData); - - // 找到D300和D301的位置 - int d300Index = originalHex.indexOf("d300"); - int d301Index = originalHex.indexOf("d301"); - - // 确定插入位置:如果两个都存在,取位置靠前的;如果只存在一个,就用那个位置 - int insertIndex = -1; - if (d300Index != -1 && d301Index != -1) { - // 两个都存在,取位置靠前的 - insertIndex = (d300Index < d301Index) ? d300Index : d301Index; - //logger.info("Found both D300 and D301, D300 at {}, D301 at {}, will insert before position {}", - /// d300Index, d301Index, insertIndex); - } else if (d300Index != -1) { - insertIndex = d300Index; - //logger.info("Found D300 at position {}", d300Index); - } else if (d301Index != -1) { - insertIndex = d301Index; - //logger.info("Found D301 at position {}", d301Index); - } - - - if (insertIndex != -1) { - // 创建新的buf - ByteBuf newBuf = Unpooled.buffer(); - // 写入D300/D301之前的数据 - newBuf.writeBytes(originalData, 0, insertIndex / 2); - - // 使用f9p坐标生成1005消息,并插入 - double[] ecef =new double[3]; - ecef[0] = deviceBs.getEcefx(); - ecef[1] = deviceBs.getEcefy(); - ecef[2] = deviceBs.getEcefz(); - String rtcm1005 = Rtcm1005.generateRtcm1005Hex(ecef, 0); - if (rtcm1005 != null) { - // 写入RTCM 1005消息 - byte[] rtcm1005Bytes = ByteUtil.hexStringTobyte(rtcm1005); - newBuf.writeBytes(rtcm1005Bytes); - // logger.info("Generated RTCM 1005 message for base station {}: {}", deviceBs.getDeviceId(), rtcm1005); - } else { - //logger.warn("Failed to generate RTCM 1005 message for base station: {}", deviceBs.getDeviceId()); - } - - // 写入剩余的数据 - newBuf.writeBytes(originalData, insertIndex / 2, originalData.length - insertIndex / 2); - - // 更新buf - buf = newBuf; - - // 添加日志,记录插入位置和完整数据 -// logger.info("Inserted RTCM 1005 message before position {}, complete data: {}", -// insertIndex, -// ByteUtil.bytesToHexString(buf.array())); - } - - // 更新最后转发时间 - lastD300ForwardTimeMap.put(deviceId, currentTime); - - // 添加日志,记录测站转发的完整数据 - //logger.info("Forward data to device: {}, time: {}, complete data: {}", - // deviceId, - // LocalDateTime.now(), - // ByteUtil.bytesToHexString(buf.array())); - } - - deviceChannel.writeAndFlush(buf); - } - } + // 根据基站和设备类型处理数据转发 + if (shouldForwardData(deviceBs, device)) { + forwardDataToDevice(deviceChannel, deviceForwardBytes, device, deviceBs, id, deviceId); } } } - // 如果30分钟内收到不到d3f0和d3f2,则根据UDP最后一个报文触发状态更新和统计 - if(deviceBs.getD3xxbytes()>0){ - LocalDateTime now = LocalDateTime.now(); - if(deviceBs.getLastRxTime().isBefore(now.minusMinutes(1)) && - (deviceBs.getLastD3f2Time() == null || - deviceBs.getLastD3f2Time().isBefore(now.minusMinutes(30)))) { - // new cycle - logger.info("device {} rx {} d331 in a cycle while not d3f0f2",deviceBs.getDeviceId(),deviceBs.getD3xxCount()); + // 处理设备状态和统计信息 + processDeviceStatistics(deviceBs, message); - Device lastCycleDevice = new Device(); - lastCycleDevice.setDeviceId(deviceBs.getDeviceId()); - lastCycleDevice.setDeviceType(deviceBs.getDeviceType()); - lastCycleDevice.setTenantId(deviceBs.getTenantId()); - lastCycleDevice.setD341bytes(deviceBs.getD341bytes()); - lastCycleDevice.setD341Count(deviceBs.getD341Count()); - lastCycleDevice.setFixedNum(deviceBs.getFixedNum()); - lastCycleDevice.setFloatNum(deviceBs.getFloatNum()); - lastCycleDevice.setNoFixedAndFloatResult(deviceBs.getNoFixedAndFloatResult()); - lastCycleDevice.setLastRxTime(deviceBs.getLastRxTime()); - lastCycleDevice.setLastValidCalcDataTime(deviceBs.getLastValidCalcDataTime()); - lastCycleDevice.setSatelitesInUse(deviceBs.getSatelitesInUse()); + // 添加NTRIP处理 + byte[] srcdata = message.getSrcData(); + String rtcm = ByteUtil.bytesToHexString(srcdata); + sendToNtrip(id, rtcm); + + // 日志记录 + logMessageIfNeeded(deviceBs, message); + + return null; + } + + /** + * 更新基站状态 + */ + private void updateBaseStationStatus(String baseStationId) { + BaseStationStatus status = baseStationStatusMap.computeIfAbsent(baseStationId, + k -> new BaseStationStatus(baseStationId)); + + BaseStationStatusEnum oldStatus = status.getStatus(); + status.setLastActiveTime(LocalDateTime.now()); + + if (oldStatus == BaseStationStatusEnum.OFFLINE) { + status.setStatus(BaseStationStatusEnum.ONLINE); + logger.info("Base station {} changed status from {} to {}", baseStationId, oldStatus, BaseStationStatusEnum.ONLINE); + notifyStatusChange(baseStationId, oldStatus, BaseStationStatusEnum.ONLINE); + + // 检查是否有设备需要切换回主基站 + checkAndSwitchBackToPrimary(baseStationId); + } + } + + /** + * 检查并切换回主基站 + */ + private void checkAndSwitchBackToPrimary(String primaryBaseId) { + ThreadManager.getFixedThreadPool().submit(() -> { + try { + List devicesUsingPrimary = deviceService.findByParentId(primaryBaseId); + + for (Device device : devicesUsingPrimary) { + String currentBase = deviceCurrentBaseMap.get(device.getDeviceId()); + + if (currentBase != null && !currentBase.equals(primaryBaseId)) { + // 设备当前使用备用基站,需要切换回主基站 + logger.info("Switching device {} back to primary base station {} from {}", + device.getDeviceId(), primaryBaseId, currentBase); + switchDeviceToBaseStation(device.getDeviceId(), primaryBaseId, + "Primary base station back online"); + } + } + } catch (Exception e) { + logger.error("Error checking devices for primary base station switch back", e); + } + }); + } + + /** + * 智能选择最优基站 + */ + private String selectOptimalBaseStation(Device device) { + String primaryBaseId = device.getParentId(); + String backupBaseId = device.getParentId1(); + String deviceId = device.getDeviceId(); + + // 获取当前使用的基站 + String currentBaseId = deviceCurrentBaseMap.get(deviceId); + + // 获取所有候选基站,按优先级排序 + List candidateBases = getCandidateBaseStations(device); + + for (String baseId : candidateBases) { + // 检查基站是否在线 + boolean isOnline = isBaseStationOnline(baseId); + + if (!isOnline) { + continue; + } + + // 检查基站与设备的兼容性 + if (!isBaseStationCompatibleWithDevice(baseId, device)) { + continue; + } + + // 如果当前基站发生变化,记录切换 + if (currentBaseId != null && !currentBaseId.equals(baseId)) { + switchDeviceToBaseStation(deviceId, baseId, + String.format("Switch from %s to %s (priority/availability)", currentBaseId, baseId)); + } else if (currentBaseId == null) { + // 首次连接 + deviceCurrentBaseMap.put(deviceId, baseId); + BaseStationStatus status = baseStationStatusMap.get(baseId); + if (status != null) { + status.getServingDevices().add(deviceId); + } + } + return baseId; + } + + logger.warn("No compatible and available base station found for device: {}", deviceId); + return null; + } + + /** + * 检查基站与设备的兼容性 + */ + private boolean isBaseStationCompatibleWithDevice(String baseStationId, Device device) { + // 获取基站设备信息 + Device baseStation = deviceService.findByDeviceId(baseStationId); + if (baseStation == null) { + logger.warn("Base station {} not found", baseStationId); + return false; + } + + // 检查基站是否处于可用状态 + if (baseStation.getOpMode() != GnssDevice.OP_MODE_USE) { + return false; + } + + // 使用已有的兼容性判断逻辑 + return shouldForwardData(baseStation, device); + } + + /** + * 获取候选基站列表(按优先级排序) + */ + private List getCandidateBaseStations(Device device) { + List candidates = new ArrayList<>(); + + // 主基站优先级最高 + if (device.getParentId() != null) { + candidates.add(device.getParentId()); + } + + // 备用基站 + if (device.getParentId1() != null) { + candidates.add(device.getParentId1()); + } + + return candidates; + } + + /** + * 切换设备到指定基站 + */ + private void switchDeviceToBaseStation(String deviceId, String newBaseId, String reason) { + String oldBaseId = deviceCurrentBaseMap.get(deviceId); + + logger.info("Executing base station switch for device {}: {} -> {}, Reason: {}", + deviceId, oldBaseId, newBaseId, reason); + + // 更新映射 + deviceCurrentBaseMap.put(deviceId, newBaseId); + + // 更新基站服务列表 + if (oldBaseId != null) { + BaseStationStatus oldStatus = baseStationStatusMap.get(oldBaseId); + if (oldStatus != null) { + oldStatus.getServingDevices().remove(deviceId); + } + } + + BaseStationStatus newStatus = baseStationStatusMap.get(newBaseId); + if (newStatus != null) { + newStatus.getServingDevices().add(deviceId); + } + + // 通知监听器 + for (BaseStationSwitchListener listener : switchListeners) { + try { + listener.onBaseStationSwitch(deviceId, oldBaseId, newBaseId, reason); + } catch (Exception e) { + logger.error("Error notifying base station switch listener", e); + } + } + } + + /** + * 准备转发数据(可能需要修改基站ID) + */ + private byte[] prepareForwardData(byte[] originalData, Device device, String selectedBaseId) { + String primaryBaseId = device.getParentId(); + + // 如果选择的基站不是主基站,需要修改RTCM数据中的基站ID + if (primaryBaseId != null && !selectedBaseId.equals(primaryBaseId)) { + try { + byte[] modifyData = originalData.clone(); + // 修复:使用Long.parseLong而不是Integer.parseInt,因为基站ID可能很大 + long primaryBaseIdLong = Long.parseLong(primaryBaseId); + String hexPrimaryBase = String.format("%06x", primaryBaseIdLong); + + if (hexPrimaryBase.length() >= 6) { + modifyData[5] = (byte) Integer.parseInt(hexPrimaryBase.substring(0, 2), 16); + modifyData[6] = (byte) Integer.parseInt(hexPrimaryBase.substring(2, 4), 16); + modifyData[7] = (byte) Integer.parseInt(hexPrimaryBase.substring(4, 6), 16); + } + + return modifyData; + } catch (NumberFormatException e) { + logger.error("Error parsing base station ID: {}", primaryBaseId, e); + return originalData; + } + } + + return originalData; + } + + /** + * 判断是否应该转发数据 + * 基站类型说明: + * - model = 1: f9p基站,所有设备都可以接收 + * - model = 0: 博通基站,只有博通设备(model=0)和兼容设备(model=1)可以接收 + */ + private boolean shouldForwardData(Device baseStation, Device device) { + Short baseStationModel = baseStation.getModel(); + if (baseStationModel == null) { + baseStationModel = 0; + } + + Short deviceModel = device.getModel(); + if (deviceModel == null) { + deviceModel = 0; + } + + if (baseStationModel == 1) { + // f9p基站,所有设备都可以接收 + return true; + } else if (baseStationModel == 0) { + // 博通基站,只支持特定设备 + return (deviceModel == 0 || deviceModel == 1); + } + + return false; + } + + /** + * 转发数据到设备 + */ + private void forwardDataToDevice(DeviceChannel deviceChannel, byte[] forwardBytes, + Device device, Device deviceBs, String baseStationId, String deviceId) { + if (deviceChannel == null || !deviceChannel.isOnline()) { + return; + } + + Short deviceModel = device.getModel(); + if (deviceModel == null) { + deviceModel = 0; + } + + if (logger.isDebugEnabled()) { + logger.debug("forward d331 rtcm from {} to device {}", baseStationId, deviceId); + } + + ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(forwardBytes); + + // 特殊处理设备类型 + if (deviceModel == 0 && deviceId.startsWith("2307")) { + // 处理2307型号的测站 + byte[] modifiedData = buf.array(); + modifiedData[2] = (byte) (modifiedData[2] & 0x07); // 兼容不带序号的测站 + } else if (deviceModel == 1) { + // 处理类型1的设备,可能需要添加D300数据 + buf = handleDeviceType1(buf, device, deviceBs, deviceId); + } + + deviceChannel.writeAndFlush(buf); + } + + /** + * 处理设备类型1的特殊逻辑 + */ + private ByteBuf handleDeviceType1(ByteBuf buf, Device device, Device deviceBs, String deviceId) { + long currentTime = System.currentTimeMillis(); + Long lastForwardTime = lastD300ForwardTimeMap.getOrDefault(deviceId, 0L); + + if (currentTime - lastForwardTime >= D300_FORWARD_INTERVAL) { + byte[] originalData = buf.array(); + String originalHex = ByteUtil.bytesToHexString(originalData); + + int d300Index = originalHex.indexOf("d300"); + int d301Index = originalHex.indexOf("d301"); + + int insertIndex = -1; + if (d300Index != -1 && d301Index != -1) { + insertIndex = Math.min(d300Index, d301Index); + } else if (d300Index != -1) { + insertIndex = d300Index; + } else if (d301Index != -1) { + insertIndex = d301Index; + } + + if (insertIndex != -1) { + ByteBuf newBuf = Unpooled.buffer(); + newBuf.writeBytes(originalData, 0, insertIndex / 2); + + // 使用f9p坐标生成1005消息 + double[] ecef = new double[3]; + // 修复:确保坐标值不为null,并正确处理Double类型 + Double ecefX = deviceBs.getEcefx(); + Double ecefY = deviceBs.getEcefy(); + Double ecefZ = deviceBs.getEcefz(); + + ecef[0] = ecefX != null ? ecefX.doubleValue() : 0.0; + ecef[1] = ecefY != null ? ecefY.doubleValue() : 0.0; + ecef[2] = ecefZ != null ? ecefZ.doubleValue() : 0.0; + + String rtcm1005 = Rtcm1005.generateRtcm1005Hex(ecef, 0); + if (rtcm1005 != null) { + byte[] rtcm1005Bytes = ByteUtil.hexStringTobyte(rtcm1005); + newBuf.writeBytes(rtcm1005Bytes); + } + + newBuf.writeBytes(originalData, insertIndex / 2, originalData.length - insertIndex / 2); + buf = newBuf; + } + + lastD300ForwardTimeMap.put(deviceId, currentTime); + } + + return buf; + } + + /** + * 处理设备统计信息 + */ + private void processDeviceStatistics(Device deviceBs, D331RtcmMessage message) { + if (deviceBs.getD3xxbytes() > 0) { + LocalDateTime now = LocalDateTime.now(); + if (deviceBs.getLastRxTime() != null && + deviceBs.getLastRxTime().isBefore(now.minusMinutes(1)) && + (deviceBs.getLastD3f2Time() == null || + deviceBs.getLastD3f2Time().isBefore(now.minusMinutes(30)))) { + + logger.info("device {} rx {} d331 in a cycle while not d3f0f2", + deviceBs.getDeviceId(), deviceBs.getD3xxCount()); + + Device lastCycleDevice = createLastCycleDevice(deviceBs); deviceBs.clearStat(); + ThreadManager.getFixedThreadPool().submit(() -> { - // 通知上线 try { beidouClient.onDeviceActive(deviceBs.getDeviceId(), deviceBs.getTenantId()); } catch (Exception e) { - logger.error(e.toString()); + logger.error("Error notifying device active", e); } dataPersistService.updateDeviceState(lastCycleDevice); }); @@ -327,30 +582,48 @@ public class D331RtcmMessageExecutor implements Executor } // update trx - deviceBs.updateRx(message.getHeader(),message.getLen(),message.getPacketNum()); + deviceBs.updateRx(message.getHeader(), message.getLen(), message.getPacketNum()); + // update gga Gga gga = message.getGga(); - if(gga != null) { + if (gga != null) { deviceBs.updateSatelitesNum(gga.getSatellitesInUsed()); + // 修复:正确处理Double类型的坐标 deviceBs.setLatitude(gga.getLatitude()); deviceBs.setLongitude(gga.getLongitude()); deviceBs.setAltitude(gga.getAltitude()); } + } - // 添加NTRIP处理 - byte[] srcdata = message.getSrcData(); - String rtcm = ByteUtil.bytesToHexString(srcdata); - sendToNtrip(id, rtcm); + /** + * 创建上一周期设备信息 + */ + private Device createLastCycleDevice(Device deviceBs) { + Device lastCycleDevice = new Device(); + lastCycleDevice.setDeviceId(deviceBs.getDeviceId()); + lastCycleDevice.setDeviceType(deviceBs.getDeviceType()); + lastCycleDevice.setTenantId(deviceBs.getTenantId()); + lastCycleDevice.setD341bytes(deviceBs.getD341bytes()); + lastCycleDevice.setD341Count(deviceBs.getD341Count()); + lastCycleDevice.setFixedNum(deviceBs.getFixedNum()); + lastCycleDevice.setFloatNum(deviceBs.getFloatNum()); + lastCycleDevice.setNoFixedAndFloatResult(deviceBs.getNoFixedAndFloatResult()); + lastCycleDevice.setLastRxTime(deviceBs.getLastRxTime()); + lastCycleDevice.setLastValidCalcDataTime(deviceBs.getLastValidCalcDataTime()); + lastCycleDevice.setSatelitesInUse(deviceBs.getSatelitesInUse()); + return lastCycleDevice; + } + /** + * 记录日志(如果需要) + */ + private void logMessageIfNeeded(Device deviceBs, D331RtcmMessage message) { ThreadManager.getFixedThreadPool().submit(() -> { - // 原始码流输出到日志文件 -- INFO 级别 - // 只有测站开了日志记录,或者消息来自基站,才将原码记录到日志文件 - if(deviceBs.getLoggingmode() == GnssDevice.LOGGING_MODE_FULL){ - logger.info("receive {} d331 message: {}", message.getId(), DataTypeUtil.getHexString(message.getSrcData())); + if (deviceBs.getLoggingmode() != null && deviceBs.getLoggingmode() == GnssDevice.LOGGING_MODE_FULL) { + logger.info("receive {} d331 message: {}", message.getId(), + DataTypeUtil.getHexString(message.getSrcData())); } }); - - return null; } @Override @@ -358,38 +631,190 @@ public class D331RtcmMessageExecutor implements Executor return D331RtcmMessage.class; } + /** + * 发送数据到NTRIP + */ private void sendToNtrip(String mountpoint, String hexData) { try { - - // 将原始字节转换为16进制字符串用于RTCM提取 - //String hexData = ByteUtil.bytesToHexString(rawData); - //System.out.println(hexData); - - // 提取RTCM数据并发送到NtripServer,使用设备ID作为挂载点 Optional.ofNullable(RtcmGgaUtil.getRtcms(hexData)) - .ifPresent(rtcm -> { - //System.out.println("挂载点: " + mountpoint); - //System.out.println("RTCM数据: " + rtcm); - ntripServer.send(mountpoint, rtcm); - }); + .ifPresent(rtcm -> ntripServer.send(mountpoint, rtcm)); } catch (Exception e) { logger.error("处理NTRIP数据失败, 挂载点: {}, 错误: {}", mountpoint, e.getMessage()); } } + /** + * 判断基站是否在线 + */ + private boolean isBaseStationOnline(String baseStationId) { + if (baseStationId == null) return false; + + BaseStationStatus status = baseStationStatusMap.get(baseStationId); + if (status == null) { + return false; + } + + LocalDateTime now = LocalDateTime.now(); + return status.getLastActiveTime() != null && + status.getLastActiveTime().isAfter(now.minusSeconds(BASE_STATION_OFFLINE_TIMEOUT)); + } /** - * 判断住基站是否在线 - * @param baseStation 基站 - * @return 是否在线 + * 判断基站设备是否在线(兼容原有方法) */ - - private boolean isBaseStationOnline(Device baseStation){ - if(baseStation == null) return false; + private boolean isBaseStationOnline(Device baseStation) { + if (baseStation == null) return false; LocalDateTime now = LocalDateTime.now(); return baseStation.getLastRxTime() != null && - baseStation.getLastRxTime().isAfter(now.minusMinutes(5)); // 从30分钟改为5分钟 + baseStation.getLastRxTime().isAfter(now.minusMinutes(5)); } -} + /** + * 通知状态变化 + */ + private void notifyStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus) { + for (BaseStationSwitchListener listener : switchListeners) { + try { + listener.onBaseStationStatusChange(baseStationId, oldStatus, newStatus); + } catch (Exception e) { + logger.error("Error notifying base station status change listener", e); + } + } + } + + /** + * 通知基站切换(可扩展为调用外部服务) + */ + private void notifyBaseStationSwitch(String deviceId, String fromBaseId, String toBaseId, String reason) { + logger.info("Sending external notification for base station switch: Device={}, From={}, To={}, Reason={}", + deviceId, fromBaseId, toBaseId, reason); + } + + /** + * 通知基站状态变化(可扩展为调用外部服务) + */ + private void notifyBaseStationStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus) { + logger.info("Sending external notification for base station status change: BaseStation={}, From={}, To={}", + baseStationId, oldStatus, newStatus); + } + + /** + * 定期检查基站状态 + */ + @Scheduled(fixedRate = 30000) // 30秒执行一次 + public void checkBaseStationStatus() { + LocalDateTime now = LocalDateTime.now(); + + for (BaseStationStatus status : baseStationStatusMap.values()) { + if (status.getStatus() == BaseStationStatusEnum.ONLINE) { + // 检查是否超时离线 + if (status.getLastActiveTime().isBefore(now.minusSeconds(BASE_STATION_OFFLINE_TIMEOUT))) { + BaseStationStatusEnum oldStatus = status.getStatus(); + status.setStatus(BaseStationStatusEnum.OFFLINE); + + logger.warn("Base station {} went offline (timeout). Last active: {}", + status.getBaseStationId(), status.getLastActiveTime()); + + notifyStatusChange(status.getBaseStationId(), oldStatus, BaseStationStatusEnum.OFFLINE); + + // 触发设备切换到备用基站 + triggerDeviceSwitchToBackup(status.getBaseStationId()); + } + } + } + } + + /** + * 触发设备切换到备用基站 + */ + private void triggerDeviceSwitchToBackup(String offlineBaseId) { + ThreadManager.getFixedThreadPool().submit(() -> { + try { + BaseStationStatus status = baseStationStatusMap.get(offlineBaseId); + if (status != null) { + Set affectedDevices = new HashSet<>(status.getServingDevices()); + + for (String deviceId : affectedDevices) { + // 查找设备信息并触发切换 + Device device = deviceService.findByDeviceId(deviceId); + if (device != null) { + String newBaseId = selectOptimalBaseStation(device); + if (newBaseId != null && !newBaseId.equals(offlineBaseId)) { + switchDeviceToBaseStation(deviceId, newBaseId, + "Primary base station offline: " + offlineBaseId); + } else { + logger.warn("No suitable backup base station found for device {} (was using {})", + deviceId, offlineBaseId); + } + } + } + } + } catch (Exception e) { + logger.error("Error triggering device switch to backup base station", e); + } + }); + } + + /** + * 添加基站切换监听器 + */ + public static void addBaseStationSwitchListener(BaseStationSwitchListener listener) { + switchListeners.add(listener); + } + + /** + * 移除基站切换监听器 + */ + public static void removeBaseStationSwitchListener(BaseStationSwitchListener listener) { + switchListeners.remove(listener); + } + + /** + * 获取基站状态信息 + */ + public static BaseStationStatus getBaseStationStatus(String baseStationId) { + return baseStationStatusMap.get(baseStationId); + } + + /** + * 获取所有基站状态 + */ + public static Map getAllBaseStationStatus() { + return new HashMap<>(baseStationStatusMap); + } + + /** + * 获取设备当前使用的基站 + */ + public static String getDeviceCurrentBase(String deviceId) { + return deviceCurrentBaseMap.get(deviceId); + } + + /** + * 打印当前基站和设备状态(用于调试) + */ + @Scheduled(fixedRate = 300000) // 5分钟执行一次 + public void printStatusSummary() { + if (logger.isInfoEnabled() && !baseStationStatusMap.isEmpty()) { + StringBuilder summary = new StringBuilder(); + summary.append("\n=== Base Station Status Summary ===\n"); + + for (BaseStationStatus status : baseStationStatusMap.values()) { + summary.append(String.format("Base Station: %s, Status: %s, Last Active: %s, Serving Devices: %d\n", + status.getBaseStationId(), + status.getStatus(), + status.getLastActiveTime(), + status.getServingDevices().size())); + } + + summary.append("=== Device Base Mapping ===\n"); + for (Map.Entry entry : deviceCurrentBaseMap.entrySet()) { + summary.append(String.format("Device: %s -> Base Station: %s\n", + entry.getKey(), entry.getValue())); + } + + logger.info(summary.toString()); + } + } +} \ No newline at end of file