Java并发编程-ScheduledThreadPoolExecutor源码分析

这是我参与8月更文挑战的第18天,活动详情查看:8月更文挑战

线程池系列

Java并发编程-线程池框架(一)
Java并发编程-线程池源码分析(二)
Java并发编程-JDK线程池和Spring线程池(三)
Java并发编程-线程池优雅关闭(四)
Java并发编程-阻塞队列BlockingQueue
Java并发编程-如何设置线程池大小?

前言

ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor。它主要用来在给定的延迟之后执行任务,或者定期执行任务。
ScheduledThreadPoolExecutor的功能与Timer类似,但比Timer更强大,更灵活,Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

1. ScheduledThreadPoolExecutor 框架

image.png

构造函数

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
复制代码

ScheduledThreadPoolExecutor的本质依然是一个线程池,但是于其他的线程池不同的是使用了 DelayedWorkQueue().

1.1 ScheduledExecutorService

image.png

ScheduledExecutorService 本身继承了ExecutorService接口,并为调度任务额外提供了两种模式, 延迟执行和周期执行。

延迟执行

// 1. 根据参数中设定的延时,执行一次任务
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay, TimeUnit unit);
// 2. 根据参数中设定的延时,执行一次任务                                   
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay, TimeUnit unit);
                                  
复制代码

周期执行

//3. 创建并执行一个周期性动作,该动作在给定的初始延迟之后首先启用,然后在给定的周期之后启用;
//也就是说,执行将在initialDelay之后开始,然后initialDelay+period,然后initialDelay+ 2 * period,以此类推。
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只能通过取消或终止执行器来终止。
//如果该任务的任何执行花费的时间超过了它的周期,那么后续执行可能会延迟开始,但不会并发执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);

//创建并执行一个周期性操作,该操作在给定的初始延迟之后首先启用,然后在一次执行终止和下一次执行开始之间启用给定的延迟。
//如果任务的任何执行遇到异常,则禁止后续执行。否则,任务只能通过取消或终止执行器来终止。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);

复制代码

1.2 内部类 ScheduledFutureTask

image.png
ScheduledFutureTask是ScheduledThreadPoolExecutor对于RunnableScheduledFuture的默认实现,并且继承了FutureTask。
它覆盖了FutureTask的run方法来实现对延时执行、周期执行的支持。

1.3 内部类 DelayedWorkQueue

DelayedWorkQueue是ScheduledThreadPoolExecutor中阻塞队列的实现,它内部使用了小根堆来使得自身具有优先队列的功能,并且通过Leader/Follower模式避免线程不必要的等待。
从DelayedWorkQueue中取出任务时,任务一定已经至少到了可以被执行的时间。

2. 使用案例

设置一个调度线程池, 设置两种不同的调度方法,并执行。

public class BeeperControl {
    // 定义一个定时调度线程池
    private static final ScheduledExecutorService scheduler =
            new ScheduledThreadPoolExecutor(1);

    public static void beepForAnHour() {
        final Runnable beeper = new Runnable() {
            @Override
            public void run() { System.out.println("beep"); }
        };
        // 设置周期执行, 10s执行一次
        final ScheduledFuture<?> beeperHandle =
                scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);

        // 延迟执行     
        scheduler.schedule(new Runnable() {
            @Override
            public void run() {
                beeperHandle.cancel(true);
            }
        }, 60 * 60, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        beepForAnHour();
    }
}
复制代码

3. 源码分析

3.1 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 任务提交的入口方法主要是execute,submit, schedule, scheduleAtFixedRate以及scheduleWithFixedDelay这几类。

public void execute(Runnable command) {
    schedule(command, 0, NANOSECONDS);
}


public <T> Future<T> submit(Runnable task, T result) {
    return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}


public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public <T> Future<T> submit(Callable<T> task) {
    return schedule(task, 0, NANOSECONDS);
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
    }

复制代码

3.2 ScheduledFutureTask

image.png

3.2.1 基本属性

// 任务的序列号,在排序中会用到。
private final long sequenceNumber;

// 任务可以被执行的时间,以纳秒表示。
private long time;

// 0表示非周期任务。正数表示fixed-rate模式,负数表示fixed-delay模式。
private final long period;

// The actual task to be re-enqueued by reExecutePeriodic
RunnableScheduledFuture<V> outerTask = this;

// 用于维护该任务在DelayedWorkQueue内部堆中的索引(在堆数组中的index)。
int heapIndex;

复制代码

3.2.2 ScheduledFutureTask#run方法

public void run() {
    // 是否是周期性任务
    boolean periodic = isPeriodic();
    // 检查任务是否可以被执行
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果非周期性任务直接调用run运行即可。    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果成功runAndRest  
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 设置下次运行时间并调用reExecutePeriodic。
        setNextRunTime();
        // 需要重新将任务(outerTask)放到工作队列中。
        reExecutePeriodic(outerTask);
    }
}
复制代码

3.2.3 ScheduledFutureTask#cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {
    // 先调用父类FutureTask#cancel来取消任务
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // removeOnCancel开关用于控制任务取消后是否应该从队列中移除。
    if (cancelled && removeOnCancel && heapIndex >= 0)
        remove(this);
    return cancelled;
}
复制代码

3.3 DelayedWorkQueue

DelayedWorkQueue是ScheduledThreadPoolExecutor使用的工作队列。它内部维护了一个小根堆,根据任务的执行开始时间来维护任务顺序。但不同的地方在于,它对于ScheduledFutureTask类型的元素额外维护了元素在队列中堆数组的索引,用来实现快速取消。DelayedWorkQueue用了ReentrantLock+Condition来实现管程保证数据的线程安全性。

3.3.1 DelayedWorkQueue#offer方法

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}
复制代码

3.3.2 DelayedWorkQueue#take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
复制代码

3.3.3 DelayedWorkQueue#poll(long, TimeUnit)方法

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
复制代码

3.3.4 DelayedWorkQueue#remove方法

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;

        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            siftDown(i, replacement);
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}
复制代码

总结

  1. ScheduledThreadPoolExecutor内部使用了ScheduledFutureTask来表示任务,即使对于execute方法也将其委托至schedule方法,以零延时的形式实现。同时ScheduledThreadPoolExecutor也允许我们通过decorateTask方法来包装任务以实现定制化的封装。
  2. ScheduledThreadPoolExecutor内部使用的阻塞队列DelayedWorkQueue通过小根堆来实现优先队列的功能。由于DelayedWorkQueue是无界的,所以本质上对于ScheduledThreadPoolExecutor而言,maximumPoolSize并没有意义。

参考

Interface ScheduledExecutorService

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享