溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Rxjava有什么用

發(fā)布時(shí)間:2021-07-24 10:13:51 來源:億速云 閱讀:124 作者:小新 欄目:移動(dòng)開發(fā)

小編給大家分享一下Rxjava有什么用,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

官方的介紹

1.支持Java6+

2.android 2.3+

3.異步的

4.基于觀察者設(shè)計(jì)模式(Observer、Observable)不懂設(shè)計(jì)模式的可以移步到此:淺談Java設(shè)計(jì)模式(十五)觀察者模式(Observer)

5.Subscribe (訂閱)

正式使用RxJava

用框架或者庫(kù)都是為了簡(jiǎn)潔、方便,RxJava也不例外它能使你的代碼邏輯更加的簡(jiǎn)潔。舉個(gè)例子之前我們先來引入依賴的 gradle 代碼:

compile 'io.reactivex:rxjava:1.0.14'  
compile 'io.reactivex:rxandroid:1.0.1'

既然是基于異步,當(dāng)然要在處理比較耗時(shí)的操作上才能彰顯它的優(yōu)勢(shì)!現(xiàn)在我們假設(shè)有這樣一個(gè)需求:

需要實(shí)現(xiàn)一個(gè)多個(gè)下載的圖片并且顯示的功能,它的作用可以添加多個(gè)下載操作,由于下載這一過程較為耗時(shí),需要放在后臺(tái)執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行。常用的實(shí)現(xiàn)方式有多種,我這里貼出其中一種:

new Thread() { 
  @Override 
  public void run() { 
    super.run(); 
    for (File folder : folders) { 
      File[] files = folder.listFiles(); 
      for (File file : files) { 
        if (file.getName().endsWith(".png")) { 
          final Bitmap bitmap = getBitmapFromFile(file); 
          getActivity().runOnUiThread(new Runnable() { 
            @Override 
            public void run() { 
              imageCollectorView.addImage(bitmap); 
            } 
          }); 
        } 
      } 
    } 
  } 
}.start();

里面的判斷是不是看起來有點(diǎn)暈暈,當(dāng)然這是我自己寫的,我一眼就能看清楚里面的邏輯,但是如果換做是別人來閱讀你的代碼,這就比較的尷尬了!

我們來看看使用RxJava的代碼:

Observable.from(folders) 
  .flatMap(new Func1<File, Observable<File>>() { 
    @Override 
    public Observable<File> call(File file) { 
      return Observable.from(file.listFiles()); 
    } 
  }) 
  .filter(new Func1<File, Boolean>() { 
    @Override 
    public Boolean call(File file) { 
      return file.getName().endsWith(".png"); 
    } 
  }) 
  .map(new Func1<File, Bitmap>() { 
    @Override 
    public Bitmap call(File file) { 
      return getBitmapFromFile(file); 
    } 
  }) 
  .subscribeOn(Schedulers.io()) 
  .observeOn(AndroidSchedulers.mainThread()) 
  .subscribe(new Action1<Bitmap>() { 
    @Override 
    public void call(Bitmap bitmap) { 
      imageCollectorView.addImage(bitmap); 
    } 
  });

是不是明了,雖然說算不上簡(jiǎn)單,但是習(xí)慣了就一如既往了!

如果你使用的AndroidStudio的話,你打開Java文件的時(shí)候,你會(huì)看到被自動(dòng) Lambda 化的預(yù)覽,這將讓你更加清晰地看到程序邏輯:

Observable.from(folders) 
  .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) 
  .filter((Func1) (file) -> { file.getName().endsWith(".png") }) 
  .map((Func1) (file) -> { getBitmapFromFile(file) }) 
  .subscribeOn(Schedulers.io()) 
  .observeOn(AndroidSchedulers.mainThread()) 
  .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

不過如果你對(duì)Java8還不是很了解的話呢這一段可以暫時(shí)忽略,但是你可以移步到這里了解一下Java8:Java8部分新特性介紹

看完代碼,是不是有種相見恨晚的沖動(dòng)?別急,我們來慢慢了解RxJava!

前面已經(jīng)提到他是基于Java觀察者設(shè)計(jì)模式的,這個(gè)模式上面有給大家鏈接,可以去看看,這里不不坐過多的介紹,我們來介紹一下RxJava中的觀察者模式:

RxJava 的觀察者模式

一、說明

1)RxJava 有四個(gè)基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer。

2)與傳統(tǒng)觀察者模式不同, RxJava 的事件回調(diào)方法除了普通事件 onNext() (相當(dāng)于 onClick() / onEvent())之外,還定義了兩個(gè)特殊的事件:onCompleted() 和 onError()。

3)onCompleted(): 事件隊(duì)列完結(jié)。RxJava 不僅把每個(gè)事件單獨(dú)處理,還會(huì)把它們看做一個(gè)隊(duì)列。RxJava 規(guī)定,當(dāng)不會(huì)再有新的 onNext() 發(fā)出時(shí),需要觸發(fā) onCompleted() 方法作為標(biāo)志。

4)onError(): 事件隊(duì)列異常。在事件處理過程中出異常時(shí),onError() 會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出。

5)在一個(gè)正確運(yùn)行的事件序列中, onCompleted() 和 onError() 有且只有一個(gè),并且是事件序列中的最后一個(gè)。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)。

二、實(shí)現(xiàn)

1) 創(chuàng)建 Observer

Observer 即觀察者,它決定事件觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer 接口的實(shí)現(xiàn)方式:

Observer<String> observer = new Observer<String>() { 
  @Override 
  public void onNext(String s) { 
    Log.d(tag, "Item: " + s); 
  } 
  @Override 
  public void onCompleted() { 
    Log.d(tag, "Completed!"); 
  } 
  @Override 
  public void onError(Throwable e) { 
    Log.d(tag, "Error!"); 
  } 
};

除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的:

Subscriber<String> subscriber = new Subscriber<String>() { 
  @Override 
  public void onNext(String s) { 
    Log.d(tag, "Item: " + s); 
  } 
  @Override 
  public void onCompleted() { 
    Log.d(tag, "Completed!"); 
  } 
  @Override 
  public void onError(Throwable e) { 
    Log.d(tag, "Error!"); 
  } 
};

不僅基本使用方式一樣,實(shí)質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區(qū)別對(duì)于使用者來說主要有兩點(diǎn):

onStart(): 這是 Subscriber 增加的方法。它會(huì)在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置。這是一個(gè)可選方法,默認(rèn)情況下它的實(shí)現(xiàn)為空。需要注意的是,如果對(duì)準(zhǔn)備工作的線程有要求(例如彈出一個(gè)顯示進(jìn)度的對(duì)話框,這必須在主線程執(zhí)行), onStart() 就不適用了,因?yàn)樗偸窃?subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準(zhǔn)備工作,可以使用 doOnSubscribe() 方法,具體可以在后面的文中看到。

unsubscribe(): 這是 Subscriber 所實(shí)現(xiàn)的另一個(gè)接口 Subscription 的方法,用于取消訂閱。在這個(gè)方法被調(diào)用后,Subscriber 將不再接收事件。一般在這個(gè)方法調(diào)用前,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。 unsubscribe() 這個(gè)方法很重要,因?yàn)樵?subscribe() 之后, Observable 會(huì)持有 Subscriber 的引用,這個(gè)引用如果不能及時(shí)被釋放,將有內(nèi)存泄露的風(fēng)險(xiǎn)。所以最好保持一個(gè)原則:要在不再使用的時(shí)候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。

2) 創(chuàng)建 Observable

Observable 即被觀察者,它決定什么時(shí)候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用 create() 方法來創(chuàng)建一個(gè) Observable ,并為它定義事件觸發(fā)規(guī)則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() { 
  @Override 
  public void call(Subscriber<? super String> subscriber) { 
    subscriber.onNext("Hello"); 
    subscriber.onNext("Hi"); 
    subscriber.onNext("Aloha"); 
    subscriber.onCompleted(); 
  } 
});

