京东一面:20种异步,你知道几种? 含协程

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

背景说明

异步作为性能调优核心方式之一经常被用于各种高并发场景。

很多场景多会使用到异步比如

场景1 超高并发 批量 写 mysql 、批量写 elasticSearch

场景2 超高并发 批量 IO

场景3 超高并发 发送短信、发邮件

场景4 超高并发 发送消息

场景5 超高吞吐 生产者、 消费者 场景

场景6 超高吞吐 发布、 订阅 场景

场景7 分布式的 通知场景

场景8异步回调场景

场景9其他的 异步场景 不知道能列举出多少总之非常多

总之异步作为性能调优核心方式之一经常被用于各种高并发场景。

所以异步是一个非常、非常核心的面试知识点

在40岁老架构师 尼恩的社群中其相关面试题是一个非常、非常高频的交流话题。

其大概的出题形式有

形式1异步化作为应用调优的一个常用方式你知道具体有几种方式实现吗

形式2异步的实现方式有几种

形式3异步调用你知道哪些

形式4异步调用你用过哪些

形式5 实现异步的10多种方式你用过哪些

形式6 实现异步的10多种方式你知道几个 出自社群 美团一面

形式7 请参考 《尼恩Java面试宝典》还有很多变种…。

这里尼恩给大家做一下系统化、体系化的梳理

使得大家可以充分展示一下大家雄厚的 “技术肌肉”让面试官爱到 “不能自已、口水直流”

并把这个 题目以及参考答案收入咱们的 《尼恩Java面试宝典

供后面的小伙伴参考提升大家的 3高 架构、设计、开发水平。

注本文以 PDF 持续更新最新尼恩 架构笔记、面试题 的PDF文件请从这里获取码云


首先、什么是异步

同步

调用方在调用过程中持续阻塞一直到返回结果。

同步获取结果的方式是 主动等待。

异步

调用方在调用过程中不会阻塞 不直接等待返回结果 而是执行其他任务。

异步获取结果的方式是 被动通知或者 被动回调。

然后梳理一下异步的20种实现方式

  1. 新建线程Thread 实现异步
  2. 线程池化 实现异步
  3. Future 阻塞式异步
  4. guava 回调式异步
  5. Netty 回调式异步
  6. Servlet 3.0 异步
  7. CompletableFuture 回调式异步
  8. JDK 9 Flow 响应式编程
  9. RxJava 响应式 异步
  10. Reactor 响应式 异步
  11. Spring注解@Async 异步
  12. EventBus 框架 发布订阅模式异步
  13. Spring ApplicationEvent 事件 发布订阅模式
  14. RocketMq 消息队列 分布式 发布订阅模式Pub/Sub 异步
  15. Redis 消息队列 分布式 发布订阅模式Pub/Sub 异步
  16. Distruptor 框架异步
  17. ForkJoin 框架异步
  18. RocketMQ源码中ServiceThread 能急能缓的高性能异步
  19. Kotlin 协程 异步
  20. Project Loom 协程 异步

方式1新建线程Thread异步

在《Java 高并发核心 编程 卷2 加强版》 9.2.1 小节中介绍了一个经典的 异步案例《泡茶的案例》

分别设计三条线程泡茶线程MainThread主线程、烧水线程HotWarterThread、清洗线程WashThread。

  • 泡茶线程的工作是启动清洗线程、启动烧水线程等清洗、烧水的工作完成后泡茶喝

  • 清洗线程的工作是洗茶壶、洗茶杯

  • 烧水线程的工作是洗好水壶灌上凉水放在火上一直等水烧开。

其中负责烧水的线程HotWarterThread、负责WashThread 的线程都是通过线程Thread异步 的方式完成异步执行的。

具体的内容和示意图来自于 《Java 高并发核心 编程 卷2 加强版》 9.2.1 小节。 示意图如下

在这里插入图片描述

线程Thread异步 的知识非常多是Java 高并发的基础知识

具体请参见 《Java 高并发核心 编程 卷2 加强版》1.3节示意图如下

在这里插入图片描述

《Java 高并发核心 编程 卷2 加强版》 电子版PDF是免费的版本不断升级提取最新版本可以找尼恩即可。

方式2线程池化 异步

Thread线程和OS内核线程是一一对应的关系频繁的创建、销毁浪费系统资源并且涉及到进行内核态和用户态的切换这一切的一切都是低性能的。

如何提升性能呢可以将 线程池化 就是线程池。

我们可以采用线程池 下面的代码是来自于经历过 双十一 100Wqps 超高并发考验的 JD Hotkey框架的源码

public class AsyncPool {
    private static ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();

    public static void asyncDo(Runnable runnable) {
        threadPoolExecutor.submit(runnable);
    }

    public static void shutDown() {
        threadPoolExecutor.shutdown();
    }
}

然后可以将业务逻辑封装到RunnableCallable中交由线程池来执行。

下面的业务代码 是尼恩的第26章视频中基于 JD hotkey 源码二次定制的、 用于 实现三级缓存的 数据一致性的 异步刷新代码

package com.jd.platform.hotkey.worker.netty.pusher;
....省略 import 
/**
 * 推送到各客户端服务器
 *
 * @author wuweifeng wrote on 2020-02-24
 * @version 1.0
 */
@Component
public class RocketMqPusher implements IPusher {
    /**
     * 热key集中营
     */
    private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();

    private static Logger logger = LoggerFactory.getLogger(RocketMqPusher.class);

    @Resource
    MqSender mqSender;
    public static class MqSender{

      public   DefaultMQProducer mqProducer;
      public   String topic;

        public MqSender(DefaultMQProducer mqProducer, String topic) {
            this.mqProducer = mqProducer;
            this.topic = topic;
        }

        public void sendToMq(Command cmd) {
            Message msg = new Message(topic, "", "", cmd.json().getBytes());
            try {
                mqProducer.send(msg, 100000);
            } catch (Exception e) {
                logger.error("Failed to publish {} to RocketMQ", cmd.json(), e);
            }
        }
    }

    /**
     * 给客户端推key信息
     */
    @Override
    public void push(HotKeyModel model) {
        hotKeyStoreQueue.offer(model);
    }

    @Override
    public void remove(HotKeyModel model) {
//        push(model);
    }

