Java 并发编程——线程池开篇

这是我参与更文挑战的第 9 天,活动详情查看: 更文挑战

前面我们针对 Lock 锁、编发编程中的工具类进行了学习。通过这些知识可以完成基本的并发编程程序设计。后面就开始学习 JUC 中的 Executor 框架。

由于线程的生命周期中包括创建、就绪、运行、阻塞、销毁阶段,当我们待处理的任务数目较小时,我们可以自己创建几个线程来处理相应的任务,但当有大量的任务时,由于创建、销毁线程需要很大的开销,运用线程池这些问题就大大的缓解了。

1. Executor 概述

1.1 类图结构

Executor类图框架.png

1.2 Executor 简介

Executor 接口作为 Executor 框架的核心,只定义了一个 execute(Runnable) 方法。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <tt>Executor</tt> implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution.
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
复制代码

execute 方法用于执行给定的 Runnable,具体的执行策略依赖于具体的 Executor 实例。

Executor 类设计的思想是解耦 task 的提交和执行逻辑。

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
复制代码

注意 Executor 中执行 task 并不一定是在异步线程中执行。比如下面的例子就是在提交任务的线程中执行:

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}
复制代码

通常情况下,在 execute 方法中执行任务的线程会跟提交任务的线程区分开,放在异步中进行执行。比如下面的小例子:

class NewThreadTaskExecutor implements Executor{

	@Override
	public void execute(Runnable command) {
		new Thread(command).start();
	}
}
复制代码

一般在实际使用中,Executor 的实现会添加很多设计方案来控制 task 的执行逻辑。比如可以将 task 通过 QueueStack 进行管理来控制。

class SerialTaskExecutor implements Executor{
	private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingDeque<>();
	private final Executor executor = null;
	private Runnable active;
	
	public SerialTaskExecutor(Runnable active) {
		super();
		this.active = active;
	}

	@Override
	public void execute(Runnable command) {
		if(command == null){
			throw new NullPointerException();
		}
		/**将任务添加到队列中*/
		taskQueue.offer(command);
		scheduleNext();
	}

	private void scheduleNext() {
		while((active = taskQueue.poll()) != null){
			executor.execute(active);
		}
	}
}
复制代码

Executor 的实现通过实现 execute() 方法达到针对 task 执行逻辑的定制。

2. ExecutorService

ExecutorService 是一个继承于 Executor 的接口,它丰富了 Executor 提供的接口,提供了关闭的 shutdown() 方法以及创建返回 Futuresubmit() 方法。

2.1 shutdown 关闭

如果 ExecutorService 已关闭,提交任务会抛出异常,ExecutorService 提供了两个关闭方法:

  • shutdown():关闭 ExecutorService,在关闭前会允许已提交的任务继续执行完成,完成后进行关闭。
  • shutdownNow():立即关闭 ExecutorService,并尝试终止正在执行的 task 任务。

在使用 ExecutorService 的时候,如果 ExecutorService 已不在使用则必须进行关闭释放所占用的资源。

2.2 创造构建返回 Feature

/**
 * Submits a Runnable task for execution and returns a Future
 * representing that task. The Future's <tt>get</tt> method will
 * return <tt>null</tt> upon <em>successful</em> completion.
 *
 * @param task the task to submit
 * @return a Future representing pending completion of the task
 * @throws RejectedExecutionException if the task cannot be
 *         scheduled for execution
 * @throws NullPointerException if the task is null
 */
Future<?> submit(Runnable task);
复制代码

3. AbstractExecutorService

AbstractExecutorServiceExecutorService 的具体实现类,实现了:

  • submit 方法
  • invokeAny 方法
  • invokeAll 方法

AbstractExecutorService 中通过 newTaskFor() 方法构建一个 RunnableFuture 对象,然后传递给 execute 方法进行执行。

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
复制代码

4. ThreadPoolExecutor 和 ScheduledThreadPoolExecutor

线程池类,线程池类主要是为了解决两类问题:

  • 针对大量的异步请求,通过重用线程池中的线程,来减少每个线程创建和销毁的性能开销。
  • 对线程进行一些维护和管理,比如定时开始,周期执行,并发数控制。

5. Executors

Executors 工具类给我们提供了方便的方法直接创建线程池。

  • ExecutorService newFixedThreadPool(int nThreads):创建一个指定大小的线程池
  • ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory):创建一个指定大小的线程池
  • ExecutorService newSingleThreadExecutor():创建一个单线程的线程池
  • ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
  • ExecutorService newCachedThreadPool():创建一个单线程的线程池
  • ExecutorService newCachedThreadPool(ThreadFactory threadFactory):根据用户的任务数创建相应的线程来处理,该线程池不会对线程数目加以限制,完全依赖于JVM能创建线程的数量,可能引起内存不足。
  • ScheduledExecutorService newSingleThreadScheduledExecutor()
  • ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

6. ThreadFactory

ThreadFactory 用于创建线程的工具类,主要是减少 new Thread() 创建线程的这种动作。

class SimpleThreadFactory implements ThreadFactory{

	@Override
	public Thread newThread(Runnable r) {
		return new Thread(r);
	}
}
复制代码

7. Future

Java 并发编程中,通过 Future 来达标异步执行的结果返回,同时可以来检测异步的执行结果是否结束。执行结果只有在异步线程执行完毕后通过 get 方法进行获取,如果异步没有执行完毕,则会处于阻塞等待的状态。同时提供 cancel 方法进行任务的取消。

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Future<String> future = executorService.submit(new Callable<String>() {

        @Override
        public String call() throws Exception {
            return "异步线程执行结果";
        }
    });
    System.out.print(Thread.currentThread().getName() + ":" + future.get().toString());
}
复制代码

以上就是线程池 Executor 框架中涉及的关键接口和类,总体功能介绍就这么多。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享