fix: 写入输入和输出的TCP端口

This commit is contained in:
yarnom 2025-10-30 16:23:24 +08:00
parent 1aec89c221
commit 99ddec40b1

View File

@ -37,6 +37,7 @@ public class GroupRtkScheduler {
final Deque<String> outRing = new ArrayDeque<>(1024); final Deque<String> outRing = new ArrayDeque<>(1024);
Process process; Process process;
Thread readerThread; Thread readerThread;
Endpoint endpoint;
synchronized void addOut(String line) { synchronized void addOut(String line) {
if (outRing.size() >= 500) outRing.pollFirst(); if (outRing.size() >= 500) outRing.pollFirst();
outRing.addLast(line); outRing.addLast(line);
@ -64,10 +65,20 @@ public class GroupRtkScheduler {
resp.put("pid", rt.pid); resp.put("state", rt.state); resp.put("pid", rt.pid); resp.put("state", rt.state);
return resp; return resp;
} }
// Build a synthetic profile using group configuration // Ensure local endpoint for capturing OUT stream
if (rt.endpoint == null) {
rt.endpoint = new Endpoint();
rt.endpoint.start();
}
// Build a synthetic profile using group configuration + local OUT endpoints
RtkrcvProfile profile = new RtkrcvProfile(); RtkrcvProfile profile = new RtkrcvProfile();
profile.setDeviceId("group-" + groupId); profile.setDeviceId("group-" + groupId);
profile.setGroupId(groupId); profile.setGroupId(groupId);
String epPath = "127.0.0.1:" + rt.endpoint.getPort();
profile.setOutstr1Path(epPath);
profile.setOutstr2Path(epPath);
// also set inpstr1 to same endpoint to avoid dead tcpcli
profile.setInpstr1Path(epPath);
Path conf = configService.generateConfig(profile); Path conf = configService.generateConfig(profile);
rt.confPath = conf.toString(); rt.confPath = conf.toString();
ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString()); ProcessBuilder pb = new ProcessBuilder(rtkBinary, "-nc", "-o", conf.toString());
@ -104,6 +115,7 @@ public class GroupRtkScheduler {
boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS); boolean exited = rt.process.waitFor(3, java.util.concurrent.TimeUnit.SECONDS);
if (!exited) rt.process.destroyForcibly(); if (!exited) rt.process.destroyForcibly();
rt.state = "stopped"; rt.state = "stopped";
if (rt.endpoint != null) { rt.endpoint.close(); rt.endpoint = null; }
resp.put("code", 0); resp.put("code", 0);
} catch (Exception e) { } catch (Exception e) {
resp.put("code", 1); resp.put("msg", e.getMessage()); resp.put("code", 1); resp.put("msg", e.getMessage());
@ -191,3 +203,73 @@ public class GroupRtkScheduler {
return null; return null;
} }
} }
class Endpoint {
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(Endpoint.class);
private java.net.ServerSocket server;
private Thread acceptThread;
private final java.util.Set<java.net.Socket> sockets = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>());
private volatile boolean running = false;
private int port;
private final java.util.concurrent.ExecutorService exec = java.util.concurrent.Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "rtk-group-ep"); t.setDaemon(true); return t; });
void start() throws IOException {
server = new java.net.ServerSocket(0, 50, java.net.InetAddress.getByName("127.0.0.1"));
port = server.getLocalPort();
running = true;
acceptThread = new Thread(this::acceptLoop, "rtk-ep-accept-"+port);
acceptThread.setDaemon(true);
acceptThread.start();
LOG.info("[endpoint] listening 127.0.0.1:{}", port);
}
int getPort() { return port; }
private void acceptLoop() {
while (running) {
try {
java.net.Socket s = server.accept();
sockets.add(s);
handle(s);
} catch (IOException e) {
if (running) LOG.warn("[endpoint] accept error: {}", e.getMessage());
}
}
}
private void handle(java.net.Socket s) {
exec.submit(() -> {
try {
s.setTcpNoDelay(true);
s.setKeepAlive(true);
InputStream in = s.getInputStream();
OutputStream os = s.getOutputStream();
byte[] buf = new byte[2048];
int read = in.read(buf);
if (read > 0) {
String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII);
if (head.startsWith("GET ") || head.contains("RTKLIB/")) {
String resp = head.contains("HTTP") ? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n" : "ICY 200 OK\r\n\r\n";
os.write(resp.getBytes(StandardCharsets.US_ASCII)); os.flush();
}
// Log first chunk
LOG.info("[endpoint:{}] {}", port, head.replaceAll("\n","\\n"));
}
// Continue pumping to dev-null while connected
while (read != -1) { read = in.read(buf); }
} catch (IOException ignore) {
} finally {
sockets.remove(s);
try { s.close(); } catch (IOException ignore) {}
}
});
}
void close() {
running = false;
try { if (server != null) server.close(); } catch (IOException ignore) {}
for (java.net.Socket s : sockets) { try { s.close(); } catch (IOException ignore) {} }
exec.shutdownNow();
}
}