溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RxJava線程切換過程是怎樣的

發(fā)布時(shí)間:2021-12-28 16:33:54 來(lái)源:億速云 閱讀:130 作者:柒染 欄目:云計(jì)算

今天就跟大家聊聊有關(guān)RxJava線程切換過程是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

線程切換過程

下面我們就來(lái)看看它的又一利器,調(diào)度器Scheduler:就像我們所知道的,Scheduler是給Observable數(shù)據(jù)流添加多線程功能所準(zhǔn)備的,一般我們會(huì)通過使用subscribeOn()observeOn()方法傳入對(duì)應(yīng)的Scheduler去指定數(shù)據(jù)流的每部分操作應(yīng)該以何種方式運(yùn)行在何種線程。對(duì)于我們而言,最常見的莫過于在非主線程獲取并處理數(shù)據(jù)之后在主線程更新UI這樣的場(chǎng)景了:

這是我們十分常見的調(diào)用方法,一氣呵成就把不同線程之間的處理都搞定了,因?yàn)槭擎準(zhǔn)剿越Y(jié)構(gòu)也很清晰,我們現(xiàn)在來(lái)看看這其中的線程切換流程。

  • subscribeOn()

    當(dāng)我們調(diào)用subscribeOn()的時(shí)候:

    可以看到這里也是調(diào)用了create()去生成一個(gè)Observable,而OperatorSubscribeOn則是實(shí)現(xiàn)了OnSubscribe接口,同時(shí)將原始的Observable和我們需要的scheduler傳入:


    可以看出來(lái),這里對(duì)subscriber的處理與前文中OperatorMapcall()對(duì)subscriber的處理很相似。在這里我們同樣會(huì)根據(jù)傳入的subscriber構(gòu)造出新的Subscribers,不過這一系列的過程大部分都是由worker通過schedule()去執(zhí)行的,從后面setProducer()中對(duì)于線程的判斷,再結(jié)合subscribeOn()方法的目的我們能大概推測(cè)出,這個(gè)worker在一定程度上就相當(dāng)于一個(gè)新線程的代理執(zhí)行者,schedule()所實(shí)現(xiàn)的與Thread類中run()應(yīng)該十分類似。我們現(xiàn)在來(lái)看看這個(gè)worker的執(zhí)行過程。
    首先從Schedulers.io()進(jìn)入:

    這個(gè)通過hook拿到scheduler的過程我們先不管,直接進(jìn)CachedThreadScheduler,看它的createWorker()方法:

    這里的pool是一個(gè)原子變量引用AtomicReference,所持有的則是CachedWorkerPool,因而這個(gè)pool顧名思義就是用來(lái)保存worker的緩存池啦,我們從緩存池里拿到需要的worker并作了一層封裝成為EventLoopWorker

    在這里我們終于發(fā)現(xiàn)目標(biāo)ThreadWorker,它繼承自NewThreadWorker,之前的schedule()方法最終都會(huì)到這個(gè)scheduleActual()方法里:

    這里我們看到了executor線程池,我們用Schedulers.io()最終實(shí)現(xiàn)的線程切換的本質(zhì)就在這里了?,F(xiàn)在再結(jié)合之前的過程我們從頭梳理一下:

    subscribeOn()時(shí),我們會(huì)新生成一個(gè)Observable,它的成員onSubscribe會(huì)在目標(biāo)Subscriber訂閱時(shí)使用傳入的Scheduler的worker作為線程調(diào)度執(zhí)行者,在對(duì)應(yīng)的線程中通知原始Observable發(fā)送消息給這個(gè)過程中臨時(shí)生成的Subscriber,這個(gè)Subscriber又會(huì)通知到目標(biāo)Subscriber,這樣就完成了subscribeOn()的過程。

  • observeOn()

    下面我們接著來(lái)看看observeOn()

    我們直接看最終調(diào)用的部分,可以看到這里又是一個(gè)lift(),在這里傳入了OperatorObserveOn,它與OperatorSubscribeOn不同,是一個(gè)OperatorOperator的功能我們上文中已經(jīng)講過就不贅述了),它構(gòu)造出了新的觀察者ObserveOnSubscriber并實(shí)現(xiàn)了Action0接口:

    可以看出來(lái),這里ObserveOnSubscriber所有的發(fā)送給目標(biāo)Subscriber child的消息都被切換到了recursiveScheduler的線程作處理,也就達(dá)到了將線程切回的目的。

總結(jié)observeOn()整體流程如下:

