fix: 排查TCP SERVER 发布RTCM消息的断联问题

This commit is contained in:
yarnom 2025-10-29 11:02:47 +08:00
parent f3aaea78a3
commit c72544cfd3

View File

@ -177,8 +177,10 @@ public class RtkClusterService implements ApplicationRunner {
private final ExecutorService exec;
private volatile boolean started = false;
private volatile ServerSocket server;
private volatile Socket inConn; // server writes RTCM to this
private volatile Socket outConn; // server reads NMEA from this
// Connections: OUT is the one that sends data to us (solution/NMEA). IN is where we write RTCM.
private volatile Socket inConn; // preferred sink to write RTCM
private volatile Socket outConn; // chosen source to read solution
private final java.util.Set<Socket> liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>());
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
DeviceEndpoint(int port) {
@ -214,94 +216,96 @@ public class RtkClusterService implements ApplicationRunner {
LOGGER.info("RtkCluster device endpoint listening on 127.0.0.1:{}", port);
while (true) {
Socket s = ss.accept();
classifyConnection(s);
onAccepted(s);
}
} catch (IOException e) {
LOGGER.error("Device endpoint on {} stopped: {}", port, e.getMessage(), e);
}
}
private void classifyConnection(Socket s) {
private void onAccepted(Socket s) {
exec.submit(() -> {
try {
// short probe to classify role (increase to 1000ms to reduce misclassification)
s.setSoTimeout(1000);
InputStream in = s.getInputStream();
byte[] probe = new byte[256];
int n = 0;
try { n = in.read(probe); } catch (IOException ignore) {}
// restore to blocking mode for steady-state
s.setSoTimeout(0);
if (n > 0 && isLikelyText(probe, n)) {
// OUT connection (NMEA etc.)
closeQuietly(outConn);
s.setTcpNoDelay(true);
s.setKeepAlive(true);
outConn = s;
LOGGER.debug("Endpoint {} OUT connected", port);
pumpOut(outConn, probe, n);
} else {
// IN connection (RTCM sink)
closeQuietly(inConn);
s.setTcpNoDelay(true);
s.setKeepAlive(true);
inConn = s;
LOGGER.debug("Endpoint {} IN connected", port);
}
} catch (IOException e) {
LOGGER.warn("classifyConnection error: {}", e.getMessage());
closeQuietly(s);
}
s.setTcpNoDelay(true);
s.setKeepAlive(true);
} catch (Exception ignore) {}
liveConns.add(s);
LOGGER.debug("Endpoint {} new connection (live={})", port, liveConns.size());
// Start a reader; the first socket that actually sends data becomes OUT.
startReader(s);
// Try to update IN candidate if missing
ensureInCandidate();
});
}
private boolean isLikelyText(byte[] buf, int n) {
int printable = 0;
for (int i = 0; i < n; i++) {
int b = buf[i] & 0xFF;
if (b >= 32 && b <= 126) printable++;
}
return printable >= Math.max(1, n - 4); // tolerate some non-printables
}
private void pumpOut(Socket s, byte[] firstBuf, int firstLen) {
private void startReader(Socket s) {
exec.submit(() -> {
boolean anyRead = false;
try (InputStream in = s.getInputStream()) {
byte[] buf = new byte[2048];
int read;
// deliver first classified bytes if any
if (firstLen > 0) {
String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
LOGGER.info("[OUT:{}] {}", port, preview);
}
while ((read = in.read(buf)) != -1) {
// For now, just log a short preview
if (!anyRead) {
anyRead = true;
// First byte(s) received this connection is OUT (source of solution/NMEA)
Socket prev = outConn;
outConn = s;
if (prev != null && prev != s) {
LOGGER.debug("Endpoint {} switching OUT connection", port);
// keep previous reader running until it naturally closes
}
// IN should not be the same socket as OUT
if (inConn == s) inConn = null;
}
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
LOGGER.info("[OUT:{}] {}", port, preview);
}
} catch (IOException e) {
LOGGER.debug("OUT connection closed on {}: {}", port, e.getMessage());
LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage());
} finally {
liveConns.remove(s);
if (outConn == s) outConn = null;
if (inConn == s) inConn = null;
closeQuietly(s);
ensureInCandidate();
}
});
}
private void ensureInCandidate() {
// If current IN is invalid or equals OUT, select another live socket as IN
Socket current = inConn;
if (current != null && !current.isClosed() && current != outConn) return;
for (Socket c : liveConns) {
if (c != null && !c.isClosed() && c != outConn) {
inConn = c;
LOGGER.debug("Endpoint {} IN set (live={})", port, liveConns.size());
return;
}
}
inConn = null;
}
private void dequeueLoop() {
byte[] pending = null;
while (true) {
try {
Socket sink = inConn;
if (sink == null || sink.isClosed()) {
// avoid consuming queue when no sink, preventing starvation
if (sink == null || sink.isClosed() || sink == outConn) {
// wait for a valid IN sink before consuming queue
ensureInCandidate();
Thread.sleep(20);
continue;
}
byte[] data = rtcmQueue.take();
if (pending == null) {
pending = rtcmQueue.pollFirst(100, TimeUnit.MILLISECONDS);
if (pending == null) continue;
}
try {
OutputStream os = sink.getOutputStream();
os.write(data);
os.write(pending);
os.flush();
pending = null; // sent successfully
} catch (IOException e) {
LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage());
closeQuietly(sink);