您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)Java中怎么利用CompletableFuture實(shí)現(xiàn)異步編程,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
CompletableFuture 的核心優(yōu)勢(shì)
為了領(lǐng)略 CompletableFuture 異步編程的優(yōu)勢(shì),這里我們用 CompletableFuture 重新實(shí)現(xiàn)前面曾提及的燒水泡茶程序。首先還是需要先完成分工方案,在下面的程序中,我們分了 3 個(gè)任務(wù):任務(wù) 1 負(fù)責(zé)洗水壺、燒開水,任務(wù) 2 負(fù)責(zé)洗茶壺、洗茶杯和拿茶葉,任務(wù) 3 負(fù)責(zé)泡茶。其中任務(wù) 3 要等待任務(wù) 1 和任務(wù) 2 都完成后才能開始。這個(gè)分工如下圖所示。
燒水泡茶分工方案
// 任務(wù) 1:洗水壺 -> 燒開水CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{ System.out.println("T1: 洗水壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1: 燒開水..."); sleep(15, TimeUnit.SECONDS);});// 任務(wù) 2:洗茶壺 -> 洗茶杯 -> 拿茶葉CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ System.out.println("T2: 洗茶壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2: 洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2: 拿茶葉..."); sleep(1, TimeUnit.SECONDS); return " 龍井 ";});// 任務(wù) 3:任務(wù) 1 和任務(wù) 2 完成后執(zhí)行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{ System.out.println("T1: 拿到茶葉:" + tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; });// 等待任務(wù) 3 執(zhí)行結(jié)果System.out.println(f3.join());void sleep(int t, TimeUnit u) { try { u.sleep(t); }catch(InterruptedException e){}}// 一次執(zhí)行結(jié)果:T1: 洗水壺...T2: 洗茶壺...T1: 燒開水...T2: 洗茶杯...T2: 拿茶葉...T1: 拿到茶葉: 龍井T1: 泡茶...上茶: 龍井
從整體上來看,我們會(huì)發(fā)現(xiàn)
無需手工維護(hù)線程,沒有繁瑣的手工維護(hù)線程的工作,給任務(wù)分配線程的工作也不需要我們關(guān)注;語義更清晰,例如f3 = f1.thenCombine(f2, ()->{})
能夠清晰地表述“任務(wù) 3 要等待任務(wù) 1 和任務(wù) 2 都完成后才能開始”;代碼更簡(jiǎn)練并且專注于業(yè)務(wù)邏輯,幾乎所有代碼都是業(yè)務(wù)邏輯相關(guān)的。
領(lǐng)略 CompletableFuture 異步編程的優(yōu)勢(shì)之后,下面我們?cè)敿?xì)介紹 CompletableFuture 的使用。
創(chuàng)建 CompletableFuture 對(duì)象
創(chuàng)建 CompletableFuture 對(duì)象主要靠下面代碼中展示的這 4 個(gè)靜態(tài)方法,我們先看前兩個(gè)。在燒水泡茶的例子中,我們已經(jīng)使用了runAsync(Runnable runnable)
和 supplyAsync(Supplier<U> supplier)
,它們之間的區(qū)別是:Runnable 接口的 run() 方法沒有返回值,而 Supplier 接口的 get() 方法是有返回值的。
前兩個(gè)方法和后兩個(gè)方法的區(qū)別在于:后兩個(gè)方法可以指定線程池參數(shù)。
默認(rèn)情況下 CompletableFuture 會(huì)使用公共的 ForkJoinPool 線程池,這個(gè)線程池默認(rèn)創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism
來設(shè)置 ForkJoinPool 線程池的線程數(shù))。如果所有 CompletableFuture 共享一個(gè)線程池,那么一旦有任務(wù)執(zhí)行一些很慢的 I/O 操作,就會(huì)導(dǎo)致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓,進(jìn)而影響整個(gè)系統(tǒng)的性能。所以,強(qiáng)烈建議你要根據(jù)不同的業(yè)務(wù)類型創(chuàng)建不同的線程池,以避免互相干擾
。
// 使用默認(rèn)線程池static CompletableFuture<Void> runAsync(Runnable runnable)static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)// 可以指定線程池static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
創(chuàng)建完 CompletableFuture 對(duì)象之后,會(huì)自動(dòng)地異步執(zhí)行 runnable.run() 方法或者 supplier.get() 方法,對(duì)于一個(gè)異步操作,你需要關(guān)注兩個(gè)問題:一個(gè)是異步操作什么時(shí)候結(jié)束,另一個(gè)是如何獲取異步操作的執(zhí)行結(jié)果。因?yàn)?CompletableFuture 類實(shí)現(xiàn)了 Future 接口,所以這兩個(gè)問題你都可以通過 Future 接口來解決。另外,CompletableFuture 類還實(shí)現(xiàn)了 CompletionStage 接口,這個(gè)接口內(nèi)容實(shí)在是太豐富了,在 1.8 版本里有 40 個(gè)方法,這些方法我們?cè)撊绾卫斫饽兀?/p>
理解 CompletionStage 接口
可以站在分工的角度類比一下工作流。任務(wù)是有時(shí)序關(guān)系的,比如有串行關(guān)系、并行關(guān)系、匯聚關(guān)系
等。這樣說可能有點(diǎn)抽象,這里還舉前面燒水泡茶的例子,其中洗水壺和燒開水就是串行關(guān)系,洗水壺、燒開水和洗茶壺、洗茶杯這兩組任務(wù)之間就是并行關(guān)系,而燒開水、拿茶葉和泡茶就是匯聚關(guān)系。
串行關(guān)系
并行關(guān)系
匯聚關(guān)系
CompletionStage 接口可以清晰地描述任務(wù)之間的這種時(shí)序關(guān)系,例如前面提到的f3 = f1.thenCombine(f2, ()->{})
描述的就是一種匯聚關(guān)系。燒水泡茶程序中的匯聚關(guān)系是一種 AND 聚合關(guān)系,這里的 AND 指的是所有依賴的任務(wù)(燒開水和拿茶葉)都完成后才開始執(zhí)行當(dāng)前任務(wù)(泡茶)。既然有 AND 聚合關(guān)系,那就一定還有 OR 聚合關(guān)系,所謂 OR 指的是依賴的任務(wù)只要有一個(gè)完成就可以執(zhí)行當(dāng)前任務(wù)。
最后就是異常,CompletionStage 接口也可以方便地描述異常處理。
下面我們就來一一介紹,CompletionStage 接口如何描述串行關(guān)系、AND 聚合關(guān)系、OR 聚合關(guān)系以及異常處理。
1. 描述串行關(guān)系
CompletionStage 接口里面描述串行關(guān)系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 這四個(gè)系列的接口。
thenApply 系列函數(shù)里參數(shù) fn 的類型是接口 Function<T, R>,這個(gè)接口里與 CompletionStage 相關(guān)的方法是R apply(T t)
,這個(gè)方法既能接收參數(shù)也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>
。
而 thenAccept 系列方法里參數(shù) consumer 的類型是接口Consumer<T>
,這個(gè)接口里與 CompletionStage 相關(guān)的方法是void accept(T t)
,這個(gè)方法雖然支持參數(shù),但卻不支持回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>
thenRun 系列方法里 action 的參數(shù)是 Runnable,所以 action 既不能接收參數(shù)也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>
這些方法里面 Async 代表的是異步執(zhí)行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,這個(gè)系列的方法會(huì)新創(chuàng)建出一個(gè)子流程,最終結(jié)果和 thenApply 系列是相同的。
CompletionStage<R> thenApply(fn);CompletionStage<R> thenApplyAsync(fn);CompletionStage<Void> thenAccept(consumer);CompletionStage<Void> thenAcceptAsync(consumer);CompletionStage<Void> thenRun(action);CompletionStage<Void> thenRunAsync(action);CompletionStage<R> thenCompose(fn);CompletionStage<R> thenComposeAsync(fn);
通過下面的示例代碼,你可以看一下 thenApply() 方法是如何使用的。首先通過 supplyAsync() 啟動(dòng)一個(gè)異步流程,之后是兩個(gè)串行操作,整體看起來還是挺簡(jiǎn)單的。不過,雖然這是一個(gè)異步流程,但任務(wù)①②③卻是串行執(zhí)行的,②依賴①的執(zhí)行結(jié)果,③依賴②的執(zhí)行結(jié)果。
CompletableFuture<String> f0 = CompletableFuture.supplyAsync( () -> "Hello World") //① .thenApply(s -> s + " QQ") //② .thenApply(String::toUpperCase);//③System.out.println(f0.join());// 輸出結(jié)果HELLO WORLD QQ
2. 描述 AND 匯聚關(guān)系
CompletionStage 接口里面描述 AND 匯聚關(guān)系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,這些接口的區(qū)別也是源自 fn、consumer、action 這三個(gè)核心參數(shù)不同。
CompletionStage<R> thenCombine(other, fn);CompletionStage<R> thenCombineAsync(other, fn);CompletionStage<Void> thenAcceptBoth(other, consumer);CompletionStage<Void> thenAcceptBothAsync(other, consumer);CompletionStage<Void> runAfterBoth(other, action);CompletionStage<Void> runAfterBothAsync(other, action);
3. 描述 OR 匯聚關(guān)系
CompletionStage 接口里面描述 OR 匯聚關(guān)系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,這些接口的區(qū)別也是源自 fn、consumer、action 這三個(gè)核心參數(shù)不同。
CompletionStage applyToEither(other, fn);CompletionStage applyToEitherAsync(other, fn);CompletionStage acceptEither(other, consumer);CompletionStage acceptEitherAsync(other, consumer);CompletionStage runAfterEither(other, action);CompletionStage runAfterEitherAsync(other, action);
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{ int t = getRandom(5, 10); sleep(t, TimeUnit.SECONDS); return String.valueOf(t);});CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ int t = getRandom(5, 10); sleep(t, TimeUnit.SECONDS); return String.valueOf(t);});CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s);System.out.println(f3.join());
4. 異常處理
雖然上面我們提到的 fn、consumer、action 它們的核心方法都不允許拋出可檢查異常,但是卻無法限制它們拋出運(yùn)行時(shí)異常
,例如下面的代碼,執(zhí)行
CompletableFuture<Integer> f0 = CompletableFuture. .supplyAsync(()->(7/0)) .thenApply(r->r*10);System.out.println(f0.join());
CompletionStage 接口給我們提供的方案非常簡(jiǎn)單,比 try{}catch{}還要簡(jiǎn)單,下面是相關(guān)的方法,使用這些方法進(jìn)行異常處理和串行操作是一樣的,都支持鏈?zhǔn)骄幊谭绞健?/p>
CompletionStage exceptionally(fn);CompletionStage<R> whenComplete(consumer);CompletionStage<R> whenCompleteAsync(consumer);CompletionStage<R> handle(fn);CompletionStage<R> handleAsync(fn);
下面的示例代碼展示了如何使用 exceptionally() 方法來處理異常,exceptionally() 的使用非常類似于 try{}catch{}中的 catch{},但是由于支持鏈?zhǔn)骄幊谭绞?,所以相?duì)更簡(jiǎn)單。
whenComplete() 和 handle() 系列方法就類似于 try{}finally{}中的 finally{},無論是否發(fā)生異常都會(huì)執(zhí)行 whenComplete() 中的回調(diào)函數(shù) consumer 和 handle() 中的回調(diào)函數(shù) fn。
whenComplete() 和 handle() 的區(qū)別在于 whenComplete() 不支持返回結(jié)果,而 handle() 是支持返回結(jié)果的。
CompletableFuture<Integer> f0 = CompletableFuture .supplyAsync(()->7/0)) .thenApply(r->r*10) .exceptionally(e->0);System.out.println(f0.join());
以上就是Java中怎么利用CompletableFuture實(shí)現(xiàn)異步編程,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。