「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」
前言
之前对协程做了一些深入学习,回答了协程到底是什么,协程到底是怎么切换线程的等问题,有兴趣的同学可以了解下:
【带着问题学】协程到底是什么?
【带着问题学】协程到底是怎么切换线程的?
除了上文介绍的CoroutineContext
,CoroutineScope
等,协程中还有两个比较重要的概念,Channel
与Flow
本文主要对Flow
(冷流),Channel
(热流)做一个深入学习,主要包括以下内容
1.Channel
是什么及Channel
的基本使用
2.Channel
背后的原理解析与介绍
3.Flow
是什么及Flow
的基本使用
4.Flow
背后的原理解析与介绍
1. Channel
的基本使用
1.1 Channel
是什么?
Channel
实际上就是个队列,是一个面向多协程之间数据传输的 BlockQueue
,用于协程间通信
1.2 Channel
实现生产者消费者模式
传统Java
中的生产者-消费者模式很简单,一个或多个生产者线程,一个公用的阻塞队列(往往有ArrayBlockingQueue
和LinkedBlockingQueue
两种选择),以及一个或多个消费者线程。生产者源源不断地将数据入队到阻塞队列中,消费者则循环从队列中取出元素进行消费。
那么如果使用Channel
要如何实现生产者消费者模式呢?
fun produceAndConsume() {
GlobalScope.launch {
val channel = Channel<Int>()
val producer = GlobalScope.launch {
var i = 0
while (true) {
Log.i(tag, "生产者生产了:$i")
channel.send(i++)
delay(1000)
}
}
val consumer = GlobalScope.launch {
while (true) {
val element = channel.receive()
Log.i(tag, "消费者消费了:$element")
}
}
producer.join()
consumer.join()
}
}
//输出
I/ProduceAndConsume: 生产者生产了:0
I/ProduceAndConsume: 消费者消费了:0
I/ProduceAndConsume: 生产者生产了:1
I/ProduceAndConsume: 消费者消费了:1
I/ProduceAndConsume: 生产者生产了:2
I/ProduceAndConsume: 消费者消费了:2
I/ProduceAndConsume: 生产者生产了:3
I/ProduceAndConsume: 消费者消费了:3
复制代码
- 看得出来使用
Channel
来实现生产者消费者模式比较简单 - 生产者与消费者交替调用,这是因为生产者生产了之后如果发现缓存区满了就会挂起,消费者发现缓存区空了也会挂起
1.3 缓冲区容量
上面我们提到了缓冲区满了会挂起,那么缓冲区容量有多少呢?
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
复制代码
可以看出,当我们初始化时,传不同参数会有不同的实现
1.RendezvousChannel
缓存区大小为0,如果发送后没人接收,则会一直挂起
2.LinkedListChannel
与LinkedBlockingQueue
类似,容量没有限制
3.ConflatedChannel
有一个元素大小的缓存区,但每次有新元素过来,都会用新的替换旧的
4.ArrayChannel
与ArrayBlockingQueue
类似,接收一个值作为缓存区大小
2.Channel
原理解析
2.1 send
,receive
流程分析
上面我们介绍了,生产者与消费者是交替调用的,生产者生产了之后如果发现缓存区满了就会挂起,消费者发现缓存区空了也会挂起
具体流程如下:
- 若
receive
操作时队列包含Send
元素则异步唤醒send
协程 - 若
receive
操作时队列包不含Send
元素则挂起receive
协程 - 若
send
操作时队列包含receive
元素则异步唤醒receive
协程 - 若send操作时队列不包含receive元素则挂起send协程
由于篇幅原因就不在这里贴源码了,想要了解源码解析的同学可参考:Kotlin协程Channel中receive与send原理分析
2.2 Channel
与BlockingQueue
区别
1.挂起协程而非阻塞协程。Channel
使用挂起的send
,receive
代替了阻塞的put
,take
。
2.性能更好。相比BlockingQueue
的阻塞,这就涉及到线程的阻塞与唤醒,大量的线程资源会被浪费在阻塞状态下,Channel
的挂起性能更好
3.支持关闭。Channel
可以随时关闭,当发送者接收到关闭指令,将立即停止发送,当缓存区中的元素发送完成后,接收者也将关闭.
4.支持异常处理。Channel
使用结构化并发处理异常,可以实现:一个生产者或消费者协程抛出异常,所有生产者和消费者协程立即取消。可以避免多线程中某一个任务失败,误以为全部成功的问题
详情可参考:Kotlin Channel与生产者-消费者模式
2.3 Channel
是怎样保证线程安全的?
我们知道,Channel
可以用于多个协程间通信,多个协程可能运行在多个线程.
因此Channel
也需要处理线程安全的问题,是怎样保证的呢?
Channel
缓冲区分为链表与数组两种实现
2.3.1 链表实现
链表实现将缓存存储在LockFreeLinkedListHead
中
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
protected val queue = LockFreeLinkedListHead()
...
}
复制代码
LockFreeLinkedListHead
本身其实就是一个双向链表的节点,它所谓的LockFree
在Java
虚拟机上其实是通过CAS
原子操作来实现的
具体它的实现原理来源于一篇论文Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap
看起来有点复杂,有兴趣的同学可自行了解下~
2.3.2 数组实现
而对于数组版本,ArrayChannel
就简单粗暴了,内部就是一个数组
// 如果缓冲区大小大于 8,会先分配大小为 8 的数组,在后续进行扩容
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
复制代码
对这个数组读写时则直接用了一个ReentrantLock
进行加锁。
可以看出Channel
其实也是通过加锁或者CAS
来保证线程安全的
2.4 Channel
设计原则
Channel
是协程间通信的方式,它采用的是CSP(Communicating sequential processes)
模型,相比一些线程间通信的方案,它有以下特点:
Do not communicate by sharing memory; instead, share memory by communicating.
翻译过来就是:不要使用共享内存来通信,而是用通信来共享内存。
但是从本质上来看,计算机上线程和协程同步信息其实都是通过『共享内存』来进行的,因为无论是哪种通信模型,线程或者协程最终都会从内存中获取数据,Channel
的底层实现也需要对共享内存加锁来实现
既然都是共享内存那和我们自己使用共享内存有什么区别呢?
所以更为准确的说法是『为什么我们使用发送消息的方式来同步信息,而不是多个线程或者协程直接共享内存?』
- 1.首先,使用发送消息来同步信息相比于直接使用共享内存和互斥锁是一种更高级的抽象,使用更高级的抽象能够为我们在程序设计上提供更好的封装,让程序的逻辑更加清晰;
- 2.其次,消息发送在解耦方面与共享内存相比也有一定优势,我们可以将线程的职责分成生产者和消费者,并通过消息传递的方式将它们解耦,不需要再依赖共享内存;
- 3.最后,选择使用消息发送的方式,通过保证同一时间只有一个活跃的线程能够访问数据,能够从设计上天然地避免线程竞争和数据冲突的问题;
上面只是对CSP
模型的一个总结,如果感兴趣的同学,可以查看详细解析:为什么使用通信来共享内存
3. Flow
的基本使用
Flow
就是 Kotlin
协程与响应式编程模型结合的产物,你会发现它与 RxJava
非常像,二者之间也有相互转换的 API
,使用起来非常方便。
Flow
有以下特点:
1.冷数据流,不消费则不生产,这一点与Channel
正相反:Channel
的发送端并不依赖于接收端。
2.Flow
通过flowOn
改变数据发射的线程,数据消费线程则由协程所在线程决定
3.与RxJava
类似,支持通过catch
捕获异常,通过onCompletion
回调完成
4.Flow
没有提供取消方法,可以通过取消Flow
所在协程的方式来取消
具体使用如下:
lifecycleScope.launch {
flow {
for (i in 1..10) {
emit(i)
}
}.flowOn(Dispatchers.Main)
.catch {
//异常处理
}
.onCompletion {
//完成回调
}
.collect { num ->
// 具体的消费处理
// 只有collect时才会生产数据
// ...
}
}
复制代码
4.Flow
原理解析
我们上面介绍了Flow
的基本使用与特点,现在可以提出两个问题;
1.Flow
为什么是个冷流?
2.Flow
是怎么切换线程的?
4.1 Flow
为什么是个冷流?
冷流即开始消费时才生产数据,不消费则不生产,我们来看下源码
先看下flow{}
中发生了什么
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
复制代码
可以看出,flow{}
中做的事也很简单,主要就是创建了一个继承自AbstractFlow
的SafeFlow
再来看下AbstractFlow
中的内容
public abstract class AbstractFlow<T> : Flow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
// 1. collector 做一层包装
val safeCollector = SafeCollector(collector, coroutineContext)
try {
// 2. 处理数据接收者
collectSafely(safeCollector)
} finally {
// 3. 释放协程相关的参数
safeCollector.releaseIntercepted()
}
}
// collectSafely 方法应当遵循以下的约束
// 1. 不应当在collectSafely方法里面切换线程,比如 withContext(Dispatchers.IO)
// 2. collectSafely 默认不是线程安全的
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
复制代码
发现主要做了三件事:
1.对数据接收方FlowCollector
做了一层包装,也就是这个SafeCollector
2.调用它里面的抽象方法AbstractFlow#collectSafely
方法。
3.释放协程的一些信息。
结合以下之前看的SafeFlow
,它实现了AbstractFlow#collectSafely
方法,调用了collector.block()
,也就是运行了flow{}
块中的代码。
现在就很清晰了,为什么Flow
是冷流?
因为它会在每一次collect
的时候才会去触发发送数据的动作
4.2 Flow
是怎么切换线程的
Flow
切换线程的方式与协程切换线程是类似的
都是通过启动一个子协程,然后通过CoroutineContext
中的Dispatchers
切换线程
不同的地方在于Flow
切换过程中利用了Channel
来传递数据
由于Flow
切换线程的源码过多,就不在这里缀述了,有兴趣的同学可以跟一下源码,详情可见:flowOn()如何做到切换协程
总结
如上所示,本文主要讲述了Channel
,Flow
的基本使用与原理,提出了一些问题并做了解答.
更文不易,如果本文对你有所帮助,欢迎点赞收藏~
参考资料
破解 Kotlin 协程(9) – Channel 篇
Kotlin Channel与生产者-消费者模式
为什么使用通信来共享内存
破解 Kotlin 协程(11) – Flow 篇
抽丝剥茧Kotlin – 协程中绕不过的Flow