溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Python響應(yīng)式類(lèi)庫(kù)RxPy的示例分析

發(fā)布時(shí)間:2021-06-15 09:23:23 來(lái)源:億速云 閱讀:180 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章給大家分享的是有關(guān)Python響應(yīng)式類(lèi)庫(kù)RxPy的示例分析的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。

一、基本概念

Reactive X中有幾個(gè)核心的概念,先來(lái)簡(jiǎn)單介紹一下。

1.1、Observable和Observer(可觀察對(duì)象和觀察者)

首先是Observable和Observer,它們分別是可觀察對(duì)象和觀察者。Observable可以理解為一個(gè)異步的數(shù)據(jù)源,會(huì)發(fā)送一系列的值。Observer則類(lèi)似于消費(fèi)者,需要先訂閱Observable,然后才可以接收到其發(fā)射的值??梢哉f(shuō)這組概念是設(shè)計(jì)模式中的觀察者模式和生產(chǎn)者-消費(fèi)者模式的綜合體。

1.2、Operator(操作符)

另外一個(gè)非常重要的概念就是操作符了。操作符作用于Observable的數(shù)據(jù)流上,可以對(duì)其施加各種各樣的操作。更重要的是,操作符還可以鏈?zhǔn)浇M合起來(lái)。這樣的鏈?zhǔn)胶瘮?shù)調(diào)用不僅將數(shù)據(jù)和操作分隔開(kāi)來(lái),而且代碼更加清晰可讀。一旦熟練掌握之后,你就會(huì)愛(ài)上這種感覺(jué)的。

1.3、Single(單例)

在RxJava和其變體中,還有一個(gè)比較特殊的概念叫做Single,它是一種只會(huì)發(fā)射同一個(gè)值的Observable,說(shuō)白了就是單例。當(dāng)然如果你對(duì)Java等語(yǔ)言比較熟悉,那么單例想必也很熟悉。

1.4、Subject(主體)

主體這個(gè)概念非常特殊,它既是Observable又是Observer。正是因?yàn)檫@個(gè)特點(diǎn),所以Subject可以訂閱其他Observable,也可以將發(fā)射對(duì)象給其他Observer。在某些場(chǎng)景中,Subject會(huì)有很大的作用。

1.5、Scheduler(調(diào)度器)

默認(rèn)情況下Reactive X只運(yùn)行在當(dāng)前線程下,但是如果有需要的話,也可以用調(diào)度器來(lái)讓Reactive X運(yùn)行在多線程環(huán)境下。有很多調(diào)度器和對(duì)應(yīng)的操作符,可以處理多線程場(chǎng)景下的各種要求。

1.6、Observer和Observable

先來(lái)看看一個(gè)最簡(jiǎn)單的例子,運(yùn)行的結(jié)果會(huì)依次打印這些數(shù)字。這里的of是一個(gè)操作符,可以根據(jù)給定的參數(shù)創(chuàng)建一個(gè)新的Observable。創(chuàng)建之后,就可以訂閱Observable,三個(gè)回調(diào)方法在對(duì)應(yīng)的時(shí)機(jī)執(zhí)行。一旦Observer訂閱了Observable,就會(huì)接收到后續(xù)Observable發(fā)射的各項(xiàng)值。

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)

這個(gè)例子看起來(lái)好像很簡(jiǎn)單,并且看起來(lái)沒(méi)什么用。但是當(dāng)你了解了Rx的一些核心概念,就會(huì)理解到這是一個(gè)多么強(qiáng)大的工具。更重要的是,Observable生成數(shù)據(jù)和訂閱的過(guò)程是異步的,如果你熟悉的話,就可以利用這個(gè)特性做很多事情。

1.7、操作符

在RxPy中另一個(gè)非常重要的概念就是操作符了,甚至可以說(shuō)操作符就是最重要的一個(gè)概念了。幾乎所有的功能都可以通過(guò)組合各個(gè)操作符來(lái)實(shí)現(xiàn)。熟練掌握操作符就是學(xué)好RxPy的關(guān)鍵了。操作符之間也可以用pipe函數(shù)連接起來(lái),構(gòu)成復(fù)雜的操作鏈。

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))

在RxPy中有大量操作符,可以完成各種各樣的功能。我們來(lái)簡(jiǎn)單看看其中一些常用的操作符。如果你熟悉Java8的流類(lèi)庫(kù)或者其他函數(shù)式編程類(lèi)庫(kù)的話,應(yīng)該對(duì)這些操作符感到非常親切。

1.8、創(chuàng)建型操作符

首先是創(chuàng)建Observable的操作符,列舉了一些比較常用的創(chuàng)建型操作符。

Python響應(yīng)式類(lèi)庫(kù)RxPy的示例分析

1.9、過(guò)濾型操作符

過(guò)濾型操作符的主要作用是對(duì)Observable進(jìn)行篩選和過(guò)濾。

Python響應(yīng)式類(lèi)庫(kù)RxPy的示例分析

1.10、轉(zhuǎn)換型操作符

Python響應(yīng)式類(lèi)庫(kù)RxPy的示例分析

1.11、算術(shù)操作符

Python響應(yīng)式類(lèi)庫(kù)RxPy的示例分析

1.12、Subject

Subject是一種特殊的對(duì)象,它既是Observer又是Observable。不過(guò)這個(gè)對(duì)象一般不太常用,但是假如某些用途還是很有用的。所以還是要介紹一下。下面的代碼,因?yàn)橛嗛喌臅r(shí)候第一個(gè)值已經(jīng)發(fā)射出去了,所以只會(huì)打印訂閱之后才發(fā)射的值。

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject同時(shí)是Observer和Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

另外還有幾個(gè)特殊的Subject,下面來(lái)介紹一下。

1.13、ReplaySubject

ReplaySubject是一個(gè)特殊的Subject,它會(huì)記錄所有發(fā)射過(guò)的值,不論什么時(shí)候訂閱的。所以它可以用來(lái)當(dāng)做緩存來(lái)使用。ReplaySubject還可以接受一個(gè)bufferSize參數(shù),指定可以緩存的最近數(shù)據(jù)數(shù),默認(rèn)情況下是全部。

下面的代碼和上面的代碼幾乎完全一樣,但是因?yàn)槭褂昧薘eplaySubject,所以所有的值都會(huì)被打印。當(dāng)然大家也可以試試把訂閱語(yǔ)句放到其他位置,看看輸出是否會(huì)產(chǎn)生變化。

# ReplaySubject會(huì)緩存所有值,如果指定參數(shù)的話只會(huì)緩存最近的幾個(gè)值
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4

1.14、BehaviorSubject

BehaviorSubject是一個(gè)特殊的Subject,它只會(huì)記錄最近一次發(fā)射的值。而且在創(chuàng)建它的時(shí)候,必須指定一個(gè)初始值,所有訂閱它的對(duì)象都可以接收到這個(gè)初始值。當(dāng)然如果訂閱的晚了,這個(gè)初始值同樣會(huì)被后面發(fā)射的值覆蓋,這一點(diǎn)要注意。

# BehaviorSubject會(huì)緩存上次發(fā)射的值,除非Observable已經(jīng)關(guān)閉
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4

1.15、AsyncSubject

AsyncSubject是一個(gè)特殊的Subject,顧名思義它是一個(gè)異步的Subject,它只會(huì)在Observer完成的時(shí)候發(fā)射數(shù)據(jù),而且只會(huì)發(fā)射最后一個(gè)數(shù)據(jù)。因此下面的代碼僅僅會(huì)輸出4.假如注釋掉最后一行co_completed調(diào)用,那么什么也不會(huì)輸出。

# AsyncSubject會(huì)緩存上次發(fā)射的值,而且僅會(huì)在Observable關(guān)閉后開(kāi)始發(fā)射
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4

1.16、Scheduler

雖然RxPy算是異步的框架,但是其實(shí)它默認(rèn)還是運(yùn)行在單個(gè)線程之上的,因此如果使用了某些會(huì)阻礙線程運(yùn)行的操作,那么程序就會(huì)卡死。當(dāng)然針對(duì)這些情況,我們就可以使用其他的Scheduler來(lái)調(diào)度任務(wù),保證程序能夠高效運(yùn)行。

下面的例子創(chuàng)建了一個(gè)ThreadPoolScheduler,它是基于線程池的調(diào)度器。兩個(gè)Observable用subscribe_on方法指定了調(diào)度器,因此它們會(huì)使用不同的線程來(lái)工作。

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))

如果你觀察過(guò)各個(gè)操作符的API的話,可以發(fā)現(xiàn)大部分操作符都支持可選的Scheduler參數(shù),為操作符指定一個(gè)調(diào)度器。如果操作符上指定了調(diào)度器的話,會(huì)優(yōu)先使用這個(gè)調(diào)度器;其次的話,會(huì)使用subscribe方法上指定的調(diào)度器;如果以上都沒(méi)有指定的話,就會(huì)使用默認(rèn)的調(diào)度器。

二、應(yīng)用場(chǎng)景

好了,介紹了一些Reactive X的知識(shí)之后,下面來(lái)看看如何來(lái)使用Reactive X。在很多應(yīng)用場(chǎng)景下,都可以利用Reactive X來(lái)抽象數(shù)據(jù)處理,把概念簡(jiǎn)單化。

2.1、防止重復(fù)發(fā)送

很多情況下我們都需要控制事件的發(fā)生間隔,比如有一個(gè)按鈕不小心按了好幾次,只希望第一次按鈕生效。這種情況下可以使用debounce操作符,它會(huì)過(guò)濾Observable,小于指定時(shí)間間隔的數(shù)據(jù)會(huì)被過(guò)濾掉。debounce操作符會(huì)等待一段時(shí)間,直到過(guò)了間隔時(shí)間,才會(huì)發(fā)射最后一次的數(shù)據(jù)。如果想要過(guò)濾后面的數(shù)據(jù),發(fā)送第一次的數(shù)據(jù),則要使用throttle_first操作符。

下面的代碼可以比較好的演示這個(gè)操作符,快速按回車(chē)鍵發(fā)送數(shù)據(jù),注意觀察按鍵和數(shù)據(jù)顯示之間的關(guān)系,還可以把throttle_first操作符換成debounce操作符,然后再看看輸出會(huì)發(fā)生什么變化,還可以完全注釋掉pipe中的操作符,再看看輸出會(huì)有什么變化。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce操作符,僅在時(shí)間間隔之外的可以發(fā)射

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break

2.2、操作數(shù)據(jù)流

如果需要對(duì)一些數(shù)據(jù)進(jìn)行操作,那么同樣有一大堆操作符可以滿(mǎn)足需求。當(dāng)然這部分功能并不是Reactive X獨(dú)有的,如果你對(duì)Java 8的流類(lèi)庫(kù)有所了解,會(huì)發(fā)現(xiàn)這兩者這方面的功能幾乎是完全一樣的。

下面是個(gè)簡(jiǎn)單的例子,將兩個(gè)數(shù)據(jù)源結(jié)合起來(lái),然后找出來(lái)其中所有的偶數(shù)。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# 操作數(shù)據(jù)流
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))

再或者一個(gè)利用reduce的簡(jiǎn)單例子,求1-100的整數(shù)和。

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))

感謝各位的閱讀!關(guān)于“Python響應(yīng)式類(lèi)庫(kù)RxPy的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI