LinkedBlockingQueue源码解析(JDK1.8)

LinkedBlockingQueue源码解析(JDK1.8)

LinkedBlockingQueue是基于链表的先进先出的无界队列

同样我们还是先上类的关系图

LinkedBlockingQueue类的关系图

从类的关系图中可以看出LinkedBlockingQueue继承一个抽象类和实现了两个接口,然后分别简单介绍一下:

  • AbstractQueue:这里主要提供增删查等的相关操作
  • BlockingQueue:提供更多情况下的增删查等操作
  • Serializable:启用其序列化功能操作

属性

属性相关的源码


    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

复制代码

private final int capacity;//存储元素的最大容量

private final AtomicInteger count = new AtomicInteger();//当前阻塞队列中的元素数量

transient Node head;//链表的头节点

private transient Node last;//链表的尾节点

private final ReentrantLock takeLock = new ReentrantLock();//元素出队时线程所获取的锁

private final Condition notEmpty = takeLock.newCondition();//读取操作时是否让线程等待

private final ReentrantLock putLock = new ReentrantLock();//元素入队时线程所获取的锁

private final Condition notFull = putLock.newCondition();//添加操作时是否让线程等待

节点

    static class Node<E> {
        E item;//存储自身节点的元素

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;//存储的下一个节点

        Node(E x) { item = x; }
    }
复制代码

构造方法


    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

复制代码

从三个构造方法中可以看出主要是capacity这个的参数,capacity默认值为Integer.MAX_VALUE,支持自己设置capacity

预备知识

AtomicInteger是一个提供原子操作的Integer类,通过线程安全的方式操作加减。提供原子操作来进行Integer的使用,因此十分适合高并发情况下的使用。

ReentrantLock是一个支持响应中断、超时、尝试获取锁,可关联多个条件队列,是个可重入的公平锁和非公平锁。

Condition这个主要是用来实现ReentrantLock的wait和notify的功能

增删改查

入队操作


    public void put(E e) throws InterruptedException {
        //元素为null,则抛出空指针异常
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        //构造新的节点
        Node<E> node = new Node<E>(e);
        //获取到put操作的锁
        final ReentrantLock putLock = this.putLock;
        //获取当前元素的数量
        final AtomicInteger count = this.count;
        //执行可中断的锁获取操作,即意味着如果线程由于获取锁而处于Blocked状态时,线程是可以被中断而不再继续等待,这也是一种避免死锁的一种方式,不会因为发现到死锁之后而由于无法中断线程最终只能重启应用。
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            //如果队列中元素的容量达到最大容量,则线程处于等待状态,直到队列有空闲的容量才能继续执行 
            while (count.get() == capacity) {
                notFull.await();
            }
            //让元素入队,enqueue在下面分析
            enqueue(node);
            //getAndIncrement()方法对应的代码是unsafe.getAndAddInt(this, valueOffset, 1),即获取当前的元素个数并+1
            c = count.getAndIncrement();
            //c+1得到的结果是新元素入队之后的元素数量,这个数量小于最大容量,则去唤醒其他等待入队的线程,这里putLock、takeLock是各管各的,因此也是各自唤醒各自的等待线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
        //c等于0,代表队列之前是空队列,则去通知其他在等待的获取元素相关的队列,因为现在队列不能空了,可以取元素了
        if (c == 0)
            signalNotEmpty();
    }
    
    
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        //这个代码比较绕,其实就是last.next = node;last=node;意思是当前的尾节点的下一个节点是新节点,然后尾节点变成最新的节点,完成了链表的插入并重新设置了尾节点
        last = last.next = node;
    }    

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //加锁
        takeLock.lock();
        try {
            //唤醒等待获取元素的线程
            notEmpty.signal();
        } finally {
            //释放锁
            takeLock.unlock();
        }
    }
    
复制代码

上面看了put()的入队操作,我们再来看一下类似的offer()方法是怎么入队的


    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        //元素为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        //时间转换为毫秒
        long nanos = unit.toNanos(timeout);
        int c = -1;
        //获取put的锁
        final ReentrantLock putLock = this.putLock;
        //得到元素的个数
        final AtomicInteger count = this.count;
        //执行可中断的锁获取操作,即意味着如果线程由于获取锁而处于Blocked状态时,线程是可以被中断而不再继续等待,这也是一种避免死锁的一种方式,不会因为发现到死锁之后而由于无法中断线程最终只能重启应用。
        putLock.lockInterruptibly();
        try {
            //如果队列中元素的容量达到最大容量,则线程处于等待状态,直到队列有空闲的容量才能继续执行 
            while (count.get() == capacity) {
                //如果剩余等待的时间小于等于0,直接返回失败
                if (nanos <= 0)
                    return false;
                //使的当前线程挂起,直到发生 其他线程调用当前的Condition对象的signal方法或者signalAll方法;其它线程中断了当前线程,此时将会抛出InterruptedException,当前线程继续执行;当前线程被虚假唤醒;指定的超时时间到,超时时间单位为纳秒。这四种情况,才会返回
                nanos = notFull.awaitNanos(nanos);
            }
            //入队,前面说过了
            enqueue(new Node<E>(e));
            //getAndIncrement()方法对应的代码是unsafe.getAndAddInt(this, valueOffset, 1),即获取当前的元素个数并+1
            c = count.getAndIncrement();
            //c+1得到的结果是新元素入队之后的元素数量,这个数量小于最大容量,则去唤醒其他等待入队的线程,这里putLock、takeLock是各管各的,因此也是各自唤醒各自的等待线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
        //c等于0,代表队列之前是空队列,则去通知其他在等待的获取元素相关的队列,因为现在队列不能空了,可以取元素了
        if (c == 0)
            signalNotEmpty();
        return true;
    }


    public boolean offer(E e) {
        //元素为null,直接抛出空指针异常
        if (e == null) throw new NullPointerException();
        //获取元素的个数
        final AtomicInteger count = this.count;
        //元素的个数等于最大容量,表示队列已满,直接返回false失败
        if (count.get() == capacity)
            return false;
        int c = -1;
        //新节点构建
        Node<E> node = new Node<E>(e);
        //获取put锁
        final ReentrantLock putLock = this.putLock;
        //加锁
        putLock.lock();
        try {
            //元素的个数小于最大容量,队列未满,进行入队操作
            if (count.get() < capacity) {
                //入队,上面put()中分析过这个函数了,就是链表拆入元素
                enqueue(node);
                //getAndIncrement()方法对应的代码是unsafe.getAndAddInt(this, valueOffset, 1),即获取当前的元素个数并+1
                c = count.getAndIncrement();
                //c+1得到的结果是新元素入队之后的元素数量,这个数量小于最大容量,则去唤醒其他等待入队的线程,这里putLock、takeLock是各管各的,因此也是各自唤醒各自的等待线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //释放锁
            putLock.unlock();
        }
        //c等于0,代表队列之前是空队列,则去通知其他在等待的获取元素相关的队列,因为现在队列不能空了,可以取元素了
        if (c == 0)
            signalNotEmpty();
        //返回入队是否成功    
        return c >= 0;
    }
    
复制代码

总结一下入队:利用锁进行加锁,然后链表拆入新元素,唤醒其他put等待线程,释放锁。

删除元素


    public boolean remove(Object o) {
        //删除的元素为null,直接返回false
        if (o == null) return false;
        //put锁、take锁都加锁
        fullyLock();
        try {
            //从头节点开始查找,直到找到第一个相同元素就进行删除操作
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                 //找到相同元素,当前节点p,trail是当前节点p的前一个节点
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
           //put锁、take锁都释放锁
            fullyUnlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        //设置当前待删除节点的元素为null
        p.item = null;
        //当前节点的前一个的下一个是当前节点的下一个节点
        trail.next = p.next;
        //如果尾节点是p,代表删除的是尾节点,需重新设置尾节点,则新的尾节点是trail
        if (last == p)
            last = trail;
        //getAndDecrement的代码是unsafe.getAndAddInt(this, valueOffset, -1),即获取当前的元素个数并-1,等于最大容量,则去唤醒其他等待删除的线程
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
    
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    
复制代码

出队操作


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        //获取元素个数
        final AtomicInteger count = this.count;
        //获取take出队锁
        final ReentrantLock takeLock = this.takeLock;
        //执行可中断的锁获取操作,即意味着如果线程由于获取锁而处于Blocked状态时,线程是可以被中断而不再继续等待,这也是一种避免死锁的一种方式,不会因为发现到死锁之后而由于无法中断线程最终只能重启应用。
        takeLock.lockInterruptibly();
        try {
            //元素个数为0,代表没有元素可出队,则进入等待状态
            while (count.get() == 0) {
                notEmpty.await();
            }
            //出队并得到出队的元素
            x = dequeue();
           //getAndDecrement的代码是unsafe.getAndAddInt(this, valueOffset, -1),即获取当前的元素个数并-1
            c = count.getAndDecrement();
            //c大于1,则去唤醒其他等待出队的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            //释放锁
            takeLock.unlock();
        }
        //当c等于最大容量,即在获取当前元素之前,队列已经满了,而此时获取元素之后,队列就会空出一个位置,故当前线程会唤醒执行插入操作的线程通知其他中的一个可以进行插入操作。
        if (c == capacity)
            signalNotFull();
        return x;
    }


    //链表的元素移除,头节点移除
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }   


    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //唤醒put等待的锁
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
复制代码

上面看了take()的出队操作,我们再来看一下类似的poll()方法是怎么出队的,可以参考offer()方法的自行分析一下

 
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    } 

复制代码

其他方法

peek()查询得到头节点的元素

 
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    } 

复制代码

相关对比

  • ArrayBlockingQueue:是基于数组的先进先出的有界循环队列,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行

  • LinkedBlockingQueue:基于链表的先进先出的无界队列,读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。

队列

  • Deque 双端队列:
  • 未实现阻塞接口的:
    • LinkedList : 实现了Deque接口,受限的队列
    • PriorityQueue : 优先队列,本质维护一个有序列表。可自然排序亦可传递 comparator构造函数实现自定义排序。
    • ConcurrentLinkedQueue:基于链表 线程安全的队列。增加删除O(1) 查找O(n)
  • 实现阻塞接口的:实现BlockingQueue接口的五个阻塞队列,其特点:线程阻塞时,不是直接添加或者删除元素,而是等到有空间或者元素时,才进行操作。
    • ArrayBlockingQueue: 基于数组的有界队列
    • LinkedBlockingQueue: 基于链表的无界队列
    • PriorityBlockingQueue:基于优先次序的无界队列
    • DelayQueue:基于时间优先级的队列
    • SynchronousQueue:内部没有容器的队列 较特别 –其独有的线程一一配对通信机制
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享