工具类主要是实现线程池的动态扩容
线程池工作流程图
以下直接上三个类的代码
指定线程名前缀的ThreadFactory
在创建线程池时有个参数要指定ThreadFactory
,指定的目的是我们可以指定线程名字前缀,便于以后调试时区分线程
从JDK8的java.util.concurrent.Executors.DefaultThreadFactory
复制而来,修改namePrefix
即可
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
NamedThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = name +
poolNumber.getAndIncrement() +
"-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
复制代码
可变长度的队列
复制JDK8java.util.concurrent.LinkedBlockingQueue
,命名为ResizeableLinkedBlockingQueue
,删除capacity
前的final
修饰符,添加volatile
private volatile int capacity;
public void setCapacity(int capacity) {
this.capacity = capacity;
}
复制代码
工具类
可对照代码中的注释
public class DynamicThreadPool {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = buildThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, 10, false, "test-");
print(executor);
// 向线程池提交15个任务
submit(executor, 15);
print(executor);
// 再次提交会抛异常,因为没有配置队列满了之后的拒绝策略
// submit(executor, 10);
// 修改线程池大小
setCorePoolSize(executor, 10, 10);
// 修改队列大小
setQueueSize(executor, 20);
// 再向线程池提交10个任务
submit(executor, 10);
while (true) {
// 隔一秒打印一次线程池的状态
TimeUnit.SECONDS.sleep(1);
print(executor);
}
// 阻塞当前线程等待
// Thread.currentThread().join();
}
private static void submit(ThreadPoolExecutor executor, int num) {
for (int i = 0; i < num; i++) {
executor.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 开始执行任务");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 结束任务");
});
}
}
/**
* 构建一个线程池
*
* @param corePoolSize 核心线程数,如果allowCoreThreadTimeOut为true时实际运行的线程数会小于此值,为false时则实际运行的线程数一定为此值
* @param maxPoolSize 最大线程数,当corePoolSize不够时会增加线程数到此值
* @param keepAliveTime 线程空闲此值时间后会关闭直到corePoolSize,如果allowCoreThreadTimeOut为true时核心线程数也会关闭
* @param unit keepAliveTime的单位
* @param queueSize 等待队列的长度,此队列满时线程池会采用拒绝策略
* @param allowCoreThreadTimeOut 是否允许核心线程空闲后关闭
* @param threadName 线程名前缀
* @return
*/
public static ThreadPoolExecutor buildThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, int queueSize, boolean allowCoreThreadTimeOut, String threadName) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveTime,
unit,
new ResizeableLinkedBlockingQueue<>(queueSize),
new NamedThreadFactory(threadName));
executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return executor;
}
/**
* 设置核心线程数和最大线程数,需要同时设置并且两个值一样才能使corePoolSize生效
*
* @param executor
* @param corePoolSize
* @param maxPoolSize
*/
public static void setCorePoolSize(ThreadPoolExecutor executor, int corePoolSize, int maxPoolSize) {
executor.setCorePoolSize(corePoolSize);
executor.setMaximumPoolSize(maxPoolSize);
}
public static void setMaxPoolSize(ThreadPoolExecutor executor, int maxPoolSize) {
executor.setMaximumPoolSize(maxPoolSize);
}
public static void setKeepAliveTime(ThreadPoolExecutor executor, long keepAliveTime, TimeUnit unit) {
executor.setKeepAliveTime(keepAliveTime, unit);
}
public static void setQueueSize(ThreadPoolExecutor executor, int queueSize) {
ResizeableLinkedBlockingQueue queue = (ResizeableLinkedBlockingQueue) executor.getQueue();
queue.setCapacity(queueSize);
}
/**
* 预先启动所有核心线程
*
* @param executor
*/
public static void prestartAllCoreThreads(ThreadPoolExecutor executor) {
executor.prestartAllCoreThreads();
}
/**
* 预先启动一个核心线程等待task
*
* @param executor
* @return
*/
public static boolean prestartCoreThread(ThreadPoolExecutor executor) {
return executor.prestartCoreThread();
}
public static void print(ThreadPoolExecutor executor) {
ResizeableLinkedBlockingQueue queue = (ResizeableLinkedBlockingQueue) executor.getQueue();
System.out.println(Thread.currentThread().getName()
+ " 核心线程数:" + executor.getCorePoolSize()
+ " 活动线程数:" + executor.getActiveCount()
+ " 最大线程数:" + executor.getMaximumPoolSize()
+ " 线程池活跃度:" + div(executor.getActiveCount(), executor.getMaximumPoolSize(), 2)
+ " 任务完成数:" + executor.getCompletedTaskCount()
+ " 队列大小:" + (queue.size() + queue.remainingCapacity())
+ " 排队数:" + queue.size()
+ " 队列剩余大小:" + queue.remainingCapacity()
+ " 队列使用度:" + div(queue.size(), queue.size() + queue.remainingCapacity(), 2));
}
private static double div(int num1, int num2, int scale) {
if (num2 == 0) {
return 0.0;
}
return BigDecimal.valueOf(num1).divide(BigDecimal.valueOf(num2), scale, BigDecimal.ROUND_HALF_UP).doubleValue();
}
}
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END