    /**
     * 和dashboard那边的推送主要区别在于给app推送每10ms一次dashboard那边1s一次
     */
    @PostConstruct
    public void batchPushToClient() {
        AsyncPool.asyncDo(() -> {
            while (true) {
                try {
                    List<HotKeyModel> tempModels = new ArrayList<>();
                    //每10ms推送一次
                    Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);
                    if (CollectionUtil.isEmpty(tempModels)) {
                        continue;
                    }
                    String[] keys = tempModels.stream().map(m -> m.getKey()).toArray(String[]::new);
                    mqSender.sendToMq(new Command(Command.OPT_HOT_KEY, null, keys));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

线程Thread异步 的知识非常多是Java 高并发的基础知识

具体请参见 《Java 高并发核心 编程 卷2 加强版》1.6节示意图如下

在这里插入图片描述

《Java 高并发核心 编程 卷2 加强版》 电子版PDF是免费的版本不断升级提取最新版本可以找尼恩即可。

方式3Future 阻塞式异步

为了获取异步线程的返回结果以及更好的对异步线程的干预Java在1.5版本之后提供了一种新的多线程的创建方式—— FutureTask方式。

FutureTask方式包含了一系列的Java相关的类处于java.util.concurrent包中。

使用FutureTask方式进行异步调用时所涉及的重要组件为FutureTask类和Callable接口。

Future 的调用方式属于阻塞式异步

主要原因在于在获取异步线程处理结果时需要主线程主动通过Future.get() 去获取

如果异步线程没有执行完那么Future.get() 会阻塞 调用线程一直到超时。

参考的代码如下

具体的内容和示意图来自于 《Java 高并发核心 编程 卷2 加强版》 9.2.1 小节。 示意图如下

在这里插入图片描述

阻塞式异步Future的不足之处

Future的不足之处的包括以下几点

  • 无法被动接收异步任务的计算结果

    虽然我们可以主动将异步任务提交给线程池中的线程来执行但是待异步任务执行结束之后主线程无法得到任务完成与否的通知它需要通过get方法主动获取任务执行的结果。

  • Future件彼此孤立

有时某一个耗时很长的异步任务执行结束之后你想利用它返回的结果再做进一步的运算该运算也会是一个异步任务两者之间的关系需要程序开发人员手动进行绑定赋予Future并不能将其形成一个任务流pipeline每一个Future都是彼此之间都是孤立的所以才有了后面的CompletableFutureCompletableFuture就可以将多个Future串联起来形成任务流。

  • Futrue没有很好的错误处理机制

    截止目前如果某个异步任务在执行发的过程中发生了异常调用者无法被动感知必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误从而在做进一步的判断处理。

伪异步 与 纯异步

异步调用目的在于防止当前业务线程被阻塞。

但是 Future 阻塞式异步 属于 伪异步

伪异步 就是 将任务包装为Runnable/ Callable 作为Biz业务线程被调用线程的任务去执行并调用方阻塞等待当前Biz 线程不阻塞

纯异步为回调式 异步。他们的区别不在于是否将请求放入另一个线程池执行而在于是否有线程阻塞等待Response。

在前面的异步阻塞版本的泡茶喝的实现中

  • 泡茶线程是调用线程

  • 烧水或者清洗线程是被调用线程

  • 调用线程和被调用线程之间是一种主动关系而不是被动关系。

  • 泡茶线程需要主动获取烧水或者清洗线程的执行结果二者协同的方式是调用方阻塞。

为什么说二者协同的方式是调用方阻塞调用方线程需要通过join()或Future.get()阻塞式的干预 异步操作或者获取 异步结果这里是阻塞模式的异步 伪异步

这种调用方线程的阻塞是 线程资源的一种浪费。

线程资源是宝贵的。怎么充分的利用线程资源呢

有效方式之一 回调模式的 异步。实现 纯纯的异步

方式4guava 回调式异步

由于JDK在1.8之前没有 回调式异步组件于是出现了很多 开源的 回调式异步组件

比较常用的是 guava 的回调式异步。

Guava是Google提供的Java扩展包它提供了一种异步回调的解决方案。

Guava中与异步回调相关的源码处于com.google.common.util.concurrent包中。

包中的很多类都用于对java.util.concurrent的能力扩展和能力增强。

比如Guava的异步任务接口ListenableFuture扩展了Java的Future接口实现了异步回调的的能力。

使用 Guava 组件实现 异步回调模式 的泡茶喝 实例 具体的原理和代码请参见《Java 高并发核心 编程 卷2 加强版》9.5.4 章, 示意图如下:

在这里插入图片描述

出于 篇幅原因这里不做赘述 请大家看PDF电子书 可找尼恩面试获取最新版 书里非常细致。

方式5Netty 回调式异步

由于JDK在1.8之前没有 回调式异步组件于是出现了很多 开源的 回调式异步组件

Netty 也算其中之一。

Netty 是 一个 著名的高性能NIO王者框架 是 IO 的王者组件。 具体请参见尼恩梳理的四大王者组件。

Netty 除了作为NIO框架之王其子模也是可以单独使用的比如说异步回调模块

Netty 的 回调式异步组件 更加牛掰为啥呢

通过Netty源码可以知道 Netty 的 回调式异步组件不光提供了外部的回调监听设置而且可以在异步代码中 通过Promise接口可以对回调结果进行干预比如说在进行回调之前执行一些其他的操作。

当然Netty的源码更加复杂这部分内容 请参考尼恩的《第21章视频彻底穿透Netty源码视频》。

使用 Netty 的 回调式异步组件 实现 异步回调模式 的泡茶喝 实例 具体的原理和代码请参见《Java 高并发核心 编程 卷2 加强版》9.5.4 章, 示意图如下:

在这里插入图片描述

出于 篇幅原因这里不做赘述 请大家看PDF电子书 可找尼恩面试获取最新版 书里非常细致。

Callback Hell回调地狱问题

无论是 Google Guava 包中的 ListenableFuture还是 Netty的 GenericFutureListener都是需要设置专门的Callback 回调钩子

Guava 包中的 ListenableFuture设置Callback 回调钩子的实例如下



        ListenableFuture<Boolean> wFuture = gPool.submit(wJob);

        Futures.addCallback(wFuture, new FutureCallback<Boolean>() {
            public void onSuccess(Boolean r) {
                if (!r) {
                    Print.tcfo("杯子洗不了没有茶喝了");
                } else {

                    countDownLatch.countDown();

                }
            }

            public void onFailure(Throwable t) {
                Print.tcfo("杯子洗不了没有茶喝了");
            }
        });

调用方通过 Futures.addCallback() 添加处理结果的回调函数。

这样避免获取并处理异步任务执行结果阻塞调起线程的问题。

Callback 是将任务执行结果作为接口的入参在任务完成时回调 Callback 接口执行后续任务从而解决纯 Future 方案无法方便获得任务执行结果的问题。

但 Callback 产生了新的问题那就是代码可读性的问题。

因为使用 Callback 之后代码的字面形式和其所表达的业务含义不匹配即业务的先后关系到了代码层面变成了包含和被包含的关系。

因此如果大量使用 Callback 机制将使大量的存在先后次序的业务逻辑在代码形式上转换成层层嵌套

从而导致业务先后次序在代码维度被打乱最终造成代码不可理解、可读性差、难以理解、难以维护。

这便是所谓的 Callback Hell回调地狱问题。

Callback Hell 问题可以从两个方向进行一定的解决

  • 一是链式调用

  • 二是事件驱动机制。

前被 CompletableFuture、反应式编程等技术采用前者被如 EventBus、Vert.x 所使用。

方式6Servlet 3.0 异步

梳理一下 Callback 异步回调的使用场景。

Callback 真正体现价值是它与 NIO 技术结合之后。

  • CPU 密集型场景采用 Callback 回调没有太多意义
  • IO 密集型场景如果是使用 BIO模式Callback 同样没有意义因为一个连接一个线程IO线程是因为 IO 而阻塞。
  • IO 密集型场景如果是使用 NIO 模式使用Callback 才有意义。 NIO是少量IO线程负责大量IO通道IO线程需要避免线程阻塞所以也必须使用 Callback 才能使应用得以被开发出来。

所以高性能的 NIO 框架如 Netty 都是基于 Callback 异步回调的。

但是在微服务流行的今天Netty 却没有在WEB服务器中占据统治地位。

微服务系统中多级服务调用很常见一个服务先调 A再用结果 A 调 B然后用结果 B 调用 C等等。

如果使用Netty 作为底层服务器IO 线程能大大降低能处理的连接数/请求数也能大大增加那么为啥Netty 却没有在WEB服务器中占据统治地位呢

这其中的难度来自两方面

  • 一是 NIO 和 Netty 本身的技术难度
  • 二是 Callback hellCallback 风格所导致的代码理解和维护的困难。

因此Netty 通常用于在基础架构层面在业务系统中应用较少。

这也是大厂小伙人人要求 精通Netty而中小厂小伙伴不怎么认识Netty的原因。

当然作为IO之王学习Netty对应提升大家的内功是至关重要的。

Netty的知识具体请参见《Java高并发核心编程 卷1 加强版》,很多小伙伴靠此书入的门。

直接使用 Netty 开发WEB应用会遇到技术难度挑战、以及 Callback Hell 问题。

所以Servlet 3.0 提供了一个异步解决方案。

什么是servlet异步请求

Servlet 3.0 之前一个普通 Servlet 的主要工作流程大致如下

1Servlet 接收到请求之后可能需要对请求携带的数据进行一些预处理

2调用业务接口的某些方法以完成业务处理

3根据处理的结果提交响应Servlet 线程结束。

其中第二步处理业务逻辑时候很可以碰到比较耗时的任务此时servlet主线程会阻塞等待完成业务处理

对于并发比较大的请求可能会产生性能瓶颈则servlet3.0之后再此处做了调整引入了异步的概念。

1Servlet 接收到请求之后可能需要对请求携带的数据进行一些预处理

2调用业务接口的某些方法过程中request.startAsync()请求获取一个AsyncContext

3紧接着servlet线程退出(回收到线程池)但是响应response对象仍旧保持打开状态新增线程会使用AsyncContext处理并响应结果。

4AsyncContext处理完成触发某些监听通知结果

    @WebServlet(urlPatterns = "/demo", asyncSupported = true)
    public class AsyncDemoServlet extends HttpServlet {
        @Override
        public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException, ServletException {
// Do Something

            AsyncContext ctx = req.startAsync();
            startAsyncTask(ctx);
        }
    }

    private void startAsyncTask(AsyncContext ctx) {
        requestRpcService(result -> {
            try {
                PrintWriter out = ctx.getResponse().getWriter();
                out.println(result);
                out.flush();
                ctx.complete();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

Servlet 3.0 的出现解决了在过去基于 Servlet 的 Web 应用中接受请求和返回响应必须在同一个线程的问题实现了如下目标

  1. 可以避免了 Web 容器的线程被阻塞挂起
  2. 使请求接收之后的任务处理可由专门线程完成
  3. 不同任务可以实现线程池隔离
  4. 结合 NIO 技术实现更高效的 Web 服务

除了直接使用 Servlet 3.0也可以选择 Spring MVC 的 Deferred Result。

示例Spring MVC DeferredResult

 @GetMapping("/async-deferredresult")
    public DeferredResult<ResponseEntity<?>> handleReqDefResult(Model model) {
        LOG.info("Received async-deferredresult request");
        DeferredResult<ResponseEntity<?>> output = new DeferredResult<>();

        ForkJoinPool.commonPool().submit(() -> {
            LOG.info("Processing in separate thread");
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
            }
            output.setResult(ResponseEntity.ok("ok"));
        });

        LOG.info("servlet thread freed");
        return output;
    }

Servlet 3.0 的技术局限

Servlet 3.0 并不是用来解决前面提到的 Callback Hell 问题的它只是降低了异步 Web 编程的技术门槛。

对于 Callback Hell 问题使用 Servlet 3.0 或类似技术时同样会遇到。

解决 Callback Hell 还需另寻他法。

方式7回调式 异步CompletableFuture

JDK 1.8之前并没有实现回调式的异步CompletableFuture是JDK 1.8引入的实现类实现了JDK内置的异步回调模式异步。

CompletableFuture的创新是通过 链式调用解决 Callback Hell回调地狱问题 让代码变得的可理解行更强可读性 更强。

CompletableFuture 该类实现了Future和CompletionStage两个接口。

该类的实例作为一个异步任务可以在自己异步执行完成之后触发一些其他的异步任务从而达到异步回调的效果。

使用CompletableFuture实现泡茶喝实例的实现参考如下

package com.crazymakercircle.completableFutureDemo;

import com.crazymakercircle.util.Print;

import java.util.concurrent.CompletableFuture;

import static com.crazymakercircle.util.ThreadUtil.sleepSeconds;

public class DrinkTea {
    private static final int SLEEP_GAP = 3;//等待3秒

    public static void main(String[] args) {

        // 任务 1
        CompletableFuture<Boolean> washJob =
                CompletableFuture.supplyAsync(() ->
                {
                    Print.tcfo("洗茶杯");
                    //线程睡眠一段时间代表清洗中
                    sleepSeconds(SLEEP_GAP);
                    Print.tcfo("洗完了");

                    return true;
                });

        // 任务 2
        CompletableFuture<Boolean> hotJob =
                CompletableFuture.supplyAsync(() ->
                {
                    Print.tcfo("洗好水壶");
                    Print.tcfo("烧开水");

                    //线程睡眠一段时间代表烧水中
                    sleepSeconds(SLEEP_GAP);
                    Print.tcfo("水开了");
                    return true;

                });


        // 任务 3任务 1 和任务 2 完成后执行泡茶
        CompletableFuture<String> drinkJob =
                washJob.thenCombine(hotJob, (hotOk, washOK) ->
                {
                    if (hotOk && washOK) {
                        Print.tcfo("泡茶喝茶喝完");
                        return "茶喝完了";
                    }
                    return "没有喝到茶";
                });

        // 等待任务 3 执行结果
        Print.tco(drinkJob.join());


    }
}


具体的内容和示意图来自于 《Java 高并发核心 编程 卷2 加强版》10章。 示意图如下

在这里插入图片描述

出于 篇幅原因这里不做赘述 请大家看PDF电子书 可找尼恩面试获取最新版 书里非常细致。

方式8JDK 9 Flow 响应式编程

但是 JDK 8 的 CompletableFuture 属于链式调用它在形式上带有一些响应式编程的函数式代码风格。

因为 Callback Hell 对代码可读性有很大杀伤力从开发人员的角度来讲反应式编程技术和链式调用一样使得代码可读性要比 Callback 提升了许多。

响应式流从2013年开始作为提供非阻塞背压的异步流处理标准的倡议。

Reactive Stream是一套基于发布/订阅模式的数据处理规范。

更确切地说Reactive流目的是“找到最小的一组接口方法和协议用来描述必要的操作和实体以实现这样的目标以非阻塞背压方式实现数据的异步流”。

**响应式流Reactive Streams**是一个响应式编程的规范用来为具有非阻塞背压Back pressure的异步流处理提供标准用最小的一组接口、方法和协议用来描述必要的操作和实体。这里涉及到一个关键概念叫 Backpressure国内大部分翻译为背压我们先来了解这是什么。

响应式编程其实就是对数据流的编程而对流的处理对数据流的变化进行响应是通过异步监听的方式来处理的。既然是异步监听就涉及到监听事件的发布者和订阅者数据流其实就是由发布者生产再由一个或多个订阅者进行消费的元素item序列。

那么如果发布者生产元素的速度和订阅者消费元素的速度不一样是否会出现问题呢其实就两种情况

  • 发布者生产的速度比订阅者消费的速度慢那生产的元素可以及时被处理订阅者处理完只要等待发布者发送下一元素即可这不会产生什么问题。
  • 发布者生产的速度比订阅者消费的速度快那生产的元素无法被订阅者及时处理就会产生堆积如果堆积的元素多了订阅者就会承受巨大的资源压力(pressure)而有可能崩溃。

要应对第二种情况就需要进行流控制flow control。

流控制有多种方案其中一种机制就是 Back pressure即背压机制其实就是下游能够向上游反馈流量需求的机制。

如果生产者发出的数据比消费者能够处理的数据量大而且快时消费者可能会被迫一直再获取或处理数据消耗越来越多的资源从而埋下潜在的风险。为了防止这一点消费者可以通知生产者降低生产数据的速度。生产者可以通过多种方式来实现这一要求这时候我们就会用到背压机制。

采用背压机制后消费者告诉生产者降低生产数据速度并保存元素知道消费者能够处理更多的元素。使用背压可以有效地避免过快的生产者压制消费者。如果生产者要一直生产和保存元素使用背压也可能会要求其拥有无限制的缓冲区。生产者也可以实现有界缓冲区来保存有限数量的元素如果缓冲区已满可以选择放弃。

背压的实现方式

背压的实现方式有两种

  • 一种是阻塞式背压
  • 另一种是非阻塞式背压。

1、阻塞式背压

阻塞式背压是比较容易实现的例如当生产者和消费者在同一个线程中运行时其中任何一方都将阻塞其他线程的执行。这就意味着当消费者被执行时生产者就不能发出任何新的数据元素。因而也需要一中自然地方式来平衡生产数据和消费数据的过程。

在有些情况下阻塞式背压会出现不良的问题比如当生产者有多个消费者时不是所有消费者都能以同样的速度消费消息。当消费者和生产者在不同环境中运行时这就达不到降压的目的了。

2、非阻塞式背压

背压机制应该以非阻塞式的方式工作实现非阻塞式背压的方法是放弃推策略采用拉策略。生产者发送消息给消费者等操作都可以保存在拉策略当中消费者会要求生产者生成多少消息量而且最多只能发送这些量然后等到更多消息的请求。

OKBackpressure 解释完毕。 其实《Java 高并发核心编程卷3 加强版》介绍得更加清楚大家可以系统化的去看看书。

JDK 8 的 CompletableFuture 不算是反应式编程不遵守Reactive Stream (响应式流/反应流) 规范。

JDK 9 Flow 是JDK对Reactive Stream (响应式流/反应流) 的实现

Java 9中新增了反应式/响应式编程的Api-Flow

Flow中存在Publisher发布者Subscriber订阅者Subscription订阅和`Processor处理器。

Flow`结构如下:

img

JDK 9 Flow 旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者而不需要发布者阻塞或订阅者需要有无限制的缓冲区或丢弃。

当然实施响应式编程需要完整的解决方案单靠 Flow 是不够的还是需要Netflix RxJava、 Project Reactor 这样的完整解决方案。

但是 JDK 层面的技术能提供统一的技术抽象和实现在统一技术方面还是有积极意义的。

所以这里不对 JDK 9 Flow 做介绍。

方式9RxJava 响应式 异步

在JDK 9 Flow 之前响应式编程 的框架早就存在。

比如说 席卷了android 端编程的 RxJava 框架。RxJava 是一种响应式编程来创建基于事件的异步操作库。

这个组件是 Netflix的杰作也叫作Netflix RxJava。

这个框架在Java 后端的中间件中也有广泛使用比如在Hystrix 源码中就用大量用到。

总之RxJava 框架很重要具体请参见 《Java 高并发核心编程 卷3 加强版》

使用 RxJava 基于事件流的链式调用、代码 逻辑清晰简洁。

package com.crazymakercircle.completableFutureDemo;

import com.crazymakercircle.util.Print;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;

import static com.crazymakercircle.util.ThreadUtil.sleepMilliSeconds;
import static com.crazymakercircle.util.ThreadUtil.sleepSeconds;

public class IntegrityDemo {
    /**
     * 模拟模拟RPC调用1
     */
    public String rpc1() {
        //睡眠400ms,模拟执行耗时
        sleepMilliSeconds(600);
        Print.tcfo("模拟RPC调用服务器 server 1");
        return "sth. from server 1";
    }

    /**
     * 模拟模拟RPC调用2
     */
    public String rpc2() {
        //睡眠400ms,模拟执行耗时
        sleepMilliSeconds(600);
        Print.tcfo("模拟RPC调用服务器 server 2");
        return "sth. from server 2";
    }

    @Test
    public void rpcDemo() throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->
        {
            return rpc1();
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> rpc2());
        CompletableFuture<String> future3 = future1.thenCombine(future2,
                (out1, out2) ->
                {
                    return out1 + " & " + out2;
                });
        String result = future3.get();
        Print.tco("客户端合并最终的结果" + result);
    }

    @Test
    public void rxJavaDemo() throws Exception {
        Observable<String> observable1 = Observable.fromCallable(() ->
        {
            return rpc1();
        }).subscribeOn(Schedulers.newThread());
        Observable<String> observable2 = Observable
                .fromCallable(() -> rpc2()).subscribeOn(Schedulers.newThread());

        Observable.merge(observable1, observable2)
                .observeOn(Schedulers.newThread())
                .toList()
                .subscribe((result) -> Print.tco("客户端合并最终的结果" + result));

        sleepSeconds(Integer.MAX_VALUE);
    }

}

如果需要从设计模式的高度理解Rxjava、或者响应式编程请参考 《Java 高并发核心 编程 卷3 加强版》5章。 示意图如下

《Java 高并发核心 编程 卷3 加强版》5章

方式10Reactor 响应式 异步

目前在 Java 领域实现了反应式编程的技术除了 Netflix RxJava 还有 Spring 的 Project Reactor。

Netflix 网飞 的RxJava 出现时间更早在前端开发领域应用的比后端更要广泛一些。

Spring 的 Project Reactor的 3.0 版本作为 Spring 5 的基础在17年底发布推动了后端领域反应式编程的发展。

关于Reactor 响应式异步的 内容请参考尼恩的另外一篇博客

Flux、Mono、Reactor 实战史上最全

这篇文章被小伙伴认为是全网介绍 Reactor 组件讲得最好的文章

在这里插入图片描述

其实尼恩的其他文章非常优质大家都可以看看。

由于性能比较高响应式编程越来越普及 建议大家及早掌握。

尼恩的50个群中很多开始响应式编程的小伙伴反馈 入手难、后面爽。

方式11Spring的@Async异步

在Spring中使用@Async标注某方法可以使该方法变成异步方法这些方法在被调用的时候将会在独立的线程中进行执行调用者不需等待该方法执行完成。

但在Spring中使用@Async注解需要使用@EnableAsync来开启异步调用。

一个 AsyncService参考代码如下

public interface AsyncService {

    MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);

    MessageResult sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {

    @Autowired
    private IMessageHandler mesageHandler;

    @Override
    @Async("taskExecutor")
    public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {

            Thread.sleep(1000);
            mesageHandler.sendSms(callPrefix, mobile, actionType, content);

        } catch (Exception e) {
            log.error("发送短信异常 -> ", e)
        }
    }
    
    @Override
    @Async("taskExecutor")
    public sendEmail(String email, String subject, String content) {
        try {

            Thread.sleep(1000);
            mesageHandler.sendsendEmail(email, subject, content);

        } catch (Exception e) {
            log.error("发送email异常 -> ", e)
        }
    }
}

@Async注解默认使用系统自定义线程池。

在实际项目中推荐等方式是是使用自定义线程池的模式。

可在项目中设置多个线程池在异步调用的时候指明需要调用的线程池名称比如@Async(“taskName”)

自定义异步线程池的代码如下

/**
 * 线程池参数配置多个线程池实现线程池隔离
@EnableAsync
@Configuration
public class TaskPoolConfig {
    /**
     * 自定义线程池
     *
     **/
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        //返回可用处理器的Java虚拟机的数量 12
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系统最大线程数   " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(16);
        //最大线程数
        executor.setMaxPoolSize(20);
        //配置队列容量默认值为Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //设置此执行程序应该在关闭时阻止的最大秒数以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
        executor.setAwaitTerminationSeconds(60);
        //等待所有的任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

方式12EventBus 发布订阅模式异步

实际开发中常常 通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦

比如在顶级开源组件 JD hotkey的源码中 就多次用到 EventBus/AsyncEventBus进行JAVA模块解耦

掌握了 EventBus 在平时的开发中多了一个神器

在这里插入图片描述

EventBus 是 Guava 的事件处理机制是观察者模式生产/消费模型的一种实现。

EventBus是google的Guava库中的一个处理组件间通信的事件总线它基于发布/订阅模式实现了多组件之间通信的解耦合事件产生方和事件消费方实现解耦分离提升了通信的简洁性。

观察者模式在我们日常开发中使用非常广泛

例如在订单系统中订单状态或者物流信息的变更会向用户发送APP推送、短信、通知卖家、买家等等审批系统中审批单的流程流转会通知发起审批用户、审批的领导等等。

Observer模式也是 JDK 中自带就支持的其在 1.0 版本就已经存在 Observer

不过随着 Java 版本的飞速升级其使用方式一直没有变化许多程序库提供了更加简单的实现例如 Guava EventBus、RxJava、EventBus 等

为什么要用 EventBus 其优点

EventBus 优点

  • 相比 Observer 编程简单方便
  • 通过自定义参数可实现同步、异步操作以及异常处理
  • 单进程使用无网络影响

缺点

  • 只能单进程使用
  • 项目异常重启或者退出不保证消息持久化

如果需要分布式使用还是需要使用 MQ

使用事件总线的场景

当一个事件的发生(事件产生方)需要触发很多事件(事件消费方)的时候我们通常会在事件产生方中分别的去调用那些事件消费方这样往往是很浪费资源。事件的产生方与事件的消费方产生了极大的耦合如果我们要改动某一个事件消费方我们很可能还要改动事件的产生方。

在工作中经常会遇见使用异步的方式来发送事件或者触发另外一个动作经常用到的框架是MQ分布式方式通知; 如果是同一个jvm里面通知的话就可以使用EventBus 事件总线。

由于EventBus使用起来简单、便捷因此工作中会经常用到。

EventBus 是线程安全的分发事件到监听器并提供相应的方式让监听器注册它们自己。

EventBus允许组件之间进行 “发布-订阅” 式的通信而不需要这些组件彼此知道对方。

EventBus是专门设计用来替代传统的Java进程内的使用显示注册方式的事件发布模式。

EventBus不是一个通用的发布-订阅系统也不是用于进程间通信。

EventBus有三个关键要素

img

1、事件Event)

事件是EventBus之间相互通信的基本单位一个Event可以是任何类型。

对没错就是Object只要你想将任意一个Bean作为事件这个类不需要做任何改变就可以作为事件Event。不过在项目中不会这么随便除非对代码严谨度没什么要求。。

一般会定义特定的事件类类名以Event作为后缀里面定义一些变量或者函数等。

2、事件发布者Publisher

事件发布者就是发送事件到EventBus事件总线的一方事件发布者调用Post()方法将事件发给EventBus。

你可以在程序的任何地方调用EventBus的post()方法发送事件给EventBus由EventBus发送给订阅者们。

3、事件订阅者Subscriber

事件订阅者就是接收事件的一方这些订阅者需要在自己的方法上添加@Subscribe注解声明自己为事件订阅者。不过只声明是不够的还需要将自己所在的类注册到EventBus中EventBus才能扫描到这个订阅者。

关于EventBus 的 原理和实操内容请参考尼恩的另外一篇博客

通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦 史上最全

方法13Spring ApplicationEvent事件实现异步

Spring内置了简便的事件机制原理和EventBus 差不多

通过Spring ApplicationEvent事件 可以非常方便的实现事件驱动核心类包括

  • ApplicationEvent具体事件内容事件抽象基类可继承该类自定义具体事件
  • ApplicationEventPublisher事件发布器可以发布ApplicationEvent也可以发布普通的Object对象
  • ApplicationListener事件监听器可以使用注解@EventListener
  • TransactionalEventListener事务事件监听可监听事务提交前、提交后、事务回滚、事务完成成功或失败

使用示例不定义事件直接发布Object对象同步

1、定义发送事件对象

public class UserEntity {
    private long id;
    private String name;
    private String msg;
}

2、定义事件监听器

可以添加条件condition限制监听具体的事件

@Slf4j
@Component
public class RegisterListener {

    @EventListener(condition = "#entity.id != null and #entity.async==false ")
    public void handlerEvent(UserEntity entity) {

        try {
            // 休眠5秒
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("handlerEvent: {}", entity);

    }

}

3、定义发送接口以及实现类

public interface IRegisterService {

    public void register(String name);

}
12345
@Service
public class RegisterServiceImpl implements IRegisterService {
    @Resource
    private ApplicationEventPublisher applicationEventPublisher;

    @Override
    public void publish(String name) {
        UserEntity entity = new UserEntity();
        entity.setName(name);
        entity.setId(1L);
        entity.setMsg("新用户注册同步调用");
        applicationEventPublisher.publishEvent(entity);
    }
}

4、测试Controller类进行测试

@Slf4j
@Controller
public class TestController {

    @Resource
    private IRegisterService registerService;

    @RequestMapping("test")
    @ResponseBody
    public void test1(String name) {
        registerService.publish(name);
        log.info("执行同步调用结束");
    }
}


在浏览器中输入地址http://localhost/test?name=nik

控制台输出

handlerEvent: UserEntity(id=1, name=nik, msg=新用户注册同步调用)
执行同步调用结束

但是上面的案例是同步事件如果需要编程异步事件还需要加上额外的注解

1、在启动类添加异步注解 @EnableAsync

2、在监听方法上添加注解 @Async

@Async
@EventListener(condition = "#entity.name != null and #entity.async ")
public void handlerEventAsync(UserEntity entity) {

    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info("handlerEventAsync: {}", entity);
}

方法14 RocketMq 消息队列 分布式 发布订阅模式 异步

上游系统对下游系统的调用若为同步调用则会大大降低系统的吞吐量与并发度且系统耦合度太高。

而异步调用则会解决这些问题。

所以两层之间若要实现由同步到异步的转化一般性做法就是在这两层间添加一个MQ层。

在这里插入图片描述

RocketMq 消息队列 分布式 发布订阅模式Pub/Sub

基于事件的系统中Pub/Sub是目前广泛使用的通信模型它采用事件作为基本的通信机制提供大规模系统所要求的松散耦合的交互模式订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。

上游生产者参考

  1. 创建producer组
  2. 设置NameServer地址 如果实在安装不上可以使用这个地址115.159.88.63:9876
  3. startr生产者
  4. 发送消息获取结果
  5. 结束producer
public class Producer {

    public static void main(String[] args) throws Exception {

        //1.创建生产者组
        DefaultMQProducer producer = new DefaultMQProducer("producer-hello");

        //2.设置NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        //3.启动producer实例
        producer.start();

        //4.创建消息
        Message message = new Message("log-topic", "info-tag", "这是一个info信息".getBytes(RemotingHelper.DEFAULT_CHARSET));

        //5.发送消息
        SendResult result = producer.send(message);

        //6.关闭producer实例
        System.out.println("发送完毕结果: "+result);
    }
}

下游消费者参考

  1. 创建consumer组
  2. 设置Name Server地址
  3. 设置消费位置从最开始销毁
  4. 设置消息回调处理监听 -> 处理消息
  5. Start consumer
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-hello");

        //2.设置NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        //3.订阅topic指定tag标签
        consumer.subscribe("log-topic","info-tag");

        //4.注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently(){

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s 接收到新的消息:  %n", Thread.currentThread().getName());
                msgs.stream().forEach(messageExt -> {
                    String body = null;
                    try {
                        body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    System.out.println(body);
                });
			   //失败消费稍后尝试消费,会进行多次重试
                //return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5.启动消费者
        consumer.start();

        System.out.println("消费者启动...");

    }
}

  • DefaultMQPushConsumer 消费者 可以指定 consumerGroupName
  • consumer.setNamesrvAddr : 设置name server 地址
  • consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET) 从什么位置开始消费
  • consumer.subscribe(“topic_log”, “tags_error”) 订阅某个topic下的某个tags的消息
  • consumer.registerMessageListener 注册消息监听器拿到消息后进行消息处理。
  • ConsumeConcurrentlyStatus 消费者消费结果状态ConsumeConcurrentlyStatus.CONSUME_SUCCESS代表成功,ConsumeConcurrentlyStatus.RECONSUME_LATER代表消费失败稍后重试会进行多次重试

方法15Redis 消息队列 分布式 发布订阅模式 异步

在springboot项目中一般分布式 发布订阅模式 异步都是用RocketMQ的方式

如果集成太麻烦了而一般系统里面已经有了redis就用了redis做异步的功能

Redis发布订阅(pub/sub)是一种消息通信模式

  • 发送者(pub)发送消息
  • 订阅者(sub)接收消息。

Redis 发布订阅(pub/sub)实现了消息系统发送者(在redis术语中称为发布者)在接收者(订阅者)接收消息时发送消息。传送消息的链路称为信道。

在Redis中客户端可以订阅任意数量的信道。

  • 消息发布

消息发布者即publish客户端无需独占链接你可以在publish消息的同时使用同一个redis-client链接进行其他操作例如INCR等

  • 消息订阅

消息订阅者即subscribe客户端需要独占链接即进行subscribe期间redis-client无法穿插其他操作此时client以阻塞的方式等待“publish端”的消息这一点很好理解因此subscribe端需要使用单独的链接甚至需要在额外的线程中使用。

Redis 消息队列特点

  • 发送者发布者不是计划发送消息给特定的接收者订阅者。而是发布的消息分到不同的频道不需要知道什么样的订阅者订阅。
  • 订阅者对一个或多个频道感兴趣只需接收感兴趣的消息不需要知道什么样的发布者发布的。

这种发布者和订阅者的解耦合可以带来更大的扩展性和更加动态的网络拓扑。

发布及订阅功能通过redis client 使用参考

# 订阅一个redisChat频道
 
redis 127.0.0.1:6379> SUBSCRIBE redisChat  
Reading messages... (press Ctrl-C to quit) 
1) "subscribe" 
2) "redisChat" 
3) (integer) 1
 
#发布消息到redisChat频道发布成功后订阅者会收到信息
redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"  
(integer) 1  
redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by yiibai"  
(integer) 1   
1) "message" 
2) "redisChat" 
3) "Redis is a great caching technique" 
1) "message" 
2) "redisChat" 
3) "Learn redis by yiibai"

Redis发布订阅命令

PUBLISH channel message	        #将信息发送到指定的频道。
SUBSCRIBE channel [channel …]	#订阅给定的一个或多个频道的信息。
UNSUBSCRIBE [channel [channel …]]	#退订给定的频道。
 
PSUBSCRIBE pattern [pattern …]	#订阅一个或多个符合给定模式的频道。根据模式来订阅可以订阅许多频道
PUNSUBSCRIBE [pattern [pattern …]]	#退订所有给定模式的频道。
 
PUBSUB subcommand [argument [argument …]]	#查看订阅与发布系统状态。

关于Java的API使用封装的Jedis API或者 Redssion API 即可。

方法16: Distruptor 框架异步

Disruptor是一个优秀的并发框架使用无锁编程+环形队列架构 是高性能异步队列的王者组件

Disruptor 可以使用在 非常多 的生产者 消费者 异步场景

  • 单生产者多消费者场景
  • 多生产者单消费者场景
  • 单生产者多消费者场景
  • 多个消费者串行消费场景
  • 菱形方式执行场景
  • 链式并行执行场景
  • 多组消费者相互隔离场景
  • 多组消费者航道执行模式

下面 以 单生产者多消费者并行场景为例看看场景介绍。

在并发系统中提高性能最好的方式之一就是单一写者原则对Disruptor也是适用的。

