From c72544cfd33fcb320984b6d72a4d98fee77bcbcd Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 29 Oct 2025 11:02:47 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=8E=92=E6=9F=A5TCP=20SERVER=20?= =?UTF-8?q?=E5=8F=91=E5=B8=83RTCM=E6=B6=88=E6=81=AF=E7=9A=84=E6=96=AD?= =?UTF-8?q?=E8=81=94=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rtkcluster/RtkClusterService.java | 110 +++++++++--------- 1 file changed, 57 insertions(+), 53 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index ae19f7e3..b1d258d3 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -177,8 +177,10 @@ public class RtkClusterService implements ApplicationRunner { private final ExecutorService exec; private volatile boolean started = false; private volatile ServerSocket server; - private volatile Socket inConn; // server writes RTCM to this - private volatile Socket outConn; // server reads NMEA from this + // Connections: OUT is the one that sends data to us (solution/NMEA). IN is where we write RTCM. + private volatile Socket inConn; // preferred sink to write RTCM + private volatile Socket outConn; // chosen source to read solution + private final java.util.Set liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>()); private final java.util.concurrent.LinkedBlockingDeque rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); DeviceEndpoint(int port) { @@ -214,94 +216,96 @@ public class RtkClusterService implements ApplicationRunner { LOGGER.info("RtkCluster device endpoint listening on 127.0.0.1:{}", port); while (true) { Socket s = ss.accept(); - classifyConnection(s); + onAccepted(s); } } catch (IOException e) { LOGGER.error("Device endpoint on {} stopped: {}", port, e.getMessage(), e); } } - private void classifyConnection(Socket s) { + private void onAccepted(Socket s) { exec.submit(() -> { try { - // short probe to classify role (increase to 1000ms to reduce misclassification) - s.setSoTimeout(1000); - InputStream in = s.getInputStream(); - byte[] probe = new byte[256]; - int n = 0; - try { n = in.read(probe); } catch (IOException ignore) {} - // restore to blocking mode for steady-state - s.setSoTimeout(0); - if (n > 0 && isLikelyText(probe, n)) { - // OUT connection (NMEA etc.) - closeQuietly(outConn); - s.setTcpNoDelay(true); - s.setKeepAlive(true); - outConn = s; - LOGGER.debug("Endpoint {} OUT connected", port); - pumpOut(outConn, probe, n); - } else { - // IN connection (RTCM sink) - closeQuietly(inConn); - s.setTcpNoDelay(true); - s.setKeepAlive(true); - inConn = s; - LOGGER.debug("Endpoint {} IN connected", port); - } - } catch (IOException e) { - LOGGER.warn("classifyConnection error: {}", e.getMessage()); - closeQuietly(s); - } + s.setTcpNoDelay(true); + s.setKeepAlive(true); + } catch (Exception ignore) {} + liveConns.add(s); + LOGGER.debug("Endpoint {} new connection (live={})", port, liveConns.size()); + // Start a reader; the first socket that actually sends data becomes OUT. + startReader(s); + // Try to update IN candidate if missing + ensureInCandidate(); }); } - private boolean isLikelyText(byte[] buf, int n) { - int printable = 0; - for (int i = 0; i < n; i++) { - int b = buf[i] & 0xFF; - if (b >= 32 && b <= 126) printable++; - } - return printable >= Math.max(1, n - 4); // tolerate some non-printables - } - - private void pumpOut(Socket s, byte[] firstBuf, int firstLen) { + private void startReader(Socket s) { exec.submit(() -> { + boolean anyRead = false; try (InputStream in = s.getInputStream()) { byte[] buf = new byte[2048]; int read; - // deliver first classified bytes if any - if (firstLen > 0) { - String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); - LOGGER.info("[OUT:{}] {}", port, preview); - } while ((read = in.read(buf)) != -1) { - // For now, just log a short preview + if (!anyRead) { + anyRead = true; + // First byte(s) received — this connection is OUT (source of solution/NMEA) + Socket prev = outConn; + outConn = s; + if (prev != null && prev != s) { + LOGGER.debug("Endpoint {} switching OUT connection", port); + // keep previous reader running until it naturally closes + } + // IN should not be the same socket as OUT + if (inConn == s) inConn = null; + } String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); LOGGER.info("[OUT:{}] {}", port, preview); } } catch (IOException e) { - LOGGER.debug("OUT connection closed on {}: {}", port, e.getMessage()); + LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); } finally { + liveConns.remove(s); if (outConn == s) outConn = null; + if (inConn == s) inConn = null; closeQuietly(s); + ensureInCandidate(); } }); } + private void ensureInCandidate() { + // If current IN is invalid or equals OUT, select another live socket as IN + Socket current = inConn; + if (current != null && !current.isClosed() && current != outConn) return; + for (Socket c : liveConns) { + if (c != null && !c.isClosed() && c != outConn) { + inConn = c; + LOGGER.debug("Endpoint {} IN set (live={})", port, liveConns.size()); + return; + } + } + inConn = null; + } + private void dequeueLoop() { + byte[] pending = null; while (true) { try { Socket sink = inConn; - if (sink == null || sink.isClosed()) { - // avoid consuming queue when no sink, preventing starvation + if (sink == null || sink.isClosed() || sink == outConn) { + // wait for a valid IN sink before consuming queue + ensureInCandidate(); Thread.sleep(20); continue; } - byte[] data = rtcmQueue.take(); + if (pending == null) { + pending = rtcmQueue.pollFirst(100, TimeUnit.MILLISECONDS); + if (pending == null) continue; + } try { OutputStream os = sink.getOutputStream(); - os.write(data); + os.write(pending); os.flush(); + pending = null; // sent successfully } catch (IOException e) { LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); closeQuietly(sink);