diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 4212808c..1361042f 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -19,6 +19,7 @@ import java.io.*; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.LocalDateTime; @@ -223,34 +224,55 @@ public class RtkClusterService implements ApplicationRunner { private void classifyConnection(Socket s) { exec.submit(() -> { + boolean assigned = false; try { - // short probe to classify role (increase to 1000ms to reduce misclassification) - s.setSoTimeout(1000); + s.setTcpNoDelay(true); + s.setKeepAlive(true); + s.setSoTimeout(1500); InputStream in = s.getInputStream(); byte[] probe = new byte[256]; - int n = 0; - try { n = in.read(probe); } catch (IOException ignore) {} - // restore to blocking mode for steady-state - s.setSoTimeout(0); + int n; + try { + n = in.read(probe); + } catch (SocketTimeoutException timeout) { + n = 0; // treat as no immediate data + } catch (IOException readError) { + LOGGER.debug("Endpoint {} initial read failed: {}", port, readError.getMessage()); + closeQuietly(s); + return; + } finally { + s.setSoTimeout(0); + } + + if (n < 0) { + LOGGER.debug("Endpoint {} connection closed during probe", port); + return; + } + if (n > 0 && isLikelyText(probe, n)) { - // OUT connection (NMEA etc.) - closeQuietly(outConn); - s.setTcpNoDelay(true); - s.setKeepAlive(true); - outConn = s; - LOGGER.debug("Endpoint {} OUT connected", port); - pumpOut(outConn, probe, n); + if (!isSocketAlive(outConn)) { + outConn = s; + assigned = true; + LOGGER.debug("Endpoint {} OUT connected", port); + pumpOut(s, probe, n); + } else { + LOGGER.info("Endpoint {} additional OUT connection detected; keeping existing", port); + } } else { - // IN connection (RTCM sink) - closeQuietly(inConn); - s.setTcpNoDelay(true); - s.setKeepAlive(true); - inConn = s; - LOGGER.debug("Endpoint {} IN connected", port); + if (!isSocketAlive(inConn)) { + inConn = s; + assigned = true; + LOGGER.debug("Endpoint {} IN connected", port); + } else { + LOGGER.info("Endpoint {} additional IN connection detected; keeping existing", port); + } } } catch (IOException e) { LOGGER.warn("classifyConnection error: {}", e.getMessage()); - closeQuietly(s); + } finally { + if (!assigned) { + closeQuietly(s); + } } }); } @@ -292,8 +314,7 @@ public class RtkClusterService implements ApplicationRunner { while (true) { try { Socket sink = inConn; - if (sink == null || sink.isClosed()) { - // avoid consuming queue when no sink, preventing starvation + if (!isSocketAlive(sink)) { Thread.sleep(20); continue; } @@ -303,7 +324,12 @@ public class RtkClusterService implements ApplicationRunner { os.write(data); os.flush(); } catch (IOException e) { - LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); + String msg = e.getMessage(); + if (msg != null && msg.toLowerCase().contains("closed")) { + LOGGER.debug("Write RTCM failed on {}: {}", port, msg); + } else { + LOGGER.warn("Write RTCM failed on {}: {}", port, msg); + } closeQuietly(sink); inConn = null; } @@ -314,6 +340,10 @@ public class RtkClusterService implements ApplicationRunner { } } + private boolean isSocketAlive(Socket s) { + return s != null && s.isConnected() && !s.isClosed() && !s.isInputShutdown() && !s.isOutputShutdown(); + } + private void closeQuietly(Socket s) { if (s == null) return; try { s.close(); } catch (IOException ignore) {}