Compare commits

..

4 Commits

Author SHA1 Message Date
f3aaea78a3 fix: 排查TCP SERVER 发布RTCM消息的断联问题 2025-10-29 10:37:09 +08:00
a28251d787 Revert "fix: 尝试修复断联问题"
This reverts commit e07250637e5a7777e007256f692d6874c86ad656.
2025-10-29 10:34:58 +08:00
c252b11d3c Revert "fix: 尝试修复断联问题"
This reverts commit 0b7789838bb6cc63d4d6775ef764b7073d451ca6.
2025-10-29 10:34:55 +08:00
2a9ab127c5 Revert "fix: 尝试修复断联问题"
This reverts commit 76831fb1856080b9725403686753a9f4c57c1fed.
2025-10-29 10:34:52 +08:00
2 changed files with 45 additions and 100 deletions

View File

@ -54,8 +54,7 @@ public class RtkrcvConfigService {
replaced = replaceValueLine(replaced, "inpstr1-path", nz(profile.getInpstr1Path())); replaced = replaceValueLine(replaced, "inpstr1-path", nz(profile.getInpstr1Path()));
replaced = replaceValueLine(replaced, "inpstr2-path", nz(profile.getInpstr2Path())); replaced = replaceValueLine(replaced, "inpstr2-path", nz(profile.getInpstr2Path()));
replaced = replaceValueLine(replaced, "inpstr3-path", nz(profile.getInpstr3Path())); replaced = replaceValueLine(replaced, "inpstr3-path", nz(profile.getInpstr3Path()));
String resolvedOutPath = resolveOutPath(profile); replaced = replaceValueLine(replaced, "outstr1-path", nz(profile.getOutstr1Path()));
replaced = replaceValueLine(replaced, "outstr1-path", resolvedOutPath);
// If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli // If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli
if (looksLikeTcpEndpoint(profile.getInpstr1Path())) { if (looksLikeTcpEndpoint(profile.getInpstr1Path())) {
replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli"); replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli");
@ -63,7 +62,7 @@ public class RtkrcvConfigService {
replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-timeout", "300000");
replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
} }
if (looksLikeTcpEndpoint(resolvedOutPath)) { if (looksLikeTcpEndpoint(profile.getOutstr1Path())) {
replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli"); replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli");
replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-timeout", "300000");
replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
@ -107,27 +106,6 @@ public class RtkrcvConfigService {
return s == null ? "" : s; return s == null ? "" : s;
} }
private String resolveOutPath(RtkrcvProfile profile) {
String candidate = nz(profile.getOutstr1Path());
if (!candidate.isEmpty()) {
return candidate;
}
String inPath = nz(profile.getInpstr1Path());
if (looksLikeTcpEndpoint(inPath)) {
String[] parts = inPath.split(":", 2);
if (parts.length == 2) {
String host = parts[0].trim();
try {
int inPort = Integer.parseInt(parts[1].trim());
return host + ":" + (inPort + 1);
} catch (NumberFormatException ignore) {
// fall through and return empty candidate
}
}
}
return candidate;
}
private boolean looksLikeTcpEndpoint(String path) { private boolean looksLikeTcpEndpoint(String path) {
if (path == null) return false; if (path == null) return false;
String p = path.trim(); String p = path.trim();

View File

@ -50,7 +50,6 @@ public class RtkClusterService implements ApplicationRunner {
private RtkrcvConfigService configService; private RtkrcvConfigService configService;
private final Map<String, DeviceEndpoint> endpoints = new ConcurrentHashMap<>(); private final Map<String, DeviceEndpoint> endpoints = new ConcurrentHashMap<>();
private final Map<String, DeviceEndpoint> outEndpoints = new ConcurrentHashMap<>();
private final Map<String, Process> processes = new ConcurrentHashMap<>(); private final Map<String, Process> processes = new ConcurrentHashMap<>();
private final ExecutorService worker = Executors.newCachedThreadPool(r -> { private final ExecutorService worker = Executors.newCachedThreadPool(r -> {
@ -67,11 +66,10 @@ public class RtkClusterService implements ApplicationRunner {
LOGGER.info("No rtkrcv_profile records found. RtkCluster idle."); LOGGER.info("No rtkrcv_profile records found. RtkCluster idle.");
return; return;
} }
int slot = 0; int slot = 1;
for (RtkrcvProfile profile : profiles) { for (RtkrcvProfile profile : profiles) {
try { try {
int port = basePort + slot; int port = basePort + slot++;
slot += 2; // reserve a pair (IN port, OUT port)
bootstrapDevice(profile, port); bootstrapDevice(profile, port);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("Bootstrap device {} failed: {}", profile.getDeviceId(), e.getMessage(), e); LOGGER.error("Bootstrap device {} failed: {}", profile.getDeviceId(), e.getMessage(), e);
@ -83,17 +81,13 @@ public class RtkClusterService implements ApplicationRunner {
String deviceId = profile.getDeviceId(); String deviceId = profile.getDeviceId();
// 1) Start endpoint server (if not exists) // 1) Start endpoint server (if not exists)
endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort)); endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort));
outEndpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort + 1, true));
DeviceEndpoint ep = endpoints.get(deviceId); DeviceEndpoint ep = endpoints.get(deviceId);
DeviceEndpoint outEp = outEndpoints.get(deviceId); ep.ensureStarted();
ep.ensureInMode();
outEp.ensureOutMode();
// 2) Update profile inp/out to local cluster port // 2) Update profile inp/out to local cluster port
String localInPath = "127.0.0.1:" + workPort; String localPath = "127.0.0.1:" + workPort;
String localOutPath = "127.0.0.1:" + (workPort + 1); profile.setInpstr1Path(localPath);
profile.setInpstr1Path(localInPath); profile.setOutstr1Path(localPath);
profile.setOutstr1Path(localOutPath);
profileMapper.updateById(profile); profileMapper.updateById(profile);
// 3) Generate config and start rtkrcv // 3) Generate config and start rtkrcv
@ -109,7 +103,7 @@ public class RtkClusterService implements ApplicationRunner {
session.setUpdatedAt(LocalDateTime.now()); session.setUpdatedAt(LocalDateTime.now());
sessionMapper.insert(session); sessionMapper.insert(session);
startRtkrcv(deviceId, conf.getParent().toString(), conf.toString()); //startRtkrcv(deviceId, conf.getParent().toString(), conf.toString());
} }
private void startRtkrcv(String deviceId, String workDir, String confPath) { private void startRtkrcv(String deviceId, String workDir, String confPath) {
@ -180,7 +174,6 @@ public class RtkClusterService implements ApplicationRunner {
static class DeviceEndpoint { static class DeviceEndpoint {
private final int port; private final int port;
private final boolean outMode;
private final ExecutorService exec; private final ExecutorService exec;
private volatile boolean started = false; private volatile boolean started = false;
private volatile ServerSocket server; private volatile ServerSocket server;
@ -189,12 +182,7 @@ public class RtkClusterService implements ApplicationRunner {
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
DeviceEndpoint(int port) { DeviceEndpoint(int port) {
this(port, false);
}
DeviceEndpoint(int port, boolean outMode) {
this.port = port; this.port = port;
this.outMode = outMode;
this.exec = Executors.newCachedThreadPool(r -> { this.exec = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "rtkcluster-ep-" + this.port); Thread t = new Thread(r, "rtkcluster-ep-" + this.port);
t.setDaemon(true); t.setDaemon(true);
@ -205,28 +193,11 @@ public class RtkClusterService implements ApplicationRunner {
synchronized void ensureStarted() { synchronized void ensureStarted() {
if (started) return; if (started) return;
exec.submit(this::acceptLoop); exec.submit(this::acceptLoop);
if (!outMode) {
exec.submit(this::dequeueLoop); exec.submit(this::dequeueLoop);
}
started = true; started = true;
} }
void ensureInMode() {
if (outMode) {
throw new IllegalStateException("Endpoint is configured as OUT mode: " + port);
}
ensureStarted();
}
void ensureOutMode() {
if (!outMode) {
throw new IllegalStateException("Endpoint is configured as IN mode: " + port);
}
ensureStarted();
}
void enqueueRtcm(byte[] data) { void enqueueRtcm(byte[] data) {
if (outMode) return;
if (data == null || data.length == 0) return; if (data == null || data.length == 0) return;
if (!rtcmQueue.offerLast(data)) { if (!rtcmQueue.offerLast(data)) {
// queue full: drop oldest to keep stream fresh // queue full: drop oldest to keep stream fresh
@ -240,7 +211,7 @@ public class RtkClusterService implements ApplicationRunner {
ss.setReuseAddress(true); ss.setReuseAddress(true);
ss.bind(new InetSocketAddress("127.0.0.1", port)); ss.bind(new InetSocketAddress("127.0.0.1", port));
this.server = ss; this.server = ss;
LOGGER.info("RtkCluster {} endpoint listening on 127.0.0.1:{}", outMode ? "OUT" : "IN", port); LOGGER.info("RtkCluster device endpoint listening on 127.0.0.1:{}", port);
while (true) { while (true) {
Socket s = ss.accept(); Socket s = ss.accept();
classifyConnection(s); classifyConnection(s);
@ -252,50 +223,54 @@ public class RtkClusterService implements ApplicationRunner {
private void classifyConnection(Socket s) { private void classifyConnection(Socket s) {
exec.submit(() -> { exec.submit(() -> {
boolean assigned = false;
try { try {
// short probe to classify role (increase to 1000ms to reduce misclassification)
s.setSoTimeout(1000);
InputStream in = s.getInputStream();
byte[] probe = new byte[256];
int n = 0;
try { n = in.read(probe); } catch (IOException ignore) {}
// restore to blocking mode for steady-state
s.setSoTimeout(0);
if (n > 0 && isLikelyText(probe, n)) {
// OUT connection (NMEA etc.)
closeQuietly(outConn);
s.setTcpNoDelay(true); s.setTcpNoDelay(true);
s.setKeepAlive(true); s.setKeepAlive(true);
if (outMode) {
Socket previous = outConn;
if (isSocketAlive(previous)) {
LOGGER.info("Endpoint {} replacing existing OUT connection", port);
closeQuietly(previous);
} else {
LOGGER.debug("Endpoint {} OUT connected", port);
}
outConn = s; outConn = s;
assigned = true; LOGGER.debug("Endpoint {} OUT connected", port);
pumpOut(s, null, 0); pumpOut(outConn, probe, n);
return;
}
Socket previous = inConn;
if (isSocketAlive(previous)) {
LOGGER.info("Endpoint {} replacing existing IN connection", port);
closeQuietly(previous);
} else { } else {
// IN connection (RTCM sink)
closeQuietly(inConn);
s.setTcpNoDelay(true);
s.setKeepAlive(true);
inConn = s;
LOGGER.debug("Endpoint {} IN connected", port); LOGGER.debug("Endpoint {} IN connected", port);
} }
inConn = s;
assigned = true;
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn("classifyConnection error: {}", e.getMessage()); LOGGER.warn("classifyConnection error: {}", e.getMessage());
} finally {
if (!assigned) {
closeQuietly(s); closeQuietly(s);
} }
}
}); });
} }
private boolean isLikelyText(byte[] buf, int n) {
int printable = 0;
for (int i = 0; i < n; i++) {
int b = buf[i] & 0xFF;
if (b >= 32 && b <= 126) printable++;
}
return printable >= Math.max(1, n - 4); // tolerate some non-printables
}
private void pumpOut(Socket s, byte[] firstBuf, int firstLen) { private void pumpOut(Socket s, byte[] firstBuf, int firstLen) {
exec.submit(() -> { exec.submit(() -> {
try (InputStream in = s.getInputStream()) { try (InputStream in = s.getInputStream()) {
byte[] buf = new byte[2048]; byte[] buf = new byte[2048];
int read; int read;
// deliver first classified bytes if any // deliver first classified bytes if any
if (firstBuf != null && firstLen > 0) { if (firstLen > 0) {
String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
LOGGER.info("[OUT:{}] {}", port, preview); LOGGER.info("[OUT:{}] {}", port, preview);
} }
@ -317,7 +292,8 @@ public class RtkClusterService implements ApplicationRunner {
while (true) { while (true) {
try { try {
Socket sink = inConn; Socket sink = inConn;
if (!isSocketAlive(sink)) { if (sink == null || sink.isClosed()) {
// avoid consuming queue when no sink, preventing starvation
Thread.sleep(20); Thread.sleep(20);
continue; continue;
} }
@ -327,12 +303,7 @@ public class RtkClusterService implements ApplicationRunner {
os.write(data); os.write(data);
os.flush(); os.flush();
} catch (IOException e) { } catch (IOException e) {
String msg = e.getMessage(); LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage());
if (msg != null && msg.toLowerCase().contains("closed")) {
LOGGER.debug("Write RTCM failed on {}: {}", port, msg);
} else {
LOGGER.warn("Write RTCM failed on {}: {}", port, msg);
}
closeQuietly(sink); closeQuietly(sink);
inConn = null; inConn = null;
} }
@ -343,10 +314,6 @@ public class RtkClusterService implements ApplicationRunner {
} }
} }
private boolean isSocketAlive(Socket s) {
return s != null && s.isConnected() && !s.isClosed() && !s.isInputShutdown() && !s.isOutputShutdown();
}
private void closeQuietly(Socket s) { private void closeQuietly(Socket s) {
if (s == null) return; if (s == null) return;
try { s.close(); } catch (IOException ignore) {} try { s.close(); } catch (IOException ignore) {}