您好,登錄后才能下訂單哦!
這篇文章主要介紹了Project Reactor響應(yīng)式編程是什么的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Project Reactor響應(yīng)式編程是什么文章都會有所收獲,下面我們一起來看看吧。
響應(yīng)式編程是一種編程范式,它關(guān)注數(shù)據(jù)的變化和傳播,而不是控制流。響應(yīng)式編程可以提高程序的性能、彈性和可伸縮性,使程序能夠及時響應(yīng)用戶的需求和環(huán)境的變化。在本文中,我們將介紹Java中的響應(yīng)式編程的基本概念、原理和實踐。
響應(yīng)式編程的核心思想是將數(shù)據(jù)和行為抽象為流(Stream),流可以表示任何異步的事件或值,比如用戶輸入、網(wǎng)絡(luò)請求、數(shù)據(jù)庫查詢等。流可以被觀察(Observable),也就是說,可以有一個或多個觀察者(Observer)訂閱流,并在流發(fā)生變化時接收通知。流還可以被操作(Operator),也就是說,可以對流進行各種轉(zhuǎn)換、過濾、組合等操作,從而生成新的流。
響應(yīng)式編程的優(yōu)勢:
可以將復(fù)雜的異步邏輯簡化為聲明式的數(shù)據(jù)流操作,避免了回調(diào)地獄(Callback Hell)、阻塞線程和競態(tài)條件等問題。
可以提高程序的性能和資源利用率,通過減少線程的上下文切換和阻塞,以及利用反應(yīng)流的背壓(Backpressure)機制來控制數(shù)據(jù)流的速度,也就是說,可以讓下游的觀察者控制上游的數(shù)據(jù)源的發(fā)送速率,從而防止數(shù)據(jù)溢出或浪費。
可以提高程序的表達力和靈活性,通過使用函數(shù)式編程的風(fēng)格和操作符來組合和轉(zhuǎn)換數(shù)據(jù)流,以及利用反應(yīng)式編程框架和庫來簡化異步和事件驅(qū)動編程的復(fù)雜度
響應(yīng)式編程的缺點:
降低程序的可讀性和維護性,通過使用嵌套、回調(diào)、訂閱等方式來處理異步事件,以及使用反應(yīng)流的操作符來處理數(shù)據(jù)流,可能導(dǎo)致代碼難以理解和調(diào)試。
Java中有多種框架和庫可以實現(xiàn)響應(yīng)式編程,比如RxJava、Spring Reactor、Vert.x等。這些框架和庫都遵循了Reactive Streams規(guī)范,這是一套定義了非阻塞背壓的異步流處理標(biāo)準(zhǔn)的接口。
Reactive Streams規(guī)范主要包括四個接口:
Publisher:發(fā)布者,表示一個數(shù)據(jù)源,可以發(fā)出零個或多個數(shù)據(jù),并通知訂閱者完成或出錯。
Subscriber:訂閱者,表示一個數(shù)據(jù)消費者,可以訂閱一個發(fā)布者,并在收到數(shù)據(jù)、完成或出錯時做出相應(yīng)的動作。
Subscription:訂閱,表示發(fā)布者和訂閱者之間的關(guān)系,可以用來請求或取消數(shù)據(jù)。
Processor:處理器,表示一個既是發(fā)布者又是訂閱者的中間組件,可以對數(shù)據(jù)進行處理或轉(zhuǎn)換。
Project Reactor是一個完全非阻塞的包含背壓支持的響應(yīng)式編程基石。它是Spring生態(tài)系統(tǒng)中Spring Reactive的基礎(chǔ),被用于如Spring WebFlux, Spring Data和Spring Cloud Gateway等項目中。
Project Reactor的核心思想是將數(shù)據(jù)和事件看作是流(stream),流可以被創(chuàng)建,轉(zhuǎn)換,過濾,合并,分組,緩沖,錯誤處理等等。流是惰性的,只有當(dāng)有訂閱者(subscriber)訂閱時才會開始發(fā)射數(shù)據(jù)或事件。流可以是有限的,也可以是無限的,可以是同步的,也可以是異步的,可以是單線程的,也可以是多線程的。流還可以支持背壓(backpressure),即訂閱者可以控制流的速度,避免被過多的數(shù)據(jù)或事件淹沒。
Project Reactor提供了兩個主要的接口來表示流:
Flux: 表示一個包含0到N個元素的流
Mono: 表示一個包含0到1個元素的流。
它們都是Publisher<T>的實現(xiàn),可以發(fā)出0-N個元素的異步序列,并根據(jù)訂閱者的需求推送元素。
Flux表示的是包含0到N個元素的異步序列,可以被onComplete信號或者onError信號所終止。
Mono表示的是包含0或1個元素的異步序列,也可以被onComplete信號或者onError信號所終止。
Flux和Mono之間可以進行轉(zhuǎn)換。
// 創(chuàng)建一個Mono對象,包含一個字符串元素 Mono<String> mono = Mono.just("Hello World"); // 訂閱這個Mono對象,并打印元素值 mono.subscribe(System.out::println);
使用Mono.just方法創(chuàng)建了一個包含一個字符串元素的Mono對象,然后使用subscribe方法訂閱了這個對象,并提供了一個回調(diào)函數(shù)來打印元素值。當(dāng)Mono對象發(fā)出元素值時,回調(diào)函數(shù)就會被調(diào)用。
Mono to Flux
把Mono轉(zhuǎn)換成Flux的一種方法是使用flux()方法,它會返回一個包含Mono發(fā)出的元素的Flux,或者如果Mono為空,則返回一個空的Flux。例如:
// 創(chuàng)建一個Mono對象,包含一個整數(shù)元素 Mono<Integer> mono = Mono.just(1); // 使用flux()方法把Mono轉(zhuǎn)換成Flux Flux<Integer> flux = mono.flux(); // 訂閱這個Flux對象,并打印元素值 flux.subscribe(System.out::println); // 輸出1
另一種方法是使用concatWith()方法,它會將Mono與另一個Publisher連接起來,形成一個Flux。例如:
// 創(chuàng)建一個Mono對象,包含一個整數(shù)元素 Mono<Integer> mono = Mono.just(1); // 創(chuàng)建一個Flux對象,包含兩個整數(shù)元素 Flux<Integer> flux = Flux.just(2, 3); // 使用concatWith()方法把Mono和Flux連接起來 Flux<Integer> result = mono.concatWith(flux); // 訂閱這個Flux對象,并打印元素值 result.subscribe(System.out::println); // 輸出1, 2, 3
Mono常用的操作
// 從一個固定的值創(chuàng)建一個Mono Mono.just("Hello").subscribe(System.out::println); // 輸出Hello // 從一個Callable對象創(chuàng)建一個Mono Callable<String> callable = () -> "World"; Mono.fromCallable(callable).subscribe(System.out::println); // 輸出World // 從一個Supplier對象創(chuàng)建一個Mono Supplier<String> supplier = () -> "Supplier!"; Mono.fromSupplier(supplier).subscribe(System.out::println); // 輸出Supplier! // 對Mono發(fā)出的元素進行映射操作 Mono.just("Hello").map(s -> s + " World").subscribe(System.out::println); // 輸出Hello World // 對Mono發(fā)出的元素進行扁平化操作 Mono.just("Hello") .flatMap(s -> Mono.just(s + " World")) .subscribe(System.out::println); // 輸出Hello World // 對Mono發(fā)出的元素進行過濾操作 Mono.just(1).filter(i -> i > 0).subscribe(System.out::println); // 輸出1 // 將多個Mono合并為一個Mono Mono.zip(Mono.just("Hello"), Mono.just("World")) .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 輸出Hello World // 將多個Mono合并為一個Flux Mono.just("Hello") .mergeWith(Mono.just("World")) .subscribe(System.out::println); // 輸出Hello World // 在這個Mono完成后,繼續(xù)處理另一個發(fā)布者 Mono.just("Hello").then(Mono.just("World")).subscribe(System.out::println); // 輸出World // 在這個Mono發(fā)出元素時,執(zhí)行一個副作用操作 Mono.just("Hello") .doOnNext(s -> System.out.println("Before: " + s)) .map(s -> s + " World") .doOnNext(s -> System.out.println("After: " + s)) .subscribe(); // 輸出 // Before: Hello // After: Hello World
// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素 Flux<Integer> flux = Flux.just(1, 2, 3); // 訂閱這個Flux對象,并打印元素值 flux.subscribe(System.out::println);
使用Flux.just方法創(chuàng)建了一個包含三個整數(shù)元素的Flux對象,然后使用subscribe方法訂閱了這個對象,并提供了一個回調(diào)函數(shù)來打印元素值。當(dāng)Flux對象發(fā)出元素值時,回調(diào)函數(shù)就會被調(diào)用。
Flux to Mono
把Flux轉(zhuǎn)換成Mono的一種方法是使用next()方法,它會返回Flux發(fā)出的第一個元素,或者如果Flux為空,則返回一個空的Mono。
例如:
// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素 Flux<Integer> flux = Flux.just(1, 2, 3); // 使用next()方法把Flux轉(zhuǎn)換成Mono Mono<Integer> mono = flux.next(); // 訂閱這個Mono對象,并打印元素值 mono.subscribe(System.out::println); // 輸出1
另一種方法是使用collectList()方法,它會把Flux發(fā)出的所有元素收集到一個列表中,并返回一個包含這個列表的Mono。
例如:
// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素 Flux<Integer> flux = Flux.just(1, 2, 3); // 使用collectList()方法把Flux轉(zhuǎn)換成Mono Mono<List<Integer>> mono = flux.collectList(); // 訂閱這個Mono對象,并打印元素值 mono.subscribe(System.out::println); // 輸出[1, 2, 3]
Flux常用的操作
// 從多個固定的值創(chuàng)建一個Flux Flux.just("Hello", "World").subscribe(System.out::println); // 輸出Hello World // 從一個數(shù)組對象創(chuàng)建一個Flux String[] array = {"Hello", "World"}; Flux.fromArray(array).subscribe(System.out::println); // 輸出Hello World // 從一個Iterable對象創(chuàng)建一個Flux List<String> list = Arrays.asList("Hello", "World"); Flux.fromIterable(list).subscribe(System.out::println); // 輸出Hello World // 從一個Stream對象創(chuàng)建一個Flux Stream<String> stream = Stream.of("Hello", "World"); Flux.fromStream(stream).subscribe(System.out::println); // 輸出Hello World // 創(chuàng)建一個包含指定范圍內(nèi)整數(shù)的Flux Flux.range(1, 5).subscribe(System.out::println); // 輸出1 2 3 4 5 // 創(chuàng)建一個按照指定時間間隔從0整數(shù)遞增的Flux Duration duration = Duration.ofSeconds(1); Flux<Long> interval = Flux.interval(duration); interval.subscribe(System.out::println); // 使用blockLast阻塞主線程,防止程序立即退出 interval.blockLast(); // 輸出結(jié)果每秒打印一次 // 0 // 1 // 2 // 3 // 4 // ... // 對Flux發(fā)出的每個元素進行映射操作 Flux.just("Hello", "World").map(s -> s + "!") .subscribe(System.out::println); // 輸出Hello! World! // 對Flux發(fā)出的每個元素進行扁平化操作 Flux.just("Hello", "World") .flatMap(s -> Flux.just(s + "!")) .subscribe(System.out::println); //輸出Hello! World! // 對Flux發(fā)出的每個元素進行過濾操作 Flux.range(1, 5).filter(i -> i % 2 == 0).subscribe(System.out::println); // 輸出2 4 // 將多個Flux合并為一個Flux Flux.zip(Flux.just("Hello"), Flux.just("World")) .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 輸出Hello World // 將多個Flux合并為一個Flux Flux.just("Hello").mergeWith(Flux.just("World")).subscribe(System.out::println); // 輸出Hello World // 將多個Flux合并為一個Flux Flux.just("Hello").concatWith(Flux.just("World")).subscribe(System.out::println); // 輸出Hello World // 將所有元素收集到一個List中 Flux.just("Hello", "World").collectList().subscribe(list -> System.out.println(list)); // 輸出[Hello, World]
Flux的zip、mergeWith、concatWith區(qū)別
zip、mergeWith和concatWith都是用來將多個Flux合并為一個Flux的操作,但是它們有一些區(qū)別:
zip會將多個Flux的元素按照一對一的方式進行合并,形成一個包含元組的Flux,每個元組中包含了每個源Flux的一個元素。如果源Flux的元素個數(shù)不一致,那么zip會以最短的Flux為基準(zhǔn),多余的元素會被丟棄。
Flux<String> flux1 = Flux.just("A", "B", "C"); Flux<Integer> flux2 = Flux.just(1, 2, 3, 4); Flux<Tuple2<String, Integer>> flux3 = Flux.zip(flux1, flux2); flux3.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 輸出A 1 B 2 C 3 // 4不會輸出,因為最短的Flux是flux1,長度是3
mergeWith會將多個Flux的元素按照時間順序進行合并,形成一個包含所有元素的Flux。如果源Flux的元素有重疊,那么mergeWith會保留所有的元素。
Duration duration1 = Duration.ofMillis(100); Duration duration2 = Duration.ofMillis(200); Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i); Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i); Flux<String> flux = flux1.mergeWith(flux2); flux.subscribe(System.out::println); // 輸出A0 B0 A1 B1 A2 A3 B2 A4 B3 A5 ...
concatWith會將多個Flux的元素按照訂閱順序進行合并,形成一個包含所有元素的Flux。如果源Flux的元素有重疊,那么concatWith會保留所有的元素。concatWith會等待上一個源Flux完成后才訂閱下一個源Flux。
Duration duration1 = Duration.ofMillis(100); Duration duration2 = Duration.ofMillis(200); // 每100ms遞增1,打印5次結(jié)束 Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i).take(5); Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i).take(5); Flux<String> flux3 = flux1.concatWith(flux2); flux3.subscribe(System.out::println); // 避免程序立即退出 flux3.blockLast(); // 輸出 // A0 // A1 // A2 // A3 // A4 // B0 // B1 // B2 // B3 // B4
可以看到,Project Reactor和Java 8 Stream的用法看起來很像,因為它們都提供了一些函數(shù)式編程的方法,用來對數(shù)據(jù)流進行操作,例如map、filter、reduce等。但是它們的本質(zhì)是不同的,主要有以下幾個區(qū)別:
Project Reactor是基于Reactive Streams規(guī)范的一個實現(xiàn),它支持異步、非阻塞、反應(yīng)式的編程模式,而Java 8 Stream是基于集合類的一個擴展,它支持同步、阻塞、命令式的編程模式。
Project Reactor是基于Push模式的,它可以讓數(shù)據(jù)源主動推送數(shù)據(jù)給訂閱者,并且支持背壓機制,讓訂閱者可以控制數(shù)據(jù)的流速,而Java 8 Stream是基于Pull模式的,它需要訂閱者主動拉取數(shù)據(jù)源的數(shù)據(jù),并且沒有背壓機制,可能會導(dǎo)致內(nèi)存溢出或者性能下降。
Project Reactor可以處理無限流或者有限流,它可以通過短路操作來終止無限流,而Java 8 Stream只能處理有限流,它不能處理無限流或者異步流。
Project Reactor可以在多線程或者單線程環(huán)境下運行,它可以通過parallel或者sequential方法來切換并行或者串行模式,而Java 8 Stream只能在單線程環(huán)境下運行,它只能通過parallelStream方法來創(chuàng)建并行流。
關(guān)于“Project Reactor響應(yīng)式編程是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“Project Reactor響應(yīng)式編程是什么”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。