diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java index b7a67354..ed02cf98 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/GroupRtkScheduler.java @@ -37,6 +37,7 @@ public class GroupRtkScheduler { final Deque outRing = new ArrayDeque<>(1024); Process process; Thread readerThread; + Endpoint endpoint; synchronized void addOut(String line) { if (outRing.size() >= 500) outRing.pollFirst(); outRing.addLast(line); @@ -64,10 +65,20 @@ public class GroupRtkScheduler { resp.put("pid", rt.pid); resp.put("state", rt.state); return resp; } - // Build a synthetic profile using group configuration + // Ensure local endpoint for capturing OUT stream + if (rt.endpoint == null) { + rt.endpoint = new Endpoint(); + rt.endpoint.start(); + } + // Build a synthetic profile using group configuration + local OUT endpoints RtkrcvProfile profile = new RtkrcvProfile(); profile.setDeviceId("group-" + groupId); profile.setGroupId(groupId); + String epPath = "127.0.0.1:" + rt.endpoint.getPort(); + profile.setOutstr1Path(epPath); + profile.setOutstr2Path(epPath); + // also set inpstr1 to same endpoint to avoid dead tcpcli + profile.setInpstr1Path(epPath); Path conf = configService.generateConfig(profile); rt.confPath = conf.toString(); ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString()); @@ -104,6 +115,7 @@ public class GroupRtkScheduler { boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS); if (!exited) rt.process.destroyForcibly(); rt.state = "stopped"; + if (rt.endpoint != null) { rt.endpoint.close(); rt.endpoint = null; } resp.put("code", 0); } catch (Exception e) { resp.put("code", 1); resp.put("msg", e.getMessage()); @@ -191,3 +203,73 @@ public class GroupRtkScheduler { return null; } } + +class Endpoint { + private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(Endpoint.class); + private java.net.ServerSocket server; + private Thread acceptThread; + private final java.util.Set sockets = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>()); + private volatile boolean running = false; + private int port; + private final java.util.concurrent.ExecutorService exec = java.util.concurrent.Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r, "rtk-group-ep"); t.setDaemon(true); return t; }); + + void start() throws IOException { + server = new java.net.ServerSocket(0, 50, java.net.InetAddress.getByName("127.0.0.1")); + port = server.getLocalPort(); + running = true; + acceptThread = new Thread(this::acceptLoop, "rtk-ep-accept-"+port); + acceptThread.setDaemon(true); + acceptThread.start(); + LOG.info("[endpoint] listening 127.0.0.1:{}", port); + } + + int getPort() { return port; } + + private void acceptLoop() { + while (running) { + try { + java.net.Socket s = server.accept(); + sockets.add(s); + handle(s); + } catch (IOException e) { + if (running) LOG.warn("[endpoint] accept error: {}", e.getMessage()); + } + } + } + + private void handle(java.net.Socket s) { + exec.submit(() -> { + try { + s.setTcpNoDelay(true); + s.setKeepAlive(true); + InputStream in = s.getInputStream(); + OutputStream os = s.getOutputStream(); + byte[] buf = new byte[2048]; + int read = in.read(buf); + if (read > 0) { + String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII); + if (head.startsWith("GET ") || head.contains("RTKLIB/")) { + String resp = head.contains("HTTP") ? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n" : "ICY 200 OK\r\n\r\n"; + os.write(resp.getBytes(StandardCharsets.US_ASCII)); os.flush(); + } + // Log first chunk + LOG.info("[endpoint:{}] {}", port, head.replaceAll("\n","\\n")); + } + // Continue pumping to dev-null while connected + while (read != -1) { read = in.read(buf); } + } catch (IOException ignore) { + } finally { + sockets.remove(s); + try { s.close(); } catch (IOException ignore) {} + } + }); + } + + void close() { + running = false; + try { if (server != null) server.close(); } catch (IOException ignore) {} + for (java.net.Socket s : sockets) { try { s.close(); } catch (IOException ignore) {} } + exec.shutdownNow(); + } +}