可以看到,這里傳入了一個(gè) OnSubscribe 對(duì)象作為參數(shù)。OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對(duì)象中,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable 被訂閱的時(shí)候,OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)(對(duì)于上面的代碼,就是觀察者Subscriber 將會(huì)被調(diào)用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實(shí)現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式。

create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法?;谶@個(gè)方法, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列,例如:

just(T...): 將傳入的參數(shù)依次發(fā)送出來。

Observable observable = Observable.just("Hello", "Hi", "Aloha"); 
// 將會(huì)依次調(diào)用: 
// onNext("Hello"); 
// onNext("Hi"); 
// onNext("Aloha"); 
// onCompleted();

from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對(duì)象后,依次發(fā)送出來。

String[] words = {"Hello", "Hi", "Aloha"}; 
Observable observable = Observable.from(words); 
// 將會(huì)依次調(diào)用: 
// onNext("Hello"); 
// onNext("Hi"); 
// onNext("Aloha"); 
// onCompleted();

上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價(jià)的。

3) Subscribe (訂閱)

創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了。代碼形式很簡(jiǎn)單:

observable.subscribe(observer); 
// 或者: 
observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。 
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。 
public Subscription subscribe(Subscriber subscriber) { 
  subscriber.onStart(); 
  onSubscribe.call(subscriber); 
  return subscriber; 
}

可以看到,subscriber() 做了3件事:

1.調(diào)用 Subscriber.onStart() 。這個(gè)方法在前面已經(jīng)介紹過,是一個(gè)可選的準(zhǔn)備方法。

2.調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發(fā)送的邏輯開始運(yùn)行。從這也可以看出,在 RxJava 中, Observable 并不是在創(chuàng)建的時(shí)候就立即開始發(fā)送事件,而是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行的時(shí)候。

3.將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().
除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調(diào),RxJava 會(huì)自動(dòng)根據(jù)定義創(chuàng)建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() { 
  // onNext() 
  @Override 
  public void call(String s) { 
    Log.d(tag, s); 
  } 
}; 
Action1<Throwable> onErrorAction = new Action1<Throwable>() { 
  // onError() 
  @Override 
  public void call(Throwable throwable) { 
    // Error handling 
  } 
}; 
Action0 onCompletedAction = new Action0() { 
  // onCompleted() 
  @Override 
  public void call() { 
    Log.d(tag, "completed"); 
  } 
}; 
 
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext() 
observable.subscribe(onNextAction); 
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError() 
observable.subscribe(onNextAction, onErrorAction); 
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted() 
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

簡(jiǎn)單解釋一下這段代碼中出現(xiàn)的 Action1 和 Action0。 Action0 是 RxJava 的一個(gè)接口,它只有一個(gè)方法 call(),這個(gè)方法是無(wú)參無(wú)返回值的;由于 onCompleted() 方法也是無(wú)參無(wú)返回值的,因此 Action0 可以被當(dāng)成一個(gè)包裝對(duì)象,將 onCompleted() 的內(nèi)容打包起來將自己作為一個(gè)參數(shù)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)。這樣其實(shí)也可以看做將 onCompleted() 方法作為參數(shù)傳進(jìn)了 subscribe(),相當(dāng)于其他某些語(yǔ)言中的『閉包』。 Action1 也是一個(gè)接口,它同樣只有一個(gè)方法 call(T param),這個(gè)方法也無(wú)返回值,但有一個(gè)參數(shù);與 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無(wú)返回值的,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)。事實(shí)上,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個(gè) ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無(wú)返回值的方法。

4) 場(chǎng)景示例

下面舉兩個(gè)例子:

a. 打印字符串?dāng)?shù)組

將字符串?dāng)?shù)組 names 中的所有字符串依次打印出來:

String[] names = ...; 
Observable.from(names) 
  .subscribe(new Action1<String>() { 
    @Override 
    public void call(String name) { 
      Log.d(tag, name); 
    } 
  });

b. 由 id 取得圖片并顯示

由指定的一個(gè) drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中,并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):

int drawableRes = ...; 
ImageView imageView = ...; 
Observable.create(new OnSubscribe<Drawable>() { 
  @Override 
  public void call(Subscriber<? super Drawable> subscriber) { 
    Drawable drawable = getTheme().getDrawable(drawableRes)); 
    subscriber.onNext(drawable); 
    subscriber.onCompleted(); 
  } 
}).subscribe(new Observer<Drawable>() { 
  @Override 
  public void onNext(Drawable drawable) { 
    imageView.setImageDrawable(drawable); 
  } 
 
  @Override 
  public void onCompleted() { 
  } 
 
  @Override 
  public void onError(Throwable e) { 
    Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); 
  } 
});

正如上面兩個(gè)例子這樣,創(chuàng)建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來,一次 RxJava 的基本使用就完成了。非常簡(jiǎn)單。

注意:在 RxJava 的默認(rèn)規(guī)則中,事件的發(fā)出和消費(fèi)都是在同一個(gè)線程的。也就是說,如果只用上面的方法,實(shí)現(xiàn)出來的只是一個(gè)同步的觀察者模式。觀察者模式本身的目的就是『后臺(tái)處理,前臺(tái)回調(diào)』的異步機(jī)制,因此異步對(duì)于 RxJava 是至關(guān)重要的。而要實(shí)現(xiàn)異步,則需要用到 RxJava 的另一個(gè)概念: Scheduler 。

線程控制 —— Scheduler (一)

前言:

在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個(gè)線程調(diào)用 subscribe(),就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)事件,就在哪個(gè)線程消費(fèi)事件。如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——調(diào)度器,相當(dāng)于線程控制器,RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個(gè) Scheduler ,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:

Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。

Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。

Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計(jì)算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。

Schedulers.computation(): 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O 等操作限制性能的操作,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU。

另外, Android 還有一個(gè)專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運(yùn)行。

有了這幾個(gè) Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個(gè)方法來對(duì)線程進(jìn)行控制了。 * subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時(shí)所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。 * observeOn(): 指定 Subscriber 所運(yùn)行在的線程?;蛘呓凶鍪录M(fèi)的線程。

代碼來理解上面的文字?jǐn)⑹觯?/p>

Observable.just(1, 2, 3, 4) 
  .subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程 
  .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程 
  .subscribe(new Action1<Integer>() { 
    @Override 
    public void call(Integer number) { 
      Log.d(tag, "number:" + number); 
    } 
  });

上面這段代碼中,由于 subscribeOn(Schedulers.io()) 的指定,被創(chuàng)建的事件的內(nèi)容 1、2、3、4 將會(huì)在 IO 線程發(fā)出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 。事實(shí)上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用于多數(shù)的 『后臺(tái)線程取數(shù)據(jù),主線程顯示』的程序策略。

而前面提到的由圖片 id 取得圖片并顯示的例子,如果也加上這兩句:

int drawableRes = ...; 
ImageView imageView = ...; 
Observable.create(new OnSubscribe<Drawable>() { 
  @Override 
  public void call(Subscriber<? super Drawable> subscriber) { 
    Drawable drawable = getTheme().getDrawable(drawableRes)); 
    subscriber.onNext(drawable); 
    subscriber.onCompleted(); 
  } 
}) 
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程 
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程 
.subscribe(new Observer<Drawable>() { 
  @Override 
  public void onNext(Drawable drawable) { 
    imageView.setImageDrawable(drawable); 
  } 
  @Override 
  public void onCompleted() { 
  } 
  @Override 
  public void onError(Throwable e) { 
    Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); 
  } 
});

那么,加載圖片將會(huì)發(fā)生在 IO 線程,而設(shè)置圖片則被設(shè)定在了主線程。這就意味著,即使加載圖片耗費(fèi)了幾十甚至幾百毫秒的時(shí)間,也不會(huì)造成絲毫界面的卡頓。

2) Scheduler 的原理 (一)

RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把線程切換了,怎么做到的?而且 subscribe() 不是最外層直接調(diào)用的方法嗎,它竟然也能被指定線程?)。然而 Scheduler 的原理需要放在后面講,因?yàn)樗脑硎且韵乱还?jié)《變換》的原理作為基礎(chǔ)的。

好吧這一節(jié)其實(shí)我屁也沒說,只是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。

變換

RxJava 提供了對(duì)事件序列進(jìn)行變換的支持,這是它的核心功能之一,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 API。

1) API

首先看一個(gè) map() 的例子:

Observable.just("images/logo.png") // 輸入類型 String 
  .map(new Func1<String, Bitmap>() { 
    @Override 
    public Bitmap call(String filePath) { // 參數(shù)類型 String 
      return getBitmapFromPath(filePath); // 返回類型 Bitmap 
    } 
  }) 
  .subscribe(new Action1<Bitmap>() { 
    @Override 
    public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap 
      showBitmap(bitmap); 
    } 
  });

這里出現(xiàn)了一個(gè)叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個(gè)接口,用于包裝含有一個(gè)參數(shù)的方法。 Func1 和 Action 的區(qū)別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。

可以看到,map() 方法將參數(shù)中的 String 對(duì)象轉(zhuǎn)換成一個(gè) Bitmap 對(duì)象后返回,而在經(jīng)過 map() 方法后,事件的參數(shù)類型也由 String 轉(zhuǎn)為了 Bitmap。這種直接變換對(duì)象并返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠(yuǎn)不止這樣,它不僅可以針對(duì)事件對(duì)象,還可以針對(duì)整個(gè)事件隊(duì)列,這使得 RxJava 變得非常靈活。我列舉幾個(gè)常用的變換:

map(): 事件對(duì)象的直接變換,具體功能上面已經(jīng)介紹過。它是 RxJava 最常用的變換。

flatMap(): 這是一個(gè)很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。 首先假設(shè)這么一種需求:假設(shè)有一個(gè)數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,現(xiàn)在需要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式很簡(jiǎn)單:

Student[] students = ...; 
Subscriber<String> subscriber = new Subscriber<String>() { 
  @Override 
  public void onNext(String name) { 
    Log.d(tag, name); 
  } 
  ... 
}; 
Observable.from(students) 
  .map(new Func1<Student, String>() { 
    @Override 
    public String call(Student student) { 
      return student.getName(); 
    } 
  }) 
  .subscribe(subscriber);

很簡(jiǎn)單。那么再假設(shè):如果要打印出每個(gè)學(xué)生所需要修的所有課程的名稱呢?(需求的區(qū)別在于,每個(gè)學(xué)生只有一個(gè)名字,但卻有多個(gè)課程。)首先可以這樣實(shí)現(xiàn):

Student[] students = ...; 
Subscriber<Student> subscriber = new Subscriber<Student>() { 
  @Override 
  public void onNext(Student student) { 
    List<Course> courses = student.getCourses(); 
    for (int i = 0; i < courses.size(); i++) { 
      Course course = courses.get(i); 
      Log.d(tag, course.getName()); 
    } 
  } 
  ... 
}; 
Observable.from(students) 
  .subscribe(subscriber);

依然很簡(jiǎn)單。那么如果我不想在 Subscriber 中使用 for 循環(huán),而是希望 Subscriber 中直接傳入單個(gè)的 Course 對(duì)象呢(這對(duì)于代碼復(fù)用很重要)?用 map() 顯然是不行的,因?yàn)?map() 是一對(duì)一的轉(zhuǎn)化,而我現(xiàn)在的要求是一對(duì)多的轉(zhuǎn)化。那怎么才能把一個(gè) Student 轉(zhuǎn)化成多個(gè) Course 呢?

這個(gè)時(shí)候,就需要用 flatMap() 了:

Student[] students = ...; 
Subscriber<Course> subscriber = new Subscriber<Course>() { 
  @Override 
  public void onNext(Course course) { 
    Log.d(tag, course.getName()); 
  } 
  ... 
}; 
Observable.from(students) 
  .flatMap(new Func1<Student, Observable<Course>>() { 
    @Override 
    public Observable<Course> call(Student student) { 
      return Observable.from(student.getCourses()); 
    } 
  }) 
  .subscribe(subscriber);

