Disruptor 高性能队列

Disruptor 是由 Java 编写的基于内存的高性能队列,用于在异步事件处理体系结构中提供低延迟,高吞吐量的工作队列。与ArrayBlockingQueue 一样,它通常用于多个线程间的消息传递(常见场景就是生产者-消费者模型),Disruptor 从功能、性能都远好于 ArrayBlockingQueue ,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用 Disruptor 作为 ArrayBlockingQueue 的替代者。

常见队列及底层实现

JDK中常见的线程安全的内置队列如下,队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是 ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成 LinkedBlockingQueue 和 ConcurrentLinkedQueue 两大类,前者也通过锁的方式来实现线程安全,而后者通过 CAS 实现。通过 CAS 方式实现的队列都是无界的,在稳定性要求特别高的系统中,为了防止生产者速度过快导致的内存溢出,同时也为了减少 GC 对系统性能的影响,会尽量选择数组/堆这样的数据结构,ArrayBlockingQueue 似乎成了唯一选择。

LinkedBlockingQueue 低性能的真凶

1、加锁的性能损耗

Disruptor 论文中讲述了一个实验:

  • 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
  • 机器环境:2.4G 6核
  • 运算: 64位的计数器累加5亿次
Method Time (ms)
Single thread 300
Single thread with CAS 5,700
Single thread with lock 10,000
Single thread with volatile write 4,700
Two threads with CAS 30,000
Two threads with lock 224,000

CAS操作比单线程无锁慢了1个数量级,有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。
单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。
在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。

综上可知,加锁的性能是最差的。

ComparaAndSwap(CAS)

CAS 操作,要么比较并交换成功,要么比较并交换失败。由 CPU 保证原子性。CAS 操作包含三个操作数——内存位置(V)、期望值(A)和新值(B),如果内存位置的值与期望值匹配,那么处理器就会自动将该位置值更新为新值,否则,处理器不做任何操作;

CAS 通过 JNI(Java Native Interface) 的代码来实现;运行Java调用C/C++,而CompareAndSwapxxx系列的方法就是借助C语言来调用CPU底层指令实现的,以常用的Intel x86平台来说,最终映射到CPU的指令为cmpxchg,这是一个原子指令,CPU 执行此命令时,实现比较替换操作;

对于Intel x86平台来说,cmpxchg指令如何保证多核心下的线程安全呢?其实系统底层进行 CAS 操作的时候,会判断当前系统是否为多核心系统;如果是多核心,就给从总线加锁,加所之后只有一个线程对总线加所成功,加锁成功之后,才会执行CAS操作,也就是说CAS的原子性时平台级别的,无需考虑多核心 CPU 对 CAS 操作有啥影响。

另外一个值得注意的点就是 CAS 的 ABA 问题,CAS 需要在操作值的时候检查下值,有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,在CAS方法执行之前,被其他线程修改为B,然后又修改回了A,那么CAS方法执行检查的时候就会发现他的值没有修改,但是实际缺变化了,这就是 CAS 的 ABA 问题。如何解决呢?JDK 中使用了 AtomicStampedReference, AtomicStampedReference 主要包含一个对象引用以及一个可以自动更新的正数 stamp 的Pair对象来解决ABA问题;

JUC中的锁都依赖于同步器 AbstractQueuedSynchronizer,而 AbstractQueuedSynchronizer 的核心就是CAS操作。下面是 AbstractQueuedSynchronizer 同步队列的入队操作,也是一种乐观锁的实现,多线程情况下,操作头节点和尾节点都有可能失败,失败后会再次尝试,直到成功:

/**
  * Inserts node into queue, initializing if necessary. See picture above.
  * @param node the node to insert
  * @return node's predecessor
  */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
复制代码

2、伪共享问题

下图是CPU的3级缓存基本结构,L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。
img
当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。

另外,线程之间共享一份数据的时候,需要一个线程把数据写回主存,而另一个线程访问主存中相应的数据。下面是从CPU访问不同层级数据的时间概念,可见CPU读取主存中的数据会比从L1中读取慢了近2个数量级:
img

