1、修改TCP的重连机制,只有在发送数据时才connect

This commit is contained in:
weidong 2025-07-20 21:57:45 +08:00
parent e286c355ff
commit 2fd12b9361

View File

@ -29,7 +29,7 @@ public class TCPClient {
TCPListener listener; TCPListener listener;
public void start() { public void start() {
new Thread(this::connect, host+":"+port+" forwarder tcp-client").start(); //new Thread(this::connect, host+":"+port+" forwarder tcp-client").start();
} }
public void init(String dest_addr, int dest_port, TCPListener listener) { public void init(String dest_addr, int dest_port, TCPListener listener) {
@ -64,14 +64,14 @@ public class TCPClient {
if (!future.isSuccess()) { if (!future.isSuccess()) {
logger.info("{}:{} tcp connect failed. {}",host,port,future.cause().toString()); logger.info("{}:{} tcp connect failed. {}",host,port,future.cause().toString());
//重连交给后端线程执行 //重连交给后端线程执行
future.channel().eventLoop().schedule(() -> { /*future.channel().eventLoop().schedule(() -> {
logger.info("{}:{} tcp client reconnect",host,port); logger.info("{}:{} tcp client reconnect",host,port);
try { try {
connect(); connect();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
}, 5000, TimeUnit.MILLISECONDS); }, 5000, TimeUnit.MILLISECONDS);*/
} else { } else {
/*future.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark( /*future.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark(
1024 * 1024, // low 1024 * 1024, // low
@ -89,10 +89,26 @@ public class TCPClient {
} }
} }
boolean tryReconnect() throws Exception{
new Thread(this::connect, host+":"+port+" forwarder tcp-client").start();
for(int i=0; i<20; i++){
Thread.sleep(50);
if(channel.isActive()) return true;
}
return false;
}
public void writeAndFlush(String json) { public void writeAndFlush(String json) {
ByteBuf sendBuffer = Unpooled.buffer(); ByteBuf sendBuffer = Unpooled.buffer();
sendBuffer.writeBytes(json.getBytes(StandardCharsets.UTF_8)); sendBuffer.writeBytes(json.getBytes(StandardCharsets.UTF_8));
logger.info("send to {}: {}",host,json); logger.info("send to {}: {}",host,json);
if(!channel.isActive()){
try {
if(!tryReconnect()) return;
} catch (Exception e) {
logger.error(e.toString());
}
}
channel.writeAndFlush(sendBuffer).addListener(future -> { channel.writeAndFlush(sendBuffer).addListener(future -> {
if (future.isSuccess()) { if (future.isSuccess()) {
logger.info("send to tcp:"+host+" succeed."); logger.info("send to tcp:"+host+" succeed.");
@ -110,7 +126,7 @@ public class TCPClient {
} }
public void onDisconnect(boolean isIdle){ public void onDisconnect(boolean isIdle){
if(connectTime.isBefore(LocalDateTime.now().minusMinutes(1))) { /*if(connectTime.isBefore(LocalDateTime.now().minusMinutes(1))) {
connect(); connect();
} }
else{ else{
@ -121,7 +137,7 @@ public class TCPClient {
logger.error(e.toString()); logger.error(e.toString());
} }
},isIdle?30:10, TimeUnit.SECONDS); },isIdle?30:10, TimeUnit.SECONDS);
} }*/
} }
public void onMessage(String msg){ public void onMessage(String msg){