從上面的代碼可以看出, flatMap() 和 map() 有一個(gè)相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個(gè) Observable 對(duì)象,并且這個(gè) Observable 對(duì)象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件對(duì)象創(chuàng)建一個(gè) Observable 對(duì)象;2. 并不發(fā)送這個(gè) Observable, 而是將它激活,于是它開始發(fā)送事件;3. 每一個(gè)創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個(gè) Observable ,而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。這三個(gè)步驟,把事件拆成了兩級(jí),通過一組新創(chuàng)建的 Observable 將初始的對(duì)象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個(gè)『鋪平』就是 flatMap() 所謂的 flat。

擴(kuò)展:由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請(qǐng)求。示例代碼(Retrofit + RxJava):

networkClient.token() // 返回 Observable<String>,在訂閱時(shí)請(qǐng)求 token,并在響應(yīng)后發(fā)送 token 
  .flatMap(new Func1<String, Observable<Messages>>() { 
    @Override 
    public Observable<Messages> call(String token) { 
      // 返回 Observable<Messages>,在訂閱時(shí)請(qǐng)求消息列表,并在響應(yīng)后發(fā)送請(qǐng)求到的消息列表 
      return networkClient.messages(); 
    } 
  }) 
  .subscribe(new Action1<Messages>() { 
    @Override 
    public void call(Messages messages) { 
      // 處理顯示消息列表 
      showMessages(messages); 
    } 
  });

傳統(tǒng)的嵌套請(qǐng)求需要使用嵌套的 Callback 來實(shí)現(xiàn)。而通過 flatMap() ,可以把嵌套的請(qǐng)求寫在一條鏈中,從而保持程序邏輯的清晰。

throttleFirst(): 在每次事件觸發(fā)后的一定時(shí)間間隔內(nèi)丟棄新的事件。常用作去抖動(dòng)過濾,例如按鈕的點(diǎn)擊監(jiān)聽器: RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間

隔為 500ms .subscribe(subscriber); 媽媽再也不怕我的用戶手抖點(diǎn)開兩個(gè)重復(fù)的界面啦。
此外, RxJava 還提供很多便捷的方法來實(shí)現(xiàn)事件序列的變換,這里就不一一舉例了。

2) 變換的原理:lift()

這些變換雖然功能各有不同,但實(shí)質(zhì)上都是針對(duì)事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部,它們是基于同一個(gè)基礎(chǔ)的變換方法: lift(Operator)。首先看一下 lift() 的內(nèi)部實(shí)現(xiàn)(僅核心代碼):

// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { 
  return Observable.create(new OnSubscribe<R>() { 
    @Override 
    public void call(Subscriber subscriber) { 
      Subscriber newSubscriber = operator.call(subscriber); 
      newSubscriber.onStart(); 
      onSubscribe.call(newSubscriber); 
    } 
  }); 
}

這段代碼很有意思:它生成了一個(gè)新的 Observable 并返回,而且創(chuàng)建新 Observable 所用的參數(shù) OnSubscribe 的回調(diào)方法 call() 中的實(shí)現(xiàn)竟然看起來和前面講過的 Observable.subscribe() 一樣!然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對(duì)象不同(高能預(yù)警:接下來的幾句話可能會(huì)導(dǎo)致身體的嚴(yán)重不適)——

subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對(duì)象,這個(gè)沒有問題,但是 lift() 之后的情況就復(fù)雜了點(diǎn)。

當(dāng)含有 lift() 時(shí):

1.lift() 創(chuàng)建了一個(gè) Observable 后,加上之前的原始 Observable,已經(jīng)有兩個(gè) Observable 了;

2.而同樣地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個(gè) OnSubscribe;

3.當(dāng)用戶調(diào)用經(jīng)過 lift() 后的 Observable 的 subscribe() 的時(shí)候,使用的是 lift() 所返回的新的 Observable ,于是它所觸發(fā)的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個(gè) OnSubscribe;

