您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)java中的響應(yīng)式編程,文章內(nèi)容質(zhì)量較高,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
響應(yīng)式編程
作為響應(yīng)式編程方向上的第一步,微軟在.NET生態(tài)系統(tǒng)中創(chuàng)建了Rx庫(Reactive Extensions)。RxJava是在JVM上對它的實現(xiàn)。
響應(yīng)式編程是一個異步編程范式,通常出現(xiàn)在面向?qū)ο蟮恼Z言中,作為觀察者模式的一個擴展。
它關(guān)注數(shù)據(jù)的流動、變化的傳播。這意味著可以輕易地使用編程語言表示靜態(tài)(如數(shù)組)或動態(tài)(如事件發(fā)射源)數(shù)據(jù)流。
響應(yīng)式流
隨著時間的推移,一個專門為Java的標(biāo)準(zhǔn)化出現(xiàn)了。它是一個規(guī)范,定義了一些接口和交互規(guī)則,用于JVM平臺上的響應(yīng)式庫。它就是響應(yīng)式流(Reactive Streams),它的這些接口已經(jīng)被集成到Java 9里,在java.util.concurrent.Flow這個父類里。響應(yīng)式流和迭代器較相似,不過迭代器是基于“拉”(pull)的,而響應(yīng)式流是基于“推”(push)的。迭代器的使用其實是命令式編程,因為由開發(fā)者決定什么時候調(diào)用next()獲取下一個元素。在響應(yīng)式流中,與上面等價的是發(fā)布者-訂閱者。但當(dāng)有新的可用元素時,是由發(fā)布者推給訂閱者的。這個“推”就是響應(yīng)式的關(guān)鍵所在。
另外,對被推過來元素的操作也是以聲明的方式進行的,程序員只需表達(dá)做什么就行了,不需要管怎么做。
發(fā)布者使用onNext方法向訂閱者推送新元素,使用onError方法告知一個錯誤,使用onComplete方法告知已經(jīng)結(jié)束。
可見,錯誤處理和完成(結(jié)束)也是以一個良好的方式被處理。錯誤和結(jié)束都可以終止序列。
這種方式非常靈活。這種模式支持0個(沒有)元素/1個元素/n(多)個元素(包括無限序列,如果滴答的鐘表)這些情況。
Reactor粉墨登場
Reactor是第四代響應(yīng)式庫,是一個響應(yīng)式編程范式的實現(xiàn),用于在JVM平臺上基于響應(yīng)式流規(guī)范構(gòu)建非阻塞異步應(yīng)用。
它極大地實現(xiàn)了JVM上響應(yīng)式流的規(guī)范(http://www.reactive-streams.org/)。
它是一個完全非阻塞響應(yīng)式編程的基石,帶有高效需求管理(以管理“后壓”的形式)。
它直接集成Java函數(shù)式API,特別是CompletableFuture,Stream和Duration。
它支持使用reactor-netty工程實現(xiàn)非阻塞跨進程通信,適合微服務(wù)架構(gòu),支持HTTP(包括Websockets),TCP和UDP。
注:Reactor要求Java 8+
講了這么多,是不是要首先思考下,為什么我們需要這樣一個異步的響應(yīng)式庫?
阻塞就是浪費
現(xiàn)代的應(yīng)用能達(dá)到非常多的并發(fā)用戶,即使現(xiàn)代硬件的能力被持續(xù)改進,現(xiàn)代軟件的性能仍然是一個關(guān)鍵的關(guān)注點
大體上有兩種方式可以改進一個程序的性能:
1、并行化,使用更多的線程和更多的硬件資源
2、提高效率,在當(dāng)前資源用量的情況下尋求更高效率
通常,Java開發(fā)者使用阻塞代碼來寫程序。這種實踐性很好,直到遇到性能瓶頸。
此時會引入額外線程,運行相似的阻塞代碼。但是這種擴展方法在資源利用方面會引起爭論和導(dǎo)致并發(fā)問題。
更糟糕的是,阻塞浪費資源。如果你仔細(xì)看,一旦一個程序涉及到一些延遲(特別是I/O,像數(shù)據(jù)庫請求或網(wǎng)絡(luò)調(diào)用),資源就被浪費,因為線程現(xiàn)在是空閑的,在等待數(shù)據(jù)。
所以并行化方式不是銀彈。我們有必要讓硬件發(fā)揮完全的力量,但是關(guān)于資源浪費的影響和原因也是非常復(fù)雜的。
異步性來營救
前面提到的第二種方式是尋求更高效率,可以作為資源浪費問題的一個解決方案。
通過寫異步非阻塞代碼,你能讓執(zhí)行切換到其它活動的任務(wù),使用相同的底層資源,稍后再回到當(dāng)前的處理上。
但是如何產(chǎn)生異步代碼到JVM上呢?Java提供兩種異步編程模型:
1、Callbacks,異步方法沒有返回值,但是會帶一個回調(diào),當(dāng)結(jié)果可用時回調(diào)會被調(diào)用。
2、Futures,異步方法立即返回一個Future<T>,異步處理過程就是計算一個T值,使用Future對象包裝了對它的訪問。這個值不是立即可用的,該對象可以被輪詢來查看T值是否可用。
這兩種技術(shù)都足夠好嗎?并不是對每種情況都是的,兩種方式都有局限性。
回調(diào)比較難于組合在一起,很快就會導(dǎo)致代碼難以閱讀和維護(眾所周知的“回調(diào)地獄”)。
與回調(diào)相比,F(xiàn)utures稍微好一點,但是仍然在組合方面做得不好。組合多個Futures對象到一起是可行的但是并不容易。
Future
也有其它問題,很容易因為調(diào)用了get()方法造成了另一個阻塞。
另外,它不支持延遲計算,缺乏對多個值的支持,缺乏高級錯誤處理。
從命令式到響應(yīng)式編程
像Reactor這樣的響應(yīng)式庫的目標(biāo)就是解決在JVM上“傳統(tǒng)”異步方式的弊端,同時也關(guān)注一些額外方面:
可組合性和可讀性。
數(shù)據(jù)作為流,被豐富的操作符操作,什么都不會發(fā)生,直到你訂閱,后壓,消費者通知生產(chǎn)者發(fā)射的速率太快了,高級別而不是高數(shù)值抽象。
可組合性和可讀性
可組合性,其實就是編排多個異步任務(wù)的能力,使前一個任務(wù)的結(jié)果作為后續(xù)任務(wù)的輸入,或以fork-join(分叉-合并)的方式執(zhí)行若干個任務(wù),或在更高的級別重復(fù)利用這些異步任務(wù)。
任務(wù)編排的能力和代碼的可讀性和可維護性緊密地耦合在一起。隨著異步處理在數(shù)量和復(fù)雜度上的增加,組合和閱讀代碼變得更加困難。
就像我們看到的,回調(diào)模型雖然簡單,但是當(dāng)回調(diào)里嵌套回調(diào),達(dá)到多層時就會變成回調(diào)地獄。
Reactor提供豐富的組合選項,使嵌套級別最小,讓代碼的組織結(jié)構(gòu)能反映出在進行什么樣的抽象處理,且通常保持在同級別上。
裝配線類比
你可以認(rèn)為響應(yīng)式應(yīng)用處理數(shù)據(jù)就像通過一個裝配(生產(chǎn))線。Reactor既是傳送帶又是工作站。
原材料從一個源(原始發(fā)布者)持續(xù)不斷地獲取,以一個完成的產(chǎn)品被推送給消費者(訂閱者)結(jié)束。
原材料可以經(jīng)過許多不同的轉(zhuǎn)換,如其它的中間步驟,或者是一個更大裝配線的一部分。
如果在某個地方出現(xiàn)一個小故障或阻塞了,出問題的工作站可以向上游發(fā)出通知來限制原材料的流動(速率)。
操作符
在Reactor里,操作符就是裝配線類比中的工作站。每一個操作符都向一個發(fā)布者添加某些行為,把上一步的發(fā)布者包裝到一個新的實例里。整個鏈就是這樣被鏈接起來的。
所以數(shù)據(jù)一開始從第一個發(fā)布者出來,然后沿著鏈往下游移動,且被每一個鏈接轉(zhuǎn)換。最后,一個訂閱者結(jié)束了這個處理。
響應(yīng)式流規(guī)范并沒有明確規(guī)定操作符,不過Reactor就提供了豐富的操作符,它們涉及到很多方面,從簡單的轉(zhuǎn)換、過濾到復(fù)雜的編排、錯誤處理。
只要不訂閱,就什么都不發(fā)生
當(dāng)你寫一個發(fā)布者鏈時,默認(rèn),數(shù)據(jù)是不會開始進入鏈中的。相反,你只是創(chuàng)建了異步處理的一個抽象描述。
通過訂閱這個行為(動作),才把發(fā)布者和訂閱者連接起來,然后才會觸發(fā)數(shù)據(jù)在鏈里流動。
這是在內(nèi)部實現(xiàn)好的,通過來自于訂閱者的request信號往上游傳播,一路逆流而上直到最開始的發(fā)布者那里。
Reactor核心特性
Reactor引入可組合響應(yīng)式的類型,實現(xiàn)了發(fā)布者接口,但也提供了豐富的操作符,就是Flux和Mono。
Flux
,流動,表示0到N個元素。
Mono
,單個,表示0或1個元素。
它們之間的不同主要在語義上,表示異步處理的粗略基數(shù)。
如一個http請求只會產(chǎn)生一個響應(yīng),把它表示為Mono<HttpResponse>顯然更有意義,且它只提供相對于0/1這樣上下文的操作符,因為此時count操作顯然沒有太大意義。
操作符可以改變處理的最大基數(shù),也會切換到相關(guān)類型上。如count操作符雖然存在于Flux<T>上,但它的返回值卻是一個Mono<Long>。
Flux<T>
一個Flux<T>是一個標(biāo)準(zhǔn)的Publisher<T>,表示一個異步序列,可以發(fā)射0到N個元素,可以通過一個完成信號或錯誤信號終止。
就像在響應(yīng)式流規(guī)范里那樣,這3種類型的信號轉(zhuǎn)化為對一個下游訂閱者的onNext,onComplete,onError3個方法的調(diào)用。
這3個方法也可以理解為事件/回調(diào),且它們都是可選的。
如沒有onNext但有onComplete,表示一個空的有限序列。既沒有onNext也沒有onComplete,表示一個空的無限序列(沒有什么實際用途,可用于測試)。
無限序列也沒有必要是空的,如Flux.interval(Duration)產(chǎn)生一個Flux<Long> ,它是無限的,從鐘表里發(fā)射出的規(guī)則的“嘀嗒”。
Mono<T>
一個Mono<T>是一個特殊的Publisher<T>,最多發(fā)射一個元素,可以使用onComplete信號或onError信號來終止。
它提供的操作符只是Flux提供的一個子集,同樣,一些操作符(如把Mono和Publisher結(jié)合起來)可以把它切換到一個Flux。
如Mono#concatWith(Publisher)返回一個Flux,然而Mono#then(Mono)返回的是另一個Mono。
Mono可以用于表示沒有返回值的異步處理(與Runnable相似),用Mono<Void>表示。
創(chuàng)建Flux或Mono,并訂閱它們
最容易的方式就是使用它們各自的工廠方法:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo");
當(dāng)談到訂閱時,可以使用Java 8的lambda表達(dá)式,訂閱方法有多種不同的變體,帶有不同的回調(diào)。
下面是方法簽名:
//訂閱并觸發(fā)序列 subscribe(); //可以對每一個產(chǎn)生的值進行處理 subscribe(Consumer<? super T> consumer); //還可以響應(yīng)一個錯誤 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //還可以在成功結(jié)束后執(zhí)行一些代碼 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //還可以對Subscription執(zhí)行一些操作 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
使用Disposable取消訂閱
這些基于lambda的訂閱方法都返回一個Disposable類型,通過調(diào)用它的dispose()來取消這個訂閱。
對于Flux和Mono,取消就是一個信號,表明源應(yīng)該停止生產(chǎn)元素。然而,不保證立即生效,一些源可能生產(chǎn)元素非常快,以致于還沒有收到取消信號就已經(jīng)生產(chǎn)完了。
以上就是java中的響應(yīng)式編程介紹,看完之后是否有所收獲呢?如果想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊,感謝各位的閱讀。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。