對(duì)比subscribeOn()observeOn()這兩個(gè)過程,我們不難發(fā)現(xiàn)兩者的區(qū)別:subscribeOn()將初始Observable的訂閱事件整體都切換到了另一個(gè)線程;而observeOn()則是將初始Observable發(fā)送的消息切換到另一個(gè)線程通知到目標(biāo)Subscriber。前者把 “訂閱 + 發(fā)送” 的切換了一個(gè)線程,后者把 “發(fā)送” 切換了一個(gè)線程。所以,我們的代碼中所實(shí)現(xiàn)的功能其實(shí)是:

這樣就能很容易實(shí)現(xiàn)耗時(shí)任務(wù)在子線程操作,在主線程作更新操作等這些常見場(chǎng)景的功能啦。

4.其他角色

Subject
Subject在Rx系列是一個(gè)比較特殊的角色,它繼承了Observable的同時(shí)也實(shí)現(xiàn)了Observer接口,也就是說它既可作為觀察者,也可作為被觀察者,他一般被用來(lái)作為連接多個(gè)不同Observable、Observer之間的紐帶??赡苣銜?huì)奇怪,我們不是已經(jīng)有了像map()、flatMap()這類的操作符去變化 Observable數(shù)據(jù)流了嗎,為什么還要引入Subject這個(gè)東西呢?這是因?yàn)镾ubject所承擔(dān)的工作并非是針對(duì)Observable數(shù)據(jù)流內(nèi)容的轉(zhuǎn)換連接,而是數(shù)據(jù)流本身在Observable、Observer之間的調(diào)度。光這么說可能還是很模糊,我們舉個(gè)《RxJava Essentials》中的例子:

我們通過create()創(chuàng)建了一個(gè)PublishSubject,觀察者成功訂閱了這個(gè)subject,然而這個(gè)subject卻沒有任何數(shù)據(jù)要發(fā)送,我們只是知道他未來(lái)會(huì)發(fā)送的會(huì)是String值而已。之后,當(dāng)我們調(diào)用subject.onNext()時(shí),消息才被發(fā)送,Observer的onNext()被觸發(fā)調(diào)用,輸出了"Hello World"。

這里我們注意到,當(dāng)訂閱事件發(fā)生時(shí),我們的subject是沒有產(chǎn)生數(shù)據(jù)流的,直到它發(fā)射了"Hello World",數(shù)據(jù)流才開始運(yùn)轉(zhuǎn),試想我們?nèi)绻麑⒂嗛嗊^程和subject.onNext()調(diào)換一下位置,那么Observer就一定不會(huì)接受到"Hello World"了(這不是廢話嗎- -|||),因而這也在根本上反映了Observable的冷熱區(qū)別。

一般而言,我們的Observable都屬于Cold Observables,就像看視頻,每次點(diǎn)開新視頻我們都要從頭開始播放;而Subject則默認(rèn)屬于Hot Observables,就像看直播,視頻數(shù)據(jù)永遠(yuǎn)都是新的。
基于這種屬性,Subject自然擁有了對(duì)接收到的數(shù)據(jù)流進(jìn)行選擇調(diào)度等的能力了,因此,我們對(duì)于Subject的使用也就通常基于如下的思路:

在前面的例子里我們用到的是PublishSubject,它只會(huì)把在訂閱發(fā)生的時(shí)間點(diǎn)之后來(lái)自原始Observable的數(shù)據(jù)發(fā)射給觀察者。等一下,這功能聽起來(lái)是不是有些似曾相識(shí)呢?

沒錯(cuò),就是EventBus和Otto。(RxJava的出現(xiàn)慢慢讓Otto退出了舞臺(tái),現(xiàn)在Otto的Repo已經(jīng)是Deprecated狀態(tài)了,而EventBus依舊堅(jiān)挺)基于RxJava的觀察訂閱取消的能力和PublishSubject的功能,我們十分容易就能寫出實(shí)現(xiàn)了最基本功能的簡(jiǎn)易事件總線框架:

當(dāng)然Subject還有其他如BehaviorSubject、ReplaySubjectAsyncSubject等類型,大家可以去看官方文檔,寫得十分詳細(xì),這里就不介紹了。

三.后記

前面相信最近這段日子里,提到RxJava,大家就會(huì)想到Google最近剛剛開源的Agera。Agera作為專門為Android打造的Reactive Programming框架,難免會(huì)被拿來(lái)與RxJava做對(duì)比。本文前面RxJava的主體流程分析已近尾聲,現(xiàn)在我們?cè)賮?lái)看看Agera這東東又是怎么一回事。

首先先上結(jié)論:

