feat: 修改发布逻辑

This commit is contained in:
yarnom 2025-10-29 13:12:28 +08:00
parent 0ef993f9f9
commit cc399deacc
3 changed files with 36 additions and 100 deletions

View File

@ -62,7 +62,10 @@ public class RtkrcvConfigService {
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
}
if (looksLikeTcpEndpoint(profile.getOutstr1Path())) {
// For outstr1, if path looks like file, force type to file; if it looks like tcp, force tcpcli
if (looksLikeFilePath(profile.getOutstr1Path())) {
replaced = replaceValueLine(replaced, "outstr1-type", "file");
} else if (looksLikeTcpEndpoint(profile.getOutstr1Path())) {
replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli");
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
@ -113,6 +116,12 @@ public class RtkrcvConfigService {
return p.contains(":") && !p.contains("@") && !p.contains("/");
}
private boolean looksLikeFilePath(String path) {
if (path == null) return false;
String p = path.trim();
return p.startsWith("/") || p.startsWith("./") || p.startsWith("../");
}
/**
* Replace the value of a line like: "key = value # comment" while preserving spacing and comment.
*/

View File

@ -26,10 +26,10 @@ import java.util.*;
import java.util.concurrent.*;
/**
* RtkCluster: per-device TCP server on a single port number for both IN (RTCM to rtkrcv) and OUT (NMEA from rtkrcv).
* RtkCluster: per-device TCP server that ONLY sends RTCM to rtkrcv (single purpose).
* - For each deviceId, we create a local server bound to 127.0.0.1:workPort.
* - rtkrcv connects twice to the same port: one connection will be classified as OUT (it sends data quickly),
* the other as IN (no initial data; we will write RTCM to it).
* - rtkrcv (tcpcli) connects to this port to receive RTCM; we do not read solution via TCP anymore.
* - rtkrcv writes its solution to file (configured via outstr1-*).
*/
@Service
public class RtkClusterService implements ApplicationRunner {
@ -84,10 +84,11 @@ public class RtkClusterService implements ApplicationRunner {
DeviceEndpoint ep = endpoints.get(deviceId);
ep.ensureStarted();
// 2) Update profile inp/out to local cluster port
// 2) Update profile inpstr1 to local cluster port, and outstr1 to per-device file
String localPath = "127.0.0.1:" + workPort;
profile.setInpstr1Path(localPath);
profile.setOutstr1Path(localPath);
String outFile = rtkWorkDir + "/" + deviceId + "_llh.log";
profile.setOutstr1Path(outFile);
profileMapper.updateById(profile);
// 3) Generate config and start rtkrcv
@ -177,12 +178,8 @@ public class RtkClusterService implements ApplicationRunner {
private final ExecutorService exec;
private volatile boolean started = false;
private volatile ServerSocket server;
// 命名按你的语义用于日志呈现
// RTKRCV-OUT = rtkrcv -> server我们读取解算/NMEA
// RTKRCV-IN = server -> rtkrcv我们向其发送RTCM
private volatile Socket inConn; // source we READ (solution/NMEA)
private volatile Socket outConn; // sink we WRITE RTCM
private final java.util.Set<Socket> liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>());
// Single sink connection where we WRITE RTCM to rtkrcv tcpcli
private volatile Socket sink;
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
DeviceEndpoint(int port) {
@ -224,99 +221,29 @@ public class RtkClusterService implements ApplicationRunner {
LOGGER.error("Device endpoint on {} stopped: {}", port, e.getMessage(), e);
}
}
private void onAccepted(Socket s) {
exec.submit(() -> {
try {
s.setTcpNoDelay(true);
s.setKeepAlive(true);
} catch (Exception ignore) {}
liveConns.add(s);
LOGGER.debug("Endpoint {} new connection (live={})", port, liveConns.size());
// Start a reader; the first socket that actually sends data and looks like solution/NTRIP becomes IN.
startReader(s);
// Try to set OUT candidate if missing (the sink for RTCM)
ensureOutCandidate();
// Always replace previous sink with the latest connection
Socket prev = sink;
sink = s;
if (prev != null && prev != s) {
closeQuietly(prev);
}
LOGGER.info("Endpoint {} sink connected from {}", port, s.getRemoteSocketAddress());
});
}
private void startReader(Socket s) {
exec.submit(() -> {
boolean anyRead = false;
try (InputStream in = s.getInputStream()) {
byte[] buf = new byte[2048];
int read;
while ((read = in.read(buf)) != -1) {
if (!anyRead) {
anyRead = true;
String head = new String(buf, 0, Math.min(read, 256), StandardCharsets.US_ASCII);
boolean looksHttpGet = head.startsWith("GET ") || head.contains("RTKLIB/");
boolean looksSolution = head.startsWith("% ") || head.startsWith("$GP") || head.startsWith("$GN") || head.startsWith("$GPGGA") || head.startsWith("$GNGGA");
if (looksHttpGet || looksSolution) {
// 这是 rtkrcv -> 我们 的连接RTKRCV-OUT我们读取
inConn = s;
if (looksHttpGet) {
// 若像 NTRIP GET请求一个最小应答保持连接
try {
OutputStream os = s.getOutputStream();
String resp = head.contains("HTTP")
? "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n"
: "ICY 200 OK\r\n\r\n";
os.write(resp.getBytes(StandardCharsets.US_ASCII));
os.flush();
LOGGER.info("[OUT:{}] handshake responded: {}", port, resp.trim());
} catch (Exception ignore) {}
}
// OUT 不能与 IN 相同
if (outConn == s) outConn = null;
LOGGER.info("Endpoint {} RTKRCV OUT established", port);
}
}
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
if (inConn == s) {
LOGGER.info("[OUT:{}] {}", port, preview);
} else if (outConn == s) {
LOGGER.info("[IN:{}] {}", port, preview);
} else {
LOGGER.info("[UNK:{}] {}", port, preview);
}
}
} catch (IOException e) {
LOGGER.debug("Connection reader closed on {}: {}", port, e.getMessage());
} finally {
liveConns.remove(s);
if (inConn == s) { inConn = null; LOGGER.info("Endpoint {} RTKRCV OUT disconnected", port); }
if (outConn == s) { outConn = null; LOGGER.info("Endpoint {} RTKRCV IN disconnected", port); }
closeQuietly(s);
ensureOutCandidate();
}
});
}
private void ensureOutCandidate() {
// 如果当前 OUT 不可用或与 IN 相同则从存活连接里挑选一个非 IN 的作为 OUTRTKRCV-INRTCM 写入端
Socket current = outConn;
if (current != null && !current.isClosed() && current != inConn) return;
for (Socket c : liveConns) {
if (c != null && !c.isClosed() && c != inConn) {
outConn = c;
LOGGER.debug("Endpoint {} RTKRCV IN set (live={})", port, liveConns.size());
return;
}
}
outConn = null;
}
private void dequeueLoop() {
byte[] pending = null;
while (true) {
try {
Socket sink = outConn;
if (sink == null || sink.isClosed() || sink == inConn) {
// 等待有效的 RTKRCV-IN 连接我们往其写 RTCM且不能与 RTKRCV-OUT 相同
LOGGER.info("Endpoint {} waiting for RTKRCV IN to be ready before sending RTCM", port);
ensureOutCandidate();
Socket s = sink;
if (s == null || s.isClosed()) {
LOGGER.info("Endpoint {} waiting for tcpcli to connect before sending RTCM", port);
Thread.sleep(50);
continue;
}
@ -325,14 +252,14 @@ public class RtkClusterService implements ApplicationRunner {
if (pending == null) continue;
}
try {
OutputStream os = sink.getOutputStream();
OutputStream os = s.getOutputStream();
os.write(pending);
os.flush();
pending = null; // sent successfully
} catch (IOException e) {
LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage());
closeQuietly(sink);
outConn = null;
closeQuietly(s);
if (sink == s) sink = null;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

View File

@ -1,4 +1,4 @@
pos1-posmode =single # (0:single,1:dgps,2:kinematic,3:static,4:static-start,5:movingbase,6:fixed,7:ppp-kine,8:ppp-static,9:ppp-fixed)
pos1-posmode =static # (0:single,1:dgps,2:kinematic,3:static,4:static-start,5:movingbase,6:fixed,7:ppp-kine,8:ppp-static,9:ppp-fixed)
pos1-frequency =l1+l2+l5 # (1:l1,2:l1+l2,3:l1+l2+l5,4:l1+l2+l5+l6)
pos1-soltype =forward # (0:forward,1:backward,2:combined,3:combined-nophasereset)
pos1-elmask =10 # (deg)
@ -125,10 +125,10 @@ file-solstatfile =
file-tracefile =
#
inpstr1-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr2-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr3-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr1-path =beidou:29832611@8.134.185.53:8001/6539837
inpstr1-path =127.0.0.1:24001
#inpstr2-path =cedr25866:fyx25455@120.253.226.97:8001/RTCM33_GRCEJ
inpstr2-path =ytcors14847:fyx25943@gnss.ytcors.cn:8003/RTCM33GRCEJpro
#inpstr2-path =beidou:29832611@192.168.100.206:2101/8507903
@ -139,9 +139,9 @@ inpstr3-format =rtcm3 # (0:rtcm2,1:rtcm3,2:oem4,4:ubx,5:swift,6:hemis,7
inpstr2-nmeareq =single # (0:off,1:latlon,2:single)
inpstr2-nmealat = # (deg)
inpstr2-nmealon = # (deg)
outstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)
outstr1-type =file # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)
outstr2-type =off # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)
outstr1-path =127.0.0.1:20001
outstr1-path =/opt/rtk/temp/temp_llh.log
outstr2-path =
outstr1-format =llh # (0:llh,1:xyz,2:enu,3:nmea:4:stat)
outstr2-format = # (0:llh,1:xyz,2:enu,3:nmea:4:stat)