TransferQueue源码

原理

TransferQueue在BlockingQueue的基础上添加了:

  1. tryTransfer();//尝试将元素直接给到消费者
  2. transfer();//将元素给到消费者,必要时等待
  3. hasWaitingConsumer() //是否有等待的消费者
  4. getWaitingConsumerCount()//返回有几个等待的消费者数量

xfer()

TransferQueue中的核心方法无非是:put(),take(),poll(),offer()和上面的几个方法。查看源码就会发现大部分都有用到xfer()方法,只是参数不同。

   public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

 public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }
  public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }
    
  public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
    
     public E poll() {
        return xfer(null, false, NOW, 0);
    }
 
复制代码

xfer()方法

/**
     *
     *
     * @param e 元素
     * @param haveData  put是true,take是false
     * @param how 分为四种类型:NOW(立即), ASYNC(异步), SYNC(同步), or TIMED(超时时间)
     * @param nanos  超时时间 ,仅用在TIMED中
     * @return 匹配返回item,不匹配返回e
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null)) //如果是插入数据但是数据为null
            throw new NullPointerException();
        Node s = null;                        

        retry:
        for (;;) {                          

            for (Node h = head, p = h; p != null;) { // 从头查找匹配的节点
                boolean isData = p.isData;
                Object item = p.item;
                //(item != null) == isData 可能是put,也可能是take未匹配
                if (item != p && (item != null) == isData) { 
                    if (isData == haveData)   // 相同类型无法匹配
                        break;
                    if (p.casItem(item, e)) { // 匹配,cas修改item
                        //更新head
                        for (Node q = p; q != h;) {
                            Node n = q.next;  
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 
                            
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;       
                        }
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                //指向下个节点
                Node n = p.next;
                p = (p != n) ? n : (h = head); 
            }
            //如果不需要立即返回
            if (how != NOW) { 
               //首次,创建节点
                if (s == null)
                    s = new Node(e, haveData);
                //尝试在末尾添加    
                Node pred = tryAppend(s, haveData);
                if (pred == null) //表示不能在末尾加,模式与末尾节点的模式相反
                    continue retry;     
                //不是异步的,进行阻塞等待    
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }

复制代码

总结:TransferQueue 会先尝试和head匹配,匹配失败就插入末尾。与SychronousQueue相比多了缓存队列,与BlockingQueue相比可以直接将元素传递

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享