fix: 修正TCP端口

This commit is contained in:
yarnom 2025-10-30 16:45:45 +08:00
parent 99ddec40b1
commit da18e9a495
3 changed files with 53 additions and 8 deletions

View File

@ -128,6 +128,7 @@ public class D331RtcmMessageExecutor implements Executor<D331RtcmMessage, Void>
String rtcm = ByteUtil.bytesToHexString(srcdata); String rtcm = ByteUtil.bytesToHexString(srcdata);
sendToNtrip(id, rtcm); sendToNtrip(id, rtcm);
rtkClusterService.sendRtcm(id, rtcm); rtkClusterService.sendRtcm(id, rtcm);
// keep routing strictly by device ID; group aliasing is handled inside RtkClusterService/Scheduler
} }

View File

@ -22,6 +22,8 @@ public class GroupRtkScheduler {
@Autowired private RtkrcvGroupMapper groupMapper; @Autowired private RtkrcvGroupMapper groupMapper;
@Autowired private RtkrcvConfigService configService; @Autowired private RtkrcvConfigService configService;
@Autowired private RtkClusterService rtkClusterService;
@Autowired private com.imdroid.secapi.dto.RtkrcvProfileMapper profileMapper;
@Value("${rtkrcv.bin:rtkrcv}") @Value("${rtkrcv.bin:rtkrcv}")
private String rtkBinary; private String rtkBinary;
@ -65,22 +67,34 @@ public class GroupRtkScheduler {
resp.put("pid", rt.pid); resp.put("state", rt.state); resp.put("pid", rt.pid); resp.put("state", rt.state);
return resp; return resp;
} }
// Ensure local endpoint for capturing OUT stream // Ensure local cluster endpoint (shared with rtkClusterService) for both IN/OUT
if (rt.endpoint == null) { int port = rtkClusterService.ensureGroupEndpoint(groupId);
rt.endpoint = new Endpoint(); rt.workPort = port;
rt.endpoint.start(); String epPath = "127.0.0.1:" + port;
} // Build a synthetic profile using group configuration + local endpoints
// Build a synthetic profile using group configuration + local OUT endpoints
RtkrcvProfile profile = new RtkrcvProfile(); RtkrcvProfile profile = new RtkrcvProfile();
profile.setDeviceId("group-" + groupId); profile.setDeviceId("group-" + groupId);
profile.setGroupId(groupId); profile.setGroupId(groupId);
String epPath = "127.0.0.1:" + rt.endpoint.getPort();
profile.setOutstr1Path(epPath); profile.setOutstr1Path(epPath);
profile.setOutstr2Path(epPath); profile.setOutstr2Path(epPath);
// also set inpstr1 to same endpoint to avoid dead tcpcli // also set inpstr1 to same endpoint to avoid dead tcpcli
profile.setInpstr1Path(epPath); profile.setInpstr1Path(epPath);
Path conf = configService.generateConfig(profile); Path conf = configService.generateConfig(profile);
rt.confPath = conf.toString(); rt.confPath = conf.toString();
// Alias all deviceIds in this group to the same endpoint, so D331 sendRtcm(id, rtcm) reaches here
try {
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper<com.imdroid.secapi.dto.RtkrcvProfile> qw = new com.baomidou.mybatisplus.core.conditions.query.QueryWrapper<>();
qw.eq("group_id", groupId);
java.util.List<com.imdroid.secapi.dto.RtkrcvProfile> profiles = profileMapper.selectList(qw);
if (profiles != null) {
for (com.imdroid.secapi.dto.RtkrcvProfile p : profiles) {
if (p.getDeviceId() != null) {
rtkClusterService.aliasKeyToGroup(groupId, p.getDeviceId());
}
}
}
} catch (Exception ignore) {}
ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString()); ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString());
pb.directory(conf.getParent().toFile()); pb.directory(conf.getParent().toFile());
pb.redirectErrorStream(true); pb.redirectErrorStream(true);
@ -115,7 +129,7 @@ public class GroupRtkScheduler {
boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS); boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS);
if (!exited) rt.process.destroyForcibly(); if (!exited) rt.process.destroyForcibly();
rt.state = "stopped"; rt.state = "stopped";
if (rt.endpoint != null) { rt.endpoint.close(); rt.endpoint = null; } // keep rtkClusterService endpoint for reuse; not closing here
resp.put("code", 0); resp.put("code", 0);
} catch (Exception e) { } catch (Exception e) {
resp.put("code", 1); resp.put("msg", e.getMessage()); resp.put("code", 1); resp.put("msg", e.getMessage());

View File

@ -439,4 +439,34 @@ public class RtkClusterService implements ApplicationRunner {
try { s.close(); } catch (IOException ignore) {} try { s.close(); } catch (IOException ignore) {}
} }
} }
// Ensure a dedicated endpoint exists for a group key like "group-<id>" and return its port
public int ensureGroupEndpoint(long groupId) {
String key = "group-" + groupId;
DeviceEndpoint ep = endpoints.get(key);
if (ep == null) {
// derive a stable port for group endpoints to avoid collisions
int port = basePort + 10000 + (int) (Math.floorMod(groupId, 50000));
endpoints.computeIfAbsent(key, k -> new DeviceEndpoint(key, port, null));
ep = endpoints.get(key);
ep.ensureStarted();
LOGGER.info("Group endpoint ready for {} on 127.0.0.1:{}", key, port);
} else {
ep.ensureStarted();
}
return ep.port;
}
public void aliasKeyToGroup(long groupId, String key) {
String gkey = "group-" + groupId;
DeviceEndpoint ep = endpoints.get(gkey);
if (ep == null) {
int port = ensureGroupEndpoint(groupId);
ep = endpoints.get(gkey);
}
if (ep != null) {
endpoints.put(key, ep);
LOGGER.info("Aliased endpoint {} -> {}", key, gkey);
}
}
} }