first commit

This commit is contained in:
fengyarnom 2025-05-19 16:59:25 +08:00
commit a3f3c78b49
33 changed files with 1577 additions and 0 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

18
.idea/compiler.xml generated Normal file
View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="rtklib-data-receiver" />
</profile>
</annotationProcessing>
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
<module name="rtklib-data-receiver" options="-parameters" />
</option>
</component>
</project>

6
.idea/encodings.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/ntrip_load-main/src/main/java" charset="UTF-8" />
</component>
</project>

20
.idea/jarRepositories.xml generated Normal file
View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>

14
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/ntrip_load-main/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" project-jdk-name="corretto-17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/ntrip_load-main.iml" filepath="$PROJECT_DIR$/.idea/ntrip_load-main.iml" />
</modules>
</component>
</project>

9
.idea/ntrip_load-main.iml generated Normal file
View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

124
.idea/uiDesigner.xml generated Normal file
View File

@ -0,0 +1,124 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

1
ntrip_load-main/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
target/

8
ntrip_load-main/.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

19
ntrip_load-main/.idea/compiler.xml generated Normal file
View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="cprip_loadtest" />
</profile>
</annotationProcessing>
<bytecodeTargetLevel target="22" />
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
<module name="cprip_loadtest" options="-parameters" />
</option>
</component>
</project>

7
ntrip_load-main/.idea/encodings.xml generated Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="https://repo.maven.apache.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
</component>
</project>

12
ntrip_load-main/.idea/misc.xml generated Normal file
View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_21" project-jdk-name="openjdk-23" project-jdk-type="JavaSDK" />
</project>

41
ntrip_load-main/README.md Normal file
View File

@ -0,0 +1,41 @@
# ntrip_load
Dumb load test for caster.
Need openjdk java >21 (>22 ideally)
$ java ./src/main/java/com/test/NtripLoadTest.java
will launch multiple base + multiple client on a targetted caster ( constant set to 127.0.0.1 in the file )
-> Password set to letmein for mountpoint
-> password set to centipede:letmein for client
Mountpoint are named "TESTxx" with xx a number between 0 and max_number_of_mountpoint-1.
A "demo server" is also present
$ java ./src/main/java/com/test/NtripServer.java
Data created by the mountpoint are 1500 char + some. It contains "iteration: " + a incremental number + "-" repeated.
You can read a mountpoint with:
$ str2str -in ntrip://centipede:letmein@localhost:2101/TEST123 | sed "s/-//g"
so that you can just see the "iteration" number message (to check if some message are not lost, or delivered late)
if you want to manually create a mountpoint, you can use
$ java ./src/main/java/com/test/NtripProducer.java
which will listen on TCP 5015 and will send the message "iteration: " + a incremental number + "-" repeated.
so that you can use:
str2str -in tcpcli://localhost:5015 -out ntrips://:letmein@localhost:2101/MYTEST

89
ntrip_load-main/pom.xml Normal file
View File

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version> <!-- 升级到 Spring Boot 3.x它使用 Jakarta EE 9+ -->
</parent>
<groupId>com.rtklib</groupId>
<artifactId>rtklib-data-receiver</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>17</java.version> <!-- Spring Boot 3.x 至少需要 Java 17 -->
</properties>
<dependencies>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 添加 Jakarta WebSocket API -->
<dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-api</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Database -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version> <!-- 更新到与 Spring Boot 3 兼容的版本 -->
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>21</source>
<target>21</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,6 @@
sqlalchemy==2.0.23
pymysql==1.1.0
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.4.2

View File

@ -0,0 +1,13 @@
package com.rtklib;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class RtkDataReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(RtkDataReceiverApplication.class, args);
}
}

View File

@ -0,0 +1,273 @@
package com.rtklib;
import com.rtklib.entity.RtkData;
import com.rtklib.repository.RtkDataRepository;
import com.rtklib.util.PwvCalculator;
import com.rtklib.util.TimeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@Component
public class TcpRtkServer implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(TcpRtkServer.class);
private static final int PORT = 5015; // 可根据需要修改端口
// 气象数据库连接信息
private static final String RAIN_DB_URL = "jdbc:mysql://8.134.185.53:3306/rain_db?useSSL=true";
private static final String RAIN_DB_USER = "remote";
private static final String RAIN_DB_PASSWORD = "root";
@Autowired
private RtkDataRepository rtkDataRepository;
// 使用AtomicReference来存储最新的数据
private final AtomicReference<RtkData> latestData = new AtomicReference<>();
// 存储最后一次保存的时间
private LocalDateTime lastSaveTime;
@Override
public void run(String... args) throws Exception {
new Thread(this::startServer, "TcpRtkServer-Main").start();
}
private void startServer() {
try (ServerSocket serverSocket = new ServerSocket(PORT)) {
logger.info("TCP RTK Server started on port {}", PORT);
while (true) {
Socket clientSocket = serverSocket.accept();
new Thread(() -> handleClient(clientSocket), "TcpRtkServer-Client").start();
}
} catch (Exception e) {
logger.error("TCP Server error: {}", e.getMessage(), e);
}
}
private void handleClient(Socket socket) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("$POS")) {
parsePosData(line);
} else if (line.startsWith("$TROP")) {
parseTropData(line);
} else if (line.startsWith("$TRPG")) {
parseTrpgData(line);
} else if (line.startsWith("$SATE")) {
parseSateData(line);
}
}
} catch (Exception e) {
logger.error("Error handling client: {}", e.getMessage(), e);
}
}
private void parsePosData(String message) {
try {
String[] parts = message.split(",");
// 获取当前数据如果有则保留原有数据否则创建新对象
RtkData rtkData = latestData.get();
if (rtkData == null) {
rtkData = new RtkData();
}
int week = Integer.parseInt(parts[1]);
double timeOfWeek = Double.parseDouble(parts[2]);
rtkData.setTimestamp(TimeConverter.gpsToBeijingTime(week, timeOfWeek));
rtkData.setWeek(week);
rtkData.setTimeOfWeek(timeOfWeek);
rtkData.setLatitude(Double.parseDouble(parts[4]));
rtkData.setLongitude(Double.parseDouble(parts[5]));
rtkData.setHeight(Double.parseDouble(parts[6]));
latestData.set(rtkData);
logger.debug("Parsed $POS data: {}", rtkData);
} catch (Exception e) {
logger.error("Error parsing $POS data: {}", e.getMessage(), e);
}
}
private void parseTropData(String message) {
try {
RtkData rtkData = latestData.get();
if (rtkData != null) {
String[] parts = message.split(",");
rtkData.setZtd(Double.parseDouble(parts[5]));
latestData.set(rtkData);
logger.debug("Updated $TROP data: ZTD={}", parts[5]);
}
} catch (Exception e) {
logger.error("Error parsing $TROP data: {}", e.getMessage(), e);
}
}
private void parseTrpgData(String message) {
try {
RtkData rtkData = latestData.get();
if (rtkData != null) {
String[] parts = message.split(",");
// 打印完整消息格式以便调试
logger.info("$TRPG消息格式: {}", message);
// 安全检查数组长度
if (parts.length > 6) {
try {
double north = Double.parseDouble(parts[5]);
double east = Double.parseDouble(parts[6]);
rtkData.setZtdGradientNorth(north);
rtkData.setZtdGradientEast(east);
latestData.set(rtkData);
logger.info("成功解析$TRPG数据: North={}, East={}", north, east);
} catch (NumberFormatException e) {
logger.error("$TRPG数值解析错误位置5和6: {}, {}", parts[5], parts[6], e);
}
} else {
logger.warn("$TRPG消息格式不正确数组长度不足: {}", parts.length);
// 尝试不同的索引位置例如假设格式可能是$TRPG,北梯度值,东梯度值
if (parts.length > 2) {
try {
double north = Double.parseDouble(parts[1]);
double east = Double.parseDouble(parts[2]);
rtkData.setZtdGradientNorth(north);
rtkData.setZtdGradientEast(east);
latestData.set(rtkData);
logger.info("使用替代格式解析$TRPG数据: North={}, East={}", north, east);
} catch (Exception ex) {
logger.error("尝试替代格式解析$TRPG失败", ex);
}
}
}
} else {
logger.warn("收到$TRPG数据但尚未收到$POS数据初始化RtkData对象");
}
} catch (Exception e) {
logger.error("解析$TRPG数据发生异常: {}", e.getMessage(), e);
}
}
private void parseSateData(String message) {
try {
RtkData rtkData = latestData.get();
if (rtkData != null) {
rtkData.setSatelliteInfo(message);
latestData.set(rtkData);
logger.debug("Updated $SATE data");
}
} catch (Exception e) {
logger.error("Error parsing $SATE data: {}", e.getMessage(), e);
}
}
/**
* 获取最新的传感器数据
* @return 返回包含温度和气压的Map如果查询失败则返回null
*/
private Map<String, Double> getLatestSensorData() {
try (Connection conn = DriverManager.getConnection(RAIN_DB_URL, RAIN_DB_USER, RAIN_DB_PASSWORD)) {
String query = "SELECT temperature/10 AS temperature, atm_pressure AS pressure " +
"FROM sensor_data ORDER BY timestamp DESC LIMIT 1";
try (PreparedStatement ps = conn.prepareStatement(query);
ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
double temperature = rs.getDouble("temperature");
double pressure = rs.getDouble("pressure");
logger.info("获取到最新传感器数据: 温度={}℃, 气压={}hPa", temperature, pressure);
return Map.of("temperature", temperature, "pressure", pressure);
} else {
logger.warn("未找到传感器数据");
return null;
}
}
} catch (Exception e) {
logger.error("获取传感器数据失败: {}", e.getMessage(), e);
return null;
}
}
/**
* 计算PWV并准备保存数据
*/
private void calculateAndPreparePWV() {
try {
RtkData currentData = latestData.get();
if (currentData == null || currentData.getZtd() == null) {
logger.warn("没有可用的ZTD数据无法计算PWV");
return;
}
// 获取最新传感器数据
Map<String, Double> sensorData = getLatestSensorData();
if (sensorData == null) {
logger.warn("无法获取传感器数据使用默认值进行PWV计算");
// 使用默认值温度25气压1000hPa
currentData.setSurfaceTemp(25.0);
currentData.setSurfacePressure(1000.0);
} else {
currentData.setSurfaceTemp(sensorData.get("temperature"));
currentData.setSurfacePressure(sensorData.get("pressure"));
}
// 计算PWV
boolean success = PwvCalculator.calculatePWV(currentData);
if (success) {
latestData.set(currentData);
logger.info("PWV计算成功结果: {}", currentData.getPwv());
} else {
logger.warn("PWV计算失败");
}
} catch (Exception e) {
logger.error("计算PWV过程中发生错误: {}", e.getMessage(), e);
}
}
// 每10分钟执行一次数据保存
@Scheduled(fixedRate = 6000) // 600000毫秒 = 10分钟
public void saveData() {
try {
RtkData currentData = latestData.get();
if (currentData == null) {
logger.warn("没有可用的RTK数据跳过本次保存");
return;
}
LocalDateTime now = LocalDateTime.now();
// 计算PWV
calculateAndPreparePWV();
// 检查是否需要保存距离上次保存超过10分钟
if (lastSaveTime == null ||
ChronoUnit.MINUTES.between(lastSaveTime, now) >= 1) { // 调试时设为1分钟实际使用时改为10
rtkDataRepository.save(currentData);
lastSaveTime = now;
logger.info("成功保存RTK数据: 时间={}, ZTD={}, PWV={}",
currentData.getTimestamp(), currentData.getZtd(), currentData.getPwv());
}
} catch (Exception e) {
logger.error("保存数据过程中发生错误: {}", e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,22 @@
package com.rtklib.config;
import com.rtklib.websocket.RtkWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(rtkWebSocketHandler(), "/rtk").setAllowedOrigins("*");
}
@Bean
public RtkWebSocketHandler rtkWebSocketHandler() {
return new RtkWebSocketHandler();
}
}

View File

@ -0,0 +1,53 @@
package com.rtklib.entity;
import lombok.Data;
import jakarta.persistence.*;
import java.time.LocalDateTime;
@Data
@Entity
@Table(name = "rtk_data")
public class RtkData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "timestamp")
private LocalDateTime timestamp;
@Column(name = "week")
private Integer week;
@Column(name = "time_of_week")
private Double timeOfWeek;
@Column(name = "latitude")
private Double latitude;
@Column(name = "longitude")
private Double longitude;
@Column(name = "height")
private Double height;
@Column(name = "ztd")
private Double ztd;
@Column(name = "ztd_gradient_north")
private Double ztdGradientNorth;
@Column(name = "ztd_gradient_east")
private Double ztdGradientEast;
@Column(name = "satellite_info", length = 1000)
private String satelliteInfo;
@Column(name = "pwv")
private Double pwv;
@Transient // 不存入数据库的临时字段
private Double surfacePressure;
@Transient // 不存入数据库的临时字段
private Double surfaceTemp;
}

View File

@ -0,0 +1,9 @@
package com.rtklib.repository;
import com.rtklib.entity.RtkData;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface RtkDataRepository extends JpaRepository<RtkData, Long> {
}

View File

@ -0,0 +1,72 @@
package com.rtklib.util;
import com.rtklib.entity.RtkData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* PWV计算工具类
*/
public class PwvCalculator {
private static final Logger logger = LoggerFactory.getLogger(PwvCalculator.class);
// PWV计算需要的常量
private static final double ROUW = 1e3; // 水密度kg/m³
private static final double RV = 461.5; // 水汽气体常数J/(kg·K)
private static final double K2P = 22.1 / 100; // 转换为 K/Pa
private static final double K3 = 3.739e5 / 100; // 转换为 K²/Pa
/**
* 计算PWV值
* @param data RtkData对象包含ZTD和气象数据
* @return 计算成功返回true否则返回false
*/
public static boolean calculatePWV(RtkData data) {
try {
if (data.getZtd() == null || data.getSurfacePressure() == null ||
data.getSurfaceTemp() == null || data.getLatitude() == null ||
data.getHeight() == null) {
logger.warn("计算PWV所需数据不完整: ZTD={}, 气压={}, 温度={}, 纬度={}, 高度={}",
data.getZtd(), data.getSurfacePressure(), data.getSurfaceTemp(),
data.getLatitude(), data.getHeight());
return false;
}
// 1. 计算ZHD静力学延迟标准Saastamoinen公式
double lat = Math.toRadians(data.getLatitude()); // 转换为弧度
double h = data.getHeight() / 1000.0; // 高度转换为千米
double p = data.getSurfacePressure(); // 气压单位hPa
// 计算ZHD (单位mm)
// 标准公式ZHD = (2.2767 * p) / (1 - 0.00266 * cos() - 0.00028 * h)
double f = 1.0 - 0.00266 * Math.cos(2 * lat) - 0.00028 * h;
double zhd = (2.2767 * p) / f;
// 2. 计算ZWD湿延迟单位mm
double zwd = data.getZtd() - zhd;
// 记录中间计算结果
logger.info("ZHD计算中间值: 纬度={}°({}rad), 高度={}m({}km), 气压={}hPa, f={}, ZHD={}mm",
data.getLatitude(), lat, data.getHeight(), h, p, f, zhd);
// 3. 计算Tm加权平均温度
double ts_K = data.getSurfaceTemp() + 273.15; // 转换为开尔文温度
double tm = 70.2 + 0.72 * ts_K;
// 4. 计算转换因子tao
double b = 1e6;
double tao = b / (ROUW * RV * (K2P + K3 / tm));
// 5. 计算PWV以毫米为单位
double pwv = zwd * tao;
data.setPwv(pwv);
logger.info("PWV计算成功: ZTD={}mm, ZHD={}mm, ZWD={}mm, Tm={}K, PWV={}mm",
data.getZtd(), zhd, zwd, tm, pwv);
return true;
} catch (Exception e) {
logger.error("计算PWV时发生错误: {}", e.getMessage(), e);
return false;
}
}
}

View File

@ -0,0 +1,28 @@
package com.rtklib.util;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
public class TimeConverter {
// GPS起始时间1980-01-06 00:00:00 UTC
private static final LocalDateTime GPS_EPOCH = LocalDateTime.of(1980, 1, 6, 0, 0, 0);
// 北京时区
private static final ZoneId BEIJING_ZONE = ZoneId.of("Asia/Shanghai");
// 当前闰秒数需要定期更新
private static final int CURRENT_LEAP_SECONDS = 18;
public static LocalDateTime gpsToBeijingTime(int week, double timeOfWeek) {
// 计算GPS时间UTC
LocalDateTime gpsTime = GPS_EPOCH
.plus(week, ChronoUnit.WEEKS)
.plus((long) timeOfWeek, ChronoUnit.SECONDS);
// 转换为北京时间考虑闰秒
return gpsTime
.plus(CURRENT_LEAP_SECONDS, ChronoUnit.SECONDS)
.atZone(ZoneId.of("UTC"))
.withZoneSameInstant(BEIJING_ZONE)
.toLocalDateTime();
}
}

View File

@ -0,0 +1,110 @@
package com.rtklib.websocket;
import com.rtklib.entity.RtkData;
import com.rtklib.repository.RtkDataRepository;
import com.rtklib.util.TimeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicReference;
public class RtkWebSocketHandler extends TextWebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(RtkWebSocketHandler.class);
private static RtkDataRepository rtkDataRepository;
private static final AtomicReference<RtkData> latestData = new AtomicReference<>();
private static LocalDateTime lastSaveTime;
@Autowired
public void setRtkDataRepository(RtkDataRepository repository) {
rtkDataRepository = repository;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) {
logger.info("New connection established: {}", session.getId());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
logger.info("Connection closed: {}", session.getId());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
try {
String msg = message.getPayload();
if (msg.startsWith("$POS")) {
parsePosData(msg);
} else if (msg.startsWith("$TROP")) {
parseTropData(msg);
} else if (msg.startsWith("$TRPG")) {
parseTrpgData(msg);
} else if (msg.startsWith("$SATE")) {
parseSateData(msg);
}
} catch (Exception e) {
logger.error("Error processing RTK data: {}", e.getMessage());
}
}
private void parsePosData(String message) {
String[] parts = message.split(",");
RtkData rtkData = new RtkData();
int week = Integer.parseInt(parts[1]);
double timeOfWeek = Double.parseDouble(parts[2]);
rtkData.setTimestamp(TimeConverter.gpsToBeijingTime(week, timeOfWeek));
rtkData.setWeek(week);
rtkData.setTimeOfWeek(timeOfWeek);
rtkData.setLatitude(Double.parseDouble(parts[4]));
rtkData.setLongitude(Double.parseDouble(parts[5]));
rtkData.setHeight(Double.parseDouble(parts[6]));
latestData.set(rtkData);
}
private void parseTropData(String message) {
RtkData rtkData = latestData.get();
if (rtkData != null) {
String[] parts = message.split(",");
rtkData.setZtd(Double.parseDouble(parts[5]));
latestData.set(rtkData);
}
}
private void parseTrpgData(String message) {
RtkData rtkData = latestData.get();
if (rtkData != null) {
String[] parts = message.split(",");
rtkData.setZtdGradientNorth(Double.parseDouble(parts[5]));
rtkData.setZtdGradientEast(Double.parseDouble(parts[6]));
latestData.set(rtkData);
}
}
private void parseSateData(String message) {
RtkData rtkData = latestData.get();
if (rtkData != null) {
rtkData.setSatelliteInfo(message);
latestData.set(rtkData);
}
}
@Scheduled(fixedRate = 600000)
public void saveData() {
RtkData currentData = latestData.get();
if (currentData != null) {
LocalDateTime now = LocalDateTime.now();
if (lastSaveTime == null || ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) {
rtkDataRepository.save(currentData);
lastSaveTime = now;
logger.info("Saved RTK data at: {}", now);
}
}
}
}