Agera最初是為了Google Play Movies而開發(fā)的一個(gè)內(nèi)部框架,現(xiàn)在開源出來(lái)了,它雖然是在RxJava之后才出現(xiàn),但是完全獨(dú)立于RxJava,與它沒有任何關(guān)系(只不過開源的時(shí)間十分微妙罷了233333)。 與RxJava比起來(lái),Agera更加專注于Android的生命周期,而RxJava則更加純粹地面向Java平臺(tái)而非Android。

也許你可能會(huì)問:“那么RxAndroid呢,不是還有它嗎?”事實(shí)上,RxAndroid早在1.0版本的時(shí)候就進(jìn)行了很大的重構(gòu),很多模塊被拆分到其他的項(xiàng)目中去了,同時(shí)也刪除了部分代碼,僅存下來(lái)的部分多是和Android線程相關(guān)的部分,比如AndroidSchedulers、MainThreadSubscription等。鑒于這種情況,我們暫且不去關(guān)注RxAndroid,先把目光放在Agera上。

同樣也是基于觀察者模式,Agera和RxJava的角色分類大致相似,在Agera中,主要角色有兩個(gè):Observable(被觀察者)、Updatable(觀察者)。



是的,相較于RxJava中的Observable,Agera中的Observable只是一個(gè)簡(jiǎn)單的接口,也沒有范性的存在,Updatable亦是如此,這樣我們要如何做到消息的傳遞呢?這就需要另外一個(gè)接口了:

終于看到了泛型T,我們的消息的傳遞能力就是依賴于此接口了。所以我們將這個(gè)接口和基礎(chǔ)的Observable結(jié)合一下:

這里的Repository<T>在一定程度上就是我們想要的RxJava中的Observable<T>啦。類似地,Repository也有兩種類型的實(shí)現(xiàn):

  • Direct - 所包含的數(shù)據(jù)總是可用的或者是可被同步計(jì)算出來(lái)的;一個(gè)Direct的Repository總是處于活躍(active)狀態(tài)下

  • Deferred - 所包含的數(shù)據(jù)是異步計(jì)算或拉去所得;一個(gè)Deffered的Repository直到有Updatable被添加進(jìn)來(lái)之前都會(huì)是非活躍(inactive)狀態(tài)下
    是不是感到似曾相識(shí)呢?沒錯(cuò),Repository也是有冷熱區(qū)分的,不過我們現(xiàn)在暫且不去關(guān)注這一點(diǎn)?;氐缴厦娼又矗热滑F(xiàn)在發(fā)數(shù)據(jù)的角色有了,那么我們要如何接收數(shù)據(jù)呢?答案就是Receiver

相信看到這里,大家應(yīng)該也隱約感覺到了:在Agera的世界里,數(shù)據(jù)的傳輸與事件的傳遞是相互隔離開的,這是目前Agera與Rx系列的最大本質(zhì)區(qū)別。Agera所使用的是一種push event, pull data的模型,這意味著event并不會(huì)攜帶任何data,Updatable在需要更新時(shí),它自己會(huì)承擔(dān)起從數(shù)據(jù)源拉取數(shù)據(jù)的任務(wù)。這樣,提供數(shù)據(jù)的責(zé)任就從Observable中拆分了出來(lái)交給了Repository,讓其自身能夠?qū)W⒂诎l(fā)送一些簡(jiǎn)單的事件如按鈕點(diǎn)擊、一次下拉刷新的觸發(fā)等等。

那么,這樣的實(shí)現(xiàn)有什么好處呢?

當(dāng)這兩種處理分發(fā)邏輯分離開時(shí),Updatable就不必觀察到來(lái)自Repository的完整數(shù)據(jù)變化的歷史,畢竟在大多數(shù)場(chǎng)景下,尤其是更新UI的場(chǎng)景下,最新的數(shù)據(jù)往往才是有用的數(shù)據(jù)。

但是我就是需要看到變化的歷史數(shù)據(jù),怎么辦?

不用擔(dān)心,這里我們?cè)僬?qǐng)出一個(gè)角色Reservoir

顧名思義,Reservoir就是我們用來(lái)存儲(chǔ)變化中的數(shù)據(jù)的地方,它繼承了ReceiverRepository,也就相當(dāng)于同時(shí)具有了接收數(shù)據(jù),發(fā)送數(shù)據(jù)的能力。通過查看其具體實(shí)現(xiàn)我們可以知道它的本質(zhì)操作都是使用內(nèi)部的Queue實(shí)現(xiàn)的:通過accept()接收到數(shù)據(jù)后入列,通過get()拿到數(shù)據(jù)后出列。若一個(gè)Updatable觀察了此Reservoir,其隊(duì)列中發(fā)生調(diào)度變化后即將出列的下一個(gè)數(shù)據(jù)如果是可用的(非空),就會(huì)通知該Updatable,進(jìn)一步拉取這個(gè)數(shù)據(jù)發(fā)送給Receiver。

