溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

java9新特性Reactive?Stream響應式編程API怎么用

發(fā)布時間:2022-03-15 16:55:32 來源:億速云 閱讀:258 作者:iii 欄目:開發(fā)技術

這篇文章主要介紹“java9新特性Reactive Stream響應式編程API怎么用”,在日常操作中,相信很多人在java9新特性Reactive Stream響應式編程API怎么用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”java9新特性Reactive Stream響應式編程API怎么用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

一、Java9 Reactive Stream API

Java 9提供了一組定義響應式流編程的接口。所有這些接口都作為靜態(tài)內(nèi)部接口定義在java.util.concurrent.Flow類里面。

java9新特性Reactive?Stream響應式編程API怎么用

下面是Java 響應式編程中的一些重要角色和概念,先簡單理解一下

發(fā)布者(Publisher)是潛在的無限數(shù)量的有序數(shù)據(jù)元素的生產(chǎn)者。 它根據(jù)收到的需求(subscription)向當前訂閱者發(fā)布一定數(shù)量的數(shù)據(jù)元素。

訂閱者(Subscriber)從發(fā)布者那里訂閱并接收數(shù)據(jù)元素。與發(fā)布者建立訂閱關系后,發(fā)布者向訂閱者發(fā)送訂閱令牌(subscription),訂閱者可以根據(jù)自己的處理能力請求發(fā)布者發(fā)布數(shù)據(jù)元素的數(shù)量。

訂閱令牌(subscription)表示訂閱者與發(fā)布者之間建立的訂閱關系。 當建立訂閱關系后,發(fā)布者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與發(fā)布者進行交互,例如請求數(shù)據(jù)元素的數(shù)量或取消訂閱。

二、Java響應式編程四大接口

2.1.Subscriber Interface(訂閱者訂閱接口)

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe:在發(fā)布者接受訂閱者的訂閱動作之后,發(fā)布任何的訂閱消息之前被調用。新創(chuàng)建的Subscription訂閱令牌對象通過此方法傳遞給訂閱者。

onNext:下一個待處理的數(shù)據(jù)項的處理函數(shù)

onError:在發(fā)布者或訂閱遇到不可恢復的錯誤時調用

onComplete:當沒有訂閱者調用(包括onNext()方法)發(fā)生時調用。

2.2.Subscription Interface (訂閱令牌接口)

訂閱令牌對象通過Subscriber.onSubscribe()方法傳遞

public static interface Subscription {    public void request(long n);    public void cancel();}

request(long n)是無阻塞背壓概念背后的關鍵方法。訂閱者使用它來請求n個以上的消費項目。這樣,訂閱者控制了它當前能夠接收多少個數(shù)據(jù)。cancel()由訂閱者主動來取消其訂閱,取消后將不會在接收到任何數(shù)據(jù)消息。

2.3.Publisher Interface(發(fā)布者接口)

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

調用該方法,建立訂閱者Subscriber與發(fā)布者Publisher之間的消息訂閱關系。

2.4.Processor Interface(處理器接口)

處理者Processor 可以同時充當訂閱者和發(fā)布者,起到轉換發(fā)布者&mdash;&mdash;訂閱者管道中的元素的作用。用于將發(fā)布者T類型的數(shù)據(jù)元素,接收并轉換為類型R的數(shù)據(jù)并發(fā)布。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

二、實戰(zhàn)案例

現(xiàn)在我們要去實現(xiàn)上面的四個接口來完成響應式編程

Subscription Interface訂閱令牌接口通常不需要我們自己編程去實現(xiàn),我們只需要在知道request()方法和cancle()方法含義即可。

Publisher Interface發(fā)布者接口,Java 9 已經(jīng)默認為我們提供了實現(xiàn)SubmissionPublisher,該實現(xiàn)類除了實現(xiàn)Publisher接口的方法外,提供了一個方法叫做submit()來完成消息數(shù)據(jù)的發(fā)送。

Subscriber Interface訂閱者接口,通常需要我們自己去實現(xiàn)。因為在數(shù)據(jù)訂閱接收之后,不同的業(yè)務有不同的處理邏輯。

Processor實際上是 Publisher Interface和Subscriber Interface的集合體,有需要數(shù)據(jù)類型轉換及數(shù)據(jù)處理的需求才去實現(xiàn)這個接口

下面的例子實現(xiàn)的式字符串的數(shù)據(jù)消息訂閱處理

實現(xiàn)訂閱者Subscriber Interface

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;  //訂閱令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("訂閱關系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一個消息處理完成之后,可以繼續(xù)調用subscription.request(n);向發(fā)布者要求數(shù)據(jù)發(fā)送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

SubmissionPublisher消息發(fā)布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立訂閱關系,可以有多個訂閱者
      sb.submit("數(shù)據(jù) 1");  //發(fā)送消息1
      sb.submit("數(shù)據(jù) 2"); //發(fā)送消息2
      sb.submit("數(shù)據(jù) 3"); //發(fā)送消息3
      executor.shutdown();
  }
}

控制臺打印輸出結果

訂閱關系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 數(shù)據(jù) 1
item: 數(shù)據(jù) 2

請注意:即使發(fā)布者submit了3條數(shù)據(jù),MySubscriber也僅收到了2條數(shù)據(jù)進行了處理。是因為我們在MySubscriber#onSubscribe()方法中使用了subscription.request(2);。這就是“背壓”的響應式編程效果,我有能力處理多少數(shù)據(jù),就會通知消息發(fā)布者給多少數(shù)據(jù)。

到此,關于“java9新特性Reactive Stream響應式編程API怎么用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI