fix: 尝试修复断联问题

This commit is contained in:
yarnom 2025-10-28 18:18:00 +08:00
parent d2c85d6d3b
commit ee77c07bc2
2 changed files with 20 additions and 7 deletions

View File

@ -58,9 +58,14 @@ public class RtkrcvConfigService {
// If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli
if (looksLikeTcpEndpoint(profile.getInpstr1Path())) {
replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli");
// relax timeouts to avoid frequent reconnects on local streams
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
}
if (looksLikeTcpEndpoint(profile.getOutstr1Path())) {
replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli");
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
}
Integer outHeight = profile.getOutHeight();
int heightValue = (outHeight == null) ? 1 : (outHeight == 0 ? 0 : 1);

View File

@ -154,7 +154,7 @@ public class RtkClusterService implements ApplicationRunner {
DeviceEndpoint ep = endpoints.get(deviceId);
if (ep == null || hexRtcm == null || hexRtcm.isEmpty()) return;
try {
java.util.List<String> frames = RtcmGgaUtil.getRtcms(hexRtcm);
List<String> frames = RtcmGgaUtil.getRtcms(hexRtcm);
if (frames != null && !frames.isEmpty()) {
for (String f : frames) {
if (f != null && !f.isEmpty()) {
@ -179,7 +179,7 @@ public class RtkClusterService implements ApplicationRunner {
private volatile ServerSocket server;
private volatile Socket inConn; // server writes RTCM to this
private volatile Socket outConn; // server reads NMEA from this
private final BlockingQueue<byte[]> rtcmQueue = new LinkedBlockingQueue<>(1024);
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
DeviceEndpoint(int port) {
this.port = port;
@ -199,7 +199,11 @@ public class RtkClusterService implements ApplicationRunner {
void enqueueRtcm(byte[] data) {
if (data == null || data.length == 0) return;
rtcmQueue.offer(data);
if (!rtcmQueue.offerLast(data)) {
// queue full: drop oldest to keep stream fresh
rtcmQueue.pollFirst();
rtcmQueue.offerLast(data);
}
}
private void acceptLoop() {
@ -220,8 +224,8 @@ public class RtkClusterService implements ApplicationRunner {
private void classifyConnection(Socket s) {
exec.submit(() -> {
try {
// short probe to classify role
s.setSoTimeout(300);
// short probe to classify role (increase to 1000ms to reduce misclassification)
s.setSoTimeout(1000);
InputStream in = s.getInputStream();
byte[] probe = new byte[256];
int n = 0;
@ -287,9 +291,13 @@ public class RtkClusterService implements ApplicationRunner {
private void dequeueLoop() {
while (true) {
try {
byte[] data = rtcmQueue.take();
Socket sink = inConn;
if (sink == null || sink.isClosed()) continue;
if (sink == null || sink.isClosed()) {
// avoid consuming queue when no sink, preventing starvation
Thread.sleep(20);
continue;
}
byte[] data = rtcmQueue.take();
try {
OutputStream os = sink.getOutputStream();
os.write(data);