溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

java多線程如何通過CompletableFuture組裝異步計(jì)算單元

發(fā)布時(shí)間:2023-05-12 10:29:55 來源:億速云 閱讀:109 作者:zzz 欄目:編程語言

今天小編給大家分享一下java多線程如何通過CompletableFuture組裝異步計(jì)算單元的相關(guān)知識(shí)點(diǎn),內(nèi)容詳細(xì),邏輯清晰,相信大部分人都還太了解這方面的知識(shí),所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

    CompletableFuture 介紹

    CompletableFuture是1.8引入的新特性,一些比較復(fù)雜的異步計(jì)算場(chǎng)景,尤其是需要串聯(lián)多個(gè)異步計(jì)算單元的場(chǎng)景,可以考慮使用 CompletableFuture 來實(shí)現(xiàn)。

    在現(xiàn)實(shí)世界中,我們需要解決的復(fù)雜問題都是要分為若干步驟。就像我們的代碼一樣,一個(gè)復(fù)雜的邏輯方法中,會(huì)調(diào)用多個(gè)方法來一步一步實(shí)現(xiàn)。

    設(shè)想如下場(chǎng)景,植樹節(jié)要進(jìn)行植樹,分為下面幾個(gè)步驟:

    • 挖坑 10 分鐘

    • 拿樹苗 5 分鐘

    • 種樹苗 20 分鐘

    • 澆水 5 分鐘

    其中 1 和 2 可以并行,1 和 2 都完成了才能進(jìn)行步驟 3,然后才能進(jìn)行步驟 4。

    我們有如下幾種實(shí)現(xiàn)方式:

    只有一個(gè)人種樹

    如果現(xiàn)在只有一個(gè)人植樹,要種 100 棵樹,那么只能按照如下順序執(zhí)行:

    java多線程如何通過CompletableFuture組裝異步計(jì)算單元

    圖中僅列舉種三棵樹示意??梢钥吹酱袌?zhí)行,只能種完一棵樹再種一棵,那么種完 100 棵樹需要 40 * 100 = 4000 分鐘。 這種方式對(duì)應(yīng)到程序,就是單線程同步執(zhí)行。

    三個(gè)人同時(shí)種樹,每個(gè)人負(fù)責(zé)種一棵樹

    如何縮短種樹時(shí)長(zhǎng)呢?你肯定想這還不好辦,學(xué)習(xí)了這么久的并發(fā),這肯定難不倒我。不是要種 100 棵樹嗎?那我找 100 個(gè)人一塊種,每個(gè)人種一棵。那么只需要 40 分鐘就可以種完 100 棵樹了。

    沒錯(cuò),如果你的程序有個(gè)方法叫做 plantTree,里面包含了如上四部,那么你起 100 個(gè)線程就可以了。但是,請(qǐng)注意,100 個(gè)線程的創(chuàng)建和銷毀需要消耗大量的系統(tǒng)資源。并且創(chuàng)建和銷毀線程都有時(shí)間消耗。此外CPU的核數(shù)并不能真的支持100個(gè)線程并發(fā)。如果我們要種1萬棵樹呢?總不能起一萬個(gè)線程吧?

    所以這只是理想情況,我們一般是通過線程池來執(zhí)行,并不會(huì)真的啟動(dòng)100個(gè)線程。

    多個(gè)人同時(shí)種樹

    種每一棵樹的時(shí)候,不依賴的步驟可以分不同的人并行干

    這種方式可以進(jìn)一步縮短種樹的時(shí)長(zhǎng),因?yàn)榈谝徊酵诳雍偷诙侥脴涿缈梢詢蓚€(gè)人并行去做,所以每棵樹只需要35 分鐘。如下圖:

    java多線程如何通過CompletableFuture組裝異步計(jì)算單元

    如果程序還是 100 個(gè)主線程并發(fā)運(yùn)行 plantTree 方法,那么只需要 35 分鐘種完 100 顆樹。 這里需要注意每個(gè)線程中,由于還要并發(fā)兩個(gè)線程去做 1,2 兩個(gè)步驟。實(shí)際運(yùn)行中會(huì)又 100 x 3 = 300 個(gè)線程參與植樹。但是負(fù)責(zé) 1,2 步驟的線程只會(huì)短暫參與,然后就閑置了。

    這種方法和第二種方式也存在大量創(chuàng)建線程的問題。所以也只是理想情況。

    假如只有 4 個(gè)人植樹,每個(gè)人只負(fù)責(zé)自己的步驟

    java多線程如何通過CompletableFuture組裝異步計(jì)算單元

    可以看到一開始小王挖完第一個(gè)坑后,小李已經(jīng)取回兩個(gè)樹苗,但此時(shí)小張才能開始種第一個(gè)樹苗。此后小張就可以一個(gè)接一個(gè)的去種樹苗了,并且在他種下一棵樹苗的時(shí)候,小趙可以并行澆水。按照這個(gè)流程走下來,種完 100 顆樹苗需要 10+20x100+5=2015 分鐘。比單線程的4000分鐘好了很多,但是遠(yuǎn)遠(yuǎn)比不上 100 個(gè)線程并發(fā)種樹的速度。不過不要忘記 100 個(gè)線程并發(fā)只是理想情況,而本方法只用了 4 個(gè)線程。

    我們?cè)賹?duì)分工做下調(diào)整。每個(gè)人不只干自己的工作,一旦自己的工作做完了就看有沒有其他工作可以做。比如小王挖坑完后,發(fā)現(xiàn)可以種樹苗,那么他就去種樹苗。小李拿樹苗完成后也可以去挖坑或者種樹苗。這樣整體的效率就會(huì)更高了。如果基于這種思想,那么我們實(shí)際上把任務(wù)分成了 4 類,每類 100 件,一共 400 件任務(wù)。400 件任務(wù)全部完成,意味著整個(gè)任務(wù)就完成了。那么任務(wù)的參與者只需要知道任務(wù)的依賴,然后不斷領(lǐng)取可以執(zhí)行的任務(wù)去執(zhí)行。這樣的效率將會(huì)是最高的。

    前文說到我們不可能通過100個(gè)線程并發(fā)來執(zhí)行任務(wù),所以一般情況下我們都會(huì)使用線程池,這和上面的設(shè)計(jì)思想不謀而合。使用線程池后,由于第四種方式把步驟拆的更細(xì),提高了并發(fā)的可能性。因此速度會(huì)比第二種方式更快。那么和第三種比起來,哪種更快呢?如果線程數(shù)量可以無窮大,這兩個(gè)方法能達(dá)到的最短時(shí)間是一樣的,都是 35 分鐘。不過在線程有限的情況下,第四種方式對(duì)線程的使用率會(huì)更高,因?yàn)槊總€(gè)步驟都可以并行執(zhí)行(參與種樹的人完成自己的工作后,都可以去幫助其他人),線程的調(diào)度更為靈活,所以線程池中的線程很難閑下來,一直保持在運(yùn)轉(zhuǎn)之中。是的,誰都不能偷懶。而第三種由于只能并發(fā)在 plantTree 方法及挖坑和拿樹苗,所以不如第四種方式靈活

    上文講了這么多,主要是要說明 CompletableFuture 出現(xiàn)的原因。他用來把復(fù)雜任務(wù)拆解為一個(gè)個(gè)銜接的異步執(zhí)行步驟,從而提升整體的效率。我們回一下小節(jié)題目:誰都不能偷懶。沒錯(cuò),這就是 CompletableFuture 要達(dá)到的效果,通過對(duì)計(jì)算單元的抽象,讓線程能夠高效的并發(fā)參與每一個(gè)步驟。同步的代碼通過 CompletableFuture 可以完全改造為異步代碼。下面我們就來看看如何使用 CompletableFuture。

    CompletableFuture 使用

    CompletableFuture 實(shí)現(xiàn)了 Future 接口并且實(shí)現(xiàn)了 CompletionStage 接口。Future 接口我們已經(jīng)很熟悉了,而CompletionStage 接口定了異步計(jì)算步驟之間的規(guī)范,這樣確保一步一步能夠銜接上。CompletionStage 定義了38 個(gè) public 的方法用于異步計(jì)算步驟間的銜接。接下來我們會(huì)挑選一些常用的,相對(duì)使用頻率較高的方法,來看看如何使用。

    已知計(jì)算結(jié)果

    如果你已經(jīng)知道 CompletableFuture 的計(jì)算結(jié)果,可以使用靜態(tài)方法 completedFuture。傳入計(jì)算結(jié)果,聲明CompletableFuture 對(duì)象。在調(diào)用 get 方法時(shí)會(huì)立即返回傳入的計(jì)算結(jié)果,不會(huì)被阻塞,如下代碼:

    public static void main(String[] args) throws Exception{
        CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello World");
        System.out.println("result is " + completableFuture.get());
    }
    // result is Hello World

    是不是覺得這種用法沒有什么意義?既然知道計(jì)算結(jié)果了,直接使用就好了,為什么還要通過 CompletableFuture 進(jìn)行包裝?這是因?yàn)楫惒接?jì)算單元需要通過 CompletableFuture 進(jìn)行銜接,所以有的時(shí)候我們即使已經(jīng)知道計(jì)算結(jié)果,也需要包裝為 CompletableFuture,才能融入到異步計(jì)算的流程之中。

    封裝有返回值的異步計(jì)算邏輯

    這是我們最常用的方式。把需要異步計(jì)算的邏輯封裝為一個(gè)計(jì)算單元,交由 CompletableFuture 去運(yùn)行。如下面的代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成

    這里我們使用了 CompletableFuture 的 supplyAsync 方法,以 lambda 表達(dá)式的方式向其傳遞了一個(gè) supplier 接口的實(shí)現(xiàn)。

    可見 completableFuture.get() 拿到的計(jì)算結(jié)果就是你傳入函數(shù)執(zhí)行后 return 的值。那么如果你有需要異步計(jì)算的邏輯,那么就可以放到 supplyAsync 傳入的函數(shù)體中。這段函數(shù)是如何被異步執(zhí)行的呢?如果你跟入代碼可以看到其實(shí) supplyAsync 是通過 Executor,也就是線程池來運(yùn)行這段函數(shù)的。completableFuture 默認(rèn)使用的是ForkJoinPool,當(dāng)然你也可以通過為 supplyAsync 指定其他 Excutor,通過第二個(gè)參數(shù)傳入 supplyAsync 方法。

    supplyAsync 使用場(chǎng)景非常多,舉個(gè)簡(jiǎn)單的例子,主程序需要調(diào)用多個(gè)微服務(wù)的接口請(qǐng)求數(shù)據(jù),那么就可以啟動(dòng)多個(gè) CompletableFuture,調(diào)用 supplyAsync,函數(shù)體中是關(guān)于不同接口的調(diào)用邏輯。這樣不同的接口請(qǐng)求就可以異步同時(shí)運(yùn)行,最后再等全部接口返回時(shí),執(zhí)行后面的邏輯。

    封裝無返回值的異步計(jì)算邏輯

    supplyAsync 接收的函數(shù)是有返回值的。有些情況我們只是一段計(jì)算過程,并不需要返回值。這就像 Runnable 的run 方法,并沒有返回值。這種情況我們可以使用 runAsync方法,如下面的代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println("挖坑完成"));
        completableFuture.get();
    }
    // 挖坑完成

    runAsync 接收 runnable 接口的函數(shù)。所以并無返回值。栗子中的邏輯只是打印“挖坑完成”。

    進(jìn)一步處理異步返回的結(jié)果,并返回新的計(jì)算結(jié)果

    當(dāng)我們通過 supplyAsync 完成了異步計(jì)算,返回 CompletableFuture,此時(shí)可以繼續(xù)對(duì)返回結(jié)果進(jìn)行加工,如下面的代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenApply(s -> s + ", 并且歸還鐵鍬")
                .thenApply(s -> s + ", 全部完成。");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且歸還鐵鍬, 全部完成。

    在調(diào)用 supplyAsync 后,我們兩次鏈?zhǔn)秸{(diào)用 thenApply 方法。s 是前一步 supplyAsync 返回的計(jì)算結(jié)結(jié)果,我們對(duì)結(jié)算結(jié)果進(jìn)行了兩次再加工。我們可以通過 thenApply 不斷對(duì)計(jì)算結(jié)果進(jìn)行加工處理。 如果想異步運(yùn)行 thenApply 的邏輯,可以使用 thenApplyAsync。使用方法相同,只不過會(huì)通過線程池異步運(yùn)行。

    進(jìn)一步處理異步返回的結(jié)果,無返回

    這種場(chǎng)景你可以使用thenApply。這個(gè)方法可以讓你處理上一步的返回結(jié)果,但無返回值。參照如下代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且歸還鐵鍬"));
        completableFuture.get();
    }

    這里可以看到 thenAccept 接收的函數(shù)沒有返回值,只有業(yè)務(wù)邏輯。處理后返回 CompletableFuture 類型對(duì)象。

    既不需要返回值,也不需要上一步計(jì)算結(jié)果,只想在執(zhí)行結(jié)束后再執(zhí)行一段代碼

    此時(shí)你可以使用 thenRun 方法,他接收 Runnable 的函數(shù),沒有輸入也沒有輸出,僅僅是在異步計(jì)算結(jié)束后回調(diào)一段邏輯,比如記錄 log 等。參照下面代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenAccept(s -> System.out.println(s + ", 并且歸還鐵鍬"))
                .thenRun(() -> System.out.println("挖坑工作已經(jīng)全部完成"));
        completableFuture.get();
    }
    // 挖坑完成, 并且歸還鐵鍬
    // 挖坑工作已經(jīng)全部完成

    可以看到在 thenAccept 之后繼續(xù)調(diào)用了 thenRun,僅僅是打印了日志而已

    組合 Future 處理邏輯

    我們可以把兩個(gè) CompletableFuture 組合起來使用,如下面的代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", 并且歸還鐵鍬"));
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 并且歸還鐵鍬

    thenApply 和 thenCompose 的關(guān)系就像 stream中的 map 和 flatmap。從上面的例子來看,thenApply 和thenCompose 都可以實(shí)現(xiàn)同樣的功能。但是如果你使用一個(gè)第三方的庫,有一個(gè)API返回的是CompletableFuture 類型,那么你就只能使用 thenCompose方法。

    組合Futurue結(jié)果

    如果你有兩個(gè)異步操作互相沒有依賴,但是第三步操作依賴前兩部計(jì)算的結(jié)果,那么你可以使用 thenCombine 方法來實(shí)現(xiàn),如下面代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成")
                .thenCombine(CompletableFuture.supplyAsync(() -> ", 拿樹苗完成"), (x, y) -> x + y + "植樹完成");
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑完成, 拿樹苗完成植樹完成

    挖坑和拿樹苗可以同時(shí)進(jìn)行,但是第三步植樹則祖堯前兩步完成后才能進(jìn)行。

    可以看到符合我們的預(yù)期。使用場(chǎng)景之前也提到過。我們調(diào)用多個(gè)微服務(wù)的接口時(shí),可以使用這種方式進(jìn)行組合。處理接口調(diào)用間的依賴關(guān)系。 當(dāng)你需要兩個(gè) Future 的結(jié)果,但是不需要再加工后向下游傳遞計(jì)算結(jié)果時(shí),可以使用 thenAcceptBoth,用法一樣,只不過接收的函數(shù)沒有返回值。

    并行處理多個(gè) Future

    假如我們對(duì)微服務(wù)接口的調(diào)用不止兩個(gè),并且還有一些其它可以異步執(zhí)行的邏輯。主流程需要等待這些所有的異步操作都返回時(shí),才能繼續(xù)往下執(zhí)行。此時(shí)我們可以使用 CompletableFuture.allOf 方法。它接收 n 個(gè) CompletableFuture,返回一個(gè) CompletableFuture。對(duì)其調(diào)用 get 方法后,只有所有的 CompletableFuture 全完成時(shí)才會(huì)繼續(xù)后面的邏輯。我們看下面示例代碼:

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("挖坑完成");
        });
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取樹苗完成");
        });
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("取肥料完成");
        });
        CompletableFuture.allOf(future1, future2, future3).get();
        System.out.println("植樹準(zhǔn)備工作完成!");
    }
    // 挖坑完成
    // 取肥料完成
    // 取樹苗完成
    // 植樹準(zhǔn)備工作完成!
    異常處理

    在異步計(jì)算鏈中的異常處理可以采用 handle 方法,它接收兩個(gè)參數(shù),第一個(gè)參數(shù)是計(jì)算及過,第二個(gè)參數(shù)是異步計(jì)算鏈中拋出的異常。使用方法如下:

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            if (1 == 1) {
                throw new RuntimeException("Computation error");
            }
            return "挖坑完成";
        }).handle((result, throwable) -> {
            if (result == null) {
                return "挖坑異常";
            }
            return result;
        });
        System.out.println("result is " + completableFuture.get());
    }
    // result is 挖坑異常

    代碼中會(huì)拋出一個(gè) RuntimeException,拋出這個(gè)異常時(shí) result 為 null,而 throwable 不為null。根據(jù)這些信息你可以在 handle 中進(jìn)行處理,如果拋出的異常種類很多,你可以判斷 throwable 的類型,來選擇不同的處理邏輯。

    以上就是“java多線程如何通過CompletableFuture組裝異步計(jì)算單元”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會(huì)為大家更新不同的知識(shí),如果還想學(xué)習(xí)更多的知識(shí),請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI