fix: 排查TCP SERVER 发布RTCM消息的断联问题

This commit is contained in:
yarnom 2025-10-29 12:23:31 +08:00
parent bde6d798c5
commit dcfa0881f3

View File

@ -177,9 +177,11 @@ public class RtkClusterService implements ApplicationRunner {
private final ExecutorService exec; private final ExecutorService exec;
private volatile boolean started = false; private volatile boolean started = false;
private volatile ServerSocket server; private volatile ServerSocket server;
// Connections: OUT is the one that sends data to us (solution/NMEA). IN is where we write RTCM. // 命名按你的语义
private volatile Socket inConn; // preferred sink to write RTCM // IN = rtkrcv -> server我们读取解算/NMEA
private volatile Socket outConn; // chosen source to read solution // OUT = server -> rtkrcv我们向其发送RTCM
private volatile Socket inConn; // source we READ (solution/NMEA)
private volatile Socket outConn; // sink we WRITE RTCM
private final java.util.Set<Socket> liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>()); private final java.util.Set<Socket> liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>());
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
@ -231,10 +233,10 @@ public class RtkClusterService implements ApplicationRunner {
} catch (Exception ignore) {} } catch (Exception ignore) {}
liveConns.add(s); liveConns.add(s);
LOGGER.debug("Endpoint {} new connection (live={})", port, liveConns.size()); LOGGER.debug("Endpoint {} new connection (live={})", port, liveConns.size());
// Start a reader; the first socket that actually sends data becomes OUT. // Start a reader; the first socket that actually sends data and looks like solution/NTRIP becomes IN.
startReader(s); startReader(s);
// Try to update IN candidate if missing // Try to set OUT candidate if missing (the sink for RTCM)
ensureInCandidate(); ensureOutCandidate();
}); });
} }
@ -251,33 +253,31 @@ public class RtkClusterService implements ApplicationRunner {
boolean looksHttpGet = head.startsWith("GET ") || head.contains("RTKLIB/"); boolean looksHttpGet = head.startsWith("GET ") || head.contains("RTKLIB/");
boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA"); boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA");
if (looksHttpGet) { if (looksHttpGet || looksSolution) {
// This is very likely rtkrcv's NTRIP client for IN stream; treat as IN sink, not OUT. // 这是 rtkrcv -> 我们 的连接IN我们读取
inConn = s; inConn = s;
try { if (looksHttpGet) {
OutputStream os = s.getOutputStream(); // 若像 NTRIP GET请求一个最小应答保持连接
String resp = head.contains("HTTP") try {
? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n" OutputStream os = s.getOutputStream();
: "ICY 200 OK\r\n\r\n"; String resp = head.contains("HTTP")
os.write(resp.getBytes(StandardCharsets.US_ASCII)); ? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n"
os.flush(); : "ICY 200 OK\r\n\r\n";
LOGGER.info("[IN:{}] handshake responded: {}", port, resp.trim()); os.write(resp.getBytes(StandardCharsets.US_ASCII));
} catch (Exception ignore) {} os.flush();
LOGGER.info("Endpoint {} IN (ntrip) established", port); LOGGER.info("[IN:{}] handshake responded: {}", port, resp.trim());
} else if (looksSolution || outConn == null) { } catch (Exception ignore) {}
// Treat as OUT (solution/NMEA). Only switch if none set yet to avoid flapping.
if (outConn == null) {
outConn = s;
if (inConn == s) inConn = null;
LOGGER.info("Endpoint {} OUT established", port);
} }
// OUT 不能与 IN 相同
if (outConn == s) outConn = null;
LOGGER.info("Endpoint {} IN established", port);
} }
} }
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
if (outConn == s) { if (inConn == s) {
LOGGER.info("[OUT:{}] {}", port, preview);
} else if (inConn == s) {
LOGGER.info("[IN:{}] {}", port, preview); LOGGER.info("[IN:{}] {}", port, preview);
} else if (outConn == s) {
LOGGER.info("[OUT:{}] {}", port, preview);
} else { } else {
LOGGER.info("[UNK:{}] {}", port, preview); LOGGER.info("[UNK:{}] {}", port, preview);
} }
@ -286,40 +286,37 @@ public class RtkClusterService implements ApplicationRunner {
LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage());
} finally { } finally {
liveConns.remove(s); liveConns.remove(s);
if (outConn == s) { if (inConn == s) { inConn = null; LOGGER.info("Endpoint {} IN disconnected", port); }
outConn = null; if (outConn == s) { outConn = null; LOGGER.info("Endpoint {} OUT disconnected", port); }
LOGGER.info("Endpoint {} OUT disconnected", port);
}
if (inConn == s) inConn = null;
closeQuietly(s); closeQuietly(s);
ensureInCandidate(); ensureOutCandidate();
} }
}); });
} }
private void ensureInCandidate() { private void ensureOutCandidate() {
// If current IN is invalid or equals OUT, select another live socket as IN // 如果当前 OUT 不可用或与 IN 相同则从存活连接里挑选一个非 IN 的作为 OUTRTCM 写入端
Socket current = inConn; Socket current = outConn;
if (current != null && !current.isClosed() && current != outConn) return; if (current != null && !current.isClosed() && current != inConn) return;
for (Socket c : liveConns) { for (Socket c : liveConns) {
if (c != null && !c.isClosed() && c != outConn) { if (c != null && !c.isClosed() && c != inConn) {
inConn = c; outConn = c;
LOGGER.debug("Endpoint {} IN set (live={})", port, liveConns.size()); LOGGER.debug("Endpoint {} OUT set (live={})", port, liveConns.size());
return; return;
} }
} }
inConn = null; outConn = null;
} }
private void dequeueLoop() { private void dequeueLoop() {
byte[] pending = null; byte[] pending = null;
while (true) { while (true) {
try { try {
Socket sink = inConn; Socket sink = outConn;
if (sink == null || sink.isClosed() || sink == outConn) { if (sink == null || sink.isClosed() || sink == inConn) {
// 等待有效的 IN 连接且不能与 OUT 相同 // 等待有效的 OUT 连接我们往其写 RTCM且不能与 IN 相同
LOGGER.info("Endpoint {} waiting for IN to be ready before sending RTCM", port); LOGGER.info("Endpoint {} waiting for OUT to be ready before sending RTCM", port);
ensureInCandidate(); ensureOutCandidate();
Thread.sleep(50); Thread.sleep(50);
continue; continue;
} }
@ -335,7 +332,7 @@ public class RtkClusterService implements ApplicationRunner {
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage());
closeQuietly(sink); closeQuietly(sink);
inConn = null; outConn = null;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();