这是我参与8月更文挑战的第18天,活动详情查看:8月更文挑战
线程池系列
Java并发编程-线程池框架(一)
Java并发编程-线程池源码分析(二)
Java并发编程-JDK线程池和Spring线程池(三)
Java并发编程-线程池优雅关闭(四)
Java并发编程-阻塞队列BlockingQueue
Java并发编程-如何设置线程池大小?
前言
ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor。它主要用来在给定的延迟之后执行任务,或者定期执行任务。
ScheduledThreadPoolExecutor的功能与Timer类似,但比Timer更强大,更灵活,Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
1. ScheduledThreadPoolExecutor 框架
构造函数
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
复制代码
ScheduledThreadPoolExecutor的本质依然是一个线程池,但是于其他的线程池不同的是使用了 DelayedWorkQueue().
1.1 ScheduledExecutorService
ScheduledExecutorService 本身继承了ExecutorService接口,并为调度任务额外提供了两种模式, 延迟执行和周期执行。
延迟执行
// 1. 根据参数中设定的延时,执行一次任务
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 2. 根据参数中设定的延时,执行一次任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
复制代码
周期执行
//3. 创建并执行一个周期性动作,该动作在给定的初始延迟之后首先启用,然后在给定的周期之后启用;
//也就是说,执行将在initialDelay之后开始,然后initialDelay+period,然后initialDelay+ 2 * period,以此类推。
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只能通过取消或终止执行器来终止。
//如果该任务的任何执行花费的时间超过了它的周期,那么后续执行可能会延迟开始,但不会并发执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//创建并执行一个周期性操作,该操作在给定的初始延迟之后首先启用,然后在一次执行终止和下一次执行开始之间启用给定的延迟。
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只能通过取消或终止执行器来终止。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
复制代码
1.2 内部类 ScheduledFutureTask
ScheduledFutureTask是ScheduledThreadPoolExecutor对于RunnableScheduledFuture的默认实现,并且继承了FutureTask。
它覆盖了FutureTask的run方法来实现对延时执行、周期执行的支持。
1.3 内部类 DelayedWorkQueue
DelayedWorkQueue是ScheduledThreadPoolExecutor中阻塞队列的实现,它内部使用了小根堆来使得自身具有优先队列的功能,并且通过Leader/Follower模式避免线程不必要的等待。
从DelayedWorkQueue中取出任务时,任务一定已经至少到了可以被执行的时间。
2. 使用案例
设置一个调度线程池, 设置两种不同的调度方法,并执行。
public class BeeperControl {
// 定义一个定时调度线程池
private static final ScheduledExecutorService scheduler =
new ScheduledThreadPoolExecutor(1);
public static void beepForAnHour() {
final Runnable beeper = new Runnable() {
@Override
public void run() { System.out.println("beep"); }
};
// 设置周期执行, 10s执行一次
final ScheduledFuture<?> beeperHandle =
scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);
// 延迟执行
scheduler.schedule(new Runnable() {
@Override
public void run() {
beeperHandle.cancel(true);
}
}, 60 * 60, TimeUnit.SECONDS);
}
public static void main(String[] args) {
beepForAnHour();
}
}
复制代码
3. 源码分析
3.1 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 任务提交的入口方法主要是execute,submit, schedule, scheduleAtFixedRate以及scheduleWithFixedDelay这几类。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
复制代码
3.2 ScheduledFutureTask
3.2.1 基本属性
// 任务的序列号,在排序中会用到。
private final long sequenceNumber;
// 任务可以被执行的时间,以纳秒表示。
private long time;
// 0表示非周期任务。正数表示fixed-rate模式,负数表示fixed-delay模式。
private final long period;
// The actual task to be re-enqueued by reExecutePeriodic
RunnableScheduledFuture<V> outerTask = this;
// 用于维护该任务在DelayedWorkQueue内部堆中的索引(在堆数组中的index)。
int heapIndex;
复制代码
3.2.2 ScheduledFutureTask#run方法
public void run() {
// 是否是周期性任务
boolean periodic = isPeriodic();
// 检查任务是否可以被执行
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果非周期性任务直接调用run运行即可。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果成功runAndRest
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次运行时间并调用reExecutePeriodic。
setNextRunTime();
// 需要重新将任务(outerTask)放到工作队列中。
reExecutePeriodic(outerTask);
}
}
复制代码
3.2.3 ScheduledFutureTask#cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 先调用父类FutureTask#cancel来取消任务
boolean cancelled = super.cancel(mayInterruptIfRunning);
// removeOnCancel开关用于控制任务取消后是否应该从队列中移除。
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
复制代码
3.3 DelayedWorkQueue
DelayedWorkQueue是ScheduledThreadPoolExecutor使用的工作队列。它内部维护了一个小根堆,根据任务的执行开始时间来维护任务顺序。但不同的地方在于,它对于ScheduledFutureTask类型的元素额外维护了元素在队列中堆数组的索引,用来实现快速取消。DelayedWorkQueue用了ReentrantLock+Condition来实现管程保证数据的线程安全性。
3.3.1 DelayedWorkQueue#offer方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
复制代码
3.3.2 DelayedWorkQueue#take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
复制代码
3.3.3 DelayedWorkQueue#poll(long, TimeUnit)方法
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
复制代码
3.3.4 DelayedWorkQueue#remove方法
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
复制代码
总结
- ScheduledThreadPoolExecutor内部使用了ScheduledFutureTask来表示任务,即使对于execute方法也将其委托至schedule方法,以零延时的形式实现。同时ScheduledThreadPoolExecutor也允许我们通过decorateTask方法来包装任务以实现定制化的封装。
- ScheduledThreadPoolExecutor内部使用的阻塞队列DelayedWorkQueue通过小根堆来实现优先队列的功能。由于DelayedWorkQueue是无界的,所以本质上对于ScheduledThreadPoolExecutor而言,maximumPoolSize并没有意义。