您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“ExecutorService Callable Future多線程返回結(jié)果的原理是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
在并發(fā)多線程場景下,存在需要獲取各線程的異步執(zhí)行結(jié)果,這時(shí),就可以通過ExecutorService線程池結(jié)合Callable、Future來實(shí)現(xiàn)。
public class ExecutorTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Callable callable = new MyCallable(); Future future = executor.submit(callable); System.out.println("打印線程池返回值:" + future.get()); } } class MyCallable implements Callable<String>{ @Override public String call() throws Exception { return "測試返回值"; } }
執(zhí)行完成后,會(huì)打印出以下結(jié)果:
打印線程池返回值:測試返回值
可見,線程池執(zhí)行完異步線程任務(wù),我們是可以獲取到異步線程里的返回值。
那么,ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程是如何實(shí)現(xiàn)的呢?
首先,我們需要?jiǎng)?chuàng)建一個(gè)實(shí)現(xiàn)函數(shù)式接口Callable的類,該Callable接口只定義了一個(gè)被泛型修飾的call方法,這意味著,需要返回什么類型的值可以由具體實(shí)現(xiàn)類來定義——
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
因此,我自定義了一個(gè)實(shí)現(xiàn)Callable接口的類,該類的重寫了call方法,我們在執(zhí)行多線程時(shí)希望返回什么樣的結(jié)果,就可以在該重寫的call方法定義。
class MyCallable implements Callable<String>{ @Override public String call() throws Exception { return "測試返回值"; } }
在自定義的MyCallable類中,我在call方法里設(shè)置一個(gè)很簡單的String返回值 “測試返回值”,這意味著,我是希望在線程池執(zhí)行完異步線程任務(wù)時(shí),可以返回“測試返回值”這個(gè)字符串給我。
接下來,我們就可以創(chuàng)建該MyCallable類的對象,然后通過executor.submit(callable)丟到線程池里,線程池里會(huì)利用空閑線程來幫我們執(zhí)行一個(gè)異步線程任務(wù)。
ExecutorService executor = Executors.newSingleThreadExecutor(); Callable callable = new MyCallable(); Future future = executor.submit(callable);
值得注意一點(diǎn)是,若需要實(shí)現(xiàn)獲取線程返回值的效果,只能通過executor.submit(callable)去執(zhí)行,而不能通過executor.execute(Runnable command)執(zhí)行,因?yàn)閑xecutor.execute(Runnable command)只能傳入實(shí)現(xiàn)Runnable 接口的對象,但這類對象是不具備返回線程效果的功能。
進(jìn)入到executor.submit(callable)底層,具體實(shí)現(xiàn)在AbstractExecutorService類中??梢钥吹?,執(zhí)行到submit方法內(nèi)部時(shí),會(huì)將我們傳進(jìn)來的new MyCallable()對象作為參數(shù)傳入到newTaskFor(task)方法里——
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
這個(gè)newTaskFor(task)方法內(nèi)部具體實(shí)現(xiàn),是將new MyCallable()對象傳入構(gòu)造器中,生成了一個(gè)FutureTask對象。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
這個(gè)FutureTask對象實(shí)現(xiàn)RunableFuture接口,這個(gè)RunableFuture接口又繼承了Runnable,說明FutureTask類內(nèi)部會(huì)實(shí)現(xiàn)一個(gè)run方法,然后本身就可以當(dāng)做一個(gè)Runnable線程任務(wù),借助線程Thread(new FutureTask(.....)).start()方式開啟一個(gè)新線程,去異步執(zhí)行其內(nèi)部實(shí)現(xiàn)的run方法邏輯。
public class FutureTask<V> implements RunnableFuture<V>{.....} public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
分析到這里,可以知道FutureTask的核心方法一定是run方法,線程執(zhí)行start方法后,最后會(huì)去調(diào)用FutureTask的run方法。在講解這個(gè)run方法前,我們先去看一下創(chuàng)建FutureTask的初始化構(gòu)造方法底層邏輯new FutureTask(callable)
public class FutureTask<V> implements RunnableFuture<V> { private Callable<V> callable; ......//省略其余源碼 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); //通過構(gòu)造方法初始化Callable<V> callable賦值 this.callable = callable; this.state = NEW; // ensure visibility of callable } ......//省略其余源碼 }
可以看到,F(xiàn)utureTask(Callable callable)構(gòu)造器,主要是將我們先前創(chuàng)建的new MyCallable()對象傳進(jìn)來,賦值給FutureTask內(nèi)部定義的Callable callable引用,實(shí)現(xiàn)子類對象指向父類引用。這一點(diǎn)很關(guān)鍵,這就意味著,在初始化創(chuàng)建FutureTask對象后,我們是可以通過callable.call()來調(diào)用我們自定義設(shè)置可以返回“測試返回值”的call方法,這不就是我們希望在異步線程執(zhí)行完后能夠返回的值嗎?
我們不妨猜測一下整體返數(shù)主流程,在Thread(new FutureTask(.....)).start()開啟一個(gè)線程后,當(dāng)線程獲得了CPU時(shí)間片,就會(huì)去執(zhí)行FutureTask對象里的run方法,這時(shí)run方法里可以通過callable.call()調(diào)用到我們自定義的MyCallable#call()方法,進(jìn)而得到方法返回值 “測試返回值”——到這一步,只需要將這個(gè)返回值賦值給FutureTask里某個(gè)定義的對象屬性,那么,在主線程在通過獲取FutureTask里被賦值的X對象屬性值,不就可以拿到返回字符串值 “測試返回值”了嗎?
實(shí)現(xiàn)上,主體流程確實(shí)是這樣,只不過忽略了一些細(xì)節(jié)而已。
public void run() { //如果狀態(tài)不是NEW或者設(shè)置runner為當(dāng)前線程時(shí),說明FutureTask任務(wù)已經(jīng)取消,無法繼續(xù)執(zhí)行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //在該文中,callable被賦值為指向我們定義的new MyCallable()對象引用 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //c.call最后會(huì)調(diào)用new MyCallable()的call()方法,得到字符串返回值“測試返回值”給result result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } //正常執(zhí)行完c.call()方法時(shí),ran值為true,說明會(huì)執(zhí)行set(result)方法。 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
根據(jù)以上源碼簡單分析,可以看到run方法當(dāng)中,最終確實(shí)會(huì)執(zhí)行new MyCallable()的call()方法,得到字符串返回值“測試返回值”給result,然后執(zhí)行set(result)方法,根據(jù)set方法名就不難猜出,這是一個(gè)會(huì)賦值給某個(gè)字段的方法。
這里分析會(huì)忽略一些狀態(tài)值的講解,這塊會(huì)包括線程的取消、終止等內(nèi)容,后面我會(huì)出一片專門針對FutureTask源碼分析的文章再介紹,本文主要還是介紹異步線程返回結(jié)果的主要原理。
沿著以上分析,追蹤至set(result)方法里——
protected void set(V v) { //通過CAS原子操作,將運(yùn)行的線程設(shè)置為COMPLETING,說明線程已經(jīng)執(zhí)行完成中 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //若CAS原子比較賦值成功,說明線程可以被正常執(zhí)行完成的話,然后將result結(jié)果值賦值給outcome outcome = v; //線程正常完成結(jié)束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
這個(gè)方法的主要是,若該線程執(zhí)行能夠正常完成話,就將得到的返回值賦值給outcome,這個(gè)outcome是FutureTask的一個(gè)Object變量——
private Object outcome;
至此,就完成了流程的這一步——
最后,就是執(zhí)行主線程的根據(jù)ftask.get()獲取執(zhí)行完成的值,這個(gè)get可以設(shè)置超時(shí)時(shí)間,例如 ftask.get(2,TimeUnit.SECONDS)表示超過2秒還沒有獲取到線程返回值的話,就直接結(jié)束該get方法,繼續(xù)主線程往下執(zhí)行。
System.out.println("打印線程池返回值:" + ftask.get(2,TimeUnit.SECONDS));
進(jìn)入到get方法,可以看到當(dāng)狀態(tài)在s <= COMPLETING時(shí),表示任務(wù)還沒有執(zhí)行完,就會(huì)去執(zhí)行awaitDone(false, 0L)方法,這個(gè)方法表示,將一直做死循環(huán)等待線程執(zhí)行完成,才會(huì)跳出等待循環(huán)繼續(xù)往下走。若設(shè)置了超時(shí)時(shí)間,例如ftask.get(2,TimeUnit.SECONDS)),就會(huì)在awaitDone方法循環(huán)至2秒,在2秒內(nèi)發(fā)現(xiàn)線程狀態(tài)被設(shè)置為正常完成時(shí),就會(huì)跳出循環(huán),若2秒后線程沒有執(zhí)行完成,也會(huì)強(qiáng)制跳出循環(huán)了,但這種情況將無法獲取到線程結(jié)果值。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //循環(huán)等待線程執(zhí)行狀態(tài) s = awaitDone(false, 0L); return report(s); }
最后就是report(s)方法,可以看到outcome值最終賦值給Object x,若s==NORMAL表示線程任務(wù)已經(jīng)正常完成結(jié)束,就可以根據(jù)我們定義的類型進(jìn)行泛型轉(zhuǎn)換返回,我們定義的是String字符串類型,故而會(huì)返回字符串值,也就是 “測試返回值”。
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) //返回線程任務(wù)執(zhí)行結(jié)果 return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
你看,最后就能獲取到了異步線程執(zhí)行的結(jié)果返回給main主線程——
以上就是執(zhí)行線程任務(wù)run方法后,如何將線程任務(wù)結(jié)果返回給主線程,其實(shí),還少一個(gè)地方補(bǔ)充,就是如何將FutureTask任務(wù)丟給線程執(zhí)行,我們這里用到了線程池, 但是execute(ftask)底層同樣是使用一個(gè)了線程通過執(zhí)行start方法開啟一個(gè)線程,這個(gè)新運(yùn)行的線程最終會(huì)執(zhí)行FutureTask的run方法。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
可以簡單優(yōu)化下,直接用一個(gè)線程演示該案例,這樣看著更好理解些,當(dāng)時(shí),生產(chǎn)上是不會(huì)有這樣直接用一個(gè)線程來執(zhí)行的,更多是通過原生線程池——
public static void main(String[] args) throws Exception{ Callable callable = new MyCallable(); RunnableFuture<String> ftask = new FutureTask<String>(callable); new Thread(ftask).start(); System.out.println("打印線程池返回值:" + ftask.get()); }
“ExecutorService Callable Future多線程返回結(jié)果的原理是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。