溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Reactive-MongoDB如何異步Java Driver

發(fā)布時(shí)間:2021-09-29 09:27:09 來源:億速云 閱讀:202 作者:柒染 欄目:大數(shù)據(jù)

Reactive-MongoDB如何異步Java Driver,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

Reactive-MongoDB如何異步Java Driver

 一、關(guān)于 異步驅(qū)動

從3.0 版本開始,MongoDB 開始提供異步方式的驅(qū)動(Java Async Driver),這為應(yīng)用提供了一種更高性能的選擇。

但實(shí)質(zhì)上,使用同步驅(qū)動(Java Sync  Driver)的項(xiàng)目也不在少數(shù),或許是因?yàn)橄热霝橹鞯脑?同步Driver的文檔說明更加的完善),又或者是為了兼容舊的 MongoDB 版本。

無論如何,由于 Reactive 的發(fā)展,未來使用異步驅(qū)動應(yīng)該是一個(gè)趨勢。

在使用 Async Driver 之前,需要對 Reactive 的概念有一些熟悉。

二、理解 Reactive (響應(yīng)式)

響應(yīng)式(Reactive)是一種異步的、面向數(shù)據(jù)流的開發(fā)方式,最早是來自于.NET 平臺上的 Reactive Extensions  庫,隨后被擴(kuò)展為各種編程語言的實(shí)現(xiàn)。

在著名的 Reactive Manifesto(響應(yīng)式宣言) 中,對 Reactive 定義了四個(gè)特征:

Reactive-MongoDB如何異步Java Driver

  • 及時(shí)響應(yīng)(Responsive):系統(tǒng)能及時(shí)的響應(yīng)請求。

  •  

  • 有韌性(Resilient):系統(tǒng)在出現(xiàn)異常時(shí)仍然可以響應(yīng),即支持容錯(cuò)。

  • 有彈性(Elastic):在不同的負(fù)載下,系統(tǒng)可彈性伸縮來保證運(yùn)行。

  • 消息驅(qū)動(Message Driven):不同組件之間使用異步消息傳遞來進(jìn)行交互,并確保松耦合及相互隔離。

在響應(yīng)式宣言的所定義的這些系統(tǒng)特征中,無一不與響應(yīng)式的流有若干的關(guān)系,于是乎就有了 2013年發(fā)起的 響應(yīng)式流規(guī)范(Reactive Stream  Specification)。

https://www.reactive-streams.org/

其中,對于響應(yīng)式流的處理環(huán)節(jié)又做了如下定義:

  • 具有處理無限數(shù)量的元素的能力,即允許流永不結(jié)束

  • 按序處理

  • 異步地傳遞元素

  • 實(shí)現(xiàn)非阻塞的負(fù)壓(back-pressure)

Java 平臺則是在 JDK 9 版本上發(fā)布了對 Reactive Streams 的支持。

下面介紹響應(yīng)式流的幾個(gè)關(guān)鍵接口:

  • Publisher

Publisher 是數(shù)據(jù)的發(fā)布者。Publisher 接口只有一個(gè)方法 subscribe,用于添加數(shù)據(jù)的訂閱者,也就是 Subscriber。

  • Subscriber

Subscriber 是數(shù)據(jù)的訂閱者。Subscriber 接口有4個(gè)方法,都是作為不同事件的處理器。在訂閱者成功訂閱到發(fā)布者之后,其  onSubscribe(Subscription s) 方法會被調(diào)用。

Subscription 表示的是當(dāng)前的訂閱關(guān)系。

當(dāng)訂閱成功后,可以使用 Subscription 的 request(long n) 方法來請求發(fā)布者發(fā)布 n  條數(shù)據(jù)。發(fā)布者可能產(chǎn)生3種不同的消息通知,分別對應(yīng) Subscriber 的另外3個(gè)回調(diào)方法。

數(shù)據(jù)通知:對應(yīng) onNext 方法,表示發(fā)布者產(chǎn)生的數(shù)據(jù)。

錯(cuò)誤通知:對應(yīng) onError 方法,表示發(fā)布者產(chǎn)生了錯(cuò)誤。

結(jié)束通知:對應(yīng) onComplete 方法,表示發(fā)布者已經(jīng)完成了所有數(shù)據(jù)的發(fā)布。

在上述3種通知中,錯(cuò)誤通知和結(jié)束通知都是終結(jié)通知,也就是在終結(jié)通知之后,不會再有其他通知產(chǎn)生。

  • Subscription

Subscription 表示的是一個(gè)訂閱關(guān)系。除了之前提到的 request 方法之外,還有 cancel 方法用來取消訂閱。需要注意的是,在  cancel 方法調(diào)用之后,發(fā)布者仍然有可能繼續(xù)發(fā)布通知。但訂閱最終會被取消。

這幾個(gè)接口的關(guān)系如下圖所示:

Reactive-MongoDB如何異步Java Driver

圖片出處:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html

MongoDB 的異步驅(qū)動為 mongo-java-driver-reactivestreams 組件,其實(shí)現(xiàn)了 Reactive Stream  的上述接口。

> 除了 reactivestream 之外,MongoDB 的異步驅(qū)動還包含 RxJava 等風(fēng)格的版本,有興趣的讀者可以進(jìn)一步了解

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

三、使用示例

接下來,通過一個(gè)簡單的例子來演示一下 Reactive 方式的代碼風(fēng)格:

A. 引入依賴

org.mongodb    mongodb-driver-reactivestreams    1.11.0

> 引入mongodb-driver-reactivestreams 將會自動添加 reactive-streams, bson,  mongodb-driver-async組件

B. 連接數(shù)據(jù)庫

//服務(wù)器實(shí)例表List servers =newArrayList(); servers.add(newServerAddress("localhost",27018));//配置構(gòu)建器MongoClientSettings.Builder settingsBuilder =MongoClientSettings.builder();//傳入服務(wù)器實(shí)例 settingsBuilder.applyToClusterSettings(         builder -> builder.hosts(servers));//構(gòu)建 Client 實(shí)例MongoClient mongoClient =MongoClients.create(settingsBuilder.build());

C. 實(shí)現(xiàn)文檔查詢

//獲得數(shù)據(jù)庫對象MongoDatabase database = client.getDatabase(databaseName);//獲得集合MongoCollection collection = database.getCollection(collectionName);//異步返回PublisherFindPublisher publisher = collection.find();//訂閱實(shí)現(xiàn) publisher.subscribe(newSubscriber(){     @Override     publicvoid onSubscribe(Subscription s){         System.out.println("start...");         //執(zhí)行請求         s.request(Integer.MAX_VALUE);      }     @Override     publicvoid onNext(Document document){         //獲得文檔         System.out.println("Document:"+ document.toJson());     }      @Override     publicvoid onError(Throwable t){         System.out.println("error occurs.");     }      @Override     publicvoid onComplete(){         System.out.println("finished.");     }});

注意到,與使用同步驅(qū)動不同的是,collection.find()方法返回的不是 Cursor,而是一個(gè)  FindPublisher對象,這是Publisher接口的一層擴(kuò)展。

而且,在返回 Publisher 對象時(shí),此時(shí)并沒有產(chǎn)生真正的數(shù)據(jù)庫IO請求。真正發(fā)起請求需要通過調(diào)用  Subscription.request()方法。

在上面的代碼中,為了讀取由 Publisher 產(chǎn)生的結(jié)果,通過自定義一個(gè)Subscriber,在onSubscribe 事件觸發(fā)時(shí)就執(zhí)行  數(shù)據(jù)庫的請求,之后分別對 onNext、onError、onComplete進(jìn)行處理。

盡管這種實(shí)現(xiàn)方式是純異步的,但在使用上比較繁瑣。試想如果對于每個(gè)數(shù)據(jù)庫操作都要完成一個(gè)Subscriber 邏輯,那么開發(fā)的工作量是巨大的。

為了盡可能復(fù)用重復(fù)的邏輯,可以對Subscriber的邏輯做一層封裝,包含如下功能:

  • 使用 List 容器對請求結(jié)果進(jìn)行緩存

  • 實(shí)現(xiàn)阻塞等待結(jié)果的方法,可指定超時(shí)時(shí)間

  • 捕獲異常,在等待結(jié)果時(shí)拋出

代碼如下:

publicclassObservableSubscriberimplementsSubscriber{      //響應(yīng)數(shù)據(jù)     privatefinalList received;     //錯(cuò)誤信息     privatefinalList errors;     //等待對象     privatefinalCountDownLatch latch;     //訂閱器     privatevolatileSubscription subscription;     //是否完成     privatevolatileboolean completed;      publicObservableSubscriber(){         this.received =newArrayList();         this.errors =newArrayList();         this.latch =newCountDownLatch(1);     }      @Override     publicvoid onSubscribe(finalSubscription s){         subscription = s;     }      @Override     publicvoid onNext(final T t){         received.add(t);     }      @Override     publicvoid onError(finalThrowable t){         errors.add(t);         onComplete();     }      @Override     publicvoid onComplete(){         completed =true;         latch.countDown();     }      publicSubscription getSubscription(){         return subscription;     }      publicList getReceived(){         return received;     }      publicThrowable getError(){         if(errors.size()>0){             return errors.get(0);         }         returnnull;     }      publicboolean isCompleted(){         return completed;     }      /**      * 阻塞一定時(shí)間等待結(jié)果      *      * @param timeout      * @param unit      * @return      * @throws Throwable      */     publicListget(finallong timeout,finalTimeUnit unit)throwsThrowable{         return await(timeout, unit).getReceived();     }      /**      * 一直阻塞等待請求完成      *      * @return      * @throws Throwable      */     publicObservableSubscriber await()throwsThrowable{         return await(Long.MAX_VALUE,TimeUnit.MILLISECONDS);     }      /**      * 阻塞一定時(shí)間等待完成      *      * @param timeout      * @param unit      * @return      * @throws Throwable      */     publicObservableSubscriber await(finallong timeout,finalTimeUnit unit)throwsThrowable{         subscription.request(Integer.MAX_VALUE);         if(!latch.await(timeout, unit)){             thrownewMongoTimeoutException("Publisher onComplete timed out");         }         if(!errors.isEmpty()){             throw errors.get(0);         }         returnthis;     }}

借助這個(gè)基礎(chǔ)的工具類,我們對于文檔的異步操作就變得簡單多了。

比如對于文檔查詢的操作可以改造如下:

ObservableSubscriber subscriber =newObservableSubscriber(); collection.find().subscribe(subscriber);//結(jié)果處理 subscriber.get(15,TimeUnit.SECONDS).forEach( d ->{     System.out.println("Document:"+ d.toJson());});

當(dāng)然,這個(gè)例子還有可以繼續(xù)完善,比如使用 List 作為緩存,則要考慮數(shù)據(jù)量的問題,避免將全部(或超量) 的文檔一次性轉(zhuǎn)入內(nèi)存。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI