1、增加TCP Channel,作为配置通道。平台兼容单通道设备和双通道(数据通道和配置通道分离)设备

This commit is contained in:
weidong 2023-12-25 11:22:06 +08:00
parent ccd8d35744
commit 84aeaec695
10 changed files with 147 additions and 81 deletions

View File

@ -8,9 +8,9 @@ import java.util.List;
@Component @Component
public class BizExecutors { public class BizExecutors {
private final HashMap<Class<?>, Object> executors = new HashMap<>(); private static HashMap<Class<?>, Object> executors = new HashMap<>();
private final List<Executor<?, ?>> executorList; private static List<Executor<?, ?>> executorList;
public BizExecutors(List<Executor<?, ?>> executorList) { public BizExecutors(List<Executor<?, ?>> executorList) {
this.executorList = executorList; this.executorList = executorList;
@ -21,7 +21,7 @@ public class BizExecutors {
} }
public <Q, R> R execute(Q query) { public static <Q, R> R execute(Q query) {
Executor<Q, R> executor = (Executor<Q, R>)(executors.get(query.getClass())); Executor<Q, R> executor = (Executor<Q, R>)(executors.get(query.getClass()));
return executor.execute(query); return executor.execute(query);
} }

View File

@ -33,7 +33,7 @@ public class DeviceChannel {
this.channel = channel; this.channel = channel;
this.address = address; this.address = address;
lastTime = System.currentTimeMillis(); lastTime = System.currentTimeMillis();
this.tcp = address == null; this.tcp = (address == null);
} }
public boolean isOnline() { public boolean isOnline() {

View File

@ -17,46 +17,53 @@ public class OnlineChannels {
public static final OnlineChannels INSTANCE = new OnlineChannels(); public static final OnlineChannels INSTANCE = new OnlineChannels();
// DTU已连接有imei上报但deviceId还没上报
// 记录DTU连接的好处当DTU重连接而设备还没上报deviceId时可以通过IMEI或广播消息来找到设备
private final Map<InetSocketAddress, String> dtuChannels = new ConcurrentHashMap<>();
// 设备已连接deviceId已上报 // 设备已连接deviceId已上报
private final Map<String, DeviceChannel> channels = new ConcurrentHashMap<>(); private final Map<String, DeviceChannel> dataChannels = new ConcurrentHashMap<>();
private final Map<String, DeviceChannel> configChannels = new ConcurrentHashMap<>();
private OnlineChannels() {} private OnlineChannels() {}
public void putDtuChannel(String imei, InetSocketAddress address){ public DeviceChannel updateDataChannel(String deviceId, Channel channel, InetSocketAddress address) {
dtuChannels.put(address, imei); DeviceChannel deviceChannel = dataChannels.get(deviceId);
}
public DeviceChannel updateDeviceChannel(String deviceId, Channel channel, InetSocketAddress address) {
DeviceChannel deviceChannel = channels.get(deviceId);
if(deviceChannel == null){ if(deviceChannel == null){
deviceChannel = new DeviceChannel(deviceId, channel, address); deviceChannel = new DeviceChannel(deviceId, channel, address);
channels.put(deviceId, deviceChannel); dataChannels.put(deviceId, deviceChannel);
} }
else { else {
deviceChannel.setChannel(channel); deviceChannel.setChannel(channel);
deviceChannel.setAddress(address); deviceChannel.setAddress(address);
} }
String imei = dtuChannels.get(address); return deviceChannel;
if(imei != null) deviceChannel.setImei(imei); }
public DeviceChannel updateConfigChannel(String deviceId, Channel channel, InetSocketAddress address) {
DeviceChannel deviceChannel = configChannels.get(deviceId);
if(deviceChannel == null){
deviceChannel = new DeviceChannel(deviceId, channel, address);
configChannels.put(deviceId, deviceChannel);
}
else {
deviceChannel.setChannel(channel);
deviceChannel.setAddress(address);
}
return deviceChannel; return deviceChannel;
} }
public Optional<DeviceChannel> get(String deviceId) { public Optional<DeviceChannel> get(String deviceId) {
return Optional.ofNullable(channels.get(deviceId)); return Optional.ofNullable(dataChannels.get(deviceId));
} }
public List<DeviceChannel> getIfPresent(List<String> deviceIds) { public List<DeviceChannel> ifPresent(List<String> deviceIds) {
return deviceIds.stream().map(x -> channels.get(x)).filter(Objects::nonNull).collect(Collectors.toList()); return deviceIds.stream().map(x -> dataChannels.get(x)).filter(Objects::nonNull).collect(Collectors.toList());
} }
public DeviceChannel getDeviceChannel(String deviceId){ public DeviceChannel getDataChannel(String deviceId){
return channels.get(deviceId); return dataChannels.get(deviceId);
}
public DeviceChannel getConfigChannel(String deviceId){
return configChannels.get(deviceId);
} }
} }

View File

@ -0,0 +1,45 @@
package com.imdroid.sideslope.server.tcp;
import com.imdroid.common.util.DataTypeUtil;
import com.imdroid.sideslope.executor.BizExecutors;
import com.imdroid.sideslope.executor.MessageParser;
import com.imdroid.sideslope.message.BaseMessage;
import com.imdroid.sideslope.server.OnlineChannels;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@ChannelHandler.Sharable
@Component
public class RtcmTcpHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final Logger logger = LoggerFactory.getLogger(RtcmTcpServer.class);
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf src) throws Exception{
if (logger.isDebugEnabled()) {
byte[] data = new byte[src.readableBytes()];
src.getBytes(0, data);
logger.debug("receive message:" + DataTypeUtil.getHexString(data));
}
BaseMessage message = MessageParser.instance.parse(src);
OnlineChannels.INSTANCE.updateConfigChannel(message.getId(), ctx.channel(), null);
BizExecutors.execute(message);
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,54 @@
package com.imdroid.sideslope.server.tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class RtcmTcpServer implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(RtcmTcpServer.class);
@Value("${netty.config.port}")
private int port;
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(this::start0, "tcp-server").start();
}
private void start0() {
logger.info("rtcm tcp server starting...");
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new RtcmTcpHandler());
}
});
Channel ch = b.bind(port).sync().channel();
logger.info("tcp server start at port {}", port);
ch.closeFuture().sync();
} catch (Exception el) {
logger.error(el.toString());
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@ -4,7 +4,6 @@ import com.imdroid.sideslope.exception.UnSupportedMessageException;
import com.imdroid.sideslope.executor.BizExecutors; import com.imdroid.sideslope.executor.BizExecutors;
import com.imdroid.sideslope.executor.MessageParser; import com.imdroid.sideslope.executor.MessageParser;
import com.imdroid.sideslope.message.BaseMessage; import com.imdroid.sideslope.message.BaseMessage;
import com.imdroid.sideslope.server.DeviceChannel;
import com.imdroid.sideslope.server.OnlineChannels; import com.imdroid.sideslope.server.OnlineChannels;
import com.imdroid.common.util.DataTypeUtil; import com.imdroid.common.util.DataTypeUtil;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -14,11 +13,8 @@ import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/** /**
* @author Layton * @author Layton
* @date 2023/2/13 11:47 * @date 2023/2/13 11:47
@ -29,9 +25,6 @@ public class RtcmUdpHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class); private static final Logger logger = LoggerFactory.getLogger(RtcmUdpHandler.class);
@Autowired
private BizExecutors bizExecutors;
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg; DatagramPacket packet = (DatagramPacket) msg;
@ -46,18 +39,10 @@ public class RtcmUdpHandler extends ChannelInboundHandlerAdapter {
} }
// 消息解析 // 消息解析
BaseMessage message = MessageParser.instance.parse(packet.content()); BaseMessage message = MessageParser.instance.parse(packet.content());
procMessage(message, ctx, packet.sender()); OnlineChannels.INSTANCE.updateDataChannel(message.getId(), ctx.channel(), packet.sender());
BizExecutors.execute(message);
} catch (UnSupportedMessageException e) { } catch (UnSupportedMessageException e) {
// 是不是IMEI logger.warn("receive un supported message: {}", e.getMessage());
if(packet.content().readableBytes() == 15){
byte[] data = new byte[packet.content().readableBytes()];
packet.content().getBytes(0, data);
if(data[0]=='8' && data[1]=='6') {
OnlineChannels.INSTANCE.putDtuChannel(new String(data), packet.sender());
}
else logger.warn("receive un supported message: {}", e.getMessage());
}
else logger.warn("receive un supported message: {}", e.getMessage());
} catch (Exception e) { } catch (Exception e) {
logger.error("channel read error: {}", e.toString()); logger.error("channel read error: {}", e.toString());
} finally { } finally {
@ -71,17 +56,4 @@ public class RtcmUdpHandler extends ChannelInboundHandlerAdapter {
logger.error("Exception caught: {}", cause.toString()); logger.error("Exception caught: {}", cause.toString());
} }
void procMessage(BaseMessage message, ChannelHandlerContext ctx, InetSocketAddress senderAddr){
// 为加快处理速度只有收到D3F0/D3F2时才更新通道包括对端地址和绑定imei
DeviceChannel deviceChannel = null;
if (message.getHeader() == 0xD3F0 || message.getHeader() == 0xD3F2) {
OnlineChannels.INSTANCE.updateDeviceChannel(message.getId(), ctx.channel(), senderAddr);
} else deviceChannel = OnlineChannels.INSTANCE.getDeviceChannel(message.getId());
if (deviceChannel != null) {
deviceChannel.updateRxBytes(message.getLen(), message.getHeader());
}
// 业务处理
bizExecutors.execute(message);
}
} }

View File

@ -22,7 +22,7 @@ public class RtcmUdpServer implements ApplicationRunner {
private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class); private final Logger logger = LoggerFactory.getLogger(RtcmUdpServer.class);
@Value("${netty.port:9903}") @Value("${netty.data.port:9903}")
private Integer port; private Integer port;
@Autowired @Autowired

View File

@ -1,11 +0,0 @@
package com.imdroid.sideslope.service;
import com.imdroid.secapi.dto.GnssCalcData;
import org.springframework.stereotype.Service;
@Service
public class ThirdPartyClient {
public void send(GnssCalcData data){
}
}

View File

@ -29,20 +29,18 @@ public class ApiController {
public HttpResp config(String deviceId, String configuration) { public HttpResp config(String deviceId, String configuration) {
Map<String, Object> status = new HashMap<>(); Map<String, Object> status = new HashMap<>();
HttpResp resp = new HttpResp(); HttpResp resp = new HttpResp();
OnlineChannels onlineChannels = OnlineChannels.INSTANCE; DeviceChannel deviceChannel = OnlineChannels.INSTANCE.getConfigChannel(deviceId);
onlineChannels if(deviceChannel == null) deviceChannel = OnlineChannels.INSTANCE.getDataChannel(deviceId);
.get(deviceId) if(deviceChannel!=null && deviceChannel.isOnline()){
.filter(DeviceChannel::isOnline) status.put("status", "Online");
.ifPresent(x -> { // send command
status.put("status", "Online"); ByteBuf buf = Unpooled.buffer();
status.put("lastUpdate", x.getLastTime()); byte[] data = getBinaryData(ConfigDataTypeEnum.HEX, configuration);
// send command logger.info("send command:{}", configuration);
ByteBuf buf = Unpooled.buffer(); buf.writeBytes(data);
byte[] data = getBinaryData(ConfigDataTypeEnum.HEX, configuration); deviceChannel.writeAndFlush(buf);
logger.info("send command:{}", configuration); }
buf.writeBytes(data);
x.writeAndFlush(buf);
});
if (status.isEmpty()) { if (status.isEmpty()) {
status.put("status", "Offline"); status.put("status", "Offline");
resp.setCode(HttpResp.HTTP_RSP_FAILED); resp.setCode(HttpResp.HTTP_RSP_FAILED);

View File

@ -22,7 +22,8 @@ spring.rabbitmq.username=guest
spring.rabbitmq.password=guest spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/ spring.rabbitmq.virtualHost=/
netty.port=9903 netty.data.port=9903
netty.config.port=9902
app.format.date = yyyy-MM-dd app.format.date = yyyy-MM-dd
app.format.time = HH:mm:ss app.format.time = HH:mm:ss