编辑本页

目录

协程上下文与调度器

协程总是运行在一些以 CoroutineContext 类型为代表的上下文中,它们被定义在了 Kotlin 的标准库里。

协程上下文是各种不同元素的集合。其中主元素是协程中的 Job, 我们在前面的文档中见过它以及它的调度器,而本文将对它进行介绍。

调度器与线程

协程上下文包括了一个 协程调度器 (请参见 CoroutineDispatcher),它确定了相应的协程在执行时使用一个或多个线程。协程调度器可以将协程的执行局限在指定的线程中,调度它运行在线程池中或让它不受限的运行。

所有的协程构建器诸如 launchasync 接收一个可选的 CoroutineContext 参数,它可以被用来显式的为一个新协程或其它上下文元素指定一个调度器。

尝试下面的示例:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    launch { // 运行在父协程的上下文中,即 runBlocking 主协程
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // 将会获取默认调度器
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // 将使它获得一个新的线程
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
//sampleEnd    
}

你可以点击这里获得完整代码

它执行后得到了如下输出(也许顺序会有所不同):

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

当调用 launch { …… } 时不传参数,它从启动了它的 CoroutineScope 中承袭了上下文(以及调度器)。在这个案例中,它从 main 线程中的 runBlocking 主协程承袭了上下文。

Dispatchers.Unconfined 是一个特殊的调度器且似乎也运行在 main 线程中,但实际上, 它是一种不同的机制,这会在后文中讲到。

该默认调度器,当协程在 GlobalScope 中启动的时候被使用, 它代表 Dispatchers.Default 使用了共享的后台线程池, 所以 GlobalScope.launch { …… } 也可以使用相同的调度器—— launch(Dispatchers.Default) { …… }

newSingleThreadContext 为协程的运行启动了一个新的线程。 一个专用的线程是一种非常昂贵的资源。 在真实的应用程序中两者都必须被释放,当不再需要的时候,使用 close 函数,或存储在一个顶级变量中使它在整个应用程序中被重用。

非受限调度器 vs 受限调度器

Dispatchers.Unconfined 协程调度器在被调用的线程中启动协程,但是这只有直到程序运行到第一个挂起点的时候才行。挂起后,它将在完全由该所运行的线程中恢复挂起被调用的函数。非受限的调度器是合适的,当协程没有消耗 CPU 时间或更新共享数据(比如UI界面)时它被限制在了指定的线程中。

