一次非典型的Netty内存泄露案例复盘

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

背景

作为后端开发相信大家或多或少都接触过Nettty说起Netty真实又爱又恨因为基于它可以很简单的开发高性能的Java网络通信服务但同时要是不小心就会出现各种奇奇怪怪的问题特别是由于特殊的内存管理机制很容易出现内存泄漏问题即OOM问题。这些天就遇到了类似的问题排查和解决起来确实废了不小功夫特别这里记录一下。

其实Netty内存泄漏问题一般都比较好解决典型的就是各种ByteBuf没有被Release但如果遇到非典型的问题就比较考验技术人员功力的时候了。

现象

监控系统显示有一个服务在下班前突然自己挂掉了。我看了一下这个服务今天确实调整了一下日志配置配置有错误没办法与logstash通讯但这个应该不会导致OOM问题才对。

于是查了一下日志显示最后是OOM了才很明显是由于内存泄漏造成的。

日志如下

image-20230115180147854

为了确认问题的可重复性又重启了服务第二天上班又用htop看了一下确实内存占用增加了一块证实并非偶发原因应该是代码中存在问题导致。于是开启了定位的过程。

定位过程

获取Dump文件

由于在生产系统中直接去定位会有影响还是研究一下dump文件

jps找到问题服务的pidjmap导出内存问题件zip压缩一下弄到开发机器。

分析dump文件

既然已经有了dump文件就选择一个工具来分析就好了我选择的是比较好使的HeroDump至少界面看着比较好看还有自动提示。

image-20230112160218956

明显这407个NioEventLoop不正常一个1M都能有400M+了占了90%+的内存就他没跑了。

过了一会第二次的快照分析也完成了结果相似而且NioEventLoop更是暴涨到1339了。

image-20230112160158339

可以看到NioEventLoop持续的在积累。

继续查看实际占用内存的数据类型

image-20230115204036928

这些byte[]应该是NioEventLoop下属的数据。

到这里可以看出主要问题是NioEventLoop的垃圾回收机制没有发挥作用或是回收的速度没有新增的速度快导致这个NioEventLoop持续增长。

这个结果反映了是不是一个好定位的问题因为问题往往都是出现在业务代码中而现在定位的结果却显示Netty本身的 核心NioEventLoop有问题。而Netty本身存在这么重大缺陷几乎是不可能的这条线是追查不下去了。

代码分析

于是只能分析负责客户端连接的代码


@Service
@Slf4j
public class ClientModeConnectionManager {

    /**
     * 定时刷新连接情况
     */
    @Scheduled(fixedRate = 30 * 1000L)
    public void refreshConnection() {

        //没有初始化完成就跳出
        if (!flag) {
            return;
        }
        //查询配置文件中全部的设备信息
        Map<String, String> all = configService.getAllByType("server");

        //断开不再启用的设备
        connectionCacheService.getAll().forEach((puid, channel) -> {
            if (configService.findByPuid(puid) == null) {
                disconnect(puid);
            }
        });
        all.forEach((address, puid) -> {
            if (StringUtils.isEmpty(puid) || StringUtils.isEmpty(address)) {
                log.error("配置错误{}-{}", puid, address);
                return;
            }
            //查询在配置文件中未连接的设备
            if (!isConnected(puid)) {
                executorService.submit(() -> {
                    try {
                        connectToFacility(address, puid);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        });
    }

    private boolean isConnected(String puid) {

        if (connectionCacheService.isConnected(puid)) {
            Channel channel = connectionCacheService.getChannel(puid);
            if (channel.isActive()) {
                return true;
            } else {
                disconnect(puid);
                return false;
            }
        } else {
            disconnect(puid);
            return false;
        }
    }

    private void disconnect(String puid) {
        Channel channel = connectionCacheService.getChannel(puid);
        if (channel != null) {
            try {
                channel.close().sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            connectionCacheService.clearChannel(puid);
        }

        EventLoopGroup eventExecutors = clients.get(puid);
        if (eventExecutors != null) {
            eventExecutors.shutdownGracefully();
        }
    }


    @PreDestroy
    public void stop() throws InterruptedException {
        Map<String, String> all = configService.getAllByType("server");
        for (String puid : all.values()) {
            if (StringUtils.isEmpty(puid)) {
                log.error("配置错误{}", puid);
                continue;
            }
            //查询在配置文件中未连接的设备
            if (connectionCacheService.isConnected(puid)) {
                Channel channel = connectionCacheService.getChannel(puid);
                channel.closeFuture().sync();
            }
            EventLoopGroup eventExecutors = clients.get(puid);
            if (eventExecutors != null) {
                eventExecutors.shutdownGracefully();
            }

        }
    }

    private synchronized void connectToFacility(String address, String puid) throws InterruptedException {
        log.info("开始尝试客户端模式连接{},PUID:{}", address, puid);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        String[] array = address.split(":");
        String ip = array[0];
        int port = Integer.parseInt(array[1]);
        Bootstrap bs = new Bootstrap();

        bs.group(bossGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        // 处理来自服务端的响应信息
                        socketChannel.pipeline().addLast(channelHandlerAdapter);
                    }
                });

        // 客户端开启
        ChannelFuture cf = bs.connect(ip, port).sync();
        log.info("主机{}连接成功客户端模式,PUID:{}", ip, puid);

        clients.put(puid, bossGroup);
        Channel oldChannel = connectionCacheService.getChannel(puid);
        if (oldChannel != null && (!cf.channel().equals(oldChannel))) {
            try {
                oldChannel.closeFuture().sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        connectionCacheService.setChannel(puid, cf.channel());
    }
}

从代码中可以看出来这个服务跟大部分Netty使用场景不太一样大部分netty都是用作服务端了但这里是用作客户端。这就可能是问题所在Netty客户端开发人员没啥经验就很可能出现比较大的设计问题。

从上面的分析可以得到的结论应该是客户端管理服务也就是ClientModeConnectionManager中出现的问题。

通过对ClientModeConnectionManager的进一步分析同时对比各种示例程序发现有两个大的不同点。

  1. EventLoopGroup一般不作为局部变量使用
  2. 最后要释放EventLoopGroup也就是执行 eventLoopGroup.shutdownGracefully(),并不是要释放Bootstrap。

当然如果不是神仙或是对Netty极为熟悉的大佬是不可能一次性找到问题的这个发现是通过逐项的修改对比和实验才确定的。

最终为了更好的管理EventLoopGroup生命周期对代码进行一定的调整将连接停止逻辑独立为ConnectThread其实叫ClientThread更准确。具体请见下面示例和模板。

逻辑反向闭环

这个是定位问题非常重要的一步就是用你得到的结论再反向按照逻辑推出出现的问题如果逻辑不通那必定还存在其他问题。

逻辑推演过程

  1. EventLoopGroup虽然是局部变量但其实NioEventLoop是在独立的线程运行无法被垃圾回收
  2. 客户端管理代码没有在服务端连接异常的时候释放NioEventLoop也就是没有执行eventLoopGroup.shutdownGracefully()导致eventLoopGroup以及NioEventLoop相关的内存没有得到释放
  3. ClientModeConnectionManager每30s会进行一次刷新不断的尝试重新连接服务端但由于特殊原因服务端一直不在线因此会一直创建eventLoopGroup和对应的NioEventLoop
  4. NioEventLoop及其内部数据的内存不断积累最终导致OOM

修复确认

就算是逻辑反向闭环了也需要实验来最终验证最后还是要测试说了算的。

修复前

image-20230112151316926

image-202301121515015548小时内必OOM

修复后

在这里插入图片描述

在这里插入图片描述

持续测试

在生产系统上6.6%至少持续了5小时。

在这里插入图片描述

28小时后维持在6.7

在这里插入图片描述

45小时候依然在6.7%

在这里插入图片描述

image-20230115095832540

改善措施

启动方式改进

在启动命令行中添加-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\tmp参数保证异常挂掉时能够生成dump文件好第一时间定位问题。

建立Netty代码模板

形成标准的Netty客户端和服务端模板避免此类问题再次产生

客户端模板

ClientManager.java


/**
 * 服务启动监听器
 *
 * @author ZEW
 */
@Service
@Slf4j
public class ClientModeConnectionManager {

    /**
     * 定时刷新连接情况
     */
    @Scheduled(fixedRate = 30 * 1000L)
    public void refreshConnection() {

        //没有初始化完成就跳出
        if (!flag) {
            return;
        }
        //查询配置文件中全部的设备信息
        Map<String, String> all = configService.getAllByType("server");

        //断开不再启用的设备
        connectionCacheService.getAll().forEach((puid, channel) -> {
            if (configService.findByPuid(puid) == null) {
                disconnect(puid);
            }
        });
        all.forEach((address, puid) -> {
            if (StringUtils.isEmpty(puid) || StringUtils.isEmpty(address)) {
                log.error("配置错误{}-{}", puid, address);
                return;
            }
            //查询在配置文件中未连接的设备
            if (!isConnected(puid)) {
                disconnect(puid);
                ConnectThread task = new ConnectThread(puid, address, connectionCacheService, rabbitTemplate, configService);
                threads.put(puid, task);
                executorService.submit(task);

            }


        });


    }

    private boolean isConnected(String puid) {
        ConnectThread connectThread = threads.get(puid);
        if (connectThread == null) {

           return false;
        }
        return connectThread.isConnect();

    }

    private void disconnect(String puid) {
        ConnectThread connectThread = threads.get(puid);
        if (connectThread != null) {

            connectThread.disconnect();
        }
        threads.remove(puid);
    }


    @PreDestroy
    public void stop() throws InterruptedException {
        Map<String, String> all = configService.getAllByType("server");
        for (String puid : all.values()) {
            if (StringUtils.isEmpty(puid)) {
                log.error("配置错误{}", puid);
                continue;
            }
            //查询在配置文件中未连接的设备
            if (connectionCacheService.isConnected(puid)) {
                Channel channel = connectionCacheService.getChannel(puid);
                channel.closeFuture().sync();
            }
            disconnect(puid);


        }
    }


}

ConnectThread.java


/**
 * @author zew
 */
@Slf4j
public class ConnectThread implements Runnable {


    @Override
    public void run() {
        try {
            //避免重复连接导致NioEventLoopGroup和NioEventLoop重复创建内存溢出
            disconnect();
            connectToFacility(address, puid);
            if (currentChannel != null) {
                currentChannel.closeFuture().sync();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("{} CONNECT FAILED", puid, e);
            return;
        } finally {
            disconnect();
        }
        log.info("{} TRANSPORT FINISH", puid);


    }

    private void connectToFacility(String address, String puid) throws InterruptedException {
        log.info("START TO CONNECT TO {},PUID:{}", address, puid);
        String[] array = address.split(":");
        String ip = array[0];
        int port = Integer.parseInt(array[1]);
        bootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIME_OUT)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        // 处理来自服务端的响应信息
                        socketChannel.pipeline().addLast(
                                new ServerChannelHandlerAdapter(configService, connectionCacheService, rabbitTemplate));
                    }
                });

        try {
            // 客户端开启
            ChannelFuture cf = bootstrap.connect(ip, port).sync();
            if (cf.awaitUninterruptibly(TIME_OUT, TimeUnit.MILLISECONDS) && cf.isSuccess()) {
                log.info("HOST {} connect successfulCLIENT,PUID:{}", ip, puid);
                currentChannel = cf.channel();
                connectionCacheService.setChannel(puid, currentChannel);
            } else {
                log.warn("HOST {} connect failedCLIENT,PUID:{}", ip, puid);
            }
        } catch (InterruptedException e) {
            disconnect();
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("HOST {} connect failedCLIENT,PUID:{}", ip, puid, e);
        }
    }

    public void disconnect() {
        if (currentChannel != null) {
            try {
                currentChannel.close().sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        if (eventLoopGroup != null) {
            try {
                eventLoopGroup.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            eventLoopGroup = null;
        }
        bootstrap = null;
    }

    public boolean isConnect() {
        if (currentChannel == null) {
            return false;
        }
        if (currentChannel.isActive()) {
            return true;
        } else {
            log.warn("Channel:{} is not active,then close it", puid);
            return false;
        }

    }
}

这里的客户端代码还有改进空间比如重连机制可以作为listener单独处理。

服务端模板


/**
 * 服务启动监听器
 *
 * @author ZEW
 */
@Component
@Slf4j
public class ServerModeConnectionListener {
    @Value("${facility.connection.port:10030}")
    private Integer port;
    /**
     * 创建bootstrap
     */
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
    /**
     * BOSS
     */
    private final EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * Worker
     */
    private final EventLoopGroup work = new NioEventLoopGroup();

    /**
     * 通道适配器
     */
    @Resource
    private ServerChannelHandlerAdapter channelHandlerAdapter;

    /**
     * 关闭服务器方法
     */
    @PreDestroy
    public void close() {
        log.info("关闭服务器....");
        //优雅退出
        work.shutdownGracefully();
    }

    /**
     * 开启及服务线程
     */
    @PostConstruct
    public void start() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 4, 2,
                TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new DefaultThreadFactory("直连服务端模式-%d"));
        executorService.submit(() -> {
            // 从配置文件中(application.yml)获取服务端监听端口号
            serverBootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO));

            try {
                //设置事件处理
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(channelHandlerAdapter);
                    }
                });
                log.info("netty服务器在[{}]端口启动监听", port);
                ChannelFuture f = serverBootstrap.bind(port).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.info("[出现异常] 释放资源");

                Thread.currentThread().interrupt();
            } finally {
                work.shutdownGracefully();
            }
        });

    }
}

增加理论深度并结合实践

深入理解Netty原理和GC理论加速问题定位原因

  • 了解NioEventLoop和NioEventLoop关系
  • 了解客户端和服务端代码范式
  • 了解哪些地方需要同步处理哪些地方可以是异步的
  • 哪些地方需要使用Sync等待

总结

OOM问题往往最令人烦躁的就是难以定位和确定是否修复这个问题反反复复解决了一周。当然生产系统在人肉运维的情况下没有出现问题。

  • 实践出真知定位问题的能力更能够反映技术水平
  • 多看看官方文档比较靠谱而且往往都有例子来。
  • 单因素对比实验是在没有足够线索和理论支撑的情况下定位问题原因的有效方法。

参考资源

  1. https://netty.io/wiki/
  2. https://netty.io/4.1/xref/io/netty/example/udt/echo/bytes/ByteEchoClient.html#ByteEchoClient
  3. https://juejin.cn/post/7053793963680989198
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6