fix: 前端查看 rtkrcv output stream

This commit is contained in:
yarnom 2025-10-30 16:54:39 +08:00
parent da18e9a495
commit 20a323940c
2 changed files with 34 additions and 1 deletions

View File

@ -39,7 +39,7 @@ public class GroupRtkScheduler {
final Deque<String> outRing = new ArrayDeque<>(1024); final Deque<String> outRing = new ArrayDeque<>(1024);
Process process; Process process;
Thread readerThread; Thread readerThread;
Endpoint endpoint; java.util.function.Consumer<String> outListener;
synchronized void addOut(String line) { synchronized void addOut(String line) {
if (outRing.size() >= 500) outRing.pollFirst(); if (outRing.size() >= 500) outRing.pollFirst();
outRing.addLast(line); outRing.addLast(line);
@ -94,6 +94,11 @@ public class GroupRtkScheduler {
} }
} }
} }
// Subscribe OUT previews into ring for UI snapshot
if (rt.outListener == null) {
rt.outListener = (line) -> { if (line != null) rt.addOut(line); };
rtkClusterService.addOutListener("group-"+groupId, rt.outListener);
}
} catch (Exception ignore) {} } catch (Exception ignore) {}
ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString()); ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString());
pb.directory(conf.getParent().toFile()); pb.directory(conf.getParent().toFile());
@ -129,6 +134,12 @@ public class GroupRtkScheduler {
boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS); boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS);
if (!exited) rt.process.destroyForcibly(); if (!exited) rt.process.destroyForcibly();
rt.state = "stopped"; rt.state = "stopped";
try {
if (rt.outListener != null) {
rtkClusterService.removeOutListener("group-"+groupId, rt.outListener);
rt.outListener = null;
}
} catch (Exception ignore) {}
// keep rtkClusterService endpoint for reuse; not closing here // keep rtkClusterService endpoint for reuse; not closing here
resp.put("code", 0); resp.put("code", 0);
} catch (Exception e) { } catch (Exception e) {

View File

@ -61,6 +61,7 @@ public class RtkClusterService implements ApplicationRunner {
private final Map<String, DeviceEndpoint> endpoints = new ConcurrentHashMap<>(); private final Map<String, DeviceEndpoint> endpoints = new ConcurrentHashMap<>();
private final Map<String, Process> processes = new ConcurrentHashMap<>(); private final Map<String, Process> processes = new ConcurrentHashMap<>();
private final Map<String, Long> currentSessionIds = new ConcurrentHashMap<>(); private final Map<String, Long> currentSessionIds = new ConcurrentHashMap<>();
private final Map<String, java.util.List<java.util.function.Consumer<String>>> outListeners = new ConcurrentHashMap<>();
private final ExecutorService worker = Executors.newCachedThreadPool(r -> { private final ExecutorService worker = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "rtkcluster-worker"); Thread t = new Thread(r, "rtkcluster-worker");
@ -356,6 +357,7 @@ public class RtkClusterService implements ApplicationRunner {
} }
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
LOGGER.info("[OUT:{}] {}", port, preview); LOGGER.info("[OUT:{}] {}", port, preview);
notifyOut(deviceId, preview);
} }
} catch (IOException e) { } catch (IOException e) {
LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage());
@ -440,6 +442,26 @@ public class RtkClusterService implements ApplicationRunner {
} }
} }
private void notifyOut(String key, String line) {
try {
java.util.List<java.util.function.Consumer<String>> ls = outListeners.get(key);
if (ls != null) {
for (java.util.function.Consumer<String> c : ls) {
try { c.accept(line); } catch (Exception ignore) {}
}
}
} catch (Exception ignore) {}
}
public void addOutListener(String key, java.util.function.Consumer<String> listener) {
outListeners.computeIfAbsent(key, k -> new java.util.concurrent.CopyOnWriteArrayList<>()).add(listener);
}
public void removeOutListener(String key, java.util.function.Consumer<String> listener) {
java.util.List<java.util.function.Consumer<String>> ls = outListeners.get(key);
if (ls != null) ls.remove(listener);
}
// Ensure a dedicated endpoint exists for a group key like "group-<id>" and return its port // Ensure a dedicated endpoint exists for a group key like "group-<id>" and return its port
public int ensureGroupEndpoint(long groupId) { public int ensureGroupEndpoint(long groupId) {
String key = "group-" + groupId; String key = "group-" + groupId;