溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Project?Reactor響應(yīng)式編程是什么

發(fā)布時間:2023-04-04 10:35:48 來源:億速云 閱讀:103 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Project Reactor響應(yīng)式編程是什么的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Project Reactor響應(yīng)式編程是什么文章都會有所收獲,下面我們一起來看看吧。

什么是響應(yīng)式編程?

響應(yīng)式編程是一種編程范式,它關(guān)注數(shù)據(jù)的變化和傳播,而不是控制流。響應(yīng)式編程可以提高程序的性能、彈性和可伸縮性,使程序能夠及時響應(yīng)用戶的需求和環(huán)境的變化。在本文中,我們將介紹Java中的響應(yīng)式編程的基本概念、原理和實踐。

響應(yīng)式編程的核心思想是將數(shù)據(jù)和行為抽象為流(Stream),流可以表示任何異步的事件或值,比如用戶輸入、網(wǎng)絡(luò)請求、數(shù)據(jù)庫查詢等。流可以被觀察(Observable),也就是說,可以有一個或多個觀察者(Observer)訂閱流,并在流發(fā)生變化時接收通知。流還可以被操作(Operator),也就是說,可以對流進行各種轉(zhuǎn)換、過濾、組合等操作,從而生成新的流。

響應(yīng)式編程的優(yōu)勢:

  • 可以將復(fù)雜的異步邏輯簡化為聲明式的數(shù)據(jù)流操作,避免了回調(diào)地獄(Callback Hell)、阻塞線程和競態(tài)條件等問題。

  • 可以提高程序的性能和資源利用率,通過減少線程的上下文切換和阻塞,以及利用反應(yīng)流的背壓(Backpressure)機制來控制數(shù)據(jù)流的速度,也就是說,可以讓下游的觀察者控制上游的數(shù)據(jù)源的發(fā)送速率,從而防止數(shù)據(jù)溢出或浪費。

  • 可以提高程序的表達力和靈活性,通過使用函數(shù)式編程的風(fēng)格和操作符來組合和轉(zhuǎn)換數(shù)據(jù)流,以及利用反應(yīng)式編程框架和庫來簡化異步和事件驅(qū)動編程的復(fù)雜度

響應(yīng)式編程的缺點:

  • 降低程序的可讀性和維護性,通過使用嵌套、回調(diào)、訂閱等方式來處理異步事件,以及使用反應(yīng)流的操作符來處理數(shù)據(jù)流,可能導(dǎo)致代碼難以理解和調(diào)試。

Java中的響應(yīng)式編程

Java中有多種框架和庫可以實現(xiàn)響應(yīng)式編程,比如RxJava、Spring Reactor、Vert.x等。這些框架和庫都遵循了Reactive Streams規(guī)范,這是一套定義了非阻塞背壓的異步流處理標(biāo)準(zhǔn)的接口。

Reactive Streams規(guī)范主要包括四個接口:

  • Publisher:發(fā)布者,表示一個數(shù)據(jù)源,可以發(fā)出零個或多個數(shù)據(jù),并通知訂閱者完成或出錯。

  • Subscriber:訂閱者,表示一個數(shù)據(jù)消費者,可以訂閱一個發(fā)布者,并在收到數(shù)據(jù)、完成或出錯時做出相應(yīng)的動作。

  • Subscription:訂閱,表示發(fā)布者和訂閱者之間的關(guān)系,可以用來請求或取消數(shù)據(jù)。

  • Processor:處理器,表示一個既是發(fā)布者又是訂閱者的中間組件,可以對數(shù)據(jù)進行處理或轉(zhuǎn)換。

Project Reactor

Project Reactor是一個完全非阻塞的包含背壓支持的響應(yīng)式編程基石。它是Spring生態(tài)系統(tǒng)中Spring Reactive的基礎(chǔ),被用于如Spring WebFlux, Spring Data和Spring Cloud Gateway等項目中。

基本概念