如果在生产者单消费者 需求中仅仅有一个事件生产者那么可以设置为单一生产者模式来提高系统的性能。

在这里插入图片描述

其他场景非常多具体请参考

请参考 尼恩的 《Disruptor 框架 红宝书》

方法17ForkJoin 框架异步

Fork/Join框架是JDK1.7提供的一个并行任务执行框架它可以把一个大任务分成多个可并行执行的子任务然后合并每个子任务的结果得到的大任务的结果。

有点类似Hadoop的MapReduceFork/Join框架也可以分成两个核心操作

  • Fork操作将大任务分割成若干个可以并行执行的子任务
  • Join操作合并子任务的执行结果

这里的内容也是非常多具体请参考 《Java 高并发核心 编程 卷2 加强版》 8.3小节。 示意图如下

在这里插入图片描述

其工作窃取算法非常有价值

尼恩在指导小伙伴写简历的时候多次用到 工作窃取算法

方法18RocketMQ源码中ServiceThread 能急能缓的高性能异步

RocketMQ源码中, 实现了一种特殊的高性能异步: 能急能缓 ServiceThread 异步

能急能缓 ServiceThread 异步 有两个特点

  • 既能周期性的执行异步任务
  • 还能紧急的时候执行应急性的任务

RocketMQ的吞吐量达到 70WqpsServiceThread 的异步框架发挥了重要的价值。

尼恩 在 实现《10WQps推送中台实操》使用了 ServiceThread 异步框架在 实现《100WQps三级缓存组件实操》,实操时也使用了 ServiceThread 异步框架。

总之 ServiceThread 异步框架是RocketMq的精华之一性能非常高也 非常好用。

有关 ServiceThread 异步框架的内容请参见 尼恩 《穿透 RocketMq 架构和源码》视频。

内容太多还涉及到 RocketMQ源码对闭锁的改造所以在这里不做赘述。

方式19Kotlin 协程 异步

20 18 年一种新的 JVM 编程语言开始流行Kotlin。

Kotlin 首先流行在 Android 开发领域因为它得到了 Google 的首肯和支持。

但对于后端开发领域因为一项特性使得 Kotlin 也非常值得注意。那就是 Kotlin Coroutine后文称 Kotlin 协程。

协程技术不是什么新技术它在很多语言中都有实现比如大家所熟悉的 Python、Lua、Go 都是支持协程的。

在不同语言中协程的实现方法各有不同。因为 Kotlin 的运行依赖于 JVM不能对 JVM 进行修改因此Kotlin 不能在底层支持协程。

同时Kotlin 是一门编程语言需要在语言层面支持协程而不是像框架那样在语言层面之上支持。

因此Kotlin 对协程支持最核心的部分是在编译器中。因为对这部分原理的解释在之前文章中都有涉及因此不在这里重复。

