您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Java8通過(guò)CompletableFuture怎么實(shí)現(xiàn)異步回調(diào)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
CompletableFuture
是Java 8 中新增的一個(gè)類(lèi),它是對(duì)Future接口的擴(kuò)展。從下方的類(lèi)繼承關(guān)系圖中我們看到其不僅實(shí)現(xiàn)了Future接口,還有CompletionStage接口,當(dāng)Future需要顯示地完成時(shí),可以使用CompletionStage接口去支持完成時(shí)觸發(fā)的函數(shù)和操作,當(dāng)2個(gè)以上線程同時(shí)嘗試完成、異常完成、取消一個(gè)CompletableFuture時(shí),只有一個(gè)能成功。
CompletableFuture
主要作用就是簡(jiǎn)化我們異步編程的復(fù)雜性,支持函數(shù)式編程,可以通過(guò)回調(diào)的方式處理計(jì)算結(jié)果。
在java5中,JDK為我們提供了Callable和Future,使我們可以很容易的完成異步任務(wù)結(jié)果的獲取,但是通過(guò)Future的get獲取異步任務(wù)結(jié)果會(huì)導(dǎo)致主線程的阻塞,這樣在某些場(chǎng)景下是非常消耗CPU資源的,進(jìn)而Java8為我們提供了CompletableFuture,使我們無(wú)需阻塞等待,而是通過(guò)回調(diào)的方式去處理結(jié)果,并且還支持流式處理、組合異步任務(wù)等操作。
如果不熟悉Callable
和Future
的,可以看小編之前更新的這篇文章Java從源碼看異步任務(wù)計(jì)算FutureTask
下面我們就CompletableFuture 的使用進(jìn)行簡(jiǎn)單分類(lèi):
創(chuàng)建任務(wù):
supplyAsync/runAsync
異步回調(diào):
thenApply/thenAccept/thenRun
thenApplyAsync/thenAcceptAsync/thenRunAsync
exceptionally
handle/whenComplete
組合處理:
thenCombine / thenAcceptBoth / runAfterBoth
applyToEither / acceptEither / runAfterEither
thenCompose
allOf / anyOf
具體內(nèi)容請(qǐng)參照以下案例:
public static void main(String[] args) throws Exception { // 1.帶返回值的異步任務(wù)(不指定線程池,默認(rèn)ForkJoinPool.commonPool(),單核ThreadPerTaskExecutor) CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> { return 1 + 1; }); System.out.println("cf1 result: " + cf1.get()); // 2.無(wú)返回值的異步任務(wù)(不指定線程池,默認(rèn)ForkJoinPool.commonPool(),單核ThreadPerTaskExecutor) CompletableFuture cf2 = CompletableFuture.runAsync(() -> { int a = 1 + 1; }); System.out.println("cf2 result: " + cf2.get()); // 3.指定線程池的帶返回值的異步任務(wù),runAsync同理 CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> { return 1 + 1; }, Executors.newCachedThreadPool()); System.out.println("cf3 result: " + cf3.get()); // 4.回調(diào),任務(wù)執(zhí)行完成后執(zhí)行的動(dòng)作 CompletableFuture<Integer> cf4 = cf1.thenApply((result) -> { System.out.println("cf4回調(diào)拿到cf1的結(jié)果 result : " + result); return result + 1; }); System.out.println("cf4 result: " + cf4.get()); // 5.異步回調(diào)(將回調(diào)任務(wù)提交到線程池),任務(wù)執(zhí)行完成后執(zhí)行的動(dòng)作后異步執(zhí)行 CompletableFuture<Integer> cf5 = cf1.thenApplyAsync((result) -> { System.out.println("cf5回調(diào)拿到cf1的結(jié)果 result : " + result); return result + 1; }); System.out.println("cf5 result: " + cf5.get()); // 6.回調(diào)(同thenApply但無(wú)返回結(jié)果),任務(wù)執(zhí)行完成后執(zhí)行的動(dòng)作 CompletableFuture cf6 = cf1.thenAccept((result) -> { System.out.println("cf6回調(diào)拿到cf1的結(jié)果 result : " + result); }); System.out.println("cf6 result: " + cf6.get()); // 7.回調(diào)(同thenAccept但無(wú)入?yún)?,任務(wù)執(zhí)行完成后執(zhí)行的動(dòng)作 CompletableFuture cf7 = cf1.thenRun(() -> { }); System.out.println("cf7 result: " + cf7.get()); // 8.異?;卣{(diào),任務(wù)執(zhí)行出現(xiàn)異常后執(zhí)行的動(dòng)作 CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("出現(xiàn)異常"); }); CompletableFuture<Integer> cf8 = cf.exceptionally((result) -> { return -1; }); System.out.println("cf8 result: " + cf8.get()); // 9.當(dāng)某個(gè)任務(wù)執(zhí)行完成后執(zhí)行的回調(diào)方法,會(huì)將執(zhí)行結(jié)果或者執(zhí)行期間拋出的異常傳遞給回調(diào)方法 // 如果是正常執(zhí)行則異常為null,回調(diào)方法對(duì)應(yīng)的CompletableFuture的result和該任務(wù)一致; // 如果該任務(wù)正常執(zhí)行,則get方法返回執(zhí)行結(jié)果,如果是執(zhí)行異常,則get方法拋出異常。 CompletableFuture<Integer> cf9 = cf1.handle((a, b) -> { if (b != null) { b.printStackTrace(); } return a; }); System.out.println("cf9 result: " + cf9.get()); // 10 與handle類(lèi)似,無(wú)返回值 try { CompletableFuture<Integer> cf10 = cf.whenComplete((a, b) -> { if (b != null) { b.printStackTrace(); } }); System.out.println("cf10 result: " + cf10.get()); } catch (Exception e) { System.out.println("cf10 出現(xiàn)異常?。?!"); } // 11 組合處理(兩個(gè)都完成,然后執(zhí)行)有入?yún)?,有返回? CompletableFuture<Integer> cf11 = cf1.thenCombine(cf3, (r1, r2) -> { return r1 + r2; }); System.out.println("cf11 result: " + cf11.get()); // 12 組合處理(兩個(gè)都完成,然后執(zhí)行)有入?yún)?,無(wú)返回值 CompletableFuture cf12 = cf1.thenAcceptBoth(cf3, (r1, r2) -> { }); System.out.println("cf12 result: " + cf12.get()); // 13 組合處理(兩個(gè)都完成,然后執(zhí)行)無(wú)入?yún)?,無(wú)返回值 CompletableFuture cf13 = cf1.runAfterBoth(cf3, () -> { }); System.out.println("cf13 result: " + cf13.get()); // 14 組合處理(有一個(gè)完成,然后執(zhí)行)有入?yún)?,有返回? CompletableFuture<Integer> cf14 = cf1.applyToEither(cf3, (r) -> { return r; }); System.out.println("cf14 result: " + cf14.get()); // 15 組合處理(有一個(gè)完成,然后執(zhí)行)有入?yún)?,無(wú)返回值 CompletableFuture cf15 = cf1.acceptEither(cf3, (r) -> { }); System.out.println("cf15 result: " + cf15.get()); // 16 組合處理(有一個(gè)完成,然后執(zhí)行)無(wú)入?yún)?,無(wú)返回值 CompletableFuture cf16 = cf1.runAfterEither(cf3, () -> { }); System.out.println("cf16 result: " + cf16.get()); // 17 方法執(zhí)行后返回一個(gè)新的CompletableFuture CompletableFuture<Integer> cf17 = cf1.thenCompose((r) -> { return CompletableFuture.supplyAsync(() -> { return 1 + 1; }); }); System.out.println("cf17 result: " + cf17.get()); // 18 多個(gè)任務(wù)都執(zhí)行成功才會(huì)繼續(xù)執(zhí)行 CompletableFuture.allOf(cf1,cf2,cf3).whenComplete((r, t) -> { System.out.println(r); }); // 18 多個(gè)任務(wù)任意一個(gè)執(zhí)行成功就會(huì)繼續(xù)執(zhí)行 CompletableFuture.anyOf(cf1,cf2,cf3).whenComplete((r, t) -> { System.out.println(r); }); }
首先我們可以從注釋中看到,它對(duì)CompletionStage
、Future
接口擴(kuò)展的一些描述,這些也是它的一些重點(diǎn)。
除了直接操作狀態(tài)和結(jié)果的相關(guān)方法外,CompletableFuture
還實(shí)現(xiàn)了CompletionStage
接口的如下策略:
(1)為非異步方法的依賴完成提供的操作,可以由完成當(dāng)前CompletableFuture
的線程執(zhí)行,也可以由完成方法的任何其他調(diào)用方執(zhí)行。
(2)所有沒(méi)有顯式Executor參數(shù)的異步方法都使用ForkJoinPool.commonPool()
執(zhí)行(除非它不支持至少兩個(gè)并行級(jí)別,在這種情況下,將創(chuàng)建一個(gè)新線程來(lái)運(yùn)行每個(gè)任務(wù))。為了簡(jiǎn)化監(jiān)視、調(diào)試和跟蹤,所有生成的異步任務(wù)都是CompletableFuture的實(shí)例,異步完成任務(wù)。
不了解ForkJoinPool
的可以閱讀小編之前更新的這篇文章一文帶你了解Java中的ForkJoin。
(3)所有CompletionStage
方法都是獨(dú)立于其他公共方法實(shí)現(xiàn)的,因此一個(gè)方法的行為不會(huì)受到子類(lèi)中其他方法重寫(xiě)的影響。
CompletableFuture實(shí)現(xiàn)了Future接口的如下策略:
因?yàn)椋ㄅcFutureTask不同)這個(gè)類(lèi)對(duì)導(dǎo)致它完成的計(jì)算沒(méi)有直接控制權(quán),所以取消被視為另一種形式的異常完成,所以cancel操作被視為是另一種異常完成形式(new CancellationException()具有相同的效果。)。方法isCompletedExceptionally()
可以用來(lái)確定一個(gè)CompletableFuture是否以任何異常的方式完成。
如果異常完成時(shí)出現(xiàn)CompletionException
,方法get()和get(long,TimeUnit)會(huì)拋出一個(gè)ExecutionException
,其原因與相應(yīng)CompletionException中的原因相同。為了簡(jiǎn)化在大多數(shù)上下文中的使用,該類(lèi)還定義了join()和getNow()方法,在這些情況下直接拋出CompletionException。
我們先看一下CompletableFuture
是如何創(chuàng)建異步任務(wù)的,我們可以看到起創(chuàng)建異步任務(wù)的核心實(shí)現(xiàn)是兩個(gè)入?yún)?,一個(gè)入?yún)⑹荅xecutor,另一個(gè)入?yún)⑹荢upplier(函數(shù)式編程接口)。其中也提供了一個(gè)入?yún)⒌闹剌d,一個(gè)入?yún)⒌闹剌d方法會(huì)獲取默認(rèn)的Executor,當(dāng)系統(tǒng)是單核的會(huì)使用ThreadPerTaskExecutor,多核時(shí)使用ForkJoinPool.commonPool()
。
注意:這里默認(rèn)ForkJoinPool.commonPool()線程池,如果所有異步任務(wù)都使用該線程池話,出現(xiàn)問(wèn)題不容易定位,如果長(zhǎng)時(shí)間占用該線程池可能影響其他業(yè)務(wù)的正常操作,stream的并行流也是使用的該線程池。
其中還封裝了靜態(tài)內(nèi)部類(lèi)AsyncSupply
,該類(lèi)代表這個(gè)異步任務(wù),實(shí)現(xiàn)了Runnable
,重寫(xiě)了run方法。
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1); public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; } /** * 靜態(tài)內(nèi)部類(lèi),繼承了ForkJoinTask<Void>、實(shí)現(xiàn)了Runnable、AsynchronousCompletionTask */ static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { CompletableFuture<T> dep; Supplier<T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { this.dep = dep; this.fn = fn; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) {} public final boolean exec() { run(); return true; } public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } } }
Supplier類(lèi)是一個(gè)函數(shù)式的接口,@FunctionalInterface
注解就是函數(shù)式編程的標(biāo)記。
package java.util.function; @FunctionalInterface public interface Supplier<T> { T get(); }
異步任務(wù)回調(diào),我們以thenApply/thenApplyAsync
為例來(lái)看一下其實(shí)現(xiàn)原理,方法名含有Async的會(huì)傳入asyncPool。uniApplyStage
方法通過(guò)判斷e是否有值,來(lái)區(qū)分是從哪個(gè)方法進(jìn)來(lái)的。thenApply
不會(huì)傳入 Executor,它優(yōu)先讓當(dāng)前線程來(lái)執(zhí)行后續(xù) stage 的任務(wù)。
當(dāng)發(fā)現(xiàn)前一個(gè) stage 已經(jīng)執(zhí)行完畢時(shí),直接讓當(dāng)前線程來(lái)執(zhí)行后續(xù) stage 的 task。
當(dāng)發(fā)現(xiàn)前一個(gè) stage 還沒(méi)執(zhí)行完畢時(shí),則把當(dāng)前 stage 包裝成一個(gè) UniApply 對(duì)象,放到前一個(gè) stage 的棧中。執(zhí)行前一個(gè) stage 的線程,執(zhí)行完畢后,接著執(zhí)行后續(xù) stage 的 task。
thenApplyAsync
會(huì)傳入一個(gè) Executor,它總是讓 Executor 線程池里面的線程來(lái)執(zhí)行后續(xù) stage 的任務(wù)。
把當(dāng)前 stage 包裝成一個(gè) UniApply 對(duì)象,放到前一個(gè) stage 的棧中,直接讓 Executor 來(lái)執(zhí)行。
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); } private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); // Async直接進(jìn)入,不是Async執(zhí)行uniApply嘗試獲取結(jié)果 if (e != null || !d.uniApply(this, f, null)) { UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; } final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) { Object r; Throwable x; // 判斷當(dāng)前CompletableFuture是否已完成,如果沒(méi)完成則返回false;如果完成了則執(zhí)行下面的邏輯。 if (a == null || (r = a.result) == null || f == null) return false; tryComplete: if (result == null) { // 判斷任務(wù)結(jié)果是否是AltResult類(lèi)型 if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } try { // 判斷當(dāng)前任務(wù)是否可以執(zhí)行 if (c != null && !c.claim()) return false; // 獲取任務(wù)結(jié)果 @SuppressWarnings("unchecked") S s = (S) r; // 執(zhí)行 completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class UniApply<T,V> extends UniCompletion<T,V> { Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } } final void push(UniCompletion<?,?> c) { if (c != null) { while (result == null && !tryPushStack(c)) lazySetNext(c, null); // clear on failure } } final boolean completeValue(T t) { return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t); }
我們?cè)?code>thenCombine方法為例看一下CompletableFuture
是如何處理組合任務(wù)的,我們可以看到thenCombine的源碼與thenApply的源碼基本上是一直的,只不過(guò)組合的時(shí)候不僅僅是判斷一個(gè),需要集合具體場(chǎng)景,判斷多個(gè)CompletableFuture
。
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } private <U,V> CompletableFuture<V> biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f) { CompletableFuture<U> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.biApply(this, b, f, null)) { BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); bipush(b, c); c.tryFire(SYNC); } return d; } final <R,S> boolean biApply(CompletableFuture<R> a, CompletableFuture<S> b, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c) { Object r, s; Throwable x; // 此處不止要判斷a還得判斷b if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null) return false; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } // 這里不止判斷a的結(jié)果r還要判斷b的結(jié)果s if (s instanceof AltResult) { if ((x = ((AltResult)s).ex) != null) { completeThrowable(x, s); break tryComplete; } s = null; } // 最后將rr, ss傳入 try { if (c != null && !c.claim()) return false; @SuppressWarnings("unchecked") R rr = (R) r; @SuppressWarnings("unchecked") S ss = (S) s; completeValue(f.apply(rr, ss)); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn) { super(executor, dep, src, snd); this.fn = fn; } // tryFire方法也同樣的多可個(gè)b final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; CompletableFuture<U> b; if ((d = dep) == null || !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } }
“Java8通過(guò)CompletableFuture怎么實(shí)現(xiàn)異步回調(diào)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。