Kotlin协程(一)——之语言特性

Kotlin协程(一)——之语言特性

Kotlin 协程简单介绍

初步印象

协程是线程之下更小的处理单元,本质上协程是轻量级的线程。本文章先把协程的简单用法梳理一遍,至于细节再在以后的文章里结合网络请求、IO操作在进行深入研究。

协程比较轻量

协程是基于线程的封装,底层还是依赖于线程

协程可以简化异步编程

相关概念

kotlinx.coroutines 是由 JetBrains 开发的功能丰富的协程库。它包含本指南中涵盖的很多启用高级协程的原语,包括 launchasync 等等。

协程基本使用

创建协程

  • 方式一: GlobalScope.launch 顶层协程,虽然它很轻量,但它运行时仍会消耗一些内存资源。全局协程类似守护线程,当其他协程执行完毕之后自动关闭。

    GlobalScope.launch {
        // 顶层协程
    }
    复制代码
  • 方式二: runBlocking 协程构建器将函数转换为协程,在执行操作所在的指定作用域内启动协程。

    fun main() = runBlocking {
        launch {
            delay(1000L)
            println("World!")
        }
        println("Hello,")
    }
    复制代码
  • 方式三:使用coroutineScope构建器声明自己的作用域,runBlockingcoroutineScope 类似都会等待其协程体以及所有子协程结束,runBlocking方法会阻塞当前线程来等待, 而 coroutineScope只是挂起。具体表现就是runBlocking中后续代码会在launch代码块之后执行, coroutineScope则相反。

    fun main() = runBlocking {
        launch {
            delay(200L)
            println("2. Task from runBlocking!")
        }
        coroutineScope {
            launch {
                delay(500L)
                println("3. Task from nested launch!")
            }
    
            delay(100L)
            println("1. Task from coroutineScope scrope!")
        }
        println("4. Coroutine scope is over!")
    }
    复制代码
  • 方式四:挂起函数, 对前面几种方法的抽取,为了和普通函数的作用域相区分使用关键字suspend

    fun main() = runBlocking {
        launch { doWork() }
        println("Hello,")
    }
    
    suspend fun doWork() {
        delay(1000L)
        println("World!")
    }
    复制代码

