原理
TransferQueue在BlockingQueue的基础上添加了:
- tryTransfer();//尝试将元素直接给到消费者
- transfer();//将元素给到消费者,必要时等待
- hasWaitingConsumer() //是否有等待的消费者
- getWaitingConsumerCount()//返回有几个等待的消费者数量
xfer()
TransferQueue中的核心方法无非是:put(),take(),poll(),offer()和上面的几个方法。查看源码就会发现大部分都有用到xfer()方法,只是参数不同。
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll() {
return xfer(null, false, NOW, 0);
}
复制代码
xfer()方法
/**
*
*
* @param e 元素
* @param haveData put是true,take是false
* @param how 分为四种类型:NOW(立即), ASYNC(异步), SYNC(同步), or TIMED(超时时间)
* @param nanos 超时时间 ,仅用在TIMED中
* @return 匹配返回item,不匹配返回e
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null)) //如果是插入数据但是数据为null
throw new NullPointerException();
Node s = null;
retry:
for (;;) {
for (Node h = head, p = h; p != null;) { // 从头查找匹配的节点
boolean isData = p.isData;
Object item = p.item;
//(item != null) == isData 可能是put,也可能是take未匹配
if (item != p && (item != null) == isData) {
if (isData == haveData) // 相同类型无法匹配
break;
if (p.casItem(item, e)) { // 匹配,cas修改item
//更新head
for (Node q = p; q != h;) {
Node n = q.next;
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
}
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break;
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
//指向下个节点
Node n = p.next;
p = (p != n) ? n : (h = head);
}
//如果不需要立即返回
if (how != NOW) {
//首次,创建节点
if (s == null)
s = new Node(e, haveData);
//尝试在末尾添加
Node pred = tryAppend(s, haveData);
if (pred == null) //表示不能在末尾加,模式与末尾节点的模式相反
continue retry;
//不是异步的,进行阻塞等待
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
复制代码
总结:TransferQueue 会先尝试和head匹配,匹配失败就插入末尾。与SychronousQueue相比多了缓存队列,与BlockingQueue相比可以直接将元素传递
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END