溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

使用CompletableFuture怎么實(shí)現(xiàn)并發(fā)編程

發(fā)布時(shí)間:2021-06-22 16:00:01 來(lái)源:億速云 閱讀:349 作者:Leah 欄目:編程語(yǔ)言

今天就跟大家聊聊有關(guān)使用CompletableFuture怎么實(shí)現(xiàn)并發(fā)編程,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

實(shí)例化 首先,不管我們要做什么,我們第一步是需要構(gòu)造出 CompletableFuture 實(shí)例。

最簡(jiǎn)單的,我們可以通過(guò)構(gòu)造函數(shù)來(lái)進(jìn)行實(shí)例化:

CompletableFuture<String> cf = new CompletableFuture<String>(); 這個(gè)實(shí)例此時(shí)還沒(méi)有什么用,因?yàn)樗鼪](méi)有實(shí)際的任務(wù),我們選擇結(jié)束這個(gè)任務(wù):

// 可以選擇在當(dāng)前線程結(jié)束,也可以在其他線程結(jié)束 cf.complete("coding..."); 因?yàn)?CompletableFuture 是一個(gè) Future,我們用 String result = cf.get() 就能獲取到結(jié)果了。

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區(qū)別在于 join() 拋出的是 unchecked Exception。

上面的代碼確實(shí)沒(méi)什么用,下面介紹幾個(gè) static 方法,它們使用任務(wù)來(lái)實(shí)例化一個(gè) CompletableFuture 實(shí)例。

CompletableFuture.runAsync(Runnable runnable); CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier); CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) runAsync 方法接收的是 Runnable 的實(shí)例,意味著它沒(méi)有返回值 supplyAsync 方法對(duì)應(yīng)的是有返回值的情況 這兩個(gè)方法的帶 executor 的變種,表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在 ForkJoinPool.commonPool() 線程池中執(zhí)行的。 好的,現(xiàn)在我們已經(jīng)有了第一個(gè) CompletableFuture 實(shí)例了,我們來(lái)看接下來(lái)的內(nèi)容。

任務(wù)之間的順序執(zhí)行 我們先來(lái)看執(zhí)行兩個(gè)任務(wù)的情況,首先執(zhí)行任務(wù) A,然后將任務(wù) A 的結(jié)果傳遞給任務(wù) B。

其實(shí)這里有很多種情況,任務(wù) A 是否有返回值,任務(wù) B 是否需要任務(wù) A 的返回值,任務(wù) B 是否有返回值,等等。有個(gè)明確的就是,肯定是任務(wù) A 執(zhí)行完后再執(zhí)行任務(wù) B。

我們用下面的 6 行代碼來(lái)說(shuō):

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");

CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB"); 前面 3 行代碼演示的是,任務(wù) A 無(wú)返回值,所以對(duì)應(yīng)的,第 2 行和第 3 行代碼中,resultA 其實(shí)是 null。

第 4 行用的是 thenRun(Runnable runnable),任務(wù) A 執(zhí)行完執(zhí)行 B,并且 B 不需要 A 的結(jié)果。

第 5 行用的是 thenAccept(Consumer action),任務(wù) A 執(zhí)行完執(zhí)行 B,B 需要 A 的結(jié)果,但是任務(wù) B 不返回值。

第 6 行用的是 thenApply(Function fn),任務(wù) A 執(zhí)行完執(zhí)行 B,B 需要 A 的結(jié)果,同時(shí)任務(wù) B 有返回值。

這一小節(jié)說(shuō)完了,如果任務(wù) B 后面還有任務(wù) C,往下繼續(xù)調(diào)用 .thenXxx() 即可。

異常處理 說(shuō)到這里,我們順便來(lái)說(shuō)下 CompletableFuture 的異常處理。這里我們要介紹兩個(gè)方法:

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); 看下面的代碼:

CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); 上面的代碼中,任務(wù) A、B、C、D 依次執(zhí)行,如果任務(wù) A 拋出異常(當(dāng)然上面的代碼不會(huì)拋出異常),那么后面的任務(wù)都得不到執(zhí)行。如果任務(wù) C 拋出異常,那么任務(wù) D 得不到執(zhí)行。

那么我們?cè)趺刺幚懋惓D兀靠聪旅娴拇a,我們?cè)谌蝿?wù) A 中拋出異常,并對(duì)其進(jìn)行處理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join()); 上面的代碼中,任務(wù) A 拋出異常,然后通過(guò) .exceptionally() 方法處理了異常,并返回新的結(jié)果,這個(gè)新的結(jié)果將傳遞給任務(wù) B。所以最終的輸出結(jié)果是:

errorResultA resultB resultC resultD 再看下面的代碼,我們來(lái)看下另一種處理方式,使用 handle(BiFunction fn) 來(lái)處理異常:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") // 任務(wù) C 拋出異常 .thenApply(resultB -> {throw new RuntimeException();}) // 處理任務(wù) C 的返回值或異常 .handle(new BiFunction<Object, Throwable, Object>() { @Override public Object apply(Object re, Throwable throwable) { if (throwable != null) { return "errorResultC"; } return re; } }) .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join()); 上面的代碼使用了 handle 方法來(lái)處理任務(wù) C 的執(zhí)行結(jié)果,上面的代碼中,re 和 throwable 必然有一個(gè)是 null,它們分別代表正常的執(zhí)行結(jié)果和異常的情況。

當(dāng)然,它們也可以都為 null,因?yàn)槿绻饔玫哪莻€(gè) CompletableFuture 實(shí)例沒(méi)有返回值的時(shí)候,re 就是 null。

取兩個(gè)任務(wù)的結(jié)果 上面一節(jié),我們說(shuō)的是,任務(wù) A 執(zhí)行完 -> 任務(wù) B 執(zhí)行完 -> 執(zhí)行任務(wù) C,它們之間有先后執(zhí)行關(guān)系,因?yàn)楹竺娴娜蝿?wù)依賴(lài)于前面的任務(wù)的結(jié)果。

這節(jié)我們來(lái)看怎么讓任務(wù) A 和任務(wù) B 同時(shí)執(zhí)行,然后取它們的結(jié)果進(jìn)行后續(xù)操作。這里強(qiáng)調(diào)的是任務(wù)之間的并行工作,沒(méi)有先后執(zhí)行順序。

如果使用 Future 的話,我們通常是這么寫(xiě)的:

ExecutorService executorService = Executors.newCachedThreadPool();

Future<String> futureA = executorService.submit(() -> "resultA"); Future<String> futureB = executorService.submit(() -> "resultB");

String resultA = futureA.get(); String resultB = futureB.get(); 接下來(lái),我們看看 CompletableFuture 中是怎么寫(xiě)的,看下面的幾行代碼:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B"); cfA.runAfterBoth(cfB, () -> {}); 第 3 行代碼和第 4 行代碼演示了怎么使用兩個(gè)任務(wù)的結(jié)果 resultA 和 resultB,它們的區(qū)別在于,thenAcceptBoth 表示后續(xù)的處理不需要返回值,而 thenCombine 表示需要返回值。

如果你不需要 resultA 和 resultB,那么還可以使用第 5 行描述的 runAfterBoth 方法。

注意,上面的寫(xiě)法和下面的寫(xiě)法是沒(méi)有區(qū)別的:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");

cfA.thenAcceptBoth(CompletableFuture.supplyAsync(() -> "resultB"), (resultA, resultB) -> {}); 千萬(wàn)不要以為這種寫(xiě)法任務(wù) A 執(zhí)行完了以后再執(zhí)行任務(wù) B。

取多個(gè)任務(wù)的結(jié)果 接下來(lái),我們將介紹兩個(gè)非常簡(jiǎn)單的靜態(tài)方法:allOf() 和 anyOf() 方法。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...} public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...} 這兩個(gè)方法都非常簡(jiǎn)單,簡(jiǎn)單介紹一下。

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");

CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC); // 所以這里的 join() 將阻塞,直到所有的任務(wù)執(zhí)行結(jié)束 future.join(); 由于 allOf 聚合了多個(gè) CompletableFuture 實(shí)例,所以它是沒(méi)有返回值的。這也是它的一個(gè)缺點(diǎn)。

anyOf 也非常容易理解,就是只要有任意一個(gè) CompletableFuture 實(shí)例執(zhí)行完成就可以了,看下面的例子:

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");

CompletableFuture<Object> future = CompletableFuture.anyOf(cfA, cfB, cfC); Object result = future.join(); 最后一行的 join() 方法會(huì)返回最先完成的任務(wù)的結(jié)果,所以它的泛型用的是 Object,因?yàn)槊總€(gè)任務(wù)可能返回的類(lèi)型不同。

either 方法 如果你的 anyOf(...) 只需要處理兩個(gè) CompletableFuture 實(shí)例,那么也可以使用 xxxEither() 來(lái)處理,

cfA.acceptEither(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}, executorService);

cfA.applyToEither(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}, executorService);

cfA.runAfterEither(cfA, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}, executorService); 上面的各個(gè)帶 either 的方法,表達(dá)的都是一個(gè)意思,指的是兩個(gè)任務(wù)中的其中一個(gè)執(zhí)行完成,就執(zhí)行指定的操作。它們幾組的區(qū)別也很明顯,分別用于表達(dá)是否需要任務(wù) A 和任務(wù) B 的執(zhí)行結(jié)果,是否需要返回值。

大家可能會(huì)對(duì)這里的幾個(gè)變種有盲區(qū),這里順便說(shuō)幾句。

1、cfA.acceptEither(cfB, result -> {}); 和 cfB.acceptEither(cfA, result -> {}); 是一個(gè)意思;