Project Reactor的核心思想是將數(shù)據(jù)和事件看作是流(stream),流可以被創(chuàng)建,轉(zhuǎn)換,過濾,合并,分組,緩沖,錯誤處理等等。流是惰性的,只有當(dāng)有訂閱者(subscriber)訂閱時才會開始發(fā)射數(shù)據(jù)或事件。流可以是有限的,也可以是無限的,可以是同步的,也可以是異步的,可以是單線程的,也可以是多線程的。流還可以支持背壓(backpressure),即訂閱者可以控制流的速度,避免被過多的數(shù)據(jù)或事件淹沒。

核心組件

Project Reactor提供了兩個主要的接口來表示流:

  • Flux: 表示一個包含0到N個元素的流

  • Mono: 表示一個包含0到1個元素的流。

它們都是Publisher<T>的實現(xiàn),可以發(fā)出0-N個元素的異步序列,并根據(jù)訂閱者的需求推送元素。

Flux表示的是包含0到N個元素的異步序列,可以被onComplete信號或者onError信號所終止。

Mono表示的是包含0或1個元素的異步序列,也可以被onComplete信號或者onError信號所終止。

Flux和Mono之間可以進行轉(zhuǎn)換。

代碼示例

Mono

// 創(chuàng)建一個Mono對象,包含一個字符串元素
Mono<String> mono = Mono.just("Hello World");

// 訂閱這個Mono對象,并打印元素值
mono.subscribe(System.out::println);

使用Mono.just方法創(chuàng)建了一個包含一個字符串元素的Mono對象,然后使用subscribe方法訂閱了這個對象,并提供了一個回調(diào)函數(shù)來打印元素值。當(dāng)Mono對象發(fā)出元素值時,回調(diào)函數(shù)就會被調(diào)用。

Mono to Flux

把Mono轉(zhuǎn)換成Flux的一種方法是使用flux()方法,它會返回一個包含Mono發(fā)出的元素的Flux,或者如果Mono為空,則返回一個空的Flux。例如:

// 創(chuàng)建一個Mono對象,包含一個整數(shù)元素
Mono<Integer> mono = Mono.just(1);

// 使用flux()方法把Mono轉(zhuǎn)換成Flux
Flux<Integer> flux = mono.flux();

// 訂閱這個Flux對象,并打印元素值
flux.subscribe(System.out::println); // 輸出1

另一種方法是使用concatWith()方法,它會將Mono與另一個Publisher連接起來,形成一個Flux。例如:

// 創(chuàng)建一個Mono對象,包含一個整數(shù)元素
Mono<Integer> mono = Mono.just(1);

// 創(chuàng)建一個Flux對象,包含兩個整數(shù)元素
Flux<Integer> flux = Flux.just(2, 3);

// 使用concatWith()方法把Mono和Flux連接起來
Flux<Integer> result = mono.concatWith(flux);

// 訂閱這個Flux對象,并打印元素值
result.subscribe(System.out::println); // 輸出1, 2, 3

Mono常用的操作

// 從一個固定的值創(chuàng)建一個Mono
Mono.just("Hello").subscribe(System.out::println); // 輸出Hello

// 從一個Callable對象創(chuàng)建一個Mono
Callable<String> callable = () -> "World";
Mono.fromCallable(callable).subscribe(System.out::println); // 輸出World

// 從一個Supplier對象創(chuàng)建一個Mono
Supplier<String> supplier = () -> "Supplier!";
Mono.fromSupplier(supplier).subscribe(System.out::println); // 輸出Supplier!

// 對Mono發(fā)出的元素進行映射操作
Mono.just("Hello").map(s -> s + " World").subscribe(System.out::println); // 輸出Hello World

// 對Mono發(fā)出的元素進行扁平化操作
Mono.just("Hello")
    .flatMap(s -> Mono.just(s + " World"))
    .subscribe(System.out::println); // 輸出Hello World

// 對Mono發(fā)出的元素進行過濾操作
Mono.just(1).filter(i -> i > 0).subscribe(System.out::println); // 輸出1

// 將多個Mono合并為一個Mono
Mono.zip(Mono.just("Hello"), Mono.just("World"))
    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 輸出Hello World

// 將多個Mono合并為一個Flux
Mono.just("Hello")
    .mergeWith(Mono.just("World"))
    .subscribe(System.out::println); // 輸出Hello World

