所谓生产者消费者模式,即N个线程进行生产,同时N个线程进行消费,两种角色通过内存缓冲区进行通信。
生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。
但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:
1.如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
2.如果共享数据区为空的话,阻塞消费者继续消费数据;
弄懂生产者-消费者问题能够加深我们对并发编程的理解,同时这也是面试中常见的面试题
下面就介绍一下生产者消费者模式的四种写法
1.wait/notify
方式实现
2.Lock
方式实现
3.BlockingQueue
方式实现
4.Kotlin Channel
方式实现
1.wait/notify
方式实现
Java 中,可以通过配合调用 Object
对象的 wait()
方法和 notify()
方法或 notifyAll()
方法来实现线程间的通信。
在线程中调用 wait()
方法,将阻塞当前线程,同时释放对象锁,直至等到其他线程调用了 notify()
方法或 notifyAll()
方法进行通知之后,当前线程才能从wait()
方法中返回,继续执行下面的操作
其中要注意两点
1.永远使用while
循环对wait
条件进行判断,而不是if
语句中进行wait
条件的判断
现象
当有多个消费者时,使用if
会抛出异常
原因分析
当使用if
做条件判断时,当线程被唤醒后就继续执行消费者方法,但此时如果元素已经被另一个消费者消费了,那么就会抛出异常
解决方案
永远使用while
循环对wait
条件进行判断,而不是if
语句中进行wait
条件的判断
2.使用NotifyAll
而不是使用notify
现象
如果是多消费者和多生产者情况,如果使用notify
方法可能会出现“假死”的情况,即唤醒的是同类线程。
原因分析
假设当前多个生产者线程会调用wait
方法阻塞等待,当其中的生产者线程获取到对象锁之后使用notify
通知处于WAITTING
状态的线程,如果唤醒的仍然是生产者线程,就会造成所有的消费者线程都处于等待状态。
解决方案
使用NotifyAll
而不是使用notify
代码实现如下:
public class ProductorConsumerDemo1 {
public static void main(String[] args) {
LinkedList linkedList = new LinkedList();
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 5; i++) {
service.submit(new Productor(linkedList, 8));
}
for (int i = 0; i < 10; i++) {
service.submit(new Consumer(linkedList));
}
}
static class Productor implements Runnable {
private List<Integer> list;
private int maxLength;
public Productor(List list, int maxLength) {
this.list = list;
this.maxLength = maxLength;
}
@Override
public void run() {
while (true) {
synchronized (list) {
try {
while (list.size() == maxLength) {
System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait");
list.wait();
System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait");
}
Random random = new Random();
int i = random.nextInt();
System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
list.add(i);
list.notifyAll();
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
static class Consumer implements Runnable {
private List<Integer> list;
public Consumer(List list) {
this.list = list;
}
@Override
public void run() {
while (true) {
synchronized (list) {
try {
while (list.isEmpty()) {
System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait");
list.wait();
System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait");
}
Integer element = list.remove(0);
System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element);
list.notifyAll();
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
复制代码
2.Lock
方式实现
Lock
中的Condition
可以实现上面Object
的wait
,notify
一样的效果
await
对应wait
,signal
对应notify
,signalAll
对应notifyAll
下面直接来看看实现,代码与使用wait
,notify
基本上是一样的,只是同步方式不同
public class ProductorConsumerDemo2 {
private static ReentrantLock lock = new ReentrantLock();
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();
public static void main(String[] args) {
LinkedList linkedList = new LinkedList();
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 5; i++) {
service.submit(new Productor(linkedList, 8, lock));
}
for (int i = 0; i < 10; i++) {
service.submit(new Consumer(linkedList, lock));
}
}
static class Productor implements Runnable {
private List<Integer> list;
private int maxLength;
private Lock lock;
public Productor(List list, int maxLength, Lock lock) {
this.list = list;
this.maxLength = maxLength;
this.lock = lock;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
while (list.size() == maxLength) {
System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait");
full.await();
System.out.println("生产者" + Thread.currentThread().getName() + " 退出wait");
}
Random random = new Random();
int i = random.nextInt();
System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
list.add(i);
empty.signalAll();
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
static class Consumer implements Runnable {
private List<Integer> list;
private Lock lock;
public Consumer(List list, Lock lock) {
this.list = list;
this.lock = lock;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
while (list.isEmpty()) {
System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait");
empty.await();
System.out.println("消费者" + Thread.currentThread().getName() + " 退出wait");
}
Integer element = list.remove(0);
System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element);
full.signalAll();
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
复制代码
3.BlockingQueue
方式实现
由于BlockingQueue
内部实现就附加了两个阻塞操作。
即当队列已满时,阻塞向队列中插入数据的线程,直至队列中未满;当队列为空时,阻塞从队列中获取数据的线程,直至队列非空时为止.
所以使用BlockingQueue
来实现生产者消费者模式非常简单方便,关于BlockingQueue
的更多细节可见:并发容器之BlockingQueue详解
下面直接看下生产者消费者实现
public class ProductorConsumerDmoe3 {
private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 5; i++) {
service.submit(new Productor(queue));
}
for (int i = 0; i < 10; i++) {
service.submit(new Consumer(queue));
}
}
static class Productor implements Runnable {
private BlockingQueue queue;
public Productor(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Random random = new Random();
int i = random.nextInt();
System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
queue.put(i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer element = (Integer) queue.take();
System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
4.Kotlin Channel
方式实现
随着Kotlin
的普及,使用协程来处理并发也变成了一个更加方便的选择
使用Kotlin Channel
同样可以实现生产者消费者模式
1.一个 Channel
是一个和 BlockingQueue
非常相似的概念。
2.Channel
相比BlockingQueue
代替了阻塞的 put
操作并提供了挂起的 send
,还替代了阻塞的 take
操作并提供了挂起的 receive
3.相比BlockingQueue
的阻塞,Channel
的挂起性能更好
4.Channel
还有个特点是阻塞队列没有,它可以随时关闭,当发送者接收到关闭指令,将立即停止发送。
实现如下:
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) {
val item = x * x
println("生产者生产了:$item")
send(x * x)
delay(1000)
}
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach {
println("消费者消费了$it")
}
println("Done!")
}
复制代码
通过Channel
方式,可以比较方便的实现生产者消费者模式
总结
本文主要介绍了生产者消费者模式的四种写法
1.wait/notify
方式实现
2.Lock
方式实现
3.BlockingQueue
方式实现
4.Kotlin Channel
方式实现