编辑本页

目录

协程上下文与调度器

协程总是运行在一些以 CoroutineContext 类型为代表的上下文中,它们被定义在了 Kotlin 的标准库里。

协程上下文是各种不同元素的集合。其中主元素是协程中的 Job, 我们在前面的文档中见过它以及它的调度器,而本文将对它进行介绍。

调度器与线程

协程上下文包括了一个 协程调度器 (请参见 CoroutineDispatcher),它确定了相应的协程在执行时使用一个或多个线程。协程调度器可以将协程的执行局限在指定的线程中,调度它运行在线程池中或让它不受限的运行。

所有的协程构建器诸如 launchasync 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。

尝试下面的示例:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    launch { // 运行在父协程的上下文中,即 runBlocking 主协程
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // 将会获取默认调度器
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
//sampleEnd    
}

可以在这里获取完整代码。

它执行后得到了如下输出(也许顺序会有所不同):

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

当调用 launch { …… } 时不传参数,它从启动了它的 CoroutineScope 中承袭了上下文(以及调度器)。在这个案例中,它从 main 线程中的 runBlocking 主协程承袭了上下文。

Dispatchers.Unconfined 是一个特殊的调度器且似乎也运行在 main 线程中,但实际上, 它是一种不同的机制,这会在后文中讲到。

该默认调度器,当协程在 GlobalScope 中启动的时候被使用, 它代表 Dispatchers.Default 使用了共享的后台线程池, 所以 GlobalScope.launch { …… } 也可以使用相同的调度器—— launch(Dispatchers.Default) { …… }

newSingleThreadContext 为协程的运行启动了一个线程。 一个专用的线程是一种非常昂贵的资源。 在真实的应用程序中两者都必须被释放,当不再需要的时候,使用 close 函数,或存储在一个顶级变量中使它在整个应用程序中被重用。

非受限调度器 vs 受限调度器

Dispatchers.Unconfined 协程调度器会在程序运行到第一个挂起点时,在调用者线程中启动。挂起后,它将在挂起函数执行的线程中恢复,恢复的线程完全取决于该挂起函数在哪个线程执行。非受限调度器适合协程不消耗 CPU 时间也不更新任何限于特定线程的共享数据(如 UI)的情境。

