From 2a9ab127c56f1caf8d23496976c255f9fa574e4a Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 29 Oct 2025 10:34:52 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"fix:=20=E5=B0=9D=E8=AF=95=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=96=AD=E8=81=94=E9=97=AE=E9=A2=98"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 76831fb1856080b9725403686753a9f4c57c1fed. --- .../sideslope/ntrip/RtkrcvConfigService.java | 26 +--- .../rtkcluster/RtkClusterService.java | 118 +++++++++--------- 2 files changed, 64 insertions(+), 80 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java index ee63b81d..7a8c34a1 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java @@ -54,8 +54,7 @@ public class RtkrcvConfigService { replaced = replaceValueLine(replaced, "inpstr1-path", nz(profile.getInpstr1Path())); replaced = replaceValueLine(replaced, "inpstr2-path", nz(profile.getInpstr2Path())); replaced = replaceValueLine(replaced, "inpstr3-path", nz(profile.getInpstr3Path())); - String resolvedOutPath = resolveOutPath(profile); - replaced = replaceValueLine(replaced, "outstr1-path", resolvedOutPath); + replaced = replaceValueLine(replaced, "outstr1-path", nz(profile.getOutstr1Path())); // If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli if (looksLikeTcpEndpoint(profile.getInpstr1Path())) { replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli"); @@ -63,7 +62,7 @@ public class RtkrcvConfigService { replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); } - if (looksLikeTcpEndpoint(resolvedOutPath)) { + if (looksLikeTcpEndpoint(profile.getOutstr1Path())) { replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli"); replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); @@ -107,27 +106,6 @@ public class RtkrcvConfigService { return s == null ? "" : s; } - private String resolveOutPath(RtkrcvProfile profile) { - String candidate = nz(profile.getOutstr1Path()); - if (!candidate.isEmpty()) { - return candidate; - } - String inPath = nz(profile.getInpstr1Path()); - if (looksLikeTcpEndpoint(inPath)) { - String[] parts = inPath.split(":", 2); - if (parts.length == 2) { - String host = parts[0].trim(); - try { - int inPort = Integer.parseInt(parts[1].trim()); - return host + ":" + (inPort + 1); - } catch (NumberFormatException ignore) { - // fall through and return empty candidate - } - } - } - return candidate; - } - private boolean looksLikeTcpEndpoint(String path) { if (path == null) return false; String p = path.trim(); diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index b682da2f..d048c800 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -19,6 +19,7 @@ import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.LocalDateTime; @@ -50,7 +51,6 @@ public class RtkClusterService implements ApplicationRunner { private RtkrcvConfigService configService; private final Map endpoints = new ConcurrentHashMap<>(); - private final Map outEndpoints = new ConcurrentHashMap<>(); private final Map processes = new ConcurrentHashMap<>(); private final ExecutorService worker = Executors.newCachedThreadPool(r -> { @@ -67,11 +67,10 @@ public class RtkClusterService implements ApplicationRunner { LOGGER.info("No rtkrcv_profile records found. RtkCluster idle."); return; } - int slot = 0; + int slot = 1; for (RtkrcvProfile profile : profiles) { try { - int port = basePort + slot; - slot += 2; // reserve a pair (IN port, OUT port) + int port = basePort + slot++; bootstrapDevice(profile, port); } catch (Exception e) { LOGGER.error("Bootstrap device {} failed: {}", profile.getDeviceId(), e.getMessage(), e); @@ -83,17 +82,13 @@ public class RtkClusterService implements ApplicationRunner { String deviceId = profile.getDeviceId(); // 1) Start endpoint server (if not exists) endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort)); - outEndpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort + 1, true)); DeviceEndpoint ep = endpoints.get(deviceId); - DeviceEndpoint outEp = outEndpoints.get(deviceId); - ep.ensureInMode(); - outEp.ensureOutMode(); + ep.ensureStarted(); // 2) Update profile inp/out to local cluster port - String localInPath = "127.0.0.1:" + workPort; - String localOutPath = "127.0.0.1:" + (workPort + 1); - profile.setInpstr1Path(localInPath); - profile.setOutstr1Path(localOutPath); + String localPath = "127.0.0.1:" + workPort; + profile.setInpstr1Path(localPath); + profile.setOutstr1Path(localPath); profileMapper.updateById(profile); // 3) Generate config and start rtkrcv @@ -180,7 +175,6 @@ public class RtkClusterService implements ApplicationRunner { static class DeviceEndpoint { private final int port; - private final boolean outMode; private final ExecutorService exec; private volatile boolean started = false; private volatile ServerSocket server; @@ -189,12 +183,7 @@ public class RtkClusterService implements ApplicationRunner { private final java.util.concurrent.LinkedBlockingDeque rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); DeviceEndpoint(int port) { - this(port, false); - } - - DeviceEndpoint(int port, boolean outMode) { this.port = port; - this.outMode = outMode; this.exec = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "rtkcluster-ep-" + this.port); t.setDaemon(true); @@ -205,28 +194,11 @@ public class RtkClusterService implements ApplicationRunner { synchronized void ensureStarted() { if (started) return; exec.submit(this::acceptLoop); - if (!outMode) { - exec.submit(this::dequeueLoop); - } + exec.submit(this::dequeueLoop); started = true; } - void ensureInMode() { - if (outMode) { - throw new IllegalStateException("Endpoint is configured as OUT mode: " + port); - } - ensureStarted(); - } - - void ensureOutMode() { - if (!outMode) { - throw new IllegalStateException("Endpoint is configured as IN mode: " + port); - } - ensureStarted(); - } - void enqueueRtcm(byte[] data) { - if (outMode) return; if (data == null || data.length == 0) return; if (!rtcmQueue.offerLast(data)) { // queue full: drop oldest to keep stream fresh @@ -240,7 +212,7 @@ public class RtkClusterService implements ApplicationRunner { ss.setReuseAddress(true); ss.bind(new InetSocketAddress("127.0.0.1", port)); this.server = ss; - LOGGER.info("RtkCluster {} endpoint listening on 127.0.0.1:{}", outMode ? "OUT" : "IN", port); + LOGGER.info("RtkCluster device endpoint listening on 127.0.0.1:{}", port); while (true) { Socket s = ss.accept(); classifyConnection(s); @@ -256,29 +228,54 @@ public class RtkClusterService implements ApplicationRunner { try { s.setTcpNoDelay(true); s.setKeepAlive(true); - if (outMode) { - Socket previous = outConn; - if (isSocketAlive(previous)) { - LOGGER.info("Endpoint {} replacing existing OUT connection", port); - closeQuietly(previous); - } else { - LOGGER.debug("Endpoint {} OUT connected", port); - } - outConn = s; - assigned = true; - pumpOut(s, null, 0); + s.setSoTimeout(1500); + InputStream in = s.getInputStream(); + byte[] probe = new byte[256]; + int n; + try { + n = in.read(probe); + } catch (SocketTimeoutException timeout) { + n = 0; // treat as no immediate data + } catch (IOException readError) { + LOGGER.debug("Endpoint {} initial read failed: {}", port, readError.getMessage()); + closeQuietly(s); + return; + } finally { + s.setSoTimeout(0); + } + + if (n < 0) { + LOGGER.debug("Endpoint {} connection closed during probe", port); return; } - Socket previous = inConn; - if (isSocketAlive(previous)) { - LOGGER.info("Endpoint {} replacing existing IN connection", port); - closeQuietly(previous); + if (n > 0 && isLikelyText(probe, n)) { + Socket previous = outConn; + if (!isSocketAlive(previous)) { + outConn = s; + assigned = true; + LOGGER.debug("Endpoint {} OUT connected", port); + pumpOut(s, probe, n); + } else { + LOGGER.info("Endpoint {} replacing existing OUT connection", port); + closeQuietly(previous); + outConn = s; + assigned = true; + pumpOut(s, probe, n); + } } else { - LOGGER.debug("Endpoint {} IN connected", port); + Socket previous = inConn; + if (!isSocketAlive(previous)) { + inConn = s; + assigned = true; + LOGGER.debug("Endpoint {} IN connected", port); + } else { + LOGGER.info("Endpoint {} replacing existing IN connection", port); + closeQuietly(previous); + inConn = s; + assigned = true; + } } - inConn = s; - assigned = true; } catch (IOException e) { LOGGER.warn("classifyConnection error: {}", e.getMessage()); } finally { @@ -289,13 +286,22 @@ public class RtkClusterService implements ApplicationRunner { }); } + private boolean isLikelyText(byte[] buf, int n) { + int printable = 0; + for (int i = 0; i < n; i++) { + int b = buf[i] & 0xFF; + if (b >= 32 && b <= 126) printable++; + } + return printable >= Math.max(1, n - 4); // tolerate some non-printables + } + private void pumpOut(Socket s, byte[] firstBuf, int firstLen) { exec.submit(() -> { try (InputStream in = s.getInputStream()) { byte[] buf = new byte[2048]; int read; // deliver first classified bytes if any - if (firstBuf != null && firstLen > 0) { + if (firstLen > 0) { String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); LOGGER.info("[OUT:{}] {}", port, preview); }