Java线程池源码分析与总结(下)

使用ThreadPoolExecutor直接创建线程池

  • 使用有界队列和有限数量的线程数会保证安全

线程池的状态

  • 在关闭线程池章节中,查看源码实际上会发现线程池有许多状态:

    /**
         * The main pool control state, ctl, is an atomic integer packing
         * two conceptual fields
         *   workerCount, indicating the effective number of threads
         *   runState,    indicating whether running, shutting down etc
         *
         * In order to pack them into one int, we limit workerCount to
       * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
         * billion) otherwise representable. If this is ever an issue in
         * the future, the variable can be changed to be an AtomicLong,
         * and the shift/mask constants below adjusted. But until the need
         * arises, this code is a bit faster and simpler using an int.
         *
         * The workerCount is the number of workers that have been
         * permitted to start and not permitted to stop.  The value may be
         * transiently different from the actual number of live threads,
         * for example when a ThreadFactory fails to create a thread when
         * asked, and when exiting threads are still performing
         * bookkeeping before terminating. The user-visible pool size is
         * reported as the current size of the workers set.
         *
         * The runState provides the main lifecycle control, taking on values:
         *
         *   RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
         *
         * The numerical order among these values matters, to allow
         * ordered comparisons. The runState monotonically increases over
         * time, but need not hit each state. The transitions are:
         *
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
         *
         * Threads waiting in awaitTermination() will return when the
         * state reaches TERMINATED.
         *
         * Detecting the transition from SHUTDOWN to TIDYING is less
         * straightforward than you'd like because the queue may become
         * empty after non-empty and vice versa during SHUTDOWN state, but
         * we can only terminate if, after seeing that it is empty, we see
         * that workerCount is 0 (which sometimes entails a recheck -- see
         * below).
         */
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    复制代码
    • 将注释翻译下来就是对着几个线程池状态的具体描述:

      • 其中五个状态

        RUNNING:接收新的任务,处理队列中的任务;

        SHUTDOWN:不接收新的任务,但处理队列中的任务

        STOP:不接收新的任务,不处理队列中的任务,中断正在执行的任务

        TIDYING:所有任务都终止,有效线程数为0线程过度到TIDYING时会调用terminated钩子方法

        TERMINATED:terminated()方法执行完毕后进入该状态;

      • 状态之间的转换

        RUNNING -> SHUTDOWN:调用shutdown方法;

        (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow方法;

        SHUTDOWN -> TIDYING:当线程池和任务队列都为空(队列中没有未执行的任务了,并且所有线程都完成了工作处于赋闲状态);

        STOP -> TIDYING:当线程池中工作线程数量为0(其实就是变为stop状态,对所有正在执行任务的线程执行中断,也不再处理队列中未处理的任务,一旦中断全部完成,所有工作线程数量就为0了,直接进入tidying状态,也不管队列中的任务了);

        TIDYING -> TERMINATED:当terminated方法执行完毕;

      • 状态转换示意图

        image-20210419143355740

  • 线程池状态是由命名为ctl的AtomicIntegr的成员变量持有的(共32位),包含以下两个信息:

    • 线程池状态-最高3位

    • 线程池中线程数量-低29位

      // 初始化线程池状态-RUNNING 0工作线程
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      // 设置位数 高3位与低29位分别表示线程池状态与线程池工作线程数量
      private static final int COUNT_BITS = Integer.SIZE - 3;
      // 线程的最大数量大概是5亿多
      private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
      
      // runState is stored in the high-order bits
      private static final int RUNNING    = -1 << COUNT_BITS;  // 111
      private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 000
      private static final int STOP       =  1 << COUNT_BITS;  // 001
      private static final int TIDYING    =  2 << COUNT_BITS;  // 010
      private static final int TERMINATED =  3 << COUNT_BITS;  // 011
      
      // Packing and unpacking ctl
      // 根据ctl获取线程池状态
      private static int runStateOf(int c)     { return c & ~CAPACITY; }
      // 根据ctl获取线程池中工作线程数量
      private static int workerCountOf(int c)  { return c & CAPACITY; }
      // 使用runstate与workercount组装ctl,初始状态下rs 为RUNNING wc为0
      private static int ctlOf(int rs, int wc) { return rs | wc; }
      
      private static boolean runStateLessThan(int c, int s) {
        return c < s;
      }
      
      private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
      }
      // 注意大小比较关系
      private static boolean isRunning(int c) {
        return c < SHUTDOWN;
      }
      复制代码
      • 事实上Integer.MAX_VALUE大于CAPACITY,所以可以说实际上下边的Executors中的构造函数中的最大线程池数量是根本无法达到的
      • 为什么要把线程池数量和线程池状态维护在一个变量中?
        • 因为事实上是需要维护线程池内有效线程数量和线程池状态的一致性的(源码中实际上许多地方是同时判断线程池状态与线程池内有效线程的数量的),如果二者分开维护会因为维护二者的一致性而浪费锁资源;
        • 并且可以通过位运算去分别计算得到线程池状态与工作线程数量,效率也是很高的

ThreadPoolExecutor的整体架构与运行流程

image-20210419133922120

  • 线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收

Executor框架内的核心类–ThreadPoolExecutor

  • 学习ThreadPoolExecutor构造方法(针对参数最多的学习)
public ThreadPoolExecutor(int corePoolSize,

​                              int maximumPoolSize,

​                              long keepAliveTime,

​                              TimeUnit unit,

​                              BlockingQueue<Runnable> workQueue,

​                              ThreadFactory threadFactory,

​                              RejectedExecutionHandler handler) {

​        if (corePoolSize < 0 ||

​            maximumPoolSize <= 0 ||

​            maximumPoolSize < corePoolSize ||

​            keepAliveTime < 0)

​            throw new IllegalArgumentException();

​        if (workQueue == null || threadFactory == null || handler == null)

​            throw new NullPointerException();

​        this.acc = System.getSecurityManager() == null ?

​                null :

​                AccessController.getContext();

​        this.corePoolSize = corePoolSize;

​        this.maximumPoolSize = maximumPoolSize;

​        this.workQueue = workQueue;

​        this.keepAliveTime = unit.toNanos(keepAliveTime);

​        this.threadFactory = threadFactory;

​        this.handler = handler;

​    }
复制代码
  • ThreadPoolExecutor 3 个最重要的参数:

    • corePoolSize : 核心线程数定义了最小可以同时运行的线程数量,所谓最小的含义就是,这些线程创建后即便没有任务执行,是空闲的,也要维持运行,除非设置了allowCoreThreadTimeOut,如果设置为true,则在超过keepAliveTime之后,空闲的核心线程也会被回收
      • 注意这里会有一个误解需要澄清:并不是线程池启动起来后就立即维护起corePoolSize个线程,也是按需求来的,最先分配任务的corePoolSize自动成为核心线程
    • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。(注意是可以同时运行的最大的线程数量,不是当前正在运行的线程数量
    • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。拿队列当缓存,直到队列放满报错或者有线程空闲推出队列
      • 需要注意的是这个队列存储的仅仅是execute方法提交的Runnable任务,而不是其他的什么复杂的结构
  • ThreadPoolExecutor其他常见参数:

    • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁(注意这里针对的是核心线程外的线程,核心线程也就是最小维持数量的线程会一直维持运行),而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
      • 其起作用的机制在于getTask函数中调用的阻塞队列的poll函数的超时设置
    • unit : keepAliveTime 参数的时间单位。
    • threadFactory :executor 创建新线程的时候会用到。
    • handler :饱和策略。关于饱和策略下面单独介绍一下。当提交的任务过多而不能及时处理(这里的及时处理指的是任务队列已经满了,并且线程池已经达到了允许的最大线程量,并且都在工作)时,我们可以定制策略来处理任务
  • 饱和策略(对应的是任务拒绝模块)

    • 定义:饱和就是当任务队列满了,并且线程池当前同时运行的线程数量已经达到设定的最大值时的状态,更准确地定义应该是任务拒绝策略,而不仅仅是饱和策略,因为线程池饱和的时候会执行拒绝,线程池状态不是running状态时,也要对新提交的任务执行拒绝策略

    • 任务的拒绝是通过reject函数完成的, 默认提供4个拒绝策略,当然也可以实现自己的拒绝策略

      final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
      }
      复制代码
    • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理,此policy也是使用Executors工具类创建线程池以及我们不指定饱和策略使用ThreadPoolExecutor构造函数时的默认的饱和策略

      • 如果是比较关键的业务,推荐使用此拒绝策略,这样在系统不能承载更多任务时,及时通过异常发现
    • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程(一般是主线程)中运行(run)被拒绝的任务,如果执行程序(线程池)已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能(因为Main线程去处理新提交的任务去了,就无法处理新的请求了)。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略

      • 此策略适合处理大量计算任务的任务类型
        • 多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕
      • 此策略可以做分流,请求可以分别缓存在线程池工作队列->工作线程(一般是main线程)->TCP 层->客户端 达成了一定程度的可伸缩功能
    • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。(直接丢弃掉,甚至不会抛出异常

    • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求(所谓的抛弃最早的未处理的任务请求,就是抛弃下一个待处理的任务,处于头部的任务),丢弃后再次尝试提交新的任务

    • 对于以上几种饱和策略的理解补充:

      • 不管是哪种策略,都会(除了DiscardPolicy策略)判断线程是不是shutdown状态,如果是会直接忽略新的任务
  • 任务队列(线程安全的阻塞队列)

    • 如果execute是线程池的任务调度器,是作为整个线程池的入口的话,那么任务缓冲队列可以看作是整个线程池的神经中枢,作为神经中枢,其起到了解耦任务与线程的操作,之所以有这一层缓冲,才有了所谓的调度—其实是一个生产者、消费者模式,其中生产者(一般是创建线程池的线程,main线程)投放任务,消费者(线程池中的工作线程)取任务去执行
    • 显然是一个队列模型,构造函数要求的任务队列类型是BlockingQueue<Runnable>,但是这个是一个接口类,真正可以使用的实现类有如下几种(具体的描述看自己总结的线程安全的容器这个文章):
      • LinkedBlockingQueue 无界队列,所谓的无界队列就是队列长度默认设置为Integr.MAX_VALUE,使用工具类创建线程池时,newFixedThreadPool与SingleThreadExecutor都是使用的此类型的队列
      • ArrayBlockingQueue 有界队列,一般自己使用的就是此队列,数组实现,一旦创建大小不可更改
      • DelayedWorkQueue 定时任务的线程池使用的是此队列,按照任务的下一次执行的时间的早晚进行排序
      • SynchronousQueue 直接提交,也就是不把任务存储,而知直接提交给线程,newCachedTHreadPool使用的就是这个
      • 除此之外还有多种任务队列可供使用,包括优先级队列,双端队列等等
  • 线程工厂

    • 在创建线程池的时候,还可以传入一个重要的参数就是线程工厂,默认情况下的线程池创建线程的过程都是其内部的DefaultThreadFactory,但是如果要用自定义的方式创建线程,以实现对于线程池创建的线程的监控与控制的话,就需要用到这个线程工厂的参数
execute方法–任务调度
  • 任务调度是整个线程池的入口,是整个线程池的核心所在,而这个任务调度对应的实际上就是execute方法

    • execute作为任务调度方法的大致运作流程是根据线程池的运行状态,工作线程的数量与运行策略来决定新提交的任务的三种可能的去向:
      • 直接申请线程执行
      • 缓冲到阻塞队列中
      • 使用配置好的拒绝策略,拒绝任务的执行
  • 线程池中最重要的方法一定是任务的提交执行方法,又由于submit内部实际调用了execute方法,所以直接查看ThreadPoolExecutor的execute方法

    // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
    
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    
    // 获取当前线程池内正在运行的线程数量
    private static int workerCountOf(int c) {
    
    ​    return c & CAPACITY;
    
    }
    
    
    // 任务队列
    private final BlockingQueue<Runnable> workQueue;
    
    
    
    public void execute(Runnable command) {
    
    ​    // 首先肯定是检查的任务的有效性,如果为null就要报空指针异常if (command == null)
    
    ​            throw new NullPointerException();
    
    ​        /*
    
    ​         \* Proceed in 3 steps:
    
    ​         *
    
    ​         \* 1. If fewer than corePoolSize threads are running, try to
    
    ​         \* start a new thread with the given command as its first
    
    ​         \* task.  The call to addWorker atomically checks runState and
    
    ​         \* workerCount, and so prevents false alarms that would add
    
    ​         \* threads when it shouldn't, by returning false.
    
    ​         *
    
    ​         \* 2. If a task can be successfully queued, then we still need
    
    ​         \* to double-check whether we should have added a thread
    
    ​         \* (because existing ones died since last checking) or that
    
    ​         \* the pool shut down since entry into this method. So we
    
    ​         \* recheck state and if necessary roll back the enqueuing if
    
    ​         \* stopped, or start a new thread if there are none.
    
    ​         *
    
    ​         \* 3. If we cannot queue task, then we try to add a new
    
    ​         \* thread.  If it fails, we know we are shut down or saturated
    
    ​         \* and so reject the task.
    
    ​         */// 检查完提交的任务的有效性之后就要执行如上英文注释的三个步骤的处理了// 获得线程池的状态与当前运行的线程的数量的记录(ThreadPoolExecutor类中定义了五种线程池状态)
      			 // ctl更像是一个线程池的运行时上下文的状态维护变量int c = ctl.get();
    
    ​        // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。if (workerCountOf(c) < corePoolSize) {
    
    ​            if (addWorker(command, true))
    
    ​                return;
    						// addWorker失败,重新获得线程池状态,以进行下一步判断
    ​            c = ctl.get();
    
    ​        }
    
    ​        // 2.如果当前之行的线程数量大于等于 corePoolSize 或者addWorker失败(失败的原因可能是有效线程的数量已经大于corePoolSize,所以需要缓存任务)后就会走到这里// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会将任务加入到任务队列中,并且判断是否能加入到任务队列中if (isRunning(c) && workQueue.offer(command)) {
    
    ​            // 成功将任务添加到任务队列中// 再次检查线程池中的线程状态,并再次检查线程池中是否有可用的线程,因为自从上一次检查后,可能有线程已经完成了工作或者线程池已经shutdown了int recheck = ctl.get();
    
    ​            // 如果线程池状态不是running状态,就要从任务队列中移除任务,相当于一次回滚,并执行构造函数中参数指定的饱和策略if (! isRunning(recheck) && remove(command))
    								// 执行回滚
    ​                reject(command);
    
    ​            // 如果当前线程池是running状态并且工作线程为0(之前运行的工作线程被回收了,corePoolSize也有可能被回收)就新创建一个线程,其中worker的初始任务为null
    						// else if (workerCountOf(recheck) == 0)
    
    ​                addWorker(null, false);
    
    ​        }
    
    ​        // 3. 任务队列已经满了,或者线程池已经不是running状态了 因此做最后的尝试,在创建一个新的线程试试,如果创建失败,表示线程池已经满了,因此执行饱和策略else if (!addWorker(command, false))
    
    ​            reject(command);
    
    }
    复制代码
    • 事实上,源代码的注释上就已经说的很清楚了。

      • 文字说明
        • 首先要保证的是,线程池状态必须是RUNNING状态才会接受新的任务,如果不是RUNNING状态,会直接执行拒绝策略reject函数
        • 如果当前线程池中的有效线程池数量小于corePoolSize说明还可以无脑添加新的线程并使其执行新的任务(这也说明corePoolSize数量的线程也是懒创建的,不是默认就自动维护这么多数量的线程
        • 如果大于的话,就要尝试将新的任务缓存到任务队列中,如果任务队列没有满就加到任务队列,如果满了,没办法只能继续扩张线程数量了,此时判断当前的有效线程数量是否小于maximumPoolSize如果小于,则创建新的线程并用来执行新的任务;如果有效线程数量已经大于maximumPoolSize,只能去执行拒绝策略了
      • 不要忽略源码中的double-check机制,在成功将任务加到缓存队列中后,要double-check线程池的状态是否还是RUNNING状态,如果不是及时回滚提交的任务,执行拒绝策略,除此之外还要检查工作线程是否已经全部被回收,如果全部被回收,需要创建一个空的Worker以备用,不能让线程池为空(如果线程池一直为空的话,下一个任务相当于纯创建一个线程,无法发挥线程池的性能优势了)
线程管理与任务的获取
  • 线程池为了方便的掌握线程的状态与维护线程的周期,设计了工作线程对象WorkerWorker起作用的最关键的就是实现了Runnable的接口(使得Worker可以作为线程任务被执行,相当于将提交的任务做了包装)和继承了AQS类(控制线程的中断,维护线程的生命周期)

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
      final Thread thread;
      Runnable firstTask;
      // ...
    }
    复制代码
    • Worker中最重要的两个成员变量

      • thread,在Worker的构造函数中被创建,使用的是ThreadPoolExecutor创建时传入的threadFactory去执行线程的构建

      • firstTask,firstTask用来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况,执行完firstTask后再去队列中取后续的任务如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建

        • 上边的描述针对的是一般的线程池,对于定时任务线程池来说,添加核心线程时,firstTask也是为null

        image-20210423201541482

  • addWorker函数(任务调度时创建核心线程执行任务或者创建非核心线程)

    // 全局锁,并发操作必备
    private final ReentrantLock mainLock = new ReentrantLock();
    // 跟踪线程池的最大大小,应该只有在持有全局锁mainLock的前提下才访问此属性
    private int largestPoolSize;
    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    //获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //判断线程池的状态是否为 Running
    private static boolean isRunning(int c) {
      return c < SHUTDOWN;
    }
    
    // 返回true表示创建并启动线程成功
    // firstaTask就是这个线程的初始任务
    // 第二个参数为true表示新的worker也就是工作线程是尝试加到corePool中还是maximumPool中
    
    private boolean addWorker(Runnable firstTask, boolean core) {
            // retry标志位常用于多循环嵌套的流程控制
            retry:
            for (;;) {
              	// 获取线程池状态
                int c = ctl.get();
                int rs = runStateOf(c);
    
                
                // 如果状态>= SHUTDOWN 表示线程池是正在关闭(SHUTDOWN)或者已经关闭(>SHUTDOWN)状态的处置方法:
                // 如果此时是shutdown状态,并且没有分配初始任务,并且任务队列不为空,则是允许创建新的worker的(此时新创建的worker用来在SHUTDOWN状态下,执行任务队列中剩余的任务),违反任一则是不允许的,比如线程池已经关闭(>SHUTDOWN)或者是SHUTDOWN状态,但是附加了自己的初始任务,是不允许的,只能执行队列中剩余的任务,或者队列已经为空了,不再需要新的worker了,也会创建失败
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                 		// 获取线程池中的线程数量
                    int wc = workerCountOf(c);
                    // 判断当前线程池的数量与哪个值比较,如果已经达到最终的最大值CAPACITY,立即返回false
                    // 否则根据参数判断与哪个值比较,是最小值还是最大值比较,如果目标是创建核心线程,就和corePoolSize比较,如果已经达到设计大小了,就创建失败,如果目标是创建非核心线程,就和maximumPoolSize比较
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 上一步判断后还可以添加worker,就使用CAS增加worker计数
                    if (compareAndIncrementWorkerCount(c))
                        // 跳出整个循环
                        break retry;
                    // CAS 失败
                    c = ctl.get();
                    // 如果线程池状态发生了改变,
                    if (runStateOf(c) != rs)
                        // 从头开始执行整个外部的for循环,重新根据线程池状态进行判决
                        continue retry;
                    // 省略的else表示的就是CAS失败的原因是线程数量被同步修改了,只需要重新执行内部的for循环,根据线程数量进行判决即可
                }
            }
      
      
      		  // 线程数量成功更新,
    				// 初始化工作线程启动成功标志
            boolean workerStarted = false;
      			// 初始化工作线程创建成功标志
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 创建worker实例
                w = new Worker(firstTask);
                // 获得worker持有的线程实例
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                   // 加锁
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        // 获取线程池状态
                        int rs = runStateOf(ctl.get());
    										// < shutdown也就是running状态下执行操作  
                        // 或者是在SHUTDOWN状态下,并且firstTask为空时执行下述操作
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 判断线程是否已经启动了,若已经启动了,抛出异常
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                          	// 将新创建的worker实例添加到worker集合中,是一个HashSet集合
                            workers.add(w);
                           // 更新largestPoolSize
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                           // 设置worker添加成功标志位
                            workerAdded = true;
                        }
                    } finally {
                        // 释放锁
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        // 启动worker
                        t.start();
                        // 设置worker启动成功标志位
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                   // worker启动失败,要执行回滚
                   // 从工作线程集合中移除新添加的Worker实例
                   // 线程池状态中线程池数量-1
                   // tryTerminate
                  addWorkerFailed(w);
            }
            // 返回worker是否启动的状态
            return workerStarted;
      }
    复制代码
    • addWorker源码中,也能看见线程池状态与线程池数量共同决定流程走向的场景,这就是为什么要把这两个状态维护在一个变量中的原因
    • 线程池通过维护workers这个集合来维护线程不被回收,当需要回收时,只需要将其引用消除,也就是将Worker对象消除即可,jvm会完成后续的回收(详见线程回收小节)
    • 流程图

    image-20210426092911659

  • runWorker函数,worker开始执行任务

    /*
    * 如何addWorker中启动线程的语句t.start()转到了runWorker方法呢?
    */
    
    // Worker的线程实例是在Worker构造函数中完成初始化的,注意,传入newThread的是this,也就是Worker实例本身被当做一个Runnable任务提交到了线程中,所以调用线程实例的start方法时,就会执行Runnable任务也就是Worker实例的run方法
    
    Worker(Runnable firstTask) {
      setState(-1); // 将AQS计数设置为-1,目的是为了在worker初始化导致runWorker被执行的期间内不被中断
      this.firstTask = firstTask;
      // thread成员变量由默认的或者指定的线程工厂创建,传入的Runnable参数是Worker实例本身
      this.thread = getThreadFactory().newThread(this);
    }
    
    // 新线程启动后执行此方法
    public void run() {
      runWorker(this);
    }
    
    
    
    // addWorker: 
    // 创建worker实例
    w = new Worker(firstTask);
    final Thread t = w.thread;
    
    // 开启新线程后执行Runnbale参数的run方法,也就是Worker实例的run方法
    t.start()
      
    /*
    *  上述代码是addWorker中的启动线程的代码
    *  下边的代码是runWorker中的代码
    */
    
      
    // 实际在新线程中执行的方法
    final void runWorker(Worker w) {
          Thread wt = Thread.currentThread();
          // 首先保存初始任务。再清空初始任务
          Runnable task = w.firstTask;
          w.firstTask = null;
          w.unlock(); // worker 允许中断,此时worker的状态是空闲状态,可以被回收(中断)
          
          // 初始化线程异常退出标志位
          boolean completedAbruptly = true;
          try {
            // 执行初始化任务,或者while循环不断的阻塞以从任务队列获得新的任务,除非getTask返回null,表示已经无法获得任务,需要执行线程回收
            while (task != null || (task = getTask()) != null) {
              // 成功的获取了任务
              // 开始执行任务,不允许中断,此时线程是非空闲状态
              w.lock();
              
              // 执行recheck 如果线程池已经关闭了,并且当前线程还没有中断,就要执行对当前线程的中断,否则要保证当前线程不是中断状态
              if ((runStateAtLeast(ctl.get(), STOP) ||
                   (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                wt.interrupt();
              try {
                // 钩子函数,默认的钩子函数的函数体为空,可以去构造ThreadPoolExecutor的子类去复写此钩子函数
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                  // 执行任务
                  task.run();
                } catch (RuntimeException x) {
                  thrown = x; throw x;
                } catch (Error x) {
                  thrown = x; throw x;
                } catch (Throwable x) {
                  thrown = x; throw new Error(x);
                } finally {
                  // 钩子函数与beforeExecute同理
                  afterExecute(task, thrown);
                }
              } finally {
                task = null;
                // worker的完成的任务数量加1,注意此时是线程安全的
                w.completedTasks++;
                // 释放锁
                w.unlock();
              }
            }
            // 线程不是因为异常退出的,而是因为无法获得任务导致退出的
            completedAbruptly = false;
          } finally {
            // while循环已无法通过getTask获得新的任务了,具体的原因参考后续的getTask方法
            // 执行线程回收
            // 如果是因为task执行时出现异常,completedAbruptly为true,否则为false
            processWorkerExit(w, completedAbruptly);
          }
        }
    复制代码
    • 如何addWorker中启动线程的语句t.start()转到了runWorker方法呢?这一块比较绕,可以直接看代码的注释
      • 这里就体现了Worker类实现Runnable接口的作用,就是将自己包装为可执行任务
    • 总的来说,Worker启动后的命运就是孜孜不倦的执行任务与启动任务,这就是打工人?
    • 导致出现线程回收的原因有两个
      • 任务执行过程中出现异常
      • 无法从队列中获取新的任务,具体原因参考geTask方法
  • getTask方法,也就是从队列中获取任务的方法也是很重要的,主要功能是核心线程获取任务或保持阻塞,非核心线程获取任务,或超时返回null,进而线程生命周期结束

    private Runnable getTask() {
      			// 获取任务超时的标志位
            boolean timedOut = false;
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
               
                //线程池状态是STOP之后的状态,表示已经不处理任务了,或者是SHUTDOWN时,任务队列已经为空,想处理也没的处理了,就直接返回null,worker会被直接回收
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    // 工作线程数量-1
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
              //是否超时控制,allowCoreThreadTimeOut默认false,代表不对核心线程做超时限制,对于超出核心线程的线程需要控制超时
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
              
              
    					//当线程数大于最大线程数,即线程池已经满了,或者需要做超时控制且上次获取任务就已经超时这两个任一的条件下
              //且线程数大于1或者队列为空,尝试将线程数减一并返回null
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    // 失败重试,重新根据线程池状态与线程池中线程数量做判断
                    continue;
                }
    
                try {
                  //当需要超时控制时,在keepAliveTime时间内没有获取到任务的话会设置超时标志位,如果没有超时限制,则调用take获取任务,此时线程是阻塞等待获取任务的
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    // 阻塞等待获取任务时,整个worker并没有加锁,也就是被认为是空闲状态,可能会被回收掉
                    timedOut = false;
                }
            }
        }
    复制代码
    • 这里需要补充的就是任务队列的poll与take方法虽然名称差异比较大,但是唯一的差异在于前者是加了超时时间,后者是阻塞

      image-20210422141443980

    • getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收

      • 比如在下边这段代码中

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          if (compareAndDecrementWorkerCount(c))
            return null;
          // 失败重试,重新根据线程池状态与线程池中线程数量做判断
          continue;
        }
        复制代码
        • 当前线程池内的线程数量超过最大值会进行线程回收
        • 存在超时设置,并且上一次获取任务已经超时时,如果任务队列还有不少任务,线程数量又恰好只有一个,是不会对当前这个独苗进行回收的,而是再试试
    • decrementWorkerCountcompareAndDecrementWorkerCount二者的区别是什么

      • decrementWorkerCount内部是在循环调用compareAndDecrementWorkerCount,换句话说就是,必须要尝试将工作线程数量-1,因为确实不需要此线程了,而compareAndDecrementWorkerCount直接拿来用,只是尝试一次将工作线程-1,如果失败的话,就要重新根据状态做出可能与之前不同的判断
  • 线程回收,processWorkerExit

    实际上,关于线程回收,是有两种场景的:1. 主动的线程回收,比如processWorkerExit函数这样的 2. 探查式的回收,或者说是被动的回收,比如interruptIdleWorkers

    主动回收:在runWorker函数中,如果无法再获得任务,就会跳出执行此线程回收函数,实际上线程池中线程的回收依赖的是JVM的自动回收,线程池要做的只是把线程的引用消除而已

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // 一个标志位,是否是因为发生线程异常,所以进入的此方法
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                // 工作线程数-1
                decrementWorkerCount();
            final ReentrantLock mainLock = this.mainLock;
            // 加锁,因为要进行审计计数了
            mainLock.lock();
            try {
                // 统计此worker的完成的任务数目
                completedTaskCount += w.completedTasks;
             		// 从线程池中移除此线程
                // 执行remove方法完毕后,实际上已经完成了线程的回收,但是由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程-----即所谓的线程状态自适应的过程
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    				
      			// 尝试中断、回收空闲线程
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    // 线程池状态是RUNNING或SHUTDOWN状态并且并非因为异常导致线程关闭的情况下
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    // 如果线程够用,就直接返回,否则还要添加一个worker到线程池
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
              // 如果因为线程异常导致的线程关闭的话,还需要再向线程池中补充一个worker
              // 或者是此时线程数量不能满足最小要求时也要再添加一个worker
                addWorker(null, false);
            }
        }
    复制代码

    被动回收:上述代码中提到的tryTerminate方法,也就是在某worker结束生命周期后判断线程池是否要关闭以及回收空闲线程,以便有效的管理线程池的生命周期,在所有可能导致线程池终止的地方都调用了此方法

    final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                //当线程池状态是RUNNING(状态正常)或者已经TIDYING或者已经TERMINATED(线程已经快关闭了)或者SHUTDOWN且还有任务没有被执行(SHUTDOWN状态需要处理完队列中的任务),直接返回
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
              // 当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,如果线程数量不为0,需要最多终止1个空闲的线程,上边所述的stop状态或者shutdown状态并且queue为空统称为终止流程开始的状态
              // 如果线程数不为0,则中断一个阻塞等待任务的空闲的工作线程
                if (workerCountOf(c) != 0) { 
                    // 尝试中断最多一个阻塞等待任务的空闲的工作线程
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    						// 如果当前工作线程数量为0就准备关闭线程池
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                  // 尝试设置线程池状态为tidying状态
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            // 如果设置成功调用此钩子方法
                            terminated();
                        } finally {
                            // 钩子方法执行完毕后,设置状态为TERMINATED,并设置线程数量为0
                            ctl.set(ctlOf(TERMINATED, 0));
                            // 通知调用awaitTermination的主线程,已经进入了TERMINATION状态
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // CAS失败的话,就重新根据状态进行判断
            }
        }
    复制代码
    • 调用了tryTerminate方法的地方有

      • addWorkerFailed
      • processWorkerExit
      • shutdown
      • shutdownNow
      • remove从队列中移除某任务
      • purge从队列中移除所有被取消的任务
    • 在被动回收过程中,最重要的就是能了解线程的当前状态,在主动回收中尚且可以知道线程是需要回收的,但是被动回收时实际上并不清楚线程池中线程的状态,Worker通过继承AQS,使用AQS来实现不可重入的独占锁(使用AQS的独占模式)这个功能

      • 没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态
      • lock方法一旦获取了独占锁,表示当前线程正在执行任务中,如果正在执行任务,则不应该中断线程
      • 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断

      image-20210427175959020

    • interruptIdleWorkers,中断空闲线程,使其不再阻塞等待任务

      private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          for (Worker w : workers) {
            Thread t = w.thread;
            // 判断线程是否已经被中断,是的话就什么都不做
            // 若未被中断,还要尝试获取worker的锁,此时如果worker如果已经通过lock方法获取了锁,则因为其不可重入的特性,导致此处为false,即对该worker不做任务处理
            // 使用tryLock方法来判断线程池中的线程是否是空闲状态
            if (!t.isInterrupted() && w.tryLock()) {
              try {
                // 执行线程中断
                t.interrupt();
              } catch (SecurityException ignore) {
              } finally {
                // worker释放锁
                w.unlock();
              }
            }
            // 如果未true,最多只会中断一个空闲线程,也可能一个线程也没有中断
            // 如果为false,则会持续遍历全部的worker,并尝试中断所有的空闲的线程
            if (onlyOne)
              break;
          }
        } finally {
          mainLock.unlock();
        }
      }
      
      
      // shutdownNow函数中调用的中断所有工作线程的方法
      private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          for (Worker w : workers)
            // 粗暴的中断所有线程
            w.interruptIfStarted();
        } finally {
          mainLock.unlock();
        }
      }
      
      // 定义在worker类中,粗暴的打断所有的已经执行过runWorker方法的worker
      void interruptIfStarted() {
        Thread t;
        // getState() >= 0即state != -1,也就是不是刚初始化的Worker,而是已经运行runWorker的Worker
        // 直接在线程层面执行中断,而不管worker此时是否是正在运行的状态(不用去获取worker的锁)
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
          try {
            t.interrupt();
          } catch (SecurityException ignore) {
          }
        }
      }
      
      // shutdown中调用了中断所有空闲线程的方法
      private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
      }
      复制代码
    • 只有在线程池终止流程开始状态下(线程池状态准备转入TIDYING状态,但是还有空闲线程的时候),传入的参数为true,其余调用都是false,也就是中断所有的空闲线程

      • 为什么仅在tryTerminate方法中,传入的参数为true,也就是最多中断一个空闲的线程呢?(解释的不是很清除,自己不是很懂…….)

        • 当前线程池状态是STOP状态或是SHUTDOWN状态但任务列表为空时,如果线程数量还不为0,这说明,有可能是剩余的所有线程都是阻塞,而不能传递shutdown的指令,在线程池终止流程开始的状态下,必须最多使一个阻塞在等待获取任务的线程中断,才能传播shutdown信号,以免所有的线程陷入等待而无法关闭线程池

        • 中断一个空闲线程,也能保证在线程池已经是SHUTDOWN状态后,新来的Worker也能最终退出

        • 综上,为了保证线程池未来最终能够终止,总是仅中断一个空闲的工作程序就足够了,但是shutdown会中断所有空闲的工作程序,以便多余的工作程序迅速退出

        • 参考interruptIdleWorkers的注释

          Interrupts threads that might be waiting for tasks (as indicated by not being locked) so they can check for termination or configuration changes. Ignores SecurityExceptions (in which case some threads may remain uninterrupted).
          Params:
          onlyOne – If true, interrupt at most one worker. This is called only from tryTerminate when termination is otherwise enabled but there are still other workers. In this case, at most one waiting worker is interrupted to propagate shutdown signals in case all threads are currently waiting. Interrupting any arbitrary thread ensures that newly arriving workers since shutdown began will also eventually exit. To guarantee eventual termination, it suffices to always interrupt only one idle worker, but shutdown() interrupts all idle workers so that redundant workers exit promptly, not waiting for a straggler task to finish.

  • 从AQS的角度理解Worker的生命周期

    Worker使用的是AQS的独占模式,使用独占的特性来判断Worker本身是空闲状态(未上锁)还是工作状态(上锁)

    //1. worker初始化
    Worker(Runnable firstTask) {
      setState(-1); // 设置AQS计数标志为-1,其目的是为了防止初始化到runWorker执行这段时间内被中断
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    
    // 2. runWorker函数
    final void runWorker(Worker w) {
      Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      // 至此worker是被以被中断的,也就是进入了空闲状态
      w.unlock();
      // ...
      w.lock();
    }
    
    // worker释放锁
    public void unlock()      { release(1); }
    
    // 独占模式下释放资源
    public final boolean release(int arg) {
      if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
          unparkSuccessor(h);
        return true;
      }
      return false;
    }
    
    protected boolean tryRelease(int unused) {
      // 设置独占的线程为null
      setExclusiveOwnerThread(null);
      // 设置状态为0
      setState(0);
      return true;
    }
    
    // worker上锁
    public void lock()        { acquire(1); }
    
    public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    
    // worker的实现,其实根本没有用到参数--1 因为规定就是状态1为上锁的状态,所以直接用的常量1
    protected boolean tryAcquire(int unused) {
      // 尝试获得worker的锁,必须保证锁状态的旧状态是0,才能设置状态为1
      if (compareAndSetState(0, 1)) {
        // 设置当前线程为独占线程
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    
    // interruptIdleWorkers函数执行时尝试中断空闲的线程,会通过尝试获取锁的方法来判断线程的状态
    // 在tryAcquire方法中尝试设置状态为1,但是状态的当前值应是0(即执行unlock()之后),才能设置成功
    // 这一点也保证了,在Worker初始化设置状态为-1到runWorker的状态设置为0时,是能够保证不被中断的
    public boolean tryLock()  { return tryAcquire(1); }
    复制代码

线程池数量大小的确定

线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。

很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。

上下文切换:

多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换

上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。

Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。

类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。

如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式:

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

ForkJoin线程池

  • 此线程池用来把大量数据的计算进行拆分(比如一个超大数组的求和),分配给线程池中的多个线程并行去执行,有并行计算那味了

  • Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。还有stream的许多操作底层都用了ForkJoin线程池

参考

  1. Java线程池实现原理及其在美团业务中的实践–美团技术团队
  2. Java线程池学习总结
  3. Java线程池
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享