From 6a9bcca4db2ba914f18e37fed059c5f169037038 Mon Sep 17 00:00:00 2001 From: yarnom Date: Wed, 29 Oct 2025 11:54:36 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=8E=92=E6=9F=A5TCP=20SERVER=20?= =?UTF-8?q?=E5=8F=91=E5=B8=83RTCM=E6=B6=88=E6=81=AF=E7=9A=84=E6=96=AD?= =?UTF-8?q?=E8=81=94=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rtkcluster/RtkClusterService.java | 62 ++++++++----------- 1 file changed, 27 insertions(+), 35 deletions(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 9e711c6d..8dbb458b 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -247,33 +247,40 @@ public class RtkClusterService implements ApplicationRunner { while ((read = in.read(buf)) != -1) { if (!anyRead) { anyRead = true; - // First byte(s) received — this connection is OUT (source of solution/NMEA) - Socket prev = outConn; - outConn = s; - if (prev != null && prev != s) { - LOGGER.debug("Endpoint {} switching OUT connection", port); - // keep previous reader running until it naturally closes - } - // IN should not be the same socket as OUT - if (inConn == s) inConn = null; - // Try minimal handshake if this looks like an HTTP/NTRIP GET from RTKLIB, - // to keep the connection open when rtkrcv mistakenly uses ntripcli. - try { - String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII); - if (head.startsWith("GET ") || head.contains("RTKLIB/")) { + String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII); + boolean looksHttpGet = head.startsWith("GET ") || head.contains("RTKLIB/"); + boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA"); + + if (looksHttpGet) { + // This is very likely rtkrcv's NTRIP client for IN stream; treat as IN sink, not OUT. + inConn = s; + try { OutputStream os = s.getOutputStream(); String resp = head.contains("HTTP") ? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n" : "ICY 200 OK\r\n\r\n"; os.write(resp.getBytes(StandardCharsets.US_ASCII)); os.flush(); - LOGGER.info("[OUT:{}] handshake responded: {}", port, resp.trim()); + LOGGER.info("[IN:{}] handshake responded: {}", port, resp.trim()); + } catch (Exception ignore) {} + LOGGER.info("Endpoint {} IN (ntrip) established", port); + } else if (looksSolution || outConn == null) { + // Treat as OUT (solution/NMEA). Only switch if none set yet to avoid flapping. + if (outConn == null) { + outConn = s; + if (inConn == s) inConn = null; + LOGGER.info("Endpoint {} OUT established", port); } - } catch (Exception ignore) {} - LOGGER.info("Endpoint {} OUT established", port); + } } String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); - LOGGER.info("[OUT:{}] {}", port, preview); + if (outConn == s) { + LOGGER.info("[OUT:{}] {}", port, preview); + } else if (inConn == s) { + LOGGER.info("[IN:{}] {}", port, preview); + } else { + LOGGER.info("[UNK:{}] {}", port, preview); + } } } catch (IOException e) { LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); @@ -306,31 +313,16 @@ public class RtkClusterService implements ApplicationRunner { private void dequeueLoop() { byte[] pending = null; - boolean lastWaitLogged = false; while (true) { try { - Socket out = outConn; Socket sink = inConn; - if (out == null) { - // wait until OUT连接已就绪(rtkrcv 已输出首包/特征头) - if (!lastWaitLogged) { - LOGGER.info("Endpoint {} waiting for OUT to be ready before sending RTCM", port); - lastWaitLogged = true; - } - Thread.sleep(50); - continue; - } - if (sink == null || sink.isClosed() || sink == out) { + if (sink == null || sink.isClosed() || sink == outConn) { // 等待有效的 IN 连接(且不能与 OUT 相同) - if (!lastWaitLogged) { - LOGGER.info("Endpoint {} waiting for IN to be ready before sending RTCM", port); - lastWaitLogged = true; - } + LOGGER.info("Endpoint {} waiting for IN to be ready before sending RTCM", port); ensureInCandidate(); Thread.sleep(50); continue; } - lastWaitLogged = false; if (pending == null) { pending = rtcmQueue.pollFirst(100, TimeUnit.MILLISECONDS); if (pending == null) continue;