SOFAJRaft源码阅读-ShutdownHook如何优雅的停机

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

Java程序经常会遇到进程挂掉的情况,一些状态没有正确的保存下来,这时候就需要在JVM关掉的时候执行一些清理现场的代码。JAVA中的ShutdownHook提供了比较好的方案。而在SOFAJRaft-example模块的CounterServer-main方法中就使用了shutdownHook实现优雅停机。
@Author:Akai-yuan
@更新时间:2023/1/25

1.触发场景与失效场景

JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,可以注册一个JVM关闭的钩子这个钩子可以在以下几种场景中被调用:

  1. 程序正常退出
  2. 执行了System.exit()方法
  3. 终端使用Ctrl+C触发的中断
  4. 系统关闭
  5. OutOfMemory宕机
  6. 使用Kill pid命令干掉进程(使用 **kill -9 pid **是不会被调用的)

以下几种情况中是无法被调用的:

  1. 通过kill -9命令杀死进程——所以kill -9一定要慎用;
  2. 程序中执行了Runtime.getRuntime().halt()方法;
  3. 操作系统突然崩溃,或机器掉电(用电设备因断电、失电、或电的质量达不到要求而不能正常工作)。

2.addShutdownHook方法简述

Runtime.getRuntime().addShutdownHook(shutdownHook);

该方法指,在JVM中增加一个关闭的钩子,当JVM关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,JVM才会关闭。所以这些钩子可以在JVM关闭的时候进行内存清理、对象销毁、关闭连接等操作。

3.SOFAJRaft中钩子函数的实现

通过反射获取到grpcServer实例的shutdown方法和awaitTerminationLimit方法,并添加到钩子函数当中

public static void blockUntilShutdown() {
        if (rpcServer == null) {
            return;
        }
        //当RpcFactoryHelper中维护的工厂类型是GrpcRaftRpcFactory时进入if条件内部
        if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals(RpcFactoryHelper.rpcFactory().getClass()
            .getName())) {
            try {
                //反射获取grpcServer中维护的(io.grpc包下的)server实例
                Method getServer = rpcServer.getClass().getMethod("getServer");
                Object grpcServer = getServer.invoke(rpcServer);
                //反射获取server实例的shutdown方法和awaitTerminationLimit方法
                Method shutdown = grpcServer.getClass().getMethod("shutdown");
                Method awaitTerminationLimit = grpcServer.getClass().getMethod("awaitTermination", long.class,
                    TimeUnit.class);
            	//添加一个shutdownHook线程执行方法
                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        try {
                            shutdown.invoke(grpcServer);
                            awaitTerminationLimit.invoke(grpcServer, 30, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                            e.printStackTrace(System.err);
                        }
                    }
                });
                //执行awaitTermination方法
                Method awaitTermination = grpcServer.getClass().getMethod("awaitTermination");
                awaitTermination.invoke(grpcServer);
            } catch (Exception e) {
                LOG.error("Failed to block grpc server", e);
            }
        }
    }

4.grpc中的shutdown方法

GrpcServer下的shutdown方法与本文的钩子函数无关,此处再对比分析一下GrpcServer的shutdown方法。

    public void shutdown() {
        //CAS
        //当且仅当期待值为true时(与当前AtomicBoolean类型的started一致),设置为false关闭
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        ExecutorServiceHelper.shutdownAndAwaitTermination(this.defaultExecutor);
        GrpcServerHelper.shutdownAndAwaitTermination(this.server);
    }

ExecutorServiceHelper#shutdownAndAwaitTermination:
我们可以发现实际上就是在执行ExecutorService 中 的shutdown()、shutdownNow()、awaitTermination() 方法,那么我们来区别以下这几个方法

public static boolean shutdownAndAwaitTermination(final ExecutorService pool, final long timeoutMillis) {
        if (pool == null) {
            return true;
        }
        // 禁止提交新任务
        pool.shutdown();
        final TimeUnit unit = TimeUnit.MILLISECONDS;
        final long phaseOne = timeoutMillis / 5;
        try {
            // 等待一段时间以终止现有任务
            if (pool.awaitTermination(phaseOne, unit)) {
                return true;
            }
            pool.shutdownNow();
            // 等待一段时间,等待任务响应被取消
            if (pool.awaitTermination(timeoutMillis - phaseOne, unit)) {
                return true;
            }
            LOG.warn("Fail to shutdown pool: {}.", pool);
        } catch (final InterruptedException e) {
            // (Re-)cancel if current thread also interrupted
            pool.shutdownNow();
            // preserve interrupt status
            Thread.currentThread().interrupt();
        }
        return false;
    }

  1. shutdown():停止接收新任务,原来的任务继续执行

1、停止接收新的submit的任务;
2、已经提交的任务(包括正在跑的和队列中等待的),会继续执行完成;
3、等到第2步完成后,才真正停止;


  1. shutdownNow():停止接收新任务,原来的任务停止执行

1、跟 shutdown() 一样,先停止接收新submit的任务;
2、忽略队列里等待的任务;
3、尝试将正在执行的任务interrupt中断;
4、返回未执行的任务列表;
说明:
它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。
所以,shutdownNow() 并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的。


  1. awaitTermination(long timeOut, TimeUnit unit):当前线程阻塞

当前线程阻塞,直到:

  • 等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
  • 或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
  • 或者 线程被中断,抛出InterruptedException

然后会监测 ExecutorService 是否已经关闭,返回true(shutdown请求后所有任务执行完毕)或false(已超时)

GrpcServerHelper#shutdownAndAwaitTermination
与ExecutorServiceHelper类中的shutdownAndAwaitTermination方法类似的,该方法将优雅的关闭grpcServer.

public static boolean shutdownAndAwaitTermination(final Server server, final long timeoutMillis) {
        if (server == null) {
            return true;
        }
        // disable new tasks from being submitted
        server.shutdown();
        final TimeUnit unit = TimeUnit.MILLISECONDS;
        final long phaseOne = timeoutMillis / 5;
        try {
            // wait a while for existing tasks to terminate
            if (server.awaitTermination(phaseOne, unit)) {
                return true;
            }
            server.shutdownNow();
            // wait a while for tasks to respond to being cancelled
            if (server.awaitTermination(timeoutMillis - phaseOne, unit)) {
                return true;
            }
            LOG.warn("Fail to shutdown grpc server: {}.", server);
        } catch (final InterruptedException e) {
            // (Re-)cancel if current thread also interrupted
            server.shutdownNow();
            // 保持中断状态
            Thread.currentThread().interrupt();
        }
        return false;
    }
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6