关于Collections和Sequence请看:关于kotlin中的Collections、Sequence、Channel和Flow (一)
Channel
简介
Channel
是一个和 BlockingQueue
非常相似的概念。其中一个不同是它代替了阻塞的 put
操作并提供了挂起的 send
,还替代了阻塞的 take
操作并提供了挂起的 receive
。
Channel
是并发安全的,它可以用来连接协程,实现不同协程的通信。
简单使用
val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer
launch {
while (true) {
println(channel.receive())
}
}
复制代码
既然我们说 Channel
实际上就是一个队列,队列不应该有缓冲区吗,那么这个缓冲区一旦满了,并且也一直没有人调用 receive
取走元素的话,send
就挂起了。那么接下来我们看下 Channel
的缓冲区的定义:
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
复制代码
-
RENDEZVOUS
就是 0,这个词本意就是描述“不见不散”的场景,所以你不来 receive,我这 send 就一直搁这儿挂起等着。换句话说,我们开头的例子里面,如果 consumer 不 receive,producer 里面的第一个 send 就给挂起了。 -
UNLIMITED
比较好理解,无限制,从它给出的实现LinkedListChannel
来看,这一点也与我们的LinkedBlockingQueue
类似。 -
CONFLATED
,这个词是合并的意思,这个类型的Channel
有一个元素大小的缓冲区,但每次有新元素过来,都会用新的替换旧的。 -
BUFFERED
了,它接收一个值作为缓冲区容量的大小,默认64。
但Channel
是热流,即使没有消费者,它的生产操作也会执行。如果你不接收,那么你可能再也接收不到。
因为刚才说了channel
类似于BlockingQueue
, 它的send()
和receive()
其实也是入队出队的操作,假定有多个消费者那它们就会竞争:
val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer 1
launch {
while (true) {
println("~~~"+channel.receive())
}
}
//consumer 2
launch {
while (true) {
println("!!!"+channel.receive())
}
}
部分输出:
~~~0
~~~1
!!!2
~~~3
!!!4
~~~5
!!!6
~~~7
!!!8
~~~9
复制代码
发现基本是交替获取到值。那如果想全都接收怎么办呢: 使用BroadcastChannel
:
val channel = BroadcastChannel<Int>(Channel.BUFFERED)
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer 1
launch {
while (true) {
println("~~~"+channel.openSubscription().receive())
}
}
//consumer 2
launch {
while (true) {
println("!!!"+channel.openSubscription().receive())
}
}
部分输出:
~~~1
!!!1
~~~2
!!!2
~~~3
!!!3
~~~4
!!!4
复制代码
还有一点要注意的是,channel
需要手动关闭。
Channel 版本的序列生成器
上面说到 sequence
无法享受更上层的协程框架概念下的各种能力,还有一点 sequence
显然不是线程安全的,而 Channel
可以在并发场景下使用。
launch {
val channel = produce(Dispatchers.Unconfined) {
send(1)
send(2)
}
for (item in channel) {
println("got : $item")
}
}
复制代码
但Channel
即使没有人“消费”,值依旧会生产,这会造成一定的浪费。
那么能不能Sequence + Channel
搞一下?
Flow
简介
Flow
是在 Kotlin Coroutines 1.2.0 alpha
之后新增的一套API,也叫做异步流,是 Kotlin
协程与响应式编程模型结合的产物。
什么是响应式编程
响应式编程基于观察者模式,是一种面向数据流和变化传播的声明式编程方式。换个说法就是:响应式编程是使用异步数据流进行编程。【响应式编程】
Flow 解决了什么
异步挂起函数能够返回单一值,那么我们如何返回多个异步计算的值呢?而这个就是
Kotlin Flow
解决的问题。
和Channel对比
- Flow是“冷”?的 ,和Sequence一样,只有遇到末端操作才会执行,但又不一样↓
- Flow是响应式的,由生产者回调给消费者 (sequence是消费端通知生产端)
- 它基于协程构建,因此提供了结构化并发和取消的所有好处。
- 丰富的操作符
channel的【操作符】在kotlin 1.4标记为弃用,未来是要移除掉的
如何使用
Flow
有多种构建方式,以下是最简单的方式:
viewModelScope.launch{
//构建 flow
val testFlow= flow {
emit(1)
}
//消费Flow
testFlow.collect {
println(it)
}
}
复制代码
怎么就是冷流了
一个 Flow
创建出来之后,不消费则不生产,多次消费则多次生产,生产和消费总是相对应的。
所谓冷数据流,就是只有消费时才会生产的数据流,这一点与Channel
相反:Channel
的发送端并不依赖于接收端。
收集器是具有单一挂起功能的流接口收集,它是终端操作符:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
复制代码
发射器是FlowCollector,具有一个称为emit的单个挂起函数
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
复制代码
在其内部,收集器和发射器的整个机制只是调用两边的函数,
而suspend关键字则为其增加魔力。
线程切换
- 使用flowOn()来切换流的执行线程,flowOn 指定的调度器影响前面的逻辑。
fun main() = runBlocking {
flow {
emit("Context")
println(" emit on ${Thread.currentThread().name}")
}
.flowOn(Dispatchers.IO)
.map {
println(" map on ${Thread.currentThread().name}")
it + " Preservation"
}
.flowOn(Dispatchers.Default)
.collect { value ->
println(" collect on ${Thread.currentThread().name}")
println(value)
}
}
复制代码
输出:
emit on DefaultDispatcher-worker-2
map on DefaultDispatcher-worker-1
collect on main
Context Preservation
复制代码
异常处理
Flow
从不捕获或处理下游⬇️流中发生的异常,它们仅使用catch
运算符捕获上游⬆️发生的异常。
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}
复制代码
开始和结束
如果你想在流的开始和结尾处进行一些操作。
onCompletion 用起来比较类似于 try ... catch ... finally
中的 finally
,无论前面是否存在异常,它都会被调用,参数 t
则是前面未捕获的异常。
flow {
emit(1)
}.onStart {
println("smart")
}.onCompletion { t: Throwable ->
println("caught error: $t")
}.collect {
println(it)
}
复制代码
输出:
smart
1
end
复制代码
Flow
的设计初衷是希望确保流操作中异常透明。因此禁止?在flow
的构建中 try catch
:
Wrong:
flow {
try {
emit(1)
throw ArithmeticException("Div 0")
} catch (t: Throwable){
println("caught error: $t")
} finally {
println("finally.")
}
}
复制代码
末端操作符
前面的例子当中,我们用 collect 消费 Flow 的数据。collect
是最基本的末端操作符,除了 collect
之外,还有其他常见的末端操作符,大体分为三种类:
- 集合类型转换操作,包括
toList
、toSet
等。 - 聚合操作,包括将 Flow 规约到单值的
reduce
、fold
等操作,以及获得单个元素的操作包括single
、singleOrNull
、first
等。 - 无操作
collect()
和launchIn()
等。
实际上,识别是否为末端操作符,还有一个简单方法,由于 Flow
的消费端一定需要运行在协程当中,因此末端操作符都是挂起函数。
Flow 的取消
Flow 没有提供取消操作,Flow 的消费依赖于collect
这样的末端操作符,而它们又必须在协程当中调用,因此 Flow 的取消主要依赖于末端操作符所在的协程的状态。
val job = launch {
val intFlow = flow {
(1..3).forEach {
delay(1000)
emit(it)
}
}
intFlow.collect { println(it) }
}
delay(2500)
job.cancel()
复制代码
其他 Flow 的创建方式
flow { ... }
是基础的创建方式,还有其他构建器使流的声明更简单:
flowOf
构建器定义了一个发射固定,集的流。- 使用
.asFlow()
扩展函数,可以将各种集合与序列转换为流。
在flow { ... }
中无法随意切换调度器,这是因为 emit
函数不是线程安全的:
flow {
withContext(Dispatchers.IO){ //error
emit(2)
}
emit(1)
}.collect {
println(it)
}
复制代码
想要在生成元素时切换调度器,就须使用channelFlow
函数来创建 Flow
:
channelFlow {
send(1)
withContext(Dispatchers.IO) {
send(2)
}
}
复制代码
SharedFlow
上面我们说flow
是冷流,只有collect 之后才触发”生产”,那我就想要一个”热”流咋整呢?
**SharedFlow**
就是解决这个问题。在SharedFlow
之前通常是使用BroadcastChannel
然后asFlow
去实现,但这种实现方式不够优雅,和Channel过于耦合。因此在Coroutine
1.4时推出了SharedFlow.
它是一个**“热”流**,且可以有多个订阅者。
简单使用:
val broadcasts=MutableSharedFlow<String>()
viewModelScope.launch{
broadcasts.emit("Hello")
broadcasts.emit("SharedFlow")
}
lifecycleScope.launch{
broadcasts.collect {
print(it)
}
}
复制代码
StateFlow
StateFlow
是SharedFlow
的一个比较特殊的变种,而SharedFlow
又是Kotlin
数据流当中比较特殊的一种类型。StateFlow 与 LiveData 是最接近的,因为:
- 它始终是有值的。
- 它的值是唯一的。
- 它允许被多个观察者共用 (因此是共享的数据流)。
- 它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的。
当暴露 UI 的状态给视图时,应该使用
StateFlow
。这是一种安全和高效的观察者,专门用于容纳 UI 状态。
简单来说就是类似LiveData
,但是更好用!
StateFlow
仅在值已更新且不相同值时返回。简单来说,假定两个值x和y,其中x是最初发出的值,y是要发出的值,如果(x == y)
不执行任何操作,(x !=y)
则仅在此情况下才发出新值。
简单使用:
val stateFlow = MutableStateFlow(UIState.Loading)//初始状态
stateFlow.value = UIState.Error
launch {
stateFlow.collect {
...
}
}
复制代码
更多信息参阅:StateFlow和SharedFlow
背压
只要是响应式编程,就一定会有背压问题,我们先来看看背压究竟是什么:
生产者生产数据的速度超过了消费者消费的速度导致的问题。
但得益于suspend功能,可以在Kotlin流程中实现透明的背压管理。
当流的收集器不堪重负时,它可以简单地挂起发射器,并在准备好接受更多元素时将其resume。
但为了保证数据不丢失,我们也会考虑添加缓存来缓解问题:
flow {
List(100) {
emit(it)
}
}.buffer()
复制代码
我们也可以为 buffer 指定一个容量。不过,如果我们只是单纯地添加缓存,而不是从根本上解决问题就始终会造成数据积压。 (就像我们板球的聊天室消息缓存池)。
问题产生的根本原因是生产和消费速率的不匹配,除直接优化消费者的性能以外,我们也可以采取一些取舍的手段。
第一种是 conflate
新数据会覆盖老数据,例如:
flow {
List(100) {
emit(it)
}
}.conflate()
.collect { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
复制代码
我们快速地发送了 100 个元素,最后接收到的只有两个,当然这个结果每次都不一定一样:
Collecting 1
1 collected
Collecting 99
99 collected
复制代码
第二种是 collectLatest
。顾名思义,只处理最新的数据,这看上去似乎与 conflate
没有区别,其实区别大了:它并不会直接用新数据覆盖老数据,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。
还是前面的例子,我们稍作修改:
flow {
List(100) {
emit(it)
}
}.collectLatest { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
复制代码
结果:
Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
复制代码
除 collectLatest
之外还有 mapLatest
、flatMapLatest
等等,都是这个作用。
在项目中的实战
近年来flow
是谷歌大力支持技术,像Room
,DataStore
, Paging3
等都支持了Flow
,那你还等什么呢,学起来,用起来。
普通suspend请求改造
可以发送多个值,UI 状态完全由数据驱动,比如Follow
按钮就可以改造一下,先Loading
后展示结果:
@WorkerThread
fun getObservableUserEvents(userId: String?):Flow<Result<ObservableUserEvents>{
return flow {
emit(Result.Loading)
if (userId == null) {
emit(sessionRepository.getSessions())
}
}
}
复制代码
重试机制
我要给某一网络请求增加重试机制:
override suspend fun getTrendsList() = flow<Result<xxx>> {
...
emit(Result.Success(result))
}.retry(2).catch { e ->
emit(Result.Error(e))
}
复制代码
当lifecycleScope遇上flow
搜索有多个tab,都要监听搜索的触发,但是一次预期是触发一个tab的搜索。在ViewPager里,旁边的Fragment是onPause,此时依旧可以收到livedata
回调,但是使用lifecycleScope
和flow
即可解决这个问题,因为launchWhenResumed
不在Resume
时会挂起:
lifecycleScope.launchWhenResumed{
searchRequestFlow.collect{request->
doSearch(request)
}
}
复制代码
而在Lifecycle 2.4.0
之后提供了一个新的API repeatOnLifecycle
,可以指定生命周期状态,并且在离开状态时不是简单的挂起,而是取消协程,当生命周期恢复时:
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.navigationActions.collect {
...
}
}
}
复制代码
官方倾向于使用 repeatOnLifecycle API
收集数据流,而不是在 launchWhenX API
内部进行收集。由于后面的 API
会挂起协程,而不是在 Lifecycle
处于 STOPPED
状态时取消。上游数据流会在后台保持活跃状态,并可能会发出新的项并耗用资源。
“数据倒灌”?不存在的
在之前,我们会使用LiveData
发送Event
去和UI
交互,或者执行某段逻辑,但是有些时候页面重建导致LiveData重新绑定,此时会立即收到回调导致触发逻辑。为了解决这个问题,在Flow
和Channel
都还没稳定的时候,谷歌的示例使用封装的Event
来判断事件是否处理过:
open class Event<out T>(private val content: T) {
var hasBeenHandled = false
private set // Allow external read but not write
/**
* Returns the content and prevents its use again.
*/
fun getContentIfNotHandled(): T? {
return if (hasBeenHandled) {
null
} else {
hasBeenHandled = true
content
}
}
/**
* Returns the content, even if it's already been handled.
*/
fun peekContent(): T = content
}
复制代码
但这只是个简单的封装,只能有一个观察者,想要应用在复杂场景还得设计个Manager
来管理多个观察者。但是使用SharedFlow
则不会有这个问题,毕竟LiveData
本来就不也是用来干这活的,人家设计来是和UI
来绑定的。
因为SharedFlow
是热流,事件被广播给未知数量的订阅者。在没有订阅者的情况下,任何发布的事件都会立即删除。它是一种用于必须立即处理或根本不处理的事件的设计模式。
使用示例:
val scrollToEvent: SharedFlow<ScheduleScrollEvent> =
loadSessionsResult.combineTransform(currentEventIndex) { result, currentEventIndex ->
emit(ScheduleScrollEvent(currentEventIndex))
}.shareIn(viewModelScope, WhileViewSubscribed, replay = 0)
复制代码
针对事件发送有些时候也可以使用Channel
,这个看业务场景:Channel
会每个事件都传递给单个订阅者。一旦Channel
缓冲区满了,会尝试在没有订阅者的情况下暂停事件发布,等待订阅者出现。默认情况下永远不会删除已发布的事件。(不过通过设置也可以无缓存或者仅缓存一个)
使用示例:
// SIDE EFFECTS: Navigation actions
private val _navigationActions = Channel<NavigationAction>(capacity = Channel.CONFLATED)
val navigationActions = _navigationActions.receiveAsFlow()
复制代码
debounce
搜索监听输入框,输入时执行搜索,这里要进行debounce,避免发出过多的sug请求:
val query=MutableStateFlow<String?>(null)
fun onTextChanged(text:String){
query.value=text
}
launch{
query.debounce(100).collect{text->
text?.let{
doSearch(text)
}
}
}
复制代码
多路复用
同时请求缓存和网络,网络先到则更新缓存,并取消协程,缓存先到则数据发送到UI后继续执行,直到网络数据返回。
listOf(
async { dataSource.getCacheData() },
async { dataSource.getRemoteData() })
.map { deferred ->
flow { emit(deferred.await()) }
}.merge().onEach { result ->
//网络数据
if (result.requestType == RequestType.NETWORK) {
if (isActive) {
_source.postValue(result)
}
if (result is Result.Success) {
result.data?.let { newData ->
//更新缓存
dataSource.flushCache(newData)
}
cancel()
}
} else {
//缓存数据
if (result is Result.Success) {
if (isActive) {
_source.postValue(result)
}
}
}
}.onCompletion {
isPreLoading.set(false)
}.launchIn(this)
复制代码
组合多个流
- Zip
每次各取一个,一旦其中一个流完成,结果流就完成,并在剩余流上调用cancel。
val flow = flowOf("4K显示器", "2K显示器", "1080P显示器")
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.zip(flow2) { i, s -> i + " 发给了 "+s }.collect {
println(it)
}
4K显示器 发给了 小明
2K显示器 发给了 小陈
1080P显示器 发给了 小红
复制代码
- Combine
通过组合每个流的最近发射的值,使用转换函数生成其值:
val flow = flowOf("Tom", "Jack", "Lucifer")
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
println(it)
}
Tom 和 小明握了手
Jack 和 小明握了手
Jack 和 小陈握了手
Lucifer 和 小陈握了手
Lucifer 和 小红握了手
Lucifer 和 小十一郎握了手
复制代码
如果我们对 第一个flow的发射加一个延迟:
val flow = flowOf("Tom", "Jack", "Lucifer").onEach { delay(10) }
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
println(it)
}
Tom 和 小十一郎握了手
Jack 和 小十一郎握了手
Lucifer 和 小十一郎握了手
复制代码
由于第一个流加了延迟,当数据发射时,第二个流已经发送完毕了,那么对于第二个流来说,最新值就是“小十一郎”。所以结果就成了上面那样。
Flow
操作符虽然比RxJava
少些,但满足大部分场景,其他操作符剩余的大家自行研究吧~
更多操作符请查阅:
Kotlin Flow
参考
协程 Flow 最佳实践 | 基于 Android 开发者峰会应用
哇 你都看到这了,点个赞再走呗。~