Revert "fix: 尝试修复断联问题"

This reverts commit e07250637e5a7777e007256f692d6874c86ad656.
This commit is contained in:
yarnom 2025-10-29 10:34:58 +08:00
parent c252b11d3c
commit a28251d787

View File

@ -19,7 +19,6 @@ import java.io.*;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -224,56 +223,35 @@ public class RtkClusterService implements ApplicationRunner {
private void classifyConnection(Socket s) { private void classifyConnection(Socket s) {
exec.submit(() -> { exec.submit(() -> {
boolean assigned = false;
try { try {
s.setTcpNoDelay(true); // short probe to classify role (increase to 1000ms to reduce misclassification)
s.setKeepAlive(true); s.setSoTimeout(1000);
s.setSoTimeout(1500);
InputStream in = s.getInputStream(); InputStream in = s.getInputStream();
byte[] probe = new byte[256]; byte[] probe = new byte[256];
int n; int n = 0;
try { try { n = in.read(probe); } catch (IOException ignore) {}
n = in.read(probe); // restore to blocking mode for steady-state
} catch (SocketTimeoutException timeout) {
n = 0; // treat as no immediate data
} catch (IOException readError) {
LOGGER.debug("Endpoint {} initial read failed: {}", port, readError.getMessage());
closeQuietly(s);
return;
} finally {
s.setSoTimeout(0); s.setSoTimeout(0);
}
if (n < 0) {
LOGGER.debug("Endpoint {} connection closed during probe", port);
return;
}
if (n > 0 && isLikelyText(probe, n)) { if (n > 0 && isLikelyText(probe, n)) {
if (!isSocketAlive(outConn)) { // OUT connection (NMEA etc.)
closeQuietly(outConn);
s.setTcpNoDelay(true);
s.setKeepAlive(true);
outConn = s; outConn = s;
assigned = true;
LOGGER.debug("Endpoint {} OUT connected", port); LOGGER.debug("Endpoint {} OUT connected", port);
pumpOut(s, probe, n); pumpOut(outConn, probe, n);
} else { } else {
LOGGER.info("Endpoint {} additional OUT connection detected; keeping existing", port); // IN connection (RTCM sink)
} closeQuietly(inConn);
} else { s.setTcpNoDelay(true);
if (!isSocketAlive(inConn)) { s.setKeepAlive(true);
inConn = s; inConn = s;
assigned = true;
LOGGER.debug("Endpoint {} IN connected", port); LOGGER.debug("Endpoint {} IN connected", port);
} else {
LOGGER.info("Endpoint {} additional IN connection detected; keeping existing", port);
}
} }
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn("classifyConnection error: {}", e.getMessage()); LOGGER.warn("classifyConnection error: {}", e.getMessage());
} finally {
if (!assigned) {
closeQuietly(s); closeQuietly(s);
} }
}
}); });
} }
@ -314,7 +292,8 @@ public class RtkClusterService implements ApplicationRunner {
while (true) { while (true) {
try { try {
Socket sink = inConn; Socket sink = inConn;
if (!isSocketAlive(sink)) { if (sink == null || sink.isClosed()) {
// avoid consuming queue when no sink, preventing starvation
Thread.sleep(20); Thread.sleep(20);
continue; continue;
} }
@ -324,12 +303,7 @@ public class RtkClusterService implements ApplicationRunner {
os.write(data); os.write(data);
os.flush(); os.flush();
} catch (IOException e) { } catch (IOException e) {
String msg = e.getMessage(); LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage());
if (msg != null && msg.toLowerCase().contains("closed")) {
LOGGER.debug("Write RTCM failed on {}: {}", port, msg);
} else {
LOGGER.warn("Write RTCM failed on {}: {}", port, msg);
}
closeQuietly(sink); closeQuietly(sink);
inConn = null; inConn = null;
} }
@ -340,10 +314,6 @@ public class RtkClusterService implements ApplicationRunner {
} }
} }
private boolean isSocketAlive(Socket s) {
return s != null && s.isConnected() && !s.isClosed() && !s.isInputShutdown() && !s.isOutputShutdown();
}
private void closeQuietly(Socket s) { private void closeQuietly(Socket s) {
if (s == null) return; if (s == null) return;
try { s.close(); } catch (IOException ignore) {} try { s.close(); } catch (IOException ignore) {}