初步了解高性能队列——Disruptor(Java)

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

高性能队列——Disruptor

① 概述

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列研发的初衷是解决内部的内存队列的延迟问题而不是分布式队列。基于Disruptor开发的系统单线程能支撑每秒600万订单2010年在QCon演讲后获得了业界关注。

传统队列存在的问题

队列有界性结构队列类型
ArrayBlockingQueue有界加锁数组阻塞
LinkedBlockingQueue可选加锁链表阻塞
ConcurrentLinkedQueue无界无锁链表非阻塞
LinkedTransferQueue无界无锁链表阻塞
PriorityBlockingQueue无界加锁阻塞
DelayQueue无界加锁阻塞

队列的底层数据结构一般分成三种数组、链表和堆。其中堆这里是为了实现带有优先级特性的队列暂且不考虑。
在稳定性和性能要求特别高的系统中为了防止生产者速度过快导致内存溢出只能选择有界队列同时为了减少Java的垃圾回收对系统性能的影响会尽量选择array/heap格式的数据结构。这样筛选下来符合条件的队列就只有ArrayBlockingQueue。但是ArrayBlockingQueue是通过加锁的方式保证线程安全而且ArrayBlockingQueue还存在伪共享问题这两个问题严重影响了性能。


缓存行

Cache是由很多个cache line组成的。每个cache line通常是64字节并且它有效地引用主内存中的一块儿地址。一个Java的long类型变量是8字节因此在一个缓

存行中可以存8个long类型的变量。

CPU每次从主存中拉取数据时会把相邻的数据也存入同一个cache line。

在访问一个long数组的时候如果数组中的一个值被加载到缓存中它会自动加载另外7个。因此你能非常快的遍历这个数组。

伪共享

ArrayBlockingQueue有三个成员变量 - takeIndex需要被取走的元素下标 - putIndex可被元素插入的位置的下标 - count队列中元素的数量

这三个变量很容易放到一个缓存行中但是之间修改没有太多的关联。所以每次修改都会使之前缓存的数据失效从而不能完全达到共享的效果。

在这里插入图片描述

如上图所示当生产者线程put一个元素到ArrayBlockingQueue时putIndex会修改从而导致消费者线程的缓存中的缓存行无效需要从主存中重新读取。

这种无法充分使用缓存行特性的现象称为伪共享。

对于伪共享一般的解决方案是增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上以空间换时间。

② 高性能的原因

Disruptor通过以下设计来解决队列速度慢的问题

  • 环形数组结构

    为了避免垃圾回收采用数组而非链表。同时数组对处理器的缓存机制更加友好。数组元素不会被回收避免频繁的GC。

  • 元素位置定位

    数组长度2^n通过位运算加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型即使100万QPS的处理速度也需要30万年才能用完。

  • 无锁设计

    每个生产者或者消费者线程会先申请可以操作的元素在数组中的位置申请到之后直接在该位置写入或者读取数据。采用CAS无锁方式保证线程的安全性

环形数组

  • 因为是数组所以要比链表快而且根据我们对上面缓存行的解释知道数组中的一个元素加载相邻的数组元素也是会被预加载的因此在这样的结构中cpu无需时不时去主存加载数组中的下一个元素。而且你可以为数组预先分配内存使得数组对象一直存在除非程序终止。这就意味着不需要花大量的时间用于垃圾回收。此外不像链表那样需要为每一个添加到其上面的对象创造节点对象—对应的当删除节点时需要执行相应的内存清理操作。环形数组中的元素采用覆盖方式避免了jvm的GC。
  • 其次结构作为环形数组的大小为2的n次方这样元素定位可以通过位运算效率会更高这个跟一致性哈希中的环形策略有点像。在disruptor中这个环形结构就是RingBuffer既然是数组那么就有大小而且这个大小必须是2的n次方。
  • 实质只是一个普通的数组只是当放置数据填充满队列即到达2^n-1位置之后再填充数据就会从0开始覆盖之前的数据于是就相当于一个环

生产和消费模式

在Disruptor中生产者分为单生产者和多生产者而消费者并没有区分。单生产者情况下就是普通的生产者向RingBuffer中放置数据消费者获取最大可消费

位置并进行消费。而多生产者时候又多出了一个跟RingBuffer同样大小的Buffer称为AvailableBuffer。在多生产者中每个生产者首先通过CAS竞争获取可

以写的空间然后再进行慢慢往里放数据如果正好这个时候消费者要消费数据那么每个消费者都需要获取最大可消费的下标这个下标是在AvailableBuffer进

行获取得到的最长连续的序列下标。

单生产者

生产者单线程写数据的流程比较简单

  1. 申请写入m个元素
  2. 若是有m个元素可以入则返回最大的序列号。这儿主要判断是否会覆盖未读的元素
  3. 若是返回的正确则生产者开始写入元素。

在这里插入图片描述

多生产者

Disruptor在多个生产者的情况下引入了一个与Ring Buffer大小相同的bufferavailable Buffer。当某个位置写入成功的时候便把availble Buffer相应的位置置位标记为写入成功。读取的时候会遍历available Buffer来判断元素是否已经就绪。

多个生产者写入的时候

  1. 申请写入m个元素
  2. 若是有m个元素可以写入则返回最大的序列号。每个生产者会被分配一段独享的空间
  3. 生产者写入元素写入元素的同时设置available Buffer里面相应的位置以标记自己哪些位置是已经写入成功的。

如下图所示Writer1和Writer2两个线程写入数组都申请可写的数组空间。Writer1被分配了下标3到下表5的空间Writer2被分配了下标6到下标9的空间。

Writer1写入下标3位置的元素同时把available Buffer相应位置置位标记已经写入成功往后移一位开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

在这里插入图片描述

牛逼的下标指针

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding
{...}

RingBuffer的指针Sequence属于一个volatile变量同时也是我们能够不用锁操作就能实现Disruptor的原因之一而且通过缓存行补充避免伪共享问题。 该所谓指针是通过一直自增的方式来获取下一个可写或者可读数据该数据是Long类型不用担心会爆掉。

③ demo

  • 定义一个简单的消息体

    /**
     * 消息体
     */
    @Data
    public class MessageModel {
        private String message;
    }
    
  • 创建构造消息的工厂需要继承EventFactory 用于初始化RingBuffer

    /**
     * 构造EventFactory 用于初始化RingBuffer
     */
    public class HelloEventFactory implements EventFactory<MessageModel> {
        // 初始化RingBuffer
        @Override
        public MessageModel newInstance() {
            return new MessageModel();
        }
    }
    
  • 创建消费者

    /**
     * 构造EventHandler-消费者
     */
    @Slf4j
    public class HelloEventHandler implements EventHandler<MessageModel> {
        @Override
        public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
            try {
                //这里停止1000ms是为了确定消费消息是异步的
                Thread.sleep(1000);
                log.info("消费者处理消息开始");
                if (event != null) {
                    log.info("消费者消费的信息是{}",event);
                }
            } catch (Exception e) {
                log.info("消费者处理消息失败");
            }
            log.info("消费者处理消息结束");
        }
    }
    
  • 配置Disruptor

    /**
     * 配置Disruptor
     */
    @Configuration
    public class MQManager {
    
    
        @Bean("messageModel")
        public RingBuffer<MessageModel> messageModelRingBuffer() {
            // 定义用于事件处理的线程池 Disruptor通过java.util.concurrent.ExecutorService提供的线程来触发consumer的事件处理
            ExecutorService executor = Executors.newFixedThreadPool(2);
    
            // 指定事件工厂
            HelloEventFactory factory = new HelloEventFactory();
    
            // 指定RingBuffer字节大小必须为2的N次方通过位运算加快定位的速度否则将影响效率
            // 当放置数据填充满队列即到达2^n-1位置之后再填充数据就会从0开始覆盖之前的数据
            int bufferSize = 4;
    
            // 单生产者模式获取额外的性能
            // 创建disruptor方式一使用自定义的线程池创建已弃用
            Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                    ProducerType.SINGLE, new BlockingWaitStrategy());
    
            // 创建disruptor方式二传入线程工厂
            // 生产者的线程工厂
    //        ThreadFactory threadFactory = Executors.defaultThreadFactory();
    //        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, threadFactory,
    //                ProducerType.SINGLE, new BlockingWaitStrategy());
            // 设置事件业务处理器---消费者
            disruptor.handleEventsWith(new HelloEventHandler());
    
            // 启动disruptor线程
            disruptor.start();
    
            // 获取ringbuffer环用于接取生产者生产的事件
            RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
    
            return ringBuffer;
        }
    }
    
  • 创建生产者生产消息

    public interface DisruptorMqService {
    
        /**
         * 消息
         * @param message
         */
        void sayHelloMq(String message) throws InterruptedException;
    }
    
    
    
    
    @Slf4j
    @Service
    public class DisruptorMqServiceImpl implements DisruptorMqService {
    
        @Autowired
        private RingBuffer<MessageModel> messageModelRingBuffer;
    
        @Override
        public void sayHelloMq(String message) throws InterruptedException {
            for (int l = 0; true; l++) {
                log.info("record the message: {}", message);
                //获取下一个Event槽的下标
                long sequence = messageModelRingBuffer.next();
                log.info("sequence = {}", sequence);
                try {
                    //给Event填充数据
                    MessageModel event = messageModelRingBuffer.get(sequence);
                    event.setMessage(message + l);
                    log.info("往消息队列中添加消息{}", event);
                } catch (Exception e) {
                    log.error("failed to add event to messageModelRingBuffer for : e = {},{}", e, e.getMessage());
                } finally {
                    //发布Event激活观察者去消费将sequence传递给改消费者
                    //注意最后的publish方法必须放在finally中以确保必须得到调用如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
                    messageModelRingBuffer.publish(sequence);
                }
                Thread.sleep(5000);
            }
    
        }
    }
    
  • 测试

    @Autowired
    private DisruptorMqService disruptorMqService;
    
    /**
         * 项目内部使用Disruptor做消息队列
         *
         * @throws Exception
         */
    @Test
    public void sayHelloMqTest() throws Exception {
        disruptorMqService.sayHelloMq("消息到了Hello world!");
        log.info("消息队列已发送完毕");
        // 这里停止2000ms是为了确定是处理消息是异步的
        Thread.sleep(2000);
    }
    
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java