4.而這個(gè)新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個(gè) call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個(gè)新的 Subscriber(Operator 就是在這里,通過自己的 call() 方法將新 Subscriber 和原始 Subscriber 進(jìn)行關(guān)聯(lián),并插入自己的『變換』代碼以實(shí)現(xiàn)變換),然后利用這個(gè)新 Subscriber 向原始 Observable 進(jìn)行訂閱。

這樣就實(shí)現(xiàn)了 lift() 過程,有點(diǎn)像一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換。
精簡(jiǎn)掉細(xì)節(jié)的話,也可以這么說:在 Observable 執(zhí)行了 lift(Operator) 方法之后,會(huì)返回一個(gè)新的 Observable,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber。

舉一個(gè)具體的 Operator 的實(shí)現(xiàn)。下面這是一個(gè)將事件中的 Integer 對(duì)象轉(zhuǎn)換成 String 的例子,僅供參考:

observable.lift(new Observable.Operator<String, Integer>() { 
  @Override 
  public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { 
    // 將事件序列中的 Integer 對(duì)象轉(zhuǎn)換為 String 對(duì)象 
    return new Subscriber<Integer>() { 
      @Override 
      public void onNext(Integer integer) { 
        subscriber.onNext("" + integer); 
      } 
      @Override 
      public void onCompleted() { 
        subscriber.onCompleted(); 
      } 
      @Override 
      public void onError(Throwable e) { 
        subscriber.onError(e); 
      } 
    }; 
  } 
});

3) compose: 對(duì) Observable 整體的變換

除了 lift() 之外, Observable 還有一個(gè)變換方法叫做 compose(Transformer)。它和 lift() 的區(qū)別在于, lift() 是針對(duì)事件項(xiàng)和事件序列的,而 compose() 是針對(duì) Observable 自身進(jìn)行變換。舉個(gè)例子,假設(shè)在程序中有多個(gè) Observable ,并且他們都需要應(yīng)用一組相同的 lift() 變換。你可以這么寫:

observable1 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber1); 
observable2 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber2); 
observable3 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber3); 
observable4 
  .lift1() 
  .lift2() 
  .lift3() 
  .lift4() 
  .subscribe(subscriber1);

你覺得這樣太不軟件工程了,于是你改成了這樣:

private Observable liftAll(Observable observable) { 
  return observable 
    .lift1() 
    .lift2() 
    .lift3() 
    .lift4(); 
} 
... 
liftAll(observable1).subscribe(subscriber1); 
liftAll(observable2).subscribe(subscriber2); 
liftAll(observable3).subscribe(subscriber3); 
liftAll(observable4).subscribe(subscriber4);

可讀性、可維護(hù)性都提高了??墒?Observable 被一個(gè)方法包起來,這種方式對(duì)于 Observale 的靈活性似乎還是增添了那么點(diǎn)限制。怎么辦?這個(gè)時(shí)候,就應(yīng)該用 compose() 來解決了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> { 
  @Override 
  public Observable<String> call(Observable<Integer> observable) { 
    return observable 
      .lift1() 
      .lift2() 
      .lift3() 
      .lift4(); 
  } 
} 
... 
Transformer liftAll = new LiftAllTransformer(); 
observable1.compose(liftAll).subscribe(subscriber1); 
observable2.compose(liftAll).subscribe(subscriber2); 
observable3.compose(liftAll).subscribe(subscriber3); 
observable4.compose(liftAll).subscribe(subscriber4);

像上面這樣,使用 compose() 方法,Observable 可以利用傳入的 Transformer 對(duì)象的 call 方法直接對(duì)自身進(jìn)行處理,也就不必被包在方法的里面了。

compose() 的原理比較簡(jiǎn)單,不附圖嘍。

線程控制:Scheduler (二)

除了靈活的變換,RxJava 另一個(gè)牛逼的地方,就是線程的自由控制。

1) Scheduler 的 API (二)

前面講到了,可以利用 subscribeOn() 結(jié)合 observeOn() 來實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程??墒窃诹私饬?map() flatMap() 等變換方法后,有些好事的(其實(shí)就是當(dāng)初剛接觸 RxJava 時(shí)的我)就問了:能不能多切換幾次線程?
答案是:能。因?yàn)?observeOn() 指定的是 Subscriber 的線程,而這個(gè) Subscriber 并不是(嚴(yán)格說應(yīng)該為『不一定是』,但這里不妨理解為『不是』)subscribe() 參數(shù)中的 Subscriber ,而是 observeOn() 執(zhí)行時(shí)的當(dāng)前 Observable 所對(duì)應(yīng)的 Subscriber ,即它的直接下級(jí) Subscriber 。換句話說,observeOn() 指定的是它之后的操作所在的線程。因此如果有多次切換線程的需求,只要在每個(gè)想要切換線程的位置調(diào)用一次 observeOn() 即可。上代碼:

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定 
  .subscribeOn(Schedulers.io()) 
  .observeOn(Schedulers.newThread()) 
  .map(mapOperator) // 新線程,由 observeOn() 指定 
  .observeOn(Schedulers.io()) 
  .map(mapOperator2) // IO 線程,由 observeOn() 指定 
  .observeOn(AndroidSchedulers.mainThread)  
  .subscribe(subscriber); // Android 主線程,由 observeOn() 指定

如上,通過 observeOn() 的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換。

不過,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能調(diào)用一次的。

又有好事的(其實(shí)還是當(dāng)初的我)問了:如果我非要調(diào)用多次 subscribeOn() 呢?會(huì)有什么效果?

這個(gè)問題先放著,我們還是從 RxJava 線程控制的原理說起吧。

2) Scheduler 的原理(二)

其實(shí), subscribeOn() 和 observeOn() 的內(nèi)部實(shí)現(xiàn),也是用的 lift()。具體看圖(不同顏色的箭頭表示不同的線程):
從圖中可以看出,subscribeOn() 和 observeOn() 都做了線程切換的工作(圖中的 "schedule..." 部位)。不同的是, subscribeOn() 的線程切換發(fā)生在 OnSubscribe 中,即在它通知上一級(jí) OnSubscribe 時(shí),這時(shí)事件還沒有開始發(fā)送,因此 subscribeOn() 的線程控制可以從事件發(fā)出的開端就造成影響;而 observeOn() 的線程切換則發(fā)生在它內(nèi)建的 Subscriber 中,即發(fā)生在它即將給下一級(jí) Subscriber 發(fā)送事件時(shí),因此 observeOn() 控制的是它后面的線程。

3) 延伸:doOnSubscribe()

然而,雖然超過一個(gè)的 subscribeOn() 對(duì)事件處理的流程沒有影響,但在流程之前卻是可以利用的。

在前面講 Subscriber 的時(shí)候,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 發(fā)生時(shí)就被調(diào)用了,因此不能指定線程,而是只能執(zhí)行在 subscribe() 被調(diào)用時(shí)的線程。這就導(dǎo)致如果 onStart() 中含有對(duì)線程有要求的代碼(例如在界面上顯示一個(gè) ProgressBar,這必須在主線程執(zhí)行),將會(huì)有線程非法的風(fēng)險(xiǎn),因?yàn)橛袝r(shí)你無(wú)法預(yù)測(cè) subscribe() 將會(huì)在什么線程執(zhí)行。

而與 Subscriber.onStart() 相對(duì)應(yīng)的,有一個(gè)方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程。默認(rèn)情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。

示例代碼:

Observable.create(onSubscribe) 
  .subscribeOn(Schedulers.io()) 
  .doOnSubscribe(new Action0() { 
    @Override 
    public void call() { 
      progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行 
    } 
  }) 
  .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程 
  .observeOn(AndroidSchedulers.mainThread()) 
  .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一個(gè) subscribeOn() ,就能指定準(zhǔn)備工作的線程了。

以上是“Rxjava有什么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI