diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index d3e05e40..16d26b3b 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -177,9 +177,11 @@ public class RtkClusterService implements ApplicationRunner { private final ExecutorService exec; private volatile boolean started = false; private volatile ServerSocket server; - // Connections: OUT is the one that sends data to us (solution/NMEA). IN is where we write RTCM. - private volatile Socket inConn; // preferred sink to write RTCM - private volatile Socket outConn; // chosen source to read solution + // 命名按你的语义: + // IN = rtkrcv -> server(我们读取解算/NMEA) + // OUT = server -> rtkrcv(我们向其发送RTCM) + private volatile Socket inConn; // source we READ (solution/NMEA) + private volatile Socket outConn; // sink we WRITE RTCM private final java.util.Set liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>()); private final java.util.concurrent.LinkedBlockingDeque rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); @@ -231,10 +233,10 @@ public class RtkClusterService implements ApplicationRunner { } catch (Exception ignore) {} liveConns.add(s); LOGGER.debug("Endpoint {} new connection (live={})", port, liveConns.size()); - // Start a reader; the first socket that actually sends data becomes OUT. + // Start a reader; the first socket that actually sends data and looks like solution/NTRIP becomes IN. startReader(s); - // Try to update IN candidate if missing - ensureInCandidate(); + // Try to set OUT candidate if missing (the sink for RTCM) + ensureOutCandidate(); }); } @@ -251,33 +253,31 @@ public class RtkClusterService implements ApplicationRunner { boolean looksHttpGet = head.startsWith("GET ") || head.contains("RTKLIB/"); boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA"); - if (looksHttpGet) { - // This is very likely rtkrcv's NTRIP client for IN stream; treat as IN sink, not OUT. + if (looksHttpGet || looksSolution) { + // 这是 rtkrcv -> 我们 的连接(IN:我们读取) inConn = s; - try { - OutputStream os = s.getOutputStream(); - String resp = head.contains("HTTP") - ? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n" - : "ICY 200 OK\r\n\r\n"; - os.write(resp.getBytes(StandardCharsets.US_ASCII)); - os.flush(); - LOGGER.info("[IN:{}] handshake responded: {}", port, resp.trim()); - } catch (Exception ignore) {} - LOGGER.info("Endpoint {} IN (ntrip) established", port); - } else if (looksSolution || outConn == null) { - // Treat as OUT (solution/NMEA). Only switch if none set yet to avoid flapping. - if (outConn == null) { - outConn = s; - if (inConn == s) inConn = null; - LOGGER.info("Endpoint {} OUT established", port); + if (looksHttpGet) { + // 若像 NTRIP GET,请求一个最小应答保持连接 + try { + OutputStream os = s.getOutputStream(); + String resp = head.contains("HTTP") + ? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n" + : "ICY 200 OK\r\n\r\n"; + os.write(resp.getBytes(StandardCharsets.US_ASCII)); + os.flush(); + LOGGER.info("[IN:{}] handshake responded: {}", port, resp.trim()); + } catch (Exception ignore) {} } + // OUT 不能与 IN 相同 + if (outConn == s) outConn = null; + LOGGER.info("Endpoint {} IN established", port); } } String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); - if (outConn == s) { - LOGGER.info("[OUT:{}] {}", port, preview); - } else if (inConn == s) { + if (inConn == s) { LOGGER.info("[IN:{}] {}", port, preview); + } else if (outConn == s) { + LOGGER.info("[OUT:{}] {}", port, preview); } else { LOGGER.info("[UNK:{}] {}", port, preview); } @@ -286,40 +286,37 @@ public class RtkClusterService implements ApplicationRunner { LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); } finally { liveConns.remove(s); - if (outConn == s) { - outConn = null; - LOGGER.info("Endpoint {} OUT disconnected", port); - } - if (inConn == s) inConn = null; + if (inConn == s) { inConn = null; LOGGER.info("Endpoint {} IN disconnected", port); } + if (outConn == s) { outConn = null; LOGGER.info("Endpoint {} OUT disconnected", port); } closeQuietly(s); - ensureInCandidate(); + ensureOutCandidate(); } }); } - private void ensureInCandidate() { - // If current IN is invalid or equals OUT, select another live socket as IN - Socket current = inConn; - if (current != null && !current.isClosed() && current != outConn) return; + private void ensureOutCandidate() { + // 如果当前 OUT 不可用或与 IN 相同,则从存活连接里挑选一个非 IN 的作为 OUT(RTCM 写入端) + Socket current = outConn; + if (current != null && !current.isClosed() && current != inConn) return; for (Socket c : liveConns) { - if (c != null && !c.isClosed() && c != outConn) { - inConn = c; - LOGGER.debug("Endpoint {} IN set (live={})", port, liveConns.size()); + if (c != null && !c.isClosed() && c != inConn) { + outConn = c; + LOGGER.debug("Endpoint {} OUT set (live={})", port, liveConns.size()); return; } } - inConn = null; + outConn = null; } private void dequeueLoop() { byte[] pending = null; while (true) { try { - Socket sink = inConn; - if (sink == null || sink.isClosed() || sink == outConn) { - // 等待有效的 IN 连接(且不能与 OUT 相同) - LOGGER.info("Endpoint {} waiting for IN to be ready before sending RTCM", port); - ensureInCandidate(); + Socket sink = outConn; + if (sink == null || sink.isClosed() || sink == inConn) { + // 等待有效的 OUT 连接(我们往其写 RTCM),且不能与 IN 相同 + LOGGER.info("Endpoint {} waiting for OUT to be ready before sending RTCM", port); + ensureOutCandidate(); Thread.sleep(50); continue; } @@ -335,7 +332,7 @@ public class RtkClusterService implements ApplicationRunner { } catch (IOException e) { LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); closeQuietly(sink); - inConn = null; + outConn = null; } } catch (InterruptedException e) { Thread.currentThread().interrupt();