取消协程(超时)

  • 取消协程:通过Job的cancle()join() 或者 cancelAndJoin()。协程的取消是协作的,即执行完当前协程当中的任务之后关闭,协程会运行到了它结束为止。CoroutineScopeisActive可用于监控协程状态。
    fun main() = runBlocking {
        val job = launch {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        }
        delay(1300L)
        println("main: I'm tired of waiting!")
        job.cancel()          // 取消该作业
        job.join()            // 等待作业执行结束
        println("main: Now I can quit.")
    }
    复制代码
  • 超时协程: 使用 withTimeout 或者 withTimeoutOrNull函数来延迟取消追踪,前者会抛异常
    // withTimeout
    fun main() = runBlocking {
        withTimeout(1300L) {
            repeat(100) {i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
        println("Result is Done")
    }
    
    // withTimeoutOrNull
    fun main() = runBlocking {
        val result = withTimeoutOrNull(1300L) {
            repeat(2) {i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
            "Done"
        }
        println("Result is $result")
    }
    复制代码

挂起函数

  • 挂起函数默认是顺序执行

    fun main() = runBlocking {
        val time = measureTimeMillis {
            val one = doOne()
            val two = doTwo()
            println("The answer is ${one + two}")
        }
        println("Completed in $time ms")
    }
    
    suspend fun doOne(): Int {
        delay(1000L) // 假设我们在这里做了一些有用的事
        return 13
    }
    
    suspend fun doTwo(): Int {
        delay(1000L) // 假设我们在这里做了一些有用的事
        return 29
    }
    
    // 执行时间为 2015 ms 结果是 42
    复制代码
  • 使用 async 实现异步操作,async 可以通过将 start 参数设置为 CoroutineStart.LAZY变为惰性的,即延时加载,当调用start时才加载

    // 异步执行
    fun main() = runBlocking {
        val time = measureTimeMillis {
            val one = async { doOne() }
            val two = async { doTwo() }
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }
    
    // 惰性启动
    fun main() = runBlocking {
        val time = measureTimeMillis {
            val one = async(start = CoroutineStart.LAZY) { doOne() }
            val two = async(start = CoroutineStart.LAZY) { doTwo() }
            // 不调用start() 默认按顺序执行   2034 ms
            println("The answer is ${one.await() + two.await()}")
    
            //调用start() 默认按异步执行      1035 ms
    //        one.start() // 启动第一个
    //        two.start() // 启动第二个
    //        println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }
    
    suspend fun doOne(): Int {
        delay(1000L) // 假设我们在这里做了一些有用的事
        return 13
    }
    
    suspend fun doTwo(): Int {
        delay(1000L) // 假设我们在这里做了一些有用的事
        return 29
    }
    
    // 执行时间为 1045 ms 结果是 42
    复制代码
  • coroutineScope 实现结构化并发。asyncCoroutineScope 的扩展函数,因此可以直接用coroutineScope 抽取。

    fun main() = runBlocking {
        val time = measureTimeMillis {
            println("The answer is ${concurrentSum()}")
        }
        println("Completed in $time ms")
    }
    
    suspend fun concurrentSum() = coroutineScope {
        val one = async { doOne() }
        val two = async { doTwo() }
        one.await() + two.await()
    }
    
    suspend fun doOne(): Int {
        delay(1000L) // 假设我们在这里做了一些有用的事
        return 13
    }
    
    suspend fun doTwo(): Int {
        delay(1000L) // 假设我们在这里做了一些有用的事
        return 29
    }
    复制代码

调度器

  • 协程调度器确定了哪些线程或与线程相对应的协程执⾏,将协程限制在⼀个特定的线程执⾏,或将它分派到⼀个线程池,亦或是让它不受限地运⾏。
    • launch { } 不传参启动了承袭的上下⽂调度器,

    • Dispatchers.Unconfined 是⾮受限调度器,调⽤时启动了⼀个协程,恢复线程中的协程由被调⽤的挂起函数来决定

    • Dispatchers.DefaultGlobalScope.launch {},⽤共享的后台线程池

    • ExecutorCoroutineDispatcher 启动了⼀个新的线程

      fun main() = runBlocking<Unit> {
          launch {
              print("main runBlocking        I'm working in thread ${Thread.currentThread().name} \n")
          }
          launch(Dispatchers.Unconfined) {
              print("Unconfined              I'm working in thread ${Thread.currentThread().name} \n")
          }
          launch(Dispatchers.Default) {
              print("Default                 I'm working in thread ${Thread.currentThread().name} \n")
          }
          launch(newSingleThreadContext("MyOwnThread")) {
              print("newSingleThreadContext  I'm working in thread ${Thread.currentThread().name} \n")
          }
      }
      复制代码

流 Flow

  • Flow 类似Java8当中的 StreamFlow 对异步支持更加友好

  • 流使用emit 函数 发射 值,使用 collect 函数收集值,函数不再标有 suspend 修饰符

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(1000)
            }
        }
        simple().collect { value -> println(value) }
    }
    复制代码
  • 创建流

    • flow {} 构造器

      fun simple(): Flow<Int> = flow {
          for (i in 1..3) {
              delay(1000)
              emit(i)
          }
      }
      复制代码
    • flowOf构建器定义了一个发射固定值集的流

      fun simple(): Flow<Int> = flowOf(1, 2, 3)
      复制代码
    • 使用 .asFlow() 扩展函数,可以将各种集合与序列转换为流

      (1..3).asFlow().collect { value -> println(value) }
      复制代码
    • 常用函数 mapfiltertransformtake

    • 末端操作collecttoListtoSetfirstsinglereduceflod

    • 其他函数flowOn()更改上下文buffer()缓冲conflate()合并

通道 Channel

  • 通道提供了一种在流中传输值的方法

  • Channel提供挂起的sendreceive

    @Test
    fun test_channel() =  runBlocking {
        val channel = Channel<Int>()
        launch {
            // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
            for (x in 1..5) channel.send(x * x)
        }
        // here we print five received integers:
        repeat(5) { println(channel.receive()) }
        println("Done!")
    }
    复制代码
  • Channel可以保证所有先前发送出去的元素都在通道关闭前被接收到

  • Channel实现自SendChannelReceiveChannel

异常处理

  • 协程构建器有两种形式:自动传播异常(launchactor)或向用户暴露异常(asyncproduce),前者这类构建器将异常视为未捕获异常,直接抛出,后者依赖用户来最终消费异常

    fun test_channel() =  runBlocking {
        val job = GlobalScope.launch { // launch 根协程
            println("Throwing exception from launch")
            throw IndexOutOfBoundsException() // 我们将在控制台打印 Thread.defaultUncaughtExceptionHandler
        }
        job.join()
        println("Joined failed job")
        val deferred = GlobalScope.async { // async 根协程
            println("Throwing exception from async")
            throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
        }
        try {
            deferred.await()
            println("Unreached")
        } catch (e: ArithmeticException) {
            println("Caught ArithmeticException")
        }
    }
    复制代码

竞态与并发

  • 协程可用多线程调度器并发执行,需要考虑同步访问共享的可变状态

  • 方法一:对线程、协程都有效的常规解决方法,使用线程安全的数据类型

    suspend fun massiveRun(action: suspend () -> Unit) {
        val n = 100  // 启动的协程数量
        val k = 1000 // 每个协程重复执行同一动作的次数
        val time = measureTimeMillis {
            coroutineScope { // 协程的作用域
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }
    
    var counter = AtomicInteger()
    
    @Test
    fun test_channel() = runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun {
                counter.incrementAndGet()
            }
        }
        println("Counter = $counter")
    }
    复制代码
  • 方法二: 对特定共享状态的所有访问权都限制在单个线程中

    suspend fun massiveRun(action: suspend () -> Unit) {
        val n = 100  // 启动的协程数量
        val k = 1000 // 每个协程重复执行同一动作的次数
        val time = measureTimeMillis {
            coroutineScope { // 协程的作用域
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }
    
    val counterContext = newSingleThreadContext("CounterContext")
    var counter = 0
    
    @Test
    fun test_channel() = runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun {
                withContext(counterContext) {
                    counter++
                }
            }
        }
        println("Counter = $counter")
    }
    复制代码
  • 方法三:将线程限制是在更大段代码中执行的

    suspend fun massiveRun(action: suspend () -> Unit) {
        val n = 100  // 启动的协程数量
        val k = 1000 // 每个协程重复执行同一动作的次数
        val time = measureTimeMillis {
            coroutineScope { // 协程的作用域
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }
     
    val counterContext = newSingleThreadContext("CounterContext")
    var counter = 0
    
    @Test
    fun test_channel() = runBlocking {
        withContext(counterContext) {
            massiveRun {
                counter++
            }
        }
        println("Counter = $counter")
    }
    复制代码
  • 方法四: 互斥锁

    suspend fun massiveRun(action: suspend () -> Unit) {
        val n = 100  // 启动的协程数量
        val k = 1000 // 每个协程重复执行同一动作的次数
        val time = measureTimeMillis {
            coroutineScope { // 协程的作用域
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }
    
    val mutex = Mutex()
    var counter = 0
    
    @Test
    fun test_channel() = runBlocking {
        withContext(Dispatchers.Default) {
            massiveRun {
                mutex.withLock {
                    counter++
                }
            }
        }
        println("Counter = $counter")
    }
    复制代码
  • 方法五: 使用Actors结合通道,区分加数据和取数据,一个 actor 是一个协程

    suspend fun massiveRun(action: suspend () -> Unit) {
        val n = 100  // 启动的协程数量
        val k = 1000 // 每个协程重复执行同一动作的次数
        val time = measureTimeMillis {
            coroutineScope { // 协程的作用域
                repeat(n) {
                    launch {
                        repeat(k) { action() }
                    }
                }
            }
        }
        println("Completed ${n * k} actions in $time ms")
    }
    
    fun CoroutineScope.counterActor() = actor<CounterMsg> {
        var counter = 0 // actor 状态
        for (msg in channel) { // 即将到来消息的迭代器
            when (msg) {
                is IncCounter -> counter++
                is GetCounter -> msg.response.complete(counter)
            }
        }
    }
    
    @Test
    fun test_channel(): Unit = runBlocking {
        val counter = counterActor()
        withContext(Dispatchers.Default) {
            massiveRun {
                counter.send(IncCounter)
            }
        }
        val response = CompletableDeferred<Int>()
        counter.send(GetCounter(response))
        println("Counter = ${response.await()}")
        counter.close() // 关闭该actor
    }
    
    // kotlin 1.4及之前需要单独或在sealed类中写继承类
    sealed class CounterMsg
    object IncCounter : CounterMsg()
    class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
    复制代码

参考资料

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享