fix: 尝试修复断联问题

This commit is contained in:
yarnom 2025-10-29 09:48:34 +08:00
parent ee77c07bc2
commit e07250637e

View File

@ -19,6 +19,7 @@ import java.io.*;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -223,35 +224,56 @@ public class RtkClusterService implements ApplicationRunner {
private void classifyConnection(Socket s) { private void classifyConnection(Socket s) {
exec.submit(() -> { exec.submit(() -> {
boolean assigned = false;
try { try {
// short probe to classify role (increase to 1000ms to reduce misclassification) s.setTcpNoDelay(true);
s.setSoTimeout(1000); s.setKeepAlive(true);
s.setSoTimeout(1500);
InputStream in = s.getInputStream(); InputStream in = s.getInputStream();
byte[] probe = new byte[256]; byte[] probe = new byte[256];
int n = 0; int n;
try { n = in.read(probe); } catch (IOException ignore) {} try {
// restore to blocking mode for steady-state n = in.read(probe);
} catch (SocketTimeoutException timeout) {
n = 0; // treat as no immediate data
} catch (IOException readError) {
LOGGER.debug("Endpoint {} initial read failed: {}", port, readError.getMessage());
closeQuietly(s);
return;
} finally {
s.setSoTimeout(0); s.setSoTimeout(0);
}
if (n < 0) {
LOGGER.debug("Endpoint {} connection closed during probe", port);
return;
}
if (n > 0 && isLikelyText(probe, n)) { if (n > 0 && isLikelyText(probe, n)) {
// OUT connection (NMEA etc.) if (!isSocketAlive(outConn)) {
closeQuietly(outConn);
s.setTcpNoDelay(true);
s.setKeepAlive(true);
outConn = s; outConn = s;
assigned = true;
LOGGER.debug("Endpoint {} OUT connected", port); LOGGER.debug("Endpoint {} OUT connected", port);
pumpOut(outConn, probe, n); pumpOut(s, probe, n);
} else { } else {
// IN connection (RTCM sink) LOGGER.info("Endpoint {} additional OUT connection detected; keeping existing", port);
closeQuietly(inConn); }
s.setTcpNoDelay(true); } else {
s.setKeepAlive(true); if (!isSocketAlive(inConn)) {
inConn = s; inConn = s;
assigned = true;
LOGGER.debug("Endpoint {} IN connected", port); LOGGER.debug("Endpoint {} IN connected", port);
} else {
LOGGER.info("Endpoint {} additional IN connection detected; keeping existing", port);
}
} }
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn("classifyConnection error: {}", e.getMessage()); LOGGER.warn("classifyConnection error: {}", e.getMessage());
} finally {
if (!assigned) {
closeQuietly(s); closeQuietly(s);
} }
}
}); });
} }
@ -292,8 +314,7 @@ public class RtkClusterService implements ApplicationRunner {
while (true) { while (true) {
try { try {
Socket sink = inConn; Socket sink = inConn;
if (sink == null || sink.isClosed()) { if (!isSocketAlive(sink)) {
// avoid consuming queue when no sink, preventing starvation
Thread.sleep(20); Thread.sleep(20);
continue; continue;
} }
@ -303,7 +324,12 @@ public class RtkClusterService implements ApplicationRunner {
os.write(data); os.write(data);
os.flush(); os.flush();
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); String msg = e.getMessage();
if (msg != null && msg.toLowerCase().contains("closed")) {
LOGGER.debug("Write RTCM failed on {}: {}", port, msg);
} else {
LOGGER.warn("Write RTCM failed on {}: {}", port, msg);
}
closeQuietly(sink); closeQuietly(sink);
inConn = null; inConn = null;
} }
@ -314,6 +340,10 @@ public class RtkClusterService implements ApplicationRunner {
} }
} }
private boolean isSocketAlive(Socket s) {
return s != null && s.isConnected() && !s.isClosed() && !s.isInputShutdown() && !s.isOutputShutdown();
}
private void closeQuietly(Socket s) { private void closeQuietly(Socket s) {
if (s == null) return; if (s == null) return;
try { s.close(); } catch (IOException ignore) {} try { s.close(); } catch (IOException ignore) {}