可动态调整的线程池

工具类主要是实现线程池的动态扩容

线程池工作流程图

以下直接上三个类的代码

指定线程名前缀的ThreadFactory

在创建线程池时有个参数要指定ThreadFactory,指定的目的是我们可以指定线程名字前缀,便于以后调试时区分线程

从JDK8的java.util.concurrent.Executors.DefaultThreadFactory复制而来,修改namePrefix即可

public class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    NamedThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = name +
                poolNumber.getAndIncrement() +
                "-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
复制代码

可变长度的队列

复制JDK8java.util.concurrent.LinkedBlockingQueue,命名为ResizeableLinkedBlockingQueue,删除capacity前的final修饰符,添加volatile

private volatile int capacity;

public void setCapacity(int capacity) {
    this.capacity = capacity;
}
复制代码

工具类

可对照代码中的注释

public class DynamicThreadPool {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = buildThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, 10, false, "test-");
        print(executor);
        // 向线程池提交15个任务
        submit(executor, 15);
        print(executor);
        // 再次提交会抛异常,因为没有配置队列满了之后的拒绝策略
        // submit(executor, 10);

        // 修改线程池大小
        setCorePoolSize(executor, 10, 10);
        // 修改队列大小
        setQueueSize(executor, 20);
        // 再向线程池提交10个任务
        submit(executor, 10);

        while (true) {
            // 隔一秒打印一次线程池的状态
            TimeUnit.SECONDS.sleep(1);
            print(executor);
        }

        // 阻塞当前线程等待
        // Thread.currentThread().join();
    }

    private static void submit(ThreadPoolExecutor executor, int num) {
        for (int i = 0; i < num; i++) {
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " 开始执行任务");
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 结束任务");
            });
        }
    }

    /**
     * 构建一个线程池
     *
     * @param corePoolSize           核心线程数,如果allowCoreThreadTimeOut为true时实际运行的线程数会小于此值,为false时则实际运行的线程数一定为此值
     * @param maxPoolSize            最大线程数,当corePoolSize不够时会增加线程数到此值
     * @param keepAliveTime          线程空闲此值时间后会关闭直到corePoolSize,如果allowCoreThreadTimeOut为true时核心线程数也会关闭
     * @param unit                   keepAliveTime的单位
     * @param queueSize              等待队列的长度,此队列满时线程池会采用拒绝策略
     * @param allowCoreThreadTimeOut 是否允许核心线程空闲后关闭
     * @param threadName             线程名前缀
     * @return
     */
    public static ThreadPoolExecutor buildThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, int queueSize, boolean allowCoreThreadTimeOut, String threadName) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
                maxPoolSize,
                keepAliveTime,
                unit,
                new ResizeableLinkedBlockingQueue<>(queueSize),
                new NamedThreadFactory(threadName));
        executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        return executor;
    }

    /**
     * 设置核心线程数和最大线程数,需要同时设置并且两个值一样才能使corePoolSize生效
     *
     * @param executor
     * @param corePoolSize
     * @param maxPoolSize
     */
    public static void setCorePoolSize(ThreadPoolExecutor executor, int corePoolSize, int maxPoolSize) {
        executor.setCorePoolSize(corePoolSize);
        executor.setMaximumPoolSize(maxPoolSize);
    }

    public static void setMaxPoolSize(ThreadPoolExecutor executor, int maxPoolSize) {
        executor.setMaximumPoolSize(maxPoolSize);
    }

    public static void setKeepAliveTime(ThreadPoolExecutor executor, long keepAliveTime, TimeUnit unit) {
        executor.setKeepAliveTime(keepAliveTime, unit);
    }

    public static void setQueueSize(ThreadPoolExecutor executor, int queueSize) {
        ResizeableLinkedBlockingQueue queue = (ResizeableLinkedBlockingQueue) executor.getQueue();
        queue.setCapacity(queueSize);
    }

    /**
     * 预先启动所有核心线程
     *
     * @param executor
     */
    public static void prestartAllCoreThreads(ThreadPoolExecutor executor) {
        executor.prestartAllCoreThreads();
    }

    /**
     * 预先启动一个核心线程等待task
     *
     * @param executor
     * @return
     */
    public static boolean prestartCoreThread(ThreadPoolExecutor executor) {
        return executor.prestartCoreThread();
    }

    public static void print(ThreadPoolExecutor executor) {
        ResizeableLinkedBlockingQueue queue = (ResizeableLinkedBlockingQueue) executor.getQueue();
        System.out.println(Thread.currentThread().getName()
                + " 核心线程数:" + executor.getCorePoolSize()
                + " 活动线程数:" + executor.getActiveCount()
                + " 最大线程数:" + executor.getMaximumPoolSize()
                + " 线程池活跃度:" + div(executor.getActiveCount(), executor.getMaximumPoolSize(), 2)
                + " 任务完成数:" + executor.getCompletedTaskCount()
                + " 队列大小:" + (queue.size() + queue.remainingCapacity())
                + " 排队数:" + queue.size()
                + " 队列剩余大小:" + queue.remainingCapacity()
                + " 队列使用度:" + div(queue.size(), queue.size() + queue.remainingCapacity(), 2));
    }

    private static double div(int num1, int num2, int scale) {
        if (num2 == 0) {
            return 0.0;
        }
        return BigDecimal.valueOf(num1).divide(BigDecimal.valueOf(num2), scale, BigDecimal.ROUND_HALF_UP).doubleValue();
    }
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享