diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java index 7a8c34a1..459e07cf 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/ntrip/RtkrcvConfigService.java @@ -62,7 +62,10 @@ public class RtkrcvConfigService { replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); } - if (looksLikeTcpEndpoint(profile.getOutstr1Path())) { + // For outstr1, if path looks like file, force type to file; if it looks like tcp, force tcpcli + if (looksLikeFilePath(profile.getOutstr1Path())) { + replaced = replaceValueLine(replaced, "outstr1-type", "file"); + } else if (looksLikeTcpEndpoint(profile.getOutstr1Path())) { replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli"); replaced = replaceValueLine(replaced, "misc-timeout", "300000"); replaced = replaceValueLine(replaced, "misc-reconnect", "3000"); @@ -113,6 +116,12 @@ public class RtkrcvConfigService { return p.contains(":") && !p.contains("@") && !p.contains("/"); } + private boolean looksLikeFilePath(String path) { + if (path == null) return false; + String p = path.trim(); + return p.startsWith("/") || p.startsWith("./") || p.startsWith("../"); + } + /** * Replace the value of a line like: "key = value # comment" while preserving spacing and comment. */ diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java index 57c22beb..5e2a8a1b 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/rtkcluster/RtkClusterService.java @@ -26,10 +26,10 @@ import java.util.*; import java.util.concurrent.*; /** - * RtkCluster: per-device TCP server on a single port number for both IN (RTCM to rtkrcv) and OUT (NMEA from rtkrcv). + * RtkCluster: per-device TCP server that ONLY sends RTCM to rtkrcv (single purpose). * - For each deviceId, we create a local server bound to 127.0.0.1:workPort. - * - rtkrcv connects twice to the same port: one connection will be classified as OUT (it sends data quickly), - * the other as IN (no initial data; we will write RTCM to it). + * - rtkrcv (tcpcli) connects to this port to receive RTCM; we do not read solution via TCP anymore. + * - rtkrcv writes its solution to file (configured via outstr1-*). */ @Service public class RtkClusterService implements ApplicationRunner { @@ -84,10 +84,11 @@ public class RtkClusterService implements ApplicationRunner { DeviceEndpoint ep = endpoints.get(deviceId); ep.ensureStarted(); - // 2) Update profile inp/out to local cluster port + // 2) Update profile inpstr1 to local cluster port, and outstr1 to per-device file String localPath = "127.0.0.1:" + workPort; profile.setInpstr1Path(localPath); - profile.setOutstr1Path(localPath); + String outFile = rtkWorkDir + "/" + deviceId + "_llh.log"; + profile.setOutstr1Path(outFile); profileMapper.updateById(profile); // 3) Generate config and start rtkrcv @@ -177,12 +178,8 @@ public class RtkClusterService implements ApplicationRunner { private final ExecutorService exec; private volatile boolean started = false; private volatile ServerSocket server; - // 命名按你的语义(用于日志呈现): - // RTKRCV-OUT = rtkrcv -> server(我们读取解算/NMEA) - // RTKRCV-IN = server -> rtkrcv(我们向其发送RTCM) - private volatile Socket inConn; // source we READ (solution/NMEA) - private volatile Socket outConn; // sink we WRITE RTCM - private final java.util.Set liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>()); + // Single sink connection where we WRITE RTCM to rtkrcv tcpcli + private volatile Socket sink; private final java.util.concurrent.LinkedBlockingDeque rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024); DeviceEndpoint(int port) { @@ -224,99 +221,29 @@ public class RtkClusterService implements ApplicationRunner { LOGGER.error("Device endpoint on {} stopped: {}", port, e.getMessage(), e); } } - private void onAccepted(Socket s) { exec.submit(() -> { try { s.setTcpNoDelay(true); s.setKeepAlive(true); } catch (Exception ignore) {} - liveConns.add(s); - LOGGER.debug("Endpoint {} new connection (live={})", port, liveConns.size()); - // Start a reader; the first socket that actually sends data and looks like solution/NTRIP becomes IN. - startReader(s); - // Try to set OUT candidate if missing (the sink for RTCM) - ensureOutCandidate(); - }); - } - - private void startReader(Socket s) { - exec.submit(() -> { - boolean anyRead = false; - try (InputStream in = s.getInputStream()) { - byte[] buf = new byte[2048]; - int read; - while ((read = in.read(buf)) != -1) { - if (!anyRead) { - anyRead = true; - String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII); - boolean looksHttpGet = head.startsWith("GET ") || head.contains("RTKLIB/"); - boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA"); - - if (looksHttpGet || looksSolution) { - // 这是 rtkrcv -> 我们 的连接(RTKRCV-OUT:我们读取) - inConn = s; - if (looksHttpGet) { - // 若像 NTRIP GET,请求一个最小应答保持连接 - try { - OutputStream os = s.getOutputStream(); - String resp = head.contains("HTTP") - ? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n" - : "ICY 200 OK\r\n\r\n"; - os.write(resp.getBytes(StandardCharsets.US_ASCII)); - os.flush(); - LOGGER.info("[OUT:{}] handshake responded: {}", port, resp.trim()); - } catch (Exception ignore) {} - } - // OUT 不能与 IN 相同 - if (outConn == s) outConn = null; - LOGGER.info("Endpoint {} RTKRCV OUT established", port); - } - } - String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n"); - if (inConn == s) { - LOGGER.info("[OUT:{}] {}", port, preview); - } else if (outConn == s) { - LOGGER.info("[IN:{}] {}", port, preview); - } else { - LOGGER.info("[UNK:{}] {}", port, preview); - } - } - } catch (IOException e) { - LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage()); - } finally { - liveConns.remove(s); - if (inConn == s) { inConn = null; LOGGER.info("Endpoint {} RTKRCV OUT disconnected", port); } - if (outConn == s) { outConn = null; LOGGER.info("Endpoint {} RTKRCV IN disconnected", port); } - closeQuietly(s); - ensureOutCandidate(); + // Always replace previous sink with the latest connection + Socket prev = sink; + sink = s; + if (prev != null && prev != s) { + closeQuietly(prev); } + LOGGER.info("Endpoint {} sink connected from {}", port, s.getRemoteSocketAddress()); }); } - private void ensureOutCandidate() { - // 如果当前 OUT 不可用或与 IN 相同,则从存活连接里挑选一个非 IN 的作为 OUT(RTKRCV-IN:RTCM 写入端) - Socket current = outConn; - if (current != null && !current.isClosed() && current != inConn) return; - for (Socket c : liveConns) { - if (c != null && !c.isClosed() && c != inConn) { - outConn = c; - LOGGER.debug("Endpoint {} RTKRCV IN set (live={})", port, liveConns.size()); - return; - } - } - outConn = null; - } - private void dequeueLoop() { byte[] pending = null; while (true) { try { - Socket sink = outConn; - if (sink == null || sink.isClosed() || sink == inConn) { - // 等待有效的 RTKRCV-IN 连接(我们往其写 RTCM),且不能与 RTKRCV-OUT 相同 - LOGGER.info("Endpoint {} waiting for RTKRCV IN to be ready before sending RTCM", port); - ensureOutCandidate(); + Socket s = sink; + if (s == null || s.isClosed()) { + LOGGER.info("Endpoint {} waiting for tcpcli to connect before sending RTCM", port); Thread.sleep(50); continue; } @@ -325,14 +252,14 @@ public class RtkClusterService implements ApplicationRunner { if (pending == null) continue; } try { - OutputStream os = sink.getOutputStream(); + OutputStream os = s.getOutputStream(); os.write(pending); os.flush(); pending = null; // sent successfully } catch (IOException e) { LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage()); - closeQuietly(sink); - outConn = null; + closeQuietly(s); + if (sink == s) sink = null; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/sec-beidou-rtcm/src/main/resources/rtkrcv_default.conf b/sec-beidou-rtcm/src/main/resources/rtkrcv_default.conf index 517496de..707caeaa 100644 --- a/sec-beidou-rtcm/src/main/resources/rtkrcv_default.conf +++ b/sec-beidou-rtcm/src/main/resources/rtkrcv_default.conf @@ -1,4 +1,4 @@ -pos1-posmode =single # (0:single,1:dgps,2:kinematic,3:static,4:static-start,5:movingbase,6:fixed,7:ppp-kine,8:ppp-static,9:ppp-fixed) +pos1-posmode =static # (0:single,1:dgps,2:kinematic,3:static,4:static-start,5:movingbase,6:fixed,7:ppp-kine,8:ppp-static,9:ppp-fixed) pos1-frequency =l1+l2+l5 # (1:l1,2:l1+l2,3:l1+l2+l5,4:l1+l2+l5+l6) pos1-soltype =forward # (0:forward,1:backward,2:combined,3:combined-nophasereset) pos1-elmask =10 # (deg) @@ -125,10 +125,10 @@ file-solstatfile = file-tracefile = # -inpstr1-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http) +inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http) inpstr2-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http) inpstr3-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http) -inpstr1-path =beidou:29832611@8.134.185.53:8001/6539837 +inpstr1-path =127.0.0.1:24001 #inpstr2-path =cedr25866:fyx25455@120.253.226.97:8001/RTCM33_GRCEJ inpstr2-path =ytcors14847:fyx25943@gnss.ytcors.cn:8003/RTCM33GRCEJpro #inpstr2-path =beidou:29832611@192.168.100.206:2101/8507903 @@ -139,12 +139,12 @@ inpstr3-format =rtcm3 # (0:rtcm2,1:rtcm3,2:oem4,4:ubx,5:swift,6:hemis,7 inpstr2-nmeareq =single # (0:off,1:latlon,2:single) inpstr2-nmealat = # (deg) inpstr2-nmealon = # (deg) -outstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas) +outstr1-type =file # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas) outstr2-type =off # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas) -outstr1-path =127.0.0.1:20001 +outstr1-path =/opt/rtk/temp/temp_llh.log outstr2-path = outstr1-format =llh # (0:llh,1:xyz,2:enu,3:nmea:4:stat) -outstr2-format = # (0:llh,1:xyz,2:enu,3:nmea:4:stat) +outstr2-format = # (0:llh,1:xyz,2:enu,3:nmea:4:stat) logstr1-type =0 # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas) logstr2-type =off # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas) logstr3-type =off # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)