另一方面,默认的,一个调度器承袭自外部的 CoroutineScope。 而 runBlocking 协程的默认调度器,特别是, 被限制在调用它的线程,因此承袭它在限制有可预测的 FIFO 调度的线程的执行上是非常有效果的。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    launch(Dispatchers.Unconfined) { // 非受限的——将和主线程一起工作
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch { // 父协程的上下文,主 runBlocking 协程
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
//sampleEnd    
}

你可以点击这里获得完整代码

执行后的输出:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

因此,该协程从 runBlocking {……} 协程中承袭了上下文并在主线程中执行,同时使用非受限调度器的协程从被执行 delay 函数的默认执行者线程中恢复。

非受限的调度器是一种高级机制,可以在某些极端情况下提供帮助而不需要调度协程以便稍后执行或产生不希望的副作用, 因为某些操作必须立即在协程中执行。 非受限调度器不应该被用在通常的代码中。

调试协程与线程

协程可以在一个线程上挂起并恢复其它线程。 甚至一个单线程的调度器是非常难以弄清楚协程何时,在哪里,正在做什么的。使用通常的方法来调试应用程序是让线程在每一个日志文件的日志声明中打印线程的名字。这种特性在日志框架中是普遍受支持的。使用协程时,单独的线程名称不会给出很多上下文,所以 kotlinx.coroutines 包含了调试工具来让它更简单。

使用 -Dkotlinx.coroutines.debug JVM 参数运行下面的代码:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking<Unit> {
//sampleStart
    val a = async {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
//sampleEnd    
}

你可以点击这里获得完整代码

这里有三个协程,其中主协程是 (#1) —— runBlocking, 而另外两个协程计算延期的值 a (#2) 和 b (#3)。 它们都在 runBlocking 上下文中执行并且被限制在了主线程内。 这段代码的输出如下:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

log 函数将线程的名字打印在了方括号中,你可以看到是 main 线程,但是当前正在执行的协程的标识符被附加到它上面。当调试模式开启的时候该标识符被连续赋值给所有已经被创建的协程。

你可以在 newCoroutineContext 函数的文档中阅读有关调试工具的更多信息。

在不同线程间跳转

使用 -Dkotlinx.coroutines.debug JVM 参数运行下面的代码(参见调试):

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
//sampleStart
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
//sampleEnd    
}

你可以点击这里获得完整代码

它演示了一些新技术。其中一个使用 runBlocking 来显式指定了一个上下文,并且另一个使用 withContext 函数来改变协程的上下文,而仍然驻留在相同的协程中,你可以在下面的输出中看到:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

注意,在这个例子中,当我们不再需要某个在 newSingleThreadContext 中创建的线程的时候, 它使用了 Kotlin 标准库中的 use 函数来释放该线程。

上下文中的任务

协程的 Job 是它上下文中的一部分。协程可以在它所属的上下文中使用 coroutineContext[Job] 表达式来取回它:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    println("My job is ${coroutineContext[Job]}")
//sampleEnd    
}

你可以点击这里获得完整代码

当它运行于调试模式时将处理一些任务:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

请注意,CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true 的一种方便的快捷方式。

子协程

当一个协程被其它协程在 CoroutineScope 中启动的时候, 它将通过 CoroutineScope.coroutineContext 来承袭上下文,并且这个新协程的 Job 将会成为父协程任务的 任务。当一个父协程被取消的时候,所有它的子协程也会被递归的取消。

然而,当 GlobalScope 被用来启动一个协程时,它与作用域无关且是独立被启动的。

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    // 启动一个协程来处理某种传入请求(request)
    val request = launch {
        // 孵化了两个子任务, 其中一个通过 GlobalScope 启动
        GlobalScope.launch {
            println("job1: I run in GlobalScope and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // 另一个则承袭了父协程的上下文
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel() // 取消请求(request)的执行
    delay(1000) // 延迟一秒钟来看看发生了什么
    println("main: Who has survived request cancellation?")
//sampleEnd
}

你可以点击这里获得完整代码

这段代码的输出如下:

job1: I run in GlobalScope and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

父协程的职责

一个父协程总是等待所有的子协程执行结束。父协程并不显式的跟踪所有子协程的启动以及不必使用 Job.join 在最后的时候等待它们:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    // 启动一个协程来处理某种传入请求(request)
    val request = launch {
        repeat(3) { i -> // 启动少量的子任务
            launch  {
                delay((i + 1) * 200L) // 延迟200毫秒、400毫秒、600毫秒的时间
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // 等待请求的完成,包括其所有子协程
    println("Now processing of the request is complete")
//sampleEnd
}

你可以点击这里获得完整代码

结果如下所示:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

命名协程以用于调试

协程日志会频繁记录的时候以及当你只是需要来自相同协程的关联日志记录, 自动分配 id 是非常棒的。然而,当协程与执行一个明确的请求或与执行一些显式的后台任务有关的时候,出于调试的目的给它明确的命名是更好的做法。 CoroutineName 上下文元素可以给线程像给函数命名一样命名。它在协程被执行且调试模式被开启时将显示线程的名字。

下面的例子演示了这一概念:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking(CoroutineName("main")) {
//sampleStart
    log("Started main coroutine")
    // 运行两个后台值计算
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
//sampleEnd    
}

你可以点击这里获得完整代码

程序执行使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

组合上下文中的元素

有时我们需要在协程上下文中定义多个元素。我们可以使用 + 操作符来实现。 比如说,我们可以显式指定一个调度器来启动协程并且同时显式指定一个命名:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//sampleStart
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread ${Thread.currentThread().name}")
    }
//sampleEnd    
}

你可以点击这里获得完整代码

这段代码使用了 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I'm working in thread DefaultDispatcher-worker-1 @test#2

通过显式任务取消

让我们把有关上下文、子协程以及任务的知识梳理一下。假设我们的应用程序中有一个在生命周期中的对象,但这个对象并不是协程。假如,我们写了一个 Android 应用程序并在上下文中启动了多个协程来为 Android activity 进行异步操作来拉取以及更新数据,或作动画等。当 activity 被销毁的时候这些协程必须被取消以防止内存泄漏。

我们通过创建一个 Job 的实例来管理协程的生命周期,并让它与我们的 activity 的生命周期相关联。当一个 activity 被创建的时候一个任务(job)实例被使用 Job() 工厂函数创建,并且当这个 activity 被销毁的时候它也被取消,就像下面这样:

class Activity : CoroutineScope {
    lateinit var job: Job

    fun create() {
        job = Job()
    }

    fun destroy() {
        job.cancel()
    }
    // 继续运行……

我们也可以在 Actvity 类中实现 CoroutineScope 接口。我们只需提供一个覆盖的 CoroutineScope.coroutineContext 属性来在作用域中为协程指定上下文。我们结合所需要的调度器(我们在这个例子中使用 Dispatchers.Default)和任务:

    // 在 Activity 类中
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    // 继续运行……

现在,在这个 Activity 的作用域中启动协程,且没有明确指定它们的上下文。在示例中,我们启动了十个协程并延迟不同的时间:

    // 在 Activity 类中
    fun doSomething() {
        // 在示例中启动了10个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟200毫秒、400毫秒、600毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

在我们的 main 函数中我们创建了 activity,调用我们的 doSomething 测试函数,并在500毫秒后销毁 activity, 所有已经启动了的协程都被取消了,如果我们等待的话可以确认没有任何东西被打印在屏幕上:

import kotlin.coroutines.*
import kotlinx.coroutines.*

class Activity : CoroutineScope {
    lateinit var job: Job

    fun create() {
        job = Job()
    }

    fun destroy() {
        job.cancel()
    }
    // 继续运行……

    // Activity 类继续
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    // 继续运行……

    // Activity 类继续
    fun doSomething() {
        // 在示例中启动了10个协程,且每个都工作了不同的时长
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // 延迟200毫秒、400毫秒、600毫秒等等不同的时间
                println("Coroutine $i is done")
            }
        }
    }
} // Activity 类结束

fun main() = runBlocking<Unit> {
//sampleStart
    val activity = Activity()
    activity.create() // 启动一个 activity
    activity.doSomething() // 运行测试函数
    println("Launched coroutines")
    delay(500L) // 延迟半秒钟
    println("Destroying activity!")
    activity.destroy() // 取消所有的协程
    delay(1000) // 为了在视觉上确认它们没有工作
//sampleEnd    
}

你可以点击这里获得完整代码

这个示例的输出如下所示:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

你可以看到,只有前两个协程打印了消息,而另一个协程在 Activity.destroy() 中被单次调用了 job.cancel()

线程局部数据

有时能够传递一些线程局部的数据很方便,但是,对于协程来说,它们不受任何特定线程的约束,所以很难手动的去实现它并且不写出大量的样板代码。

ThreadLocalasContextElement 扩展函数在这里会充当救兵。它创建了额外的上下文元素, 且保留给定 ThreadLocal 的值,并在每次协程切换其上下文时恢复它。

它很容易在下面的代码中演示:

import kotlinx.coroutines.*

val threadLocal = ThreadLocal<String?>() // 声明线程局部变量

fun main() = runBlocking<Unit> {
//sampleStart
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
       println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
//sampleEnd    
}

你可以点击这里获得完整代码

在这个例子中我们使用 Dispatchers.Default 在后台线程池中启动了一个新的协程,所以它工作在线程池中的不同线程中,但它仍然具有线程局部变量的值, 我们指定使用 threadLocal.asContextElement(value = "launch"), 无论协程执行在什么线程中都是没有问题的。 因此,输出如(调试)所示:

Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

ThreadLocal 具有一流的支持,可以与任何原始的 kotlinx.coroutines 一起使用。 它有一个关键限制:当线程局部发生突变,新值不会传递到协程调用者中 (作为上下文元素不能跟踪所有的 ThreadLocal 对象访问)并且下次挂起时更新的值将丢失。 在协程中使用 withContext 线程局部的值,可以查看 asContextElement 的更多细节。

另外,一个值可以存储在一个可变的域中,例如 class Counter(var i: Int),是的,反过来, 可以存储在线程局部的变量中。然而,在这个案例中你完全有责任来进行同步可能的对这个可变的域进行的并发的修改。

对于高级的使用,例如,那些在内部使用线程局部传递数据的用于与日志记录 MDC 集成,以及事务上下文或任何其它库,请参阅应实现的 ThreadContextElement 接口的文档。