1、把倾角计数据的处理挪到一个新的服务InclineServer

2、InclineServer处理D350,推送第三方
This commit is contained in:
weidong 2025-10-16 09:23:20 +08:00
parent 7b63f24af5
commit a678491098
20 changed files with 1123 additions and 0 deletions

133
sec-incline-server/pom.xml Normal file
View File

@ -0,0 +1,133 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.imdroid</groupId>
<artifactId>security-monitor</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>sec-incline-server</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version><!--$NO-MVN-MAN-VER$ -->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.78.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>com.imdroid</groupId>
<artifactId>sec-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.imdroid</groupId>
<artifactId>sec-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<!--mqtt相关依赖-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>central</id>
<name>ali-mirror</name>
<url>https://maven.aliyun.com/repository/central</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,23 @@
package com.imdroid.inclide_server;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.ComponentScan;
/**
* @author Layton
* @date 2023/1/31 20:33
*/
@SpringBootApplication(scanBasePackages = {"com.imdroid"})
@MapperScan({"com.imdroid.secapi"})
@ComponentScan({"com.imdroid.*"})
@EntityScan({"com.imdroid.*"})
public class InclineServerApp {
public static void main(String[] args) {
SpringApplication.run(InclineServerApp.class, args);
}
}

View File

@ -0,0 +1,44 @@
package com.imdroid.inclide_server.entity;
import lombok.Data;
import java.util.List;
@Data
public class InclineData {
private String ProjectID;
private String WorkPointID;
private double WorkPointLng = 0;
private double WorkPointLat = 0;
private List<Data> data;
@lombok.Data
public static class Data {
private String DevNum;
private String Devtype; //"ZdQx"
private double DevLng = 0;
private double DevLat = 0;
//角度
private double x;
private double y;
private double z;
//加速度振幅
private double Zx;
private double Zy;
private double Zz;
private String DataTime;
}
}

View File

@ -0,0 +1,14 @@
package com.imdroid.inclide_server.exception;
/**
* 设备消息校验异常
*
* @author LiGang
* @date 2023/4/8 10:55
*/
public class MessageValidateFailException extends RuntimeException {
public MessageValidateFailException(String message) {
super(message);
}
}

View File

@ -0,0 +1,8 @@
package com.imdroid.inclide_server.exception;
/**
* @author Layton
* @date 2023/2/2 20:43
*/
public class UnSupportedMessageException extends RuntimeException {
}

View File

@ -0,0 +1,28 @@
package com.imdroid.inclide_server.executor;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
@Component
public class BizExecutors {
private static HashMap<Class<?>, Object> executors = new HashMap<>();
private static List<Executor<?, ?>> executorList;
public BizExecutors(List<Executor<?, ?>> executorList) {
this.executorList = executorList;
for (Executor<?, ?> executor : this.executorList) {
System.out.println("executor type:" + executor.getMessageType().getName());
executors.put(executor.getMessageType(), executor);
}
}
public static <Q, R> R execute(Q query) {
Executor<Q, R> executor = (Executor<Q, R>)(executors.get(query.getClass()));
return executor.execute(query);
}
}

View File

@ -0,0 +1,90 @@
package com.imdroid.inclide_server.executor;
import com.github.yulichang.query.MPJQueryWrapper;
import com.imdroid.common.util.ThreadManager;
import com.imdroid.inclide_server.message.D350SurfaceInclineMessage;
import com.imdroid.inclide_server.service.InclineDataForwarder;
import com.imdroid.secapi.dto.GnssDevice;
import com.imdroid.secapi.dto.GnssDeviceJoin;
import com.imdroid.secapi.dto.GnssDeviceMapper;
import com.imdroid.secapi.dto.SurfaceInclineDataMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Layton
* @date 2023/2/2 20:40
* 1回ACK以便终端判断是否连接上服务器后台
* 2同步参数
* 3判断是否发冷启动指令
* 4保存状态信息判断是否有低电压等告警清除离线告警
*/
@Configuration
@EnableScheduling
public class D350SurfaceInclineMessageExecutor implements Executor<D350SurfaceInclineMessage, Void> {
@Autowired
SurfaceInclineDataMapper dataMapper;
@Autowired
GnssDeviceMapper deviceMapper;
@Autowired
InclineDataForwarder dataForwarder;
ConcurrentHashMap<String, GnssDeviceJoin> deviceInfoMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, LocalDateTime> deviceAliveMap = new ConcurrentHashMap<>();
@Override
public Void execute(D350SurfaceInclineMessage message) {
GnssDeviceJoin device = getDevice(message.getId());
if(device == null) return null;
deviceAliveMap.put(message.getId(), LocalDateTime.now());
// 补齐tenantId
message.getInclineData().setTenantid(device.getTenantid());
ThreadManager.getFixedThreadPool().submit(() -> {
//保存状态信息判断是否有低电压等告警清除离线告警
dataMapper.insert(message.getInclineData());
//推送
dataForwarder.send(device, message);
});
return null;
}
@Override
public Class<?> getMessageType() {
return D350SurfaceInclineMessage.class;
}
GnssDeviceJoin getDevice(String deviceId){
GnssDeviceJoin device = deviceInfoMap.get(deviceId);
if(device == null){
MPJQueryWrapper jquery = new MPJQueryWrapper<GnssDevice>()
.selectAll(GnssDevice.class)
.select("d.latitude as latitude")
.select("d.longitude as longitude")
.leftJoin("gnssstatus d on t.deviceid = d.deviceid")
.last("limit 1");
device = deviceMapper.selectJoinOne(GnssDeviceJoin.class, jquery);
if(device != null) deviceInfoMap.put(deviceId,device);
}
return device;
}
@Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次
void updateDeviceAliveTime(){
LocalDateTime now = LocalDateTime.now();
for(Map.Entry<String,LocalDateTime> entry:deviceAliveMap.entrySet()){
if(entry.getValue().isBefore(now.minusHours(2))){
deviceAliveMap.remove(entry);
deviceInfoMap.remove(entry.getKey());
}
}
}
}

View File

@ -0,0 +1,12 @@
package com.imdroid.inclide_server.executor;
/**
* @author Layton
* @date 2022/4/8 22:32
*/
public interface Executor<Q, R> {
R execute(Q message);
Class<?> getMessageType();
}

View File

@ -0,0 +1,40 @@
package com.imdroid.inclide_server.executor;
import com.imdroid.inclide_server.exception.UnSupportedMessageException;
import com.imdroid.inclide_server.message.BaseMessage;
import com.imdroid.inclide_server.message.D350SurfaceInclineMessage;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Map;
/**
* @author Layton
* @date 2023/2/2 20:41
*/
public class MessageParser {
private static final Map<Short, Class<? extends BaseMessage>> types = new HashMap<>();
public static final MessageParser instance = new MessageParser();
private MessageParser() {
}
static {
types.put((short)0xd350, D350SurfaceInclineMessage.class);//ACC上报
}
public BaseMessage parse(ByteBuf src) throws Exception {
short flag = src.getShort(0); // msg flag
Class<? extends BaseMessage> clz = types.get(flag);
if (clz == null) {
throw new UnSupportedMessageException();
}
BaseMessage message = clz.newInstance();
message.decode(src);
return message;
}
}

View File

@ -0,0 +1,50 @@
package com.imdroid.inclide_server.message;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author Layton
* @date 2023/2/2 20:32
*/
@Data
public abstract class BaseMessage {
protected int header;
protected String id;
protected int len;
protected int seq;
protected int pps;
protected LocalDateTime createTime;
protected int tenantId;//消息所属的租户在executor里获取
protected byte[] srcData;//message在线程中处理需要把channel传递过来的数据拷贝出来
public void decode(ByteBuf src) {
this.createTime = LocalDateTime.now();
if (shouldDecodeHeader()) {
// read操作会移动ByteBuf内部指针除D331外其他都用read来读
//int packetLen = src.readableBytes();
this.header = src.readUnsignedShort();
this.len = src.readUnsignedShort();
this.seq = this.len >> 11;
this.len = this.len & 0x7FF;
this.id = String.valueOf(src.readUnsignedInt());
/*if (packetLen - 4 != this.len) {
String msg = (String.format("id[%s],长度字段值[%s]与包的消息体长度[%s]不匹配", id, this.len, packetLen - 4));
WrongMessageRecorder.INSTANCE.append("receive wrong message," + msg);
}*/
this.pps = src.readUnsignedShort();
}
decodeBody(src);
}
public abstract void decodeBody(ByteBuf src);
public boolean shouldDecodeHeader() {
return true;
}
}

View File

@ -0,0 +1,50 @@
package com.imdroid.inclide_server.message;
import com.imdroid.secapi.dto.SurfaceInclineData;
import io.netty.buffer.ByteBuf;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* 自检消息
*
* @author Layton
* @date 2023/2/2 20:38
*/
@EqualsAndHashCode(callSuper=true)
@Data
public class D350SurfaceInclineMessage extends BaseMessage {
SurfaceInclineData inclineData = new SurfaceInclineData();
@Override
public void decodeBody(ByteBuf src) {
// d3 50 length(2bytes) device_id(4bytes) seq(2bytes) data
this.len = src.readableBytes(); // total length
this.header = src.readUnsignedShort();// flag
src.readUnsignedShort();// d342 length
this.id = String.valueOf(src.readUnsignedInt());//id
this.seq = src.readUnsignedShort();
inclineData.setDeviceid(getId());
inclineData.setCreatetime(createTime);
inclineData.setSensorid(src.readUnsignedByte());
inclineData.setAnglex(src.readFloat());
inclineData.setAngley(src.readFloat());
inclineData.setAnglez(src.readFloat());
inclineData.setAccx(src.readFloat());
inclineData.setAccy(src.readFloat());
inclineData.setAccz(src.readFloat());
inclineData.setMaxaccx(src.readFloat());
inclineData.setMaxaccy(src.readFloat());
inclineData.setMaxaccz(src.readFloat());
inclineData.setMinaccx(src.readFloat());
inclineData.setMinaccy(src.readFloat());
inclineData.setMinaccz(src.readFloat());
}
@Override
public boolean shouldDecodeHeader() {
return false;
}
}

View File

@ -0,0 +1,50 @@
package com.imdroid.inclide_server.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.socket.DatagramPacket;
import lombok.Data;
import java.net.InetSocketAddress;
/**
* @author Layton
* @date 2023/2/2 21:00
*/
@Data
public class DeviceChannel {
private String deviceId;
private String imei;
private Channel channel;
private InetSocketAddress address;
private long lastTime;
private boolean tcp;
public DeviceChannel(String deviceId, Channel channel, InetSocketAddress address) {
this.deviceId = deviceId;
this.channel = channel;
this.address = address;
lastTime = System.currentTimeMillis();
this.tcp = (address == null);
}
public boolean isOnline() {
if (tcp) {
return channel.isActive();
}
// return (System.currentTimeMillis() - lastTime) < 28 * 1000L;
return true;
}
public void writeAndFlush(ByteBuf buf) {
if (tcp) {
channel.writeAndFlush(buf);
} else {
channel.writeAndFlush(new DatagramPacket(buf, address));
}
}
}

View File

@ -0,0 +1,69 @@
package com.imdroid.inclide_server.server;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Layton
* @date 2023/2/2 21:00
* 提供两种获取channel的方法通过deviceId通过imeiimei是DTU连接服务器最先上报的消息
* 利用imei-ipaddr-deviceId可以发现imei和deviceId的绑定关系
*/
public class OnlineChannels {
public static final OnlineChannels INSTANCE = new OnlineChannels();
// 设备已连接deviceId已上报
private final Map<String, DeviceChannel> dataChannels = new ConcurrentHashMap<>();
private final Map<String, DeviceChannel> configChannels = new ConcurrentHashMap<>();
private OnlineChannels() {}
public DeviceChannel updateDataChannel(String deviceId, Channel channel, InetSocketAddress address) {
DeviceChannel deviceChannel = dataChannels.get(deviceId);
if(deviceChannel == null){
deviceChannel = new DeviceChannel(deviceId, channel, address);
dataChannels.put(deviceId, deviceChannel);
}
else {
deviceChannel.setChannel(channel);
deviceChannel.setAddress(address);
}
return deviceChannel;
}
public DeviceChannel updateConfigChannel(String deviceId, Channel channel, InetSocketAddress address) {
DeviceChannel deviceChannel = configChannels.get(deviceId);
if(deviceChannel == null){
deviceChannel = new DeviceChannel(deviceId, channel, address);
configChannels.put(deviceId, deviceChannel);
}
else {
deviceChannel.setChannel(channel);
deviceChannel.setAddress(address);
}
return deviceChannel;
}
public void removeConfigChannel(String deviceId){
configChannels.remove(deviceId);
}
public Optional<DeviceChannel> get(String deviceId) {
return Optional.ofNullable(dataChannels.get(deviceId));
}
public DeviceChannel getDataChannel(String deviceId){
return dataChannels.get(deviceId);
}
public DeviceChannel getConfigChannel(String deviceId){
return configChannels.get(deviceId);
}
}

View File

@ -0,0 +1,62 @@
package com.imdroid.inclide_server.server.udp;
import com.imdroid.common.util.DataTypeUtil;
import com.imdroid.inclide_server.exception.UnSupportedMessageException;
import com.imdroid.inclide_server.executor.BizExecutors;
import com.imdroid.inclide_server.executor.MessageParser;
import com.imdroid.inclide_server.message.BaseMessage;
import com.imdroid.inclide_server.server.OnlineChannels;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* @author Layton
* @date 2023/2/13 11:47
*/
@ChannelHandler.Sharable
@Component
public class RtcmUdpHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
try {
if (packet.content() == null) {
return;
}
/*if (logger.isDebugEnabled()) {
byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data);
logger.debug("receive message:" + DataTypeUtil.getHexString(data));
}*/
// 消息解析
BaseMessage message = MessageParser.instance.parse(packet.content());
OnlineChannels.INSTANCE.updateDataChannel(message.getId(), ctx.channel(), packet.sender());
BizExecutors.execute(message);
} catch (UnSupportedMessageException e) {
byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data);
logger.warn("receive un supported message: {}", DataTypeUtil.getHexString(data));
} catch (Exception e) {
logger.error("channel read error: {}", e.toString());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
logger.error("Exception caught: {}", cause.toString());
ctx.close();
}
}

View File

@ -0,0 +1,60 @@
package com.imdroid.inclide_server.server.udp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* @author Layton
* @date 2023/2/13 11:47
*/
@Component
public class RtcmUdpServer implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class);
@Value("${netty.data.port:9903}")
private Integer port;
@Autowired
private RtcmUdpHandler rtcmUdpHandler;
public RtcmUdpServer() {
}
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(this::start0, "udp-server").start();
}
private void start0() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_SNDBUF, 1024*1024) //1M缓存考虑1000个基站同时转发
.option(ChannelOption.SO_RCVBUF, 1024*1024)//1M缓存断点续传要大带宽
.handler(rtcmUdpHandler);
try {
ChannelFuture future = bootstrap.bind(port).sync().channel().closeFuture();
logger.info("udp server start at port {}", port);
future.await();
} catch (Exception e) {
logger.error("Error starting Imdroid protocol at port {}", port, e);
} finally {
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,155 @@
package com.imdroid.inclide_server.service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.imdroid.common.util.GsonUtil;
import com.imdroid.common.util.NumberUtils;
import com.imdroid.inclide_server.entity.InclineData;
import com.imdroid.inclide_server.message.D350SurfaceInclineMessage;
import com.imdroid.secapi.dto.GnssDeviceJoin;
import com.imdroid.secapi.dto.GnssGroupFwd;
import com.imdroid.secapi.dto.GnssGroupFwdMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class InclineDataForwarder{
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
final Logger logger = LoggerFactory.getLogger(InclineDataForwarder.class);
@Autowired
GnssGroupFwdMapper fwdMapper;
@Value("${incline.data_server_list}")
private String strServerList;
ConcurrentHashMap<String, TCPClient> tcpClientMap = new ConcurrentHashMap<>();
class XFZTCPListener implements TCPListener{
public static final int STATE_NO_ACK = 0;
public static final int STATE_OK = 1;
public static final int STATE_FAILED = 2;
public int state = STATE_NO_ACK;
@Override
public void clear(){
state = STATE_NO_ACK;
}
@Override
public void onConnected() {
}
@Override
public void onDisconnect() {
}
@Override
public void onMessage(String msg) {
if(msg.contains("succeed")) state = STATE_OK;
else state = STATE_FAILED;
}
}
@PostConstruct
void init(){
// TCP客户端: 服务器名1,地址,端口;服务器名2,地址,端口;...
String[] serverArray = strServerList.split(";");
for(String serverPara:serverArray){
String[] paras = serverPara.split(",");
if(paras.length==3){
initTcpClient(paras[0], paras[1], paras[2]);
}
}
}
void initTcpClient(String name, String host, String port){
//登记到数据库
String description = host+":"+port;
QueryWrapper<GnssGroupFwd> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("name",name);
queryWrapper.last("limit 1");
GnssGroupFwd gnssGroupFwd = fwdMapper.selectOne(queryWrapper);
if(gnssGroupFwd == null){
gnssGroupFwd = new GnssGroupFwd();
gnssGroupFwd.setName(name);
gnssGroupFwd.setDescription(description);
gnssGroupFwd.setDevice_num(0);
fwdMapper.insert(gnssGroupFwd);
}
else{
gnssGroupFwd.setDescription(description);
fwdMapper.updateById(gnssGroupFwd);
}
//启动客户端
TCPClient tcpClient = new TCPClient();
tcpClient.init(host, Integer.parseInt(port),new XFZTCPListener());
tcpClient.start();
tcpClientMap.put(name, tcpClient);
}
public void send(GnssDeviceJoin device, D350SurfaceInclineMessage message){
InclineData inclineData = new InclineData();
inclineData.setProjectID(device.getProject_id());
inclineData.setWorkPointID(device.getProject_id());
List<InclineData.Data> dataList = new ArrayList<>(1);
inclineData.setData(dataList);
InclineData.Data data = new InclineData.Data();
dataList.add(data);
data.setDataTime(message.getCreateTime().format(formatter));
data.setDevNum(device.getDeviceid());
data.setDevtype("ZdQx");
// 角度
float x = message.getInclineData().getAnglex();
float y = message.getInclineData().getAngley();
float z = message.getInclineData().getAnglez();
float Zx = message.getInclineData().getMaxaccx()-message.getInclineData().getMinaccx();
float Zy = message.getInclineData().getMaxaccx()-message.getInclineData().getMinaccy();
float Zz = message.getInclineData().getMaxaccx()-message.getInclineData().getMinaccz();
data.setX(NumberUtils.scale((double) x, 3));
data.setY(NumberUtils.scale((double) y, 3));
data.setZ(NumberUtils.scale((double) z, 3));
data.setZx(NumberUtils.scale((double) Zx, 3));
data.setZy(NumberUtils.scale((double) Zy, 3));
data.setZz(NumberUtils.scale((double) Zz, 3));
// 经纬度
data.setDevLng(device.getLongitude());
data.setDevLat(device.getLatitude());
// 发送
TCPClient tcpClient1 = tcpClientMap.get(device.getFwd_group_id());
TCPClient tcpClient2 = tcpClientMap.get(device.getFwd_group_id2());
String json = "#" + GsonUtil.toJson(inclineData) + "!";
logger.debug("forward {}: {}",device.getDeviceid(), json);
if(tcpClient1!=null) {
try {
tcpClient1.listener.clear();
tcpClient1.writeAndFlush(json);
} catch (Exception e1) {
logger.error(e1.toString());
}
}
if(tcpClient2!=null) {
try {
tcpClient2.listener.clear();
tcpClient2.writeAndFlush(json);
} catch (Exception e1) {
logger.error(e1.toString());
}
}
dataList.clear();
}
}

View File

@ -0,0 +1,147 @@
package com.imdroid.inclide_server.service;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
public class TCPClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private String host;
private int port;
private Bootstrap bootstrap;
private EventLoopGroup group;
private Channel channel;
LocalDateTime connectTime = LocalDateTime.now();
public TCPListener listener;
public void start() {
//new Thread(this::connect, host+":"+port+" forwarder tcp-client").start();
}
public void init(String dest_addr, int dest_port, TCPListener listener) {
this.host = dest_addr;
this.port = dest_port;
this.listener = listener;
//客户端需要一个事件循环组
group = new NioEventLoopGroup();
//创建客户端启动对象
// bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new TcpMessageHandler(TCPClient.this));
}
});
}
public void connect() {
logger.info("{}:{} tcp connecting...",host,port);
//启动客户端去连接服务器端
try {
ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
logger.info("{}:{} tcp connect failed. {}",host,port,future.cause().toString());
//重连交给后端线程执行
/*future.channel().eventLoop().schedule(() -> {
logger.info("{}:{} tcp client reconnect",host,port);
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
}, 5000, TimeUnit.MILLISECONDS);*/
} else {
/*future.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark(
1024 * 1024, // low
4 *1024*1024 // high
));*/
logger.info("{}:{} tcp client start success!",host,port);
}
}
});
//对通道关闭进行监听
this.channel = cf.channel();
this.channel.closeFuture().sync();
} catch (Exception e) {
logger.error(host+":"+port+" tcp connect error:", e);
}
}
boolean tryReconnect() throws Exception{
new Thread(this::connect, host+":"+port+" forwarder tcp-client").start();
for(int i=0; i<20; i++){
Thread.sleep(50);
if(channel!=null && channel.isActive()) return true;
}
return false;
}
public void writeAndFlush(String json) {
ByteBuf sendBuffer = Unpooled.buffer();
sendBuffer.writeBytes(json.getBytes(StandardCharsets.UTF_8));
//logger.info("send to {}: {}",host,json);
if(channel==null || !channel.isActive()){
try {
if(!tryReconnect()) return;
} catch (Exception e) {
logger.error(e.toString());
}
}
channel.writeAndFlush(sendBuffer).addListener(future -> {
if (future.isSuccess()) {
logger.info("send to tcp:"+host+" succeed.");
} else {
logger.info("send to tcp: {} failed. {}",host,future.cause().toString());
if(listener!=null){
listener.onMessage("failed");
}
}
});
}
public void onConnected(){
connectTime = LocalDateTime.now();
}
public void onDisconnect(boolean isIdle){
/*if(connectTime.isBefore(LocalDateTime.now().minusMinutes(1))) {
connect();
}
else{
ThreadManager.getScheduledThreadPool().schedule(() -> {
try {
connect();
} catch (Exception e) {
logger.error(e.toString());
}
},isIdle?30:10, TimeUnit.SECONDS);
}*/
}
public void onMessage(String msg){
if(listener!=null){
listener.onMessage(msg);
}
}
}

View File

@ -0,0 +1,8 @@
package com.imdroid.inclide_server.service;
public interface TCPListener {
void clear();
void onConnected();
void onDisconnect();
void onMessage(String msg);
}

View File

@ -0,0 +1,53 @@
package com.imdroid.inclide_server.service;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
/**
* @author Layton
* @date 2023/2/18 20:36
*/
public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final TCPClient tcpClient;
public TcpMessageHandler(TCPClient tcpClient) {
this.tcpClient = tcpClient;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf buf) throws Exception {
String msg = buf.toString(Charset.defaultCharset());
tcpClient.onMessage(msg);
if (logger.isDebugEnabled()) {
logger.debug("receive server message:" + msg);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("tcp channel active");
tcpClient.onConnected();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("tcp channel inactive");
tcpClient.onDisconnect(true);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("TcpMessageHandler error: {}", cause.toString());
ctx.close();
tcpClient.onDisconnect(false);
}
}

View File

@ -0,0 +1,27 @@
server.port=9918
server.servlet.context-path=/
spring.application.name=surface-incline
spring.application.build=202510
spring.jpa.show-sql = true
spring.jpa.hibernate.ddl-auto = none
spring.jpa.database-platform = org.hibernate.dialect.MySQLDialect
spring.datasource.url = jdbc:mysql://localhost:3306/beidou?characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
#spring.datasource.url=jdbc:mysql://139.9.51.237:3306/beidou?characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
spring.datasource.username = admin
spring.datasource.password = DBMgr_2022
spring.datasource.driverClassName = com.mysql.cj.jdbc.Driver
spring.jackson.dateFormat = yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone = GMT+8
app.format.date = yyyy-MM-dd
app.format.time = HH:mm:ss
app.format.datetime = yyyy-MM-dd HH:mm:ss
mybatis-plus.configuration.map-underscore-to-camel-case=false
netty.data.port=9919
incline.data_server_list = QJTestServer,62ng747631.goho.co,54483