您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Kotlin協(xié)程概念原理與使用實例分析”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
協(xié)程是Coroutine的中文簡稱,co表示協(xié)同、協(xié)作,routine表示程序。協(xié)程可以理解為多個互相協(xié)作的程序。協(xié)程是輕量級的線程,它的輕量體現(xiàn)在啟動和切換,協(xié)程的啟動不需要申請額外的堆棧空間;協(xié)程的切換發(fā)生在用戶態(tài),而非內(nèi)核態(tài),避免了復(fù)雜的系統(tǒng)調(diào)用。
1)更加輕量級,占用資源更少。
2)避免“回調(diào)地獄”,增加代碼可讀性。
3)協(xié)程的掛起不阻塞線程。
Kotlin協(xié)程原理核心體現(xiàn)在“續(xù)體傳遞”與“狀態(tài)機”兩部分。
1)續(xù)體傳遞
續(xù)體傳遞是一種代碼編寫風(fēng)格——續(xù)體傳遞風(fēng)格(Continuation-Passing-Style),簡稱為CPS。續(xù)體傳遞本質(zhì)上是代碼的回調(diào)與結(jié)果的傳遞。假設(shè)將順序執(zhí)行代碼分成兩部分,第一部分執(zhí)行完成,返回一個結(jié)果(可能為空、一個對象引用、一個具體的值)。接著通過回調(diào)執(zhí)行第二部分代碼,并傳入第一部分代碼返回的結(jié)果,這種形式的代碼編寫風(fēng)格就是續(xù)體傳遞風(fēng)格。
具體地,假設(shè)要計算一個復(fù)雜的計算,正常情況會這樣編寫,代碼如下:
fun calculate(a: Int, b: Int): Int = a + b fun main() { val result = calculate(1, 2) Log.d("liduo",result) }
把上面的代碼改造成續(xù)體傳遞風(fēng)格。首先,定義一個續(xù)體傳遞接口,代碼如下:
interface Continuation { fun next(result: Int) }
對calculate方法進行改造,代碼如下:
fun calculate(a: Int, b: Int, continuation: Continuation) = continuation.next(a + b) fun main() { calculate(1, 2) { result -> Log.d("liduo", result) } }
經(jīng)過續(xù)體傳遞改造后,打印日志的操作被封裝到了Continuation中,并且依賴計算操作的回調(diào)。如果continuation方法不回調(diào)執(zhí)行參數(shù)continuation,打印日志的操作將永遠不會被執(zhí)行。
原本順序執(zhí)行一段代碼(邏輯),在經(jīng)過一次續(xù)體改造后變成了兩段代碼(邏輯)。
2)狀態(tài)機
協(xié)程的代碼在經(jīng)過Kotlin編譯器處理時,會被優(yōu)化成狀態(tài)機模型。每段代碼有三個狀態(tài):未執(zhí)行、掛起、已恢復(fù)(完成)。處于未執(zhí)行狀態(tài)的代碼可以被執(zhí)行,執(zhí)行過程中發(fā)生掛起,會進入掛起狀態(tài),從掛起中恢復(fù)或執(zhí)行完畢會進入已恢復(fù)(完成)狀態(tài)。當(dāng)多個像這樣的代碼進行協(xié)作時,可以組合出更復(fù)雜的狀態(tài)機。
協(xié)程上下文是一組可以附加到協(xié)程中的持久化用戶定義對象,代碼如下:
interface CoroutineContext { // 重載"[]"操作 operator fun <E : Element> get(key: Key<E>): E? // 單值歸一化操作 fun <R> fold(initial: R, operation: (R, Element) -> R): R // 重載 "+"操作 operator fun plus(context: CoroutineContext): CoroutineContext // 獲取當(dāng)前指定key外的其他上下文 fun minusKey(key: Key<*>): CoroutineContext interface Element : CoroutineContext { val key: Key<*> } interface Key<E : Element> }
Element接口繼承自CoroutineContext接口,協(xié)程中的攔截器、調(diào)度器、異常處理器以及代表協(xié)程自身生命周期等重要的類,都實現(xiàn)了Element接口。
Element接口規(guī)定每個實現(xiàn)該接口的對象都要有一個獨一無二的Key,以便在需要的時候可以在協(xié)程上下文中快速的找到。因此,協(xié)程上下文可以理解為是一個Element的索引集,一個結(jié)構(gòu)介于Set和Map之間的索引集。
協(xié)程作用域用于管理作用域內(nèi)協(xié)程的生命周期,代碼如下:
interface CoroutineScope { // 作用域內(nèi)啟動協(xié)程的默認上下文 val coroutineContext: CoroutineContext }
協(xié)程中提供了兩個常用的方法來創(chuàng)建新的協(xié)程作用域,一個是coroutineScope方法,一個是supervisorScope方法,這兩種方法創(chuàng)建的作用域中的上下文會自動繼承父協(xié)程的上下文。除此之外,使用GlobalScope啟動協(xié)程,也會為協(xié)程創(chuàng)建一個新的協(xié)程作用域,但協(xié)程作用域的上下文為空上下文。
當(dāng)父協(xié)程被取消或發(fā)生異常時,會自動取消父協(xié)程所有的子協(xié)程。當(dāng)子協(xié)程取消或發(fā)生異常時,在coroutineScope作用域下,會導(dǎo)致父協(xié)程取消;而在supervisorScope作用域下,則不會影響父協(xié)程。
協(xié)程的作用域只對父子協(xié)程有效,對子孫協(xié)程無效。例如:啟動父協(xié)程,在supervisorScope作用域內(nèi)啟動子協(xié)程。當(dāng)子協(xié)程在啟動孫協(xié)程時,在不指定為supervisorScope作用域的情況下,默認為coroutineScope作用域。
協(xié)程調(diào)度器用于切換執(zhí)行協(xié)程的線程。常見的調(diào)度器有以下4種:
Dispatchers.Default:默認調(diào)度器。它使用JVM的共享線程池,該調(diào)度器的最大并發(fā)度是CPU的核心數(shù),默認為2。
Dispatchers.Unconfined:非受限調(diào)度器。該調(diào)度器不會限制代碼在指定的線程上執(zhí)行。即掛起函數(shù)后面的代碼不會主動恢復(fù)到掛起之前的線程去執(zhí)行,而是在執(zhí)行掛起函數(shù)的線程上執(zhí)行。
Dispatchers.IO:IO調(diào)度器。它將阻塞的IO任務(wù)分流到一個共享的線程池中。該調(diào)度器和Dispatchers.Default共享線程。
Dispatchers.Main:主線程調(diào)度器。一般用于操作與更新UI。
注意:Dispatchers.Default調(diào)度器和Dispatchers.IO 調(diào)度器分配的線程為守護線程。
協(xié)程共有以下四種啟動模式:
CoroutineStart.DEFAULT:立即執(zhí)行協(xié)程,可以隨時取消。
CoroutineStart.LAZY:創(chuàng)建一個協(xié)程,但不執(zhí)行,在用戶需要時手動觸發(fā)執(zhí)行。
CoroutineStart.ATOMIC:立即執(zhí)行協(xié)程,但在協(xié)程執(zhí)行前無法取消。目前處于試驗階段。
CoroutineStart.UNDISPATCHED:立即在當(dāng)前線程執(zhí)行協(xié)程,直到遇到第一個掛起。目前處于試驗階段。
每個協(xié)程在創(chuàng)建后都會返回一個Job接口指向的對象,一個Job對象代表一個協(xié)程,用于控制生命周期,代碼如下:
interface Job : CoroutineContext.Element { ... // 三個狀態(tài)標志 val isActive: Boolean val isCompleted: Boolean val isCancelled: Boolean // 獲取具體的取消異常 fun getCancellationException(): CancellationException // 啟動協(xié)程 fun start(): Boolean // 取消協(xié)程 fun cancel(cause: CancellationException? = null) ... // 等待協(xié)程執(zhí)行結(jié)束 suspend fun join() // 用于select語句 val onJoin: SelectClause0 // 用于注冊協(xié)程執(zhí)行結(jié)束的回調(diào) fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle ... }
1)協(xié)程狀態(tài)的轉(zhuǎn)換
在DEFAULT、ATOMIC、UNDISPATCHED這三個模式下,啟動協(xié)程會進入Active狀態(tài),而在LAZY模式下啟動的協(xié)程會進入New狀態(tài),需要在手動調(diào)用start方法后進入Active狀態(tài)。
Completing是一個內(nèi)部狀態(tài),對外不可感知。
2)狀態(tài)標識的變化
State | [isActive] | [isCompleted] | [isCancelled] |
---|---|---|---|
New | false | false | false |
Active | true | false | false |
Completing | true | false | false |
Cancelling | false | false | true |
Cancelled | false | true | true |
Completed | fasle | true | false |
1)runBlocking方法
fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
該方法用于在非協(xié)程作用域環(huán)境中啟動一個協(xié)程,并在這個協(xié)程中執(zhí)行l(wèi)ambda表達式中的代碼。同時,調(diào)用該方法會阻塞當(dāng)前線程,直到lambda表達式執(zhí)行完畢。該方法不應(yīng)該在協(xié)程中被調(diào)用,該方法設(shè)計的目的是為了讓suspend編寫的代碼可以在常規(guī)的阻塞代碼中調(diào)用。如果不設(shè)置協(xié)程調(diào)度器,那么協(xié)程將在當(dāng)前被阻塞的線程中執(zhí)行。示例代碼如下:
private fun main() { // 不指定調(diào)度器,在方法調(diào)用的線程執(zhí)行 runBlocking { // 這里是協(xié)程的作用域 Log.d("liduo", "123") } } private fun main() { // 指定調(diào)度器,在IO線程中執(zhí)行 runBlocking(Dispatchers.IO) { // 這里是協(xié)程的作用域 Log.d("liduo", "123") } }
2)launch方法
fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job
該方法用于在協(xié)程作用域中異步啟動一個新的協(xié)程,調(diào)用該方法不會阻塞線程。示例代碼如下:
private fun test() { // 作用域為GlobalScope // 懶啟動,主線程執(zhí)行 val job = GlobalScope.launch( context = Dispatchers.Main, start = CoroutineStart.LAZY) { Log.d("liduo", "123") } // 啟動協(xié)程 job.start() }
3)async方法
fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
該方法用于在協(xié)程作用域中中異步啟動一個新的協(xié)程,調(diào)用該方法不會阻塞線程。async方法與launch方法的不同之處在于可以攜帶返回值。調(diào)用該方法會返回一個Deferred接口指向的對象,調(diào)用該對象可以獲取協(xié)程執(zhí)行的結(jié)果。同時,Deferred接口繼承自Job接口,因此仍然可以操作協(xié)程的生命周期。示例代碼如下:
// suspend標記 private suspend fun test(): Int { // 作用域為GlobalScope,返回值為Int類型,,泛型可省略,自動推斷 val deffer = GlobalScope.async<Int> { Log.d("liduo", "123") // 延時1s delay(1000) 1 } // 獲取返回值 return deffer.await() }
通過調(diào)用返回的Deferred接口指向?qū)ο蟮腶wait方法可以獲取返回值。在調(diào)用await方法時,如果協(xié)程執(zhí)行完畢,則直接獲取返回值。如果協(xié)程還在執(zhí)行,則該方法會導(dǎo)致協(xié)程掛起,直到執(zhí)行結(jié)束或發(fā)生異常。
4)suspend關(guān)鍵字
suspend關(guān)鍵字用于修飾一個方法(lambda表達式)。suspend修飾的方法稱為suspend方法,表示方法在執(zhí)行中可能發(fā)生掛起。為什么是可能呢?比如下面的代碼雖然被suspend修飾,但實際并不會發(fā)生掛起:
private suspend fun test() { Log.d("liduo", "123") }
由于會發(fā)生掛起,因此suspend方法只能在協(xié)程中使用。suspend方法內(nèi)部可以調(diào)用其他的suspend方法,也可以非suspend方法。但suspend方法只能被其他的suspend方法調(diào)用。
5)withContext方法
suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T
該方法用于在當(dāng)前協(xié)程的執(zhí)行過程中,切換到調(diào)度器指定的線程去執(zhí)行參數(shù)block中的代碼,并返回一個結(jié)果。調(diào)用該方法可能會使當(dāng)前協(xié)程掛起,并在方法執(zhí)行結(jié)束時恢復(fù)掛起。示例代碼如下:
private suspend fun test() { // IO線程啟動并執(zhí)行,啟動模式DEFAULT GlobalScope.launch(Dispatchers.IO) { Log.d("liduo", "start") // 線程主切換并掛起,泛型可省略,自動推斷 val result = withContext<String>(Dispatchers.Main) { // 網(wǎng)絡(luò)請求 "json data" } // 切換回IO線程 Log.d("liduo", result) } }
6)suspend方法
inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block
該方法用于對掛起方法進行包裹,使掛起方法可以在非掛起方法中調(diào)用。該方法需要配合createCoroutine方法啟動協(xié)程。示例代碼如下:
// 返回包含當(dāng)前的協(xié)程代碼的續(xù)體 val continuation = suspend { // 執(zhí)行協(xié)程代碼 // 泛型可以修改需要的類型 }.createCoroutine(object : Continuation<Any> { override val context: CoroutineContext get() = EmptyCoroutineContext + Dispatchers.Main override fun resumeWith(result: Result<Any>) { // 獲取最終結(jié)果 } }) // 執(zhí)行續(xù)體內(nèi)容 continuation.resume(Unit)
一般開發(fā)中不會通過該方法啟動協(xié)程,但該方法可以更本質(zhì)的展示協(xié)程的啟動、恢復(fù)、掛起。
1)Channel
Channel用于協(xié)程間的通信。Channel本質(zhì)上是一個并發(fā)安全的隊列,類似BlockingQueue。在使用時,通過調(diào)用同一個Channel對象的send和receive方法實現(xiàn)通信,示例代碼如下:
suspend fun main() { // 創(chuàng)建 val channel = Channel<Int>() val producer = GlobalScope.launch { var i = 0 while (true){ // 發(fā)送 channel.send(i++) delay(1000) // channel不需要時要及時關(guān)閉 if(i == 10) channel.close() } } // 寫法1:常規(guī) val consumer = GlobalScope.launch { while(true){ // 接收 val element = channel.receive() Log.d("liduo", "$element") } } // 寫法2:迭代器 val consumer = GlobalScope.launch { val iterator = channel.iterator() while(iterator.hasNext()){ // 接收 val element = iterator.next() Log.d("liduo", "$element") } } // 寫法3:增強for循環(huán) val consumer = GlobalScope.launch { for(element in channel){ Log.d("liduo", "$element") } } // 上面的協(xié)程由于不是懶啟動,因此創(chuàng)建完成直接就會start去執(zhí)行 // 也就是說,代碼走到這里,上面的兩個協(xié)程已經(jīng)開始工作 // join方法會掛起當(dāng)前協(xié)程,而不是上面已經(jīng)啟動的兩個協(xié)程 // 在Android環(huán)境中,下面兩行代碼可以不用添加 // producer.join() // consumer.join() }
上述例子是一個經(jīng)典的生產(chǎn)者-消費者模型。在寫法1中,由于send方法和receive方法被suspend關(guān)鍵字修飾,因此,在默認情況下,當(dāng)生產(chǎn)速度與消費速度不匹配時,調(diào)用這兩個方法會導(dǎo)致協(xié)程掛起。
除此之外,Channel支持使用迭代器進行接收。其中,hasNext方法也可能會導(dǎo)致協(xié)程掛起。
Channel對象在不使用時要及時關(guān)閉,可以由發(fā)送者關(guān)閉,也可以由接收者關(guān)閉,具體取決于業(yè)務(wù)場景。
2)Channel的容量
Channel方法不是Channel的構(gòu)造方法,而是一個工廠方法,代碼如下:
fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> = when (capacity) { RENDEZVOUS -> RendezvousChannel() UNLIMITED -> LinkedListChannel() CONFLATED -> ConflatedChannel() BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) else -> ArrayChannel(capacity) }
在創(chuàng)建Channel時可以指定容量:
RENDEZVOUS:創(chuàng)建一個容量為0的Channel,類似于SynchronousQueue。send之后會掛起,直到被receive。枚舉值為0。
UNLIMITED:創(chuàng)建一個容量無限的Channel,內(nèi)部通過鏈表實現(xiàn)。枚舉值為Int.MAX_VALUE。
CONFLATED:創(chuàng)建一個容量為1的Channel,當(dāng)后一個的數(shù)據(jù)會覆蓋前一個數(shù)據(jù)。枚舉值為-1。
BUFFERED:創(chuàng)建一個默認容量的Channel,默認容量為kotlinx.coroutines.channels.defaultBuffer配置變量指定的值,未配置情況下,默認為64。枚舉值為-2。
如果capacity的值不為上述的枚舉值,則創(chuàng)建一個指定容量的Channel。
3)produce方法與actor方法
fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E>
fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit ): SendChannel<E>
與launch方法和async方法相同,使用produce方法與actor方法也可以啟動協(xié)程。但不同的是,在produce方法與actor方法中可以更簡潔的使用Channel。示例代碼如下:
// 啟動協(xié)程,返回一個接收Channel val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce { while(true){ delay(100) // 發(fā)送 send(1) } } // 啟動協(xié)程,返回一個發(fā)送Channel val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> { while(true){ // 接收 val element = receive() Log.d("liduo","$element") } }
produce方法與actor方法內(nèi)部對Channel對象做了處理,當(dāng)協(xié)程執(zhí)行完畢,自動關(guān)閉Channel對象。
但目前,produce方法還處于試驗階段(被ExperimentalCoroutinesApi注解修飾)。而actor方法也已經(jīng)過時(被ObsoleteCoroutinesApi注解修飾)。因此在實際開發(fā)中最好不要使用!
4)BroadcastChannel
當(dāng)遇到一個發(fā)送者對應(yīng)多個接收者的場景時,可以使用BroadcastChannel。代碼如下:
fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> = when (capacity) { 0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel") UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel") CONFLATED -> ConflatedBroadcastChannel() BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY) else -> ArrayBroadcastChannel(capacity) }
創(chuàng)建BroadcastChannel對象時,必須指定容量大小。接收者通過調(diào)用BroadcastChannel對象的openSubscription方法,獲取ReceiveChannel對象來接收消息。示例代碼如下:
// 創(chuàng)建BroadcastChannel,容量為5 val broadcastChannel = BroadcastChannel<Int>(5) // 創(chuàng)建發(fā)送者協(xié)程 GlobalScope.launch { // 發(fā)送 1 broadcastChannel.send(1) delay(100) // 發(fā)送 2 broadcastChannel.send(2) // 關(guān)閉 broadcastChannel.close() }.join() // 創(chuàng)建接收者1協(xié)程 GlobalScope.launch { // 獲取ReceiveChannel val receiveChannel = broadcastChannel.openSubscription() // 接收 for (element in receiveChannel) { Log.d("receiver_1: ", "$element") } }.join() // 創(chuàng)建接收者2協(xié)程 GlobalScope.launch { // 獲取ReceiveChannel val receiveChannel = broadcastChannel.openSubscription() // 接收 for (element in receiveChannel) { Log.d("receiver_2: ", "$element") } }.join()
每個接收者都可以收到發(fā)送者發(fā)送的每一條消息。使用擴展方法broadcast可以直接將Channel對象轉(zhuǎn)化為BroadcastChannel對象,示例代碼如下:
val channel = Channel<Int>() val broadcastChannel = channel.broadcast(10)
BroadcastChannel的很多方法也處于試驗階段(被ExperimentalCoroutinesApi注解修飾),使用時需慎重!
協(xié)程中提供了類似Java中Nio的select方法,用于多路復(fù)用,代碼如下:
suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R
以Channel的多路復(fù)用為例,具體看一下select方法的使用。示例代碼如下:
private suspend fun test() { // 創(chuàng)建一個Channel列表 val channelList = mutableListOf<Channel<Int>>() // 假設(shè)其中有5個Channel channelList.add(Channel()) channelList.add(Channel()) channelList.add(Channel()) channelList.add(Channel()) channelList.add(Channel()) // 調(diào)用select方法,協(xié)程掛起 val result = select<Int> { // 對5個Channel進行注冊監(jiān)聽,等待接收 channelList.forEach { it.onReceive } } // 當(dāng)5個Channel中任意一個接收到消息時,select掛起恢復(fù) // 并將返回值賦給result Log.d("liduo", "$result") }
除此之外,協(xié)程中還有很多接口定義了名字為"onXXX"的方法,比如Job接口的onJoin方法,Deferred接口的onAwait方法,都是用于配合select方法來進行多路復(fù)用。
協(xié)程中提供了sequence方法來生成序列。示例代碼如下:
private suspend fun test() { // 創(chuàng)建一個可以輸出奇數(shù)的序列,泛型可省略,自動推斷 val singleNumber = sequence<Int> { val i = 0 while (true) { // 在需要輸出的地方調(diào)用yield方法 yield(2 * i - 1) } } // 調(diào)用迭代器,獲取序列的輸出 singleNumber.iterator().forEach { Log.d("liduo", "$it") } // 獲取序列前五項,迭代輸出 singleNumber.take(5).forEach { Log.d("liduo", "$it") } }
調(diào)用yield方法會使協(xié)程掛起,同時輸出這個序列當(dāng)前生成的值。除此之外,也可以調(diào)用yieldAll方法來輸出序列產(chǎn)生值的合集,示例代碼如下:
private suspend fun test() { // 創(chuàng)建一個可以輸出奇數(shù)的序列,泛型可省略,自動推斷 val singleNumber = sequence<Int> { yieldAll(listOf(1,3,5,7)) yieldAll(listOf(9,11,13)) yieldAll(listOf(15,17)) } // 調(diào)用迭代器,獲取序列的輸出,最多為9項 singleNumber.iterator().forEach { Log.d("liduo", "$it") } // 獲取序列前五項,迭代輸出 singleNumber.take(5).forEach { // 1,3,5,7,9 Log.d("liduo", "$it") } }
協(xié)程中提供了類似RxJava的響應(yīng)式編程API——Flow(官方稱為異步冷數(shù)據(jù)流,官方也提供了創(chuàng)建熱數(shù)據(jù)流的方法)。
1)基礎(chǔ)使用
// 在主線程上調(diào)用 GlobalScope.launch(Dispatchers.Main) { // 創(chuàng)建流 flow<Int> { // 掛起,輸出返回值 emit(1) // 設(shè)置流執(zhí)行的線程,并消費流 }.flowOn(Dispatchers.IO).collect { Log.d("liduo", "$it") } }.join()
emit方法是一個掛起方法,類似sequence中的yield方法,用于輸出返回值。flowOn方法等同于Rxjava中的subscribeOn方法,用于切換flow執(zhí)行的線程。為了避免理解混淆,F(xiàn)low中沒有提供類似Rxjava中的observeOn方法,但可以通過指定流所在協(xié)程的上下文參數(shù)確定。collect方法等同于RxJava中的subscribe方法,用于觸發(fā)和消費流。
一個流可以被多次消費,示例代碼如下:
GlobalScope.launch(Dispatchers.IO) { val mFlow = flow<Int> { emit(1) }.flowOn(Dispatchers.Main) mFlow.collect { Log.d("liduo1", "$it") } mFlow.collect { Log.d("liduo2", "$it") } }.join()
2)異常處理
Flow支持類似try-catch-finally的異常處理。示例代碼如下:
flow<Int> { emit(1) // 拋出異常 throw NullPointerException() }.catch { cause: Throwable -> Log.d("liduo", "${cause.message}") }.onCompletion { cause: Throwable? -> Log.d("liduo", "${cause?.message}") }
catch方法用于捕獲異常。onCompletion方法等同于finally代碼塊。Kotlin不建議直接在flow中通過try-catch-finally代碼塊去捕獲異常!
Flow中還提供了類似RxJava的onErrorReturn方法的操作,示例代碼如下:
flow<Int> { emit(1) // 拋出異常 throw NullPointerException() }.catch { cause: Throwable -> Log.d("liduo", "${cause.message}") emit(-1) }
3)觸發(fā)分離
Flow支持提前寫好流的消費,在必要的時候再去觸發(fā)消費的操作。示例代碼如下:
// 創(chuàng)建Flow的方法 fun myFlow() = flow<Int> { // 生產(chǎn)過程 emit(1) }.onEach { // 消費過程 Log.d("liduo", "$it") } suspend fun main() { // 寫法1 GlobalScope.launch { // 觸發(fā)消費 myFlow().collect() }.join() // 寫法2 myFlow().launchIn(GlobalScope).join() }
4)注意
Flow中不提供取消collect的方法。如果要取消flow的執(zhí)行,可以直接取消flow所在的協(xié)程。
emit方法不是線程安全的,因此不要在flow中調(diào)用withContext等方法切換調(diào)度器。如果需要切換,可以使用channelFlow。
在本文中,啟動協(xié)程使用的都是GlobalScope,但在實際開發(fā)過程中,不應(yīng)該使用GlobalScope。GlobalScope會開啟一個全新的協(xié)程作用域,并且不受我們控制。假設(shè)Activity頁面關(guān)閉時,其中的協(xié)程還沒有運行結(jié)束,并且我們還無法取消協(xié)程的執(zhí)行,這時可能會導(dǎo)致內(nèi)存泄漏。因此,在實際開發(fā)中,可以自定義一個全局的協(xié)程作用域,或者至少按照以下方法書寫代碼:
// 實現(xiàn)CoroutineScope接口 class MainActivity : AppCompatActivity(),CoroutineScope by MainScope() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) // 直接啟動協(xié)程 launch { Log.d("liduo", "launch") } } override fun onDestroy() { super.onDestroy() // 取消頂級父協(xié)程 cancel() } }
MainScope的代碼如下:
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
Dispatchers.Main表示在主線程調(diào)度,SupervisorJob()表示子協(xié)程取消不會影響父協(xié)程。
“Kotlin協(xié)程概念原理與使用實例分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。