diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 263f75ae..31545cf2 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -103,7 +103,7 @@ public class RtkClusterService implements ApplicationRunner { private void bootstrapDevice(RtkrcvProfile profile, int workPort) throws IOException { String deviceId = profile.getDeviceId(); // 1) Start endpoint server (if not exists) - endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(deviceId, workPort, + endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(this, deviceId, workPort, () -> onOutEstablished(deviceId))); DeviceEndpoint ep = endpoints.get(deviceId); ep.ensureStarted(); @@ -251,6 +251,7 @@ public class RtkClusterService implements ApplicationRunner { } static class DeviceEndpoint { + private final RtkClusterService parent; private final String deviceId; private final int port; private final ExecutorService exec; @@ -263,7 +264,8 @@ public class RtkClusterService implements ApplicationRunner { private final java.util.concurrent.LinkedBlockingDeque rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); private final Runnable onOutEstablished; - DeviceEndpoint(String deviceId, int port, Runnable onOutEstablished) { + DeviceEndpoint(RtkClusterService parent, String deviceId, int port, Runnable onOutEstablished) { + this.parent = parent; this.deviceId = deviceId; this.port = port; this.onOutEstablished = onOutEstablished; @@ -357,7 +359,7 @@ public class RtkClusterService implements ApplicationRunner { } String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); LOGGER.info("[OUT:{}] {}", port, preview); - notifyOut(deviceId, preview); + if (parent != null) parent.notifyOut(deviceId, preview); } } catch (IOException e) { LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); @@ -469,7 +471,7 @@ public class RtkClusterService implements ApplicationRunner { if (ep == null) { // derive a stable port for group endpoints to avoid collisions int port = basePort + 10000 + (int) (Math.floorMod(groupId, 50000)); - endpoints.computeIfAbsent(key, k -> new DeviceEndpoint(key, port, null)); + endpoints.computeIfAbsent(key, k -> new DeviceEndpoint(this, key, port, null)); ep = endpoints.get(key); ep.ensureStarted(); LOGGER.info("Group endpoint ready for {} on 127.0.0.1:{}", key, port);