現(xiàn)在,我們已經(jīng)大概了解了這幾個(gè)角色的功能屬性了,接下來(lái)我們來(lái)看一段官方示例代碼:


是不是有些云里霧里的感覺呢?多虧有注釋,我們大概能夠猜出到底上面都做了什么:使用需要的圖片規(guī)格作為參數(shù)拼接到url中,拉取對(duì)應(yīng)的圖片并用ImageView顯示出來(lái)。我們結(jié)合API來(lái)看看整個(gè)過程:

  • Repositories.repositoryWithInitialValue(Result.absent())
    創(chuàng)建一個(gè)可運(yùn)行(抑或說執(zhí)行)的repository。
    初始化傳入值是Result,它用來(lái)概括一些諸如apply()、merge()的操作的結(jié)果的不可變對(duì)象,并且存在兩種狀態(tài)succeeded()、failed()。
    返回REventSource

  • observe()
    用于添加新的Observable作為更新我們的圖片的Event source,本例中不需要。
    返回RFrequency

  • onUpdatesPerLoop()
    在每一個(gè)Looper Thread loop中若有來(lái)自多個(gè)Event Source的update()處理時(shí),只需開啟一個(gè)數(shù)據(jù)處理流。
    返回RFlow

  • getFrom(new Supplier(…))
    忽略輸入值,使用來(lái)自給定Supplier的新獲取的數(shù)據(jù)作為輸出值。
    返回RFlow

  • goTo(executor)
    切換到給定的executor繼續(xù)數(shù)據(jù)處理流。

  • attemptTransform(function())
    使用給定的function()變換輸入值,若變換失敗,則終止數(shù)據(jù)流;若成功,則取新的變換后的值作為當(dāng)前流指令的輸出。
    返回RTermination

  • orSkip()
    若前面的操作檢查為失敗,就跳過剩下的數(shù)據(jù)處理流,并且不會(huì)通知所有已添加的Updatable。

  • thenTransform(function())
    與attemptTransform(function())相似,區(qū)別在于當(dāng)必要時(shí)會(huì)發(fā)出通知。
    返回RConfig

  • onDeactivation(SEND_INTERRUPT)
    用于明確repository不再active時(shí)的行為。
    返回RConfig

  • compile()
    執(zhí)行這個(gè)repository。
    返回Repository

整體流程乍看起來(lái)并沒有什么特別的地方,但是真正的玄機(jī)其實(shí)藏在執(zhí)行每一步的返回值里:
初始的REventSource<T, T>代表著事件源的開端,它從傳入值接收了T initialValue,這里的中,第一個(gè)T是當(dāng)前repository的數(shù)據(jù)的類型,第二個(gè)T則是數(shù)據(jù)處理流開端的時(shí)候的數(shù)據(jù)的類型。

之后,當(dāng)observe()調(diào)用后,我們傳入事件源給REventSource,相當(dāng)于設(shè)定好了需要的事件源和對(duì)應(yīng)的開端,這里返回的是RFrequency<T, T>,它繼承自REventSource,為其添加了事件源的發(fā)送頻率的屬性。

之后,我們來(lái)到了onUpdatesPerLoop(),這里明確了所開啟的數(shù)據(jù)流的個(gè)數(shù)(也就是前面所講的頻率)后,返回了RFlow,這里也就意味著我們的數(shù)據(jù)流正式生成了。同時(shí),這里也是流式調(diào)用的起點(diǎn)。

拿到我們的RFlow之后,我們就可以為其提供數(shù)據(jù)源了,也就是前面說的Supplier,于是調(diào)用getFrom(),這樣我們的數(shù)據(jù)流也就真正意義擁有了數(shù)據(jù)“干貨”。

有了數(shù)據(jù)之后我們就可以按具體需要進(jìn)行數(shù)據(jù)轉(zhuǎn)換了,這里我們可以直接使用transform(),返回RFlow,以便進(jìn)一步進(jìn)行流式調(diào)用;也可以調(diào)用attemptTransform()來(lái)對(duì)可能出現(xiàn)的異常進(jìn)行處理,比如orSkip()、orEnd()之后繼續(xù)進(jìn)行流式調(diào)用。

經(jīng)過一系列的流式調(diào)用之后,我們終于對(duì)數(shù)據(jù)處理完成啦,現(xiàn)在我們可以選擇先對(duì)成型的數(shù)據(jù)在做一次最后的包裝thenTransform(),或是與另一個(gè)Supplier合并thenMergeIn()等。這些處理之后,我們的返回值也就轉(zhuǎn)為了RConfig,進(jìn)入了最終配置和repository聲明結(jié)束的狀態(tài)。
在最終的這個(gè)配置過程中,我們調(diào)用了onDeactivation(),為這個(gè)repository明確了最終進(jìn)入非活躍狀態(tài)時(shí)的行為,如果不需要其他多余的配置的話,我們就可以進(jìn)入最終的compile()方法了。當(dāng)我們調(diào)用compile()時(shí),就會(huì)按照前面所走過的所有流程與配置去執(zhí)行并生成這個(gè)repository。到此,我們的repository才真正被創(chuàng)建了出來(lái)。

以上就是repository從無(wú)到有的全過程。當(dāng)repository誕生后,我們也就可以傳輸需要的數(shù)據(jù)啦。再回到上面的示例代碼:

我們?cè)?code>onResume()、onPause()這兩個(gè)生命周期下分別添加、移除了Updatable。相較于RxJava中通過Subscription去取消訂閱的做法,Agera的這種寫法顯然更為清晰也更為整潔。我們的Activity實(shí)現(xiàn)了Updatable和Receiver接口,直接看其實(shí)現(xiàn)方法:

可以看到這里repository將數(shù)據(jù)發(fā)送給了receiver,也就是自己,在對(duì)應(yīng)的accept()方法中接收到我們想要的bitmap后,這張圖片也就顯示出來(lái)了,示例代碼中的完整流程也就結(jié)束了。

總結(jié)一下上述過程:

  • 首先Repositories.repositoryWithInitialValue()生成原點(diǎn)REventSource。

  • 配置完Observable之后進(jìn)入RFrequency狀態(tài),接著配置數(shù)據(jù)流的流數(shù)。

  • 前面配置完成后,數(shù)據(jù)流RFlow生成,之后通過getFrom()、mergeIn()、transform()等方法可進(jìn)一步進(jìn)行流式調(diào)用;也可以使用attemptXXX()方法代替原方法,后面接著調(diào)用orSkip()、orEnd()進(jìn)行error handling處理。當(dāng)使用attemptXXX()方法時(shí),數(shù)據(jù)流狀態(tài)會(huì)變?yōu)镽Termination,它代表此時(shí)的狀態(tài)已具有終結(jié)數(shù)據(jù)流的能力,是否終結(jié)數(shù)據(jù)流要根據(jù)failed check觸發(fā),結(jié)合后面跟著調(diào)用的orSkip()orEnd(),我們的數(shù)據(jù)流會(huì)從RTermination再次切換為RFlow,以便進(jìn)行后面的流式調(diào)用。

  • 經(jīng)過前面一系列的流式處理,我們需要結(jié)束數(shù)據(jù)流時(shí),可以選擇調(diào)用thenXXX()方法,對(duì)數(shù)據(jù)流進(jìn)行最終的處理,處理之后,數(shù)據(jù)流狀態(tài)會(huì)變?yōu)?RConfig;也可以為此行為添加error handling處理,選擇thenAttemptXXX()方法,后面同樣接上orSkip()、orEnd()即可,最終數(shù)據(jù)流也會(huì)轉(zhuǎn)為Rconfig狀態(tài)。

  • 此時(shí),我們可以在結(jié)束前按需要選擇對(duì)數(shù)據(jù)流進(jìn)行最后的配置,例如:調(diào)用onDeactivation()配置從“訂閱”到“取消訂閱”的過程是否需要繼續(xù)執(zhí)行數(shù)據(jù)流等等。

  • 一切都部署完畢后,我們compile()這個(gè)RConfig,得到最終的成型的Repository,它具有添加Updatable、發(fā)送數(shù)據(jù)通知Receiver的能力。

  • 我們根據(jù)需要添加Updatable,repository在數(shù)據(jù)流處理完成后會(huì)通過update()發(fā)送event通知Updatable。

  • Updatable收到通知后則會(huì)拉取repository的成果數(shù)據(jù),并將數(shù)據(jù)通過accept()發(fā)送給Receiver。完成 Push event, pull data 的流程。

看完上述內(nèi)容,你們對(duì)RxJava線程切換過程是怎樣的有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI