文章19 | 阅读 10458 | 点赞0
我们在协程的第一篇就已经提过,协程的运行是依赖于线程的。那么协程与线程之间的关系到底是怎样的呢?
再看launch{}和aysnc{}:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
会存在一个context参数,我们目前还没有介绍过,我们可以通过这个参数,来指定使用哪个协程分发器,从而决定协程运行在哪个线程或者线程池上。
/**
* Base class that shall be extended by all coroutine dispatcher implementations.
*
* The following standard implementations are provided by `kotlinx.coroutines` as properties on
* [Dispatchers] objects:
*
* * [Dispatchers.Default] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
* is specified in their context. It uses a common pool of shared background threads.
* This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
* * [Dispatchers.IO] -- uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
* operations (like file I/O and blocking socket I/O).
* * [Dispatchers.Unconfined] -- starts coroutine execution in the current call-frame until the first suspension.
* On first suspension the coroutine builder function returns.
* The coroutine resumes in whatever thread that is used by the
* corresponding suspending function, without confining it to any specific thread or pool.
* **Unconfined dispatcher should not be normally used in code**.
* * Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
* * An arbitrary [Executor][java.util.concurrent.Executor] can be converted to dispatcher with [asCoroutineDispatcher] extension function.
*
* This class ensures that debugging facilities in [newCoroutineContext] function work properly.
*/
这是CoroutineDispatcher类的文档说明:CoroutineDispatcher类是所有协程分发器实现的基类。它有如下标准实现(作为Dispatchers类的属性存在):
下面我们就通过一个例子来说明这些协程分发器的作用:
fun main() = runBlocking<Unit> {
launch {
println("No params, thread : ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) {
println("Unconfined, thread: ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) {
println("Default, thread: ${Thread.currentThread().name}")
}
GlobalScope.launch {
println("GlobalScope, thread: ${Thread.currentThread().name}")
}
}
先看运行结果:
Unconfined, thread: main
Default, thread: DefaultDispatcher-worker-1
GlobalScope, thread: DefaultDispatcher-worker-3
No params, thread : main
可能每个人的输出结果的顺序都不太一样,这是正常的,但是每行的结果是一样的。
launch(Dispatchers.Default) {
println("Default, thread: ${Thread.currentThread().name}")
launch {
println("In Default, no params, thread: ${Thread.currentThread().name}")
}
}
现在的运行结果为:
Unconfined, thread: main
Default, thread: DefaultDispatcher-worker-1
In Default, no params, thread: DefaultDispatcher-worker-3
GlobalScope, thread: DefaultDispatcher-worker-2
No params, thread : main
注意第三行和第五行,同样的不带参数的launch,它所运行的线程是不一样的。
好,我们现在来总结协程的运行线程的判定:如果我们没有显式指定分发器,那么它会考虑从启动它的协程上下文去继承;如果我们显式指定了分发器,那么就使用指定的分发器来运行线程。如这里在Default中由launch{}启动的协程,它就会从外层launch(Dispatchers.Default)继承过来,再考虑外层的launch{}(输出结果为No params, thread : main),它是由runBlocking{}继承过来,由于runBlocking是运行在主线程中,所以它也是运行在主线程中。如果你还疑问runBlocking为什么运行在主线程,我们来看看runBlocking的实现:
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
可以看到,它会使用 Thread.currentThread()获取当前它所在的线程,由于我们这里阻塞的是main线程,所以它自然会运行在main线程上。
fun main() = runBlocking<Unit> {
launch(Dispatchers.Unconfined) {
println("before delay: thread -> " + Thread.currentThread().name)
delay(100)
println("after delay: thread -> " + Thread.currentThread().name)
}
}
输出结果为:
before delay: thread -> main
after delay: thread -> kotlinx.coroutines.DefaultExecutor
会发现在调用delay方法前后,它所运行的线程是不一样的。那么它的运行机制到底是怎样的呢?使用Dispatchers.Unconfined分发器的协程,它会在运行在启动它的协程上下文中去继承,这里也就是main线程,直到遇到第一个挂起点(也就是这里的delay挂起函数);当它从挂起函数中恢复执行后,它所运行的线程就变成了挂起函数所在的线程。
fun main() = runBlocking<Unit> {
val executorDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
launch(executorDispatcher) {
println("Executors, thread: ${Thread.currentThread().name}")
executorDispatcher.close()
}
}
Executors, thread: pool-1-thread-1
通过asCoroutineDispatcher()这个扩展方法,我们可以将newSingleThreadExecutor的线程池转换成一个分发器,然后使用这个分发器去启动我们的协程。这里有一个注意点,那就是一定要调用close关闭这个分发器。大家可以尝试注释掉executorDispatcher.close()这行代码,然后运行程序,你会发现,虽然控制台有结果输出,但是我们的程序并没有退出,就是由于我们自己创建的线程池一直在占用着资源。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/xlh1191860939/article/details/105021018
内容来源于网络,如有侵权,请联系作者删除!