溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RxJava怎么使用

發(fā)布時間:2021-12-13 14:53:57 來源:億速云 閱讀:138 作者:iii 欄目:云計算

本篇內(nèi)容主要講解“RxJava怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“RxJava怎么使用”吧!

準備工作之一:日志

本次詩函通過打印日志來觀察代碼執(zhí)行情況,會打印時間和執(zhí)行線程,這里用的是slf4j+log4j的方式;

工程創(chuàng)建完畢后,結(jié)構(gòu)如下:

RxJava怎么使用

  • log4j.propertieds文件的位置請注意,需要放在上圖紅框位置;

  • 為了在日志中打印當前線程,log4j的配置如上圖綠框所示, %t表示當前線程, %r表示程序已經(jīng)執(zhí)行的時間;

  • 在pom文件中,對日志的依賴為:

<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.8.0-alpha2</version>
    </dependency>

準備工作之二:單元測試

驗證代碼是通過單元測試實現(xiàn)的,pom.xml文件中,對單元測試的依賴為:

<dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

單元測試代碼在如下圖紅框位置:

RxJava怎么使用

準備工作之三:支持lambda

支持lambda表達式表現(xiàn)在maven支持和intellij idea工具支持兩個方面,具體設置請參照《設置Intellij idea和maven,支持lambda表達式》

準備工作結(jié)束,可以正式開發(fā)了

RxJava的依賴庫

依賴庫選用1.0.10版本,如下:

<dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
      <version>1.0.10</version>
    </dependency>

最簡單的觀察者模式實現(xiàn)

第一個例子,我們實踐最簡單的用法:

  1. 創(chuàng)建App.java類,聲明日志服務:

public class App 
{
    private static final Logger logger = LoggerFactory.getLogger(App.class);
  1. 開發(fā)doExecute方法實現(xiàn)基于Rxjava的觀察者模式:

public void doExecute(){
        logger.debug("start doExecute");

        //聲明一個觀察者,用來響應被觀察者發(fā)布的事件
        Observer<String> observer = new Observer<String>() {
            /**
             * 被觀察者發(fā)布結(jié)束事件的時候,該方法會被調(diào)用
             */
            public void onCompleted() {
                logger.debug("start onCompleted");
            }

            /**
             * 被觀察者發(fā)布事件期間,和觀察者處理事件期間,發(fā)生異常的時候,該方法都會被調(diào)用
             */
            public void onError(Throwable throwable) {
                logger.debug("start onError : " + throwable);
            }

            /**
             * 被觀察者發(fā)布事件后,該方法會被調(diào)用
             * @param s
             */
            public void onNext(String s) {
                logger.debug("start onNext [" + s + "]");
            }
        };

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                //向觀察者發(fā)布事件
                subscriber.onNext("Hello");
                //再次向觀察者發(fā)布事件
                subscriber.onNext("world");
                //通知觀察者,訂閱結(jié)束
                subscriber.onCompleted();
            }
        });

        logger.debug("try subscribe");
        
        //執(zhí)行訂閱
        observable.subscribe(observer);

        logger.debug("finish doExecute");
    }

代碼的邏輯很簡單,定義觀察者(observer),被觀察者(observable),執(zhí)行訂閱; 3. 本次測試用junit來執(zhí)行,在test目錄下創(chuàng)建一個AppTest類,具體的目錄和內(nèi)容如下圖:

RxJava怎么使用

打開控制臺,在pom.xml文件所在目錄下執(zhí)行<font color="blue">mvn test</font>,即可看到日志:

2017-06-10 10:02:02  [ main:0 ] - [ DEBUG ]  start doExecute
2017-06-10 10:02:02  [ main:19 ] - [ DEBUG ]  try subscribe
2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onNext [Hello]
2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onNext [world]
2017-06-10 10:02:02  [ main:22 ] - [ DEBUG ]  start onCompleted
2017-06-10 10:02:02  [ main:23 ] - [ DEBUG ]  finish doExecute

執(zhí)行的代碼是observable.subscribe,此代碼執(zhí)行后,觀察者的onNext和onCompleted被回調(diào);

簡化的觀察者

在上面的doExecute方法中,我們創(chuàng)建的被觀察者實現(xiàn)了onNext,onError,onCompleted這三個方法,有的場景下我們只關注onNext,對onError和onCompleted都不關心,此時我們可以使用Action1對象來替代Observer,代碼如下:

public void doAction(){
        logger.debug("start doAction");

        Action1<String> onNextAction = new Action1<String>() {
            public void call(String s) {
                logger.debug("start Action1 onNextAction [" + s + "]");
            }
        };

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("world");
                subscriber.onCompleted();
            }
        });

        logger.debug("try subscribe");

        observable.subscribe(onNextAction);

        logger.debug("finish doAction");
    }

可以看到,只要一個Action1對象即可;

另外,對于錯誤回調(diào)也可以用Action1來實現(xiàn),事件完成的回調(diào)用Action0,Action0的特點是方法沒有返回,對于的這些Action,observable.subscribe方法提供了各種重載,我們可以按照自己需要來決定使用哪種,傳入哪些Action;

簡化的被觀察者

在上面的doExecute方法中,被觀察者發(fā)布了兩個事件:onNext("Hello")和onNext("world"),我們創(chuàng)建被觀察者是通過Observable.create,然后在call方法中寫入onNext("Hello"),onNext("world")最后在寫上subscriber.onCompleted(),對于這種發(fā)布確定的對象事件的場景,rxjava已經(jīng)做了簡化,直接上代碼:

public void doFromChain(){
        logger.debug("start doFromChain");


        //聲明一個觀察者,用來響應被觀察者發(fā)布的事件
        Action1<String> observer = new Action1<String>() {
            /**
             * 被觀察者發(fā)布事件后,該方法會被調(diào)用
             * @param s
             */
            public void call(String s) {
                logger.debug("start onNext [" + s + "]");
            }
        };


        String[] array = {"Hello", "world"};

        //from方法可以直接創(chuàng)建被觀察者,并且發(fā)布array中的元素對應的事件
        Observable.from(array).subscribe(observer);

        logger.debug("finish doFromChain");
    }

如上代碼,之前我們創(chuàng)建被觀察者,并且在call方法中依次執(zhí)行onNext的操作,這些事情都被Observable.from(array)簡化了;

進一步簡化的被觀察者

Observable.from接受的是一個數(shù)組,而Observable.just可以直接接受多個元素,我們連創(chuàng)建數(shù)組的步驟都省略掉了,再把Action1簡化為lambda,可以得到更加簡化的代碼:

public void doJustChain(){
        logger.debug("start doJustChain");

        Observable.just("Hello", "world")
                .subscribe(s -> logger.debug("start onNext [" + s + "]"));

        logger.debug("finish doJustChain");
    }

經(jīng)歷了以上的實戰(zhàn),我們對Rxjava的基本能力有了了解,下面了解一些更復雜的用法;

基本變換

試想,如果被觀察者發(fā)布的事件是int型,但是觀察者是處理String型事件的,那么此觀察者如何才能處理被觀察者發(fā)布的事件呢,除了修改觀察者或者被觀察者的代碼,我們還可以使用Rxjava的變換方法-map:

public void doMap(){
        logger.debug("start doMap");

        Observable.just(1001, 1002)
        .map(intValue -> "int[" + intValue + "]")
        .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]"));


        logger.debug("finish doMap");
    }

代碼中可以看到,map方法接受的是Func1接口的實現(xiàn),由于此接口只聲明了一個方法,所以這里被簡化成了lambda表達式,lambda表達式的入?yún)⒂蒵ust的入?yún)㈩愋屯茢喽鴣恚莍nt型,返回的是字符串,后面的代碼就可以直接用String型的消費者來處理事件了;

更自由的變換

map方法提供了一對一的映射,但是實際場景中未必是一對一的,例如一個int數(shù)字要發(fā)起兩個String事件,map就不合適了,RxJava還有個flatMap方法,可以提供這種能力,此處沒用lambda來簡化,可以看的更清楚:

public void doFlatMap(){
        logger.debug("start doFlatMap");

        Observable.just(101, 102, 103)
                .flatMap(new Func1<Integer, Observable<String>>() {
                    public Observable<String> call(final Integer integer) {
                        return Observable.create(new Observable.OnSubscribe<String>() {
                            public void call(Subscriber<? super String> subscriber) {
                                subscriber.onNext("after flatMap (" + integer + ")");
                                subscriber.onNext("after flatMap (" + (integer+1000) + ")");
                            }
                        });
                    }
                })
                .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]"));

        logger.debug("finish doFlatMap");
    }

可以看到,被觀察者發(fā)布了三個int事件:101, 102, 103,在flatMap中訂閱了這三個事件,每個事件都可以新建一個被觀察者,這個被觀察者拿到了101,102,103,然后可以按實際需求,選擇發(fā)布一個或者多個String事件,甚至不發(fā)布,這里發(fā)布的事件,都會被觀察者收到;

線程調(diào)度

Rxjava可以指定被觀察者發(fā)布事件的線程,也可以制定觀察者處理事件的線程:

public void doSchedule(){
        logger.debug("start doSchedule");

        Observable.create(subscriber -> {
            logger.debug("enter subscribe");
            subscriber.onNext("Hello");
            subscriber.onCompleted();
        })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .flatMap(str -> {
            logger.debug("enter flatMap");
            return Observable.create(
                    subscriber -> subscriber.onNext("after flatMap (" + str + ")")
            );
            }
        )
        .observeOn(Schedulers.newThread())
        .subscribe(s -> logger.debug("Observer's onNext invoked [" + s + "]"));
        logger.debug("finish doSchedule");
    }

subscribeOn()方法指定了被觀察者發(fā)布事件的時候使用io類型的線程處理,參數(shù)Schedulers.io()表示指定的線程來自內(nèi)部實現(xiàn)的一個無數(shù)量上限的線程池,可以重用空閑的線程,適合處理io相關的業(yè)務,特點是等待時間長,cup占用低;

observeOn()方法表示觀察者處理事件的時候使用新線程處理,Schedulers.newThread()表示總是啟用新線程,并在新線程執(zhí)行操作; 上面代碼用了兩次observeOn,分別用來指定flatMap中處理事件以及觀察者中處理事件的線程;

執(zhí)行代碼的結(jié)果:

2017-06-10 12:15:42  [ main:0 ] - [ DEBUG ]  start doSchedule
2017-06-10 12:15:42  [ RxCachedThreadScheduler-1:156 ] - [ DEBUG ]  enter subscribe
2017-06-10 12:15:42  [ main:156 ] - [ DEBUG ]  finish doSchedule
2017-06-10 12:15:42  [ RxNewThreadScheduler-2:157 ] - [ DEBUG ]  enter flatMap
2017-06-10 12:15:42  [ RxNewThreadScheduler-1:164 ] - [ DEBUG ]  Observer's onNext invoked [after flatMap (Hello)]

RxCachedThreadScheduler-1:156表示來自線程池的緩存線程; RxNewThreadScheduler-2:157和RxNewThreadScheduler-1:164表示新的線程;

常用的參數(shù)類型還有: Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程; Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

到此,相信大家對“RxJava怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關內(nèi)容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI