线程池自定义异常处理

一个栗子

模拟线程池中任务在执行过程中发生异常。

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            int i = 1 / 0;
        });
复制代码

这段代码在执行中,肯定会报一个除0异常,但是我们不会收到任何错误的信息。原因是线程池会将执行过程中发生的异常信息存储起来,然后通过调用get方法时,如果有异常,会抛出一个被ExecutionException包裹的异常;具体原理可查看 juejin.cn/post/696172… ,所以我们需要判断任务执行时,是否抛出了异常,可以通过try-catch代码块,捕获处理异常。

    @Test
    public void test() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try{
            executorService.submit(() -> {
                int i = 1 / 0;
            }).get();
        }catch (ExecutionException e){
            e.printStackTrace();
        }
    }
复制代码

这种方式虽然可以处理异常,但是如果调用了get方法,会阻塞当前线程,直到run方法执行完毕,这样一来就失去了异步的意义,显然是不行的。


针对这样的情况,我们可以将我们自己执行的任务进行包装,然后再提交给线程池处理,这样一来,任务执行报错时,可以直接走我们的提前准备的异常处理逻辑,这样,即可实现程序既是异步执行,任务执行出错,线程池也不会吃掉我们的异常。

大致意思如下:

    @Test
    public void test() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        // 需要提交给线程池的任务
        Runnable task = () -> {
            int i = 1 / 0;
        };

        // 包装任务,做异常处理
        Runnable taskWrapper = () -> {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
                // todo 异常处理
            }
        };

        executorService.submit(taskWrapper);
    }
复制代码

控制台输出:

java.lang.ArithmeticException: / by zero
	at org.ywb.practise.difficulty.ExecutorMain.lambda$test$0(ExecutorMain.java:20)
	at org.ywb.practise.difficulty.ExecutorMain.lambda$test$1(ExecutorMain.java:26)
复制代码

核心的处理逻辑就是这样,但是如果这样写代码,估计会被主管打死 :-(,为了保证程序的复用性,可以稍作改装~

  1. runnable异常处理
@FunctionalInterface
public interface RunnableErrorHandler {
    /**
     * runnable 异常处理
     *
     * @param throwable 异常
     */
    void errorHandler(Throwable throwable);
}
复制代码
  1. 一个包装线程池的类
public class ExecutorServiceWrapper {

    private final ExecutorService threadPoolExecutor;

    private RunnableErrorHandler defaultRunnableErrHandler;

    public ExecutorServiceWrapper(ExecutorService threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    public ExecutorServiceWrapper(ExecutorService threadPoolExecutor, RunnableErrorHandler defaultRunnableErrHandler) {
        this.threadPoolExecutor = threadPoolExecutor;
        this.defaultRunnableErrHandler = defaultRunnableErrHandler;
    }

    /**
     * 不传入异常处理机制,程序使用默认异常处理机制
     *
     * @param task 执行的任务
     * @return future<Void>
     */
    public Future<?> submit(Runnable task) {
        return threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                if (defaultRunnableErrHandler != null) {
                    defaultRunnableErrHandler.errorHandler(e);
                }
            }
        });
    }

    /**
     * 自定义异常处理机制
     *
     * @param task         执行的任务
     * @param errorHandler 异常处理
     * @return future<Void>
     */
    public Future<?> submit(Runnable task, RunnableErrorHandler errorHandler) {
        return threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                errorHandler.errorHandler(e);
            }
        });
    }
    
}
复制代码

这里只提供了关于Runnable的封装,可以自行脑补Callable的方法。

  1. 演示
    1. 这里在构造线程池wrapper时,传入默认的异常处理机制,打印异常堆栈
    2. 第一个任务使用默认异常处理机制
    3. 第二个任务使用自定义异常处理机制
    @Test
    public void test1() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // 包装原线程池,传入默认异常处理机制
        ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService, Throwable::printStackTrace);
        // 使用通用异常处理机制
        executorServiceWrapper.submit(() -> {
            int i = 1 / 0;
        });

        // 传入自定义异常处理机制
        executorServiceWrapper.submit(() -> {
            int i = 1 / 0;
        }, throwable -> {
            // 打印异常信息
            System.err.println("customer---" + throwable.getMessage());
        });
    }
复制代码

输出:
可以看出,两个任务在发生错误时,分别按照规定走了特定的异常处理机制。

java.lang.ArithmeticException: / by zero
	at org.ywb.practise.difficulty.ExecutorMain.lambda$test1$2(ExecutorMain.java:43)
	at org.ywb.practise.difficulty.ExecutorServiceWrapper.lambda$submit$0(ExecutorServiceWrapper.java:30)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
customer---/ by zero
复制代码

拓展阅读,其他框架对线程池异常处理的支持

如果业务相对简单,我任务使用上面的操作就可以了,但是如果业务要求比较多,希望得到更多的支持,可以使用下面的guava提供的工具。

Guava

  1. 引入依赖
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.0-jre</version>
        </dependency>
复制代码
  1. 使用演示
    @Test
    public void test() {
        // 包装线程池
        ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

        // 包装任务
        ListenableFutureTask<Void> listenableFutureTask = ListenableFutureTask.create(() -> {
            int i = 1 / 0;
        }, null);

        // 给任务添加回调
        Futures.addCallback(listenableFutureTask, new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable Void result) {
                // 成功后的回调
                System.out.println("success");
            }

            @Override
            public void onFailure(Throwable t) {
                // 异常处理
                t.printStackTrace();
            }
        }, guavaExecutor);

        // 提交任务
        guavaExecutor.submit(listenableFutureTask);
    }
复制代码

guava 不仅对失败做了处理,还可以通过OnSuccess方法,对任务执行的结果添加后续操作。

netty

netty的处理方法简直爆赞,但是遗憾的是我们不能直接用,它实现的是一个自己框架使用的基于事件的一个线程池。他返回的Future结果是这样的,大家瞻仰一下,netty真的是一个非常赞的框架,希望大家有机会都学习学习。

public interface Future<V> extends java.util.concurrent.Future<V> {

    /**
     * Returns {@code true} if and only if the I/O operation was completed
     * successfully.
     */
    boolean isSuccess();

    /**
     * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
     */
    boolean isCancellable();

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has
     * failed.
     *
     * @return the cause of the failure.
     *         {@code null} if succeeded or this future is not
     *         completed yet.
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.  The
     * specified listener is notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listener is notified immediately.
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Adds the specified listeners to this future.  The
     * specified listeners are notified when this future is
     * {@linkplain #isDone() done}.  If this future is already
     * completed, the specified listeners are notified immediately.
     */
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listener is not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * Removes the first occurrence for each of the listeners from this future.
     * The specified listeners are no longer notified when this
     * future is {@linkplain #isDone() done}.  If the specified
     * listeners are not associated with this future, this method
     * does nothing and returns silently.
     */
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> sync() throws InterruptedException;

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future
     * failed.
     */
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without
     * interruption.  This method catches an {@link InterruptedException} and
     * discards it silently.
     */
    Future<V> awaitUninterruptibly();

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * Waits for this future to be completed within the
     * specified time limit without interruption.  This method catches an
     * {@link InterruptedException} and discards it silently.
     *
     * @return {@code true} if and only if the future was completed within
     *         the specified time limit
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * Return the result without blocking. If the future is not done yet this will return {@code null}.
     *
     * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
     * if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
     */
    V getNow();

    /**
     * {@inheritDoc}
     *
     * If the cancellation was successful it will fail the future with a {@link CancellationException}.
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享