另一方面,协程调度器默认承袭外部的 CoroutineScope 的调度器。 特别是 runBlocking 的默认协程调度器仅限于调用者线程,因此承袭它将会把执行限制在该线程中, 并具有可预测的 FIFO 调度的效果。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    launch(Dispatchers.Unconfined) { // 非受限的——将和主线程一起工作
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch { // 父协程的上下文,主 runBlocking 协程
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
//sampleEnd    
}

可以在这里获取完整代码。

执行后的输出:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

因此,该协程从 runBlocking {……} 协程中承袭了上下文并在主线程中执行,同时使用非受限调度器的协程从被执行 delay 函数的默认执行者线程中恢复。

非受限的调度器是一种高级机制,可以在某些极端情况下提供帮助而不需要调度协程以便稍后执行或产生不希望的副作用, 因为某些操作必须立即在协程中执行。 非受限调度器不应该被用在通常的代码中。

调试协程与线程

协程可以在一个线程上挂起并在其它线程上恢复。 甚至一个单线程的调度器也是难以弄清楚协程在何时何地正在做什么事情。使用通常调试应用程序的方法是让线程在每一个日志文件的日志声明中打印线程的名字。这种特性在日志框架中是普遍受支持的。但是在使用协程时,单独的线程名称不会给出很多协程上下文信息,所以 kotlinx.coroutines 包含了调试工具来让它更简单。

使用 -Dkotlinx.coroutines.debug JVM 参数运行下面的代码:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking<Unit> {
//sampleStart
    val a = async {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
//sampleEnd    
}

可以在这里获取完整代码。

这里有三个协程,其中主协程是 (#1) —— runBlocking, 而另外两个协程计算延期的值 a (#2) 和 b (#3)。 它们都在 runBlocking 上下文中执行并且被限制在了主线程内。 这段代码的输出如下:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

log 函数将线程的名字打印在了方括号中,你可以看到是 main 线程,但是当前正在执行的协程的标识符被附加到它上面。当调试模式开启的时候该标识符被连续赋值给所有已经被创建的协程。

你可以在 newCoroutineContext 函数的文档中阅读有关调试工具的更多信息。

在不同线程间跳转

使用 -Dkotlinx.coroutines.debug JVM 参数运行下面的代码(参见调试):

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
//sampleStart
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
//sampleEnd    
}

可以在这里获取完整代码。

它演示了一些新技术。其中一个使用 runBlocking 来显式指定了一个上下文,并且另一个使用 withContext 函数来改变协程的上下文,而仍然驻留在相同的协程中,你可以在下面的输出中看到:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

注意,在这个例子中,当我们不再需要某个在 newSingleThreadContext 中创建的线程的时候, 它使用了 Kotlin 标准库中的 use 函数来释放该线程。

上下文中的任务

协程的 Job 是它上下文中的一部分。协程可以在它所属的上下文中使用 coroutineContext[Job] 表达式来取回它:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    println("My job is ${coroutineContext[Job]}")
//sampleEnd    
}

可以在这里获取完整代码。

当它运行于调试模式时将处理一些任务:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

请注意,CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true 的一种方便的快捷方式。

子协程

当一个协程被其它协程在 CoroutineScope 中启动的时候, 它将通过 CoroutineScope.coroutineContext 来承袭上下文,并且这个新协程的 Job 将会成为父协程任务的 任务。当一个父协程被取消的时候,所有它的子协程也会被递归的取消。

然而,当 GlobalScope 被用来启动一个协程时,它与作用域无关且是独立被启动的。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    // 启动一个协程来处理某种传入请求(request)
    val request = launch {
        // 孵化了两个子任务, 其中一个通过 GlobalScope 启动
        GlobalScope.launch {
            println("job1: I run in GlobalScope and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // 另一个则承袭了父协程的上下文
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel() // 取消请求(request)的执行
    delay(1000) // 延迟一秒钟来看看发生了什么
    println("main: Who has survived request cancellation?")
//sampleEnd
}

可以在这里获取完整代码。

这段代码的输出如下:

job1: I run in GlobalScope and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

父协程的职责

一个父协程总是等待所有的子协程执行结束。父协程并不显式的跟踪所有子协程的启动以及不必使用 Job.join 在最后的时候等待它们:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    // 启动一个协程来处理某种传入请求(request)
    val request = launch {
        repeat(3) { i -> // 启动少量的子任务
            launch  {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒的时间
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // 等待请求的完成,包括其所有子协程
    println("Now processing of the request is complete")
//sampleEnd
}

可以在这里获取完整代码。

结果如下所示:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

命名协程以用于调试

协程日志会频繁记录的时候以及当你只是需要来自相同协程的关联日志记录, 自动分配 id 是非常棒的。然而,当协程与执行一个明确的请求或与执行一些显式的后台任务有关的时候,出于调试的目的给它明确的命名是更好的做法。 CoroutineName 上下文元素可以给线程像给函数命名一样命名。它在协程被执行且调试模式被开启时将显示线程的名字。

下面的例子演示了这一概念:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking(CoroutineName("main")) {
//sampleStart
    log("Started main coroutine")
    // 运行两个后台值计算
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
//sampleEnd    
}

可以在这里获取完整代码。

程序执行使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

组合上下文中的元素

有时我们需要在协程上下文中定义多个元素。我们可以使用 + 操作符来实现。 比如说,我们可以显式指定一个调度器来启动协程并且同时显式指定一个命名:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread ${Thread.currentThread().name}")
    }
//sampleEnd    
}

可以在这里获取完整代码。

这段代码使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I'm working in thread DefaultDispatcher-worker-1 @test#2

Coroutine scope

让我们把有关上下文、子协程以及任务的知识梳理一下。假设我们的应用程序中有一个在生命周期中的对象,但这个对象并不是协程。假如,我们写了一个 Android 应用程序并在上下文中启动了多个协程来为 Android activity 进行异步操作来拉取以及更新数据,或作动画等。当 activity 被销毁的时候这些协程必须被取消以防止内存泄漏。We, of course, can manipulate contexts and jobs manually to tie activity's and coroutines lifecycles, but kotlinx.coroutines provides an abstraction that encapsulates that: CoroutineScope. You should be already familiar with coroutine scope as all coroutine builders are declared as extensions on it.

We manage a lifecycle of our coroutines by creating an instance of CoroutineScope that is tied to the lifecycle of our activity. CoroutineScope instance can be created by CoroutineScope() or MainScope() factory functions. The former creates a general-purpose scope, while the latter creates scope for UI applications and uses Dispatchers.Main as default dispatcher:

class Activity {
    private val mainScope = MainScope()

    fun destroy() {
        mainScope.cancel()
    }
    // 继续运行……

Alternatively, we can implement CoroutineScope interface in this Activity class. The best way to do it is to use delegation with default factory functions. We also can combine the desired dispatcher (we used Dispatchers.Default in this example) with the scope:

    class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {
    // to be continued ...

现在,在这个 Activity 的作用域中启动协程,且没有明确指定它们的上下文。在示例中,我们启动了十个协程并延迟不同的时间:

    // 在 Activity 类中
    fun doSomething() {
        // 在示例中启动了 10 个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

在我们的 main 函数中我们创建了 activity,调用我们的 doSomething 测试函数,并在 500 毫秒后销毁 activity, 所有已经启动了的协程都被取消了,如果我们等待的话可以确认没有任何东西被打印在屏幕上:

import kotlin.coroutines.*
import kotlinx.coroutines.*

class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {

    fun destroy() {
        cancel() // Extension on CoroutineScope
    }
    // 继续运行……

    // class Activity continues
    fun doSomething() {
        // 在示例中启动了 10 个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

fun main() = runBlocking<Unit> {
//sampleStart
    val activity = Activity()
    activity.doSomething() // 运行测试函数
    println("Launched coroutines")
    delay(500L) // 延迟半秒钟
    println("Destroying activity!")
    activity.destroy() // 取消所有的协程
    delay(1000) // 为了在视觉上确认它们没有工作
//sampleEnd    
}

可以在这里获取完整代码。

这个示例的输出如下所示:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

你可以看到,只有前两个协程打印了消息,而另一个协程在 Activity.destroy() 中被单次调用了 job.cancel()

线程局部数据

有时能够传递一些线程局部的数据很方便,但是,对于协程来说,它们不受任何特定线程的约束,所以很难手动的去实现它并且不写出大量的样板代码。

ThreadLocalasContextElement 扩展函数在这里会充当救兵。它创建了额外的上下文元素, 且保留给定 ThreadLocal 的值,并在每次协程切换其上下文时恢复它。

它很容易在下面的代码中演示:

import kotlinx.coroutines.*

val threadLocal = ThreadLocal<String?>() // 声明线程局部变量

fun main() = runBlocking<Unit> {
//sampleStart
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
//sampleEnd    
}

可以在这里获取完整代码。

在这个例子中我们使用 Dispatchers.Default 在后台线程池中启动了一个新的协程,所以它工作在线程池中的不同线程中,但它仍然具有线程局部变量的值, 我们指定使用 threadLocal.asContextElement(value = "launch"), 无论协程执行在什么线程中都是没有问题的。 因此,输出如(调试)所示:

Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

Note how easily one may forget the corresponding context element and then still safely access thread local. To avoid such situations, it is recommended to use ensurePresent method and fail-fast on improper usages.

ThreadLocal 具有一流的支持,可以与任何原始的 kotlinx.coroutines 一起使用。 它有一个关键限制:当线程局部发生突变,新值不会传递到协程调用者中 (作为上下文元素不能跟踪所有的 ThreadLocal 对象访问)并且下次挂起时更新的值将丢失。 在协程中使用 withContext 线程局部的值,可以查看 asContextElement 的更多细节。

另外,一个值可以存储在一个可变的域中,例如 class Counter(var i: Int),是的,反过来, 可以存储在线程局部的变量中。然而,在这个案例中你完全有责任来进行同步可能的对这个可变的域进行的并发的修改。

对于高级的使用,例如,那些在内部使用线程局部传递数据的用于与日志记录 MDC 集成,以及事务上下文或任何其它库,请参阅应实现的 ThreadContextElement 接口的文档。