完善基站切换

This commit is contained in:
zms 2025-07-08 17:20:41 +08:00
parent 4ceac73519
commit 9913b8a732

View File

@ -21,14 +21,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Layton
* @date 2023/2/2 20:49
@ -38,7 +37,14 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final Map<String ,Boolean> deviceBackupStatus = new ConcurrentHashMap<>();
// 基站状态管理
private static final Map<String, BaseStationStatus> baseStationStatusMap = new ConcurrentHashMap<>();
// 测站当前使用的基站映射
private static final Map<String, String> deviceCurrentBaseMap = new ConcurrentHashMap<>();
// 基站切换监听器列表
private static final List<BaseStationSwitchListener> switchListeners = new ArrayList<>();
@Autowired
private DeviceService deviceService;
@ -53,33 +59,124 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
private final Map<String, Long> lastD300ForwardTimeMap = new ConcurrentHashMap<>();
private static final long D300_FORWARD_INTERVAL = 5000; // 5秒单位毫秒
private static final Map<String, Long> deviceLastTime = new ConcurrentHashMap<>();
private static final long LOG_INTERVAL = 10 * 60 * 1000L;
// 基站在线检查间隔 - 修改为秒
private static final long BASE_STATION_CHECK_INTERVAL = 30; // 30秒
private static final long BASE_STATION_OFFLINE_TIMEOUT = 5 * 60; // 5分钟单位
/**
* 基站状态枚举
*/
public enum BaseStationStatusEnum {
ONLINE,
OFFLINE,
SWITCHING
}
/**
* 基站状态信息
*/
public static class BaseStationStatus {
private String baseStationId;
private BaseStationStatusEnum status;
private LocalDateTime lastActiveTime;
private LocalDateTime statusChangeTime;
private Set<String> servingDevices; // 当前服务的测站列表
public BaseStationStatus(String baseStationId) {
this.baseStationId = baseStationId;
this.status = BaseStationStatusEnum.OFFLINE;
this.lastActiveTime = LocalDateTime.now();
this.statusChangeTime = LocalDateTime.now();
this.servingDevices = ConcurrentHashMap.newKeySet();
}
// Getters and Setters
public String getBaseStationId() { return baseStationId; }
public BaseStationStatusEnum getStatus() { return status; }
public void setStatus(BaseStationStatusEnum status) {
if (this.status != status) {
this.status = status;
this.statusChangeTime = LocalDateTime.now();
}
}
public LocalDateTime getLastActiveTime() { return lastActiveTime; }
public void setLastActiveTime(LocalDateTime lastActiveTime) { this.lastActiveTime = lastActiveTime; }
public LocalDateTime getStatusChangeTime() { return statusChangeTime; }
public Set<String> getServingDevices() { return servingDevices; }
}
/**
* 基站切换监听器接口
*/
public interface BaseStationSwitchListener {
void onBaseStationSwitch(String deviceId, String fromBaseId, String toBaseId, String reason);
void onBaseStationStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus);
}
/**
* 默认的基站切换监听器实现
*/
private class DefaultBaseStationSwitchListener implements BaseStationSwitchListener {
@Override
public void onBaseStationSwitch(String deviceId, String fromBaseId, String toBaseId, String reason) {
logger.info("Base station switch - Device: {}, From: {} To: {}, Reason: {}",
deviceId, fromBaseId, toBaseId, reason);
ThreadManager.getFixedThreadPool().submit(() -> {
try {
notifyBaseStationSwitch(deviceId, fromBaseId, toBaseId, reason);
} catch (Exception e) {
logger.error("Failed to send base station switch notification", e);
}
});
}
@Override
public void onBaseStationStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus) {
logger.info("Base station status change - BaseStation: {}, From: {} To: {}",
baseStationId, oldStatus, newStatus);
ThreadManager.getFixedThreadPool().submit(() -> {
try {
notifyBaseStationStatusChange(baseStationId, oldStatus, newStatus);
} catch (Exception e) {
logger.error("Failed to send base station status notification", e);
}
});
}
}
// 初始化默认监听器
@PostConstruct
public void init() {
switchListeners.add(new DefaultBaseStationSwitchListener());
logger.info("D331RtcmMessageExecutor initialized");
}
@Override
public Void execute(D331RtcmMessage message) {
String id = message.getId();
// 更新基站状态
updateBaseStationStatus(id);
// 补齐tenantId
Device deviceBs = deviceService.findByDeviceId(id);
if(deviceBs == null || deviceBs.getOpMode() == GnssDevice.OP_MODE_UNUSE || deviceBs.getOpMode() == GnssDevice.OP_MODE_CHECK) {
logger.warn("基站 {} 不存在或未启用 (opMode: {})",
id, deviceBs != null ? deviceBs.getOpMode() : "null");
return null;
}
// 推送基站数据
if(deviceBs.getOpMode() == GnssDevice.OP_MODE_USE || deviceBs.getOpMode() == null) {
if(deviceBs.getOpMode() == GnssDevice.OP_MODE_USE) {
byte[] forwardBytes = message.getSrcData();
// 获取使用该基站(包括作为主基站和备选基站)的所有测站
List<Device> primaryDevices = deviceService.findByParentId(id);
List<Device> backupDevices = deviceService.findByParentId1(id);
// 添加调试日志
logger.info("基站 {} 状态: opMode={}, lastRxTime={}, d3xxCount={}",
id, deviceBs.getOpMode(), deviceBs.getLastRxTime(), deviceBs.getD3xxCount());
if (!primaryDevices.isEmpty() || !backupDevices.isEmpty()) {
logger.debug("基站 {} 关联的主站设备数: {}, 备站设备数: {}",
id, primaryDevices.size(), backupDevices.size());
}
// 合并两个列表
List<Device> allDevices = new ArrayList<>();
allDevices.addAll(primaryDevices);
@ -87,69 +184,35 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
DeviceChannel deviceChannel = null;
for (Device device : allDevices) {
<<<<<<< Updated upstream
if (device.getOpMode() != GnssDevice.OP_MODE_USE) continue;
if ((device.getModel()==GnssDevice.MODEL_G510) &&
(device.getFixedNum()>100) && (device.getGnssSampleRate()>1)
&& (deviceBs.getD3xxCount()%device.getGnssSampleRate()) != 0) {
//if(!UBXUtil.has1005(forwardBytes)) continue; //1005必推
=======
if (device.getOpMode() != GnssDevice.OP_MODE_USE) {
>>>>>>> Stashed changes
continue;
}
String deviceId = device.getDeviceId();
// 检查该设备是否应该接收此基站的数据
String primaryBaseId = device.getParentId();
String backupBaseId = device.getParentId1();
// 智能基站选择和切换
String selectedBaseId = selectOptimalBaseStation(device);
// 添加调试日志
logger.info("测站 {} 配置信息: 主基站={}, 备用基站={}, 当前数据来自基站={}",
deviceId, primaryBaseId, backupBaseId, id);
// 如果当前基站是该设备的备选基站需要检查主基站是否离线
if (id.equals(backupBaseId)) {
Device primaryBase = deviceService.findByDeviceId(primaryBaseId);
// 添加调试日志
if (primaryBase != null) {
logger.info("测站 {} 的主基站 {} 状态: lastRxTime={}, opMode={}",
deviceId, primaryBaseId,
primaryBase.getLastRxTime(),
primaryBase.getOpMode());
} else {
logger.warn("测站 {} 的主基站 {} 未找到或已删除", deviceId, primaryBaseId);
if (selectedBaseId == null || !selectedBaseId.equals(id)) {
continue; // 当前基站不是该设备的最优选择
}
// 如果主基站仍然在线则跳过备选基站的数据
if (primaryBase != null && isBaseStationOnline(primaryBase)) {
if(deviceBackupStatus.remove(deviceId) != null){
logger.info("设备 {} 从备用基站 {} 切换回主基站 {}",
deviceId, id, primaryBaseId);
}
logger.debug("主基站在线,跳过备用基站数据");
// 验证兼容性双重检查
if (!isBaseStationCompatibleWithDevice(selectedBaseId, device)) {
logger.warn("Base station {} selected but not compatible with device {}, skipping",
selectedBaseId, deviceId);
continue;
} else {
String hexPrimaryBase = String.format("%06x",Integer.parseInt(primaryBaseId));
if(deviceBackupStatus.putIfAbsent(deviceId,true) == null){
logger.info("设备 {} 从主基站 {} 切换到备用基站 {} (原因: {})",
deviceId, primaryBaseId, id,
primaryBase == null ? "主基站不存在" : "主基站离线");
}
byte[] modifyData = forwardBytes.clone();
modifyData[5] = (byte) Integer.parseInt(hexPrimaryBase.substring(0,2),16);
modifyData[6] = (byte) Integer.parseInt(hexPrimaryBase.substring(2,4),16);
modifyData[7] = (byte) Integer.parseInt(hexPrimaryBase.substring(4,6),16);
forwardBytes = modifyData;
logger.debug("使用备用基站数据但保持主基站ID: {}", primaryBaseId);
}
} else if (id.equals(primaryBaseId)) {
// 如果是主基站确保清除备用状态
if(deviceBackupStatus.remove(deviceId) != null) {
logger.info("设备 {} 恢复使用主基站 {}", deviceId, id);
}
}
// 添加日志显示当前测站使用的基站
logger.info("测站 {} 当前使用的基站: {}", deviceId, id);
// 获取要转发的数据可能需要修改基站ID
byte[] deviceForwardBytes = prepareForwardData(forwardBytes, device, selectedBaseId);
// 获取设备通道并发送数据
if(device.getDataChannelType() == Device.CHANNEL_TYPE_UDP) {
@ -158,149 +221,384 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
deviceChannel = OnlineChannels.INSTANCE.getConfigChannel(deviceId);
}
// 读取数据库中model字段判断基站类型
Short baseStationModel = deviceBs.getModel();
// 如果model为null使用默认值0
// 根据基站和设备类型处理数据转发
if (shouldForwardData(deviceBs, device)) {
forwardDataToDevice(deviceChannel, deviceForwardBytes, device, deviceBs, id, deviceId);
}
}
}
// 处理设备状态和统计信息
processDeviceStatistics(deviceBs, message);
// 添加NTRIP处理
byte[] srcdata = message.getSrcData();
String rtcm = ByteUtil.bytesToHexString(srcdata);
sendToNtrip(id, rtcm);
// 日志记录
logMessageIfNeeded(deviceBs, message);
return null;
}
/**
* 更新基站状态
*/
private void updateBaseStationStatus(String baseStationId) {
BaseStationStatus status = baseStationStatusMap.computeIfAbsent(baseStationId,
k -> new BaseStationStatus(baseStationId));
BaseStationStatusEnum oldStatus = status.getStatus();
status.setLastActiveTime(LocalDateTime.now());
if (oldStatus == BaseStationStatusEnum.OFFLINE) {
status.setStatus(BaseStationStatusEnum.ONLINE);
logger.info("Base station {} changed status from {} to {}", baseStationId, oldStatus, BaseStationStatusEnum.ONLINE);
notifyStatusChange(baseStationId, oldStatus, BaseStationStatusEnum.ONLINE);
// 检查是否有设备需要切换回主基站
checkAndSwitchBackToPrimary(baseStationId);
}
}
/**
* 检查并切换回主基站
*/
private void checkAndSwitchBackToPrimary(String primaryBaseId) {
ThreadManager.getFixedThreadPool().submit(() -> {
try {
List<Device> devicesUsingPrimary = deviceService.findByParentId(primaryBaseId);
for (Device device : devicesUsingPrimary) {
String currentBase = deviceCurrentBaseMap.get(device.getDeviceId());
if (currentBase != null && !currentBase.equals(primaryBaseId)) {
// 设备当前使用备用基站需要切换回主基站
logger.info("Switching device {} back to primary base station {} from {}",
device.getDeviceId(), primaryBaseId, currentBase);
switchDeviceToBaseStation(device.getDeviceId(), primaryBaseId,
"Primary base station back online");
}
}
} catch (Exception e) {
logger.error("Error checking devices for primary base station switch back", e);
}
});
}
/**
* 智能选择最优基站
*/
private String selectOptimalBaseStation(Device device) {
String primaryBaseId = device.getParentId();
String backupBaseId = device.getParentId1();
String deviceId = device.getDeviceId();
// 获取当前使用的基站
String currentBaseId = deviceCurrentBaseMap.get(deviceId);
// 获取所有候选基站按优先级排序
List<String> candidateBases = getCandidateBaseStations(device);
for (String baseId : candidateBases) {
// 检查基站是否在线
boolean isOnline = isBaseStationOnline(baseId);
if (!isOnline) {
continue;
}
// 检查基站与设备的兼容性
if (!isBaseStationCompatibleWithDevice(baseId, device)) {
continue;
}
// 如果当前基站发生变化记录切换
if (currentBaseId != null && !currentBaseId.equals(baseId)) {
switchDeviceToBaseStation(deviceId, baseId,
String.format("Switch from %s to %s (priority/availability)", currentBaseId, baseId));
} else if (currentBaseId == null) {
// 首次连接
deviceCurrentBaseMap.put(deviceId, baseId);
BaseStationStatus status = baseStationStatusMap.get(baseId);
if (status != null) {
status.getServingDevices().add(deviceId);
}
}
return baseId;
}
logger.warn("No compatible and available base station found for device: {}", deviceId);
return null;
}
/**
* 检查基站与设备的兼容性
*/
private boolean isBaseStationCompatibleWithDevice(String baseStationId, Device device) {
// 获取基站设备信息
Device baseStation = deviceService.findByDeviceId(baseStationId);
if (baseStation == null) {
logger.warn("Base station {} not found", baseStationId);
return false;
}
// 检查基站是否处于可用状态
if (baseStation.getOpMode() != GnssDevice.OP_MODE_USE) {
return false;
}
// 使用已有的兼容性判断逻辑
return shouldForwardData(baseStation, device);
}
/**
* 获取候选基站列表按优先级排序
*/
private List<String> getCandidateBaseStations(Device device) {
List<String> candidates = new ArrayList<>();
// 主基站优先级最高
if (device.getParentId() != null) {
candidates.add(device.getParentId());
}
// 备用基站
if (device.getParentId1() != null) {
candidates.add(device.getParentId1());
}
return candidates;
}
/**
* 切换设备到指定基站
*/
private void switchDeviceToBaseStation(String deviceId, String newBaseId, String reason) {
String oldBaseId = deviceCurrentBaseMap.get(deviceId);
logger.info("Executing base station switch for device {}: {} -> {}, Reason: {}",
deviceId, oldBaseId, newBaseId, reason);
// 更新映射
deviceCurrentBaseMap.put(deviceId, newBaseId);
// 更新基站服务列表
if (oldBaseId != null) {
BaseStationStatus oldStatus = baseStationStatusMap.get(oldBaseId);
if (oldStatus != null) {
oldStatus.getServingDevices().remove(deviceId);
}
}
BaseStationStatus newStatus = baseStationStatusMap.get(newBaseId);
if (newStatus != null) {
newStatus.getServingDevices().add(deviceId);
}
// 通知监听器
for (BaseStationSwitchListener listener : switchListeners) {
try {
listener.onBaseStationSwitch(deviceId, oldBaseId, newBaseId, reason);
} catch (Exception e) {
logger.error("Error notifying base station switch listener", e);
}
}
}
/**
* 准备转发数据可能需要修改基站ID
*/
private byte[] prepareForwardData(byte[] originalData, Device device, String selectedBaseId) {
String primaryBaseId = device.getParentId();
// 如果选择的基站不是主基站需要修改RTCM数据中的基站ID
if (primaryBaseId != null && !selectedBaseId.equals(primaryBaseId)) {
try {
byte[] modifyData = originalData.clone();
// 修复使用Long.parseLong而不是Integer.parseInt因为基站ID可能很大
long primaryBaseIdLong = Long.parseLong(primaryBaseId);
String hexPrimaryBase = String.format("%06x", primaryBaseIdLong);
if (hexPrimaryBase.length() >= 6) {
modifyData[5] = (byte) Integer.parseInt(hexPrimaryBase.substring(0, 2), 16);
modifyData[6] = (byte) Integer.parseInt(hexPrimaryBase.substring(2, 4), 16);
modifyData[7] = (byte) Integer.parseInt(hexPrimaryBase.substring(4, 6), 16);
}
return modifyData;
} catch (NumberFormatException e) {
logger.error("Error parsing base station ID: {}", primaryBaseId, e);
return originalData;
}
}
return originalData;
}
/**
* 判断是否应该转发数据
* 基站类型说明
* - model = 1: f9p基站所有设备都可以接收
* - model = 0: 博通基站只有博通设备(model=0)和兼容设备(model=1)可以接收
*/
private boolean shouldForwardData(Device baseStation, Device device) {
Short baseStationModel = baseStation.getModel();
if (baseStationModel == null) {
baseStationModel = 0;
logger.warn("Base station model is null for device: {}, using default value 0", id);
}
Short deviceModel = device.getModel();
if (deviceModel == null) {
deviceModel = 0;
}
if (baseStationModel == 1) {
// 基站类型为1正常执行
if(deviceChannel != null && deviceChannel.isOnline()) {
if (logger.isDebugEnabled()) {
logger.debug("forward d331 rtcm from {} to device {}", id, deviceId);
}
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(forwardBytes);
deviceChannel.writeAndFlush(buf);
}
// f9p基站所有设备都可以接收
return true;
} else if (baseStationModel == 0) {
//logger.info("Base station model is 0 for device: {}", deviceId);
// 博通基站只支持特定设备
return (deviceModel == 0 || deviceModel == 1);
}
return false;
}
/**
* 转发数据到设备
*/
private void forwardDataToDevice(DeviceChannel deviceChannel, byte[] forwardBytes,
Device device, Device deviceBs, String baseStationId, String deviceId) {
if (deviceChannel == null || !deviceChannel.isOnline()) {
return;
}
Short deviceModel = device.getModel();
// 如果model为null使用默认值0
if (deviceModel == null) {
deviceModel = 0;
//logger.warn("Device model is null for device: {}, using default value 0", deviceId);
}
if(deviceModel == 0){
// 测站类型为0正常执行
if(deviceId.startsWith("2307")){
// 处理2307型号的测站
forwardBytes[2] = (byte) (forwardBytes[2] & 0x07);//兼容不带序号的测站
}
// 对所有测站类型为0的设备执行转发
if(deviceChannel != null && deviceChannel.isOnline()) {
if (logger.isDebugEnabled()) {
logger.debug("forward d331 rtcm from {} to device {}", id, deviceId);
logger.debug("forward d331 rtcm from {} to device {}", baseStationId, deviceId);
}
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(forwardBytes);
// 特殊处理设备类型
if (deviceModel == 0 && deviceId.startsWith("2307")) {
// 处理2307型号的测站
byte[] modifiedData = buf.array();
modifiedData[2] = (byte) (modifiedData[2] & 0x07); // 兼容不带序号的测站
} else if (deviceModel == 1) {
// 处理类型1的设备可能需要添加D300数据
buf = handleDeviceType1(buf, device, deviceBs, deviceId);
}
deviceChannel.writeAndFlush(buf);
}
}
else if(deviceModel == 1){
//logger.info("Device model is 1 for device: {}", deviceId);
if(deviceChannel != null && deviceChannel.isOnline()) {
//logger.info("Device channel is online for device: {}", deviceId);
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(forwardBytes);
// 检查是否满足10秒转发间隔只有满足条件时才添加D300字符串
/**
* 处理设备类型1的特殊逻辑
*/
private ByteBuf handleDeviceType1(ByteBuf buf, Device device, Device deviceBs, String deviceId) {
long currentTime = System.currentTimeMillis();
Long lastForwardTime = lastD300ForwardTimeMap.getOrDefault(deviceId, 0L);
if (currentTime - lastForwardTime >= D300_FORWARD_INTERVAL) {
//logger.info("Adding D300 string for device: {}", deviceId);
// 获取当前buf中的数据
byte[] originalData = buf.array();
String originalHex = ByteUtil.bytesToHexString(originalData);
// 找到D300和D301的位置
int d300Index = originalHex.indexOf("d300");
int d301Index = originalHex.indexOf("d301");
// 确定插入位置如果两个都存在取位置靠前的如果只存在一个就用那个位置
int insertIndex = -1;
if (d300Index != -1 && d301Index != -1) {
// 两个都存在取位置靠前的
insertIndex = (d300Index < d301Index) ? d300Index : d301Index;
//logger.info("Found both D300 and D301, D300 at {}, D301 at {}, will insert before position {}",
/// d300Index, d301Index, insertIndex);
insertIndex = Math.min(d300Index, d301Index);
} else if (d300Index != -1) {
insertIndex = d300Index;
//logger.info("Found D300 at position {}", d300Index);
} else if (d301Index != -1) {
insertIndex = d301Index;
//logger.info("Found D301 at position {}", d301Index);
}
if (insertIndex != -1) {
// 创建新的buf
ByteBuf newBuf = Unpooled.buffer();
// 写入D300/D301之前的数据
newBuf.writeBytes(originalData, 0, insertIndex / 2);
// 使用f9p坐标生成1005消息并插入
// 使用f9p坐标生成1005消息
double[] ecef = new double[3];
ecef[0] = deviceBs.getEcefx();
ecef[1] = deviceBs.getEcefy();
ecef[2] = deviceBs.getEcefz();
// 修复确保坐标值不为null并正确处理Double类型
Double ecefX = deviceBs.getEcefx();
Double ecefY = deviceBs.getEcefy();
Double ecefZ = deviceBs.getEcefz();
ecef[0] = ecefX != null ? ecefX.doubleValue() : 0.0;
ecef[1] = ecefY != null ? ecefY.doubleValue() : 0.0;
ecef[2] = ecefZ != null ? ecefZ.doubleValue() : 0.0;
String rtcm1005 = Rtcm1005.generateRtcm1005Hex(ecef, 0);
if (rtcm1005 != null) {
// 写入RTCM 1005消息
byte[] rtcm1005Bytes = ByteUtil.hexStringTobyte(rtcm1005);
newBuf.writeBytes(rtcm1005Bytes);
// logger.info("Generated RTCM 1005 message for base station {}: {}", deviceBs.getDeviceId(), rtcm1005);
} else {
//logger.warn("Failed to generate RTCM 1005 message for base station: {}", deviceBs.getDeviceId());
}
// 写入剩余的数据
newBuf.writeBytes(originalData, insertIndex / 2, originalData.length - insertIndex / 2);
// 更新buf
buf = newBuf;
// 添加日志记录插入位置和完整数据
// logger.info("Inserted RTCM 1005 message before position {}, complete data: {}",
// insertIndex,
// ByteUtil.bytesToHexString(buf.array()));
}
// 更新最后转发时间
lastD300ForwardTimeMap.put(deviceId, currentTime);
// 添加日志记录测站转发的完整数据
//logger.info("Forward data to device: {}, time: {}, complete data: {}",
// deviceId,
// LocalDateTime.now(),
// ByteUtil.bytesToHexString(buf.array()));
}
deviceChannel.writeAndFlush(buf);
}
}
}
}
return buf;
}
// 如果30分钟内收到不到d3f0和d3f2则根据UDP最后一个报文触发状态更新和统计
/**
* 处理设备统计信息
*/
private void processDeviceStatistics(Device deviceBs, D331RtcmMessage message) {
if (deviceBs.getD3xxbytes() > 0) {
LocalDateTime now = LocalDateTime.now();
if(deviceBs.getLastRxTime().isBefore(now.minusMinutes(1)) &&
if (deviceBs.getLastRxTime() != null &&
deviceBs.getLastRxTime().isBefore(now.minusMinutes(1)) &&
(deviceBs.getLastD3f2Time() == null ||
deviceBs.getLastD3f2Time().isBefore(now.minusMinutes(30)))) {
// new cycle
logger.info("device {} rx {} d331 in a cycle while not d3f0f2",deviceBs.getDeviceId(),deviceBs.getD3xxCount());
logger.info("device {} rx {} d331 in a cycle while not d3f0f2",
deviceBs.getDeviceId(), deviceBs.getD3xxCount());
Device lastCycleDevice = createLastCycleDevice(deviceBs);
deviceBs.clearStat();
ThreadManager.getFixedThreadPool().submit(() -> {
try {
beidouClient.onDeviceActive(deviceBs.getDeviceId(), deviceBs.getTenantId());
} catch (Exception e) {
logger.error("Error notifying device active", e);
}
dataPersistService.updateDeviceState(lastCycleDevice);
});
}
}
// update trx
deviceBs.updateRx(message.getHeader(), message.getLen(), message.getPacketNum());
// update gga
Gga gga = message.getGga();
if (gga != null) {
deviceBs.updateSatelitesNum(gga.getSatellitesInUsed());
// 修复正确处理Double类型的坐标
deviceBs.setLatitude(gga.getLatitude());
deviceBs.setLongitude(gga.getLongitude());
deviceBs.setAltitude(gga.getAltitude());
}
}
/**
* 创建上一周期设备信息
*/
private Device createLastCycleDevice(Device deviceBs) {
Device lastCycleDevice = new Device();
lastCycleDevice.setDeviceId(deviceBs.getDeviceId());
lastCycleDevice.setDeviceType(deviceBs.getDeviceType());
@ -313,44 +611,19 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
lastCycleDevice.setLastRxTime(deviceBs.getLastRxTime());
lastCycleDevice.setLastValidCalcDataTime(deviceBs.getLastValidCalcDataTime());
lastCycleDevice.setSatelitesInUse(deviceBs.getSatelitesInUse());
deviceBs.clearStat();
return lastCycleDevice;
}
/**
* 记录日志如果需要
*/
private void logMessageIfNeeded(Device deviceBs, D331RtcmMessage message) {
ThreadManager.getFixedThreadPool().submit(() -> {
// 通知上线
try {
beidouClient.onDeviceActive(deviceBs.getDeviceId(), deviceBs.getTenantId());
} catch (Exception e) {
logger.error(e.toString());
}
dataPersistService.updateDeviceState(lastCycleDevice);
});
}
}
// update trx
deviceBs.updateRx(message.getHeader(),message.getLen(),message.getPacketNum());
// update gga
Gga gga = message.getGga();
if(gga != null) {
deviceBs.updateSatelitesNum(gga.getSatellitesInUsed());
deviceBs.setLatitude(gga.getLatitude());
deviceBs.setLongitude(gga.getLongitude());
deviceBs.setAltitude(gga.getAltitude());
}
// 添加NTRIP处理
byte[] srcdata = message.getSrcData();
String rtcm = ByteUtil.bytesToHexString(srcdata);
sendToNtrip(id, rtcm);
ThreadManager.getFixedThreadPool().submit(() -> {
// 原始码流输出到日志文件 -- INFO 级别
// 只有测站开了日志记录或者消息来自基站才将原码记录到日志文件
if(deviceBs.getLoggingmode() == GnssDevice.LOGGING_MODE_FULL){
logger.info("receive {} d331 message: {}", message.getId(), DataTypeUtil.getHexString(message.getSrcData()));
if (deviceBs.getLoggingmode() != null && deviceBs.getLoggingmode() == GnssDevice.LOGGING_MODE_FULL) {
logger.info("receive {} d331 message: {}", message.getId(),
DataTypeUtil.getHexString(message.getSrcData()));
}
});
return null;
}
@Override
@ -358,38 +631,190 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
return D331RtcmMessage.class;
}
/**
* 发送数据到NTRIP
*/
private void sendToNtrip(String mountpoint, String hexData) {
try {
// 将原始字节转换为16进制字符串用于RTCM提取
//String hexData = ByteUtil.bytesToHexString(rawData);
//System.out.println(hexData);
// 提取RTCM数据并发送到NtripServer,使用设备ID作为挂载点
Optional.ofNullable(RtcmGgaUtil.getRtcms(hexData))
.ifPresent(rtcm -> {
//System.out.println("挂载点: " + mountpoint);
//System.out.println("RTCM数据: " + rtcm);
ntripServer.send(mountpoint, rtcm);
});
.ifPresent(rtcm -> ntripServer.send(mountpoint, rtcm));
} catch (Exception e) {
logger.error("处理NTRIP数据失败, 挂载点: {}, 错误: {}", mountpoint, e.getMessage());
}
}
/**
* 判断基站是否在线
*/
private boolean isBaseStationOnline(String baseStationId) {
if (baseStationId == null) return false;
BaseStationStatus status = baseStationStatusMap.get(baseStationId);
if (status == null) {
return false;
}
LocalDateTime now = LocalDateTime.now();
return status.getLastActiveTime() != null &&
status.getLastActiveTime().isAfter(now.minusSeconds(BASE_STATION_OFFLINE_TIMEOUT));
}
/**
* 判断住基站是否在线
* @param baseStation 基站
* @return 是否在线
* 判断基站设备是否在线兼容原有方法
*/
private boolean isBaseStationOnline(Device baseStation) {
if (baseStation == null) return false;
LocalDateTime now = LocalDateTime.now();
return baseStation.getLastRxTime() != null &&
baseStation.getLastRxTime().isAfter(now.minusMinutes(5)); // 从30分钟改为5分钟
baseStation.getLastRxTime().isAfter(now.minusMinutes(5));
}
/**
* 通知状态变化
*/
private void notifyStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus) {
for (BaseStationSwitchListener listener : switchListeners) {
try {
listener.onBaseStationStatusChange(baseStationId, oldStatus, newStatus);
} catch (Exception e) {
logger.error("Error notifying base station status change listener", e);
}
}
}
/**
* 通知基站切换可扩展为调用外部服务
*/
private void notifyBaseStationSwitch(String deviceId, String fromBaseId, String toBaseId, String reason) {
logger.info("Sending external notification for base station switch: Device={}, From={}, To={}, Reason={}",
deviceId, fromBaseId, toBaseId, reason);
}
/**
* 通知基站状态变化可扩展为调用外部服务
*/
private void notifyBaseStationStatusChange(String baseStationId, BaseStationStatusEnum oldStatus, BaseStationStatusEnum newStatus) {
logger.info("Sending external notification for base station status change: BaseStation={}, From={}, To={}",
baseStationId, oldStatus, newStatus);
}
/**
* 定期检查基站状态
*/
@Scheduled(fixedRate = 30000) // 30秒执行一次
public void checkBaseStationStatus() {
LocalDateTime now = LocalDateTime.now();
for (BaseStationStatus status : baseStationStatusMap.values()) {
if (status.getStatus() == BaseStationStatusEnum.ONLINE) {
// 检查是否超时离线
if (status.getLastActiveTime().isBefore(now.minusSeconds(BASE_STATION_OFFLINE_TIMEOUT))) {
BaseStationStatusEnum oldStatus = status.getStatus();
status.setStatus(BaseStationStatusEnum.OFFLINE);
logger.warn("Base station {} went offline (timeout). Last active: {}",
status.getBaseStationId(), status.getLastActiveTime());
notifyStatusChange(status.getBaseStationId(), oldStatus, BaseStationStatusEnum.OFFLINE);
// 触发设备切换到备用基站
triggerDeviceSwitchToBackup(status.getBaseStationId());
}
}
}
}
/**
* 触发设备切换到备用基站
*/
private void triggerDeviceSwitchToBackup(String offlineBaseId) {
ThreadManager.getFixedThreadPool().submit(() -> {
try {
BaseStationStatus status = baseStationStatusMap.get(offlineBaseId);
if (status != null) {
Set<String> affectedDevices = new HashSet<>(status.getServingDevices());
for (String deviceId : affectedDevices) {
// 查找设备信息并触发切换
Device device = deviceService.findByDeviceId(deviceId);
if (device != null) {
String newBaseId = selectOptimalBaseStation(device);
if (newBaseId != null && !newBaseId.equals(offlineBaseId)) {
switchDeviceToBaseStation(deviceId, newBaseId,
"Primary base station offline: " + offlineBaseId);
} else {
logger.warn("No suitable backup base station found for device {} (was using {})",
deviceId, offlineBaseId);
}
}
}
}
} catch (Exception e) {
logger.error("Error triggering device switch to backup base station", e);
}
});
}
/**
* 添加基站切换监听器
*/
public static void addBaseStationSwitchListener(BaseStationSwitchListener listener) {
switchListeners.add(listener);
}
/**
* 移除基站切换监听器
*/
public static void removeBaseStationSwitchListener(BaseStationSwitchListener listener) {
switchListeners.remove(listener);
}
/**
* 获取基站状态信息
*/
public static BaseStationStatus getBaseStationStatus(String baseStationId) {
return baseStationStatusMap.get(baseStationId);
}
/**
* 获取所有基站状态
*/
public static Map<String, BaseStationStatus> getAllBaseStationStatus() {
return new HashMap<>(baseStationStatusMap);
}
/**
* 获取设备当前使用的基站
*/
public static String getDeviceCurrentBase(String deviceId) {
return deviceCurrentBaseMap.get(deviceId);
}
/**
* 打印当前基站和设备状态用于调试
*/
@Scheduled(fixedRate = 300000) // 5分钟执行一次
public void printStatusSummary() {
if (logger.isInfoEnabled() && !baseStationStatusMap.isEmpty()) {
StringBuilder summary = new StringBuilder();
summary.append("\n=== Base Station Status Summary ===\n");
for (BaseStationStatus status : baseStationStatusMap.values()) {
summary.append(String.format("Base Station: %s, Status: %s, Last Active: %s, Serving Devices: %d\n",
status.getBaseStationId(),
status.getStatus(),
status.getLastActiveTime(),
status.getServingDevices().size()));
}
summary.append("=== Device Base Mapping ===\n");
for (Map.Entry<String, String> entry : deviceCurrentBaseMap.entrySet()) {
summary.append(String.format("Device: %s -> Base Station: %s\n",
entry.getKey(), entry.getValue()));
}
logger.info(summary.toString());
}
}
}