2、第二個(gè)變種,加了 Async 后綴的方法,代表將需要執(zhí)行的任務(wù)放到 ForkJoinPool.commonPool() 中執(zhí)行(非完全嚴(yán)謹(jǐn));第三個(gè)變種很好理解,將任務(wù)放到指定線程池中執(zhí)行;

3、難道第一個(gè)變種是同步的?不是的,而是說(shuō),它由任務(wù) A 或任務(wù) B 所在的執(zhí)行線程來(lái)執(zhí)行,取決于哪個(gè)任務(wù)先結(jié)束。

compose update on 2019-07-26

這里我們簡(jiǎn)單來(lái)說(shuō)說(shuō) CompletableFuture 的最后一塊拼圖,compose 方法。

前面我們介紹了 thenAcceptBoth 和 thenCombine 用于聚合兩個(gè)任務(wù),其實(shí) compose 也是一樣的功能,它們本質(zhì)上都是為了讓多個(gè) CompletableFuture 實(shí)例形成一個(gè)鏈。

我們還是用代碼來(lái)說(shuō)吧:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { System.out.println("processing a..."); return "hello"; });

CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> { System.out.println("processing b..."); return " world"; });

CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> { System.out.println("processing c..."); return ", I'm robot!"; }); 我們示例三個(gè)實(shí)例的情況,這邊不介紹 thenAcceptBoth 了,我們來(lái)看下 thenCombine:

cfA.thenCombine(cfB, (resultA, resultB) -> { System.out.println(resultA + resultB); // hello world return resultA + resultB; }).thenCombine(cfC, (resultAB, resultC) -> { System.out.println(resultAB + resultC); // hello world, I'm robot! return resultAB + resultC; }); 我們先有 cfA,然后和 cfB 組成一個(gè)鏈:cfA -> cfB,然后又組合了 cfC,形成鏈:cfA -> cfB -> cfC。

這里有個(gè)隱藏的點(diǎn):cfA、cfB、cfC 它們完全沒(méi)有數(shù)據(jù)依賴(lài)關(guān)系,我們只不過(guò)是聚合了它們的結(jié)果。

這下看 compose 就清楚了:

CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> { // 第一個(gè)實(shí)例的結(jié)果 return "hello"; }).thenCompose(resultA -> CompletableFuture.supplyAsync(() -> { // 把上一個(gè)實(shí)例的結(jié)果傳遞到這里 return resultA + " world"; })).thenCompose(resultAB -> CompletableFuture.supplyAsync(() -> { // 到這里大家應(yīng)該很清楚了 return resultAB + ", I'm robot"; }));

System.out.println(result.join()); // hello world, I'm robot 前面一個(gè) CompletableFuture 實(shí)例的結(jié)果可以傳遞到下一個(gè)實(shí)例中,這就是 compose 和 combine 的主要區(qū)別。

combine 是把結(jié)果進(jìn)行聚合,但是 compose 更像是把多個(gè)已有的 cf 實(shí)例組合成一個(gè)整體的實(shí)例。

thenCompose 和 thenApply 的區(qū)別 評(píng)論區(qū)有同學(xué)關(guān)注到了 thenApply 和 thenCompose,這里簡(jiǎn)單說(shuō)說(shuō)。

我們來(lái)看看它們的方法貼到一起對(duì)比一下:

public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); } 使用示例:

CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> "hello") .thenApply(cfA -> cfA + " world");

CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> "hello") .thenCompose(cfA -> CompletableFuture.supplyAsync(() -> cfA + " world")); 它們都需要接收一個(gè) Function,這個(gè)函數(shù)的主要的區(qū)別在于 thenApply 中返回一個(gè)具體的值,而 thenCompose 返回一個(gè)新的 cf 實(shí)例。

thenApply 類(lèi)似于 map 操作,把 cf 實(shí)例的結(jié)果加工成另一個(gè)值,像 Stream 里面的 map() 方法。它還有一個(gè)很重要的特征,這里是同步的操作。

如果你希望執(zhí)行一個(gè)異步的 map 操作,那么就應(yīng)該使用 thenCompose 了,比如上面的第二個(gè)例子。

我們來(lái)繼續(xù)較真一下,我們可以讓 thenApply 的 Function 也返回 CompletableFuture 實(shí)例,不就實(shí)現(xiàn)了異步的需求:

CompletableFuture<CompletableFuture<String>> future = CompletableFuture .supplyAsync(() -> "hello") .thenApply(cfA -> CompletableFuture.supplyAsync(() -> cfA + " world")); 可是,返回值我們可就不太喜歡了。說(shuō)到這里,大家可能會(huì)想到 Stream 里面的 flatMap() 了

看完上述內(nèi)容,你們對(duì)使用CompletableFuture怎么實(shí)現(xiàn)并發(fā)編程有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI