fix: 前端查看 rtkrcv output stream

This commit is contained in:
yarnom 2025-10-30 16:57:22 +08:00
parent 20a323940c
commit ec15880413

View File

@ -103,7 +103,7 @@ public class RtkClusterService implements ApplicationRunner {
private void bootstrapDevice(RtkrcvProfile profile, int workPort) throws IOException { private void bootstrapDevice(RtkrcvProfile profile, int workPort) throws IOException {
String deviceId = profile.getDeviceId(); String deviceId = profile.getDeviceId();
// 1) Start endpoint server (if not exists) // 1) Start endpoint server (if not exists)
endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(deviceId, workPort, endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(this, deviceId, workPort,
() -> onOutEstablished(deviceId))); () -> onOutEstablished(deviceId)));
DeviceEndpoint ep = endpoints.get(deviceId); DeviceEndpoint ep = endpoints.get(deviceId);
ep.ensureStarted(); ep.ensureStarted();
@ -251,6 +251,7 @@ public class RtkClusterService implements ApplicationRunner {
} }
static class DeviceEndpoint { static class DeviceEndpoint {
private final RtkClusterService parent;
private final String deviceId; private final String deviceId;
private final int port; private final int port;
private final ExecutorService exec; private final ExecutorService exec;
@ -263,7 +264,8 @@ public class RtkClusterService implements ApplicationRunner {
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
private final Runnable onOutEstablished; private final Runnable onOutEstablished;
DeviceEndpoint(String deviceId, int port, Runnable onOutEstablished) { DeviceEndpoint(RtkClusterService parent, String deviceId, int port, Runnable onOutEstablished) {
this.parent = parent;
this.deviceId = deviceId; this.deviceId = deviceId;
this.port = port; this.port = port;
this.onOutEstablished = onOutEstablished; this.onOutEstablished = onOutEstablished;
@ -357,7 +359,7 @@ public class RtkClusterService implements ApplicationRunner {
} }
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
LOGGER.info("[OUT:{}] {}", port, preview); LOGGER.info("[OUT:{}] {}", port, preview);
notifyOut(deviceId, preview); if (parent != null) parent.notifyOut(deviceId, preview);
} }
} catch (IOException e) { } catch (IOException e) {
LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage());
@ -469,7 +471,7 @@ public class RtkClusterService implements ApplicationRunner {
if (ep == null) { if (ep == null) {
// derive a stable port for group endpoints to avoid collisions // derive a stable port for group endpoints to avoid collisions
int port = basePort + 10000 + (int) (Math.floorMod(groupId, 50000)); int port = basePort + 10000 + (int) (Math.floorMod(groupId, 50000));
endpoints.computeIfAbsent(key, k -> new DeviceEndpoint(key, port, null)); endpoints.computeIfAbsent(key, k -> new DeviceEndpoint(this, key, port, null));
ep = endpoints.get(key); ep = endpoints.get(key);
ep.ensureStarted(); ep.ensureStarted();
LOGGER.info("Group endpoint ready for {} on 127.0.0.1:{}", key, port); LOGGER.info("Group endpoint ready for {} on 127.0.0.1:{}", key, port);