diff --git a/sec-incline-server/pom.xml b/sec-incline-server/pom.xml new file mode 100644 index 00000000..3ae96f87 --- /dev/null +++ b/sec-incline-server/pom.xml @@ -0,0 +1,133 @@ + + + 4.0.0 + + com.imdroid + security-monitor + 1.0-SNAPSHOT + + + sec-incline-server + + + 8 + 8 + UTF-8 + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-devtools + true + + + + mysql + mysql-connector-java + 8.0.11 + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + io.netty + netty-all + 4.1.78.Final + + + com.google.code.gson + gson + 2.9.1 + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + com.baomidou + mybatis-plus-boot-starter + 3.5.2 + + + + com.imdroid + sec-api + 1.0-SNAPSHOT + + + com.imdroid + sec-common + 1.0-SNAPSHOT + + + + org.projectlombok + lombok + true + + + com.alibaba + fastjson + 1.2.76 + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + + + + central + ali-mirror + https://maven.aliyun.com/repository/central + + true + + + true + + + + + \ No newline at end of file diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/InclineServerApp.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/InclineServerApp.java new file mode 100644 index 00000000..ec60d12c --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/InclineServerApp.java @@ -0,0 +1,23 @@ +package com.imdroid.inclide_server; + +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; +import org.springframework.context.annotation.ComponentScan; + + +/** + * @author Layton + * @date 2023/1/31 20:33 + */ +@SpringBootApplication(scanBasePackages = {"com.imdroid"}) +@MapperScan({"com.imdroid.secapi"}) +@ComponentScan({"com.imdroid.*"}) +@EntityScan({"com.imdroid.*"}) +public class InclineServerApp { + + public static void main(String[] args) { + SpringApplication.run(InclineServerApp.class, args); + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/entity/InclineData.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/entity/InclineData.java new file mode 100644 index 00000000..afeffb1f --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/entity/InclineData.java @@ -0,0 +1,44 @@ +package com.imdroid.inclide_server.entity; + +import lombok.Data; + +import java.util.List; + +@Data +public class InclineData { + private String ProjectID; + + private String WorkPointID; + + private double WorkPointLng = 0; + + private double WorkPointLat = 0; + + private List data; + + @lombok.Data + public static class Data { + private String DevNum; + + private String Devtype; //"ZdQx" + + private double DevLng = 0; + + private double DevLat = 0; + + //角度 + private double x; + + private double y; + + private double z; + //加速度振幅 + private double Zx; + + private double Zy; + + private double Zz; + + private String DataTime; + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/exception/MessageValidateFailException.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/exception/MessageValidateFailException.java new file mode 100644 index 00000000..91c2c3fd --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/exception/MessageValidateFailException.java @@ -0,0 +1,14 @@ +package com.imdroid.inclide_server.exception; + +/** + * 设备消息校验异常 + * + * @author LiGang + * @date 2023/4/8 10:55 + */ +public class MessageValidateFailException extends RuntimeException { + + public MessageValidateFailException(String message) { + super(message); + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/exception/UnSupportedMessageException.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/exception/UnSupportedMessageException.java new file mode 100644 index 00000000..644533df --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/exception/UnSupportedMessageException.java @@ -0,0 +1,8 @@ +package com.imdroid.inclide_server.exception; + +/** + * @author Layton + * @date 2023/2/2 20:43 + */ +public class UnSupportedMessageException extends RuntimeException { +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/BizExecutors.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/BizExecutors.java new file mode 100644 index 00000000..b8f66dc0 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/BizExecutors.java @@ -0,0 +1,28 @@ +package com.imdroid.inclide_server.executor; + +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; + +@Component +public class BizExecutors { + + private static HashMap, Object> executors = new HashMap<>(); + + private static List> executorList; + + public BizExecutors(List> executorList) { + this.executorList = executorList; + for (Executor executor : this.executorList) { + System.out.println("executor type:" + executor.getMessageType().getName()); + executors.put(executor.getMessageType(), executor); + } + } + + + public static R execute(Q query) { + Executor executor = (Executor)(executors.get(query.getClass())); + return executor.execute(query); + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/D350SurfaceInclineMessageExecutor.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/D350SurfaceInclineMessageExecutor.java new file mode 100644 index 00000000..aaf17804 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/D350SurfaceInclineMessageExecutor.java @@ -0,0 +1,90 @@ +package com.imdroid.inclide_server.executor; + +import com.github.yulichang.query.MPJQueryWrapper; +import com.imdroid.common.util.ThreadManager; +import com.imdroid.inclide_server.message.D350SurfaceInclineMessage; +import com.imdroid.inclide_server.service.InclineDataForwarder; +import com.imdroid.secapi.dto.GnssDevice; +import com.imdroid.secapi.dto.GnssDeviceJoin; +import com.imdroid.secapi.dto.GnssDeviceMapper; +import com.imdroid.secapi.dto.SurfaceInclineDataMapper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author Layton + * @date 2023/2/2 20:40 + * 1、回ACK,以便终端判断是否连接上服务器后台 + * 2、同步参数 + * 3、判断是否发冷启动指令 + * 4、保存状态信息,判断是否有低电压等告警,清除离线告警 + */ +@Configuration +@EnableScheduling +public class D350SurfaceInclineMessageExecutor implements Executor { + @Autowired + SurfaceInclineDataMapper dataMapper; + @Autowired + GnssDeviceMapper deviceMapper; + @Autowired + InclineDataForwarder dataForwarder; + + ConcurrentHashMap deviceInfoMap = new ConcurrentHashMap<>(); + ConcurrentHashMap deviceAliveMap = new ConcurrentHashMap<>(); + + @Override + public Void execute(D350SurfaceInclineMessage message) { + GnssDeviceJoin device = getDevice(message.getId()); + if(device == null) return null; + + deviceAliveMap.put(message.getId(), LocalDateTime.now()); + // 补齐tenantId + message.getInclineData().setTenantid(device.getTenantid()); + + ThreadManager.getFixedThreadPool().submit(() -> { + //保存状态信息,判断是否有低电压等告警,清除离线告警 + dataMapper.insert(message.getInclineData()); + //推送 + dataForwarder.send(device, message); + }); + return null; + } + + @Override + public Class getMessageType() { + return D350SurfaceInclineMessage.class; + } + + GnssDeviceJoin getDevice(String deviceId){ + GnssDeviceJoin device = deviceInfoMap.get(deviceId); + if(device == null){ + MPJQueryWrapper jquery = new MPJQueryWrapper() + .selectAll(GnssDevice.class) + .select("d.latitude as latitude") + .select("d.longitude as longitude") + .leftJoin("gnssstatus d on t.deviceid = d.deviceid") + .last("limit 1"); + device = deviceMapper.selectJoinOne(GnssDeviceJoin.class, jquery); + if(device != null) deviceInfoMap.put(deviceId,device); + } + return device; + } + + @Scheduled(cron = "0 0/30 * * * ?") // 每30分钟执行一次 + void updateDeviceAliveTime(){ + LocalDateTime now = LocalDateTime.now(); + for(Map.Entry entry:deviceAliveMap.entrySet()){ + if(entry.getValue().isBefore(now.minusHours(2))){ + deviceAliveMap.remove(entry); + deviceInfoMap.remove(entry.getKey()); + } + } + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/Executor.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/Executor.java new file mode 100644 index 00000000..241a2af4 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/Executor.java @@ -0,0 +1,12 @@ +package com.imdroid.inclide_server.executor; + +/** + * @author Layton + * @date 2022/4/8 22:32 + */ +public interface Executor { + + R execute(Q message); + + Class getMessageType(); +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/MessageParser.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/MessageParser.java new file mode 100644 index 00000000..396d6194 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/executor/MessageParser.java @@ -0,0 +1,40 @@ +package com.imdroid.inclide_server.executor; + +import com.imdroid.inclide_server.exception.UnSupportedMessageException; +import com.imdroid.inclide_server.message.BaseMessage; +import com.imdroid.inclide_server.message.D350SurfaceInclineMessage; +import io.netty.buffer.ByteBuf; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Layton + * @date 2023/2/2 20:41 + */ +public class MessageParser { + + private static final Map> types = new HashMap<>(); + + public static final MessageParser instance = new MessageParser(); + + private MessageParser() { + + } + + static { + types.put((short)0xd350, D350SurfaceInclineMessage.class);//ACC上报 + } + + + public BaseMessage parse(ByteBuf src) throws Exception { + short flag = src.getShort(0); // msg flag + Class clz = types.get(flag); + if (clz == null) { + throw new UnSupportedMessageException(); + } + BaseMessage message = clz.newInstance(); + message.decode(src); + return message; + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/message/BaseMessage.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/message/BaseMessage.java new file mode 100644 index 00000000..81a14a89 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/message/BaseMessage.java @@ -0,0 +1,50 @@ +package com.imdroid.inclide_server.message; + +import io.netty.buffer.ByteBuf; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @author Layton + * @date 2023/2/2 20:32 + */ +@Data +public abstract class BaseMessage { + protected int header; + protected String id; + protected int len; + protected int seq; + protected int pps; + protected LocalDateTime createTime; + + protected int tenantId;//消息所属的租户,在executor里获取 + + protected byte[] srcData;//message在线程中处理,需要把channel传递过来的数据拷贝出来 + + public void decode(ByteBuf src) { + this.createTime = LocalDateTime.now(); + if (shouldDecodeHeader()) { + // read操作会移动ByteBuf内部指针,除D331外,其他都用read来读 + //int packetLen = src.readableBytes(); + this.header = src.readUnsignedShort(); + + this.len = src.readUnsignedShort(); + this.seq = this.len >> 11; + this.len = this.len & 0x7FF; + this.id = String.valueOf(src.readUnsignedInt()); + /*if (packetLen - 4 != this.len) { + String msg = (String.format("id[%s],长度字段值[%s]与包的消息体长度[%s]不匹配", id, this.len, packetLen - 4)); + WrongMessageRecorder.INSTANCE.append("receive wrong message," + msg); + }*/ + this.pps = src.readUnsignedShort(); + } + decodeBody(src); + } + + public abstract void decodeBody(ByteBuf src); + + public boolean shouldDecodeHeader() { + return true; + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/message/D350SurfaceInclineMessage.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/message/D350SurfaceInclineMessage.java new file mode 100644 index 00000000..60456a5b --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/message/D350SurfaceInclineMessage.java @@ -0,0 +1,50 @@ +package com.imdroid.inclide_server.message; + +import com.imdroid.secapi.dto.SurfaceInclineData; +import io.netty.buffer.ByteBuf; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * 自检消息 + * + * @author Layton + * @date 2023/2/2 20:38 + */ +@EqualsAndHashCode(callSuper=true) +@Data +public class D350SurfaceInclineMessage extends BaseMessage { + + SurfaceInclineData inclineData = new SurfaceInclineData(); + + @Override + public void decodeBody(ByteBuf src) { + // d3 50 length(2bytes) device_id(4bytes) seq(2bytes) data + this.len = src.readableBytes(); // total length + this.header = src.readUnsignedShort();// flag + src.readUnsignedShort();// d342 length + this.id = String.valueOf(src.readUnsignedInt());//id + this.seq = src.readUnsignedShort(); + + inclineData.setDeviceid(getId()); + inclineData.setCreatetime(createTime); + + inclineData.setSensorid(src.readUnsignedByte()); + inclineData.setAnglex(src.readFloat()); + inclineData.setAngley(src.readFloat()); + inclineData.setAnglez(src.readFloat()); + inclineData.setAccx(src.readFloat()); + inclineData.setAccy(src.readFloat()); + inclineData.setAccz(src.readFloat()); + inclineData.setMaxaccx(src.readFloat()); + inclineData.setMaxaccy(src.readFloat()); + inclineData.setMaxaccz(src.readFloat()); + inclineData.setMinaccx(src.readFloat()); + inclineData.setMinaccy(src.readFloat()); + inclineData.setMinaccz(src.readFloat()); + } + @Override + public boolean shouldDecodeHeader() { + return false; + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/DeviceChannel.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/DeviceChannel.java new file mode 100644 index 00000000..b37c06c7 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/DeviceChannel.java @@ -0,0 +1,50 @@ +package com.imdroid.inclide_server.server; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.socket.DatagramPacket; +import lombok.Data; + +import java.net.InetSocketAddress; + +/** + * @author Layton + * @date 2023/2/2 21:00 + */ +@Data +public class DeviceChannel { + private String deviceId; + private String imei; + + private Channel channel; + + private InetSocketAddress address; + + private long lastTime; + + private boolean tcp; + + public DeviceChannel(String deviceId, Channel channel, InetSocketAddress address) { + this.deviceId = deviceId; + this.channel = channel; + this.address = address; + lastTime = System.currentTimeMillis(); + this.tcp = (address == null); + } + + public boolean isOnline() { + if (tcp) { + return channel.isActive(); + } +// return (System.currentTimeMillis() - lastTime) < 28 * 1000L; + return true; + } + + public void writeAndFlush(ByteBuf buf) { + if (tcp) { + channel.writeAndFlush(buf); + } else { + channel.writeAndFlush(new DatagramPacket(buf, address)); + } + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/OnlineChannels.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/OnlineChannels.java new file mode 100644 index 00000000..f175e8de --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/OnlineChannels.java @@ -0,0 +1,69 @@ +package com.imdroid.inclide_server.server; + +import io.netty.channel.Channel; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author Layton + * @date 2023/2/2 21:00 + * 提供两种获取channel的方法:通过deviceId;通过imei。imei是DTU连接服务器最先上报的消息 + * 利用imei-ipaddr-deviceId,可以发现imei和deviceId的绑定关系 + */ +public class OnlineChannels { + + public static final OnlineChannels INSTANCE = new OnlineChannels(); + + // 设备已连接,deviceId已上报 + private final Map dataChannels = new ConcurrentHashMap<>(); + private final Map configChannels = new ConcurrentHashMap<>(); + + private OnlineChannels() {} + + public DeviceChannel updateDataChannel(String deviceId, Channel channel, InetSocketAddress address) { + DeviceChannel deviceChannel = dataChannels.get(deviceId); + if(deviceChannel == null){ + deviceChannel = new DeviceChannel(deviceId, channel, address); + dataChannels.put(deviceId, deviceChannel); + } + else { + deviceChannel.setChannel(channel); + deviceChannel.setAddress(address); + } + + return deviceChannel; + } + + public DeviceChannel updateConfigChannel(String deviceId, Channel channel, InetSocketAddress address) { + DeviceChannel deviceChannel = configChannels.get(deviceId); + if(deviceChannel == null){ + deviceChannel = new DeviceChannel(deviceId, channel, address); + configChannels.put(deviceId, deviceChannel); + } + else { + deviceChannel.setChannel(channel); + deviceChannel.setAddress(address); + } + + return deviceChannel; + } + + public void removeConfigChannel(String deviceId){ + configChannels.remove(deviceId); + } + + public Optional get(String deviceId) { + return Optional.ofNullable(dataChannels.get(deviceId)); + } + + public DeviceChannel getDataChannel(String deviceId){ + return dataChannels.get(deviceId); + } + public DeviceChannel getConfigChannel(String deviceId){ + return configChannels.get(deviceId); + } + +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/udp/RtcmUdpHandler.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/udp/RtcmUdpHandler.java new file mode 100644 index 00000000..839f965b --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/udp/RtcmUdpHandler.java @@ -0,0 +1,62 @@ +package com.imdroid.inclide_server.server.udp; + +import com.imdroid.common.util.DataTypeUtil; +import com.imdroid.inclide_server.exception.UnSupportedMessageException; +import com.imdroid.inclide_server.executor.BizExecutors; +import com.imdroid.inclide_server.executor.MessageParser; +import com.imdroid.inclide_server.message.BaseMessage; +import com.imdroid.inclide_server.server.OnlineChannels; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.ReferenceCountUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * @author Layton + * @date 2023/2/13 11:47 + */ +@ChannelHandler.Sharable +@Component +public class RtcmUdpHandler extends ChannelInboundHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + DatagramPacket packet = (DatagramPacket) msg; + try { + if (packet.content() == null) { + return; + } + /*if (logger.isDebugEnabled()) { + byte[] data = new byte[packet.content().readableBytes()]; + packet.content().getBytes(0, data); + logger.debug("receive message:" + DataTypeUtil.getHexString(data)); + }*/ + // 消息解析 + BaseMessage message = MessageParser.instance.parse(packet.content()); + OnlineChannels.INSTANCE.updateDataChannel(message.getId(), ctx.channel(), packet.sender()); + BizExecutors.execute(message); + } catch (UnSupportedMessageException e) { + byte[] data = new byte[packet.content().readableBytes()]; + packet.content().getBytes(0, data); + logger.warn("receive un supported message: {}", DataTypeUtil.getHexString(data)); + } catch (Exception e) { + logger.error("channel read error: {}", e.toString()); + } finally { + ReferenceCountUtil.release(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + logger.error("Exception caught: {}", cause.toString()); + ctx.close(); + } + +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/udp/RtcmUdpServer.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/udp/RtcmUdpServer.java new file mode 100644 index 00000000..c882bdd9 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/server/udp/RtcmUdpServer.java @@ -0,0 +1,60 @@ +package com.imdroid.inclide_server.server.udp; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioDatagramChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +/** + * @author Layton + * @date 2023/2/13 11:47 + */ +@Component +public class RtcmUdpServer implements ApplicationRunner { + + private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class); + + @Value("${netty.data.port:9903}") + private Integer port; + + @Autowired + private RtcmUdpHandler rtcmUdpHandler; + + public RtcmUdpServer() { + + } + + @Override + public void run(ApplicationArguments args) throws Exception { + new Thread(this::start0, "udp-server").start(); + } + + + private void start0() { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioDatagramChannel.class) + .option(ChannelOption.SO_SNDBUF, 1024*1024) //1M缓存,考虑1000个基站同时转发 + .option(ChannelOption.SO_RCVBUF, 1024*1024)//1M缓存,断点续传要大带宽 + .handler(rtcmUdpHandler); + try { + ChannelFuture future = bootstrap.bind(port).sync().channel().closeFuture(); + logger.info("udp server start at port {}", port); + future.await(); + } catch (Exception e) { + logger.error("Error starting Imdroid protocol at port {}", port, e); + } finally { + group.shutdownGracefully(); + } + } +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/InclineDataForwarder.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/InclineDataForwarder.java new file mode 100644 index 00000000..3772782f --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/InclineDataForwarder.java @@ -0,0 +1,155 @@ +package com.imdroid.inclide_server.service; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.imdroid.common.util.GsonUtil; +import com.imdroid.common.util.NumberUtils; +import com.imdroid.inclide_server.entity.InclineData; +import com.imdroid.inclide_server.message.D350SurfaceInclineMessage; +import com.imdroid.secapi.dto.GnssDeviceJoin; +import com.imdroid.secapi.dto.GnssGroupFwd; +import com.imdroid.secapi.dto.GnssGroupFwdMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class InclineDataForwarder{ + final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + final Logger logger = LoggerFactory.getLogger(InclineDataForwarder.class); + + @Autowired + GnssGroupFwdMapper fwdMapper; + @Value("${incline.data_server_list}") + private String strServerList; + ConcurrentHashMap tcpClientMap = new ConcurrentHashMap<>(); + + class XFZTCPListener implements TCPListener{ + public static final int STATE_NO_ACK = 0; + public static final int STATE_OK = 1; + public static final int STATE_FAILED = 2; + public int state = STATE_NO_ACK; + + @Override + public void clear(){ + state = STATE_NO_ACK; + } + @Override + public void onConnected() { + + } + + @Override + public void onDisconnect() { + + } + + @Override + public void onMessage(String msg) { + if(msg.contains("succeed")) state = STATE_OK; + else state = STATE_FAILED; + } + } + + @PostConstruct + void init(){ + // TCP客户端: 服务器名1,地址,端口;服务器名2,地址,端口;... + String[] serverArray = strServerList.split(";"); + for(String serverPara:serverArray){ + String[] paras = serverPara.split(","); + if(paras.length==3){ + initTcpClient(paras[0], paras[1], paras[2]); + } + } + } + + void initTcpClient(String name, String host, String port){ + //登记到数据库 + String description = host+":"+port; + QueryWrapper queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("name",name); + queryWrapper.last("limit 1"); + GnssGroupFwd gnssGroupFwd = fwdMapper.selectOne(queryWrapper); + if(gnssGroupFwd == null){ + gnssGroupFwd = new GnssGroupFwd(); + gnssGroupFwd.setName(name); + gnssGroupFwd.setDescription(description); + gnssGroupFwd.setDevice_num(0); + fwdMapper.insert(gnssGroupFwd); + } + else{ + gnssGroupFwd.setDescription(description); + fwdMapper.updateById(gnssGroupFwd); + } + //启动客户端 + TCPClient tcpClient = new TCPClient(); + tcpClient.init(host, Integer.parseInt(port),new XFZTCPListener()); + tcpClient.start(); + tcpClientMap.put(name, tcpClient); + } + + public void send(GnssDeviceJoin device, D350SurfaceInclineMessage message){ + InclineData inclineData = new InclineData(); + + inclineData.setProjectID(device.getProject_id()); + inclineData.setWorkPointID(device.getProject_id()); + + List dataList = new ArrayList<>(1); + inclineData.setData(dataList); + + + InclineData.Data data = new InclineData.Data(); + dataList.add(data); + data.setDataTime(message.getCreateTime().format(formatter)); + data.setDevNum(device.getDeviceid()); + data.setDevtype("ZdQx"); + // 角度 + float x = message.getInclineData().getAnglex(); + float y = message.getInclineData().getAngley(); + float z = message.getInclineData().getAnglez(); + float Zx = message.getInclineData().getMaxaccx()-message.getInclineData().getMinaccx(); + float Zy = message.getInclineData().getMaxaccx()-message.getInclineData().getMinaccy(); + float Zz = message.getInclineData().getMaxaccx()-message.getInclineData().getMinaccz(); + + data.setX(NumberUtils.scale((double) x, 3)); + data.setY(NumberUtils.scale((double) y, 3)); + data.setZ(NumberUtils.scale((double) z, 3)); + data.setZx(NumberUtils.scale((double) Zx, 3)); + data.setZy(NumberUtils.scale((double) Zy, 3)); + data.setZz(NumberUtils.scale((double) Zz, 3)); + // 经纬度 + data.setDevLng(device.getLongitude()); + data.setDevLat(device.getLatitude()); + + // 发送 + TCPClient tcpClient1 = tcpClientMap.get(device.getFwd_group_id()); + TCPClient tcpClient2 = tcpClientMap.get(device.getFwd_group_id2()); + String json = "#" + GsonUtil.toJson(inclineData) + "!"; + logger.debug("forward {}: {}",device.getDeviceid(), json); + if(tcpClient1!=null) { + try { + tcpClient1.listener.clear(); + tcpClient1.writeAndFlush(json); + } catch (Exception e1) { + logger.error(e1.toString()); + } + } + if(tcpClient2!=null) { + try { + tcpClient2.listener.clear(); + tcpClient2.writeAndFlush(json); + } catch (Exception e1) { + logger.error(e1.toString()); + } + } + dataList.clear(); + } + +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TCPClient.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TCPClient.java new file mode 100644 index 00000000..a91cf1e0 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TCPClient.java @@ -0,0 +1,147 @@ +package com.imdroid.inclide_server.service; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; + +public class TCPClient { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private String host; + + private int port; + + private Bootstrap bootstrap; + private EventLoopGroup group; + private Channel channel; + LocalDateTime connectTime = LocalDateTime.now(); + public TCPListener listener; + + public void start() { + //new Thread(this::connect, host+":"+port+" forwarder tcp-client").start(); + } + + public void init(String dest_addr, int dest_port, TCPListener listener) { + this.host = dest_addr; + this.port = dest_port; + this.listener = listener; + + //客户端需要一个事件循环组 + group = new NioEventLoopGroup(); + //创建客户端启动对象 + // bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可. + bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + //加入处理器 + ch.pipeline().addLast(new TcpMessageHandler(TCPClient.this)); + } + }); + } + + public void connect() { + logger.info("{}:{} tcp connecting...",host,port); + //启动客户端去连接服务器端 + try { + ChannelFuture cf = bootstrap.connect(host, port); + cf.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + logger.info("{}:{} tcp connect failed. {}",host,port,future.cause().toString()); + //重连交给后端线程执行 + /*future.channel().eventLoop().schedule(() -> { + logger.info("{}:{} tcp client reconnect",host,port); + try { + connect(); + } catch (Exception e) { + e.printStackTrace(); + } + }, 5000, TimeUnit.MILLISECONDS);*/ + } else { + /*future.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark( + 1024 * 1024, // low + 4 *1024*1024 // high + ));*/ + logger.info("{}:{} tcp client start success!",host,port); + } + } + }); + //对通道关闭进行监听 + this.channel = cf.channel(); + this.channel.closeFuture().sync(); + } catch (Exception e) { + logger.error(host+":"+port+" tcp connect error:", e); + } + } + + boolean tryReconnect() throws Exception{ + new Thread(this::connect, host+":"+port+" forwarder tcp-client").start(); + for(int i=0; i<20; i++){ + Thread.sleep(50); + if(channel!=null && channel.isActive()) return true; + } + return false; + } + + public void writeAndFlush(String json) { + ByteBuf sendBuffer = Unpooled.buffer(); + sendBuffer.writeBytes(json.getBytes(StandardCharsets.UTF_8)); + //logger.info("send to {}: {}",host,json); + if(channel==null || !channel.isActive()){ + try { + if(!tryReconnect()) return; + } catch (Exception e) { + logger.error(e.toString()); + } + } + channel.writeAndFlush(sendBuffer).addListener(future -> { + if (future.isSuccess()) { + logger.info("send to tcp:"+host+" succeed."); + } else { + logger.info("send to tcp: {} failed. {}",host,future.cause().toString()); + if(listener!=null){ + listener.onMessage("failed"); + } + } + }); + } + + public void onConnected(){ + connectTime = LocalDateTime.now(); + } + + public void onDisconnect(boolean isIdle){ + /*if(connectTime.isBefore(LocalDateTime.now().minusMinutes(1))) { + connect(); + } + else{ + ThreadManager.getScheduledThreadPool().schedule(() -> { + try { + connect(); + } catch (Exception e) { + logger.error(e.toString()); + } + },isIdle?30:10, TimeUnit.SECONDS); + }*/ + } + + public void onMessage(String msg){ + if(listener!=null){ + listener.onMessage(msg); + } + } + +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TCPListener.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TCPListener.java new file mode 100644 index 00000000..2e8c25e6 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TCPListener.java @@ -0,0 +1,8 @@ +package com.imdroid.inclide_server.service; + +public interface TCPListener { + void clear(); + void onConnected(); + void onDisconnect(); + void onMessage(String msg); +} diff --git a/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TcpMessageHandler.java b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TcpMessageHandler.java new file mode 100644 index 00000000..ec9577e1 --- /dev/null +++ b/sec-incline-server/src/main/java/com/imdroid/inclide_server/service/TcpMessageHandler.java @@ -0,0 +1,53 @@ +package com.imdroid.inclide_server.service; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +/** + * @author Layton + * @date 2023/2/18 20:36 + */ +public class TcpMessageHandler extends SimpleChannelInboundHandler { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final TCPClient tcpClient; + + public TcpMessageHandler(TCPClient tcpClient) { + this.tcpClient = tcpClient; + } + + @Override + protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf buf) throws Exception { + String msg = buf.toString(Charset.defaultCharset()); + tcpClient.onMessage(msg); + if (logger.isDebugEnabled()) { + logger.debug("receive server message:" + msg); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + logger.info("tcp channel active"); + tcpClient.onConnected(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + logger.info("tcp channel inactive"); + tcpClient.onDisconnect(true); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.error("TcpMessageHandler error: {}", cause.toString()); + ctx.close(); + tcpClient.onDisconnect(false); + } + +} diff --git a/sec-incline-server/src/main/resources/application.properties b/sec-incline-server/src/main/resources/application.properties new file mode 100644 index 00000000..f2ea7d32 --- /dev/null +++ b/sec-incline-server/src/main/resources/application.properties @@ -0,0 +1,27 @@ +server.port=9918 +server.servlet.context-path=/ + +spring.application.name=surface-incline +spring.application.build=202510 + +spring.jpa.show-sql = true +spring.jpa.hibernate.ddl-auto = none +spring.jpa.database-platform = org.hibernate.dialect.MySQLDialect +spring.datasource.url = jdbc:mysql://localhost:3306/beidou?characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai +#spring.datasource.url=jdbc:mysql://139.9.51.237:3306/beidou?characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai +spring.datasource.username = admin +spring.datasource.password = DBMgr_2022 +spring.datasource.driverClassName = com.mysql.cj.jdbc.Driver + +spring.jackson.dateFormat = yyyy-MM-dd HH:mm:ss +spring.jackson.time-zone = GMT+8 + +app.format.date = yyyy-MM-dd +app.format.time = HH:mm:ss +app.format.datetime = yyyy-MM-dd HH:mm:ss + +mybatis-plus.configuration.map-underscore-to-camel-case=false + +netty.data.port=9919 + +incline.data_server_list = QJTestServer,62ng747631.goho.co,54483