Day859.高性能队列Disruptor -Java 并发编程实战

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

高性能队列Disruptor

Hi我是阿昌今天学习记录的是关于高性能队列Disruptor的内容。

并发容器 中Java SDK 提供了 2 个有界队列

  • ArrayBlockingQueue
  • LinkedBlockingQueue

它们都是基于 ReentrantLock 实现的在高并发场景下锁的效率并不高那有没有更好的替代品呢有一种性能更高的有界队列Disruptor

Disruptor 是一款高性能的有界内存队列目前应用非常广泛Log4j2、Spring Messaging、HBase、Storm 都用到了 Disruptor那 Disruptor 的性能为什么这么高呢

Disruptor 项目团队曾经写过一篇论文详细解释了其原因可以总结为如下

  1. 内存分配更加合理使用 RingBuffer 数据结构数组元素在初始化时一次性全部创建提升缓存命中率
  2. 对象循环利用避免频繁 GC
  3. 能够避免伪共享提升缓存利用率
  4. 采用无锁算法避免频繁加锁、解锁的性能消耗
  5. 支持批量消费消费者可以无锁方式消费多个消息

其中前三点涉及到的知识比较多重点讲解前三点先来聊聊 Disruptor 如何使用。

下面的代码出自官方示例略做了一些修改相较而言Disruptor 的使用比 Java SDK 提供 BlockingQueue 要复杂一些但是总体思路还是一致的其大致情况如下

  • 在 Disruptor 中生产者生产的对象也就是消费者消费的对象称为 Event使用 Disruptor 必须自定义 Event例如示例代码的自定义 Event 是 LongEvent
  • 构建 Disruptor 对象除了要指定队列大小外还需要传入一个 EventFactory示例代码中传入的是LongEvent::new
  • 消费 Disruptor 中的 Event 需要通过 handleEventsWith() 方法注册一个事件处理器发布 Event 则需要通过 publishEvent() 方法。
//自定义Event
class LongEvent {
  private long value;
  public void set(long value) {
    this.value = value;
  }
}
//指定RingBuffer大小,
//必须是2的N次方
int bufferSize = 1024;

//构建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(
    LongEvent::new,
    bufferSize,
    DaemonThreadFactory.INSTANCE);

//注册事件处理器
disruptor.handleEventsWith(
  (event, sequence, endOfBatch) ->
    System.out.println("E: "+event));

//启动Disruptor
disruptor.start();

//获取RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//生产Event
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++){
  bb.putLong(0, l);
  //生产者生产消息
  ringBuffer.publishEvent(
    (event, sequence, buffer) -> 
      event.set(buffer.getLong(0)), bb);
  Thread.sleep(1000);
}

一、RingBuffer 如何提升性能

Java SDK 中 ArrayBlockingQueue 使用数组作为底层的数据存储而 Disruptor 是使用 RingBuffer 作为数据存储。

RingBuffer 本质上也是数组所以仅仅将数据存储从数组换成 RingBuffer 并不能提升性能但是 Disruptor 在 RingBuffer 的基础上还做了很多优化其中一项优化就是和内存分配有关的。


先了解一下程序的局部性原理。简单来讲程序的局部性原理指的是在一段时间内程序的执行会限定在一个局部范围内。这里的“局部性”可以从两个方面来理解

  • 一个是时间局部性
  • 另一个是空间局部性

时间局部性指的是 程序中的某条指令一旦被执行不久之后这条指令很可能再次被执行如果某条数据被访问不久之后这条数据很可能再次被访问。

空间局部性指某块内存一旦被访问不久之后这块内存附近的内存也很可能被访问

CPU 的缓存就利用了程序的局部性原理CPU 从内存中加载数据 X 时会将数据 X 缓存在高速缓存 Cache 中实际上 CPU 缓存 X 的同时还缓存了 X 周围的数据因为根据程序具备局部性原理X 周围的数据也很有可能被访问。

从另外一个角度来看如果程序能够很好地体现出局部性原理也就能更好地利用 CPU 的缓存从而提升程序的性能。

Disruptor 在设计 RingBuffer 的时候就充分考虑了这个问题下面对比着 ArrayBlockingQueue 来分析一下。

首先是 ArrayBlockingQueue。生产者线程向 ArrayBlockingQueue 增加一个元素每次增加元素 E 之前都需要创建一个对象 E如下图所示ArrayBlockingQueue 内部有 6 个元素这 6 个元素都是由生产者线程创建的由于创建这些元素的时间基本上是离散的所以这些元素的内存地址大概率也不是连续的

ArrayBlockingQueue 内部结构图

再看看 Disruptor 是如何处理的。Disruptor 内部的 RingBuffer 也是用数组实现的但是这个数组中的所有元素在初始化时是一次性全部创建的所以这些元素的内存地址大概率是连续的相关的代码如下所示。

for (int i=0; i<bufferSize; i++){
  //entries[]就是RingBuffer内部的数组
  //eventFactory就是前面示例代码中传入的LongEvent::new
  entries[BUFFER_PAD + i] 
    = eventFactory.newInstance();
}

Disruptor 内部 RingBuffer 的结构可以简化成下图那问题来了数组中所有元素内存地址连续能提升性能吗能为什么呢因为消费者线程在消费的时候是遵循空间局部性原理的消费完第 1 个元素很快就会消费第 2 个元素

当消费第 1 个元素 E1 的时候CPU 会把内存中 E1 后面的数据也加载进 Cache如果 E1 和 E2 在内存中的地址是连续的那么 E2 也就会被加载进 Cache 中然后当消费第 2 个元素的时候由于 E2 已经在 Cache 中了所以就不需要从内存中加载了这样就能大大提升性能。
Disruptor 内部 RingBuffer 结构图
除此之外在 Disruptor 中生产者线程通过 publishEvent() 发布 Event 的时候并不是创建一个新的 Event而是通过 event.set() 方法修改 Event 也就是说 RingBuffer 创建的 Event 是可以循环利用的这样还能避免频繁创建、删除 Event 导致的频繁 GC 问题。


二、如何避免“伪共享”

高效利用 Cache能够大大提升性能所以要努力构建能够高效利用 Cache 的内存结构。而从另外一个角度看努力避免不能高效利用 Cache 的内存结构也同样重要。

有一种叫做“伪共享False sharing”的内存布局就会使 Cache 失效那什么是“伪共享”呢

伪共享和 CPU 内部的 Cache 有关Cache 内部是按照缓存行Cache Line管理的缓存行的大小通常是 64 个字节

CPU 从内存中加载数据 X会同时加载 X 后面64-size(X)个字节的数据。下面的示例代码出自 Java SDK 的 ArrayBlockingQueue其内部维护了 4 个成员变量分别是队列数组 items、出队索引 takeIndex、入队索引 putIndex 以及队列中的元素总数 count。

/** 队列数组 */
final Object[] items;
/** 出队索引 */
int takeIndex;
/** 入队索引 */
int putIndex;
/** 队列中元素总数 */
int count;

当 CPU 从内存中加载 takeIndex 的时候会同时将 putIndex 以及 count 都加载进 Cache。

下图是某个时刻 CPU 中 Cache 的状况为了简化缓存行中仅列出了 takeIndex 和 putIndex。

CPU 缓存示意图
假设线程 A 运行在 CPU-1 上执行入队操作入队操作会修改 putIndex而修改 putIndex 会导致其所在的所有核上的缓存行均失效此时假设运行在 CPU-2 上的线程执行出队操作出队操作需要读取 takeIndex由于 takeIndex 所在的缓存行已经失效所以 CPU-2 必须从内存中重新读取。入队操作本不会修改 takeIndex但是由于 takeIndex 和 putIndex 共享的是一个缓存行就导致出队操作不能很好地利用 Cache这其实就是伪共享。

简单来讲伪共享指的是 由于共享缓存行导致缓存无效的场景

ArrayBlockingQueue 的入队和出队操作是用锁来保证互斥的所以入队和出队不会同时发生。如果允许入队和出队同时发生那就会导致线程 A 和线程 B 争用同一个缓存行这样也会导致性能问题。

所以为了更好地利用缓存必须避免伪共享那如何避免呢

CPU 缓存失效示意图

方案很简单每个变量独占一个缓存行、不共享缓存行就可以了具体技术是缓存行填充

比如想让 takeIndex 独占一个缓存行可以在 takeIndex 的前后各填充 56 个字节这样就一定能保证 takeIndex 独占一个缓存行。下面的示例代码出自 DisruptorSequence 对象中的 value 属性就能避免伪共享因为这个属性前后都填充了 56 个字节。

Disruptor 中很多对象例如 RingBuffer、RingBuffer 内部的数组都用到了这种填充技术来避免伪共享。

//前填充56字节
class LhsPadding{
    long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding{
    volatile long value;
}
//后填充56字节
class RhsPadding extends Value{
    long p9, p10, p11, p12, p13, p14, p15;
}
class Sequence extends RhsPadding{
  //省略实现
}

三、Disruptor 中的无锁算法

ArrayBlockingQueue 是利用管程实现的中规中矩生产、消费操作都需要加锁实现起来简单但是性能并不十分理想。

Disruptor 采用的是无锁算法很复杂但是核心无非是生产和消费两个操作。

Disruptor 中最复杂的是入队操作所以重点来看看入队操作是如何实现的。

对于入队操作最关键的要求是不能覆盖没有消费的元素

对于出队操作最关键的要求是不能读取没有写入的元素所以 Disruptor 中也一定会维护类似出队索引和入队索引这样两个关键变量。

Disruptor 中的 RingBuffer 维护了入队索引但是并没有维护出队索引这是因为在 Disruptor 中多个消费者可以同时消费每个消费者都会有一个出队索引所以 RingBuffer 的出队索引是所有消费者里面最小的那一个。

下面是 Disruptor 生产者入队操作的核心代码看上去很复杂其实逻辑很简单

如果没有足够的空余位置就出让 CPU 使用权然后重新计算

反之则用 CAS 设置入队索引。

//生产者获取n个写入位置
do {
  //cursor类似于入队索引指的是上次生产到这里
  current = cursor.get();
  //目标是在生产n个
  next = current + n;
  //减掉一个循环
  long wrapPoint = next - bufferSize;
  //获取上一次的最小消费位置
  long cachedGatingSequence = gatingSequenceCache.get();
  //没有足够的空余位置
  if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){
    //重新计算所有消费者里面的最小值位置
    long gatingSequence = Util.getMinimumSequence(
        gatingSequences, current);
    //仍然没有足够的空余位置出让CPU使用权重新执行下一循环
    if (wrapPoint > gatingSequence){
      LockSupport.parkNanos(1);
      continue;
    }
    //从新设置上一次的最小消费位置
    gatingSequenceCache.set(gatingSequence);
  } else if (cursor.compareAndSet(current, next)){
    //获取写入位置成功跳出循环
    break;
  }
} while (true);

四、总结

Disruptor 在优化并发性能方面可谓是做到了极致优化的思路大体是两个方面

  • 一个是利用无锁算法避免锁的争用
  • 另外一个则是将硬件CPU的性能发挥到极致。

尤其是后者在 Java 领域基本上属于经典之作了。

发挥硬件的能力一般是 C 这种面向硬件的语言常干的事儿C 语言领域经常通过调整内存布局优化内存占用而 Java 领域则用的很少原因在于 Java 可以智能地优化内存布局内存布局对 Java 程序员的透明的。

这种智能的优化大部分场景是很友好的但是如果想通过填充方式避免伪共享就必须绕过这种优化关于这方面 Disruptor 提供了经典的实现可以参考。

由于伪共享问题如此重要所以 Java 也开始重视它了比如 Java 8 中提供了避免伪共享的注解@sun.misc.Contended通过这个注解就能轻松避免伪共享需要设置 JVM 参数 -XX:-RestrictContended

不过避免伪共享是以牺牲内存为代价的所以具体使用的时候还是需要仔细斟酌。


单机提升性能不外乎是围绕CPU内存和IO想办法。

CPU:

  • 1.避免线程切换单线程对于多线程进行线程绑定使用CAS无锁技术
  • 2.利用CPU缓存还有缓存填充设计数据结构和算法

内存

  • 1.多级缓存应用缓存第三方缓存系统缓存
  • 2.数组优于链表连续的内容地址
  • 3.避免频繁内存碎片利用池思想复用对象

解决IO产生的速度差:

  • 1.多路复用
  • 2.队列削峰
  • 3.协程

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java