Java线程池源码分析与总结(上)

Java线程池技术

线程池概述

  • 池化技术应该是最常用的提高程序性能的手段,包括线程池与数据库连接池,常量池等等

  • 创建与销毁线程是比较耗费时间的,不利于处理Java程序的高并发,因此引入线程池,也就是维护一组可用的线程,如果有任务,就立即将线程池的空闲线程分配给任务,提升性能,如果线程池内所有的线程都是忙状态的话,可以将任务放到任务队列,或者创建一个新的线程并放入线程池,用于处理新的任务

  • 使用线程池的好处

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

    • 在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。

      为什么呢?

      使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

    • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。

    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。(最原始粗放的服务器实现就是请求绑定一个套接字后就新开一个线程去处理,如果请求量巨大的时候,服务器是肯定要崩的,因为缺乏对线程资源的管理)

      • 线程池监控的方法:

        • SpringBoot 中的 Actuator 组件

        • 通过ThreadPoolExecutor的自有接口获取线程池信息

          image-20210415154301230

线程池在实际项目中的使用场景

  • 线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。

  • 举个项目中实际使用的例子:

    • dockerhub项目中Caches缓存中使用的定时线程池用来定时更新缓存的数据到数据库(执行异步任务
    • 其余的例子大都类似
  • 实际使用时要注意的一般规则

    • 使用线程池,而不是创建单个线程

    • 使用ThreadPoolExecutor构造函数而不是Executors工具类,下文有具体的解释

    • 显式的定义线程池名字,以业务名字作区分,便于定位问题

      • 可以使用自定义的ThreadFactory

        import java.util.concurrent.Executors;
        import java.util.concurrent.ThreadFactory;
        import java.util.concurrent.atomic.AtomicInteger;
        /**
         * 线程工厂,它设置线程名称,有利于我们定位问题。
         */
        public final class NamingThreadFactory implements ThreadFactory {
        
            private final AtomicInteger threadNum = new AtomicInteger();
            private final ThreadFactory delegate;
            private final String name;
        
            /**
             * 创建一个带名字的线程池生产工厂
             */
            public NamingThreadFactory(ThreadFactory delegate, String name) {
                this.delegate = delegate;
                this.name = name; // TODO consider uniquifying this
            }
        
            @Override 
            public Thread newThread(Runnable r) {
                Thread t = delegate.newThread(r);
                t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
                return t;
            }
        
        }
        复制代码
      • 使用guava的ThreadFactoryBuilder

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                                .setNameFormat(threadNamePrefix + "-%d")
                                .setDaemon(true).build();
        ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
        复制代码
    • 不同的业务使用不同的线程池

      • 一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务
    • 有依赖关系的任务在使用同一个线程池在稍高的并发状况下可能会出现一种逻辑上的死锁,大概来说就是父任务A中调用了子任务B,父任务与子任务共用一个线程池,当父任务占据了全部的核心线程资源,并且子任务仍未执行时,无法退出对核心线程的占用,而与此同时子任务只能堆积在任务队列中,无法获得线程资源,如果又使用了无界队列的话,则会一直堆积直到OOM,具体的参考线程池运用不当的一次线上事故

线程池类的继承、实现关系

Executor框架

  • Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Threadstart 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题。

    补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误,如果用volatile修饰的话应该就能解决这个问题了,不知道Executor框架的出现是如何有助于解决此问题的呢?—不是很清楚

    Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单

  • 实际上在Executor框架中,还有一个线程池ForkJoinPool可能用的不太多,此类继承AbstractExecutorService,文章末尾会介绍到

  • 除了说Executor框架,还有一种说法就是JUC框架,也就是java.util.concurrent这个包下的所有的多线程相关类的总称

Executor的框架结构

任务的提交

向线程池提交任务有两种方法:

  1. execute方法

    1. 只接受Runnable的任务,不提供返回值,源码分析见下文(作为线程池的入口一定是要仔细分析的
  2. submit方法

    public Future<?> submit(Runnable task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<Void> ftask = newTaskFor(task, null);
      execute(ftask);
      return ftask;
    }
    
    /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
    public <T> Future<T> submit(Runnable task, T result) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task, result);
      execute(ftask);
      return ftask;
    }
    
    /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
    public <T> Future<T> submit(Callable<T> task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task);
      execute(ftask);
      return ftask;
    }
    
    
    复制代码
    • ThreadPoolExecutor没有实现自己的submit方法,而是沿用的父类AbstractExecutorService的实现

    • 接受RunnableCallable的任务,并提供Future类型返回值

      • submit内部将传入的任务统一封装为RunnableFuture类型,此类型实现了RunnableFuture接口,老缝合怪了~

      • 不同之处就在于传入Runnable的任务得到的Future可能无法得到有效的返回值,而Callable的任务能够得到返回结果

        • 提交Runnable任务时也可以指定一个返回结果,作为Future的返回结果,但是这个结果显然并不是任务执行完成的返回值,而是程序员事先传入的值,其作用类似于是一个flag值

          public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
          }
          
          public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
              throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
          }
          
          static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
              this.task = task;
              this.result = result;
            }
            public T call() {
              task.run();
              return result;
            }
          }
          复制代码
          • 可以清晰的看见,对于传入的Runnable任务会被转换为Callable类型,如果有传入预期的返回值,call函数中就会原封不动的返回,但是如果没有传入,就是返回null了
    • submit内部实际上仍然调用了execute方法

    • 此处补充CallableRunnable的差异:

      • 前者的执行方法内部可以有返回值,并且如果无法得到有效返回值还可以抛出异常,后者的执行方法中没有返回值也不能抛出异常
    • 补充Future接口的作用

      • 可以通过isDone判断任务是否执行完
      • 通过get方法获得执行结果
        1. 注意此方法是阻塞方法,需要等待任务执行完毕后才能返回

销毁(关闭)线程池

  1. shutdown方法 关闭线程池,线程池的状态变为 SHUTDOWN线程池不再接受新任务了,但是队列里的任务得执行完毕

    1. 执行shutdown方法后,可以执行awaitTermination方法,则会等待指定的时间让线程池关闭,若在指定时间内关闭则返回true,否则false

    2. shutdown源码分析

      public void shutdown() {
              final ReentrantLock mainLock = this.mainLock;
        			// 上锁
              mainLock.lock();
              try {
                  // 判断调用者是否有权限shutdown线程池
                  checkShutdownAccess();
                  // CAS 设置线程池状态为SHUTDOWN
                  advanceRunState(SHUTDOWN);
                  // 中断所有空闲线程
                  interruptIdleWorkers();
                  // 钩子函数
                  onShutdown(); // hook for ScheduledThreadPoolExecutor
              } finally {
                  // 解锁
                  mainLock.unlock();
              }
              // 尝试终止线程池
              tryTerminate();
          }
      复制代码
      • 对于interruptIdleWorkers函数的解析与tryTerminate的解析放在了后边
  2. shutdownNow方法 闭线程池,线程的状态变为 STOP线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的任务列表

    public List<Runnable> shutdownNow() {
      List<Runnable> tasks;
      final ReentrantLock mainLock = this.mainLock;
      // 上锁
      mainLock.lock();
      try {
        // 判断调用者是否有权限shutdown线程池
        checkShutdownAccess();
        // CAS 设置线程池状态为STOP
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 从队列中获取剩余的未执行的工作列表
        tasks = drainQueue();
      } finally {
        mainLock.unlock();
      }
      // 尝试终止线程池
      tryTerminate();
      // 返回未执行的任务列表
      return tasks;
    }
    复制代码
    • interruptWorkers的解析放到了后文中
  3. 使用如下两个方法来判断线程池是否完全关闭

    1. isTerminated() 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true,或者是执行shutdownNow后,线程池内的线程全部被中断,工作线程数量为0后返回true
    2. isShutdown() 当调用 shutdown() 方法后返回为 true。

Executor框架使用图

  1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
  2. 把创建完成的实现 Runnable/Callable接口的 对象直接交给 execute 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 submit 执行(ExecutorService.submit(Runnable task)ExecutorService.submit(Callable <T> task))。
  3. 如果执行 ExecutorService.submit(…)ExecutorService 将返回一个实现Future接口的对象(刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象
  4. 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

线程池的使用

使用Executors工具类创建线程池

  • 创建线程池的最方便的做法是使用Executors工具类,可以创建普通的线程池与可以执行定时任务的线程池,但是简单的创建方法意味着封装的程度高,就会导致自由度低,甚至有一些风险

普通的线程池

  • 固定线程数量的线程池

    • 该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
      }
      // 默认任务队列的长度是Integer.MAX_VALUE
      public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
      }
      复制代码
      • 核心线程数量与最大线程数量一致,也就是核心线程就是全部可用的线程了
      • 使用的是无界的队列,不会拒绝任务,但是也因此也带来了隐患。
      • 由于使用无界队列, maximumPoolSize 将是事实上的无效参数,因为不可能存在任务队列满的情况(可以将任务队列视作系统内最大,所以不用设置最大线程数,因为再多的任务也完全可以缓存在队列中)。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 被设置为同一个值。
        • 因为同样的理由,使用无界队列时 keepAliveTime 将是一个无效参数(因为不会有核心线程之外的其余线程)(当然,如果空闲核心线程被允许超时回收的话,就是有用的了,即是,如果空闲就会立即展开回收)
      • 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;所以一旦corePoolSize设置不对的话,将会有大量任务干等着,并且性能也没有完全发挥
      • 允许创建的线程个数虽然有限制,但是允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM
  • 仅有一个线程的线程池

    • 可以视为是固定线程数量线程池的特值情况,即nThreads为1的情况

      public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        // 使用包装类包装过的,用来保证:
        return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory));
      }
      复制代码
  • 动态分配线程数量的线程池

    • 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

      public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>(),
                                            threadFactory);
      }
      复制代码
      • 核心线程数量设置为0,不养闲人,没有任务时,线程池也不会白白占用资源
      • 因为核心线程树为0,线程池中的所有线程都不是核心线程,因此都会在60秒内,接不到活时被回收
      • 动态分配与按需创建的功能的实现应归功于SynchronousQueue类型的任务队列,这个队列不会缓存任务,而是如果有空闲线程就一定会交给空闲线程执行,没有空闲线程就直接创建新线程:
        • 在execute方法中首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前线程池中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成。
        • 当初始条件下线程池内的线程数量为0时,或者线程池中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)这种情况下,offer方法将返回false,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成
      • 虽然队列使用的是有界队列,但是最大线程数量是Integer.MAX_VALUE,这意味着线程池可以不受控的一直接受任务,直到栈空间OOM

执行定时任务的线程池

定时任务线程池的创建
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
  int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}


public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue(), threadFactory, handler);
}
复制代码
  • 虽然队列使用的是有界队列,但是最大线程数量是Integer.MAX_VALUE,这意味着线程池可以不受控的一直接受任务,直到栈空间OOM

  • 需要注意的是,尽管ScheduledExecutorService是内部调用了父类ThreadPoolExecutord的构造方法,但是其内部实现的核心入口方法不再是ThreadPoolExecutor的execute方法,而是ScheduledThreadPoolExecutor中的delayExecute方法

  • 定时任务的实现依赖于延迟队列DelayedWorkQueue

  • 可以发现执行定时任务可以使用springboot中的@Scheduled注解,也可以使用底层的定时任务线程池实际上本线程池基本不会用,因为实现定时任务有其他的方案,比如springboot的注解与quartz等等

    备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等

执行不同种类的定时任务
  • 一次性的延迟任务 schedule方法

    public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    复制代码
    • 这里值得补充的是,ScheduledThreadPoolExecutor重写了executesubmit方法,两个方法内部实际上都是简单地调用schedule方法来实现的
  • 以上一次任务开始为基准固定间隔循环执行任务 scheduleAtFixedRate方法

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    复制代码
  • 以上一次任务结束为基准固定间隔循环执行任务 scheduleWithFixedDelay方法

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(-delay));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    复制代码
  • 后两者的区别见图

    • 引出两个问题:

      • 如果在fixed-rate模式下,任务的执行时间大于间隔时间,那么任务是怎样安排执行的呢? 答案是:后续的任务会在上一个任务执行完毕后再开始执行,而不管执行间隔了,也就是延迟执行,而不是并发执行

      • 如果做定时间隔任务时,前边的任务出现异常,后续的任务会继续执行吗? 答:一旦出现异常,当前的任务与后续的任务都不会再执行,而是卡住,并且能通过自定义afterExecute方法来处理异常,保证抛出异常的任务取消,而其他任务继续执行

定时任务线程池的大致工作原理的理解
  • 上边说过了,定时任务线程池的核心入口就是上边三种类型的任务方法中都有的一个方法–就是delayedExecute,但是在说这个关键的入口方法之前,不得说下,调用方法前对于提交的任务的包装,包装这一块设计到的类比较多,先用一张类图大致把握

    image-20210425151258617

  • 首先包装为ScheduledFutureTask

    // 用于包装schedule(Runnable)提交的任务
    // result为null,ns是纳秒为单位的,要触发执行任务的系统时间
    ScheduledFutureTask(Runnable r, V result, long ns) {
      super(r, result);
      this.time = ns;
      this.period = 0;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // 包装scheduleWithFixedDelay和scheduleAtFixedRate提交的任务
    // result 为null
    // ns是纳秒为单位的,下一次要触发执行任务的系统时间
    // period是以纳秒为单位的任务循环周期
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
      super(r, result);
      this.time = ns;
      this.period = period;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // 包装schedule(Callable)提交的任务
    // ns是纳秒为单位的,要触发执行任务的系统时间
    ScheduledFutureTask(Callable<V> callable, long ns) {
      super(callable);
      this.time = ns;
      this.period = 0;
      this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    
    // 关键的run方法
    public void run() {
      // 首先判断是不是周期性执行的任务
      boolean periodic = isPeriodic();
      // 判断当前的线程池能否执行定时任务,如果不能则取消任务
      if (!canRunInCurrentRunState(periodic))
        cancel(false);
      else if (!periodic)
        // 如果不是周期性任务,也就是一次性的定时任务的话,直接执行提交的任务
        ScheduledFutureTask.super.run();
      // 如果是周期性执行的任务,首先执行提交的任务,并将任务的状态重置为初始化状态,以备下一次执行
      else if (ScheduledFutureTask.super.runAndReset()) {
        // 执行完毕后计算下一次执行的时间
        setNextRunTime();
        // 重新提交当前的任务到延时队列中,用于下一个周期的执行
        reExecutePeriodic(outerTask);
      }
    }
    
    // 计算下一次要执行任务的时间
    // time表示下一次执行任务的时间,period是用来计算time的周期时间
    private void setNextRunTime() {
      long p = period;
      if (p > 0)
        // scheduleAtFixedRate
        // 在第一次执行完任务后,下一次要执行的时间就是完全按照周期来执行,不管到底什么时候执行完的(也就是now),之后的每次执行都是如此
        time += p;
      else
        // scheduleWithFixedDelay
        // 第一次执行完任务后,下一次要执行的时间是以当前时间为基准计算的,也就是上一次完成任务的时间为基准计算的,之后的每次执行都是如此
        time = triggerTime(-p);
    }
    
    // 用于在延迟队列中按照下一次触发的顺序进行排序
    public int compareTo(Delayed other) {
      if (other == this) // compare zero if same object
        return 0;
      if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
          return -1;
        else if (diff > 0)
          return 1;
        // 触发时间一致的,按照提交的顺序来
        else if (sequenceNumber < x.sequenceNumber)
          return -1;
        else
          return 1;
      }
      long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
    复制代码
    • scheduleWithFixedDelayscheduleAtFixedRate在实现时的区别就在于此次包装过程中,前者传入的周期是unit.toNanos(-delay)而后者是unit.toNanos(perioid)
      • 其原理在于setNextRunTime方法中,详见方法注释
    • 在此次包装过程中,定时循环任务与一次行的定时任务在实现上除了period之外还有一个区别就是outerTask
      • 定时循环任务会持有此属性,以便能够在本轮任务执行完毕后,将当前的任务重新提交到延迟队列中,以备下一轮周期的执行,参考reExecutePeriodic方法
  • 其次包装为RunnableScheduleFuture

    protected <V> RunnableScheduledFuture<V> decorateTask(
      Runnable runnable, RunnableScheduledFuture<V> task) {
      return task;
    }
    protected <V> RunnableScheduledFuture<V> decorateTask(
      Callable<V> callable, RunnableScheduledFuture<V> task) {
      return task;
    }
    复制代码
    • 实际上是直接返回RunnableScheduledFuture,但是没有看懂为什么要用这样的一个方法类型提升
  • 定时任务线程池的入口方法delayedExecute

    private void delayedExecute(RunnableScheduledFuture<?> task) {
      // 1. 判断线程池是不是shutdown状态,如果是执行拒绝策略
      if (isShutdown())
        reject(task);
      else {
        // 2. 首先就是向DelayedWorkQueue中添加任务
        super.getQueue().add(task);
        // 3. 不管是一般的线程池还是执行定时任务的线程池,都会在向队列中添加完任务后执行re-check
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
          task.cancel(false);
        else
          // 4. 如果通过了recheck,执行此方法
          // 确保线程池内有线程运行
          ensurePrestart();
      }
    }
    
    void ensurePrestart() {
      int wc = workerCountOf(ctl.get());
      // 对于Executors创建的线程池来说,核心线程数量为0,所以会保证有非核心线程执行
      if (wc < corePoolSize)
        addWorker(null, true);
      else if (wc == 0)
        addWorker(null, false);
    }
    复制代码
    • 如果线程池状态不是SHUTDOWN的话,直接向队列中添加任务,而没有直接让线程去执行任务的场景
  • addWorker开始,后续的就是标准的线程池的线程管理与任务获取的流程了,也就是说定时任务线程池与一般线程池的主要区别在于任务调度部分,而连接任务管理与线程管理的通道–延时队列也需要大致了解下

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {
     
     // 任务调度时提交任务的方法就是add方法
     public boolean add(Runnable e) {
       return offer(e);
     }
      
      public boolean offer(Runnable x) {
        if (x == null)
          throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          int i = size;
          if (i >= queue.length)
            grow();
          size = i + 1;
          if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
          } else {
            // 按照排序规则,选择合适的位置插入到队列中
            siftUp(i, e);
          }
          if (queue[0] == e) {
            leader = null;
            available.signal();
          }
        } finally {
          lock.unlock();
        }
        return true;
      }
      // 按照排序规则,选择合适的位置插入到队列中
      private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) {
          int parent = (k - 1) >>> 1;
          RunnableScheduledFuture<?> e = queue[parent];
          // 按照RunnableScheduledFuture的time属性进行排序
          if (key.compareTo(e) >= 0)
            break;
          queue[k] = e;
          setIndex(e, k);
          k = parent;
        }
        queue[k] = key;
        setIndex(key, k);
      }
      
      // getTask中,核心线程取任务(无超时时间)
      // 如果当前不能获取,就阻塞等待
      public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
              available.await();
            else {
              long delay = first.getDelay(NANOSECONDS);
              if (delay <= 0)
                return finishPoll(first);
              first = null; // don't retain ref while waiting
              if (leader != null)
                available.await();
              else {
                Thread thisThread = Thread.currentThread();
                leader = thisThread;
                try {
                  available.awaitNanos(delay);
                } finally {
                  if (leader == thisThread)
                    leader = null;
                }
              }
            }
          }
        } finally {
          if (leader == null && queue[0] != null)
            available.signal();
          lock.unlock();
        }
      }
      
      //  getTask中,非核心线程取任务或则核心线程获取任务(允许超时回收)
      public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
          for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
              if (nanos <= 0)
                return null;
              else
                nanos = available.awaitNanos(nanos);
            } else {
              long delay = first.getDelay(NANOSECONDS);
              if (delay <= 0)
                return finishPoll(first);
              if (nanos <= 0)
                return null;
              first = null; // don't retain ref while waiting
              if (nanos < delay || leader != null)
                nanos = available.awaitNanos(nanos);
              else {
                Thread thisThread = Thread.currentThread();
                leader = thisThread;
                try {
                  long timeLeft = available.awaitNanos(delay);
                  nanos -= delay - timeLeft;
                } finally {
                  if (leader == thisThread)
                    leader = null;
                }
              }
            }
          }
        } finally {
          if (leader == null && queue[0] != null)
            available.signal();
          lock.unlock();
        }
      }
      
    }
    复制代码
    • DelayedWorkQueue的内部存储是RunnableScheduledFuture类型的数组
    • 提交任务与获取任务用的是同一把锁

参考

  1. Java线程池实现原理及其在美团业务中的实践–美团技术团队
  2. Java线程池学习总结
  3. Java线程池
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享