溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Java多線程工具CompletableFuture怎么使用

發(fā)布時(shí)間:2022-08-25 17:00:43 來(lái)源:億速云 閱讀:219 作者:iii 欄目:開(kāi)發(fā)技術(shù)

本文小編為大家詳細(xì)介紹“Java多線程工具CompletableFuture怎么使用”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“Java多線程工具CompletableFuture怎么使用”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來(lái)學(xué)習(xí)新知識(shí)吧。

    前言

    Future的問(wèn)題

    寫(xiě)多線程程序的時(shí)候,可以使用Future從一個(gè)異步線程中拿到結(jié)果,但是如果使用過(guò)程中會(huì)發(fā)現(xiàn)一些問(wèn)題:

    • 如果想要對(duì)Future的結(jié)果做進(jìn)一步的操作,需要阻塞當(dāng)前線程

    • 多個(gè)Future不能被鏈?zhǔn)降膱?zhí)行,每個(gè)Future的結(jié)果都是獨(dú)立的,期望對(duì)一個(gè)Future的結(jié)果做另外一件異步的事情;

    • 沒(méi)有異常處理策略,如果Future執(zhí)行失敗了,需要手動(dòng)捕捉

    CompletableFuture應(yīng)運(yùn)而生

    為了解決Future問(wèn)題,JDK在1.8的時(shí)候給我們提供了一個(gè)好用的工具類(lèi)CompletableFuture;

    它實(shí)現(xiàn)了Future和CompletionStage接口,針對(duì)Future的不足之處給出了相應(yīng)的處理方式。

    • 在異步線程執(zhí)行結(jié)束后可以自動(dòng)回調(diào)我們新的處理邏輯,無(wú)需阻塞

    • 可以對(duì)多個(gè)異步任務(wù)進(jìn)行編排,組合或者排序

    • 異常處理

    CompletableFuture的核心思想是將每個(gè)異步任務(wù)都可以看做一個(gè)步驟(CompletionStage),然后其他的異步任務(wù)可以根據(jù)這個(gè)步驟做一些想做的事情。

    CompletionStage定義了許多步驟處理的方法,功能非常強(qiáng)大,這里就只列一下日常中常用到的一些方法供大家參考。

    使用方式

    基本使用-提交異步任務(wù)

    簡(jiǎn)單的使用方式

    異步執(zhí)行,無(wú)需結(jié)果:

    // 可以執(zhí)行Executors異步執(zhí)行,如果不指定,默認(rèn)使用ForkJoinPool
    CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));

    異步執(zhí)行,同時(shí)返回結(jié)果:

    // 同樣可以指定線程池
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
    System.out.println(stringCompletableFuture.get());

    處理上個(gè)異步任務(wù)結(jié)果

    • thenRun: 不需要上一步的結(jié)果,直接直接新的操作

    • thenAccept:獲取上一步異步處理的內(nèi)容,進(jìn)行新的操作

    • thenApply: 獲取上一步的內(nèi)容,然后產(chǎn)生新的內(nèi)容

    所有加上Async后綴的,代表新的處理操作仍然是異步的。Async的操作都可以指定Executors進(jìn)行處理

    Java多線程工具CompletableFuture怎么使用

    // Demo
           CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    // 針對(duì)上一步的結(jié)果做處理,產(chǎn)生新的結(jié)果
                    .thenApplyAsync(s -> s.toUpperCase())
                    // 針對(duì)上一步的結(jié)果做處理,不返回結(jié)果
                    .thenAcceptAsync(s -> System.out.println(s))
                    // 不需要上一步返回的結(jié)果,直接進(jìn)行操作
                    .thenRunAsync(() -> System.out.println("end"));
            ;

    對(duì)兩個(gè)結(jié)果進(jìn)行選用-acceptEither

    當(dāng)我們有兩個(gè)回調(diào)在處理的時(shí)候,任何完成都可以使用,兩者結(jié)果沒(méi)有關(guān)系,那么使用acceptEither。

    兩個(gè)異步線程誰(shuí)先執(zhí)行完成,用誰(shuí)的結(jié)果,其余類(lèi)型的方法也是如此。

    Java多線程工具CompletableFuture怎么使用

    Java多線程工具CompletableFuture怎么使用

    // 返回abc
    CompletableFuture
                    .supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "Hello CompletableFuture!";
                    })
                    .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });
    // 返回Hello CompletableFuture!       
    CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .acceptEither(CompletableFuture.supplyAsync(() -> {
                        SleepUtils.sleep(100);
                        return "abc";
                    }), new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            System.out.println(s);
                        }
                    });

    對(duì)兩個(gè)結(jié)果進(jìn)行合并-thenCombine, thenAcceptBoth

    thenCombine

    當(dāng)我們有兩個(gè)CompletionStage時(shí),需要對(duì)兩個(gè)的結(jié)果進(jìn)行整合處理,然后計(jì)算得出一個(gè)新的結(jié)果。

    • thenCompose是對(duì)上一個(gè)CompletionStage的結(jié)果進(jìn)行處理,返回結(jié)果,并且返回類(lèi)型必須是CompletionStage。

    • thenCombine是得到第一個(gè)CompletionStage的結(jié)果,然后拿到當(dāng)前的CompletionStage,兩者的結(jié)果進(jìn)行處理。

            CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);
    
            CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                    .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                        @Override
                        public Double apply(Integer wight, Integer height) {
                            return wight * 10000.0 / (height * height);
                        }
                    })
                    ;

    thenAcceptBoth

    需要兩個(gè)異步CompletableFuture的結(jié)果,兩者都完成的時(shí)候,才進(jìn)入thenAcceptBoth回調(diào)。

    Java多線程工具CompletableFuture怎么使用

    Java多線程工具CompletableFuture怎么使用

    // thenAcceptBoth案例:
            CompletableFuture
                    .supplyAsync(() -> "Hello CompletableFuture!")
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                    		// 參數(shù)一為我們剛開(kāi)始運(yùn)行時(shí)的CompletableStage,新傳入的作為第二個(gè)參數(shù)
                        @Override
                        public void accept(String s, String s2) {
                            System.out.println("param1=" + s + ", param2=" + s2);
                        }
                    });
    // 結(jié)果:param1=Hello CompletableFuture!, param2=abc

    異常處理

    當(dāng)我們使用CompleteFuture進(jìn)行鏈?zhǔn)秸{(diào)用的時(shí)候,多個(gè)異步回調(diào)中,如果有一個(gè)執(zhí)行出現(xiàn)問(wèn)題,那么接下來(lái)的回調(diào)都會(huì)停止,所以需要一種異常處理策略。

    exceptionally

    exceptionally是當(dāng)出現(xiàn)錯(cuò)誤時(shí),給我們機(jī)會(huì)進(jìn)行恢復(fù),自定義返回內(nèi)容。

            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("發(fā)生錯(cuò)誤");
            }).exceptionally(throwable -> {
                log.error("調(diào)用錯(cuò)誤 {}", throwable.getMessage(), throwable);
                return "異常處理內(nèi)容";
            });

    handle

    exceptionally是只有發(fā)生異常時(shí)才會(huì)執(zhí)行,而handle則是不管是否發(fā)生錯(cuò)誤都會(huì)執(zhí)行。

    CompletableFuture.supplyAsync(() -> {
        return "abc";
    })
    .handle((r,err) -> {
        log.error("調(diào)用錯(cuò)誤 {}", err.getMessage(), err);
        // 對(duì)結(jié)果做額外的處理
        return r;
    })
    ;

    案例

    大量用戶(hù)發(fā)送短信|消息

    需求為對(duì)某個(gè)表中特定條件的用戶(hù)進(jìn)行短信通知,但是短信用戶(hù)有成百上千萬(wàn),如果使用單線程讀取效率會(huì)很慢。這個(gè)時(shí)候可以考慮使用多線程的方式進(jìn)行讀??;

    1、將讀取任務(wù)拆分為多個(gè)不同的子任務(wù),指定讀取的偏移量和個(gè)數(shù)

      // 假設(shè)有500萬(wàn)條記錄
            long recordCount = 500 * 10000;
            int subTaskRecordCount = 10000;
            // 對(duì)記錄進(jìn)行分片
            List<Map> subTaskList = new LinkedList<>();
            for (int i = 0; i < recordCount / 500; i++) {
                // 如果子任務(wù)結(jié)構(gòu)復(fù)雜,建議使用對(duì)象
                HashMap<String, Integer> subTask = new HashMap<>();
                subTask.put("index", i);
                subTask.put("offset", i * subTaskRecordCount);
                subTask.put("count", subTaskRecordCount);
                subTaskList.add(subTask);
            }

    2、使用多線程進(jìn)行批量讀取

      // 進(jìn)行subTask批量處理,拆分為不同的任務(wù)
            subTaskList.stream()
                    .map(subTask -> CompletableFuture.runAsync(()->{
                        // 讀取數(shù)據(jù),然后處理
                        // dataTunel.read(subTask);
                    },excuturs))   // 使用應(yīng)用的通用任務(wù)線程池
                    .map(c -> ((CompletableFuture<?>) c).join());

    3、進(jìn)行業(yè)務(wù)邏輯處理,或者直接在讀取完進(jìn)行業(yè)務(wù)邏輯處理也是可以;

    并發(fā)獲取商品不同信息

    在系統(tǒng)拆分比較細(xì)的時(shí)候,價(jià)格,優(yōu)惠券,庫(kù)存,商品詳情等信息分散在不同的系統(tǒng)中,有時(shí)候需要同時(shí)獲取商品的所有信息, 有時(shí)候可能只需要獲取商品的部分信息。

    當(dāng)然問(wèn)題點(diǎn)在于要調(diào)用多個(gè)不同的系統(tǒng),需要將RT降低下來(lái),那么需要進(jìn)行并發(fā)調(diào)用;

         List<Task> taskList = new ArrayList<>();
            List<Object> result = taskList.stream()
                    .map(task -> CompletableFuture.supplyAsync(()->{
    //                    handlerMap.get(task).query();
                        return "";
                    }, executorService))
                    .map(c -> c.join())
                    .collect(Collectors.toList());

    問(wèn)題

    thenRun和thenRunAsync有什么區(qū)別

    • 如果不使用傳入的線程池,大家用默認(rèn)的線程池ForkJoinPool

    • thenRun用的默認(rèn)和上一個(gè)任務(wù)使用相同的線程池

    • thenRunAsync在執(zhí)行新的任務(wù)的時(shí)候可以接受傳入一個(gè)新的線程池,使用新的線程池執(zhí)行任務(wù);

    handle和exceptional有什么區(qū)別

    exceptionally是只有發(fā)生異常時(shí)才會(huì)執(zhí)行,而handle則是不管是否發(fā)生錯(cuò)誤都會(huì)執(zhí)行。

    讀到這里,這篇“Java多線程工具CompletableFuture怎么使用”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過(guò)才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI