From 76831fb1856080b9725403686753a9f4c57c1fed Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 29 Oct 2025 10:24:12 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=B0=9D=E8=AF=95=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=96=AD=E8=81=94=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sideslope/ntrip/RtkrcvConfigService.java | 26 +++- .../rtkcluster/RtkClusterService.java | 116 +++++++++--------- 2 files changed, 79 insertions(+), 63 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java index 7a8c34a1..ee63b81d 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java @@ -54,7 +54,8 @@ public class RtkrcvConfigService { replaced = replaceValueLine(replaced, "inpstr1-path", nz(profile.getInpstr1Path())); replaced = replaceValueLine(replaced, "inpstr2-path", nz(profile.getInpstr2Path())); replaced = replaceValueLine(replaced, "inpstr3-path", nz(profile.getInpstr3Path())); - replaced = replaceValueLine(replaced, "outstr1-path", nz(profile.getOutstr1Path())); + String resolvedOutPath = resolveOutPath(profile); + replaced = replaceValueLine(replaced, "outstr1-path", resolvedOutPath); // If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli if (looksLikeTcpEndpoint(profile.getInpstr1Path())) { replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli"); @@ -62,7 +63,7 @@ public class RtkrcvConfigService { replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); } - if (looksLikeTcpEndpoint(profile.getOutstr1Path())) { + if (looksLikeTcpEndpoint(resolvedOutPath)) { replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli"); replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); @@ -106,6 +107,27 @@ public class RtkrcvConfigService { return s == null ? "" : s; } + private String resolveOutPath(RtkrcvProfile profile) { + String candidate = nz(profile.getOutstr1Path()); + if (!candidate.isEmpty()) { + return candidate; + } + String inPath = nz(profile.getInpstr1Path()); + if (looksLikeTcpEndpoint(inPath)) { + String[] parts = inPath.split(":", 2); + if (parts.length == 2) { + String host = parts[0].trim(); + try { + int inPort = Integer.parseInt(parts[1].trim()); + return host + ":" + (inPort + 1); + } catch (NumberFormatException ignore) { + // fall through and return empty candidate + } + } + } + return candidate; + } + private boolean looksLikeTcpEndpoint(String path) { if (path == null) return false; String p = path.trim(); diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index d048c800..b682da2f 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -19,7 +19,6 @@ import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.LocalDateTime; @@ -51,6 +50,7 @@ public class RtkClusterService implements ApplicationRunner { private RtkrcvConfigService configService; private final Map endpoints = new ConcurrentHashMap<>(); + private final Map outEndpoints = new ConcurrentHashMap<>(); private final Map processes = new ConcurrentHashMap<>(); private final ExecutorService worker = Executors.newCachedThreadPool(r -> { @@ -67,10 +67,11 @@ public class RtkClusterService implements ApplicationRunner { LOGGER.info("No rtkrcv_profile records found. RtkCluster idle."); return; } - int slot = 1; + int slot = 0; for (RtkrcvProfile profile : profiles) { try { - int port = basePort + slot++; + int port = basePort + slot; + slot += 2; // reserve a pair (IN port, OUT port) bootstrapDevice(profile, port); } catch (Exception e) { LOGGER.error("Bootstrap device {} failed: {}", profile.getDeviceId(), e.getMessage(), e); @@ -82,13 +83,17 @@ public class RtkClusterService implements ApplicationRunner { String deviceId = profile.getDeviceId(); // 1) Start endpoint server (if not exists) endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort)); + outEndpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort + 1, true)); DeviceEndpoint ep = endpoints.get(deviceId); - ep.ensureStarted(); + DeviceEndpoint outEp = outEndpoints.get(deviceId); + ep.ensureInMode(); + outEp.ensureOutMode(); // 2) Update profile inp/out to local cluster port - String localPath = "127.0.0.1:" + workPort; - profile.setInpstr1Path(localPath); - profile.setOutstr1Path(localPath); + String localInPath = "127.0.0.1:" + workPort; + String localOutPath = "127.0.0.1:" + (workPort + 1); + profile.setInpstr1Path(localInPath); + profile.setOutstr1Path(localOutPath); profileMapper.updateById(profile); // 3) Generate config and start rtkrcv @@ -175,6 +180,7 @@ public class RtkClusterService implements ApplicationRunner { static class DeviceEndpoint { private final int port; + private final boolean outMode; private final ExecutorService exec; private volatile boolean started = false; private volatile ServerSocket server; @@ -183,7 +189,12 @@ public class RtkClusterService implements ApplicationRunner { private final java.util.concurrent.LinkedBlockingDeque rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); DeviceEndpoint(int port) { + this(port, false); + } + + DeviceEndpoint(int port, boolean outMode) { this.port = port; + this.outMode = outMode; this.exec = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "rtkcluster-ep-" + this.port); t.setDaemon(true); @@ -194,11 +205,28 @@ public class RtkClusterService implements ApplicationRunner { synchronized void ensureStarted() { if (started) return; exec.submit(this::acceptLoop); - exec.submit(this::dequeueLoop); + if (!outMode) { + exec.submit(this::dequeueLoop); + } started = true; } + void ensureInMode() { + if (outMode) { + throw new IllegalStateException("Endpoint is configured as OUT mode: " + port); + } + ensureStarted(); + } + + void ensureOutMode() { + if (!outMode) { + throw new IllegalStateException("Endpoint is configured as IN mode: " + port); + } + ensureStarted(); + } + void enqueueRtcm(byte[] data) { + if (outMode) return; if (data == null || data.length == 0) return; if (!rtcmQueue.offerLast(data)) { // queue full: drop oldest to keep stream fresh @@ -212,7 +240,7 @@ public class RtkClusterService implements ApplicationRunner { ss.setReuseAddress(true); ss.bind(new InetSocketAddress("127.0.0.1", port)); this.server = ss; - LOGGER.info("RtkCluster device endpoint listening on 127.0.0.1:{}", port); + LOGGER.info("RtkCluster {} endpoint listening on 127.0.0.1:{}", outMode ? "OUT" : "IN", port); while (true) { Socket s = ss.accept(); classifyConnection(s); @@ -228,54 +256,29 @@ public class RtkClusterService implements ApplicationRunner { try { s.setTcpNoDelay(true); s.setKeepAlive(true); - s.setSoTimeout(1500); - InputStream in = s.getInputStream(); - byte[] probe = new byte[256]; - int n; - try { - n = in.read(probe); - } catch (SocketTimeoutException timeout) { - n = 0; // treat as no immediate data - } catch (IOException readError) { - LOGGER.debug("Endpoint {} initial read failed: {}", port, readError.getMessage()); - closeQuietly(s); - return; - } finally { - s.setSoTimeout(0); - } - - if (n < 0) { - LOGGER.debug("Endpoint {} connection closed during probe", port); - return; - } - - if (n > 0 && isLikelyText(probe, n)) { + if (outMode) { Socket previous = outConn; - if (!isSocketAlive(previous)) { - outConn = s; - assigned = true; - LOGGER.debug("Endpoint {} OUT connected", port); - pumpOut(s, probe, n); - } else { + if (isSocketAlive(previous)) { LOGGER.info("Endpoint {} replacing existing OUT connection", port); closeQuietly(previous); - outConn = s; - assigned = true; - pumpOut(s, probe, n); - } - } else { - Socket previous = inConn; - if (!isSocketAlive(previous)) { - inConn = s; - assigned = true; - LOGGER.debug("Endpoint {} IN connected", port); } else { - LOGGER.info("Endpoint {} replacing existing IN connection", port); - closeQuietly(previous); - inConn = s; - assigned = true; + LOGGER.debug("Endpoint {} OUT connected", port); } + outConn = s; + assigned = true; + pumpOut(s, null, 0); + return; } + + Socket previous = inConn; + if (isSocketAlive(previous)) { + LOGGER.info("Endpoint {} replacing existing IN connection", port); + closeQuietly(previous); + } else { + LOGGER.debug("Endpoint {} IN connected", port); + } + inConn = s; + assigned = true; } catch (IOException e) { LOGGER.warn("classifyConnection error: {}", e.getMessage()); } finally { @@ -286,22 +289,13 @@ public class RtkClusterService implements ApplicationRunner { }); } - private boolean isLikelyText(byte[] buf, int n) { - int printable = 0; - for (int i = 0; i < n; i++) { - int b = buf[i] & 0xFF; - if (b >= 32 && b <= 126) printable++; - } - return printable >= Math.max(1, n - 4); // tolerate some non-printables - } - private void pumpOut(Socket s, byte[] firstBuf, int firstLen) { exec.submit(() -> { try (InputStream in = s.getInputStream()) { byte[] buf = new byte[2048]; int read; // deliver first classified bytes if any - if (firstLen > 0) { + if (firstBuf != null && firstLen > 0) { String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); LOGGER.info("[OUT:{}] {}", port, preview); }