fix: 排查TCP SERVER 发布RTCM消息的断联问题

This commit is contained in:
yarnom 2025-10-29 12:39:02 +08:00
parent f75bafcc75
commit 0ef993f9f9

View File

@ -177,9 +177,9 @@ public class RtkClusterService implements ApplicationRunner {
private final ExecutorService exec;
private volatile boolean started = false;
private volatile ServerSocket server;
// 命名按你的语义
// IN = rtkrcv -> server我们读取解算/NMEA
// OUT = server -> rtkrcv我们向其发送RTCM
// 命名按你的语义用于日志呈现
// RTKRCV-OUT = rtkrcv -> server我们读取解算/NMEA
// RTKRCV-IN = server -> rtkrcv我们向其发送RTCM
private volatile Socket inConn; // source we READ (solution/NMEA)
private volatile Socket outConn; // sink we WRITE RTCM
private final java.util.Set<Socket> liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>());
@ -254,7 +254,7 @@ public class RtkClusterService implements ApplicationRunner {
boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA");
if (looksHttpGet || looksSolution) {
// 这是 rtkrcv -> 我们 的连接IN我们读取
// 这是 rtkrcv -> 我们 的连接RTKRCV-OUT我们读取
inConn = s;
if (looksHttpGet) {
// 若像 NTRIP GET请求一个最小应答保持连接
@ -265,19 +265,19 @@ public class RtkClusterService implements ApplicationRunner {
: "ICY 200 OK\r\n\r\n";
os.write(resp.getBytes(StandardCharsets.US_ASCII));
os.flush();
LOGGER.info("[IN:{}] handshake responded: {}", port, resp.trim());
LOGGER.info("[OUT:{}] handshake responded: {}", port, resp.trim());
} catch (Exception ignore) {}
}
// OUT 不能与 IN 相同
if (outConn == s) outConn = null;
LOGGER.info("Endpoint {} IN established", port);
LOGGER.info("Endpoint {} RTKRCV OUT established", port);
}
}
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
if (inConn == s) {
LOGGER.info("[IN:{}] {}", port, preview);
} else if (outConn == s) {
LOGGER.info("[OUT:{}] {}", port, preview);
} else if (outConn == s) {
LOGGER.info("[IN:{}] {}", port, preview);
} else {
LOGGER.info("[UNK:{}] {}", port, preview);
}
@ -286,8 +286,8 @@ public class RtkClusterService implements ApplicationRunner {
LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage());
} finally {
liveConns.remove(s);
if (inConn == s) { inConn = null; LOGGER.info("Endpoint {} IN disconnected", port); }
if (outConn == s) { outConn = null; LOGGER.info("Endpoint {} OUT disconnected", port); }
if (inConn == s) { inConn = null; LOGGER.info("Endpoint {} RTKRCV OUT disconnected", port); }
if (outConn == s) { outConn = null; LOGGER.info("Endpoint {} RTKRCV IN disconnected", port); }
closeQuietly(s);
ensureOutCandidate();
}
@ -295,13 +295,13 @@ public class RtkClusterService implements ApplicationRunner {
}
private void ensureOutCandidate() {
// 如果当前 OUT 不可用或与 IN 相同则从存活连接里挑选一个非 IN 的作为 OUTRTCM 写入端
// 如果当前 OUT 不可用或与 IN 相同则从存活连接里挑选一个非 IN 的作为 OUTRTKRCV-INRTCM 写入端
Socket current = outConn;
if (current != null && !current.isClosed() && current != inConn) return;
for (Socket c : liveConns) {
if (c != null && !c.isClosed() && c != inConn) {
outConn = c;
LOGGER.debug("Endpoint {} OUT set (live={})", port, liveConns.size());
LOGGER.debug("Endpoint {} RTKRCV IN set (live={})", port, liveConns.size());
return;
}
}
@ -314,8 +314,8 @@ public class RtkClusterService implements ApplicationRunner {
try {
Socket sink = outConn;
if (sink == null || sink.isClosed() || sink == inConn) {
// 等待有效的 OUT 连接我们往其写 RTCM且不能与 IN 相同
LOGGER.info("Endpoint {} waiting for OUT to be ready before sending RTCM", port);
// 等待有效的 RTKRCV-IN 连接我们往其写 RTCM且不能与 RTKRCV-OUT 相同
LOGGER.info("Endpoint {} waiting for RTKRCV IN to be ready before sending RTCM", port);
ensureOutCandidate();
Thread.sleep(50);
continue;