Compare commits
4 Commits
76831fb185
...
f3aaea78a3
| Author | SHA1 | Date | |
|---|---|---|---|
| f3aaea78a3 | |||
| a28251d787 | |||
| c252b11d3c | |||
| 2a9ab127c5 |
@ -54,8 +54,7 @@ public class RtkrcvConfigService {
|
|||||||
replaced = replaceValueLine(replaced, "inpstr1-path", nz(profile.getInpstr1Path()));
|
replaced = replaceValueLine(replaced, "inpstr1-path", nz(profile.getInpstr1Path()));
|
||||||
replaced = replaceValueLine(replaced, "inpstr2-path", nz(profile.getInpstr2Path()));
|
replaced = replaceValueLine(replaced, "inpstr2-path", nz(profile.getInpstr2Path()));
|
||||||
replaced = replaceValueLine(replaced, "inpstr3-path", nz(profile.getInpstr3Path()));
|
replaced = replaceValueLine(replaced, "inpstr3-path", nz(profile.getInpstr3Path()));
|
||||||
String resolvedOutPath = resolveOutPath(profile);
|
replaced = replaceValueLine(replaced, "outstr1-path", nz(profile.getOutstr1Path()));
|
||||||
replaced = replaceValueLine(replaced, "outstr1-path", resolvedOutPath);
|
|
||||||
// If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli
|
// If local tcp endpoints are used (e.g., 127.0.0.1:port), force type to tcpcli
|
||||||
if (looksLikeTcpEndpoint(profile.getInpstr1Path())) {
|
if (looksLikeTcpEndpoint(profile.getInpstr1Path())) {
|
||||||
replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli");
|
replaced = replaceValueLine(replaced, "inpstr1-type", "tcpcli");
|
||||||
@ -63,7 +62,7 @@ public class RtkrcvConfigService {
|
|||||||
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
|
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
|
||||||
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
|
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
|
||||||
}
|
}
|
||||||
if (looksLikeTcpEndpoint(resolvedOutPath)) {
|
if (looksLikeTcpEndpoint(profile.getOutstr1Path())) {
|
||||||
replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli");
|
replaced = replaceValueLine(replaced, "outstr1-type", "tcpcli");
|
||||||
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
|
replaced = replaceValueLine(replaced, "misc-timeout", "300000");
|
||||||
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
|
replaced = replaceValueLine(replaced, "misc-reconnect", "3000");
|
||||||
@ -107,27 +106,6 @@ public class RtkrcvConfigService {
|
|||||||
return s == null ? "" : s;
|
return s == null ? "" : s;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String resolveOutPath(RtkrcvProfile profile) {
|
|
||||||
String candidate = nz(profile.getOutstr1Path());
|
|
||||||
if (!candidate.isEmpty()) {
|
|
||||||
return candidate;
|
|
||||||
}
|
|
||||||
String inPath = nz(profile.getInpstr1Path());
|
|
||||||
if (looksLikeTcpEndpoint(inPath)) {
|
|
||||||
String[] parts = inPath.split(":", 2);
|
|
||||||
if (parts.length == 2) {
|
|
||||||
String host = parts[0].trim();
|
|
||||||
try {
|
|
||||||
int inPort = Integer.parseInt(parts[1].trim());
|
|
||||||
return host + ":" + (inPort + 1);
|
|
||||||
} catch (NumberFormatException ignore) {
|
|
||||||
// fall through and return empty candidate
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return candidate;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean looksLikeTcpEndpoint(String path) {
|
private boolean looksLikeTcpEndpoint(String path) {
|
||||||
if (path == null) return false;
|
if (path == null) return false;
|
||||||
String p = path.trim();
|
String p = path.trim();
|
||||||
|
|||||||
@ -50,7 +50,6 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
private RtkrcvConfigService configService;
|
private RtkrcvConfigService configService;
|
||||||
|
|
||||||
private final Map<String, DeviceEndpoint> endpoints = new ConcurrentHashMap<>();
|
private final Map<String, DeviceEndpoint> endpoints = new ConcurrentHashMap<>();
|
||||||
private final Map<String, DeviceEndpoint> outEndpoints = new ConcurrentHashMap<>();
|
|
||||||
private final Map<String, Process> processes = new ConcurrentHashMap<>();
|
private final Map<String, Process> processes = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final ExecutorService worker = Executors.newCachedThreadPool(r -> {
|
private final ExecutorService worker = Executors.newCachedThreadPool(r -> {
|
||||||
@ -67,11 +66,10 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
LOGGER.info("No rtkrcv_profile records found. RtkCluster idle.");
|
LOGGER.info("No rtkrcv_profile records found. RtkCluster idle.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int slot = 0;
|
int slot = 1;
|
||||||
for (RtkrcvProfile profile : profiles) {
|
for (RtkrcvProfile profile : profiles) {
|
||||||
try {
|
try {
|
||||||
int port = basePort + slot;
|
int port = basePort + slot++;
|
||||||
slot += 2; // reserve a pair (IN port, OUT port)
|
|
||||||
bootstrapDevice(profile, port);
|
bootstrapDevice(profile, port);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.error("Bootstrap device {} failed: {}", profile.getDeviceId(), e.getMessage(), e);
|
LOGGER.error("Bootstrap device {} failed: {}", profile.getDeviceId(), e.getMessage(), e);
|
||||||
@ -83,17 +81,13 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
String deviceId = profile.getDeviceId();
|
String deviceId = profile.getDeviceId();
|
||||||
// 1) Start endpoint server (if not exists)
|
// 1) Start endpoint server (if not exists)
|
||||||
endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort));
|
endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort));
|
||||||
outEndpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort + 1, true));
|
|
||||||
DeviceEndpoint ep = endpoints.get(deviceId);
|
DeviceEndpoint ep = endpoints.get(deviceId);
|
||||||
DeviceEndpoint outEp = outEndpoints.get(deviceId);
|
ep.ensureStarted();
|
||||||
ep.ensureInMode();
|
|
||||||
outEp.ensureOutMode();
|
|
||||||
|
|
||||||
// 2) Update profile inp/out to local cluster port
|
// 2) Update profile inp/out to local cluster port
|
||||||
String localInPath = "127.0.0.1:" + workPort;
|
String localPath = "127.0.0.1:" + workPort;
|
||||||
String localOutPath = "127.0.0.1:" + (workPort + 1);
|
profile.setInpstr1Path(localPath);
|
||||||
profile.setInpstr1Path(localInPath);
|
profile.setOutstr1Path(localPath);
|
||||||
profile.setOutstr1Path(localOutPath);
|
|
||||||
profileMapper.updateById(profile);
|
profileMapper.updateById(profile);
|
||||||
|
|
||||||
// 3) Generate config and start rtkrcv
|
// 3) Generate config and start rtkrcv
|
||||||
@ -109,7 +103,7 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
session.setUpdatedAt(LocalDateTime.now());
|
session.setUpdatedAt(LocalDateTime.now());
|
||||||
sessionMapper.insert(session);
|
sessionMapper.insert(session);
|
||||||
|
|
||||||
startRtkrcv(deviceId, conf.getParent().toString(), conf.toString());
|
//startRtkrcv(deviceId, conf.getParent().toString(), conf.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startRtkrcv(String deviceId, String workDir, String confPath) {
|
private void startRtkrcv(String deviceId, String workDir, String confPath) {
|
||||||
@ -180,7 +174,6 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
|
|
||||||
static class DeviceEndpoint {
|
static class DeviceEndpoint {
|
||||||
private final int port;
|
private final int port;
|
||||||
private final boolean outMode;
|
|
||||||
private final ExecutorService exec;
|
private final ExecutorService exec;
|
||||||
private volatile boolean started = false;
|
private volatile boolean started = false;
|
||||||
private volatile ServerSocket server;
|
private volatile ServerSocket server;
|
||||||
@ -189,12 +182,7 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
|
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
|
||||||
|
|
||||||
DeviceEndpoint(int port) {
|
DeviceEndpoint(int port) {
|
||||||
this(port, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
DeviceEndpoint(int port, boolean outMode) {
|
|
||||||
this.port = port;
|
this.port = port;
|
||||||
this.outMode = outMode;
|
|
||||||
this.exec = Executors.newCachedThreadPool(r -> {
|
this.exec = Executors.newCachedThreadPool(r -> {
|
||||||
Thread t = new Thread(r, "rtkcluster-ep-" + this.port);
|
Thread t = new Thread(r, "rtkcluster-ep-" + this.port);
|
||||||
t.setDaemon(true);
|
t.setDaemon(true);
|
||||||
@ -205,28 +193,11 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
synchronized void ensureStarted() {
|
synchronized void ensureStarted() {
|
||||||
if (started) return;
|
if (started) return;
|
||||||
exec.submit(this::acceptLoop);
|
exec.submit(this::acceptLoop);
|
||||||
if (!outMode) {
|
exec.submit(this::dequeueLoop);
|
||||||
exec.submit(this::dequeueLoop);
|
|
||||||
}
|
|
||||||
started = true;
|
started = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ensureInMode() {
|
|
||||||
if (outMode) {
|
|
||||||
throw new IllegalStateException("Endpoint is configured as OUT mode: " + port);
|
|
||||||
}
|
|
||||||
ensureStarted();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ensureOutMode() {
|
|
||||||
if (!outMode) {
|
|
||||||
throw new IllegalStateException("Endpoint is configured as IN mode: " + port);
|
|
||||||
}
|
|
||||||
ensureStarted();
|
|
||||||
}
|
|
||||||
|
|
||||||
void enqueueRtcm(byte[] data) {
|
void enqueueRtcm(byte[] data) {
|
||||||
if (outMode) return;
|
|
||||||
if (data == null || data.length == 0) return;
|
if (data == null || data.length == 0) return;
|
||||||
if (!rtcmQueue.offerLast(data)) {
|
if (!rtcmQueue.offerLast(data)) {
|
||||||
// queue full: drop oldest to keep stream fresh
|
// queue full: drop oldest to keep stream fresh
|
||||||
@ -240,7 +211,7 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
ss.setReuseAddress(true);
|
ss.setReuseAddress(true);
|
||||||
ss.bind(new InetSocketAddress("127.0.0.1", port));
|
ss.bind(new InetSocketAddress("127.0.0.1", port));
|
||||||
this.server = ss;
|
this.server = ss;
|
||||||
LOGGER.info("RtkCluster {} endpoint listening on 127.0.0.1:{}", outMode ? "OUT" : "IN", port);
|
LOGGER.info("RtkCluster device endpoint listening on 127.0.0.1:{}", port);
|
||||||
while (true) {
|
while (true) {
|
||||||
Socket s = ss.accept();
|
Socket s = ss.accept();
|
||||||
classifyConnection(s);
|
classifyConnection(s);
|
||||||
@ -252,50 +223,54 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
|
|
||||||
private void classifyConnection(Socket s) {
|
private void classifyConnection(Socket s) {
|
||||||
exec.submit(() -> {
|
exec.submit(() -> {
|
||||||
boolean assigned = false;
|
|
||||||
try {
|
try {
|
||||||
s.setTcpNoDelay(true);
|
// short probe to classify role (increase to 1000ms to reduce misclassification)
|
||||||
s.setKeepAlive(true);
|
s.setSoTimeout(1000);
|
||||||
if (outMode) {
|
InputStream in = s.getInputStream();
|
||||||
Socket previous = outConn;
|
byte[] probe = new byte[256];
|
||||||
if (isSocketAlive(previous)) {
|
int n = 0;
|
||||||
LOGGER.info("Endpoint {} replacing existing OUT connection", port);
|
try { n = in.read(probe); } catch (IOException ignore) {}
|
||||||
closeQuietly(previous);
|
// restore to blocking mode for steady-state
|
||||||
} else {
|
s.setSoTimeout(0);
|
||||||
LOGGER.debug("Endpoint {} OUT connected", port);
|
if (n > 0 && isLikelyText(probe, n)) {
|
||||||
}
|
// OUT connection (NMEA etc.)
|
||||||
|
closeQuietly(outConn);
|
||||||
|
s.setTcpNoDelay(true);
|
||||||
|
s.setKeepAlive(true);
|
||||||
outConn = s;
|
outConn = s;
|
||||||
assigned = true;
|
LOGGER.debug("Endpoint {} OUT connected", port);
|
||||||
pumpOut(s, null, 0);
|
pumpOut(outConn, probe, n);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Socket previous = inConn;
|
|
||||||
if (isSocketAlive(previous)) {
|
|
||||||
LOGGER.info("Endpoint {} replacing existing IN connection", port);
|
|
||||||
closeQuietly(previous);
|
|
||||||
} else {
|
} else {
|
||||||
|
// IN connection (RTCM sink)
|
||||||
|
closeQuietly(inConn);
|
||||||
|
s.setTcpNoDelay(true);
|
||||||
|
s.setKeepAlive(true);
|
||||||
|
inConn = s;
|
||||||
LOGGER.debug("Endpoint {} IN connected", port);
|
LOGGER.debug("Endpoint {} IN connected", port);
|
||||||
}
|
}
|
||||||
inConn = s;
|
|
||||||
assigned = true;
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOGGER.warn("classifyConnection error: {}", e.getMessage());
|
LOGGER.warn("classifyConnection error: {}", e.getMessage());
|
||||||
} finally {
|
closeQuietly(s);
|
||||||
if (!assigned) {
|
|
||||||
closeQuietly(s);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isLikelyText(byte[] buf, int n) {
|
||||||
|
int printable = 0;
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
int b = buf[i] & 0xFF;
|
||||||
|
if (b >= 32 && b <= 126) printable++;
|
||||||
|
}
|
||||||
|
return printable >= Math.max(1, n - 4); // tolerate some non-printables
|
||||||
|
}
|
||||||
|
|
||||||
private void pumpOut(Socket s, byte[] firstBuf, int firstLen) {
|
private void pumpOut(Socket s, byte[] firstBuf, int firstLen) {
|
||||||
exec.submit(() -> {
|
exec.submit(() -> {
|
||||||
try (InputStream in = s.getInputStream()) {
|
try (InputStream in = s.getInputStream()) {
|
||||||
byte[] buf = new byte[2048];
|
byte[] buf = new byte[2048];
|
||||||
int read;
|
int read;
|
||||||
// deliver first classified bytes if any
|
// deliver first classified bytes if any
|
||||||
if (firstBuf != null && firstLen > 0) {
|
if (firstLen > 0) {
|
||||||
String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
|
String preview = new String(firstBuf, 0, firstLen, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
|
||||||
LOGGER.info("[OUT:{}] {}", port, preview);
|
LOGGER.info("[OUT:{}] {}", port, preview);
|
||||||
}
|
}
|
||||||
@ -317,7 +292,8 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
Socket sink = inConn;
|
Socket sink = inConn;
|
||||||
if (!isSocketAlive(sink)) {
|
if (sink == null || sink.isClosed()) {
|
||||||
|
// avoid consuming queue when no sink, preventing starvation
|
||||||
Thread.sleep(20);
|
Thread.sleep(20);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -327,12 +303,7 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
os.write(data);
|
os.write(data);
|
||||||
os.flush();
|
os.flush();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
String msg = e.getMessage();
|
LOGGER.warn("Write RTCM failed on {}: {}", port, e.getMessage());
|
||||||
if (msg != null && msg.toLowerCase().contains("closed")) {
|
|
||||||
LOGGER.debug("Write RTCM failed on {}: {}", port, msg);
|
|
||||||
} else {
|
|
||||||
LOGGER.warn("Write RTCM failed on {}: {}", port, msg);
|
|
||||||
}
|
|
||||||
closeQuietly(sink);
|
closeQuietly(sink);
|
||||||
inConn = null;
|
inConn = null;
|
||||||
}
|
}
|
||||||
@ -343,10 +314,6 @@ public class RtkClusterService implements ApplicationRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isSocketAlive(Socket s) {
|
|
||||||
return s != null && s.isConnected() && !s.isClosed() && !s.isInputShutdown() && !s.isOutputShutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void closeQuietly(Socket s) {
|
private void closeQuietly(Socket s) {
|
||||||
if (s == null) return;
|
if (s == null) return;
|
||||||
try { s.close(); } catch (IOException ignore) {}
|
try { s.close(); } catch (IOException ignore) {}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user