【孔乙已】生产者消费者有四样写法,你知道吗?

所谓生产者消费者模式,即N个线程进行生产,同时N个线程进行消费,两种角色通过内存缓冲区进行通信。
生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。
但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:
1.如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
2.如果共享数据区为空的话,阻塞消费者继续消费数据;

image.png

弄懂生产者-消费者问题能够加深我们对并发编程的理解,同时这也是面试中常见的面试题
下面就介绍一下生产者消费者模式的四种写法
1.wait/notify方式实现
2.Lock方式实现
3.BlockingQueue方式实现
4.Kotlin Channel方式实现

1.wait/notify方式实现

Java 中,可以通过配合调用 Object 对象的 wait() 方法和 notify()方法或 notifyAll() 方法来实现线程间的通信。
在线程中调用 wait() 方法,将阻塞当前线程,同时释放对象锁,直至等到其他线程调用了 notify() 方法或 notifyAll() 方法进行通知之后,当前线程才能从wait()方法中返回,继续执行下面的操作

其中要注意两点

1.永远使用while循环对wait条件进行判断,而不是if语句中进行wait条件的判断

现象
当有多个消费者时,使用if会抛出异常

原因分析
当使用if做条件判断时,当线程被唤醒后就继续执行消费者方法,但此时如果元素已经被另一个消费者消费了,那么就会抛出异常

解决方案
永远使用while循环对wait条件进行判断,而不是if语句中进行wait条件的判断

2.使用NotifyAll而不是使用notify

现象
如果是多消费者和多生产者情况,如果使用notify方法可能会出现“假死”的情况,即唤醒的是同类线程。

原因分析
假设当前多个生产者线程会调用wait方法阻塞等待,当其中的生产者线程获取到对象锁之后使用notify通知处于WAITTING状态的线程,如果唤醒的仍然是生产者线程,就会造成所有的消费者线程都处于等待状态。

解决方案
使用NotifyAll而不是使用notify

代码实现如下:

public class ProductorConsumerDemo1 {

    public static void main(String[] args) {

        LinkedList linkedList = new LinkedList();
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(linkedList, 8));
        }

        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(linkedList));
        }

    }

    static class Productor implements Runnable {

        private List<Integer> list;
        private int maxLength;

        public Productor(List list, int maxLength) {
            this.list = list;
            this.maxLength = maxLength;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.size() == maxLength) {
                            System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                            list.wait();
                            System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                        list.add(i);
                        list.notifyAll();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }
    }

    static class Consumer implements Runnable {

        private List<Integer> list;

        public Consumer(List list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.isEmpty()) {
                            System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                            list.wait();
                            System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Integer element = list.remove(0);
                        System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                        list.notifyAll();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

}
复制代码

2.Lock方式实现

Lock中的Condition可以实现上面Objectwait,notify一样的效果
await对应wait,signal对应notify,signalAll对应notifyAll
下面直接来看看实现,代码与使用wait,notify基本上是一样的,只是同步方式不同

public class ProductorConsumerDemo2 {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();

    public static void main(String[] args) {
        LinkedList linkedList = new LinkedList();
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(linkedList, 8, lock));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(linkedList, lock));
        }

    }

    static class Productor implements Runnable {

        private List<Integer> list;
        private int maxLength;
        private Lock lock;

        public Productor(List list, int maxLength, Lock lock) {
            this.list = list;
            this.maxLength = maxLength;
            this.lock = lock;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.size() == maxLength) {
                        System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                        full.await();
                        System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                    list.add(i);
                    empty.signalAll();
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    static class Consumer implements Runnable {

        private List<Integer> list;
        private Lock lock;

        public Consumer(List list, Lock lock) {
            this.list = list;
            this.lock = lock;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.isEmpty()) {
                        System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                        empty.await();
                        System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Integer element = list.remove(0);
                    System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                    full.signalAll();
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}
复制代码

3.BlockingQueue方式实现

由于BlockingQueue内部实现就附加了两个阻塞操作。
即当队列已满时,阻塞向队列中插入数据的线程,直至队列中未满;当队列为空时,阻塞从队列中获取数据的线程,直至队列非空时为止.
所以使用BlockingQueue来实现生产者消费者模式非常简单方便,关于BlockingQueue的更多细节可见:并发容器之BlockingQueue详解

下面直接看下生产者消费者实现

public class ProductorConsumerDmoe3 {

    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(queue));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(queue));
        }
    }

    static class Productor implements Runnable {
        private BlockingQueue queue;

        public Productor(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                    queue.put(i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer element = (Integer) queue.take();
                    System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
复制代码

4.Kotlin Channel方式实现

随着Kotlin的普及,使用协程来处理并发也变成了一个更加方便的选择
使用Kotlin Channel同样可以实现生产者消费者模式

1.一个 Channel 是一个和 BlockingQueue 非常相似的概念。
2.Channel相比BlockingQueue代替了阻塞的 put 操作并提供了挂起的 send,还替代了阻塞的 take 操作并提供了挂起的 receive
3.相比BlockingQueue的阻塞,Channel的挂起性能更好
4.Channel还有个特点是阻塞队列没有,它可以随时关闭,当发送者接收到关闭指令,将立即停止发送。

实现如下:

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) {
        val item = x * x
        println("生产者生产了:$item")
        send(x * x)
        delay(1000)
    }
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach {
        println("消费者消费了$it")
    }
    println("Done!")
}
复制代码

通过Channel方式,可以比较方便的实现生产者消费者模式

总结

本文主要介绍了生产者消费者模式的四种写法
1.wait/notify方式实现
2.Lock方式实现
3.BlockingQueue方式实现
4.Kotlin Channel方式实现

参考资料

实现生产者消费者的三种方式
Kotlin Channel与生产者-消费者模式

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享