fix: 排查TCP SERVER 发布RTCM消息的断联问题

This commit is contained in:
yarnom 2025-10-29 11:54:36 +08:00
parent 3522e16070
commit 6a9bcca4db

View File

@ -247,33 +247,40 @@ public class RtkClusterService implements ApplicationRunner {
while ((read = in.read(buf)) != -1) {
if (!anyRead) {
anyRead = true;
// First byte(s) received this connection is OUT (source of solution/NMEA)
Socket prev = outConn;
outConn = s;
if (prev != null && prev != s) {
LOGGER.debug("Endpoint {} switching OUT connection", port);
// keep previous reader running until it naturally closes
}
// IN should not be the same socket as OUT
if (inConn == s) inConn = null;
// Try minimal handshake if this looks like an HTTP/NTRIP GET from RTKLIB,
// to keep the connection open when rtkrcv mistakenly uses ntripcli.
try {
String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII);
if (head.startsWith("GET ") || head.contains("RTKLIB/")) {
String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII);
boolean looksHttpGet = head.startsWith("GET ") || head.contains("RTKLIB/");
boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA");
if (looksHttpGet) {
// This is very likely rtkrcv's NTRIP client for IN stream; treat as IN sink, not OUT.
inConn = s;
try {
OutputStream os = s.getOutputStream();
String resp = head.contains("HTTP")
? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n"
: "ICY 200 OK\r\n\r\n";
os.write(resp.getBytes(StandardCharsets.US_ASCII));
os.flush();
LOGGER.info("[OUT:{}] handshake responded: {}", port, resp.trim());
LOGGER.info("[IN:{}] handshake responded: {}", port, resp.trim());
} catch (Exception ignore) {}
LOGGER.info("Endpoint {} IN (ntrip) established", port);
} else if (looksSolution || outConn == null) {
// Treat as OUT (solution/NMEA). Only switch if none set yet to avoid flapping.
if (outConn == null) {
outConn = s;
if (inConn == s) inConn = null;
LOGGER.info("Endpoint {} OUT established", port);
}
} catch (Exception ignore) {}
LOGGER.info("Endpoint {} OUT established", port);
}
}
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
LOGGER.info("[OUT:{}] {}", port, preview);
if (outConn == s) {
LOGGER.info("[OUT:{}] {}", port, preview);
} else if (inConn == s) {
LOGGER.info("[IN:{}] {}", port, preview);
} else {
LOGGER.info("[UNK:{}] {}", port, preview);
}
}
} catch (IOException e) {
LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage());
@ -306,31 +313,16 @@ public class RtkClusterService implements ApplicationRunner {
private void dequeueLoop() {
byte[] pending = null;
boolean lastWaitLogged = false;
while (true) {
try {
Socket out = outConn;
Socket sink = inConn;
if (out == null) {
// wait until OUT连接已就绪rtkrcv 已输出首包/特征头
if (!lastWaitLogged) {
LOGGER.info("Endpoint {} waiting for OUT to be ready before sending RTCM", port);
lastWaitLogged = true;
}
Thread.sleep(50);
continue;
}
if (sink == null || sink.isClosed() || sink == out) {
if (sink == null || sink.isClosed() || sink == outConn) {
// 等待有效的 IN 连接且不能与 OUT 相同
if (!lastWaitLogged) {
LOGGER.info("Endpoint {} waiting for IN to be ready before sending RTCM", port);
lastWaitLogged = true;
}
LOGGER.info("Endpoint {} waiting for IN to be ready before sending RTCM", port);
ensureInCandidate();
Thread.sleep(50);
continue;
}
lastWaitLogged = false;
if (pending == null) {
pending = rtcmQueue.pollFirst(100, TimeUnit.MILLISECONDS);
if (pending == null) continue;