From e07250637e5a7777e007256f692d6874c86ad656 Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 29 Oct 2025 09:48:34 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=B0=9D=E8=AF=95=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=96=AD=E8=81=94=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rtkcluster/RtkClusterService.java | 76 +++++++++++++------ 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 4212808c..1361042f 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -19,6 +19,7 @@ import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.LocalDateTime; @@ -223,34 +224,55 @@ public class RtkClusterService implements ApplicationRunner { private void classifyConnection(Socket s) { exec.submit(() -> { + boolean assigned = false; try { - // short probe to classify role (increase to 1000ms to reduce misclassification) - s.setSoTimeout(1000); + s.setTcpNoDelay(true); + s.setKeepAlive(true); + s.setSoTimeout(1500); InputStream in = s.getInputStream(); byte[] probe = new byte[256]; - int n = 0; - try { n = in.read(probe); } catch (IOException ignore) {} - // restore to blocking mode for steady-state - s.setSoTimeout(0); + int n; + try { + n = in.read(probe); + } catch (SocketTimeoutException timeout) { + n = 0; // treat as no immediate data + } catch (IOException readError) { + LOGGER.debug("Endpoint {} initial read failed: {}", port, readError.getMessage()); + closeQuietly(s); + return; + } finally { + s.setSoTimeout(0); + } + + if (n < 0) { + LOGGER.debug("Endpoint {} connection closed during probe", port); + return; + } + if (n > 0 && isLikelyText(probe, n)) { - // OUT connection (NMEA etc.) - closeQuietly(outConn); - s.setTcpNoDelay(true); - s.setKeepAlive(true); - outConn = s; - LOGGER.debug("Endpoint {} OUT connected", port); - pumpOut(outConn, probe, n); + if (!isSocketAlive(outConn)) { + outConn = s; + assigned = true; + LOGGER.debug("Endpoint {} OUT connected", port); + pumpOut(s, probe, n); + } else { + LOGGER.info("Endpoint {} additional OUT connection detected; keeping existing", port); + } } else { - // IN connection (RTCM sink) - closeQuietly(inConn); - s.setTcpNoDelay(true); - s.setKeepAlive(true); - inConn = s; - LOGGER.debug("Endpoint {} IN connected", port); + if (!isSocketAlive(inConn)) { + inConn = s; + assigned = true; + LOGGER.debug("Endpoint {} IN connected", port); + } else { + LOGGER.info("Endpoint {} additional IN connection detected; keeping existing", port); + } } } catch (IOException e) { LOGGER.warn("classifyConnection error: {}", e.getMessage()); - closeQuietly(s); + } finally { + if (!assigned) { + closeQuietly(s); + } } }); } @@ -292,8 +314,7 @@ public class RtkClusterService implements ApplicationRunner { while (true) { try { Socket sink = inConn; - if (sink == null || sink.isClosed()) { - // avoid consuming queue when no sink, preventing starvation + if (!isSocketAlive(sink)) { Thread.sleep(20); continue; } @@ -303,7 +324,12 @@ public class RtkClusterService implements ApplicationRunner { os.write(data); os.flush(); } catch (IOException e) { - LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); + String msg = e.getMessage(); + if (msg != null && msg.toLowerCase().contains("closed")) { + LOGGER.debug("Write RTCM failed on {}: {}", port, msg); + } else { + LOGGER.warn("Write RTCM failed on {}: {}", port, msg); + } closeQuietly(sink); inConn = null; } @@ -314,6 +340,10 @@ public class RtkClusterService implements ApplicationRunner { } } + private boolean isSocketAlive(Socket s) { + return s != null && s.isConnected() && !s.isClosed() && !s.isInputShutdown() && !s.isOutputShutdown(); + } + private void closeQuietly(Socket s) { if (s == null) return; try { s.close(); } catch (IOException ignore) {}