【协程进阶】Flow与Channel深入解析

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!

前言

之前对协程做了一些深入学习,回答了协程到底是什么,协程到底是怎么切换线程的等问题,有兴趣的同学可以了解下:
【带着问题学】协程到底是什么?
【带着问题学】协程到底是怎么切换线程的?

除了上文介绍的CoroutineContext,CoroutineScope等,协程中还有两个比较重要的概念,ChannelFlow
本文主要对Flow(冷流),Channel(热流)做一个深入学习,主要包括以下内容
1.Channel是什么及Channel的基本使用
2.Channel背后的原理解析与介绍
3.Flow是什么及Flow的基本使用
4.Flow背后的原理解析与介绍

1. Channel的基本使用

1.1 Channel是什么?

Channel实际上就是个队列,是一个面向多协程之间数据传输的 BlockQueue,用于协程间通信

1.2 Channel实现生产者消费者模式

传统Java中的生产者-消费者模式很简单,一个或多个生产者线程,一个公用的阻塞队列(往往有ArrayBlockingQueueLinkedBlockingQueue两种选择),以及一个或多个消费者线程。生产者源源不断地将数据入队到阻塞队列中,消费者则循环从队列中取出元素进行消费。

那么如果使用Channel要如何实现生产者消费者模式呢?

    fun produceAndConsume() {
        GlobalScope.launch {
            val channel = Channel<Int>()

            val producer = GlobalScope.launch {
                var i = 0
                while (true) {
                    Log.i(tag, "生产者生产了:$i")
                    channel.send(i++)
                    delay(1000)
                }
            }

            val consumer = GlobalScope.launch {
                while (true) {
                    val element = channel.receive()
                    Log.i(tag, "消费者消费了:$element")
                }
            }
            producer.join()
            consumer.join()
        }
    }
//输出
I/ProduceAndConsume: 生产者生产了:0
I/ProduceAndConsume: 消费者消费了:0
I/ProduceAndConsume: 生产者生产了:1
I/ProduceAndConsume: 消费者消费了:1
I/ProduceAndConsume: 生产者生产了:2
I/ProduceAndConsume: 消费者消费了:2
I/ProduceAndConsume: 生产者生产了:3
I/ProduceAndConsume: 消费者消费了:3
复制代码
  1. 看得出来使用Channel来实现生产者消费者模式比较简单
  2. 生产者与消费者交替调用,这是因为生产者生产了之后如果发现缓存区满了就会挂起,消费者发现缓存区空了也会挂起

1.3 缓冲区容量

上面我们提到了缓冲区满了会挂起,那么缓冲区容量有多少呢?

    public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
        when (capacity) {
            RENDEZVOUS -> RendezvousChannel()
            UNLIMITED -> LinkedListChannel()
            CONFLATED -> ConflatedChannel()
            else -> ArrayChannel(capacity)
        }
复制代码

可以看出,当我们初始化时,传不同参数会有不同的实现
1.RendezvousChannel缓存区大小为0,如果发送后没人接收,则会一直挂起
2.LinkedListChannelLinkedBlockingQueue类似,容量没有限制
3.ConflatedChannel有一个元素大小的缓存区,但每次有新元素过来,都会用新的替换旧的
4.ArrayChannelArrayBlockingQueue类似,接收一个值作为缓存区大小

2.Channel原理解析

2.1 send,receive流程分析

上面我们介绍了,生产者与消费者是交替调用的,生产者生产了之后如果发现缓存区满了就会挂起,消费者发现缓存区空了也会挂起
具体流程如下:

  1. receive操作时队列包含Send元素则异步唤醒send协程
  2. receive操作时队列包不含Send元素则挂起receive协程
  3. send操作时队列包含receive元素则异步唤醒receive协程
  4. 若send操作时队列不包含receive元素则挂起send协程

由于篇幅原因就不在这里贴源码了,想要了解源码解析的同学可参考:Kotlin协程Channel中receive与send原理分析

2.2 ChannelBlockingQueue区别

1.挂起协程而非阻塞协程。Channel使用挂起的send,receive代替了阻塞的put,take
2.性能更好。相比BlockingQueue的阻塞,这就涉及到线程的阻塞与唤醒,大量的线程资源会被浪费在阻塞状态下,Channel的挂起性能更好
3.支持关闭。Channel可以随时关闭,当发送者接收到关闭指令,将立即停止发送,当缓存区中的元素发送完成后,接收者也将关闭.
4.支持异常处理。Channel使用结构化并发处理异常,可以实现:一个生产者或消费者协程抛出异常,所有生产者和消费者协程立即取消。可以避免多线程中某一个任务失败,误以为全部成功的问题

详情可参考:Kotlin Channel与生产者-消费者模式

2.3 Channel是怎样保证线程安全的?

我们知道,Channel可以用于多个协程间通信,多个协程可能运行在多个线程.
因此Channel也需要处理线程安全的问题,是怎样保证的呢?

Channel缓冲区分为链表与数组两种实现

2.3.1 链表实现

链表实现将缓存存储在LockFreeLinkedListHead

    internal abstract class AbstractSendChannel<E> : SendChannel<E> { 
        protected val queue = LockFreeLinkedListHead()
        ...
    }
复制代码

LockFreeLinkedListHead本身其实就是一个双向链表的节点,它所谓的LockFreeJava虚拟机上其实是通过CAS原子操作来实现的
具体它的实现原理来源于一篇论文Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap
看起来有点复杂,有兴趣的同学可自行了解下~

2.3.2 数组实现

而对于数组版本,ArrayChannel就简单粗暴了,内部就是一个数组

    // 如果缓冲区大小大于 8,会先分配大小为 8 的数组,在后续进行扩容
    private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
复制代码

对这个数组读写时则直接用了一个ReentrantLock进行加锁。
可以看出Channel其实也是通过加锁或者CAS来保证线程安全的

2.4 Channel设计原则

Channel是协程间通信的方式,它采用的是CSP(Communicating sequential processes)模型,相比一些线程间通信的方案,它有以下特点:

Do not communicate by sharing memory; instead, share memory by communicating.

翻译过来就是:不要使用共享内存来通信,而是用通信来共享内存

但是从本质上来看,计算机上线程和协程同步信息其实都是通过『共享内存』来进行的,因为无论是哪种通信模型,线程或者协程最终都会从内存中获取数据,Channel的底层实现也需要对共享内存加锁来实现

既然都是共享内存那和我们自己使用共享内存有什么区别呢?
所以更为准确的说法是『为什么我们使用发送消息的方式来同步信息,而不是多个线程或者协程直接共享内存?』

  • 1.首先,使用发送消息来同步信息相比于直接使用共享内存和互斥锁是一种更高级的抽象,使用更高级的抽象能够为我们在程序设计上提供更好的封装,让程序的逻辑更加清晰;
  • 2.其次,消息发送在解耦方面与共享内存相比也有一定优势,我们可以将线程的职责分成生产者和消费者,并通过消息传递的方式将它们解耦,不需要再依赖共享内存;
  • 3.最后,选择使用消息发送的方式,通过保证同一时间只有一个活跃的线程能够访问数据,能够从设计上天然地避免线程竞争和数据冲突的问题;

上面只是对CSP模型的一个总结,如果感兴趣的同学,可以查看详细解析:为什么使用通信来共享内存

3. Flow的基本使用

Flow 就是 Kotlin 协程与响应式编程模型结合的产物,你会发现它与 RxJava 非常像,二者之间也有相互转换的 API,使用起来非常方便。
Flow有以下特点:
1.冷数据流,不消费则不生产,这一点与Channel正相反:Channel的发送端并不依赖于接收端。
2.Flow通过flowOn改变数据发射的线程,数据消费线程则由协程所在线程决定
3.与RxJava类似,支持通过catch捕获异常,通过onCompletion 回调完成
4.Flow没有提供取消方法,可以通过取消Flow所在协程的方式来取消

具体使用如下:

lifecycleScope.launch {
    flow {
        for (i in 1..10) {
            emit(i)
        }
    }.flowOn(Dispatchers.Main)
        .catch {
            //异常处理
        }
        .onCompletion {
            //完成回调
        }
        .collect { num ->
            // 具体的消费处理
            // 只有collect时才会生产数据
            // ...
        }
}
复制代码

4.Flow原理解析

我们上面介绍了Flow的基本使用与特点,现在可以提出两个问题;
1.Flow为什么是个冷流?
2.Flow是怎么切换线程的?

4.1 Flow为什么是个冷流?

冷流即开始消费时才生产数据,不消费则不生产,我们来看下源码
先看下flow{}中发生了什么

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
复制代码

可以看出,flow{}中做的事也很简单,主要就是创建了一个继承自AbstractFlowSafeFlow
再来看下AbstractFlow中的内容

public abstract class AbstractFlow<T> : Flow<T> {

    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
  		// 1. collector 做一层包装
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
  			// 2. 处理数据接收者
            collectSafely(safeCollector)
        } finally {
          	// 3. 释放协程相关的参数
            safeCollector.releaseIntercepted()
        }
    }

    // collectSafely 方法应当遵循以下的约束
    // 1. 不应当在collectSafely方法里面切换线程,比如 withContext(Dispatchers.IO)
    // 2. collectSafely 默认不是线程安全的
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
复制代码

发现主要做了三件事:
1.对数据接收方FlowCollector 做了一层包装,也就是这个SafeCollector
2.调用它里面的抽象方法AbstractFlow#collectSafely 方法。
3.释放协程的一些信息。

结合以下之前看的SafeFlow,它实现了AbstractFlow#collectSafely方法,调用了collector.block(),也就是运行了flow{}块中的代码。
现在就很清晰了,为什么Flow是冷流?
因为它会在每一次collect的时候才会去触发发送数据的动作

4.2 Flow是怎么切换线程的

Flow切换线程的方式与协程切换线程是类似的
都是通过启动一个子协程,然后通过CoroutineContext中的Dispatchers切换线程
不同的地方在于Flow切换过程中利用了Channel来传递数据

由于Flow切换线程的源码过多,就不在这里缀述了,有兴趣的同学可以跟一下源码,详情可见:flowOn()如何做到切换协程

总结

图片[1]-【协程进阶】Flow与Channel深入解析-一一网
如上所示,本文主要讲述了Channel,Flow的基本使用与原理,提出了一些问题并做了解答.
更文不易,如果本文对你有所帮助,欢迎点赞收藏~

参考资料

破解 Kotlin 协程(9) – Channel 篇
Kotlin Channel与生产者-消费者模式
为什么使用通信来共享内存
破解 Kotlin 协程(11) – Flow 篇
抽丝剥茧Kotlin – 协程中绕不过的Flow

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享