菜单

duckflew
发布于 2024-09-13 / 79 阅读
0
0

未命名文章

定时连接基于Netty封装的Tcp客户端 出现阻塞问题

基于Netty 封装了一个TCP客户端如下,系统的执行逻辑比较简单,实现一个定时任务 用SpringBoot提供的api实现,五秒钟查询一次数据库(目前测试的数据量不大) 对所有的客户端发起一次连接,采用并发异步的方式进行连接 用Netty的future客监听连接的情况

客户端管理类

/**
 * 客户端管理器
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class UnitClientManager {
    private final Map<Long, Client> CLIENT_MAP = new ConcurrentHashMap<>();
    private final PropProcessor propProcessor;
    private final UnitService unitService;
    private final CanProtocolLoader canProtocolLoader;
    @Scheduled(cron = "0/5 * * * * ?")
    public void checkConnection() {
        List<Unit> units = unitService.getAll();
        log.info("unit size={}", units.size());
        for (Unit unit : units) {
            processUnit(unit);
        }
    }

    private void processUnit(Unit unit) {
        Client client = getById(unit.getId());
        if (client == null) {
            Client newClient = createClient(unit);
            CLIENT_MAP.put(unit.getId(), newClient);
        } else if (!client.isOnline()){
            client.connect();
        }
    }


    private Client createClient(Unit unit) {
        switch (unit.getUnitType()) {
            case  ELEVATOR,TWIN->{
                return new CanTCPClient(unit,propProcessor,canProtocolLoader);
            }
            case ESCALATOR,TRAVELATOR ->{
                return new ModbusClient(unit, propProcessor);
            }
            default->{
                log.error("Unsupported device type: {}", unit);
            }
        }
        return null;
    }
    public Client getById(Long id) {
        return  CLIENT_MAP.get(id);
    }
    public List<Client> allClients() {
        return List.copyOf(CLIENT_MAP.values());
    }

    private void closeAllConnections() {
        List<CompletableFuture<Void>> futures = CLIENT_MAP.values().stream()
                .map(Client::close)
                .toList();
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
    @PreDestroy
    public void destroy() {
        // 关闭所有的连接
        closeAllConnections();
        // 优雅关闭EventLoopGroup
    }
}

tcp客户端类

package com.rms.session;

import com.rms.codec.BasicDecoder;
import com.rms.codec.ExceptionHandler;
import com.rms.codec.can.CanCodec;
import com.rms.codec.can.CanFrameDecoder;
import com.rms.domain.entity.Unit;
import com.rms.handler.DeviceOnlineHandler;
import com.rms.handler.PropProcessor;
import com.rms.protocol.CanProtocolLoader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import static com.rms.constant.Constants.*;

/**
 * 连接CAN网关的客户端
 */
@Slf4j
public class CanTCPClient extends Client{
    private static final EventLoopGroup eventLoopGroup= new NioEventLoopGroup();
    private final PropProcessor propProcessor;
    private final CanProtocolLoader canProtocolLoader;
    private final Lock lock= new ReentrantLock();
    @Setter
    private Channel channel;
    public CanTCPClient(Unit unit, PropProcessor propProcessor, CanProtocolLoader canProtocolLoader) {
        super(unit);
        this.propProcessor = propProcessor;
        this.canProtocolLoader = canProtocolLoader;
    }

    @Override
    public boolean isOnline() {
        return gatewayOnline&&deviceOnline;
    }

    @Override
    public CompletableFuture<Void> close() {
        if (channel == null) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ChannelFuture channelFuture = channel.close();
        channelFuture.addListener(future -> {
            if (future.isSuccess()) {
                completableFuture.complete(null);
            } else {
                completableFuture.completeExceptionally(future.cause());
            }
        });
        return completableFuture;
    }

    @Override
    public void connect(){
        String ipAddress = unit.getIpAddress();
        int port = unit.getPort();
        // 启动客户端
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
                .option(ChannelOption.TCP_NODELAY, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                socketChannel.pipeline()
                        .addLast(new CanCodec())
                        .addLast(new DeviceOnlineHandler())
                        .addLast(new CanFrameDecoder(canProtocolLoader,new BasicDecoder()))
                        .addLast(propProcessor)
                        .addLast(new ExceptionHandler());
                socketChannel.attr(CLIENT_ATTRIBUTE_KEY).set(CanTCPClient.this);
                setChannel(socketChannel);
            }
        });
        bootstrap.connect(ipAddress, port)
                .addListener(f->{
                    if (f.isSuccess()){
                        log.info("[{}]连接成功",unit.getName());
                        setGatewayOnline(true);
                        setDeviceOnline(true);
                    }
                    else {
                        log.info("[{}]连接失败",unit.getName());
                        setGatewayOnline(false);
                        setDeviceOnline(false);
                    }
                });
        }
}

系统启动之后 运行一两次之后会卡死

Name:dataSource, Connection:8, Time:2, Success:True
Type:Prepared, Batch:False, QuerySize:1, BatchSize:0
Query:["select u1_0.id,u1_0.bms_id,u1_0.create_time,fc1_0.id,fc1_0.config_name,fc1_0.create_time,fc1_0.floor_info,fc1_0.last_modified_time,u1_0.group_name,u1_0.ip_address,u1_0.last_modified_time,u1_0.location,u1_0.name,u1_0.port,u1_0.shaft_name,u1_0.unit_type,u1_0.up_car from unit u1_0 left join floor_config fc1_0 on fc1_0.id=u1_0.floor_config_id"]
Params:[()]
2024-09-13 15:44:10.017 INFO  [][] [scheduling-1] c.r.session.UnitClientManager:37 - unit size=16
2024-09-13 15:44:10.075 INFO  [][] [nioEventLoopGroup-4-8] com.rms.session.CanTCPClient:94 - [L4井道下轿厢]连接成功
2024-09-13 15:44:10.075 INFO  [][] [nioEventLoopGroup-4-10] com.rms.session.CanTCPClient:94 - [L2井道上轿厢]连接成功
2024-09-13 15:44:10.075 INFO  [][] [nioEventLoopGroup-4-3] com.rms.session.CanTCPClient:94 - [L2井道上轿厢]连接成功
2024-09-13 15:44:10.075 INFO  [][] [nioEventLoopGroup-4-9] com.rms.session.CanTCPClient:94 - [L1井道上轿厢]连接成功
2024-09-13 15:44:10.075 INFO  [][] [nioEventLoopGroup-4-13] com.rms.session.CanTCPClient:94 - [L3井道下轿厢]连接成功
2024-09-13 15:44:10.075 INFO  [][] [nioEventLoopGroup-4-14] com.rms.session.CanTCPClient:94 - [L4井道上轿厢]连接成功
2024-09-13 15:44:10.077 INFO  [][] [nioEventLoopGroup-4-12] com.rms.session.CanTCPClient:94 - [L3井道上轿厢]连接成功
2024-09-13 15:44:10.113 INFO  [][] [nioEventLoopGroup-4-1] com.rms.session.CanTCPClient:94 - [A2]连接成功
2024-09-13 15:44:11.074 INFO  [][] [nioEventLoopGroup-4-11] com.rms.session.CanTCPClient:99 - [L2井道下轿厢]连接失败
2024-09-13 15:44:11.074 INFO  [][] [nioEventLoopGroup-4-16] com.rms.session.CanTCPClient:99 - [L5井道上轿厢]连接失败
2024-09-13 15:44:11.074 INFO  [][] [nioEventLoopGroup-4-7] com.rms.session.CanTCPClient:99 - [L4井道上轿厢]连接失败
2024-09-13 15:44:11.076 INFO  [][] [nioEventLoopGroup-4-2] com.rms.session.CanTCPClient:99 - [L1井道上轿厢]连接失败
2024-09-13 15:44:11.076 INFO  [][] [nioEventLoopGroup-4-5] com.rms.session.CanTCPClient:99 - [L3井道上轿厢]连接失败
2024-09-13 15:44:11.076 INFO  [][] [nioEventLoopGroup-4-15] com.rms.session.CanTCPClient:99 - [L4井道下轿厢]连接失败
2024-09-13 15:44:11.076 INFO  [][] [nioEventLoopGroup-4-4] com.rms.session.CanTCPClient:99 - [L2井道下轿厢]连接失败
2024-09-13 15:44:11.076 INFO  [][] [nioEventLoopGroup-4-6] com.rms.session.CanTCPClient:99 - [L3井道下轿厢]连接失败


评论