Kotlin协程(一)——之语言特性
Kotlin 协程简单介绍
初步印象
协程是线程之下更小的处理单元,本质上协程是轻量级的线程。本文章先把协程的简单用法梳理一遍,至于细节再在以后的文章里结合网络请求、IO操作在进行深入研究。
协程比较轻量
协程是基于线程的封装,底层还是依赖于线程
协程可以简化异步编程
相关概念
kotlinx.coroutines
是由JetBrains
开发的功能丰富的协程库。它包含本指南中涵盖的很多启用高级协程的原语,包括launch
、async
等等。
协程基本使用
创建协程
-
方式一:
GlobalScope.launch
顶层协程,虽然它很轻量,但它运行时仍会消耗一些内存资源。全局协程类似守护线程,当其他协程执行完毕之后自动关闭。GlobalScope.launch { // 顶层协程 } 复制代码
-
方式二:
runBlocking
协程构建器将函数转换为协程,在执行操作所在的指定作用域内启动协程。fun main() = runBlocking { launch { delay(1000L) println("World!") } println("Hello,") } 复制代码
-
方式三:使用
coroutineScope
构建器声明自己的作用域,runBlocking
与coroutineScope
类似都会等待其协程体以及所有子协程结束,runBlocking
方法会阻塞当前线程来等待, 而coroutineScope
只是挂起。具体表现就是runBlocking
中后续代码会在launch
代码块之后执行,coroutineScope
则相反。fun main() = runBlocking { launch { delay(200L) println("2. Task from runBlocking!") } coroutineScope { launch { delay(500L) println("3. Task from nested launch!") } delay(100L) println("1. Task from coroutineScope scrope!") } println("4. Coroutine scope is over!") } 复制代码
-
方式四:挂起函数, 对前面几种方法的抽取,为了和普通函数的作用域相区分使用关键字
suspend
fun main() = runBlocking { launch { doWork() } println("Hello,") } suspend fun doWork() { delay(1000L) println("World!") } 复制代码
取消协程(超时)
- 取消协程:通过Job的
cancle()
和join()
或者cancelAndJoin()
。协程的取消是协作的,即执行完当前协程当中的任务之后关闭,协程会运行到了它结束为止。CoroutineScope
的isActive
可用于监控协程状态。fun main() = runBlocking { val job = launch { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } delay(1300L) println("main: I'm tired of waiting!") job.cancel() // 取消该作业 job.join() // 等待作业执行结束 println("main: Now I can quit.") } 复制代码
- 超时协程: 使用
withTimeout
或者withTimeoutOrNull
函数来延迟取消追踪,前者会抛异常// withTimeout fun main() = runBlocking { withTimeout(1300L) { repeat(100) {i -> println("I'm sleeping $i ...") delay(500L) } } println("Result is Done") } // withTimeoutOrNull fun main() = runBlocking { val result = withTimeoutOrNull(1300L) { repeat(2) {i -> println("I'm sleeping $i ...") delay(500L) } "Done" } println("Result is $result") } 复制代码
挂起函数
-
挂起函数默认是顺序执行
fun main() = runBlocking { val time = measureTimeMillis { val one = doOne() val two = doTwo() println("The answer is ${one + two}") } println("Completed in $time ms") } suspend fun doOne(): Int { delay(1000L) // 假设我们在这里做了一些有用的事 return 13 } suspend fun doTwo(): Int { delay(1000L) // 假设我们在这里做了一些有用的事 return 29 } // 执行时间为 2015 ms 结果是 42 复制代码
-
使用
async
实现异步操作,async
可以通过将start
参数设置为CoroutineStart.LAZY
变为惰性的,即延时加载,当调用start
时才加载// 异步执行 fun main() = runBlocking { val time = measureTimeMillis { val one = async { doOne() } val two = async { doTwo() } println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") } // 惰性启动 fun main() = runBlocking { val time = measureTimeMillis { val one = async(start = CoroutineStart.LAZY) { doOne() } val two = async(start = CoroutineStart.LAZY) { doTwo() } // 不调用start() 默认按顺序执行 2034 ms println("The answer is ${one.await() + two.await()}") //调用start() 默认按异步执行 1035 ms // one.start() // 启动第一个 // two.start() // 启动第二个 // println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") } suspend fun doOne(): Int { delay(1000L) // 假设我们在这里做了一些有用的事 return 13 } suspend fun doTwo(): Int { delay(1000L) // 假设我们在这里做了一些有用的事 return 29 } // 执行时间为 1045 ms 结果是 42 复制代码
-
coroutineScope
实现结构化并发。async
时CoroutineScope
的扩展函数,因此可以直接用coroutineScope
抽取。fun main() = runBlocking { val time = measureTimeMillis { println("The answer is ${concurrentSum()}") } println("Completed in $time ms") } suspend fun concurrentSum() = coroutineScope { val one = async { doOne() } val two = async { doTwo() } one.await() + two.await() } suspend fun doOne(): Int { delay(1000L) // 假设我们在这里做了一些有用的事 return 13 } suspend fun doTwo(): Int { delay(1000L) // 假设我们在这里做了一些有用的事 return 29 } 复制代码
调度器
- 协程调度器确定了哪些线程或与线程相对应的协程执⾏,将协程限制在⼀个特定的线程执⾏,或将它分派到⼀个线程池,亦或是让它不受限地运⾏。
-
launch { }
不传参启动了承袭的上下⽂调度器, -
Dispatchers.Unconfined
是⾮受限调度器,调⽤时启动了⼀个协程,恢复线程中的协程由被调⽤的挂起函数来决定 -
Dispatchers.Default
同GlobalScope.launch {}
,⽤共享的后台线程池 -
ExecutorCoroutineDispatcher
启动了⼀个新的线程fun main() = runBlocking<Unit> { launch { print("main runBlocking I'm working in thread ${Thread.currentThread().name} \n") } launch(Dispatchers.Unconfined) { print("Unconfined I'm working in thread ${Thread.currentThread().name} \n") } launch(Dispatchers.Default) { print("Default I'm working in thread ${Thread.currentThread().name} \n") } launch(newSingleThreadContext("MyOwnThread")) { print("newSingleThreadContext I'm working in thread ${Thread.currentThread().name} \n") } } 复制代码
-
流 Flow
-
Flow 类似
Java8
当中的Stream
,Flow
对异步支持更加友好 -
流使用
emit
函数 发射 值,使用collect
函数收集值,函数不再标有suspend
修饰符fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(1000) emit(i) } } fun main() = runBlocking<Unit> { launch { for (k in 1..3) { println("I'm not blocked $k") delay(1000) } } simple().collect { value -> println(value) } } 复制代码
-
创建流
-
flow {}
构造器fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(1000) emit(i) } } 复制代码
-
flowOf
构建器定义了一个发射固定值集的流fun simple(): Flow<Int> = flowOf(1, 2, 3) 复制代码
-
使用
.asFlow()
扩展函数,可以将各种集合与序列转换为流(1..3).asFlow().collect { value -> println(value) } 复制代码
-
常用函数
map
、filter
、transform
、take
-
末端操作
collect
、toList
、toSet
、first
、single
、reduce
、flod
-
其他函数
flowOn()更改上下文
、buffer()缓冲
、conflate()合并
-
通道 Channel
-
通道提供了一种在流中传输值的方法
-
Channel
提供挂起的send
和receive
@Test fun test_channel() = runBlocking { val channel = Channel<Int>() launch { // this might be heavy CPU-consuming computation or async logic, we'll just send five squares for (x in 1..5) channel.send(x * x) } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!") } 复制代码
-
Channel
可以保证所有先前发送出去的元素都在通道关闭前被接收到 -
Channel
实现自SendChannel
和ReceiveChannel
异常处理
-
协程构建器有两种形式:自动传播异常(
launch
与actor
)或向用户暴露异常(async
与produce
),前者这类构建器将异常视为未捕获异常,直接抛出,后者依赖用户来最终消费异常fun test_channel() = runBlocking { val job = GlobalScope.launch { // launch 根协程 println("Throwing exception from launch") throw IndexOutOfBoundsException() // 我们将在控制台打印 Thread.defaultUncaughtExceptionHandler } job.join() println("Joined failed job") val deferred = GlobalScope.async { // async 根协程 println("Throwing exception from async") throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待 } try { deferred.await() println("Unreached") } catch (e: ArithmeticException) { println("Caught ArithmeticException") } } 复制代码
竞态与并发
-
协程可用多线程调度器并发执行,需要考虑同步访问共享的可变状态
-
方法一:对线程、协程都有效的常规解决方法,使用线程安全的数据类型
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // 启动的协程数量 val k = 1000 // 每个协程重复执行同一动作的次数 val time = measureTimeMillis { coroutineScope { // 协程的作用域 repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } var counter = AtomicInteger() @Test fun test_channel() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter.incrementAndGet() } } println("Counter = $counter") } 复制代码
-
方法二: 对特定共享状态的所有访问权都限制在单个线程中
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // 启动的协程数量 val k = 1000 // 每个协程重复执行同一动作的次数 val time = measureTimeMillis { coroutineScope { // 协程的作用域 repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } val counterContext = newSingleThreadContext("CounterContext") var counter = 0 @Test fun test_channel() = runBlocking { withContext(Dispatchers.Default) { massiveRun { withContext(counterContext) { counter++ } } } println("Counter = $counter") } 复制代码
-
方法三:将线程限制是在更大段代码中执行的
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // 启动的协程数量 val k = 1000 // 每个协程重复执行同一动作的次数 val time = measureTimeMillis { coroutineScope { // 协程的作用域 repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } val counterContext = newSingleThreadContext("CounterContext") var counter = 0 @Test fun test_channel() = runBlocking { withContext(counterContext) { massiveRun { counter++ } } println("Counter = $counter") } 复制代码
-
方法四: 互斥锁
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // 启动的协程数量 val k = 1000 // 每个协程重复执行同一动作的次数 val time = measureTimeMillis { coroutineScope { // 协程的作用域 repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } val mutex = Mutex() var counter = 0 @Test fun test_channel() = runBlocking { withContext(Dispatchers.Default) { massiveRun { mutex.withLock { counter++ } } } println("Counter = $counter") } 复制代码
-
方法五: 使用
Actors
结合通道,区分加数据和取数据,一个 actor 是一个协程suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // 启动的协程数量 val k = 1000 // 每个协程重复执行同一动作的次数 val time = measureTimeMillis { coroutineScope { // 协程的作用域 repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor 状态 for (msg in channel) { // 即将到来消息的迭代器 when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } } @Test fun test_channel(): Unit = runBlocking { val counter = counterActor() withContext(Dispatchers.Default) { massiveRun { counter.send(IncCounter) } } val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) println("Counter = ${response.await()}") counter.close() // 关闭该actor } // kotlin 1.4及之前需要单独或在sealed类中写继承类 sealed class CounterMsg object IncCounter : CounterMsg() class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() 复制代码