// 在這個Mono完成后,繼續(xù)處理另一個發(fā)布者
Mono.just("Hello").then(Mono.just("World")).subscribe(System.out::println); // 輸出World

// 在這個Mono發(fā)出元素時,執(zhí)行一個副作用操作
Mono.just("Hello")
    .doOnNext(s -> System.out.println("Before: " + s))
    .map(s -> s + " World")
    .doOnNext(s -> System.out.println("After: " + s))
    .subscribe(); 
// 輸出
// Before: Hello 
// After: Hello World

Flux

// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 訂閱這個Flux對象,并打印元素值
flux.subscribe(System.out::println);

使用Flux.just方法創(chuàng)建了一個包含三個整數(shù)元素的Flux對象,然后使用subscribe方法訂閱了這個對象,并提供了一個回調(diào)函數(shù)來打印元素值。當(dāng)Flux對象發(fā)出元素值時,回調(diào)函數(shù)就會被調(diào)用。

Flux to Mono

把Flux轉(zhuǎn)換成Mono的一種方法是使用next()方法,它會返回Flux發(fā)出的第一個元素,或者如果Flux為空,則返回一個空的Mono。

例如:

// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 使用next()方法把Flux轉(zhuǎn)換成Mono
Mono<Integer> mono = flux.next();

// 訂閱這個Mono對象,并打印元素值
mono.subscribe(System.out::println); // 輸出1

另一種方法是使用collectList()方法,它會把Flux發(fā)出的所有元素收集到一個列表中,并返回一個包含這個列表的Mono。

例如:

// 創(chuàng)建一個Flux對象,包含三個整數(shù)元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 使用collectList()方法把Flux轉(zhuǎn)換成Mono
Mono<List<Integer>> mono = flux.collectList();

// 訂閱這個Mono對象,并打印元素值
mono.subscribe(System.out::println); // 輸出[1, 2, 3]

Flux常用的操作

// 從多個固定的值創(chuàng)建一個Flux
Flux.just("Hello", "World").subscribe(System.out::println); // 輸出Hello World

// 從一個數(shù)組對象創(chuàng)建一個Flux
String[] array = {"Hello", "World"};
Flux.fromArray(array).subscribe(System.out::println); // 輸出Hello World

// 從一個Iterable對象創(chuàng)建一個Flux
List<String> list = Arrays.asList("Hello", "World");
Flux.fromIterable(list).subscribe(System.out::println); // 輸出Hello World

// 從一個Stream對象創(chuàng)建一個Flux
Stream<String> stream = Stream.of("Hello", "World");
Flux.fromStream(stream).subscribe(System.out::println); // 輸出Hello World

// 創(chuàng)建一個包含指定范圍內(nèi)整數(shù)的Flux
Flux.range(1, 5).subscribe(System.out::println); // 輸出1 2 3 4 5

// 創(chuàng)建一個按照指定時間間隔從0整數(shù)遞增的Flux
Duration duration = Duration.ofSeconds(1);
Flux<Long> interval = Flux.interval(duration);
interval.subscribe(System.out::println);
// 使用blockLast阻塞主線程,防止程序立即退出
interval.blockLast();
// 輸出結(jié)果每秒打印一次
// 0
// 1
// 2
// 3
// 4
// ...

// 對Flux發(fā)出的每個元素進行映射操作
Flux.just("Hello", "World").map(s -> s + "!")
    .subscribe(System.out::println); // 輸出Hello! World!

// 對Flux發(fā)出的每個元素進行扁平化操作
Flux.just("Hello", "World")
    .flatMap(s -> Flux.just(s + "!"))
    .subscribe(System.out::println); //輸出Hello! World!

// 對Flux發(fā)出的每個元素進行過濾操作
Flux.range(1, 5).filter(i -> i % 2 == 0).subscribe(System.out::println); 
// 輸出2 4

// 將多個Flux合并為一個Flux
Flux.zip(Flux.just("Hello"), Flux.just("World"))
    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); 
// 輸出Hello World

// 將多個Flux合并為一個Flux
Flux.just("Hello").mergeWith(Flux.just("World")).subscribe(System.out::println); 
//  輸出Hello World