Cache是由很多个缓存行组成的。每个缓存行通常是64字节,并且它有效地引用主内存中的一块儿地址。一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量。

CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个缓存行(这也是为什么选择数组不选择链表,在数组中可以依靠缓存行得到高效的访问)。在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个。因此可以非常快的遍历这个数组,也可以非常快速的遍历在连续内存块中分配的任意数据结构。下面的例子测试利用缓存行的特性和不利用缓存行的特性的效果对比:

package cn.tim;

public class CacheLineEffect {
    static long[][] arr;

    public static void main(String[] args) {
        arr = new long[1024 * 1024][];
        for (int i = 0; i < 1024 * 1024; i++) {
            arr[i] = new long[8];
            for (int j = 0; j < 8; j++) {
                arr[i][j] = 0L;
            }
        }
        long sum = 0L;
        long marked = System.currentTimeMillis();
        for (int i = 0; i < 1024 * 1024; i+=1) {
            for(int j =0; j< 8;j++){
                sum = arr[i][j];
            }
        }
        System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");

        marked = System.currentTimeMillis();
        for (int i = 0; i < 8; i+=1) {
            for(int j =0; j< 1024 * 1024;j++){
                sum = arr[j][i];
            }
        }
        System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");

        marked = System.currentTimeMillis();
        for (int i = 0; i < 8; i+=1) {
            for(int j =0; j< 1024 * 1024;j++){
                sum = arr[j][i];
            }
        }
    }
}
复制代码

img
img
但是缓存行也不是万能的,因为缓存行依然带来了一个缺点,我在这里举个例子说明这个缺点,可以想象有个数组队列,ArrayQueue,他的数据结构如下:

class ArrayQueue{
    long maxSize;
    long currentIndex;
    ......
}
复制代码

对于 maxSize 是我们一开始就定义好的,数组的大小,对于 currentIndex,是标志我们当前队列的位置,这个变化比较快,可以想象你访问 maxSize 的时候,是不是把 currentIndex 也加载进来了,这个时候,其他线程更新 currentIndex,就会把 CPU 中的缓存行置为无效,这是 CPU 规定的,他并不是只把 currentIndex 置为无效,而是整个缓存行,如果此时又继续访问 maxSize 他依然得继续从内存中读取,但是 maxSize 却是我们一开始定义好的,我们应该访问缓存即可,但是却被我们经常改变的 currentIndex 所影响,这个问题就是伪共享问题。

为了解决上面缓存行出现的问题,在 Disruptor 中采用了 Padding 的方式:

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}
复制代码

其中的 Value 就被其他一些无用的 long 变量给填充了。这样你修改 Value 的时候,就不会影响到其他变量的缓存行。

其实也不用上面写的那么复杂,在 JDK8 中提供了 @Contended 的注解,当然一般来说只允许JDK中内部使用,如果你自己使用那就得配置Jvm参数 -RestricContentended = fase,将限制这个注解置位取消。很多文章分析了 ConcurrentHashMap ,但是都把这个注解给忽略掉了,在ConcurrentHashMap 中就使用了这个注解,在 ConcurrentHashMap 每个桶都是单独的用计数器去做计算,而这个计数器由于时刻都在变化,所以被用这个注解进行填充缓存行优化,以此来增加性能。
img
在 ArrayBlockingQueue 中,takeIndex、putIndex、count 这三个变量很容易放到一个缓存行中, 但是之间修改没有太多的关联. 所以每次修改, 都会使之前缓存的数据失效, 从而不能完全达到共享的效果。当生产者线程put一个元素到 ArrayBlockingQueue 时, putIndex 会修改, 从而导致消费者线程的缓存中的缓存行无效, 需要从主存中重新读取,这就是在 ArrayBlockingQueue 中的伪共享问题。
img

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** items index for next take, poll, peek or remove */
    /** 需要被取走的元素下标 */
    int takeIndex; 

    /** items index for next put, offer, or add */
    /** 可被元素插入的位置的下标 */
    int putIndex;

    /** Number of elements in the queue */
    /**  队列中元素的数量 */
    int count;

    ......
复制代码

Disruptor高性能的设计

分析完了 ArrayBlockingQueue 低效的原因,下面来看看 Disruptor 是如何设计的。Disruptor主要是通过以下设计来解决队列速度慢的问题:
1、环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

2、无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

3、元素位置定位
数组长度2^n,通过位运算,加快定位的速度,这与HashMap是一样的。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

1、Disruptor 的数据结构 —— RingBuffer

下面来看看 Disruptor 的队列底层数据结构 —— RingBuffer。它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程)间传递数据的buffer。
img
基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用的元素。如下图右边的图片表示序号,这个序号指向数组的索引4的位置。
img
随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。
img
要找到数组中当前序号指向的元素,槽的个数最好设置为2的N次方,2的N次方换成二进制就是1000,100,10,1这样的数字, sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。

因为它是数组,所以要比链表快,数组内元素的内存地址的连续性存储的。这是对CPU缓存友好的。也就是说,在硬件级别,数组中的元素是会被预加载的,因此在ringbuffer当中,CPU 无需时不时去主存加载数组中的下一个元素。因为只要一个元素被加载到缓存行,其他相邻的几个元素也会被加载进同一个缓存行。
其次,你可以为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。

RingBuffer 没有尾指针。只维护了一个指向下一个可用位置的序号。而且不删除 buffer 中的数据,也就是说这些数据一直存放在 buffer 中,直到新的数据覆盖他们。当初 LMAX 团队选择用环形 buffer 的最初原因就是想要提供可靠的消息传递。我们需要将已经被服务发送过的消息保存起来,这样当另外一个服务通过拒绝应答信号告诉我们没有成功收到消息时,我们能够重新发送给他们。环形 buffer 非常适合这个场景,它维护了一个指向尾部的序号,当收到拒绝应答信号请求,可以重发从那一点到当前序号之间的所有消息:
img

如何从Ringbuffer读取

img
消费者(Consumer)是一个想从 Ring Buffer 里读取数据的线程,它可以访问 ConsumerBarrier 对象——这个对象由 RingBuffer 创建并且代表消费者与RingBuffer 进行交互。就像 Ring Buffer 显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了 Ring Buffer 里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9。

消费者可以调用 ConsumerBarrier 对象的 waitFor()方法,传递它所需要的下一个序号:

final long availableSeq = consumerBarrier.waitFor(nextSequence);
复制代码

ConsumerBarrier 返回 RingBuffer 的最大可访问序号 —— 在上面的例子中是12。ConsumerBarrier 有一个 WaitStrategy 方法来决定它如何等待这个序号。
接下来,消费者会一直逛来逛去,等待更多数据被写入 Ring Buffer。并且,写入数据后消费者会收到通知——节点 9,10,11 和 12 已写入。现在序号 12 到了,消费者可以指示 ConsumerBarrier 去拿这些序号里的数据了。
img
在 Disruptor 中采用了数组的方式保存了我们的数据,上面我们也介绍了采用数组保存我们访问时很好的利用缓存,但是在 Disruptor 中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并不是真正的环形数组,在 RingBuffer 中是采用取余的方式进行访问的,比如数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。

以前需要逐个节点地询问 『我可以拿下一个数据吗?现在可以了么?现在呢?』,消费者(Consumer)现在只需要简单的说『当你拿到的数字比我这个要大的时候请告诉我』,函数返回值会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(Ring Buffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。这太好了,不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。

另一个好处是——你可以用多个消费者(Consumer)去读同一个 RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor 的协调下实现真正的并发数据处理。

如何写入 Ringbuffer

写入 RingBuffer时的核心就是不要让 Ring 重叠;如何通知消费者;生产者一端的批处理;以及多个生产者如何协同工作。
写入 Ring Buffer 的过程涉及到两阶段提交 (two-phase commit)。首先,生产者需要申请 buffer 里的下一个节点。然后,当生产者向节点写完数据,它将会调用 ProducerBarrier 的 commit 方法。那么首先来看看第一步。『给我 Ring Buffer 里的下一个节点』,这句话听起来很简单。的确,从生产者角度来看它很简单:简单地调用 ProducerBarrier 的 nextEntry() 方法,这样会返回给你一个 Entry 对象,这个对象就是 Ring Buffer 的下一个节点。在后台,由 ProducerBarrier 负责所有的交互细节来从 Ring Buffer 中找到下一个节点,然后才允许生产者向它写入数据。
img
现在假设只有一个生产者写入 Ring Buffer。过一会儿我们再处理多个生产者的复杂问题。ConsumerTrackingProducerBarrier 对象拥有所有正在访问 RingBuffer 的消费者列表。这看起来有点儿奇怪-我从没有期望 ProducerBarrier 了解任何有关消费端那边的事情。但是等等,这是有原因的。因为我们不想与队列“混为一谈”(队列需要追踪队列的头和尾,它们有时候会指向相同的位置),Disruptor 由消费者负责通知它们处理到了哪个序列号,而不是 Ring Buffer。所以,如果我们想确定我们没有让 Ring Buffer 重叠,需要检查所有的消费者们都读到了哪里。
在上图中,有一个 消费者 顺利的读到了最大序号 12(用红色/粉色高亮)。第二个消费者 有点儿落后——可能它在做 I/O 操作之类的——它停在序号 3。因此消费者 2 在赶上消费者 1 之前要跑完整个 Ring Buffer 一圈的距离。

现在生产者想要写入 Ring Buffer 中序号 3 占据的节点,因为它是 Ring Buffer 当前游标的下一个节点。但是 ProducerBarrier 明白现在不能写入,因为有一个消费者正在占用它。所以,ProducerBarrier 停下来自旋 (spins),等待,直到那个消费者离开。
现在可以想像消费者 2 已经处理完了一批节点,并且向前移动了它的序号。可能它挪到了序号 9(因为消费端的批处理方式,现实中我会预计它到达 12,但那样的话这个例子就不够有趣了)。
img
上图显示了当消费者 2 挪动到序号 9 时发生的情况。在这张图中我已经忽略了ConsumerBarrier,因为它没有参与这个场景。
ProducerBarier 会看到下一个节点——序号 3 那个已经可以用了。它会抢占这个节点上的 Entry(我还没有特别介绍 Entry 对象,基本上它是一个放写入到某个序号的 Ring Buffer 数据的桶),把下一个序号(13)更新成 Entry 的序号,然后把 Entry 返回给生产者。生产者可以接着往 Entry 里写入数据。

现在到了第二阶段:提交新的数据
img
绿色表示最近写入的 Entry,序号是 13。当生产者结束向 Entry 写入数据后,它会要求 ProducerBarrier 提交。
ProducerBarrier 先等待 RingBuffer 的游标追上当前的位置(对于单生产者这毫无意义-比如,我们已经知道游标到了 12 ,而且没有其他人正在写入 Ring Buffer)。然后 ProducerBarrier 更新 RingBuffer 的游标到刚才写入的 Entry 序号-在我们这儿是 13。接下来,ProducerBarrier 会让消费者知道 buffer 中有新东西了。它戳一下 ConsumerBarrier 上的 WaitStrategy 对象说-“喂,醒醒!有事情发生了!”(注意-不同的 WaitStrategy 实现以不同的方式来实现提醒,取决于它是否采用阻塞模式。)

现在消费者 1 可以读 Entry 13 的数据,消费者 2 可以读 Entry 13 以及前面的所有数据,然后它们都过得很 happy。

ProducerBarrier 上的批处理

Disruptor 可以同时在生产者和 消费者 两端实现批处理。还记得伴随着程序运行,消费者 2 最后达到了序号 9 吗?ProducerBarrier 可以在这里做一件很狡猾的事-它知道 Ring Buffer 的大小,也知道最慢的消费者位置。因此它能够发现当前有哪些节点是可用的。
img
如果 ProducerBarrier 知道 RingBuffer 的游标指向 12,而最慢的消费者在 9 的位置,它就可以让生产者写入节点 3,4,5,6,7 和 8,中间不需要再次检查消费者的位置。

Disruptor 使用方法

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享