线程池基本参数解析

一、线程池构造方法参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
复制代码

corePoolSize: 核心线程池数量

maximumPoolSize:最大线程池数量(包含核心线程池数量)

keepAliveTime: 线程执行完后的存活时间和 TimeUnit 联合使用

TimeUnit:线程执行完后的存活时间和 keepAliveTime 联合使用

BlockingQueue:任务队列,当新的任务到来,核心线程数已满,会加入任务队列

ThreadFactory: 线程工厂,生产线程

RejectedExecutionHandler:新的任务到来,如果已超过最大线程数 且任务队列已满,则会对该任务进行拒绝策略


二、keepAliveTime

作用在非核心线程,如果也需要作用在核心线程上,那需要调用

public void allowCoreThreadTimeOut(boolean value)

三、阻塞队列

1.ArrayBlockingQueue

存储方式:数组 final Object[] items;

构造方法两个参数:int capcity 数组大小,boolean fair是否是公平锁

ReentrantLock:公平锁和非公平锁 主要区别在获取锁的机制上
公平锁:获取时会检查队列中其他任务是否要获取锁,如果其他任务要获取锁,先让其他任务获取
非公平锁:获取时不管队列中是否有任务要获取锁,直接尝试获取
复制代码
2.LinkedBlockingDeque

存储方式:双向链表

构造方法参数:无参默认 int 最大容量,也可以传入容量值

3.PriorityBlockingQueue

存储方式:数组 private transient Object[] queue

构造函数:参数1初始容量 默认11,参数2 :比较器

4.SychronizeQueue

没有存储容量,必须找到执行线程,找不到就执行拒绝策略

5.DelayedWorkQueue

存储方式:数组 ,默认大小 16

private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
复制代码

四、线程工厂

1.DefaultThreadFactory: 生成线程 组,线程编号,线程名字 线程优先级(默认是 5)
2.PrivilegedThreadFactory 继承 DefaultThreadFactory

五、拒绝策略

1.CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
复制代码

直接在调用者线程进行执行,前提是 线程池未关闭

2.AbortPolicy
public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}
复制代码

抛出异常

3.DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
复制代码

直接什么也不做 丢弃任务

4.DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
复制代码

从任务队列中删除最旧的,然后重新执行该任务,这里是个隐式循环,因为excute 可能会重新触发拒绝策略


六、ThreadPoolExecutor

1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
复制代码

核心线程数和最大线程数相等机只有核心线程;任务队列大小无限制;DefaultThreadFactory(也可以传入定制);拒绝策略是AbortPolicy

2.CacheThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
复制代码

核心线程数是0 ,最大线程数是 MAX_VALUE,任务队列无容量,每来一个任务都会新开线程执行任务,执行完后存活一分钟 即可释放线程;DefaultThreadFactory(也可以传入定制);拒绝策略是AbortPolicy

3.SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
复制代码

核心线程和最大线程数量都是1,任务队列大小无限制,DefaultThreadFactory(也可以传入定制);拒绝策略是AbortPolicy

4.WorkStealingPool(java1.8)

Java8新增的创建线程池的方法,如果不主动设置它的并发数,那么这个方法就会以当前机器的CPU处理器个数为线程个数,这个线程池会并行处理任务,不能够保证任务执行的顺序

七、ThreadScheduledExecutor

1.SingleThreadScheduledExecutor

核心线程数是1,最大线程数无限制,非核心线程最大存活时间 是0 秒,执行完立即结束

2.ScheduledThreadPoolExecutor

核心线程数可传入,最大线程数无限制,非核心线程最大存活时间 是0 秒,执行完立即结束

八、基本执行与选择

cpu密集型任务,设置为CPU核心数+1; IO密集型任务,设置为CPU核心数*2;

CPU密集型任务指的是需要cpu进行大量计算的任务,提高CPU的利用率。核心线程数不宜设置过大,太多的线程会互相抢占cpu资源导致不断切换线程,反而浪费了cpu。最理想的情况是每个CPU都在进行计算,没有浪费,但很有可能其中的一个线程会突然挂起等待IO,此时额外的一个等待线程就可以马上进行工作,而不必等待挂起结束。

IO密集型任务指的是任务需要频繁进行IO操作,这些操作会导致线程长时间处于挂起状态,那么需要更多的线程来进行工作,不会让cpu都处于挂起状态,浪费资源。一般设置为cpu核心数的两倍即可

1.在线程数没有达到核心线程数时,每个新任务都会创建一个新的线程来执行任务。
2.当线程数达到核心线程数时,每个新任务会被放入到等待队列中等待被执行。
3.当等待队列已经满了之后,如果线程数没有到达总的线程数上限,那么会创建一个非核心线程来执行任务。
4.当线程数已经到达总的线程数限制时,新的任务会被拒绝策略者处理

九、三方使用的选择

1.Okhttp

核心线程数是 0 ,最大线程数是 Integer.MAX_VALUE,线程执行完后允许存活最大时间 60S,队列采用的是 SynchronousQueue,及无容量的队列,这里采用无容量的队列是因为 Dispatcher 自己有实现队列

//Dispatcher.java
  //最大同时异步请求个数
  private int maxRequests = 64;
  //单个host的同时最大请求数
  private int maxRequestsPerHost = 5;

  //准备执行的异步队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //正在执行的异步队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  //正在执行的同步队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
 //线程池
public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

//加入准备队列 并判断能不能执行
void enqueue(AsyncCall call) {
    synchronized (this) {
      //加入准备队列
      readyAsyncCalls.add(call);
    }
    //执行
    promoteAndExecute();
  }
 
  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));
   
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      //将可以执行的异步请求集合筛选出来 如果已经超过同时最大请求个数则直接跳出循环,否则如果超过最大host同时请求个数 继续下次循环
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
        //如果正在之子那个的
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
    //执行筛选出来的可执行任务
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

复制代码
2.EventBus

核心线程数是0 ,最大线程数是 MAX_VALUE,任务队列无容量,每来一个任务都会新开线程执行任务,执行完后存活一分钟 即可释放线程;拒绝策略是AbortPolicy

public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享