// 將多個Flux合并為一個Flux
Flux.just("Hello").concatWith(Flux.just("World")).subscribe(System.out::println); 
// 輸出Hello World

// 將所有元素收集到一個List中
Flux.just("Hello", "World").collectList().subscribe(list -> System.out.println(list)); // 輸出[Hello, World]

Flux的zip、mergeWith、concatWith區(qū)別

zip、mergeWith和concatWith都是用來將多個Flux合并為一個Flux的操作,但是它們有一些區(qū)別:

zip會將多個Flux的元素按照一對一的方式進行合并,形成一個包含元組的Flux,每個元組中包含了每個源Flux的一個元素。如果源Flux的元素個數(shù)不一致,那么zip會以最短的Flux為基準(zhǔn),多余的元素會被丟棄。

  Flux<String> flux1 = Flux.just("A", "B", "C");
  Flux<Integer> flux2 = Flux.just(1, 2, 3, 4);
  Flux<Tuple2<String, Integer>> flux3 = Flux.zip(flux1, flux2);
  flux3.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); 
  // 輸出A 1 B 2 C 3
  // 4不會輸出,因為最短的Flux是flux1,長度是3

mergeWith會將多個Flux的元素按照時間順序進行合并,形成一個包含所有元素的Flux。如果源Flux的元素有重疊,那么mergeWith會保留所有的元素。

  Duration duration1 = Duration.ofMillis(100);
  Duration duration2 = Duration.ofMillis(200);
  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i);
  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i);
  Flux<String> flux = flux1.mergeWith(flux2);
  flux.subscribe(System.out::println); 
  // 輸出A0 B0 A1 B1 A2 A3 B2 A4 B3 A5 ...

concatWith會將多個Flux的元素按照訂閱順序進行合并,形成一個包含所有元素的Flux。如果源Flux的元素有重疊,那么concatWith會保留所有的元素。concatWith會等待上一個源Flux完成后才訂閱下一個源Flux。

  Duration duration1 = Duration.ofMillis(100);
  Duration duration2 = Duration.ofMillis(200);
  // 每100ms遞增1,打印5次結(jié)束
  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i).take(5);
  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i).take(5);
  Flux<String> flux3 = flux1.concatWith(flux2);
  flux3.subscribe(System.out::println);
  // 避免程序立即退出
  flux3.blockLast();
  // 輸出 
  // A0
  // A1
  // A2
  // A3
  // A4
  // B0
  // B1
  // B2
  // B3
  // B4

可以看到,Project Reactor和Java 8 Stream的用法看起來很像,因為它們都提供了一些函數(shù)式編程的方法,用來對數(shù)據(jù)流進行操作,例如map、filter、reduce等。但是它們的本質(zhì)是不同的,主要有以下幾個區(qū)別:

  • Project Reactor是基于Reactive Streams規(guī)范的一個實現(xiàn),它支持異步、非阻塞、反應(yīng)式的編程模式,而Java 8 Stream是基于集合類的一個擴展,它支持同步、阻塞、命令式的編程模式。

  • Project Reactor是基于Push模式的,它可以讓數(shù)據(jù)源主動推送數(shù)據(jù)給訂閱者,并且支持背壓機制,讓訂閱者可以控制數(shù)據(jù)的流速,而Java 8 Stream是基于Pull模式的,它需要訂閱者主動拉取數(shù)據(jù)源的數(shù)據(jù),并且沒有背壓機制,可能會導(dǎo)致內(nèi)存溢出或者性能下降。

  • Project Reactor可以處理無限流或者有限流,它可以通過短路操作來終止無限流,而Java 8 Stream只能處理有限流,它不能處理無限流或者異步流。

  • Project Reactor可以在多線程或者單線程環(huán)境下運行,它可以通過parallel或者sequential方法來切換并行或者串行模式,而Java 8 Stream只能在單線程環(huán)境下運行,它只能通過parallelStream方法來創(chuàng)建并行流。

關(guān)于“Project Reactor響應(yīng)式編程是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“Project Reactor響應(yīng)式編程是什么”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI