【摘要】 java并发编程
为了充分利用计算机的资源,吧计算机的性能发挥到最大.
什么是高并发
并发和并行的区别
并发 concurrency 单核 线程”同时“操作(交替)
并行:多核同时执行。
高并发是指我们设计的程序,可以支持海量任务的执行在时间段上重叠的情况。
高并发的标准:
QPS:每秒响应的 HTTP 请求数量,QPS 不是并发数。吞吐量:单位时间内处…
java并发编程
为了充分利用计算机的资源,吧计算机的性能发挥到最大.
什么是高并发
并发和并行的区别
并发 concurrency 单核 线程”同时“操作(交替)
并行:多核同时执行。
高并发是指我们设计的程序,可以支持海量任务的执行在时间段上重叠的情况。
高并发的标准:
- QPS:每秒响应的 HTTP 请求数量,QPS 不是并发数。
- 吞吐量:单位时间内处理的请求数,由 QPS 和并发数来决定。
- 平均响应时间:系统对一个请求作出响应的评价时间。
QPS = 并发数 / 平均响应时间
- 并发用户数:同时承载正常使用系统的用户人数
互联网分布式架构设计,提高系统并发能力的方式:
- 垂直扩展
- 水平扩展
垂直扩展
提升单机处理能力
1、提升单机的硬件设备,增加 CPU 核数,升级网卡,硬盘扩容,升级内存。
2、提升单机的架构性能,使用 Cache 提高效率,使用异步请求来增加单服务吞吐量,NoSQL 提升数据库访问能力。
水平扩展
集群:一个厨师搞不定,多雇几个厨师一起炒菜,多个人干同一件事情。
分布式:给厨师雇两个助手,一个负责洗菜,一个负责切菜,厨师只负责炒菜,一件事情拆分成多个步骤,由不同的人去完成。
站点层扩展:Nginx 反向代理,一个 Tomcat 跑不动,那就 10 个 Tomcat 去跑。
服务层扩展:RPC 框架实现远程调用,Spring Boot/Spring Cloud,Dubbo,分布式架构,将业务逻辑拆分到不同的 RPC Client,各自完成对应的业务,如果某项业务并发量很大,增加新的 RPC Client,就能扩展服务层的性能,做到理论上的无限高并发。
数据层扩展:在数据量很大的情况下,将原来的一台数据库服务器,拆分成多台,以达到扩充系统性能的目的,主从复制,读写分离,分表分库。
进程和线程
sleep 是Thread 给的方法(线程对象) wait 是Object给的方法(对象资源)
sleep 让当前线程暂停,而wait 是让访问当前线程的对象暂停
sleep 不释放锁 ,wait 释放锁。
synchronized 修饰非静态方法 锁定是 方法的调用者。
syncchronize 修饰静态方法,锁定的是类。
静态方法和类同时存在的时候也不会同步,一个是类一个是对象。
默认两个线程数;
- main
- GC垃圾回收机制
java 本书无法开启线程,java无法操作硬件,只能通过调用本地方法,c++编写的动态函数库。
Thread
Runnable
Callable(有返回值)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YMUvm04O-1620915972900)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210506142445397.png)]
实例化FutureTask 传入 实例化Callable 对象再传入进Runnable方法里面。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Test { public static void main(String[] args) { MyCallable myCallable = new MyCallable(); FutureTask<String> futureTask = new FutureTask(myCallable); Thread thread = new Thread(futureTask); thread.start(); try { String value = futureTask.get(); System.out.println(value); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
}
class MyCallable implements Callable<String>{ @Override public String call() throws Exception { System.out.println("callable"); return "hello"; }
}
synchronized 是通过 JVM 实现锁机制,ReentrantLock 是通过 JDK 实现锁机制。
synchronized 是一个关键字,ReentrantLock 是一个类。
重入锁:可以给同一个资源添加多把锁。
synchronized 是线程执行完毕之后自动释放锁,ReentrantLock 需要手动解锁
lock可以手动中断锁
synchronized 和 lock 的区别
1、synchronized 自动上锁,自动释放锁,Lock 手动上锁,手动释放锁。
2、synchronized 无法判断是否获取到了锁,Lock 可以判断是否拿到了锁。
3、synchronized 拿不到锁就会一直等待,Lock 不一定会一直等待。
4、synchronized 是 Java 关键字,Lock 是接口。
5、synchronized 是非公平锁,Lock 可以设置是否为公平锁。
公平锁:很公平,排队,当锁没有被占用时,当前线程需要判断队列中是否有其他等待线程。
非公平锁:不公平,插队,当锁没有被占用时,当前线程可以直接占用,而不需要判断当前队列中是否有等待线程。
实际开发中推荐使用 Lock 的方式。
ReentrantLock 具备限时性的特点,可以判断某个线程在一定的时间段内能否获取到锁,使用 tryLock 方法,返回值是 boolean 类型,true 表示可以获取到锁,false 表示无法获取到锁。
package 综合练习;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Test9 { public static void main(String[] args) { TimeLock timeLock = new TimeLock(); new Thread(() -> { timeLock.getLock();}, "A").start(); new Thread(() -> { timeLock.getLock(); }, "B").start(); }
}
class TimeLock { private ReentrantLock lock = new ReentrantLock(); public void getLock() { try { if (lock.tryLock(3, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + "拿到了锁"); Thread.sleep(5000); } else { System.out.println(Thread.currentThread().getName() + "拿不到锁"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if(lock.isHeldByCurrentThread())//谁上的谁关闭 lock.unlock(); } }
}
生产者与消费者
package 综合练习.Test1;
public class Test { private int num; public static void main(String[] args) { Test test = new Test(); new Thread(() -> { for (int i = 0; i < 30; i++) { test.in();
} }, "A").start(); new Thread(() -> { for (int i = 0; i < 30; i++) { test.out(); } }, "B").start();
}
public synchronized void in() { while (num!=0){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } num++; System.out.println("生产了" + num + "汉堡"); this.notify();
}
public synchronized void out() { while (num==0){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } num--; System.out.println("卖出了" + num + "汉堡"); this.notify();
}
}
必须使用while判断,不能用if,因为if会存在虚假唤醒,虚假唤醒就是一些wait方法会在除了notify的其他情况被唤醒,不是真正的唤醒。
private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void in() { lock.lock(); while (num != 0) { try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } num++; condition.signal(); System.out.println("生产了" + num + "汉堡"); lock.unlock(); }
使用Lock 来加锁就得用Condition 中提供的condition.await() 和condition.signal()来暂停和和唤醒线程
ConcurrentModificationException
并发访问异常
CopyonWrite 操作修改到复制了原数据的数据中,结束后,将其引用指向复制的数据
List<String> list = new ArrayList<>();
Vector<String> vector = new Vector<>();
List<String> list1 = Collections.synchronizedList(new ArrayList<>());
List<String> list2 = new CopyOnWriteArrayList<>();
Set<String> set=new ConcurrentSkipListSet<>();
JUC工具类
CountDownLatch 减法计数器
当两个线程一起开始时,可以说使用减法计时器,如果想要一个线程先开始,等到计时器清零则另一个进程开始。
coutDown():计数器减一
await():计数器停止,唤醒其他线程
new CountDownLatch(100)、coutDown()、await() 必须配合起来使用,创建对象的时候赋的值是多少,coutDown() 就必须执行多少次,否则计数器是没有清零的,计数器就不会停止,其他线程也无法唤醒,所以必须保证计数器清零,coutDown() 的调用次数必须大于构造函数的参数值。
CyclicBarrier 加法计数器 唤醒计数器线程执行任务,并且计数器可以重复使用
以上计数器在执行时不满足条件程序都会卡住
Semaphore:计数信号量
现在可以访问某些资源的线程数量。
Semaphore 只有 3 个操作:
- 初始化
- 获取许可
- 释放
获取信号量再执行,执行完毕后,释放给其他的。
读写锁
接口ReadWriteLock,可以多线程同时读,
目的可以同时读,不能同时写、
写入锁也叫独占锁,读锁也叫共享锁。
package 综合练习.JUC工具类;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class 读写锁 { public static void main(String[] args) { cache cache1 = new cache(); for (int i = 0; i < 10; i++) { final int t = i; new Thread(() -> { cache1.write(t, String.valueOf(t)); }).start(); } for (int i = 0; i < 10; i++) { final int t = i; new Thread(() -> { cache1.read(t); }).start(); } }
}
class cache { private Map<Integer, String> map = new HashMap<>(); private ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); public void write(Integer key, String value) { readWriteLock.writeLock().lock(); System.out.println(key + "开始写入"); map.put(key, value); System.out.println(key + "写入完毕"); readWriteLock.writeLock().unlock(); } public void read(Integer key) { readWriteLock.readLock().lock(); System.out.println(key + "开始读取"); map.get(key); System.out.println(key + "读取完毕"); readWriteLock.readLock().unlock(); }
}
线程池
预先创建好一定数量的对象,存入有到线程池中,需要用到时候直接取出来,用完之后不要销毁,返回到缓冲池中。
原理
线程池初始化会创建一定数量的线程对象
任务进行排队获取,如果线程数量不够,则进行排队队列,此时可以进行申请开辟新的线程对象直到最大值,如果此时依然不能满足需求,则队列外再来调用的对象只能进行拒绝。
优势
- 提高资源利用率
- 提高任务响应速度
- 便于统一管理线程对象
- 可以控制最大并发数
线程池具体设计思想
- 核心池大小
- 线程池的最大容量
- 等待队列
线程池 workQueue
- ArrayBlockQueue 基于数组的先进先出队列,创建时必须指定大小
- LinkedBlockingQueue: 先进先出 队列,创建时不指定大小,默认Integer.MAX_VALUE。
- Synchronized 不会提交任务,新建一个线程来执行新来的任务
- PriorityBlockingQueue:具有优先级的阻塞队列。
- 拒绝策略
线程池启动会按照核心池大小启动相应数量的线程对象
,如果线程对象都被占用,则后来的任务就进人等待队列中,当线程对象中的任务执行完毕后,线程对象恢复到空闲状态,等待队列中的任务就可以获取空闲状态中的线程资源执行任务
如果等待队列和线程对象都满了,则申请开启新的线程对象,但是 线程池有最大上限,不能超过它,
如果线程池已经达到最大上限,就是线程池达到最大容量并且等待队列已满,则只能相应的策略拒绝后来的任务!
工具类线程池三种模式
public class Test { public static void main(String[] args) { //单例模式
// ExecutorService executorService= Executors.newSingleThreadExecutor(); //指定线程数量
// ExecutorService executorService=Executors.newFixedThreadPool(3); //缓存线程池(由电脑配置决定)
// ExecutorService executorService=Executors.newCachedThreadPool(); for (int i = 0; i <10000 ; i++) { final int temp=i; executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"-------"+temp); }); } }
}
建议直接实例化ThreadPoolExecutor 实现定制化线程池
corePoolSize(核心池大小)
maximumPolSize(线程池最大容量)
keepAliveTime(空任务时线程存活时间)在它大于corePoolSize 才生效
TimeUnit(存活时间单位)
BlockingQueue 等待队列。存储等待执行的任务
ThreadFactory 线程工厂,创建线程对象
RejectedExecutionHandler 拒绝策略
- AbortPolicy 抛出异常
- DiscardPolicy 放弃任务,不抛出异常
- DiscardOldestPolicy 尝试与等待队列中最前面的任务去争夺,不抛出异常
- CallerRunn 谁调用谁处理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MzD6ZrEJ-1620915972901)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210507125546283.png)]
package 综合练习.线程池;
import java.util.concurrent.*;
public class 手动使用 { public static void main(String[] args) { //ExecutorService表示线程池
// ExecutorService executorService=Executors.newSingleThreadExecutor();
// executorService.shutdown();//关闭线程池 ExecutorService executorService = null; try { executorService = new ThreadPoolExecutor(2, 3, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), //内部类 new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i <6; i++) { executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"办理业务"); }); } } catch (Exception e) { e.printStackTrace(); } finally { executorService.shutdown(); } }
}
1、Executors 工具类的 3 种实现
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorService = Executors.newCachedThreadPool();
2、7 个参数
corePoolSize:核心池的大小
maximumPoolSize:线程池的最大容量
keepAliveTime:线程存活时间(在没有任务可执行的情况下),必须是线程池中的数量大于 corePoolSize,才会生效
TimeUnit:存活时间单位
BlockingQueue:等待队列,存储等待执行的任务
ThreadFactory:线程工厂,用来创建线程对象
RejectedExecutionHandler:拒绝策略
3、4 种拒绝策略
1、AbortPolicy:直接抛出异常
2、DiscardPolicy:放弃任务,不抛出异常
3、DiscardOldestPolicy:尝试与等待队列中最前面的任务去争夺,不抛出异常
4、CallerRunsPolicy:谁调用谁处理
ForJoin框架
用几个类完成操作 高并发
对象池的补充
将大任务拆分成小任务给各线程并发操作,最后合并成一个结果。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EW9S0kKm-1620915972902)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210507140605915.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-G6Oj72Uj-1620915972904)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210507145500326.png)]
Volatile 是 JVM 提供的轻量级同步机制,可见性,主内存对象线程可见。
线程存活时间(在没有任务可执行的情况下),必须是线程池中的数量大于 corePoolSize,才会生效
TimeUnit:存活时间单位
BlockingQueue:等待队列,存储等待执行的任务
ThreadFactory:线程工厂,用来创建线程对象
RejectedExecutionHandler:拒绝策略
3、4 种拒绝策略
1、AbortPolicy:直接抛出异常
2、DiscardPolicy:放弃任务,不抛出异常
3、DiscardOldestPolicy:尝试与等待队列中最前面的任务去争夺,不抛出异常
4、CallerRunsPolicy:谁调用谁处理
**ForJoin框架**
用几个类完成操作 高并发
对象池的补充
将大任务拆分成小任务给各线程并发操作,最后合并成一个结果。
[外链图片转存中...(img-EW9S0kKm-1620915972902)] [外链图片转存中...(img-G6Oj72Uj-1620915972904)]
Volatile 是 JVM 提供的轻量级同步机制,**可见性**,主内存对象线程可见。
一个线程的任务执行完后就会将更新回去到主内存中的值。空的就不会。
文章来源: blog.csdn.net,作者:qq_18125111,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_18125111/article/details/116769201