View File

@ -0,0 +1,136 @@
package com.rtklib.websocket;
import jakarta.websocket.Session;
import com.rtklib.entity.RtkData;
import com.rtklib.repository.RtkDataRepository;
import com.rtklib.util.TimeConverter;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
@Component
@ServerEndpoint("/rtk")
public class RtkWebSocketServer {
private static final Logger logger = LoggerFactory.getLogger(RtkWebSocketServer.class);
private static RtkDataRepository rtkDataRepository;
private static final Map<String, Session> SESSIONS = new ConcurrentHashMap<>();
// 使用AtomicReference来存储最新的数据
private static final AtomicReference<RtkData> latestData = new AtomicReference<>();
// 存储最后一次保存的时间
private static LocalDateTime lastSaveTime;
@Autowired
public void setRtkDataRepository(RtkDataRepository repository) {
rtkDataRepository = repository;
}
@OnOpen
public void onOpen(Session session) {
SESSIONS.put(session.getId(), session);
logger.info("New connection established: {}", session.getId());
}
@OnClose
public void onClose(Session session) {
SESSIONS.remove(session.getId());
logger.info("Connection closed: {}", session.getId());
}
@OnMessage
public void onMessage(String message, Session session) {
try {
if (message.startsWith("$POS")) {
parsePosData(message);
} else if (message.startsWith("$TROP")) {
parseTropData(message);
} else if (message.startsWith("$TRPG")) {
parseTrpgData(message);
} else if (message.startsWith("$SATE")) {
parseSateData(message);
}
} catch (Exception e) {
logger.error("Error processing RTK data: {}", e.getMessage());
}
}
@OnError
public void onError(Session session, Throwable error) {
logger.error("WebSocket error: {}", error.getMessage());
}
private void parsePosData(String message) {
String[] parts = message.split(",");
RtkData rtkData = new RtkData();
int week = Integer.parseInt(parts[1]);
double timeOfWeek = Double.parseDouble(parts[2]);
// 转换GPS时间到北京时间
rtkData.setTimestamp(TimeConverter.gpsToBeijingTime(week, timeOfWeek));
rtkData.setWeek(week);
rtkData.setTimeOfWeek(timeOfWeek);
rtkData.setLatitude(Double.parseDouble(parts[4]));
rtkData.setLongitude(Double.parseDouble(parts[5]));
rtkData.setHeight(Double.parseDouble(parts[6]));
latestData.set(rtkData);
}
private void parseTropData(String message) {
RtkData rtkData = latestData.get();
if (rtkData != null) {
String[] parts = message.split(",");
rtkData.setZtd(Double.parseDouble(parts[5]));
latestData.set(rtkData);
}
}
private void parseTrpgData(String message) {
RtkData rtkData = latestData.get();
if (rtkData != null) {
String[] parts = message.split(",");
rtkData.setZtdGradientNorth(Double.parseDouble(parts[5]));
rtkData.setZtdGradientEast(Double.parseDouble(parts[6]));
latestData.set(rtkData);
}
}
private void parseSateData(String message) {
RtkData rtkData = latestData.get();
if (rtkData != null) {
rtkData.setSatelliteInfo(message);
latestData.set(rtkData);
}
}
// 每10分钟执行一次数据保存
@Scheduled(fixedRate = 600000) // 600000毫秒 = 10分钟
public void saveData() {
RtkData currentData = latestData.get();
if (currentData != null) {
LocalDateTime now = LocalDateTime.now();
// 检查是否需要保存距离上次保存超过10分钟
if (lastSaveTime == null ||
ChronoUnit.MINUTES.between(lastSaveTime, now) >= 10) {
rtkDataRepository.save(currentData);
lastSaveTime = now;
logger.info("Saved RTK data at: {}", now);
}
}
}
}

View File

@ -0,0 +1,130 @@
package com.test;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class NtripLoadTest {
public static final String TARGETTED_SERVER="127.0.0.1";
public static final String casterPassword="letmein";
public static final int packetSize = 1500;
public static final int mountNb=1000;
public static void main(String[] args) throws Exception {
System.out.println("Will launch "+mountNb+" mountpoint + 90%*3 clients");
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < mountNb; i++) {
int mountId=i;
Thread thread = new Thread(() -> sendThread(mountId));
thread.start();
threadList.add(thread);
}
Thread.sleep(5000);
for (int i = 0; i < mountNb; i++) {
if (i % 10 == 0) {
continue;
}
for (int j = 0; j < 3; j++) {
int mountId=i;
int multiplier=j;
Thread thread = new Thread(() -> sendThread(mountId));
thread.start();
threadList.add(thread);
}
}
threadList.forEach(x -> {
try {
x.join();
} catch (Exception e) {
e.printStackTrace();
}
});
}
public static void receiveThread(int mountId, int multiplier) {
try {
Thread.sleep(random.nextInt(1000));
receive(mountId, multiplier);
} catch (Exception e) {
e.printStackTrace();
}
}
public static Random random=new Random();
public static void sendThread(int mountId) {
try {
Thread.sleep(random.nextInt(1000));
send(mountId);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void receive(int mountId, int multiplier) throws Exception {
String request = """
GET /TEST$i HTTP/1.0
User-Agent: NTRIP LOADTEST/0.0.0
Authorization: Basic Y2VudGlwZWRlOmxldG1laW4=
""".replace("$i", ""+mountId);
Socket sock = new Socket(TARGETTED_SERVER, 2101);
var os = sock.getOutputStream();
var in = sock.getInputStream();
os.write(request.getBytes());
String s = "";
byte[] b = new byte[10024];
while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) {
int len = in.read(b);
if (len < 0) {
System.out.println("Error on receive: " + s);
return;
}
s += new String(b, 0, len);
//System.out.println(s);
}
System.out.println("ok");
while (true) {
int len = in.read(b);
if (len==0){
continue;
}
//System.out.println("<-"+mountId+" "+multiplier);
}
}
public static void send(int mountId) throws Exception {
Socket sock = new Socket(TARGETTED_SERVER, 2101);
String request = """
SOURCE $passwd TEST$i
Source-Agent: NTRIP LOADTEST/0.0.0
STR:
""".replace("$i", ""+mountId)
.replace("$passwd", casterPassword);
var os = sock.getOutputStream();
os.write(request.getBytes());
String s = "";
while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) {
byte[] b = new byte[1024];
int len = sock.getInputStream().read(b);
s += new String(b, 0, len);
System.out.println(s);
}
System.out.println("ok");
int iteration = 0;
while (true) {
os.write(("iteration: " + iteration + "\r\n" + "a".repeat(packetSize)).getBytes());
iteration++;
os.flush();
//System.out.println("->"+mountId);
Thread.sleep(1000);
}
}
}

View File

@ -0,0 +1,39 @@
package com.test;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class NtripProducer implements Runnable{
public static void main(String[] args) throws IOException {
ServerSocket sockServer=new ServerSocket(5015);
System.out.println("Ntrip producer listen on 5015");
while (true){
var sock=sockServer.accept();
new Thread(new NtripProducer(sock)).start();
}
}
private final Socket socket;
public NtripProducer(Socket sock) {
this.socket=sock;
}
@Override
public void run() {
int iteration=0;
try {
var out=socket.getOutputStream();
while (true){
System.out.println("iteration: "+iteration);
out.write(("iteration: "+iteration+"\r\n"+ "-".repeat(750)).getBytes());
iteration++;
out.flush();
Thread.sleep(500);
}
}catch(Exception e){
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,118 @@
package com.test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class NtripServer {
private static final Map<String, MountPoint> mountPointMap=new HashMap<>();
private static final AtomicInteger countClient=new AtomicInteger();
public static final class MountPoint implements Runnable{
private final Socket socket;
private final List<Socket> clients=new ArrayList<>();
public MountPoint(Socket socket){
this.socket=socket;
}
public void addClient(Socket socket){
this.clients.add(socket);
System.out.println("Nb of client: "+countClient.incrementAndGet());
}
@Override
public void run() {
try {
var buffer=new byte[1024];
var baos = new ByteArrayOutputStream();
var is = socket.getInputStream();
socket.getOutputStream().write("ICY 200 OK\r\n\r\n".getBytes());
while (true){
int len=is.read(buffer);
if (len>0){
this.clients.removeIf(c->{
try{
c.getOutputStream().write(buffer,0, len);
return false;
}catch (Exception e){
System.out.println("Nb of client: "+countClient.decrementAndGet());
return true;
}
});
}
}
}catch(Exception e){
System.out.println("Nb of client: "+countClient.addAndGet(clients.size()));
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
System.out.println("Ntrip test server started");
ServerSocket sockServer = new ServerSocket(2101);
// 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
try {
while (true) {
Socket sock = sockServer.accept();
executor.submit(() -> processInit(sock));
}
} finally {
executor.shutdown();
}
}
public static void processInit(Socket socket){
try {
var buffer = new byte[1024];
var baos = new ByteArrayOutputStream();
var is = socket.getInputStream();
while (!baos.toString().contains("\r\n\r\n") && !baos.toString().contains("\n\n")) {
int len = is.read(buffer);
if (len==-1)return;
baos.write(buffer, 0, len);
}
var req=baos.toString().lines().findFirst();
if (req.isEmpty())return;
var method=req.get().split(" ");
if (method.length<3)return;
if ("GET".equals(method[0])){
String mp=method[1].replace("/","");
if (mountPointMap.containsKey(mp)){
socket.getOutputStream().write("ICY 200 OK\r\n\r\n".getBytes());
mountPointMap.get(mp).addClient(socket);
}else{
System.out.println("mountpoint not found for client "+mp);
socket.close();
}
} else if ("SOURCE".equals(method[0])) {
MountPoint mp = new MountPoint(socket);
mountPointMap.put(method[2], mp);
System.out.println("Mountpoint size: " + mountPointMap.size() + " added " + method[2]);
// 使用传统线程代替虚拟线程
Thread thread = new Thread(mp, "MOUNT " + method[2]);
thread.start();
}else{
System.out.println("Method "+method[0]+" not implemented");
socket.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,126 @@
package com.test;
import java.net.Socket;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class NtripTestClient {
public static final int packetSize = 1500;
public static final int clientNb=40;
public static void main(String[] args) throws Exception {
List<Thread> threadList = new ArrayList<>();
// 创建并启动发送线程
Thread sendThread = new Thread(() -> sendThread(0));
sendThread.start();
threadList.add(sendThread);
Thread.sleep(5000);
// 创建并启动多个接收线程
for (int i = 0; i < clientNb; i++) {
int multiplier = i;
Thread receiveThread = new Thread(() -> receiveThread(0, multiplier));
receiveThread.setName("Client " + i);
receiveThread.start();
threadList.add(receiveThread);
}
// 等待所有线程完成
threadList.forEach(thread -> {
try {
thread.join();
} catch (Exception e) {
e.printStackTrace();
}
});
}
public static void receiveThread(int mountId, int multiplier) {
try {
Thread.sleep(random.nextInt(1000));
receive(mountId, multiplier);
} catch (Exception e) {
e.printStackTrace();
}
}
public static Random random=new Random();
public static void sendThread(int mountId) {
try {
Thread.sleep(random.nextInt(1000));
send(mountId);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void receive(int mountId, int multiplier) throws Exception {
String request = """
GET /TEST$i HTTP/1.0
User-Agent: NTRIP LOADTEST/0.0.0
Authorization: Basic Y2VudGlwZWRlOmxldG1laW4=
""".replace("$i", ""+mountId);
Socket sock = new Socket("127.0.0.1", 2101);
var os = sock.getOutputStream();
var in = sock.getInputStream();
os.write(request.getBytes());
String s = "";
byte[] b = new byte[10024];
while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) {
int len = in.read(b);
if (len < 0) {
System.out.println("Error on receive: " + s);
return;
}
s += new String(b, 0, len);
//System.out.println(s);
}
System.out.println("ok");
while (true) {
int len = in.read(b);
var data=new String(b, 0, len).replace("-","").replace("\n","");
if (data.length()>0){
System.out.println(Instant.now()+" "+multiplier+" "+data);
}
//System.out.println("<-"+mountId+" "+multiplier);
}
}
public static void send(int mountId) throws Exception {
Socket sock = new Socket("127.0.0.1", 2101);
String request = """
SOURCE letmein TEST$i
Source-Agent: NTRIP LOADTEST/0.0.0
STR:
""".replace("$i", ""+mountId);
var os = sock.getOutputStream();
os.write(request.getBytes());
String s = "";
while (!s.endsWith("\n\n") && !s.endsWith("\r\n\r\n")) {
byte[] b = new byte[1024];
int len = sock.getInputStream().read(b);
s += new String(b, 0, len);
System.out.println(s);
}
System.out.println("ok");
int iteration = 0;
while (true) {
os.write(("iteration: " + iteration + "\r\n" + "-".repeat(packetSize)).getBytes());
iteration++;
os.flush();
//System.out.println("->"+mountId);
Thread.sleep(1000);
}
}
}

View File

@ -0,0 +1,21 @@
server:
port: 11111
spring:
datasource:
url: jdbc:mysql://localhost:3306/rtk_data?useSSL=false&serverTimezone=UTC
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
format_sql: true
logging:
level:
com.rtklib: DEBUG

View File

@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>