使用 Kotlin 协程之后最大的好处是异步代码的可读性大大提高。

如果上一个示例用 Kotlin 协程实现那就是下面的样子

 @GetMapping("/coroutine/{personId}")
fun getNumberOfMessages(@PathVariable personId: String) = mono(Unconfined) {
    val person = peopleRepository.findById(personId).awaitFirstOrDefault(null)
            ?: throw NoSuchElementException("No person can be found by $personId")

    val lastLoginDate = auditRepository.findByEmail(person.email).awaitSingle().eventDate

    val numberOfMessages =
            messageRepository.countByMessageDateGreaterThanAndEmail(lastLoginDate, person.email).awaitSingle()

    "Hello ${person.name}, you have $numberOfMessages messages since $lastLoginDate"
}

这里的功能是查询最近邮件数Kotlin 协程版

目前在 Spring 应用中使用 Kotlin 协程还有些小繁琐但在 Spring Boot 2.2 中可以直接在 Spring WebFlux 方法上使用 suspend 关键字。

Kotlin 协程最大的意义就是可以用看似指令式编程方式Imperative Programming即传统编程方式去写异步编程代码。并发和代码可读性似乎两全其美了。

Kotlin 协程的局限性

Kotlin 协程依赖于各种基于 Callback 的技术

所以只有当一段代码使用了 ListenableFuture、CompletableFuture、Project Reactor、RxJava 等技术时才能用 Kotlin 协程进行改造优化。

那对于其它的会阻塞线程的技术如 Object.wait、Thread.sleep、Lock、BIO 等Kotlin 协程就无能为力了。

另外一个局限性源于 Kotlin 本身。

虽然 Kotlin 兼容 Java但这种兼容并非完美。

因此对于组件尤其是基础组件的开发并不推荐使用 Kotlin而是更推荐使用 Java。这也导致 Kotlin 协程的使用范围被进一步地限制。

方式20Project Loom

前面讲到虽然 Kotlin 协程看上去很好但在使用上还是有着种种限制。那有没有更好的选择呢

答案是 Project Loom (https://openjdk.java.net/projects/loom/))。

这个项目在 18 年底的时候已经达到可初步演示的原型阶段。

不同于之前的方案Project Loom 是从 JVM 层面对多线程技术进行彻底的改变。

下面这幅图很好地展示了目前 Java 并发编程方面的困境简单的代码并发、伸缩能力差并发、伸缩能力强的代码复杂难以与现有代码整合。

img

各种框架或其它 JVM 编程语言的解决方案都在使用场景上有限制。

例如 Kotlin 协程必须基于各种 Callback 技术而 Callback 技术有存在编写、调试困难的问题。为了使 Java 并发能力在更大范围上得到提升从底层进行改进便是必然。

为了让简单和高并发这两个目标兼得我们需要 Project Loom 这个项目。

Project Loom 设计思想与之前的一个开源 Java 协程技术非常相似。而现在 Project Loom 的主要设计开发人员 Ron Pressler 就是来自 Quasar Fiber。

这个开源技术就是 Quasar Fiber https://docs.paralleluniverse.co/quasar/

Project Loom 的被发起原因也很简单长期以来Java 的线程是与操作系统的线程一一对应的这限制了 Java 平台并发能力的提升。

Project Loom原理

Project Loom 引入了 虚拟线程作为 java.lang.Thread 的实例, 这是一种的轻量级用户模式线程

虚拟线程 可以理解为 操作系统 平台线程的一个任务。操作系统 平台线程很庞大并且依赖于操作系统。

Java 不能随意改进平台线程操作系统直接将这些线程分配给处理器。

有了虚拟线程后 JDK 的调度程序将虚拟线程 提交到 平台线程操作系统像往常一样将其分配给处理器。

那么虚拟线程如何工作

我们都知道阻塞线程是邪恶的会对应用程序的性能产生负面影响。

使用虚拟线程之后当虚拟线程阻塞 I/O 或 JDK 中的某些阻塞操作时平台线程就不会阻塞了。

怎么做的呢 当虚拟线程阻塞 I/O 或 JDK 中的某些阻塞操作时例如 BlockingQueue.take()它会自动从平台线程中卸载 或者说解除合作关系。JDK 的调度程序可以在这个空闲的平台线程上挂载和运行其他虚拟线程。

当阻塞操作准备好完成时它会将虚拟线程提交回调度程序调度程序会将虚拟线程挂载到可用的平台线程上以恢复执行。

当然平台线程 不必 虚拟线程一一对应更像是 原始的线程和任务的关系。

因此我们现在可以构建具有高吞吐量的高并发应用程序而无需增加线程数量默认情况下虚拟线程的 Executor 将使用与可用处理器数量一样多的平台线程。

使用方法

在引入 Project Loom 之后JDK 将引入一个新类java.lang.Fiber

此类与 java.lang.Thread 一起都成为了 java.lang.Strand 的子类。

即线程变成了一个虚拟的概念有两种实现方法Fiber 所表示的轻量线程和 Thread 所表示的传统的重量级线程。

对于应用开发人员使用 Project Loom 很简单

 Fiber f = Fiber.schedule(() -> {
        println("Hello 1");
        lock.lock(); // 等待锁不会挂起线程
        try {
            println("Hello 2");
        } finally {
            lock.unlock();
        }
        println("Hello 3");
    })

只需执行 Fiber.schedule(Runnable task) 就能在 Fiber 中执行任务。

最重要的是上面例子中的 lock.lock() 操作将不再挂起底层线程。

除了 Lock 不再挂起线程以外像 Socket BIO 操作也不再挂起线程。

synchronized以及 Native 方法中线程挂起操作无法避免。

   synchronized (monitor) {
// 在 Fiber 中调用这条语句还是会挂起线程。
    socket.getInputStream().read();
}

如上所示Fiber 的使用非常简单。因此让现有系统使用 Project Loom 很容易。

像 Tomcat、Jetty 这样的 Web 容器只需将处理请求操作从使用 ThreadPoolExecutor execute 或 submit 改为使用 Fiber schedule 即可。

这个视频 https://www.youtube.com/watch?v=vbGbXUjlRyQ&t=1240s 中的 Demo 展示了 Jetty 使用 Project Loom 改造之后并发吞吐能力的大幅提升。

关于 Project Loom 的实操和性能对比尼恩后面通过专题视频给大家进行介绍。

参考文献

通过事件总线EventBus/AsyncEventBus进行JAVA模块解耦 史上最全

队列之王 Disruptor 原理、架构、源码 一文穿透

https://blog.csdn.net/crazymakercircle/article/details/128264803

https://xie.infoq.cn/article/deba3305beba3838c2cf6c102

推荐阅读

网易二面CPU狂飙900%该怎么处理

阿里二面千万级、亿级数据如何性能优化 教科书级 答案来了

峰值21WQps、亿级DAU小游戏《羊了个羊》是怎么架构的

场景题假设10W人突访你的系统如何做到不 雪崩

2个大厂 100亿级 超大流量 红包 架构方案

Nginx面试题史上最全 + 持续更新

K8S面试题史上最全 + 持续更新

操作系统面试题史上最全、持续更新

Docker面试题史上最全 + 持续更新

Springcloud gateway 底层原理、核心实战 (史上最全)

Flux、Mono、Reactor 实战史上最全

sentinel 史上最全

Nacos (史上最全)

TCP协议详解 (史上最全)

分库分表 Sharding-JDBC 底层原理、核心实战史上最全

clickhouse 超底层原理 + 高可用实操 史上最全

nacos高可用图解+秒懂+史上最全

队列之王 Disruptor 原理、架构、源码 一文穿透

环形队列、 条带环形队列 Striped-RingBuffer 史上最全

一文搞定SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系史上最全

单例模式史上最全

红黑树 图解 + 秒懂 + 史上最全

分布式事务 秒懂

缓存之王Caffeine 源码、架构、原理史上最全10W字 超级长文

缓存之王Caffeine 的使用史上最全

Java Agent 探针、字节码增强 ByteBuddy史上最全

Docker原理图解+秒懂+史上最全

Redis分布式锁图解 - 秒懂 - 史上最全

Zookeeper 分布式锁 - 图解 - 秒懂

Zookeeper Curator 事件监听 - 10分钟看懂

Netty 粘包 拆包 | 史上最全解读

Netty 100万级高并发服务器配置

Springcloud 高并发 配置 一文全懂》## 推荐阅读

网易二面CPU狂飙900%该怎么处理

阿里二面千万级、亿级数据如何性能优化 教科书级 答案来了

峰值21WQps、亿级DAU小游戏《羊了个羊》是怎么架构的

场景题假设10W人突访你的系统如何做到不 雪崩

2个大厂 100亿级 超大流量 红包 架构方案

Nginx面试题史上最全 + 持续更新

K8S面试题史上最全 + 持续更新

操作系统面试题史上最全、持续更新

Docker面试题史上最全 + 持续更新

Springcloud gateway 底层原理、核心实战 (史上最全)

Flux、Mono、Reactor 实战史上最全

sentinel 史上最全

Nacos (史上最全)

TCP协议详解 (史上最全)

分库分表 Sharding-JDBC 底层原理、核心实战史上最全

clickhouse 超底层原理 + 高可用实操 史上最全

nacos高可用图解+秒懂+史上最全

队列之王 Disruptor 原理、架构、源码 一文穿透

环形队列、 条带环形队列 Striped-RingBuffer 史上最全

一文搞定SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系史上最全

单例模式史上最全

红黑树 图解 + 秒懂 + 史上最全

分布式事务 秒懂

缓存之王Caffeine 源码、架构、原理史上最全10W字 超级长文

缓存之王Caffeine 的使用史上最全

Java Agent 探针、字节码增强 ByteBuddy史上最全

Docker原理图解+秒懂+史上最全

Redis分布式锁图解 - 秒懂 - 史上最全

Zookeeper 分布式锁 - 图解 - 秒懂

Zookeeper Curator 事件监听 - 10分钟看懂

Netty 粘包 拆包 | 史上最全解读

Netty 100万级高并发服务器配置

Springcloud 高并发 配置 一文全懂

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6