溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

rxjs系列 -- Observale與Observer

發(fā)布時間:2020-07-17 05:43:03 來源:網(wǎng)絡(luò) 閱讀:370 作者:eflypro小普 欄目:開發(fā)技術(shù)

在RxJS中,一個數(shù)據(jù)流的完整流向至少需要包含Observable和Observer。Observable是被觀察者,Observer是觀察者,Observer訂閱Observable,Observable向Observer推送數(shù)據(jù)以完成整個過程。

可以說一個完整的RxJS數(shù)據(jù)流就是Observable和Observer之間的互動游戲。

Observable實現(xiàn)了下面兩種設(shè)計模式:

觀察者模式
迭代器模式

由于不是寫設(shè)計模式的文章,所以一筆帶過,感興趣的可以自己看下相關(guān)的書,需要一提的是,任何一種設(shè)計模式,指的都是解決某個特定類型問題的方法。問題復(fù)雜多變,往往不是靠單獨一種設(shè)計模式能夠解決的,更需要多種設(shè)計模式的組合,而RxJS的Observable就是觀察者模式和迭代器模式的組合。

Observale與Observer的互動

訂閱

首先我們先創(chuàng)建一個Observable,作為數(shù)據(jù)源

 const { Observable } = rxjs;    const sourceObservable$ = new Observable(observer => {
        observer.next('hello');
        observer.next('rxjs');
    });

然后再設(shè)置一個observer,作為訂閱者

const Observer = {        next: res => console.log(res)
    };

最后Observer訂閱Observable,數(shù)據(jù)開始流動

   sourceObservable$.subscribe(Observer);    // hello
    // rxjs

一個非常簡單的數(shù)據(jù)流就完成了。

狀態(tài)流轉(zhuǎn)

在上面的例子中,只有next這一種狀態(tài),不斷地往下游傳遞數(shù)據(jù),但實際上,數(shù)據(jù)流在流動過程中有三種狀態(tài):

next,正常流轉(zhuǎn)到下一個狀態(tài)
error,捕獲到異常,流動中止
complete,數(shù)據(jù)流已經(jīng)完成,流動終止

接下來改一下上面的代碼,補上另外兩種狀態(tài)

const sourceObservable$ = new Observable(observer => {
        observer.next('hello');
        observer.error('the data is wrong!');
        observer.next('rxjs');
        observer.complete();
    });    const Observer = {        next: res => console.log(res),        error: err => console.log(err),        complete: () => console.log('complete!')
    }

    sourceObservable$.subscribe(Observer);    // hello
    // the data is wrong!

現(xiàn)在再看Observer是不是有種似曾相識的感覺?沒錯,Observer其實就是一個迭代器了。

至此,不難發(fā)現(xiàn),Observable與Observer的互動過程其實就是,Observer通過觀察者模式向Observable注入了一個迭代器,通過next游標(biāo)控制數(shù)據(jù)源Observable的數(shù)據(jù)流動,并根據(jù)實際流動過程中可能發(fā)生的情況將狀態(tài)流轉(zhuǎn)到error或者complete。

取消訂閱

現(xiàn)在Observable與Observer已經(jīng)通過subscribe建立了聯(lián)系,但有時候我們需要把這種聯(lián)系斷開,比如組件銷毀的時候。這時候就需要取消訂閱了,看下面的例子

const sourceObservable$ = new Observable(observer => {        let number = 1;
        setInterval(() => {
            observer.next(number++);
        }, 1000);
    });    const Observer = {        next: res => console.log(res),        error: err => console.log(err),        complete: () => console.log('complete!')
    }    const subscription = sourceObservable$.subscribe(Observer);

    setTimeout(() => {        // 取消訂閱
        subscription.unsubscribe();
    }, 4000);    // 1
    // 2
    // 3
    // 4

需要注意的是,在本例子中,雖然取消訂閱了,但是作為數(shù)據(jù)源的sourceObservable$并沒有終結(jié),因為始終沒有調(diào)用complete方法,只是Observer不再接收推送的數(shù)據(jù)了

為了便于觀察其中的差異,我們將sourceObservable$改一下

const sourceObservable$ = new Observable(observer => {        let number = 1;
        setInterval(() => {            console.log('subscribe:' + number);
            observer.next(number++);
        }, 1000);
    });    // subscribe: 1
    // 1
    // subscribe: 2
    // 2
    // subscribe: 3
    // 3
    // subscribe: 4
    // 4
    // subscribe: 5
    // subscribe: 6
    // ...

Hot Observable和Cold Observable

假設(shè)我們有這樣一個場景,一個Observable被兩個ObserverA、ObserverB先后相隔N秒訂閱了,那么ObserverB是否需要接收訂閱之前的數(shù)據(jù)呢?

其實沒有固定的答案,是否接收需要根據(jù)實際的業(yè)務(wù)場景來定,正是因為如此,所以便有了Hot Observable以及Cold Observable。

Hot Observable:熱被觀察對象,類似于直播,看到的內(nèi)容是從你打開直播的那一刻開始的,之前的內(nèi)容已經(jīng)錯過。只能接收訂閱那一刻開始的數(shù)據(jù)。
Cold Observable:冷被觀察對象,類似于錄播,看到的內(nèi)容是你打開的視頻的第一秒種開始的。每次訂閱都會從頭開始接收數(shù)據(jù)

先看下Hot Observable,每次訂閱只推送當(dāng)前的數(shù)據(jù),所以不會在每次訂閱時重置數(shù)據(jù)推送,代碼如下:

 // 先產(chǎn)生數(shù)據(jù)
    let number = 1;    const sourceObservale$ = new Observable(observer => {        let num = number;
        setInterval(() => {
            observer.next(num++);
            number = num;
        }, 1000);
    });    const ObserverA = ObserverB = {        next: item => console.log(item),        error: err => console.log(err),        complete: () => console.log('complete!')
    }

    sourceObservale$.subscribe(ObserverA);

    setTimeout(() => {
        sourceObservale$.subscribe(ObserverB);
    }, 2000);    // 1 => A
    // 2 => A
    // 3 => A
    // 3 => B
    // 4 => A
    // 4 => B
    // ..

而對于Cold Observable,每次訂閱都是重頭開始推送,所以每一次訂閱都會重置數(shù)據(jù)推送,代碼如下:

const sourceObservale$ = new Observable(observer => {        // 訂閱時產(chǎn)生數(shù)據(jù)
        let number = 1;
        setInterval(() => {
            observer.next(number++);
        }, 1000);
    });    // 中間不變
    ...    // 1 => A
    // 2 => A
    // 3 => A
    // 1 => B
    // 4 => A
    // 2 => B
    // ..

從這里也可以看出,Observable是具有惰性求值的,只有在被訂閱的時候才會執(zhí)行內(nèi)部邏輯,而Cold Observable則更進(jìn)一步,在沒有被訂閱的時候,連數(shù)據(jù)都不會產(chǎn)生。
rxjs系列 -- Observale與Observer
睿江云官網(wǎng)鏈接:http://www.eflycloud.com/#register?salesID=6DGNUTUAV

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI