From 84aeaec6950411d23f0c680d1315e6f5021938ab Mon Sep 17 00:00:00 2001 From: weidong Date: Mon, 25 Dec 2023 11:22:06 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=A2=9E=E5=8A=A0TCP=20Channel?= =?UTF-8?q?=EF=BC=8C=E4=BD=9C=E4=B8=BA=E9=85=8D=E7=BD=AE=E9=80=9A=E9=81=93?= =?UTF-8?q?=E3=80=82=E5=B9=B3=E5=8F=B0=E5=85=BC=E5=AE=B9=E5=8D=95=E9=80=9A?= =?UTF-8?q?=E9=81=93=E8=AE=BE=E5=A4=87=E5=92=8C=E5=8F=8C=E9=80=9A=E9=81=93?= =?UTF-8?q?=EF=BC=88=E6=95=B0=E6=8D=AE=E9=80=9A=E9=81=93=E5=92=8C=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E9=80=9A=E9=81=93=E5=88=86=E7=A6=BB=EF=BC=89=E8=AE=BE?= =?UTF-8?q?=E5=A4=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sideslope/executor/BizExecutors.java | 6 +-- .../sideslope/server/DeviceChannel.java | 2 +- .../sideslope/server/OnlineChannels.java | 45 +++++++++------- .../sideslope/server/tcp/RtcmTcpHandler.java | 45 ++++++++++++++++ .../sideslope/server/tcp/RtcmTcpServer.java | 54 +++++++++++++++++++ .../sideslope/server/udp/RtcmUdpHandler.java | 34 ++---------- .../sideslope/server/udp/RtcmUdpServer.java | 2 +- .../sideslope/service/ThirdPartyClient.java | 11 ---- .../imdroid/sideslope/web/ApiController.java | 26 +++++---- .../src/main/resources/application.properties | 3 +- 10 files changed, 147 insertions(+), 81 deletions(-) create mode 100644 sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpHandler.java create mode 100644 sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpServer.java delete mode 100644 sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/service/ThirdPartyClient.java diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/BizExecutors.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/BizExecutors.java index 596b3442..3bcd4bb5 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/BizExecutors.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/executor/BizExecutors.java @@ -8,9 +8,9 @@ import java.util.List; @Component public class BizExecutors { - private final HashMap, Object> executors = new HashMap<>(); + private static HashMap, Object> executors = new HashMap<>(); - private final List> executorList; + private static List> executorList; public BizExecutors(List> executorList) { this.executorList = executorList; @@ -21,7 +21,7 @@ public class BizExecutors { } - public R execute(Q query) { + public static R execute(Q query) { Executor executor = (Executor)(executors.get(query.getClass())); return executor.execute(query); } diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/DeviceChannel.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/DeviceChannel.java index 39c93b1a..6c0709da 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/DeviceChannel.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/DeviceChannel.java @@ -33,7 +33,7 @@ public class DeviceChannel { this.channel = channel; this.address = address; lastTime = System.currentTimeMillis(); - this.tcp = address == null; + this.tcp = (address == null); } public boolean isOnline() { diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/OnlineChannels.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/OnlineChannels.java index 23a1c1ea..b716dc0e 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/OnlineChannels.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/OnlineChannels.java @@ -17,46 +17,53 @@ public class OnlineChannels { public static final OnlineChannels INSTANCE = new OnlineChannels(); - // DTU已连接,有imei上报,但deviceId还没上报 - // 记录DTU连接的好处:当DTU重连接而设备还没上报deviceId时,可以通过IMEI或广播消息来找到设备 - private final Map dtuChannels = new ConcurrentHashMap<>(); - // 设备已连接,deviceId已上报 - private final Map channels = new ConcurrentHashMap<>(); + private final Map dataChannels = new ConcurrentHashMap<>(); + private final Map configChannels = new ConcurrentHashMap<>(); private OnlineChannels() {} - public void putDtuChannel(String imei, InetSocketAddress address){ - dtuChannels.put(address, imei); - } - - public DeviceChannel updateDeviceChannel(String deviceId, Channel channel, InetSocketAddress address) { - DeviceChannel deviceChannel = channels.get(deviceId); + public DeviceChannel updateDataChannel(String deviceId, Channel channel, InetSocketAddress address) { + DeviceChannel deviceChannel = dataChannels.get(deviceId); if(deviceChannel == null){ deviceChannel = new DeviceChannel(deviceId, channel, address); - channels.put(deviceId, deviceChannel); + dataChannels.put(deviceId, deviceChannel); } else { deviceChannel.setChannel(channel); deviceChannel.setAddress(address); } - String imei = dtuChannels.get(address); - if(imei != null) deviceChannel.setImei(imei); + return deviceChannel; + } + + public DeviceChannel updateConfigChannel(String deviceId, Channel channel, InetSocketAddress address) { + DeviceChannel deviceChannel = configChannels.get(deviceId); + if(deviceChannel == null){ + deviceChannel = new DeviceChannel(deviceId, channel, address); + configChannels.put(deviceId, deviceChannel); + } + else { + deviceChannel.setChannel(channel); + deviceChannel.setAddress(address); + } return deviceChannel; } public Optional get(String deviceId) { - return Optional.ofNullable(channels.get(deviceId)); + return Optional.ofNullable(dataChannels.get(deviceId)); } - public List getIfPresent(List deviceIds) { - return deviceIds.stream().map(x -> channels.get(x)).filter(Objects::nonNull).collect(Collectors.toList()); + public List ifPresent(List deviceIds) { + return deviceIds.stream().map(x -> dataChannels.get(x)).filter(Objects::nonNull).collect(Collectors.toList()); } - public DeviceChannel getDeviceChannel(String deviceId){ - return channels.get(deviceId); + public DeviceChannel getDataChannel(String deviceId){ + return dataChannels.get(deviceId); + } + public DeviceChannel getConfigChannel(String deviceId){ + return configChannels.get(deviceId); } } diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpHandler.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpHandler.java new file mode 100644 index 00000000..e8f3ff67 --- /dev/null +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpHandler.java @@ -0,0 +1,45 @@ +package com.imdroid.sideslope.server.tcp; + +import com.imdroid.common.util.DataTypeUtil; +import com.imdroid.sideslope.executor.BizExecutors; +import com.imdroid.sideslope.executor.MessageParser; +import com.imdroid.sideslope.message.BaseMessage; +import com.imdroid.sideslope.server.OnlineChannels; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@ChannelHandler.Sharable +@Component +public class RtcmTcpHandler extends SimpleChannelInboundHandler { + private final Logger logger = LoggerFactory.getLogger(RtcmTcpServer.class); + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf src) throws Exception{ + if (logger.isDebugEnabled()) { + byte[] data = new byte[src.readableBytes()]; + src.getBytes(0, data); + logger.debug("receive message:" + DataTypeUtil.getHexString(data)); + } + BaseMessage message = MessageParser.instance.parse(src); + OnlineChannels.INSTANCE.updateConfigChannel(message.getId(), ctx.channel(), null); + BizExecutors.execute(message); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) + throws Exception { + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + + ctx.close(); + } + +} \ No newline at end of file diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpServer.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpServer.java new file mode 100644 index 00000000..96fbb148 --- /dev/null +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/tcp/RtcmTcpServer.java @@ -0,0 +1,54 @@ +package com.imdroid.sideslope.server.tcp; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +@Component +public class RtcmTcpServer implements ApplicationRunner { + private final Logger logger = LoggerFactory.getLogger(RtcmTcpServer.class); + + @Value("${netty.config.port}") + private int port; + + @Override + public void run(ApplicationArguments args) throws Exception { + new Thread(this::start0, "tcp-server").start(); + } + + private void start0() { + logger.info("rtcm tcp server starting..."); + // Configure the server. + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + channel.pipeline().addLast(new RtcmTcpHandler()); + } + }); + Channel ch = b.bind(port).sync().channel(); + logger.info("tcp server start at port {}", port); + ch.closeFuture().sync(); + } catch (Exception el) { + logger.error(el.toString()); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpHandler.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpHandler.java index af431ed4..bea78589 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpHandler.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpHandler.java @@ -4,7 +4,6 @@ import com.imdroid.sideslope.exception.UnSupportedMessageException; import com.imdroid.sideslope.executor.BizExecutors; import com.imdroid.sideslope.executor.MessageParser; import com.imdroid.sideslope.message.BaseMessage; -import com.imdroid.sideslope.server.DeviceChannel; import com.imdroid.sideslope.server.OnlineChannels; import com.imdroid.common.util.DataTypeUtil; import io.netty.channel.ChannelHandler; @@ -14,11 +13,8 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.net.InetSocketAddress; - /** * @author Layton * @date 2023/2/13 11:47 @@ -29,9 +25,6 @@ public class RtcmUdpHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class); - @Autowired - private BizExecutors bizExecutors; - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { DatagramPacket packet = (DatagramPacket) msg; @@ -46,18 +39,10 @@ public class RtcmUdpHandler extends ChannelInboundHandlerAdapter { } // 消息解析 BaseMessage message = MessageParser.instance.parse(packet.content()); - procMessage(message, ctx, packet.sender()); + OnlineChannels.INSTANCE.updateDataChannel(message.getId(), ctx.channel(), packet.sender()); + BizExecutors.execute(message); } catch (UnSupportedMessageException e) { - // 是不是IMEI? - if(packet.content().readableBytes() == 15){ - byte[] data = new byte[packet.content().readableBytes()]; - packet.content().getBytes(0, data); - if(data[0]=='8' && data[1]=='6') { - OnlineChannels.INSTANCE.putDtuChannel(new String(data), packet.sender()); - } - else logger.warn("receive un supported message: {}", e.getMessage()); - } - else logger.warn("receive un supported message: {}", e.getMessage()); + logger.warn("receive un supported message: {}", e.getMessage()); } catch (Exception e) { logger.error("channel read error: {}", e.toString()); } finally { @@ -71,17 +56,4 @@ public class RtcmUdpHandler extends ChannelInboundHandlerAdapter { logger.error("Exception caught: {}", cause.toString()); } - void procMessage(BaseMessage message, ChannelHandlerContext ctx, InetSocketAddress senderAddr){ - // 为加快处理速度,只有收到D3F0/D3F2时才更新通道,包括对端地址和绑定imei - DeviceChannel deviceChannel = null; - if (message.getHeader() == 0xD3F0 || message.getHeader() == 0xD3F2) { - OnlineChannels.INSTANCE.updateDeviceChannel(message.getId(), ctx.channel(), senderAddr); - } else deviceChannel = OnlineChannels.INSTANCE.getDeviceChannel(message.getId()); - - if (deviceChannel != null) { - deviceChannel.updateRxBytes(message.getLen(), message.getHeader()); - } - // 业务处理 - bizExecutors.execute(message); - } } diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpServer.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpServer.java index 623d6272..6f699693 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpServer.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/server/udp/RtcmUdpServer.java @@ -22,7 +22,7 @@ public class RtcmUdpServer implements ApplicationRunner { private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class); - @Value("${netty.port:9903}") + @Value("${netty.data.port:9903}") private Integer port; @Autowired diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/service/ThirdPartyClient.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/service/ThirdPartyClient.java deleted file mode 100644 index 97f19935..00000000 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/service/ThirdPartyClient.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.imdroid.sideslope.service; - -import com.imdroid.secapi.dto.GnssCalcData; -import org.springframework.stereotype.Service; - -@Service -public class ThirdPartyClient { - public void send(GnssCalcData data){ - - } -} diff --git a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java index afa03f55..fdbecb98 100644 --- a/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java +++ b/sec-beidou-rtcm/src/main/java/com/imdroid/sideslope/web/ApiController.java @@ -29,20 +29,18 @@ public class ApiController { public HttpResp config(String deviceId, String configuration) { Map status = new HashMap<>(); HttpResp resp = new HttpResp(); - OnlineChannels onlineChannels = OnlineChannels.INSTANCE; - onlineChannels - .get(deviceId) - .filter(DeviceChannel::isOnline) - .ifPresent(x -> { - status.put("status", "Online"); - status.put("lastUpdate", x.getLastTime()); - // send command - ByteBuf buf = Unpooled.buffer(); - byte[] data = getBinaryData(ConfigDataTypeEnum.HEX, configuration); - logger.info("send command:{}", configuration); - buf.writeBytes(data); - x.writeAndFlush(buf); - }); + DeviceChannel deviceChannel = OnlineChannels.INSTANCE.getConfigChannel(deviceId); + if(deviceChannel == null) deviceChannel = OnlineChannels.INSTANCE.getDataChannel(deviceId); + if(deviceChannel!=null && deviceChannel.isOnline()){ + status.put("status", "Online"); + // send command + ByteBuf buf = Unpooled.buffer(); + byte[] data = getBinaryData(ConfigDataTypeEnum.HEX, configuration); + logger.info("send command:{}", configuration); + buf.writeBytes(data); + deviceChannel.writeAndFlush(buf); + } + if (status.isEmpty()) { status.put("status", "Offline"); resp.setCode(HttpResp.HTTP_RSP_FAILED); diff --git a/sec-beidou-rtcm/src/main/resources/application.properties b/sec-beidou-rtcm/src/main/resources/application.properties index 4734b8d2..05977793 100644 --- a/sec-beidou-rtcm/src/main/resources/application.properties +++ b/sec-beidou-rtcm/src/main/resources/application.properties @@ -22,7 +22,8 @@ spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtualHost=/ -netty.port=9903 +netty.data.port=9903 +netty.config.port=9902 app.format.date = yyyy-MM-dd app.format.time = HH:mm:ss