diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java index 6ac8f14c..ffaff53d 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/D331RtcmMessageExecutor.java @@ -128,6 +128,7 @@ public class D331RtcmMessageExecutor implements Executor String rtcm = ByteUtil.bytesToHexString(srcdata); sendToNtrip(id, rtcm); rtkClusterService.sendRtcm(id, rtcm); + // keep routing strictly by device ID; group aliasing is handled inside RtkClusterService/Scheduler } diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java index ed02cf98..83bad13d 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java @@ -22,6 +22,8 @@ public class GroupRtkScheduler { @Autowired private RtkrcvGroupMapper groupMapper; @Autowired private RtkrcvConfigService configService; + @Autowired private RtkClusterService rtkClusterService; + @Autowired private com.imdroid.secapi.dto.RtkrcvProfileMapper profileMapper; @Value("${rtkrcv.bin:rtkrcv}") private String rtkBinary; @@ -65,22 +67,34 @@ public class GroupRtkScheduler { resp.put("pid", rt.pid); resp.put("state", rt.state); return resp; } - // Ensure local endpoint for capturing OUT stream - if (rt.endpoint == null) { - rt.endpoint = new Endpoint(); - rt.endpoint.start(); - } - // Build a synthetic profile using group configuration + local OUT endpoints + // Ensure local cluster endpoint (shared with rtkClusterService) for both IN/OUT + int port = rtkClusterService.ensureGroupEndpoint(groupId); + rt.workPort = port; + String epPath = "127.0.0.1:" + port; + // Build a synthetic profile using group configuration + local endpoints RtkrcvProfile profile = new RtkrcvProfile(); profile.setDeviceId("group-" + groupId); profile.setGroupId(groupId); - String epPath = "127.0.0.1:" + rt.endpoint.getPort(); profile.setOutstr1Path(epPath); profile.setOutstr2Path(epPath); // also set inpstr1 to same endpoint to avoid dead tcpcli profile.setInpstr1Path(epPath); Path conf = configService.generateConfig(profile); rt.confPath = conf.toString(); + + // Alias all deviceIds in this group to the same endpoint, so D331 sendRtcm(id, rtcm) reaches here + try { + com.baomidou.mybatisplus.core.conditions.query.QueryWrapper qw = new com.baomidou.mybatisplus.core.conditions.query.QueryWrapper<>(); + qw.eq("group_id", groupId); + java.util.List profiles = profileMapper.selectList(qw); + if (profiles != null) { + for (com.imdroid.secapi.dto.RtkrcvProfile p : profiles) { + if (p.getDeviceId() != null) { + rtkClusterService.aliasKeyToGroup(groupId, p.getDeviceId()); + } + } + } + } catch (Exception ignore) {} ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString()); pb.directory(conf.getParent().toFile()); pb.redirectErrorStream(true); @@ -115,7 +129,7 @@ public class GroupRtkScheduler { boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS); if (!exited) rt.process.destroyForcibly(); rt.state = "stopped"; - if (rt.endpoint != null) { rt.endpoint.close(); rt.endpoint = null; } + // keep rtkClusterService endpoint for reuse; not closing here resp.put("code", 0); } catch (Exception e) { resp.put("code", 1); resp.put("msg", e.getMessage()); diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index f2e71b81..04246521 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -439,4 +439,34 @@ public class RtkClusterService implements ApplicationRunner { try { s.close(); } catch (IOException ignore) {} } } + + // Ensure a dedicated endpoint exists for a group key like "group-" and return its port + public int ensureGroupEndpoint(long groupId) { + String key = "group-" + groupId; + DeviceEndpoint ep = endpoints.get(key); + if (ep == null) { + // derive a stable port for group endpoints to avoid collisions + int port = basePort + 10000 + (int) (Math.floorMod(groupId, 50000)); + endpoints.computeIfAbsent(key, k -> new DeviceEndpoint(key, port, null)); + ep = endpoints.get(key); + ep.ensureStarted(); + LOGGER.info("Group endpoint ready for {} on 127.0.0.1:{}", key, port); + } else { + ep.ensureStarted(); + } + return ep.port; + } + + public void aliasKeyToGroup(long groupId, String key) { + String gkey = "group-" + groupId; + DeviceEndpoint ep = endpoints.get(gkey); + if (ep == null) { + int port = ensureGroupEndpoint(groupId); + ep = endpoints.get(gkey); + } + if (ep != null) { + endpoints.put(key, ep); + LOGGER.info("Aliased endpoint {} -> {}", key, gkey); + } + } }