溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Kotlin續(xù)體、續(xù)體攔截器和調(diào)度器實(shí)例分析

發(fā)布時(shí)間:2022-08-01 14:02:17 來源:億速云 閱讀:116 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Kotlin續(xù)體、續(xù)體攔截器和調(diào)度器實(shí)例分析”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

一.Continuation

Continuation接口是協(xié)程中最核心的接口,代表著掛起點(diǎn)之后的續(xù)體,代碼如下:

public interface Continuation<in T> {
    // 續(xù)體的上下文
    public val context: CoroutineContext
    // 該方法用于恢復(fù)續(xù)體的執(zhí)行
    // result為掛起點(diǎn)執(zhí)行完成的返回值,T為返回值的類型
    public fun resumeWith(result: Result<T>)
}

Continuation圖解

Kotlin續(xù)體、續(xù)體攔截器和調(diào)度器實(shí)例分析

二.ContinuationInterceptor

ContinuationInterceptor接口繼承自Element接口,是協(xié)程中的續(xù)體攔截器,代碼如下:

public interface ContinuationInterceptor : CoroutineContext.Element {
    // 攔截器的Key
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    // 攔截器對續(xù)體進(jìn)行攔截時(shí)會調(diào)用該方法,并對continuation進(jìn)行緩存
    // 攔截判斷:根據(jù)傳入的continuation對象與返回的continuation對象是否相同
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    // 當(dāng)interceptContinuation方法攔截的協(xié)程執(zhí)行完畢后,會調(diào)用該方法
    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }
    // get方法多態(tài)實(shí)現(xiàn)
    public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
        @OptIn(ExperimentalStdlibApi::class)
        if (key is AbstractCoroutineContextKey<*, *>) {
            @Suppress("UNCHECKED_CAST")
            return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
        }
        @Suppress("UNCHECKED_CAST")
        return if (ContinuationInterceptor === key) this as E else null
    }
    // minusKey方法多態(tài)實(shí)現(xiàn)
    public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
        @OptIn(ExperimentalStdlibApi::class)
        if (key is AbstractCoroutineContextKey<*, *>) {
            return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
        }
        return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
    }
}

三.CoroutineDispatcher

CoroutineDispatcher類繼承自AbstractCoroutineContextElement類,實(shí)現(xiàn)了ContinuationInterceptor接口,是協(xié)程調(diào)度器的基類,代碼如下:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    // ContinuationInterceptor的多態(tài)實(shí)現(xiàn),調(diào)度器本質(zhì)上就是攔截器
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })
    // 用于判斷調(diào)度器是否要調(diào)用dispatch方法進(jìn)行調(diào)度,默認(rèn)為true
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    // 調(diào)度的核心方法,在這里進(jìn)行調(diào)度,執(zhí)行block
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    // 如果調(diào)度是由Yield方法觸發(fā)的,默認(rèn)通過dispatch方法實(shí)現(xiàn)
    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
    // ContinuationInterceptor接口的方法,將續(xù)體包裹成DispatchedContinuation,并傳入當(dāng)前調(diào)度器
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    // 釋放父協(xié)程與子協(xié)程的關(guān)聯(lián)。
    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    }
    // 重載了"+"操作,直接返回others
    // 因?yàn)閮蓚€調(diào)度器相加沒有意義,同一個上下文中只能有一個調(diào)度器
    // 如果需要加的是調(diào)度器對象,則直接替換成最新的,因此直接返回
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
    override fun toString(): String = "$classSimpleName@$hexAddress"
}

四.EventLoop

EventLoop類繼承自CoroutineDispatcher類,用于協(xié)程中任務(wù)的分發(fā)執(zhí)行,只在runBlocking方法中和Dispatchers.Unconfined調(diào)度器中使用。與Handler中的Looper類似,在創(chuàng)建后會存儲在當(dāng)前線程的ThreadLocal中。EventLoop本身不支持延時(shí)執(zhí)行任務(wù),如果需要可以自行繼承EventLoop并實(shí)現(xiàn)Delay接口,EventLoop中預(yù)留了一部分變量和方法用于延時(shí)需求的擴(kuò)展。

