1、优化TCP发送

This commit is contained in:
weidong 2024-01-16 08:13:21 +08:00
parent 45fa9509e7
commit ae1d9c8287
3 changed files with 31 additions and 33 deletions

View File

@ -22,10 +22,10 @@ public class TCPClient {
//@Value("${xfz.server.port}") //@Value("${xfz.server.port}")
private int port; private int port;
private Bootstrap bootstrap; private Bootstrap bootstrap;
private EventLoopGroup group; private EventLoopGroup group;
private Channel channel; private Channel channel;
ByteBuf sendBuffer;
public void start() { public void start() {
new Thread(this::connect, "xfz-tcp-client").start(); new Thread(this::connect, "xfz-tcp-client").start();
@ -34,6 +34,7 @@ public class TCPClient {
public void init(String dest_addr, int dest_port) { public void init(String dest_addr, int dest_port) {
host = dest_addr; host = dest_addr;
port = dest_port; port = dest_port;
sendBuffer = Unpooled.buffer();
//客户端需要一个事件循环组 //客户端需要一个事件循环组
group = new NioEventLoopGroup(); group = new NioEventLoopGroup();
//创建客户端启动对象 //创建客户端启动对象
@ -45,7 +46,7 @@ public class TCPClient {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器 //加入处理器
ch.pipeline().addLast(new TcpMessageHandler()); ch.pipeline().addLast(new TcpMessageHandler(TCPClient.this));
} }
}); });
} }
@ -81,21 +82,26 @@ public class TCPClient {
} }
} }
public void writeAndFlush(String json) { public void writeAndFlush(String json) {
String str = "#" + json + "!"; sendBuffer.writeBytes(json.getBytes(StandardCharsets.UTF_8));
ByteBuf buf = Unpooled.buffer(); if(channel.isActive()) flush();
buf.writeBytes(str.getBytes(StandardCharsets.UTF_8)); else connect();
channel.writeAndFlush(buf).addListener(future -> {
if (future.isSuccess()) {
logger.info("send {} to xfz server succeed.", str);
} else {
logger.info("send {} to xfz server failed.", str);
}
});
} }
public boolean isActive(){ public void onConnectionActive(){
return channel.isActive(); flush();
}
private void flush(){
if(sendBuffer.readableBytes()>0){
channel.writeAndFlush(sendBuffer).addListener(future -> {
if (future.isSuccess()) {
logger.info("send to xfz server succeed.");
sendBuffer.clear();
} else {
logger.info("send to xfz server failed.");
}
});
}
} }
} }

View File

@ -15,6 +15,12 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final TCPClient tcpClient;
public TcpMessageHandler(TCPClient tcpClient) {
this.tcpClient = tcpClient;
}
@Override @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf buf) throws Exception { protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf buf) throws Exception {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -27,6 +33,7 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("xfz tcp channel active"); logger.info("xfz tcp channel active");
tcpClient.onConnectionActive();
} }
@Override @Override

View File

@ -82,26 +82,11 @@ public class GXXfzForwarder extends Forwarder{
data.setZ(NumberUtils.scale(locationRecord.getRb562d() * 0.001, 5)); data.setZ(NumberUtils.scale(locationRecord.getRb562d() * 0.001, 5));
sendNum++; sendNum++;
} }
String json = GsonUtil.toJson(xfzTcpMessage); String json = "#" + GsonUtil.toJson(xfzTcpMessage) + "!";
logger.info("project " + projectId + ": push calculation result to XFZ"); logger.info("project " + projectId + ": push calculation result to XFZ");
logger.info(json); logger.info(json);
if(!xfzTcpClient.isActive()) { xfzTcpClient.writeAndFlush(json);
xfzTcpClient.connect(); return sendNum;
try {
Thread.sleep(5000);
}
catch (Exception e){
}
}
if(xfzTcpClient.isActive()) {
xfzTcpClient.writeAndFlush(json);
return sendNum;
}
else{
logger.info("TCP disconnected!");
return 0;
}
} }
} }