From a28251d7872f6c903c2ee703367e18054a52535e Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 29 Oct 2025 10:34:58 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"fix:=20=E5=B0=9D=E8=AF=95=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=96=AD=E8=81=94=E9=97=AE=E9=A2=98"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit e07250637e5a7777e007256f692d6874c86ad656. --- .../rtkcluster/RtkClusterService.java | 76 ++++++------------- 1 file changed, 23 insertions(+), 53 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 1361042f..4212808c 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -19,7 +19,6 @@ import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.LocalDateTime; @@ -224,55 +223,34 @@ public class RtkClusterService implements ApplicationRunner { private void classifyConnection(Socket s) { exec.submit(() -> { - boolean assigned = false; try { - s.setTcpNoDelay(true); - s.setKeepAlive(true); - s.setSoTimeout(1500); + // short probe to classify role (increase to 1000ms to reduce misclassification) + s.setSoTimeout(1000); InputStream in = s.getInputStream(); byte[] probe = new byte[256]; - int n; - try { - n = in.read(probe); - } catch (SocketTimeoutException timeout) { - n = 0; // treat as no immediate data - } catch (IOException readError) { - LOGGER.debug("Endpoint {} initial read failed: {}", port, readError.getMessage()); - closeQuietly(s); - return; - } finally { - s.setSoTimeout(0); - } - - if (n < 0) { - LOGGER.debug("Endpoint {} connection closed during probe", port); - return; - } - + int n = 0; + try { n = in.read(probe); } catch (IOException ignore) {} + // restore to blocking mode for steady-state + s.setSoTimeout(0); if (n > 0 && isLikelyText(probe, n)) { - if (!isSocketAlive(outConn)) { - outConn = s; - assigned = true; - LOGGER.debug("Endpoint {} OUT connected", port); - pumpOut(s, probe, n); - } else { - LOGGER.info("Endpoint {} additional OUT connection detected; keeping existing", port); - } + // OUT connection (NMEA etc.) + closeQuietly(outConn); + s.setTcpNoDelay(true); + s.setKeepAlive(true); + outConn = s; + LOGGER.debug("Endpoint {} OUT connected", port); + pumpOut(outConn, probe, n); } else { - if (!isSocketAlive(inConn)) { - inConn = s; - assigned = true; - LOGGER.debug("Endpoint {} IN connected", port); - } else { - LOGGER.info("Endpoint {} additional IN connection detected; keeping existing", port); - } + // IN connection (RTCM sink) + closeQuietly(inConn); + s.setTcpNoDelay(true); + s.setKeepAlive(true); + inConn = s; + LOGGER.debug("Endpoint {} IN connected", port); } } catch (IOException e) { LOGGER.warn("classifyConnection error: {}", e.getMessage()); - } finally { - if (!assigned) { - closeQuietly(s); - } + closeQuietly(s); } }); } @@ -314,7 +292,8 @@ public class RtkClusterService implements ApplicationRunner { while (true) { try { Socket sink = inConn; - if (!isSocketAlive(sink)) { + if (sink == null || sink.isClosed()) { + // avoid consuming queue when no sink, preventing starvation Thread.sleep(20); continue; } @@ -324,12 +303,7 @@ public class RtkClusterService implements ApplicationRunner { os.write(data); os.flush(); } catch (IOException e) { - String msg = e.getMessage(); - if (msg != null && msg.toLowerCase().contains("closed")) { - LOGGER.debug("Write RTCM failed on {}: {}", port, msg); - } else { - LOGGER.warn("Write RTCM failed on {}: {}", port, msg); - } + LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); closeQuietly(sink); inConn = null; } @@ -340,10 +314,6 @@ public class RtkClusterService implements ApplicationRunner { } } - private boolean isSocketAlive(Socket s) { - return s != null && s.isConnected() && !s.isClosed() && !s.isInputShutdown() && !s.isOutputShutdown(); - } - private void closeQuietly(Socket s) { if (s == null) return; try { s.close(); } catch (IOException ignore) {}