SychronousQueue原理
- SychronousQueue是无缓冲阻塞队列,是BlockingQueue的实现类
- 因为是无缓冲,没有内部容量因此 peek(),contains(),clear(),isEmpty()等等方法是无效的。
- 内部通过Transferer实现传输,有两个实现类:TransferQueue(公平)和TransferStack(非公平)
TransferStack(非公平)
static final class TransferStack<E> extends Transferer<E> {
//SNode的三种模式
// 表示是消费者
static final int REQUEST = 0;
//表示是生产者
static final int DATA = 1;
//表示正在匹配中
static final int FULFILLING = 2;
static final class SNode {
volatile SNode next; // 下一个节点
volatile SNode match; // 匹配的节点
volatile Thread waiter; // 当前等待的线程
Object item; // 数据
int mode; // 上面三种模式
//头节点
volatile SNode head;
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/*
* 1.如果头节点为null或者当前节点和头节点的模式相同,就入栈
* 2.如果模式不同并且头节点没有正在匹配,现将当前节点入栈,然后尝试匹配这两个节 * 点
* 3.如果头节点正在匹配中,就辅助匹配
*/
SNode s = null;
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // 如果头节点为null或者当前节点和头节点的模式相同,就入栈
if (timed && nanos <= 0) { // 不用等待超时
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos); //自旋+阻塞,如果匹配到就返回。
if (m == s) { //被取消
clean(s);
return null;
}
//已经匹配成功,将移除已经匹配的节点
if ((h = head) != null && h.next == s)
casHead(h, s.next);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 如果模式不同并且头节点没有正在匹配
if (h.isCancelled())
casHead(h, h.next);
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
SNode m = s.next;
if (m == null) { //匹配为null,证明已经被其他线程匹配了
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
s.casNext(m, mn);
}
}
} else { // //如果头节点正在匹配中,协助匹配
SNode m = h.next;
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//当前线程
Thread w = Thread.currentThread();
// 自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) // 如果被打断就尝试取消
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) { //如果设置超时时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0) //有自旋次数时,进行自旋
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null) //没有设置等待线程,就设置当前线程为等待线程
s.waiter = w;
else if (!timed) //不需要超时就直接阻塞
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
// m是s的下一个节点,这时候m节点的线程应该是阻塞状态的
boolean tryMatch(SNode s) {
// 如果m还没有匹配者,就把s作为它的匹配者
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { //不为空就唤醒当前线程
waiter = null;
LockSupport.unpark(w);
}
// 匹配到了返回true
return true;
}
// 可能其它线程先一步匹配了m,返回其是否是s
return match == s;
}
}
复制代码
TransferQueue(公平)
如果队列为空或者节点与尾节点类型相同,就从尾部添加入队,通过awaitFulfill方法,只有在超时/打断/匹配的情况下才会结束,其他情况,head.next会先自旋,其他节点会直接阻塞
如果不为空并且与head.next节点匹配,更新头节点,dequeue线程
static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next; // 下一个节点
volatile Object item; // 数据
volatile Thread waiter; // 当前线程
final boolean isData; //是否是数据节点
}
//头节点
transient volatile QNode head;
//尾结点
transient volatile QNode tail;
/* 对应 中断或超时的 前继节点,这个节点存在的意义是标记, 它的下个节点要删除
* 当你要删除节点node, 若节点node是队列的末尾, 则开始用这个节点,因为删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当节点B是整个队列中的末尾元素时,一个线程删除节点B, 一个线程在节点B之后插入节点这样操作容易致使插入的节点丢失, 这个cleanMe很像ConcurrentSkipListMap 中的删除添加的marker节点, 他们都是起着相同的作用
*/
transient volatile QNode cleanMe;
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
boolean isData = (e != null); //生产者不为null,消费者为null
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 没有初始化,跳过
continue;
if (h == t || t.isData == isData) { // 尾部节点的类型和当前节点的类型相同,入队
QNode tn = t.next;
if (t != tail) //tail发生改变了
continue;
if (tn != null) { // 其他线程添加了,更新尾节点
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 超时
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s))
continue;
advanceTail(t, s);
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // 节点因超时或打断要移除
clean(t, s);
return null;
}
if (!s.isOffList()) {
advanceHead(t, s);
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else {
QNode m = h.next;
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
if (isData == (x != null) || //head.next节点是否和当前节点匹配
x == m ||
!m.casItem(x, e)) {
advanceHead(h, m);
continue;
}
// 成功
advanceHead(h, m); //更新头节点
LockSupport.unpark(m.waiter); //释放锁
return (x != null) ? (E)x : e;
}
}
}
}
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END