ArrayBlockingQueue源码解析(JDK1.8)
ArrayBlockingQueue是基于数组的先进先出的有界循环队列
同样我们还是先上类的关系图
从类的关系图中可以看出ArrayBlockingQueue继承一个抽象类和实现了两个接口,然后分别简单介绍一下:
- AbstractQueue:这里主要提供增删查等的相关操作
- BlockingQueue:提供更多情况下的增删查等操作
- Serializable:启用其序列化功能操作
属性
属性相关的源码
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null;
复制代码
final Object[] items;//存储元素的数组
int takeIndex;//读取元素时的下标
int putIndex;//添加元素时的下标
int count;//元素的个数
final ReentrantLock lock;//控制并发的锁
private final Condition notEmpty;//读取操作时是否让线程等待
private final Condition notFull;//添加操作时是否让线程等待
transient Itrs itrs = null;//允许队列操作更新迭代器状态
构造方法
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
复制代码
从三个构造方法中可以看出主要是capacity、fair这两个的不同参数
capacity:设置容量,必传值
fair:是否公平,默认值为false,是设置ReentrantLock是公平锁还是非公平锁的
预备知识
ReentrantLock是一个支持响应中断、超时、尝试获取锁,可关联多个条件队列,是个可重入的公平锁和非公平锁。
Condition这个主要是用来实现ReentrantLock的wait和notify的功能
增删改查
入队操作
public void put(E e) throws InterruptedException {
//检验元素是否为空,为null就抛出空指针异常
checkNotNull(e);
//拿到当前的锁
final ReentrantLock lock = this.lock;
//进行锁的抢占
lock.lockInterruptibly();
try {
//当队列长度等于数组长度,表示队列已满,这里使用while来循环等待被唤醒
while (count == items.length)
//put操作时让当前线程处于等待状态
notFull.await();
//当队列没有满并且获取到锁时就进行enqueue入队操作
enqueue(e);
} finally {
//完成锁的释放
lock.unlock();
}
}
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//设置当前数组putIndex位置的元素为x,这里是尾部插入
items[putIndex] = x;
//putIndex++,然后如果putIndex的大小等于数据的长度,则设置putIndex=0,这里是循环队列的思想
if (++putIndex == items.length)
putIndex = 0;
//元素的个数+1
count++;
//put操作结束,唤醒其他正在处于等待状态的线程
notEmpty.signal();
}
复制代码
另外offer()函数也可以入队,思路也差不多,也可以了解一下,总结一下入队操作,先检查,然后判断队列没有满并且拿到锁进行入队操作进行设置数组相关位置的值,设置完成,唤醒其他处于等待状态的线程
删除元素
remove()具体元素的出队
public boolean remove(Object o) {
//出队的元素为null,直接返回失败
if (o == null) return false;
//得到数据
final Object[] items = this.items;
//拿到锁
final ReentrantLock lock = this.lock;
//进行锁住操作
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//循环查找i位置的数据释放就是要删除出队的数据,如果是就进行删除
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
//元素个数没有大于0直接返回失败
return false;
} finally {
//锁的释放
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
//待删除的下标等于读取的下标
if (removeIndex == takeIndex) {
// removing front item; just advance
//设置数组对应下标的值为null
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
//元素的个数-1
count--;
//通知所有迭代器头节点出队
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
//从待删除的下标开始循环查找并设置数据i位置的数据
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
//通过所有迭代器有内部节点出队
if (itrs != null)
itrs.removedAt(removeIndex);
}
//唤醒其他等待的线程
notFull.signal();
}
复制代码
take()是队列头数据出队,poll()方法也是一样的
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//尝试去获取锁,如果锁被其他线程占用,那么当前线程就属于等待状态
lock.lockInterruptibly();
try {
//如果元素个数为0,就让当前线程等待,并且释放锁
while (count == 0)
notEmpty.await();
//如果队列不为空,则进行出队,从队列头部取元素
return dequeue();
} finally {
//完成锁的释放
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取takeIndex位置的元素并将其转换为泛型
E x = (E) items[takeIndex];
//设置takeIndex位置数据为null,便于gc回收
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//通知所有迭代器做相关清理工作
if (itrs != null)
itrs.elementDequeued();
//唤醒其他等待线程
notFull.signal();
return x;
}
复制代码
上面涉及到了Itrs的两个方法:elementDequeued()、removedAt(),有兴趣的可以看一下具体的实现。
其他方法
contains()查询队列中是否有这个元素
//也运用到了锁,然后循环从takeIndex位置找,找到了就返回true,当takeIndex==putIndex就跳出循环返回false
public boolean contains(Object o) {
//元素为null,
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
复制代码
相关对比
-
ArrayBlockingQueue:是基于数组的先进先出的有界循环队列
-
LinkedBlockingQueue:基于链表的先进先出的无界队列
队列
- Deque 双端队列:
- 未实现阻塞接口的:
- LinkedList : 实现了Deque接口,受限的队列
- PriorityQueue : 优先队列,本质维护一个有序列表。可自然排序亦可传递 comparator构造函数实现自定义排序。
- ConcurrentLinkedQueue:基于链表 线程安全的队列。增加删除O(1) 查找O(n)
- 实现阻塞接口的:实现BlockingQueue接口的五个阻塞队列,其特点:线程阻塞时,不是直接添加或者删除元素,而是等到有空间或者元素时,才进行操作。
- ArrayBlockingQueue: 基于数组的有界队列
- LinkedBlockingQueue: 基于链表的无界队列
- PriorityBlockingQueue:基于优先次序的无界队列
- DelayQueue:基于时间优先级的队列
- SynchronousQueue:内部没有容器的队列 较特别 –其独有的线程一一配对通信机制