為什么協(xié)程需要EventLoop呢?協(xié)程的本質(zhì)是續(xù)體傳遞,而續(xù)體傳遞的本質(zhì)是回調(diào),假設(shè)在Dispatchers.Unconfined調(diào)度下,要連續(xù)執(zhí)行多個suspend方法,就會有多個續(xù)體傳遞,假設(shè)suspend方法達(dá)到一定數(shù)量后,就會造成StackOverflow,進(jìn)而引起崩潰。同樣的,我們知道調(diào)用runBlocking會阻塞當(dāng)前線程,而runBlocking阻塞的原理就是執(zhí)行“死循環(huán)”,因此需要在循環(huán)中做任務(wù)的分發(fā),去執(zhí)行內(nèi)部協(xié)程在Dispatchers.Unconfined調(diào)度器下加入的任務(wù)。

EventLoop代碼如下:

internal abstract class EventLoop : CoroutineDispatcher() {
    // 用于記錄使用當(dāng)前EventLoop的runBlocking方法和Dispatchers.Unconfined調(diào)度器的數(shù)量
    private var useCount = 0L
    // 表示當(dāng)前的EventLoop是否被暴露給其他的線程
    // runBlocking會將EventLoop暴露給其他線程
    // 因此,當(dāng)runBlocking使用時(shí),shared必須為true
    private var shared = false
    // Dispatchers.Unconfined調(diào)度器的任務(wù)執(zhí)行隊(duì)列
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
    // 處理任務(wù)隊(duì)列的下一個任務(wù),該方法只能在EventLoop所在的線程調(diào)用
    // 返回值<=0,說明立刻執(zhí)行下一個任務(wù)
    // 返回值>0,說明等待這段時(shí)間后,執(zhí)行下一個任務(wù)
    // 返回值為Long.MAX_VALUE,說明隊(duì)列里沒有任務(wù)了 
    public open fun processNextEvent(): Long {
        if (!processUnconfinedEvent()) return Long.MAX_VALUE
        return 0
    }
    // 隊(duì)列是否為空
    protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
    // 下一個任務(wù)多長時(shí)間后執(zhí)行
    protected open val nextTime: Long
        get() {
            val queue = unconfinedQueue ?: return Long.MAX_VALUE
            return if (queue.isEmpty) Long.MAX_VALUE else 0L
        }
    // 任務(wù)的核心處理方法
    public fun processUnconfinedEvent(): Boolean {
        // 若隊(duì)列為空,則返回
        val queue = unconfinedQueue ?: return false
        // 從隊(duì)首取出一個任務(wù),如果為空,則返回
        val task = queue.removeFirstOrNull() ?: return false
        // 執(zhí)行
        task.run()
        return true
    }
    // 表示當(dāng)前EventLoop是否可以在協(xié)程上下文中被調(diào)用
    // EventLoop本質(zhì)上也是協(xié)程上下文
    // 如果EventLoop在runBlocking方法中使用,必須返回true
    public open fun shouldBeProcessedFromContext(): Boolean = false
    // 向隊(duì)列中添加一個任務(wù)
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        // 若隊(duì)列為空,則創(chuàng)建一個新的隊(duì)列
        val queue = unconfinedQueue ?:
            ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }
    // EventLoop當(dāng)前是否還在被使用
    public val isActive: Boolean
        get() = useCount > 0
    // EventLoop當(dāng)前是否還在被Unconfined調(diào)度器使用
    public val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)
    // 判斷隊(duì)列是否為空
    public val isUnconfinedQueueEmpty: Boolean
        get() = unconfinedQueue?.isEmpty ?: true
    // 下面三個方法用于計(jì)算使用當(dāng)前的EventLoop的runBlocking方法和Unconfined調(diào)度器的數(shù)量
    // useCount是一個64位的數(shù),
    // 它的高32位用于記錄Unconfined調(diào)度器的數(shù)量,低32位用于記錄runBlocking方法的數(shù)量
    private fun delta(unconfined: Boolean) =
        if (unconfined) (1L shl 32) else 1L
    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        // runBlocking中使用,shared為true
        if (!unconfined) shared = true 
    }
    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        // 如果EventLoop還在被使用
        if (useCount > 0) return
        assert { useCount == 0L }
        // 如果EventLoop不被使用了,并且在EventLoop中使用過
        if (shared) {
            // 關(guān)閉相關(guān)資源,并在ThreadLocal中移除
            shutdown()
        }
    }
    protected open fun shutdown() {}
}

協(xié)程中提供了EventLoopImplBase類,間接繼承自EventLoop,實(shí)現(xiàn)了Delay接口,用來延時(shí)執(zhí)行任務(wù)。同時(shí),協(xié)程中還提供單例對象ThreadLocalEventLoop用于EventLoop在ThreadLocal中的存儲。

“Kotlin續(xù)體、續(xù)體攔截器和調(diào)度器實(shí)例分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI