feat: 新增RTK配置组

This commit is contained in:
yarnom 2025-10-29 15:53:36 +08:00
parent 3522e16070
commit f25131c4b9
6 changed files with 160 additions and 11 deletions

View File

@ -0,0 +1,54 @@
package com.imdroid.secapi.dto;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@TableName("rtkrcv_group")
public class RtkrcvGroup {
@TableId(value = "group_id", type = IdType.AUTO)
private Long groupId;
@TableField("tenant_no")
private String tenantNo;
@TableField("group_name")
private String groupName;
@TableField("created_at")
private LocalDateTime createdAt;
@TableField("updated_at")
private LocalDateTime updatedAt;
// Unified group-level config fields (nullable)
@TableField("inpstr2_type")
private String inpstr2Type;
@TableField("inpstr2_path")
private String inpstr2Path;
@TableField("inpstr2_format")
private String inpstr2Format;
@TableField("inpstr3_type")
private String inpstr3Type;
@TableField("inpstr3_path")
private String inpstr3Path;
@TableField("inpstr3_format")
private String inpstr3Format;
@TableField("outstr1_format")
private String outstr1Format;
@TableField("outstr2_type")
private String outstr2Type;
@TableField("outstr2_format")
private String outstr2Format;
@TableField("outstr2_path")
private String outstr2Path;
}

View File

@ -0,0 +1,9 @@
package com.imdroid.secapi.dto;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface RtkrcvGroupMapper extends BaseMapper<RtkrcvGroup> {
}

View File

@ -14,6 +14,8 @@ public class RtkrcvProfile {
@TableId(value = "device_id", type = IdType.INPUT)
private String deviceId;
@TableField("group_id")
private Long groupId;
@TableField("inpstr1_path")
private String inpstr1Path;
@TableField("inpstr2_path")

View File

@ -6,7 +6,6 @@ import com.imdroid.secapi.dto.RtkrcvProfileMapper;
import com.imdroid.secapi.dto.RtkrcvSession;
import com.imdroid.secapi.dto.RtkrcvSessionMapper;
import com.imdroid.sideslope.bd.RtcmGgaUtil;
import com.imdroid.sideslope.ntrip.RtkrcvConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -51,6 +50,7 @@ public class RtkClusterService implements ApplicationRunner {
private final Map<String, DeviceEndpoint> endpoints = new ConcurrentHashMap<>();
private final Map<String, Process> processes = new ConcurrentHashMap<>();
private final Map<String, Long> currentSessionIds = new ConcurrentHashMap<>();
private final ExecutorService worker = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "rtkcluster-worker");
@ -80,7 +80,8 @@ public class RtkClusterService implements ApplicationRunner {
private void bootstrapDevice(RtkrcvProfile profile, int workPort) throws IOException {
String deviceId = profile.getDeviceId();
// 1) Start endpoint server (if not exists)
endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(workPort));
endpoints.computeIfAbsent(deviceId, k -> new DeviceEndpoint(deviceId, workPort,
() -> onOutEstablished(deviceId)));
DeviceEndpoint ep = endpoints.get(deviceId);
ep.ensureStarted();
@ -102,6 +103,9 @@ public class RtkClusterService implements ApplicationRunner {
session.setCreatedAt(LocalDateTime.now());
session.setUpdatedAt(LocalDateTime.now());
sessionMapper.insert(session);
if (session.getSessionId() != null) {
currentSessionIds.put(deviceId, session.getSessionId());
}
startRtkrcv(deviceId, conf.getParent().toString(), conf.toString());
}
@ -122,6 +126,18 @@ public class RtkClusterService implements ApplicationRunner {
LOGGER.info("Starting rtkrcv for {} with {}", deviceId, confPath);
Process p = builder.start();
processes.put(deviceId, p);
// record pid and start_time for session
try {
Long sid = currentSessionIds.get(deviceId);
if (sid != null) {
RtkrcvSession upd = new RtkrcvSession();
upd.setSessionId(sid);
upd.setPid((int) p.pid());
upd.setStartTime(LocalDateTime.now());
upd.setUpdatedAt(LocalDateTime.now());
sessionMapper.updateById(upd);
}
} catch (Exception ignore) {}
// async log stdout
logOutput(deviceId, p);
} catch (IOException e) {
@ -143,6 +159,15 @@ public class RtkClusterService implements ApplicationRunner {
try {
int code = p.waitFor();
LOGGER.info("rtkrcv for {} exited with {}", deviceId, code);
Long sid = currentSessionIds.get(deviceId);
if (sid != null) {
RtkrcvSession upd = new RtkrcvSession();
upd.setSessionId(sid);
upd.setEndTime(LocalDateTime.now());
upd.setState(code == 0 ? "succeeded" : "failed");
upd.setUpdatedAt(LocalDateTime.now());
sessionMapper.updateById(upd);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@ -172,7 +197,23 @@ public class RtkClusterService implements ApplicationRunner {
ep.enqueueRtcm(data);
}
private void onOutEstablished(String deviceId) {
try {
Long sid = currentSessionIds.get(deviceId);
if (sid != null) {
RtkrcvSession upd = new RtkrcvSession();
upd.setSessionId(sid);
upd.setState("running");
upd.setUpdatedAt(LocalDateTime.now());
sessionMapper.updateById(upd);
}
} catch (Exception e) {
LOGGER.warn("Update session running failed for {}: {}", deviceId, e.getMessage());
}
}
static class DeviceEndpoint {
private final String deviceId;
private final int port;
private final ExecutorService exec;
private volatile boolean started = false;
@ -182,9 +223,12 @@ public class RtkClusterService implements ApplicationRunner {
private volatile Socket outConn; // chosen source to read solution
private final java.util.Set<Socket> liveConns = java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap<>());
private final java.util.concurrent.LinkedBlockingDeque<byte[]> rtcmQueue = new java.util.concurrent.LinkedBlockingDeque<>(1024);
private final Runnable onOutEstablished;
DeviceEndpoint(int port) {
DeviceEndpoint(String deviceId, int port, Runnable onOutEstablished) {
this.deviceId = deviceId;
this.port = port;
this.onOutEstablished = onOutEstablished;
this.exec = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "rtkcluster-ep-" + this.port);
t.setDaemon(true);
@ -271,6 +315,7 @@ public class RtkClusterService implements ApplicationRunner {
}
} catch (Exception ignore) {}
LOGGER.info("Endpoint {} OUT established", port);
try { if (onOutEstablished != null) onOutEstablished.run(); } catch (Exception ignore) {}
}
String preview = new String(buf, 0, read, StandardCharsets.US_ASCII).replaceAll("\n", "\\n");
LOGGER.info("[OUT:{}] {}", port, preview);

View File

@ -1,9 +1,12 @@
package com.imdroid.sideslope.ntrip;
package com.imdroid.sideslope.rtkcluster;
import com.imdroid.secapi.dto.RtkrcvProfile;
import com.imdroid.secapi.dto.RtkrcvGroup;
import com.imdroid.secapi.dto.RtkrcvGroupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Service;
@ -16,6 +19,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -27,6 +32,9 @@ public class RtkrcvConfigService {
@Value("${rtkrcv.workdir:/opt/rtk}")
private String defaultWorkDir;
@Autowired(required = false)
private RtkrcvGroupMapper groupMapper;
/**
* Generate an RTKLIB rtkrcv config file from template using profile values.
* The file will be saved to `${rtkrcv.workdir:/opt/rtk}/rtkrcv_{deviceId}.conf`.
@ -49,8 +57,14 @@ public class RtkrcvConfigService {
List<String> lines = readTemplateLines();
List<String> rendered = new ArrayList<>(lines.size());
Map<String, String> fromGroup = loadGroupConfig(profile);
for (String line : lines) {
String replaced = line;
if (!fromGroup.isEmpty()) {
for (Map.Entry<String, String> e : fromGroup.entrySet()) {
replaced = replaceValueLine(replaced, e.getKey(), e.getValue());
}
}
replaced = replaceValueLine(replaced, "inpstr1-path", nz(profile.getInpstr1Path()));
replaced = replaceValueLine(replaced, "inpstr2-path", nz(profile.getInpstr2Path()));
replaced = replaceValueLine(replaced, "inpstr3-path", nz(profile.getInpstr3Path()));
@ -87,6 +101,33 @@ public class RtkrcvConfigService {
return out;
}
private Map<String, String> loadGroupConfig(RtkrcvProfile profile) {
Map<String, String> map = new HashMap<>();
try {
if (profile.getGroupId() == null || groupMapper == null) return map;
RtkrcvGroup group = groupMapper.selectById(profile.getGroupId());
if (group == null) return map;
// Only apply non-null fields
putIfNotNull(map, "inpstr2-type", group.getInpstr2Type());
putIfNotNull(map, "inpstr2-path", group.getInpstr2Path());
putIfNotNull(map, "inpstr2-format", group.getInpstr2Format());
putIfNotNull(map, "inpstr3-type", group.getInpstr3Type());
putIfNotNull(map, "inpstr3-path", group.getInpstr3Path());
putIfNotNull(map, "inpstr3-format", group.getInpstr3Format());
putIfNotNull(map, "outstr1-format", group.getOutstr1Format());
putIfNotNull(map, "outstr2-type", group.getOutstr2Type());
putIfNotNull(map, "outstr2-format", group.getOutstr2Format());
putIfNotNull(map, "outstr2-path", group.getOutstr2Path());
} catch (Exception ex) {
LOGGER.warn("Failed to load group conf for group {}: {}", profile.getGroupId(), ex.getMessage());
}
return map;
}
private void putIfNotNull(Map<String,String> map, String key, String value) {
if (value != null) map.put(key, value);
}
private List<String> readTemplateLines() throws IOException {
ClassPathResource resource = new ClassPathResource("rtkrcv_default.conf");
if (!resource.exists()) {

View File

@ -1,4 +1,4 @@
pos1-posmode =single # (0:single,1:dgps,2:kinematic,3:static,4:static-start,5:movingbase,6:fixed,7:ppp-kine,8:ppp-static,9:ppp-fixed)
pos1-posmode =static # (0:single,1:dgps,2:kinematic,3:static,4:static-start,5:movingbase,6:fixed,7:ppp-kine,8:ppp-static,9:ppp-fixed)
pos1-frequency =l1+l2+l5 # (1:l1,2:l1+l2,3:l1+l2+l5,4:l1+l2+l5+l6)
pos1-soltype =forward # (0:forward,1:backward,2:combined,3:combined-nophasereset)
pos1-elmask =10 # (deg)
@ -125,13 +125,11 @@ file-solstatfile =
file-tracefile =
#
inpstr1-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr2-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr3-type =ntripcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,6:ntripcli,7:ftp,8:http)
inpstr1-path =beidou:29832611@8.134.185.53:8001/6539837
#inpstr2-path =cedr25866:fyx25455@120.253.226.97:8001/RTCM33_GRCEJ
inpstr1-path =127.0.0.1:20001
inpstr2-path =ytcors14847:fyx25943@gnss.ytcors.cn:8003/RTCM33GRCEJpro
#inpstr2-path =beidou:29832611@192.168.100.206:2101/8507903
inpstr3-path =Ming:@Zhang12345@ntrip.data.gnss.ga.gov.au:2101/BCEP00BKG0
inpstr1-format =rtcm3 # (0:rtcm2,1:rtcm3,2:oem4,4:ubx,5:swift,6:hemis,7:skytraq,8:javad,9:nvs,10:binex,11:rt17,12:sbf,15:sp3)
inpstr2-format =rtcm3 # (0:rtcm2,1:rtcm3,2:oem4,4:ubx,5:swift,6:hemis,7:skytraq,8:javad,9:nvs,10:binex,11:rt17,12:sbf,15:sp3)
@ -142,9 +140,9 @@ inpstr2-nmealon = # (deg)
outstr1-type =tcpcli # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)
outstr2-type =off # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)
outstr1-path =127.0.0.1:20001
outstr2-path =
outstr2-path =/opt/rtk/LLH/temp_llh.log
outstr1-format =llh # (0:llh,1:xyz,2:enu,3:nmea:4:stat)
outstr2-format = # (0:llh,1:xyz,2:enu,3:nmea:4:stat)
outstr2-format =llh # (0:llh,1:xyz,2:enu,3:nmea:4:stat)
logstr1-type =0 # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)
logstr2-type =off # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)
logstr3-type =off # (0:off,1:serial,2:file,3:tcpsvr,4:tcpcli,5:ntripsvr,9:ntripcas)