commit a3f3c78b49fd94d0f26e6d6c0dec6e598bad4668 Author: fengyarnom Date: Mon May 19 16:59:25 2025 +0800 first commit diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..f6c9644 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..ab49984 --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..0d1c340 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..a282c5b --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/ntrip_load-main.iml b/.idea/ntrip_load-main.iml new file mode 100644 index 0000000..d6ebd48 --- /dev/null +++ b/.idea/ntrip_load-main.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml new file mode 100644 index 0000000..2b63946 --- /dev/null +++ b/.idea/uiDesigner.xml @@ -0,0 +1,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/ntrip_load-main/.gitignore b/ntrip_load-main/.gitignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/ntrip_load-main/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/ntrip_load-main/.idea/.gitignore b/ntrip_load-main/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/ntrip_load-main/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/ntrip_load-main/.idea/compiler.xml b/ntrip_load-main/.idea/compiler.xml new file mode 100644 index 0000000..f28789c --- /dev/null +++ b/ntrip_load-main/.idea/compiler.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/ntrip_load-main/.idea/encodings.xml b/ntrip_load-main/.idea/encodings.xml new file mode 100644 index 0000000..aa00ffa --- /dev/null +++ b/ntrip_load-main/.idea/encodings.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/ntrip_load-main/.idea/jarRepositories.xml b/ntrip_load-main/.idea/jarRepositories.xml new file mode 100644 index 0000000..712ab9d --- /dev/null +++ b/ntrip_load-main/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/ntrip_load-main/.idea/misc.xml b/ntrip_load-main/.idea/misc.xml new file mode 100644 index 0000000..65f9ed0 --- /dev/null +++ b/ntrip_load-main/.idea/misc.xml @@ -0,0 +1,12 @@ + + + + + + + + \ No newline at end of file diff --git a/ntrip_load-main/README.md b/ntrip_load-main/README.md new file mode 100644 index 0000000..5fac264 --- /dev/null +++ b/ntrip_load-main/README.md @@ -0,0 +1,41 @@ +# ntrip_load + +Dumb load test for caster. + +Need openjdk java >21 (>22 ideally) + +$ java ./src/main/java/com/test/NtripLoadTest.java + +will launch multiple base + multiple client on a targetted caster ( constant set to 127.0.0.1 in the file ) + -> Password set to letmein for mountpoint + -> password set to centipede:letmein for client + +Mountpoint are named "TESTxx" with xx a number between 0 and max_number_of_mountpoint-1. + + +A "demo server" is also present + +$ java ./src/main/java/com/test/NtripServer.java + + + +Data created by the mountpoint are 1500 char + some. It contains "iteration: " + a incremental number + "-" repeated. + +You can read a mountpoint with: + +$ str2str -in ntrip://centipede:letmein@localhost:2101/TEST123 | sed "s/-//g" + +so that you can just see the "iteration" number message (to check if some message are not lost, or delivered late) + + + if you want to manually create a mountpoint, you can use + +$ java ./src/main/java/com/test/NtripProducer.java + +which will listen on TCP 5015 and will send the message "iteration: " + a incremental number + "-" repeated. + +so that you can use: + +str2str -in tcpcli://localhost:5015 -out ntrips://:letmein@localhost:2101/MYTEST + + diff --git a/ntrip_load-main/pom.xml b/ntrip_load-main/pom.xml new file mode 100644 index 0000000..db64150 --- /dev/null +++ b/ntrip_load-main/pom.xml @@ -0,0 +1,89 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.1.0 + + + com.rtklib + rtklib-data-receiver + 1.0-SNAPSHOT + + + 17 + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-websocket + + + + + jakarta.websocket + jakarta.websocket-api + 2.1.0 + + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + com.mysql + mysql-connector-j + 8.0.33 + + + + + org.projectlombok + lombok + true + + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 21 + 21 + + + + + \ No newline at end of file diff --git a/ntrip_load-main/requirements.txt b/ntrip_load-main/requirements.txt new file mode 100644 index 0000000..0fc612f --- /dev/null +++ b/ntrip_load-main/requirements.txt @@ -0,0 +1,6 @@ +sqlalchemy==2.0.23 +pymysql==1.1.0 +python-dotenv==1.0.0 +fastapi==0.104.1 +uvicorn==0.24.0 +pydantic==2.4.2 \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/RtkDataReceiverApplication.java b/ntrip_load-main/src/main/java/com/rtklib/RtkDataReceiverApplication.java new file mode 100644 index 0000000..bb680f3 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/RtkDataReceiverApplication.java @@ -0,0 +1,13 @@ +package com.rtklib; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication +@EnableScheduling +public class RtkDataReceiverApplication { + public static void main(String[] args) { + SpringApplication.run(RtkDataReceiverApplication.class, args); + } +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/TcpRtkServer.java b/ntrip_load-main/src/main/java/com/rtklib/TcpRtkServer.java new file mode 100644 index 0000000..352aa21 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/TcpRtkServer.java @@ -0,0 +1,273 @@ +package com.rtklib; + +import com.rtklib.entity.RtkData; +import com.rtklib.repository.RtkDataRepository; +import com.rtklib.util.PwvCalculator; +import com.rtklib.util.TimeConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +@Component +public class TcpRtkServer implements CommandLineRunner { + private static final Logger logger = LoggerFactory.getLogger(TcpRtkServer.class); + private static final int PORT = 5015; // 可根据需要修改端口 + + // 气象数据库连接信息 + private static final String RAIN_DB_URL = "jdbc:mysql://8.134.185.53:3306/rain_db?useSSL=true"; + private static final String RAIN_DB_USER = "remote"; + private static final String RAIN_DB_PASSWORD = "root"; + + @Autowired + private RtkDataRepository rtkDataRepository; + + // 使用AtomicReference来存储最新的数据 + private final AtomicReference latestData = new AtomicReference<>(); + // 存储最后一次保存的时间 + private LocalDateTime lastSaveTime; + + @Override + public void run(String... args) throws Exception { + new Thread(this::startServer, "TcpRtkServer-Main").start(); + } + + private void startServer() { + try (ServerSocket serverSocket = new ServerSocket(PORT)) { + logger.info("TCP RTK Server started on port {}", PORT); + while (true) { + Socket clientSocket = serverSocket.accept(); + new Thread(() -> handleClient(clientSocket), "TcpRtkServer-Client").start(); + } + } catch (Exception e) { + logger.error("TCP Server error: {}", e.getMessage(), e); + } + } + + private void handleClient(Socket socket) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + if (line.startsWith("$POS")) { + parsePosData(line); + } else if (line.startsWith("$TROP")) { + parseTropData(line); + } else if (line.startsWith("$TRPG")) { + parseTrpgData(line); + } else if (line.startsWith("$SATE")) { + parseSateData(line); + } + } + } catch (Exception e) { + logger.error("Error handling client: {}", e.getMessage(), e); + } + } + + private void parsePosData(String message) { + try { + String[] parts = message.split(","); + + // 获取当前数据,如果有则保留原有数据,否则创建新对象 + RtkData rtkData = latestData.get(); + if (rtkData == null) { + rtkData = new RtkData(); + } + + int week = Integer.parseInt(parts[1]); + double timeOfWeek = Double.parseDouble(parts[2]); + rtkData.setTimestamp(TimeConverter.gpsToBeijingTime(week, timeOfWeek)); + rtkData.setWeek(week); + rtkData.setTimeOfWeek(timeOfWeek); + rtkData.setLatitude(Double.parseDouble(parts[4])); + rtkData.setLongitude(Double.parseDouble(parts[5])); + rtkData.setHeight(Double.parseDouble(parts[6])); + latestData.set(rtkData); + logger.debug("Parsed $POS data: {}", rtkData); + } catch (Exception e) { + logger.error("Error parsing $POS data: {}", e.getMessage(), e); + } + } + + private void parseTropData(String message) { + try { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + String[] parts = message.split(","); + rtkData.setZtd(Double.parseDouble(parts[5])); + latestData.set(rtkData); + logger.debug("Updated $TROP data: ZTD={}", parts[5]); + } + } catch (Exception e) { + logger.error("Error parsing $TROP data: {}", e.getMessage(), e); + } + } + + private void parseTrpgData(String message) { + try { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + String[] parts = message.split(","); + // 打印完整消息格式以便调试 + logger.info("$TRPG消息格式: {}", message); + + // 安全检查数组长度 + if (parts.length > 6) { + try { + double north = Double.parseDouble(parts[5]); + double east = Double.parseDouble(parts[6]); + + rtkData.setZtdGradientNorth(north); + rtkData.setZtdGradientEast(east); + latestData.set(rtkData); + logger.info("成功解析$TRPG数据: North={}, East={}", north, east); + } catch (NumberFormatException e) { + logger.error("$TRPG数值解析错误,位置5和6: {}, {}", parts[5], parts[6], e); + } + } else { + logger.warn("$TRPG消息格式不正确,数组长度不足: {}", parts.length); + // 尝试不同的索引位置,例如假设格式可能是$TRPG,北梯度值,东梯度值 + if (parts.length > 2) { + try { + double north = Double.parseDouble(parts[1]); + double east = Double.parseDouble(parts[2]); + + rtkData.setZtdGradientNorth(north); + rtkData.setZtdGradientEast(east); + latestData.set(rtkData); + logger.info("使用替代格式解析$TRPG数据: North={}, East={}", north, east); + } catch (Exception ex) { + logger.error("尝试替代格式解析$TRPG失败", ex); + } + } + } + } else { + logger.warn("收到$TRPG数据,但尚未收到$POS数据初始化RtkData对象"); + } + } catch (Exception e) { + logger.error("解析$TRPG数据发生异常: {}", e.getMessage(), e); + } + } + + private void parseSateData(String message) { + try { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + rtkData.setSatelliteInfo(message); + latestData.set(rtkData); + logger.debug("Updated $SATE data"); + } + } catch (Exception e) { + logger.error("Error parsing $SATE data: {}", e.getMessage(), e); + } + } + + /** + * 获取最新的传感器数据 + * @return 返回包含温度和气压的Map,如果查询失败则返回null + */ + private Map getLatestSensorData() { + try (Connection conn = DriverManager.getConnection(RAIN_DB_URL, RAIN_DB_USER, RAIN_DB_PASSWORD)) { + String query = "SELECT temperature/10 AS temperature, atm_pressure AS pressure " + + "FROM sensor_data ORDER BY timestamp DESC LIMIT 1"; + + try (PreparedStatement ps = conn.prepareStatement(query); + ResultSet rs = ps.executeQuery()) { + + if (rs.next()) { + double temperature = rs.getDouble("temperature"); + double pressure = rs.getDouble("pressure"); + + logger.info("获取到最新传感器数据: 温度={}℃, 气压={}hPa", temperature, pressure); + return Map.of("temperature", temperature, "pressure", pressure); + } else { + logger.warn("未找到传感器数据"); + return null; + } + } + } catch (Exception e) { + logger.error("获取传感器数据失败: {}", e.getMessage(), e); + return null; + } + } + + /** + * 计算PWV并准备保存数据 + */ + private void calculateAndPreparePWV() { + try { + RtkData currentData = latestData.get(); + if (currentData == null || currentData.getZtd() == null) { + logger.warn("没有可用的ZTD数据,无法计算PWV"); + return; + } + + // 获取最新传感器数据 + Map sensorData = getLatestSensorData(); + if (sensorData == null) { + logger.warn("无法获取传感器数据,使用默认值进行PWV计算"); + // 使用默认值:温度25℃,气压1000hPa + currentData.setSurfaceTemp(25.0); + currentData.setSurfacePressure(1000.0); + } else { + currentData.setSurfaceTemp(sensorData.get("temperature")); + currentData.setSurfacePressure(sensorData.get("pressure")); + } + + // 计算PWV + boolean success = PwvCalculator.calculatePWV(currentData); + if (success) { + latestData.set(currentData); + logger.info("PWV计算成功,结果: {}", currentData.getPwv()); + } else { + logger.warn("PWV计算失败"); + } + } catch (Exception e) { + logger.error("计算PWV过程中发生错误: {}", e.getMessage(), e); + } + } + + // 每10分钟执行一次数据保存 + @Scheduled(fixedRate = 6000) // 600000毫秒 = 10分钟 + public void saveData() { + try { + RtkData currentData = latestData.get(); + if (currentData == null) { + logger.warn("没有可用的RTK数据,跳过本次保存"); + return; + } + + LocalDateTime now = LocalDateTime.now(); + + // 计算PWV + calculateAndPreparePWV(); + + // 检查是否需要保存(距离上次保存超过10分钟) + if (lastSaveTime == null || + ChronoUnit.MINUTES.between(lastSaveTime, now) >= 1) { // 调试时设为1分钟,实际使用时改为10 + + rtkDataRepository.save(currentData); + lastSaveTime = now; + logger.info("成功保存RTK数据: 时间={}, ZTD={}, PWV={}", + currentData.getTimestamp(), currentData.getZtd(), currentData.getPwv()); + } + } catch (Exception e) { + logger.error("保存数据过程中发生错误: {}", e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/config/WebSocketConfig.java b/ntrip_load-main/src/main/java/com/rtklib/config/WebSocketConfig.java new file mode 100644 index 0000000..3cc30b4 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/config/WebSocketConfig.java @@ -0,0 +1,22 @@ +package com.rtklib.config; + +import com.rtklib.websocket.RtkWebSocketHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(rtkWebSocketHandler(), "/rtk").setAllowedOrigins("*"); + } + + @Bean + public RtkWebSocketHandler rtkWebSocketHandler() { + return new RtkWebSocketHandler(); + } +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/entity/RtkData.java b/ntrip_load-main/src/main/java/com/rtklib/entity/RtkData.java new file mode 100644 index 0000000..48601ab --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/entity/RtkData.java @@ -0,0 +1,53 @@ +package com.rtklib.entity; + +import lombok.Data; +import jakarta.persistence.*; +import java.time.LocalDateTime; + +@Data +@Entity +@Table(name = "rtk_data") +public class RtkData { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "timestamp") + private LocalDateTime timestamp; + + @Column(name = "week") + private Integer week; + + @Column(name = "time_of_week") + private Double timeOfWeek; + + @Column(name = "latitude") + private Double latitude; + + @Column(name = "longitude") + private Double longitude; + + @Column(name = "height") + private Double height; + + @Column(name = "ztd") + private Double ztd; + + @Column(name = "ztd_gradient_north") + private Double ztdGradientNorth; + + @Column(name = "ztd_gradient_east") + private Double ztdGradientEast; + + @Column(name = "satellite_info", length = 1000) + private String satelliteInfo; + + @Column(name = "pwv") + private Double pwv; + + @Transient // 不存入数据库的临时字段 + private Double surfacePressure; + + @Transient // 不存入数据库的临时字段 + private Double surfaceTemp; +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/repository/RtkDataRepository.java b/ntrip_load-main/src/main/java/com/rtklib/repository/RtkDataRepository.java new file mode 100644 index 0000000..b06502d --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/repository/RtkDataRepository.java @@ -0,0 +1,9 @@ +package com.rtklib.repository; + +import com.rtklib.entity.RtkData; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface RtkDataRepository extends JpaRepository { +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/util/PwvCalculator.java b/ntrip_load-main/src/main/java/com/rtklib/util/PwvCalculator.java new file mode 100644 index 0000000..f06bd07 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/util/PwvCalculator.java @@ -0,0 +1,72 @@ +package com.rtklib.util; + +import com.rtklib.entity.RtkData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PWV计算工具类 + */ +public class PwvCalculator { + private static final Logger logger = LoggerFactory.getLogger(PwvCalculator.class); + + // PWV计算需要的常量 + private static final double ROUW = 1e3; // 水密度,kg/m³ + private static final double RV = 461.5; // 水汽气体常数,J/(kg·K) + private static final double K2P = 22.1 / 100; // 转换为 K/Pa + private static final double K3 = 3.739e5 / 100; // 转换为 K²/Pa + + /** + * 计算PWV值 + * @param data RtkData对象,包含ZTD和气象数据 + * @return 计算成功返回true,否则返回false + */ + public static boolean calculatePWV(RtkData data) { + try { + if (data.getZtd() == null || data.getSurfacePressure() == null || + data.getSurfaceTemp() == null || data.getLatitude() == null || + data.getHeight() == null) { + logger.warn("计算PWV所需数据不完整: ZTD={}, 气压={}, 温度={}, 纬度={}, 高度={}", + data.getZtd(), data.getSurfacePressure(), data.getSurfaceTemp(), + data.getLatitude(), data.getHeight()); + return false; + } + + // 1. 计算ZHD(静力学延迟),标准Saastamoinen公式 + double lat = Math.toRadians(data.getLatitude()); // 转换为弧度 + double h = data.getHeight() / 1000.0; // 高度转换为千米 + double p = data.getSurfacePressure(); // 气压,单位hPa + + // 计算ZHD (单位:mm) + // 标准公式:ZHD = (2.2767 * p) / (1 - 0.00266 * cos(2φ) - 0.00028 * h) + double f = 1.0 - 0.00266 * Math.cos(2 * lat) - 0.00028 * h; + double zhd = (2.2767 * p) / f; + + // 2. 计算ZWD(湿延迟,单位:mm) + double zwd = data.getZtd() - zhd; + + // 记录中间计算结果 + logger.info("ZHD计算中间值: 纬度={}°({}rad), 高度={}m({}km), 气压={}hPa, f={}, ZHD={}mm", + data.getLatitude(), lat, data.getHeight(), h, p, f, zhd); + + // 3. 计算Tm(加权平均温度) + double ts_K = data.getSurfaceTemp() + 273.15; // 转换为开尔文温度 + double tm = 70.2 + 0.72 * ts_K; + + // 4. 计算转换因子tao + double b = 1e6; + double tao = b / (ROUW * RV * (K2P + K3 / tm)); + + // 5. 计算PWV(以毫米为单位) + double pwv = zwd * tao; + data.setPwv(pwv); + + logger.info("PWV计算成功: ZTD={}mm, ZHD={}mm, ZWD={}mm, Tm={}K, PWV={}mm", + data.getZtd(), zhd, zwd, tm, pwv); + return true; + } catch (Exception e) { + logger.error("计算PWV时发生错误: {}", e.getMessage(), e); + return false; + } + } +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/util/TimeConverter.java b/ntrip_load-main/src/main/java/com/rtklib/util/TimeConverter.java new file mode 100644 index 0000000..0ad0cce --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/util/TimeConverter.java @@ -0,0 +1,28 @@ +package com.rtklib.util; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; + +public class TimeConverter { + // GPS起始时间:1980-01-06 00:00:00 UTC + private static final LocalDateTime GPS_EPOCH = LocalDateTime.of(1980, 1, 6, 0, 0, 0); + // 北京时区 + private static final ZoneId BEIJING_ZONE = ZoneId.of("Asia/Shanghai"); + // 当前闰秒数(需要定期更新) + private static final int CURRENT_LEAP_SECONDS = 18; + + public static LocalDateTime gpsToBeijingTime(int week, double timeOfWeek) { + // 计算GPS时间(UTC) + LocalDateTime gpsTime = GPS_EPOCH + .plus(week, ChronoUnit.WEEKS) + .plus((long) timeOfWeek, ChronoUnit.SECONDS); + + // 转换为北京时间(考虑闰秒) + return gpsTime + .plus(CURRENT_LEAP_SECONDS, ChronoUnit.SECONDS) + .atZone(ZoneId.of("UTC")) + .withZoneSameInstant(BEIJING_ZONE) + .toLocalDateTime(); + } +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketHandler.java b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketHandler.java new file mode 100644 index 0000000..7028b48 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketHandler.java @@ -0,0 +1,110 @@ +package com.rtklib.websocket; + +import com.rtklib.entity.RtkData; +import com.rtklib.repository.RtkDataRepository; +import com.rtklib.util.TimeConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.socket.*; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicReference; + + +public class RtkWebSocketHandler extends TextWebSocketHandler { + private static final Logger logger = LoggerFactory.getLogger(RtkWebSocketHandler.class); + private static RtkDataRepository rtkDataRepository; + private static final AtomicReference latestData = new AtomicReference<>(); + private static LocalDateTime lastSaveTime; + + @Autowired + public void setRtkDataRepository(RtkDataRepository repository) { + rtkDataRepository = repository; + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) { + logger.info("New connection established: {}", session.getId()); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + logger.info("Connection closed: {}", session.getId()); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) { + try { + String msg = message.getPayload(); + if (msg.startsWith("$POS")) { + parsePosData(msg); + } else if (msg.startsWith("$TROP")) { + parseTropData(msg); + } else if (msg.startsWith("$TRPG")) { + parseTrpgData(msg); + } else if (msg.startsWith("$SATE")) { + parseSateData(msg); + } + } catch (Exception e) { + logger.error("Error processing RTK data: {}", e.getMessage()); + } + } + + private void parsePosData(String message) { + String[] parts = message.split(","); + RtkData rtkData = new RtkData(); + int week = Integer.parseInt(parts[1]); + double timeOfWeek = Double.parseDouble(parts[2]); + rtkData.setTimestamp(TimeConverter.gpsToBeijingTime(week, timeOfWeek)); + rtkData.setWeek(week); + rtkData.setTimeOfWeek(timeOfWeek); + rtkData.setLatitude(Double.parseDouble(parts[4])); + rtkData.setLongitude(Double.parseDouble(parts[5])); + rtkData.setHeight(Double.parseDouble(parts[6])); + latestData.set(rtkData); + } + + private void parseTropData(String message) { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + String[] parts = message.split(","); + rtkData.setZtd(Double.parseDouble(parts[5])); + latestData.set(rtkData); + } + } + + private void parseTrpgData(String message) { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + String[] parts = message.split(","); + rtkData.setZtdGradientNorth(Double.parseDouble(parts[5])); + rtkData.setZtdGradientEast(Double.parseDouble(parts[6])); + latestData.set(rtkData); + } + } + + private void parseSateData(String message) { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + rtkData.setSatelliteInfo(message); + latestData.set(rtkData); + } + } + + @Scheduled(fixedRate = 600000) + public void saveData() { + RtkData currentData = latestData.get(); + if (currentData != null) { + LocalDateTime now = LocalDateTime.now(); + if (lastSaveTime == null || ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) { + rtkDataRepository.save(currentData); + lastSaveTime = now; + logger.info("Saved RTK data at: {}", now); + } + } + } +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketServer.java b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketServer.java new file mode 100644 index 0000000..2f41bcd --- /dev/null +++ b/ntrip_load-main/src/main/java/com/rtklib/websocket/RtkWebSocketServer.java @@ -0,0 +1,136 @@ +package com.rtklib.websocket; + +import jakarta.websocket.Session; +import com.rtklib.entity.RtkData; +import com.rtklib.repository.RtkDataRepository; +import com.rtklib.util.TimeConverter; +import jakarta.websocket.OnClose; +import jakarta.websocket.OnError; +import jakarta.websocket.OnMessage; +import jakarta.websocket.OnOpen; +import jakarta.websocket.server.ServerEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +@Component +@ServerEndpoint("/rtk") +public class RtkWebSocketServer { + private static final Logger logger = LoggerFactory.getLogger(RtkWebSocketServer.class); + private static RtkDataRepository rtkDataRepository; + private static final Map SESSIONS = new ConcurrentHashMap<>(); + + // 使用AtomicReference来存储最新的数据 + private static final AtomicReference latestData = new AtomicReference<>(); + // 存储最后一次保存的时间 + private static LocalDateTime lastSaveTime; + + @Autowired + public void setRtkDataRepository(RtkDataRepository repository) { + rtkDataRepository = repository; + } + + @OnOpen + public void onOpen(Session session) { + SESSIONS.put(session.getId(), session); + logger.info("New connection established: {}", session.getId()); + } + + @OnClose + public void onClose(Session session) { + SESSIONS.remove(session.getId()); + logger.info("Connection closed: {}", session.getId()); + } + + @OnMessage + public void onMessage(String message, Session session) { + try { + if (message.startsWith("$POS")) { + parsePosData(message); + } else if (message.startsWith("$TROP")) { + parseTropData(message); + } else if (message.startsWith("$TRPG")) { + parseTrpgData(message); + } else if (message.startsWith("$SATE")) { + parseSateData(message); + } + } catch (Exception e) { + logger.error("Error processing RTK data: {}", e.getMessage()); + } + } + + @OnError + public void onError(Session session, Throwable error) { + logger.error("WebSocket error: {}", error.getMessage()); + } + + private void parsePosData(String message) { + String[] parts = message.split(","); + RtkData rtkData = new RtkData(); + int week = Integer.parseInt(parts[1]); + double timeOfWeek = Double.parseDouble(parts[2]); + + // 转换GPS时间到北京时间 + rtkData.setTimestamp(TimeConverter.gpsToBeijingTime(week, timeOfWeek)); + rtkData.setWeek(week); + rtkData.setTimeOfWeek(timeOfWeek); + rtkData.setLatitude(Double.parseDouble(parts[4])); + rtkData.setLongitude(Double.parseDouble(parts[5])); + rtkData.setHeight(Double.parseDouble(parts[6])); + + latestData.set(rtkData); + } + + private void parseTropData(String message) { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + String[] parts = message.split(","); + rtkData.setZtd(Double.parseDouble(parts[5])); + latestData.set(rtkData); + } + } + + private void parseTrpgData(String message) { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + String[] parts = message.split(","); + rtkData.setZtdGradientNorth(Double.parseDouble(parts[5])); + rtkData.setZtdGradientEast(Double.parseDouble(parts[6])); + latestData.set(rtkData); + } + } + + private void parseSateData(String message) { + RtkData rtkData = latestData.get(); + if (rtkData != null) { + rtkData.setSatelliteInfo(message); + latestData.set(rtkData); + } + } + + // 每10分钟执行一次数据保存 + @Scheduled(fixedRate = 600000) // 600000毫秒 = 10分钟 + public void saveData() { + RtkData currentData = latestData.get(); + if (currentData != null) { + LocalDateTime now = LocalDateTime.now(); + + // 检查是否需要保存(距离上次保存超过10分钟) + if (lastSaveTime == null || + ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) { + + rtkDataRepository.save(currentData); + lastSaveTime = now; + logger.info("Saved RTK data at: {}", now); + } + } + } +} \ No newline at end of file diff --git a/ntrip_load-main/src/main/java/com/test/NtripLoadTest.java b/ntrip_load-main/src/main/java/com/test/NtripLoadTest.java new file mode 100644 index 0000000..7cbcbc2 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/test/NtripLoadTest.java @@ -0,0 +1,130 @@ +package com.test; + +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class NtripLoadTest { + public static final String TARGETTED_SERVER="127.0.0.1"; + public static final String casterPassword="letmein"; + public static final int packetSize = 1500; + public static final int mountNb=1000; + + + public static void main(String[] args) throws Exception { + System.out.println("Will launch "+mountNb+" mountpoint + 90%*3 clients"); + List threadList = new ArrayList<>(); + for (int i = 0; i < mountNb; i++) { + int mountId=i; + Thread thread = new Thread(() -> sendThread(mountId)); + thread.start(); + threadList.add(thread); + } + Thread.sleep(5000); + for (int i = 0; i < mountNb; i++) { + if (i % 10 == 0) { + continue; + } + for (int j = 0; j < 3; j++) { + int mountId=i; + int multiplier=j; + Thread thread = new Thread(() -> sendThread(mountId)); + thread.start(); + threadList.add(thread); + } + } + threadList.forEach(x -> { + try { + x.join(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + } + + public static void receiveThread(int mountId, int multiplier) { + try { + Thread.sleep(random.nextInt(1000)); + receive(mountId, multiplier); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static Random random=new Random(); + public static void sendThread(int mountId) { + try { + Thread.sleep(random.nextInt(1000)); + send(mountId); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void receive(int mountId, int multiplier) throws Exception { + String request = """ + GET /TEST$i HTTP/1.0 + User-Agent: NTRIP LOADTEST/0.0.0 + Authorization: Basic Y2VudGlwZWRlOmxldG1laW4= + + """.replace("$i", ""+mountId); + + Socket sock = new Socket(TARGETTED_SERVER, 2101); + + var os = sock.getOutputStream(); + var in = sock.getInputStream(); + os.write(request.getBytes()); + String s = ""; + byte[] b = new byte[10024]; + while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) { + int len = in.read(b); + if (len < 0) { + System.out.println("Error on receive: " + s); + return; + } + s += new String(b, 0, len); + //System.out.println(s); + } + System.out.println("ok"); + while (true) { + int len = in.read(b); + if (len==0){ + continue; + } + //System.out.println("<-"+mountId+" "+multiplier); + } + + } + + + public static void send(int mountId) throws Exception { + Socket sock = new Socket(TARGETTED_SERVER, 2101); + String request = """ + SOURCE $passwd TEST$i + Source-Agent: NTRIP LOADTEST/0.0.0 + STR: + + """.replace("$i", ""+mountId) + .replace("$passwd", casterPassword); + var os = sock.getOutputStream(); + os.write(request.getBytes()); + String s = ""; + while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) { + byte[] b = new byte[1024]; + int len = sock.getInputStream().read(b); + s += new String(b, 0, len); + System.out.println(s); + } + System.out.println("ok"); + int iteration = 0; + while (true) { + os.write(("iteration: " + iteration + "\r\n" + "a".repeat(packetSize)).getBytes()); + iteration++; + os.flush(); + //System.out.println("->"+mountId); + Thread.sleep(1000); + } + } +} diff --git a/ntrip_load-main/src/main/java/com/test/NtripProducer.java b/ntrip_load-main/src/main/java/com/test/NtripProducer.java new file mode 100644 index 0000000..5f3bd75 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/test/NtripProducer.java @@ -0,0 +1,39 @@ +package com.test; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; + +public class NtripProducer implements Runnable{ + public static void main(String[] args) throws IOException { + ServerSocket sockServer=new ServerSocket(5015); + System.out.println("Ntrip producer listen on 5015"); + while (true){ + var sock=sockServer.accept(); + new Thread(new NtripProducer(sock)).start(); + } + } + + private final Socket socket; + public NtripProducer(Socket sock) { + this.socket=sock; + } + + @Override + public void run() { + int iteration=0; + try { + var out=socket.getOutputStream(); + while (true){ + System.out.println("iteration: "+iteration); + out.write(("iteration: "+iteration+"\r\n"+ "-".repeat(750)).getBytes()); + iteration++; + out.flush(); + + Thread.sleep(500); + } + }catch(Exception e){ + e.printStackTrace(); + } + } +} diff --git a/ntrip_load-main/src/main/java/com/test/NtripServer.java b/ntrip_load-main/src/main/java/com/test/NtripServer.java new file mode 100644 index 0000000..b91384d --- /dev/null +++ b/ntrip_load-main/src/main/java/com/test/NtripServer.java @@ -0,0 +1,118 @@ +package com.test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +public class NtripServer { + private static final Map mountPointMap=new HashMap<>(); + private static final AtomicInteger countClient=new AtomicInteger(); + + public static final class MountPoint implements Runnable{ + private final Socket socket; + private final List clients=new ArrayList<>(); + + public MountPoint(Socket socket){ + this.socket=socket; + } + + public void addClient(Socket socket){ + this.clients.add(socket); + System.out.println("Nb of client: "+countClient.incrementAndGet()); + } + + @Override + public void run() { + try { + var buffer=new byte[1024]; + var baos = new ByteArrayOutputStream(); + var is = socket.getInputStream(); + socket.getOutputStream().write("ICY 200 OK\r\n\r\n".getBytes()); + while (true){ + int len=is.read(buffer); + if (len>0){ + this.clients.removeIf(c->{ + try{ + c.getOutputStream().write(buffer,0, len); + return false; + }catch (Exception e){ + System.out.println("Nb of client: "+countClient.decrementAndGet()); + return true; + } + }); + } + } + }catch(Exception e){ + System.out.println("Nb of client: "+countClient.addAndGet(clients.size())); + e.printStackTrace(); + } + } + + } + + public static void main(String[] args) throws IOException { + System.out.println("Ntrip test server started"); + ServerSocket sockServer = new ServerSocket(2101); + + // 创建线程池 + ExecutorService executor = Executors.newCachedThreadPool(); + + try { + while (true) { + Socket sock = sockServer.accept(); + executor.submit(() -> processInit(sock)); + } + } finally { + executor.shutdown(); + } + } + + public static void processInit(Socket socket){ + try { + var buffer = new byte[1024]; + var baos = new ByteArrayOutputStream(); + var is = socket.getInputStream(); + while (!baos.toString().contains("\r\n\r\n") && !baos.toString().contains("\n\n")) { + int len = is.read(buffer); + if (len==-1)return; + baos.write(buffer, 0, len); + } + var req=baos.toString().lines().findFirst(); + if (req.isEmpty())return; + var method=req.get().split(" "); + if (method.length<3)return; + if ("GET".equals(method[0])){ + String mp=method[1].replace("/",""); + if (mountPointMap.containsKey(mp)){ + socket.getOutputStream().write("ICY 200 OK\r\n\r\n".getBytes()); + mountPointMap.get(mp).addClient(socket); + }else{ + System.out.println("mountpoint not found for client "+mp); + socket.close(); + } + } else if ("SOURCE".equals(method[0])) { + MountPoint mp = new MountPoint(socket); + mountPointMap.put(method[2], mp); + System.out.println("Mountpoint size: " + mountPointMap.size() + " added " + method[2]); + + // 使用传统线程代替虚拟线程 + Thread thread = new Thread(mp, "MOUNT " + method[2]); + thread.start(); + }else{ + System.out.println("Method "+method[0]+" not implemented"); + socket.close(); + } + }catch (Exception e){ + e.printStackTrace(); + } + } + +} diff --git a/ntrip_load-main/src/main/java/com/test/NtripTestClient.java b/ntrip_load-main/src/main/java/com/test/NtripTestClient.java new file mode 100644 index 0000000..59a0010 --- /dev/null +++ b/ntrip_load-main/src/main/java/com/test/NtripTestClient.java @@ -0,0 +1,126 @@ +package com.test; + +import java.net.Socket; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class NtripTestClient { + public static final int packetSize = 1500; + public static final int clientNb=40; + + + public static void main(String[] args) throws Exception { + List threadList = new ArrayList<>(); + + // 创建并启动发送线程 + Thread sendThread = new Thread(() -> sendThread(0)); + sendThread.start(); + threadList.add(sendThread); + + Thread.sleep(5000); + + // 创建并启动多个接收线程 + for (int i = 0; i < clientNb; i++) { + int multiplier = i; + Thread receiveThread = new Thread(() -> receiveThread(0, multiplier)); + receiveThread.setName("Client " + i); + receiveThread.start(); + threadList.add(receiveThread); + } + + // 等待所有线程完成 + threadList.forEach(thread -> { + try { + thread.join(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + public static void receiveThread(int mountId, int multiplier) { + try { + Thread.sleep(random.nextInt(1000)); + receive(mountId, multiplier); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static Random random=new Random(); + public static void sendThread(int mountId) { + try { + Thread.sleep(random.nextInt(1000)); + send(mountId); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void receive(int mountId, int multiplier) throws Exception { + String request = """ + GET /TEST$i HTTP/1.0 + User-Agent: NTRIP LOADTEST/0.0.0 + Authorization: Basic Y2VudGlwZWRlOmxldG1laW4= + + """.replace("$i", ""+mountId); + + Socket sock = new Socket("127.0.0.1", 2101); + + var os = sock.getOutputStream(); + var in = sock.getInputStream(); + os.write(request.getBytes()); + String s = ""; + byte[] b = new byte[10024]; + while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) { + int len = in.read(b); + if (len < 0) { + System.out.println("Error on receive: " + s); + return; + } + s += new String(b, 0, len); + //System.out.println(s); + } + System.out.println("ok"); + while (true) { + int len = in.read(b); + var data=new String(b, 0, len).replace("-","").replace("\n",""); + if (data.length()>0){ + System.out.println(Instant.now()+" "+multiplier+" "+data); + } + //System.out.println("<-"+mountId+" "+multiplier); + } + + } + + + public static void send(int mountId) throws Exception { + Socket sock = new Socket("127.0.0.1", 2101); + String request = """ + SOURCE letmein TEST$i + Source-Agent: NTRIP LOADTEST/0.0.0 + STR: + + """.replace("$i", ""+mountId); + var os = sock.getOutputStream(); + os.write(request.getBytes()); + String s = ""; + while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) { + byte[] b = new byte[1024]; + int len = sock.getInputStream().read(b); + s += new String(b, 0, len); + System.out.println(s); + } + System.out.println("ok"); + int iteration = 0; + while (true) { + os.write(("iteration: " + iteration + "\r\n" + "-".repeat(packetSize)).getBytes()); + iteration++; + os.flush(); + //System.out.println("->"+mountId); + Thread.sleep(1000); + } + } +} diff --git a/ntrip_load-main/src/main/resources/application.yml b/ntrip_load-main/src/main/resources/application.yml new file mode 100644 index 0000000..7418558 --- /dev/null +++ b/ntrip_load-main/src/main/resources/application.yml @@ -0,0 +1,21 @@ +server: + port: 11111 + +spring: + datasource: + url: jdbc:mysql://localhost:3306/rtk_data?useSSL=false&serverTimezone=UTC + username: root + password: root + driver-class-name: com.mysql.cj.jdbc.Driver + jpa: + hibernate: + ddl-auto: update + show-sql: true + properties: + hibernate: + dialect: org.hibernate.dialect.MySQL8Dialect + format_sql: true + +logging: + level: + com.rtklib: DEBUG \ No newline at end of file diff --git a/ntrip_load-main/src/main/resources/logback.xml b/ntrip_load-main/src/main/resources/logback.xml new file mode 100644 index 0000000..841f9c9 --- /dev/null +++ b/ntrip_load-main/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + +