JUC并发编程(8):ThreadPool线程池

ThreadPool线程池

参考:

0、引言

用jdbc操作过数据库应该知道,操作数据库需要和数据库建立连接,拿到连接之后才能操作数据库,用完之后销毁。数据库连接的创建和销毁其实是比较耗时的,真正和业务相关的操作耗时是比较短的。每个数据库操作之前都需要创建连接,为了提升系统性能,后来出现了数据库连接池,系统启动的时候,先创建很多连接放在池子里面,使用的时候,直接从连接池中获取一个,使用完毕之后返回到池子里面,继续给其他需要者使用,这其中就省去创建连接的时间,从而提升了系统整体的性能。

线程池和数据库连接池的原理也差不多,创建线程去处理业务,可能创建线程的时间比处理业务的时间还长一些,如果系统能够提前为我们创建好线程,我们需要的时候直接拿来使用,用完之后不是直接将其关闭,而是将其返回到线程池中,给其他需要这使用,这样直接节省了创建和销毁的时间,提升了系统的性能。

简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。

1、线程池概述

线程池是一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。简单来说,线程都放在一个池里,每次使用都从池里获取,用完后归还到池中。

线程池做的工作是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超过数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

一般线程池主要分为以下4个组成部分:

  • 线程池管理器:用于创建并管理线程池;
  • 工作线程:线程池中的线程;
  • 任务接口:每个任务必须实现的接口,用于工作线程调度其运行;
  • 任务队列:用于存放待处理的任务,提供一种缓冲机制。

它的主要特点

  1. 提⾼线程的利⽤率(线程复用)
  2. 提⾼响应速度
  3. 便于统⼀管理线程对象
  4. 可控制最⼤并发数

看cpu的核数

System.out.println(Runtime.getRuntime().availableProcessors());
复制代码

2、线程池架构

Java中的线程池是通过Executor框架实现的,该框架中用到了ExecutorExecutorsExecutorServiceThreadPoolExecutor这几个类

image.png

3、线程池使用方式

3.1、一池N线程

主要实现方法:

Executors.newFixedThreadPool(int nThreads)
复制代码

源码

public static ExecutorService newFixedThreadPool (int nThreads) {
    return new ThreadPoolExecutor (nThreads,nThreads,
                                   keepAliveTime: 0L,TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()) ;
}
复制代码

主要特点如下:

  • 创建一个定长线程池,并且可控制线程的最大并发数,超出的线程会在队列中等待
  • newFixedThreadPool方法创建的线程池corePoolsizeMaxmumPoolsize是相等的,它底层使用的是LinkedBlockingQueue阻塞队列
  • 线程池中的线程处于一定的量,可以很好的控制线程的并发量
  • 线程可以重复被使用,在显示关闭之前,都将一直存在
  • 超出一定量的线程被提交时候需在队列中等待
  • tip:定长线程池的大小最好根据系统资源进行设置,如Runtime.getRuntime().availableProcessors();

3.2、一池一线程

主要实现方法:

Executors.newSingleThreadExecutor()
复制代码

源码

@NotNull public static ExecutorService newSingleThreadExecutor (){
    return new Finali zableDelegatedExecutorService
        (new ThreadPoolExecutor(corePoolSize: 1,maximumPoolSize: 1,
                                 keepAliveTime: OL,
                                 TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>())) ;
}
复制代码

主要特点如下:

  • 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,之后提交的线程活动将会排在队列中以此 执,保证所有任务都按照指定顺序执行
  • newSingleThreadExecutorcorePoolsizeMaxmumPoolsize都设置为1,它使用的是LinkedBlockingQueue阻塞队列

3.3、可扩容线程池(线程池根据需求创建线程)

主要实现方法:

Executors.newCachedThreadPool() 
复制代码

源码

@NotNull public static ExecutorService newCachedThreadPool () {
    return new ThreadPoolExecutor (corePoolSize: 0,Integer.MAX_VALUE,
                                   keepAliveTime: 60L, TimeUnit.SECONDS,
                                   new SynchronousQueue<Runnable>()) ;
}
复制代码

主要特点如下:

  • 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程。若无可回收,则创建新线程
  • newCachedThreadPoolcorePoolsize设置为0MaxmumPoolsize设置为Integer.MAX_VALUE,它使用的是synchronousQueue
  • 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
  • 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
  • 当线程池中,没有可用线程,会重新创建一个线程
  • 也就是说来了任务就创建线程运行,如果线程空闲超过60秒,就销毁线程
  • 遇强则强

3.4、代码演示

public class ExecutorDemo {
    public static void main(String[] args) {
        
        //一池N线程
        ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
        //一池一线程
        ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
        //一池可扩容线程
        ExecutorService threadPool3 = Executors.newCachedThreadPool();
        
        //模拟20个用户来办理业务,每个用户就是一个来自外部的请求线程
        try{
            for(int i=1;i<=20;i++){
                threadPool3.execute(()->{ //线程池执行
                    System.out.println(Thread.currentThread().getName()+"  办理业务");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool3.shutdown();//将线程放回线程池,关闭
        }
    }
}
复制代码

4、线程池的七大参数

上述三种线程池的创建方式中,从源码可以看到内部均使用如下代码创建线程池 ,即jdk中提供了线程池的具体实现,实现类是:java.util.concurrent.ThreadPoolExecutor,主要构造方法:

public ThreadPoolExecutor(int corePoolSize,       //核心线程数量
                          int maximumPoolSize,    //最大线程数量
                          long keepAliveTime,     //非核心线程空闲时的存活时间
                          TimeUnit unit,          //存活时间单位
                          BlockingQueue<Runnable> workQueue,   //存放提交但未执行任务的队列
                          ThreadFactory threadFactory,         //线程工厂
                          RejectedExecutionHandler handler)    // 等待队列满后的拒绝策略

复制代码

每个参数的具体含义如下:

image.png

  • corePoolSize:线程池中的常驻核心线程数
    • 在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程【今日的当值窗口[2个]】
    • 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中.
  • maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值大于等于1—5个[2[黑色正方形]+3(灰色正方形)]
  • keepAliveTime:多余的空闲线程存活时间,当空间时间达到keepAliveTime值时,多余的线程会被销毁直到只剩下corePoolSize个线程为止(非核心线程)。
    • 即如果没有任务处理了,有些线程会空闲,空闲的时间超过了这个值,会被回收掉。如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率。
    • 假设这样一种场景,候客区三个已经满了,这个时候又来了3个,那么其他三个窗口就会打开办理业务,这个时候原来在候客区的用户办理业务,新来的三个在候客区等待。随着时间的推移,可能没人来了,候客区的用户也都办理完了,这个时候非当值窗口空闲,这个参数就是空闲多久后关闭非当值窗口
  • unit:keepAliveTime的单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit,这个枚举也经常使用,有兴趣的可以看一下其源码。
  • workQueue:任务队列,阻塞队列,被提交但尚未被执行的任务(候客区),用于缓存待处理任务的阻塞队列,常见的有4种,本文后面有介绍。
  • threadFactory:表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可
  • handler:拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示 数(maxnumPoolSize)时如何来拒绝
    • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行。
    • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
    • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
    • DiscardPolicy:该策略直接默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

5、线程池4种拒绝策略

  • new ThreadPoolExecutor.AbortPolicy():银行满了,还有人进来,不处理这个,抛出异常
  • new ThreadPoolExecutor.CallerRunsPolicy():哪来的去哪里!
  • new ThreadPoolExecutor.DiscardPolicy():队列满了,丢掉任务,不会抛出异常
  • new ThreadPoolExecutor.DiscardOldestPolicy():队列满了,尝试去和最早的竞争,也不会抛出异常

6、线程池的底层工作原理

image.png

总结调用线程池的execute方法处理任务,执行execute方法的过程:

  • 在创建了线程池后,线程池中的线程数为零
  • 当调用 execute() 方法添加一个请求任务时,线程池做出如下判断:
    • 如果正在运行的线程数量小于 corePoolSize ,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize ,那么将这个任务放入阻塞队列;
    • 如果队列满了且正在运行的线程数量还小于 maximumPoolSize ,那么还是要创建非核心线程立即优先运行这个任务;比如上图中没有蓝色包围的三个线程
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  • 当一个线程完成任务时,它会从队列中取下一个任务来执行
  • 当一个线程无事可做超过一定时间(keepAliveTime)时,线程会判断:
    • 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
    • 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

image.png

当向线程池提交一个任务之后,线程池的处理流程如下:

  • 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程
  • 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程
  • 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。

7、自定义线程池

不允许使用上面所有的 Executors 的方式创建线程池,因为阿里java开发手册规定:

image.png

如下代码

线程池核心线程数量为2,最大为5,阻塞队列为3,多运行几次,可以看到拒绝策略AbortPolicy的效果。

线程池的使用步骤:

  1. 调用构造方法创建线程池
  2. 调用线程池的excute方法处理任务
  3. 关闭线程池
public class ThreadPoolDemo1 {
    public static void main(String[] args) {
        ExecutorService threadService = new ThreadPoolExecutor(2,
                5,
                2L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),//阻塞队列
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());//拒绝策略

        try{
            for(int i=1;i<=10;i++){
                threadService.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"办理业务");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadService.shutdown();
        }
    }
}
复制代码

结果:由于拒绝策略的原因,抛出异常

pool-1-thread-1办理业务
pool-1-thread-3办理业务
pool-1-thread-2办理业务
pool-1-thread-5办理业务
pool-1-thread-3办理业务
pool-1-thread-1办理业务
pool-1-thread-4办理业务
pool-1-thread-2办理业务
java.util.concurrent.RejectedExecutionException: Task IO_File.ReentrantReadWriteLockDemo$$Lambda$1/2093631819@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at IO_File.ReentrantReadWriteLockDemo.main(ReentrantReadWriteLockDemo.java:23)
复制代码

演示2:给线程池中线程起一个有意义的名字

给线程池中线程起一个有意义的名字,在系统出现问题的时候,通过线程堆栈信息可以更容易发现系统中问题所在。

自定义创建工厂需要实现java.util.concurrent.ThreadFactory接口中的Thread newThread(Runnable r)方法,参数为传入的任务,需要返回一个工作线程。

示例代码:

public class CustomizeThreadFactory {

    static AtomicInteger threadNum = new AtomicInteger(1);

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10), new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                Thread thread = new Thread(r);
                                thread.setName("自定义线程-" + threadNum.getAndIncrement());
                                return thread;
                            }
        });

        for (int i = 0; i < 5; i++) {
            String taskName = "任务-" + i;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "处理" + taskName);
            });
        }
        executor.shutdown();
    }
}
复制代码

运行上面的代码,输出结果:

自定义线程-2处理任务-1
自定义线程-5处理任务-4
自定义线程-4处理任务-3
自定义线程-1处理任务-0
自定义线程-3处理任务-2
复制代码

代码中在任务中输出了当前线程的名称,可以看到是我们自定义的名称。

通过jstack查看线程的堆栈信息,也可以看到我们自定义的名称,我们可以将代码中executor.shutdown();先给注释掉让程序先不退出,然后通过jstack查看,如下:

"自定义线程-5" #15 prio=5 os_prio=0 tid=0x00000000194fb000 nid=0x4ff0 waiting on condition [0x000000001a26f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"自定义线程-4" #14 prio=5 os_prio=0 tid=0x00000000194fa800 nid=0x3450 waiting on condition [0x000000001a16f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"自定义线程-3" #13 prio=5 os_prio=0 tid=0x00000000194f5800 nid=0x4808 waiting on condition [0x000000001a06f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"自定义线程-2" #12 prio=5 os_prio=0 tid=0x00000000194f5000 nid=0x1c10 waiting on condition [0x0000000019f6e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

"自定义线程-1" #11 prio=5 os_prio=0 tid=0x00000000194f4000 nid=0x20bc waiting on condition [0x0000000019e6e000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000d6090520> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

复制代码

8、线程池中常见5种工作队列

任务太多的时候,工作队列用于暂时缓存待处理的任务,jdk中常见的5种阻塞队列:

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序。
  • LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。
  • SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用这个队列。
  • PriorityBlockingQueue:优先级队列,进入队列的元素按照优先级会进行排序。

9、面试题

线程池用过吗?生产上你是如何设置合理参数

线程池的拒绝策略请你谈谈

  • 等待队列也已经排满了,再也塞不下新的任务了。同时,线程池的maximumPoolSize也到达了极限,无法继续为新任务服务,这时我们需要拒绝策略机制合理的处理这个问题
  • JDK内置的拒绝策略
    • AbortPolicy(默认):直接抛出RejectedException异常阻止系统正常运行
    • CallerRunsPolicy:调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是返回给调用者进行处理
    • DiscardOldestPolicy:将最早进入队列的任务删除,之后再尝试加入队列
    • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常.如果允许任务丢失,这是最好的拒绝策略
  • 以上内置策略均实现了RejectExecutionHandler接口

你在工作中单一的/固定数的/可变你的三种创建线程池的方法,你用哪个多?超级大坑

  • ①答案是一个都不用,我们生产上只能使用自定义的
  • ②Executors中JDK给你提供了为什么不用?
  • 参考阿里巴巴java开发手册

你在工作中是如何创建线程池的,是否自定义过线程池使用

①AbortPolicy:最大不会抛出异常的值= maximumPoolSize + new LinkedBlockingDeque<Runnable>(3) =8个。如果超过8个,默认的拒绝策略会抛出异常

②CallerRunPolicy: 如果超过8个,不会抛出异常,会返回给调用者去

③DiscardOldestPolicy:如果超过8个,将最早进入队列的任务删除,之后再尝试加入队列

④DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的拒绝策略

public class MyThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
            2,
            5, //Runtime.getRuntime().availableProcessors(),
            1L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(3),
            Executors.defaultThreadFactory(),
            //默认抛出异常
            //new ThreadPoolExecutor.AbortPolicy()
            //回退调用者
            //new ThreadPoolExecutor.CallerRunsPolicy()
            //处理不来的不处理,丢弃时间最长的
            //new ThreadPoolExecutor.DiscardOldestPolicy()
            //直接丢弃任务,不予任何处理也不抛出异常
            new ThreadPoolExecutor.DiscardPolicy()
        );
        //模拟10个用户来办理业务 没有用户就是来自外部的请求线程.
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
        //threadPoolInit();
    }
}
复制代码

结果

pool-1-thread-2	 办理业务
pool-1-thread-2	 办理业务
pool-1-thread-2	 办理业务
pool-1-thread-2	 办理业务
pool-1-thread-2	 办理业务
pool-1-thread-2	 办理业务
pool-1-thread-2	 办理业务
pool-1-thread-1	 办理业务
pool-1-thread-4	 办理业务
pool-1-thread-3	 办理业务
复制代码

合理配置线程池你是如何考虑的?

①. CPU密集型

  • 并发:CPU一核,模拟出来多条线程(多线程操作同一资源)
  • 并行:CPU多核,多个线程可以同时执行:线程池(多个人一起走)
  • 并发编程的本质:充分利用CPU资源

image.png

②. IO密集型

image.png

线程池的最大的大小如何设置 (CPU密集型和IO密集型—调优)

public class Demo6 {

    public static void main(String[] args) {
        //自定义线程池(工作必备)
        /*
		* 最大线程该如何定义
		* 1.CPU密集型:几核就是几,可以保持CPU的效率最高
		* 2.IO密集型:>判断程序中十分耗IO的线程
		* 程序15大型任务 io十分耗资源
		* 
		*/

        //获取CPU的核数
        System.out.println(Runtime.getRuntime().availableProcessors());

        ExecutorService threadPool=new ThreadPoolExecutor(
            2, 
            Runtime.getRuntime().availableProcessors(),
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardOldestPolicy()
            //队列满了,尝试去和最早的竞争,也不会抛出异常
        );

        try {
            //最大承载=Deque+max=8
            //java.util.concurrent.RejectedExecutionException
            for (int i = 1; i <=9; i++) {
                //使用线程池之后,利用线程池创建线程
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } finally {
            //线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}
复制代码

10、使用ThreadFactoryBuilder类来创建线程工厂

JDK 自带工具类创建的线程池存在的问题

  • 有的线程池可以无限添加任务或线程,容易导致 OOM;比如FixedThreadPool和 CachedThreadPool。
  • 使用 JDK 自带的线程工厂 (ThreadFactory)创建的线程名称都是类似pool-1-thread-1的形式,第一个数字是线程池编号,第二个数字是线程编号,这样很不利于系统异常时排查问题。

使用可自定义线程名称的线程工厂,借助大名鼎鼎的谷歌开源工具库 Guava,引入依赖

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>28.2-jre</version>
</dependency>
复制代码

然后使用其提供的ThreadFactoryBuilder类来创建线程工厂,Demo如下:

public class ThreadPoolDemo {

    // 线程数
    public static final int THREAD_POOL_SIZE = 16;

    public static void main(String[] args) throws InterruptedException {
        // 使用 ThreadFactoryBuilder 创建自定义线程名称的 ThreadFactory
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("hyn-demo-pool-%d").build();
        
        // 创建线程池,其中任务队列需要结合实际情况设置合理的容量
        ThreadPoolExecutor executor = new ThreadPoolExecutor(THREAD_POOL_SIZE,
                THREAD_POOL_SIZE,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());
        
        // 新建 1000 个任务,每个任务是打印当前线程名称
        for (int i = 0; i < 1000; i++) {
            executor.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        // 优雅关闭线程池
        executor.shutdown();
        executor.awaitTermination(1000L, TimeUnit.SECONDS);
        // 任务执行完毕后打印"Done"
        System.out.println("Done");
    }
}

复制代码

输出结果:

.........
hyn-demo-pool-2
hyn-demo-pool-6
hyn-demo-pool-13
hyn-demo-pool-12
hyn-demo-pool-15
Done
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享