From 20a323940c9a18f3233686ea4895518bbfe7d7a0 Mon Sep 17 00:00:00 2001 From: yarnom Date: Thu, 30 Oct 2025 16:54:39 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=89=8D=E7=AB=AF=E6=9F=A5=E7=9C=8B=20r?= =?UTF-8?q?tkrcv=20output=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rtkcluster/GroupRtkScheduler.java | 13 ++++++++++- .../rtkcluster/RtkClusterService.java | 22 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java index 83bad13d..fa2fd866 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java @@ -39,7 +39,7 @@ public class GroupRtkScheduler { final Deque outRing = new ArrayDeque<>(1024); Process process; Thread readerThread; - Endpoint endpoint; + java.util.function.Consumer outListener; synchronized void addOut(String line) { if (outRing.size() >= 500) outRing.pollFirst(); outRing.addLast(line); @@ -94,6 +94,11 @@ public class GroupRtkScheduler { } } } + // Subscribe OUT previews into ring for UI snapshot + if (rt.outListener == null) { + rt.outListener = (line) -> { if (line != null) rt.addOut(line); }; + rtkClusterService.addOutListener("group-"+groupId, rt.outListener); + } } catch (Exception ignore) {} ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString()); pb.directory(conf.getParent().toFile()); @@ -129,6 +134,12 @@ public class GroupRtkScheduler { boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS); if (!exited) rt.process.destroyForcibly(); rt.state = "stopped"; + try { + if (rt.outListener != null) { + rtkClusterService.removeOutListener("group-"+groupId, rt.outListener); + rt.outListener = null; + } + } catch (Exception ignore) {} // keep rtkClusterService endpoint for reuse; not closing here resp.put("code", 0); } catch (Exception e) { diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 04246521..263f75ae 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -61,6 +61,7 @@ public class RtkClusterService implements ApplicationRunner { private final Map endpoints = new ConcurrentHashMap<>(); private final Map processes = new ConcurrentHashMap<>(); private final Map currentSessionIds = new ConcurrentHashMap<>(); + private final Map>> outListeners = new ConcurrentHashMap<>(); private final ExecutorService worker = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "rtkcluster-worker"); @@ -356,6 +357,7 @@ public class RtkClusterService implements ApplicationRunner { } String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); LOGGER.info("[OUT:{}] {}", port, preview); + notifyOut(deviceId, preview); } } catch (IOException e) { LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); @@ -440,6 +442,26 @@ public class RtkClusterService implements ApplicationRunner { } } + private void notifyOut(String key, String line) { + try { + java.util.List> ls = outListeners.get(key); + if (ls != null) { + for (java.util.function.Consumer c : ls) { + try { c.accept(line); } catch (Exception ignore) {} + } + } + } catch (Exception ignore) {} + } + + public void addOutListener(String key, java.util.function.Consumer listener) { + outListeners.computeIfAbsent(key, k -> new java.util.concurrent.CopyOnWriteArrayList<>()).add(listener); + } + + public void removeOutListener(String key, java.util.function.Consumer listener) { + java.util.List> ls = outListeners.get(key); + if (ls != null) ls.remove(listener); + } + // Ensure a dedicated endpoint exists for a group key like "group-" and return its port public int ensureGroupEndpoint(long groupId) { String key = "group-" + groupId;