定时连接基于Netty封装的Tcp客户端 出现阻塞问题
基于Netty 封装了一个TCP客户端如下,系统的执行逻辑比较简单,实现一个定时任务 用SpringBoot提供的api实现,五秒钟查询一次数据库(目前测试的数据量不大) 对所有的客户端发起一次连接,采用并发异步的方式进行连接 用Netty的future客监听连接的情况
客户端管理类
/**
* 客户端管理器
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class UnitClientManager {
private final Map<Long, Client> CLIENT_MAP = new ConcurrentHashMap<>();
private final PropProcessor propProcessor;
private final UnitService unitService;
private final CanProtocolLoader canProtocolLoader;
@Scheduled(cron = "0/5 * * * * ?")
public void checkConnection() {
List<Unit> units = unitService.getAll();
log.info("unit size={}", units.size());
for (Unit unit : units) {
processUnit(unit);
}
}
private void processUnit(Unit unit) {
Client client = getById(unit.getId());
if (client == null) {
Client newClient = createClient(unit);
CLIENT_MAP.put(unit.getId(), newClient);
} else if (!client.isOnline()){
client.connect();
}
}
private Client createClient(Unit unit) {
switch (unit.getUnitType()) {
case ELEVATOR,TWIN->{
return new CanTCPClient(unit,propProcessor,canProtocolLoader);
}
case ESCALATOR,TRAVELATOR ->{
return new ModbusClient(unit, propProcessor);
}
default->{
log.error("Unsupported device type: {}", unit);
}
}
return null;
}
public Client getById(Long id) {
return CLIENT_MAP.get(id);
}
public List<Client> allClients() {
return List.copyOf(CLIENT_MAP.values());
}
private void closeAllConnections() {
List<CompletableFuture<Void>> futures = CLIENT_MAP.values().stream()
.map(Client::close)
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
@PreDestroy
public void destroy() {
// 关闭所有的连接
closeAllConnections();
// 优雅关闭EventLoopGroup
}
}
tcp客户端类
package com.rms.session;
import com.rms.codec.BasicDecoder;
import com.rms.codec.ExceptionHandler;
import com.rms.codec.can.CanCodec;
import com.rms.codec.can.CanFrameDecoder;
import com.rms.domain.entity.Unit;
import com.rms.handler.DeviceOnlineHandler;
import com.rms.handler.PropProcessor;
import com.rms.protocol.CanProtocolLoader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import static com.rms.constant.Constants.*;
/**
* 连接CAN网关的客户端
*/
@Slf4j
public class CanTCPClient extends Client{
private static final EventLoopGroup eventLoopGroup= new NioEventLoopGroup();
private final PropProcessor propProcessor;
private final CanProtocolLoader canProtocolLoader;
private final Lock lock= new ReentrantLock();
@Setter
private Channel channel;
public CanTCPClient(Unit unit, PropProcessor propProcessor, CanProtocolLoader canProtocolLoader) {
super(unit);
this.propProcessor = propProcessor;
this.canProtocolLoader = canProtocolLoader;
}
@Override
public boolean isOnline() {
return gatewayOnline&&deviceOnline;
}
@Override
public CompletableFuture<Void> close() {
if (channel == null) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
ChannelFuture channelFuture = channel.close();
channelFuture.addListener(future -> {
if (future.isSuccess()) {
completableFuture.complete(null);
} else {
completableFuture.completeExceptionally(future.cause());
}
});
return completableFuture;
}
@Override
public void connect(){
String ipAddress = unit.getIpAddress();
int port = unit.getPort();
// 启动客户端
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
.addLast(new CanCodec())
.addLast(new DeviceOnlineHandler())
.addLast(new CanFrameDecoder(canProtocolLoader,new BasicDecoder()))
.addLast(propProcessor)
.addLast(new ExceptionHandler());
socketChannel.attr(CLIENT_ATTRIBUTE_KEY).set(CanTCPClient.this);
setChannel(socketChannel);
}
});
bootstrap.connect(ipAddress, port)
.addListener(f->{
if (f.isSuccess()){
log.info("[{}]连接成功",unit.getName());
setGatewayOnline(true);
setDeviceOnline(true);
}
else {
log.info("[{}]连接失败",unit.getName());
setGatewayOnline(false);
setDeviceOnline(false);
}
});
}
}
系统启动之后 运行一两次之后会卡死
Name:dataSource, Connection:8, Time:2, Success:True
Type:Prepared, Batch:False, QuerySize:1, BatchSize:0
Query:["select u1_0.id,u1_0.bms_id,u1_0.create_time,fc1_0.id,fc1_0.config_name,fc1_0.create_time,fc1_0.floor_info,fc1_0.last_modified_time,u1_0.group_name,u1_0.ip_address,u1_0.last_modified_time,u1_0.location,u1_0.name,u1_0.port,u1_0.shaft_name,u1_0.unit_type,u1_0.up_car from unit u1_0 left join floor_config fc1_0 on fc1_0.id=u1_0.floor_config_id"]
Params:[()]
2024-09-13 15:44:10.017 INFO [][] [scheduling-1] c.r.session.UnitClientManager:37 - unit size=16
2024-09-13 15:44:10.075 INFO [][] [nioEventLoopGroup-4-8] com.rms.session.CanTCPClient:94 - [L4井道下轿厢]连接成功
2024-09-13 15:44:10.075 INFO [][] [nioEventLoopGroup-4-10] com.rms.session.CanTCPClient:94 - [L2井道上轿厢]连接成功
2024-09-13 15:44:10.075 INFO [][] [nioEventLoopGroup-4-3] com.rms.session.CanTCPClient:94 - [L2井道上轿厢]连接成功
2024-09-13 15:44:10.075 INFO [][] [nioEventLoopGroup-4-9] com.rms.session.CanTCPClient:94 - [L1井道上轿厢]连接成功
2024-09-13 15:44:10.075 INFO [][] [nioEventLoopGroup-4-13] com.rms.session.CanTCPClient:94 - [L3井道下轿厢]连接成功
2024-09-13 15:44:10.075 INFO [][] [nioEventLoopGroup-4-14] com.rms.session.CanTCPClient:94 - [L4井道上轿厢]连接成功
2024-09-13 15:44:10.077 INFO [][] [nioEventLoopGroup-4-12] com.rms.session.CanTCPClient:94 - [L3井道上轿厢]连接成功
2024-09-13 15:44:10.113 INFO [][] [nioEventLoopGroup-4-1] com.rms.session.CanTCPClient:94 - [A2]连接成功
2024-09-13 15:44:11.074 INFO [][] [nioEventLoopGroup-4-11] com.rms.session.CanTCPClient:99 - [L2井道下轿厢]连接失败
2024-09-13 15:44:11.074 INFO [][] [nioEventLoopGroup-4-16] com.rms.session.CanTCPClient:99 - [L5井道上轿厢]连接失败
2024-09-13 15:44:11.074 INFO [][] [nioEventLoopGroup-4-7] com.rms.session.CanTCPClient:99 - [L4井道上轿厢]连接失败
2024-09-13 15:44:11.076 INFO [][] [nioEventLoopGroup-4-2] com.rms.session.CanTCPClient:99 - [L1井道上轿厢]连接失败
2024-09-13 15:44:11.076 INFO [][] [nioEventLoopGroup-4-5] com.rms.session.CanTCPClient:99 - [L3井道上轿厢]连接失败
2024-09-13 15:44:11.076 INFO [][] [nioEventLoopGroup-4-15] com.rms.session.CanTCPClient:99 - [L4井道下轿厢]连接失败
2024-09-13 15:44:11.076 INFO [][] [nioEventLoopGroup-4-4] com.rms.session.CanTCPClient:99 - [L2井道下轿厢]连接失败
2024-09-13 15:44:11.076 INFO [][] [nioEventLoopGroup-4-6] com.rms.session.CanTCPClient:99 - [L3井道下轿厢]连接失败