SychronousQueue源码解析

SychronousQueue原理

  1. SychronousQueue是无缓冲阻塞队列,是BlockingQueue的实现类
  2. 因为是无缓冲,没有内部容量因此 peek(),contains(),clear(),isEmpty()等等方法是无效的。
  3. 内部通过Transferer实现传输,有两个实现类:TransferQueue(公平)和TransferStack(非公平)

TransferStack(非公平)

 static final class TransferStack<E> extends Transferer<E> {
       

         //SNode的三种模式
         // 表示是消费者
        static final int REQUEST    = 0;
         //表示是生产者
        static final int DATA       = 1;
         //表示正在匹配中
        static final int FULFILLING = 2;

       
        static final class SNode {
            volatile SNode next;        // 下一个节点
            volatile SNode match;       // 匹配的节点
            volatile Thread waiter;     // 当前等待的线程
            Object item;                // 数据
            int mode;                   // 上面三种模式

       //头节点
        volatile SNode head;

        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
            /*
             * 1.如果头节点为null或者当前节点和头节点的模式相同,就入栈
             * 2.如果模式不同并且头节点没有正在匹配,现将当前节点入栈,然后尝试匹配这两个节              * 点
             * 3.如果头节点正在匹配中,就辅助匹配
             */

            SNode s = null; 
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // 如果头节点为null或者当前节点和头节点的模式相同,就入栈
                    if (timed && nanos <= 0) {      // 不用等待超时
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);    
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos); //自旋+阻塞,如果匹配到就返回。
                        if (m == s) {    //被取消          
                            clean(s);
                            return null;
                        }
                        //已经匹配成功,将移除已经匹配的节点
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // 如果模式不同并且头节点没有正在匹配
                    if (h.isCancelled())            
                        casHead(h, h.next);         
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { 
                            SNode m = s.next;       
                            if (m == null) {        //匹配为null,证明已经被其他线程匹配了
                                casHead(s, null);   
                                s = null;          
                                break;             
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  
                                s.casNext(m, mn);  
                        }
                    }
                } else {                            // //如果头节点正在匹配中,协助匹配
                    SNode m = h.next;              
                    if (m == null)                  
                        casHead(h, null);         
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          
                            casHead(h, mn);         
                        else                        
                            h.casNext(m, mn);      
                    }
                }
            }
        }

       
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
           //超时时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            //当前线程
            Thread w = Thread.currentThread();
            // 自旋次数
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted()) // 如果被打断就尝试取消
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)    
                    return m;
                if (timed) {   //如果设置超时时间
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) { 
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0) //有自旋次数时,进行自旋
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null) //没有设置等待线程,就设置当前线程为等待线程
                    s.waiter = w;
                else if (!timed) //不需要超时就直接阻塞
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold) 
                    LockSupport.parkNanos(this, nanos);
            }
        }

       
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }

    // m是s的下一个节点,这时候m节点的线程应该是阻塞状态的
    boolean tryMatch(SNode s) {
        // 如果m还没有匹配者,就把s作为它的匹配者
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    //不为空就唤醒当前线程
                waiter = null;
                LockSupport.unpark(w);
            }
            // 匹配到了返回true
            return true;
        }
        // 可能其它线程先一步匹配了m,返回其是否是s
        return match == s;
    }


    }
复制代码

TransferQueue(公平)

如果队列为空或者节点与尾节点类型相同,就从尾部添加入队,通过awaitFulfill方法,只有在超时/打断/匹配的情况下才会结束,其他情况,head.next会先自旋,其他节点会直接阻塞
如果不为空并且与head.next节点匹配,更新头节点,dequeue线程

   static final class TransferQueue<E> extends Transferer<E> {
        static final class QNode {
            volatile QNode next;          // 下一个节点
            volatile Object item;         // 数据
            volatile Thread waiter;       // 当前线程
            final boolean isData;         //是否是数据节点
         
        }

        //头节点  
        transient volatile QNode head;
        //尾结点
        transient volatile QNode tail;
        /* 对应 中断或超时的 前继节点,这个节点存在的意义是标记, 它的下个节点要删除
        * 当你要删除节点node, 若节点node是队列的末尾, 则开始用这个节点,因为删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当节点B是整个队列中的末尾元素时,一个线程删除节点B, 一个线程在节点B之后插入节点这样操作容易致使插入的节点丢失, 这个cleanMe很像ConcurrentSkipListMap 中的删除添加的marker节点, 他们都是起着相同的作用
        */
        transient volatile QNode cleanMe;
        @SuppressWarnings("unchecked")
        E transfer(E e, boolean timed, long nanos) {
         

            QNode s = null;
            boolean isData = (e != null); //生产者不为null,消费者为null

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // 没有初始化,跳过
                    continue;                       

                if (h == t || t.isData == isData) { // 尾部节点的类型和当前节点的类型相同,入队
                    QNode tn = t.next;
                    if (t != tail)                  //tail发生改变了
                        continue;
                    if (tn != null) {               // 其他线程添加了,更新尾节点
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // 超时
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        
                        continue;

                    advanceTail(t, s);              
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // 节点因超时或打断要移除
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           
                        advanceHead(t, s);          
                        if (x != null)             
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            
                    QNode m = h.next;               
                    if (t != tail || m == null || h != head)
                        continue;                   

                    Object x = m.item;
                    if (isData == (x != null) ||     //head.next节点是否和当前节点匹配
                        x == m ||                   
                        !m.casItem(x, e)) {         
                        advanceHead(h, m);          
                        continue;
                    }
                                                                // 成功
                            advanceHead(h, m);                  //更新头节点
                        LockSupport.unpark(m.waiter);           //释放锁
                    return (x != null) ? (E)x : e;         
                }
            }
        }

      
     

       
    }
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享