diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java index e4615615..7a8c34a1 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java @@ -58,9 +58,14 @@ public class RtkrcvConfigService { // If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli if (looksLikeTcpEndpoint(profile.getInpstr1Path())) { replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli"); + // relax timeouts to avoid frequent reconnects on local streams + replaced = replaceValueLine(replaced, "misc-timeout", "300000"); + replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); } if (looksLikeTcpEndpoint(profile.getOutstr1Path())) { replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli"); + replaced = replaceValueLine(replaced, "misc-timeout", "300000"); + replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); } Integer outHeight = profile.getOutHeight(); int heightValue = (outHeight == null) ? 1 : (outHeight == 0 ? 0 : 1); diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 2d333eae..4212808c 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -154,7 +154,7 @@ public class RtkClusterService implements ApplicationRunner { DeviceEndpoint ep = endpoints.get(deviceId); if (ep == null || hexRtcm == null || hexRtcm.isEmpty()) return; try { - java.util.List frames = RtcmGgaUtil.getRtcms(hexRtcm); + List frames = RtcmGgaUtil.getRtcms(hexRtcm); if (frames != null && !frames.isEmpty()) { for (String f : frames) { if (f != null && !f.isEmpty()) { @@ -179,7 +179,7 @@ public class RtkClusterService implements ApplicationRunner { private volatile ServerSocket server; private volatile Socket inConn; // server writes RTCM to this private volatile Socket outConn; // server reads NMEA from this - private final BlockingQueue rtcmQueue = new LinkedBlockingQueue<>(1024); + private final java.util.concurrent.LinkedBlockingDeque rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); DeviceEndpoint(int port) { this.port = port; @@ -199,7 +199,11 @@ public class RtkClusterService implements ApplicationRunner { void enqueueRtcm(byte[] data) { if (data == null || data.length == 0) return; - rtcmQueue.offer(data); + if (!rtcmQueue.offerLast(data)) { + // queue full: drop oldest to keep stream fresh + rtcmQueue.pollFirst(); + rtcmQueue.offerLast(data); + } } private void acceptLoop() { @@ -220,8 +224,8 @@ public class RtkClusterService implements ApplicationRunner { private void classifyConnection(Socket s) { exec.submit(() -> { try { - // short probe to classify role - s.setSoTimeout(300); + // short probe to classify role (increase to 1000ms to reduce misclassification) + s.setSoTimeout(1000); InputStream in = s.getInputStream(); byte[] probe = new byte[256]; int n = 0; @@ -287,9 +291,13 @@ public class RtkClusterService implements ApplicationRunner { private void dequeueLoop() { while (true) { try { - byte[] data = rtcmQueue.take(); Socket sink = inConn; - if (sink == null || sink.isClosed()) continue; + if (sink == null || sink.isClosed()) { + // avoid consuming queue when no sink, preventing starvation + Thread.sleep(20); + continue; + } + byte[] data = rtcmQueue.take(); try { OutputStream os = sink.getOutputStream(); os.write(data);