您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“并發(fā)編程之如何理解Future&FutureTask”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
前言
Java線程實(shí)現(xiàn)方式主要有四種:
繼承Thread類
實(shí)現(xiàn)Runnable接口
實(shí)現(xiàn)Callable接口通過FutureTask包裝器來創(chuàng)建Thread線程
使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程。
其中前兩種方式線程執(zhí)行完后都沒有返回值,后兩種是帶返回值的。
Callable 和 Runnable 接口
Runnable接口
// 實(shí)現(xiàn)Runnable接口的類將被Thread執(zhí)行,表示一個基本的任務(wù) public interface Runnable { // run方法就是它所有的內(nèi)容,就是實(shí)際執(zhí)行的任務(wù) public abstract void run(); }
沒有返回值
run 方法沒有返回值,雖然有一些別的方法也能實(shí)現(xiàn)返回值得效果,比如編寫日志文件或者修改共享變量等等,但是不僅容易出錯,效率也不高。
不能拋出異常
public class RunThrowExceptionDemo { /** * 普通方法可以在方法簽名中拋出異常 * * @throws IOException */ public void normalMethod() throws IOException { throw new IOException(); } class RunnableImpl implements Runnable { /** * run 方法內(nèi)無法拋出 checked Exception,除非使用 try catch 進(jìn)行處理 */ @Override public void run() { try { throw new IOException(); } catch (IOException e) { e.printStackTrace(); } } } }
可以看到普通方法 normalMethod 可以在方法簽名上拋出異常,這樣上層接口就可以捕獲這個異常進(jìn)行處理,但是實(shí)現(xiàn) Runnable 接口的類,run 方法無法拋出 checked Exception,只能在方法內(nèi)使用 try catch 進(jìn)行處理,這樣上層就無法得知線程中的異常。
設(shè)計導(dǎo)致
其實(shí)這兩個缺陷主要原因就在于 Runnable 接口設(shè)計的 run 方法,這個方法已經(jīng)規(guī)定了 run() 方法的返回類型是 void,而且這個方法沒有聲明拋出任何異常。所以,當(dāng)實(shí)現(xiàn)并重寫這個方法時,我們既不能改返回值類型,也不能更改對于異常拋出的描述,因?yàn)樵趯?shí)現(xiàn)方法的時候,語法規(guī)定是不允許對這些內(nèi)容進(jìn)行修改的。
Runnable 為什么設(shè)計成這樣?
假設(shè) run() 方法可以返回返回值,或者可以拋出異常,也無濟(jì)于事,因?yàn)槲覀儾]有辦法在外層捕獲并處理,這是因?yàn)檎{(diào)用 run() 方法的類(比如 Thread 類和線程池)是 Java 直接提供的,而不是我們編寫的。 所以就算它能有一個返回值,我們也很難把這個返回值利用到,而 Callable 接口就是為了解決這兩個問題。
Callable接口
public interface Callable<V> { //返回接口,或者拋出異常 V call() throws Exception; }
可以看到 Callable 和 Runnable 接口其實(shí)比較相似,都只有一個方法,也就是線程任務(wù)執(zhí)行的方法,區(qū)別就是 call 方法有返回值,而且聲明了 throws Exception。
Callable 和 Runnable 的不同之處
方法名 :Callable 規(guī)定的執(zhí)行方法是 call(),而 Runnable 規(guī)定的執(zhí)行方法是 run();
返回值 :Callable 的任務(wù)執(zhí)行后有返回值,而 Runnable 的任務(wù)執(zhí)行后是沒有返回值的;
拋出異常 :call() 方法可拋出異常,而 run() 方法是不能拋出受檢查異常的;
與 Callable 配合的有一個 Future 接口,通過 Future 可以了解任務(wù)執(zhí)行情況,或者取消任務(wù)的執(zhí)行,還可獲取任務(wù)執(zhí)行的結(jié)果,這些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 強(qiáng)大。
Future接口
Future的作用
簡單來說就是利用線程達(dá)到異步的效果,同時還可以獲取子線程的返回值。
比如當(dāng)做一定運(yùn)算的時候,運(yùn)算過程可能比較耗時,有時會去查數(shù)據(jù)庫,或是繁重的計算,比如壓縮、加密等,在這種情況下,如果我們一直在原地等待方法返回,顯然是不明智的,整體程序的運(yùn)行效率會大大降低。
我們可以把運(yùn)算的過程放到子線程去執(zhí)行,再通過 Future 去控制子線程執(zhí)行的計算過程,最后獲取到計算結(jié)果。這樣一來就可以把整個程序的運(yùn)行效率提高,是一種異步的思想。
Future的方法
Future 接口一共有5個方法,源代碼如下:
public interface Future<V> { /** * 嘗試取消任務(wù),如果任務(wù)已經(jīng)完成、已取消或其他原因無法取消,則失敗。 * 1、如果任務(wù)還沒開始執(zhí)行,則該任務(wù)不應(yīng)該運(yùn)行 * 2、如果任務(wù)已經(jīng)開始執(zhí)行,由參數(shù)mayInterruptIfRunning來決定執(zhí)行該任務(wù)的線程是否應(yīng)該被中斷,這只是終止任務(wù)的一種嘗試。若mayInterruptIfRunning為true,則會立即中斷執(zhí)行任務(wù)的線程并返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務(wù)執(zhí)行線程。 * 3、調(diào)用這個方法后,以后對isDone方法調(diào)用都返回true。 * 4、如果這個方法返回true,以后對isCancelled返回true。 */ boolean cancel(boolean mayInterruptIfRunning); /** * 判斷任務(wù)是否被取消了,如果調(diào)用了cance()則返回true */ boolean isCancelled(); /** * 如果任務(wù)完成,則返回ture * 任務(wù)完成包含正常終止、異常、取消任務(wù)。在這些情況下都返回true */ boolean isDone(); /** * 線程阻塞,直到任務(wù)完成,返回結(jié)果 * 如果任務(wù)被取消,則引發(fā)CancellationException * 如果當(dāng)前線程被中斷,則引發(fā)InterruptedException * 當(dāng)任務(wù)在執(zhí)行的過程中出現(xiàn)異常,則拋出ExecutionException */ V get() throws InterruptedException, ExecutionException; /** * 線程阻塞一定時間等待任務(wù)完成,并返回任務(wù)執(zhí)行結(jié)果,如果則超時則拋出TimeoutException */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
get方法(獲取結(jié)果)
get 方法最主要的作用就是獲取任務(wù)執(zhí)行的結(jié)果,該方法在執(zhí)行時的行為取決于 Callable 任務(wù)的狀態(tài),可能會發(fā)生以下 7 種情況。
任務(wù)已經(jīng)執(zhí)行完,執(zhí)行 get 方法可以立刻返回,獲取到任務(wù)執(zhí)行的結(jié)果。
任務(wù)還沒有開始執(zhí)行,比如我們往線程池中放一個任務(wù),線程池中可能積壓了很多任務(wù),還沒輪到我去執(zhí)行的時候,就去 get 了,在這種情況下,相當(dāng)于任務(wù)還沒開始,我們?nèi)フ{(diào)用 get 的時候,會當(dāng)前的線程阻塞,直到任務(wù)完成再把結(jié)果返回回來。
任務(wù)正在執(zhí)行中,但是執(zhí)行過程比較長,所以我去 get 的時候,它依然在執(zhí)行的過程中。這種情況調(diào)用 get 方法也會阻塞當(dāng)前線程,直到任務(wù)執(zhí)行完返回結(jié)果。
任務(wù)執(zhí)行過程中拋出異常,我們再去調(diào)用 get 的時候,就會拋出 ExecutionException 異常,不管我們執(zhí)行 call 方法時里面拋出的異常類型是什么,在執(zhí)行 get 方法時所獲得的異常都是 ExecutionException。
任務(wù)被取消了,如果任務(wù)被取消,我們用 get 方法去獲取結(jié)果時則會拋出 CancellationException。
任務(wù)被中斷了,如果任務(wù)被當(dāng)前線程中斷,我們用 get 方法去獲取結(jié)果時則會拋出InterruptedException。
任務(wù)超時,我們知道 get 方法有一個重載方法,那就是帶延遲參數(shù)的,調(diào)用了這個帶延遲參數(shù)的 get 方法之后,如果 call 方法在規(guī)定時間內(nèi)正常順利完成了任務(wù),那么 get 會正常返回;但是如果到達(dá)了指定時間依然沒有完成任務(wù),get 方法則會拋出 TimeoutException,代表超時了。
參考示例:
package com.niuh.future; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(new FutureTask()); try { Integer res = future.get(2000, TimeUnit.MILLISECONDS); System.out.println("Future線程返回值:" + res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } static class FutureTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(new Random().nextInt(3000)); return new Random().nextInt(10); } } }
isDone方法(判斷是否執(zhí)行完畢)
isDone() 方法,該方法是用來判斷當(dāng)前這個任務(wù)是否執(zhí)行完畢了
package com.niuh.future; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureIsDoneDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(new FutureTask()); try { for (int i = 0; i < 3; i++) { Thread.sleep(1000); System.out.println("線程是否完成:" + future.isDone()); } Integer res = future.get(2000, TimeUnit.MILLISECONDS); System.out.println("Future 線程返回值:" + res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } static class FutureTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(2000); return new Random().nextInt(10); } } }
執(zhí)行結(jié)果:
線程是否完成:false 線程是否完成:false 線程是否完成:true Future 線程返回值:9
可以看到前兩次 isDone 方法的返回結(jié)果是 false,因?yàn)榫€程任務(wù)還沒有執(zhí)行完成,第三次 isDone 方法的返回結(jié)果是 ture。
注意:這個方法返回 true 則代表執(zhí)行完成了,返回 false 則代表還沒完成。但返回 true,并不代表這個任務(wù)是成功執(zhí)行的,比如說任務(wù)執(zhí)行到一半拋出了異常。那么在這種情況下,對于這個 isDone 方法而言,它其實(shí)也是會返回 true 的,因?yàn)閷λ鼇碚f,雖然有異常發(fā)生了,但是這個任務(wù)在未來也不會再被執(zhí)行,它確實(shí)已經(jīng)執(zhí)行完畢了。所以 isDone 方法在返回 true 的時候,不代表這個任務(wù)是成功執(zhí)行的,只代表它執(zhí)行完畢了。
我們將上面的示例稍作修改再來看下結(jié)果,修改 FutureTask 代碼如下:
static class FutureTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(2000); throw new Exception("故意拋出異常"); } }
執(zhí)行結(jié)果:
雖然拋出了異常,但是 isDone 方法的返回結(jié)果依然是 ture。
這段代碼說明了:
即便任務(wù)拋出異常,isDone 方法依然會返回 true。
雖然拋出的異常是 IllegalArgumentException,但是對于 get 而言,它拋出的異常依然是 ExecutionException。
雖然在任務(wù)執(zhí)行到2秒的時候就拋出了異常,但是真正要等到我們執(zhí)行 get 的時候,才看到了異常。
cancel方法(取消任務(wù)的執(zhí)行)
如果不想執(zhí)行某個任務(wù)了,則可以使用 cancel 方法,會有以下三種情況:
第一種情況最簡單,那就是當(dāng)任務(wù)還沒有開始執(zhí)行時,一旦調(diào)用 cancel,這個任務(wù)就會被正常取消,未來也不會被執(zhí)行,那么 cancel 方法返回 true。
第二種情況也比較簡單。如果任務(wù)已經(jīng)完成,或者之前已經(jīng)被取消過了,那么執(zhí)行 cancel 方法則代表取消失敗,返回 false。因?yàn)槿蝿?wù)無論是已完成還是已經(jīng)被取消過了,都不能再被取消了。
第三種情況比較特殊,就是這個任務(wù)正在執(zhí)行,這個時候執(zhí)行 cancel 方法是不會直接取消這個任務(wù)的,而是會根據(jù)我們傳入的參數(shù)做判斷。cancel 方法是必須傳入一個參數(shù),該參數(shù)叫作 mayInterruptIfRunning,它是什么含義呢?如果傳入的參數(shù)是 true,執(zhí)行任務(wù)的線程就會收到一個中斷的信號,正在執(zhí)行的任務(wù)可能會有一些處理中斷的邏輯,進(jìn)而停止,這個比較好理解。如果傳入的是 false 則就代表不中斷正在運(yùn)行的任務(wù),也就是說,本次 cancel 不會有任何效果,同時 cancel 方法會返回 false。
參考示例:
package com.niuh.future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureCancelDemo { static ExecutorService executorService = Executors.newSingleThreadExecutor(); public static void main(String[] args) { // 當(dāng)任務(wù)還沒有開始執(zhí)行 // demo1(); // 如果任務(wù)已經(jīng)執(zhí)行完 // demo2(); // 如果任務(wù)正在進(jìn)行中 demo3(); } private static void demo1() { for (int i = 0; i < 1000; i++) { executorService.submit(new FutureTask()); } Future<String> future = executorService.submit(new FutureTask()); try { boolean cancel = future.cancel(false); System.out.println("Future 任務(wù)是否被取消:" + cancel); String res = future.get(2000, TimeUnit.MILLISECONDS); System.out.println("Future 線程返回值:" + res); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { executorService.shutdown(); } } private static void demo2() { Future<String> future = executorService.submit(new FutureTask()); try { Thread.sleep(1000); boolean cancel = future.cancel(false); System.out.println("Future 任務(wù)是否被取消:" + cancel); } catch (InterruptedException e) { e.printStackTrace(); } finally { executorService.shutdown(); } } private static void demo3() { Future<String> future = executorService.submit(new FutureInterruptTask()); try { Thread.sleep(1000); boolean cancel = future.cancel(true); System.out.println("Future 任務(wù)是否被取消:" + cancel); } catch (InterruptedException e) { e.printStackTrace(); } finally { executorService.shutdown(); } } static class FutureTask implements Callable<String> { @Override public String call() throws Exception { return "正常返回"; } } static class FutureInterruptTask implements Callable<String> { @Override public String call() throws Exception { while (!Thread.currentThread().isInterrupted()) { System.out.println("循環(huán)執(zhí)行"); Thread.sleep(500); } System.out.println("線程被中斷"); return "正常返回"; } } }
這里,我們來分析下第三種情況(任務(wù)正在進(jìn)行中),當(dāng)我們設(shè)置 true 時,線程停止
循環(huán)執(zhí)行 循環(huán)執(zhí)行 Future 任務(wù)是否被取消:true
當(dāng)我們設(shè)置 false 時,任務(wù)雖然也被取消成功,但是線程依然執(zhí)行。
循環(huán)執(zhí)行 循環(huán)執(zhí)行 Future 任務(wù)是否被取消:true 循環(huán)執(zhí)行 循環(huán)執(zhí)行 循環(huán)執(zhí)行 循環(huán)執(zhí)行 ......
那么如何選擇傳入 true 還是 false 呢?
傳入 true 適用的情況是,明確知道這個任務(wù)能夠處理中斷。
傳入 false 適用于什么情況呢?如果我們明確知道這個線程不能處理中斷,那應(yīng)該傳入 false。我們不知道這個任務(wù)是否支持取消(是否能響應(yīng)中斷),因?yàn)樵诖蠖鄶?shù)情況下代碼是多人協(xié)作的,對于這個任務(wù)是否支持中斷,我們不一定有十足的把握,那么在這種情況下也應(yīng)該傳入 false。如果這個任務(wù)一旦開始運(yùn)行,我們就希望它完全的執(zhí)行完畢。在這種情況下,也應(yīng)該傳入 false。
需要注意的是,雖然示例中寫了 !Thread.currentThread().isInterrupted() 方法來判斷中斷,但是實(shí)際上并不是通過我們的代碼來進(jìn)行中斷,而是 Future#cancel(true) 內(nèi)部調(diào)用 t.interrupt 方法修改線程的狀態(tài)之后,Thread.sleep 會拋出 InterruptedException 異常,線程池中會執(zhí)行異常的相關(guān)邏輯,并退出當(dāng)前任務(wù)。 sleep 和 interrupt 會產(chǎn)生意想不到的效果。
比如我們將 FutureInterruptTask 代碼修改為 while(true) 形式,調(diào)用 cancel(true) 方法線程還是會被中斷。
static class FutureInterruptTask implements Callable<String> { @Override public String call() throws Exception { while (true) { System.out.println("循環(huán)執(zhí)行"); Thread.sleep(500); } } }
isCancelled方法(判斷是否被取消)
isCancelled 方法,判斷是否被取消,它和 cancel 方法配合使用,比較簡單,可以參考上面的示例。
Callable 和 Future 的關(guān)系
Callable 接口相比于 Runnable 的一大優(yōu)勢是可以有返回結(jié)果,返回結(jié)果就可以用 Future 類的 get 方法來獲取 。因此,F(xiàn)uture 相當(dāng)于一個存儲器,它存儲了 Callable 的 call 方法的任務(wù)結(jié)果。
除此之外,我們還可以通過 Future 的 isDone 方法來判斷任務(wù)是否已經(jīng)執(zhí)行完畢了,還可以通過 cancel 方法取消這個任務(wù),或限時獲取任務(wù)的結(jié)果等,總之 Future 的功能比較豐富。
FutureTask
Future只是一個接口,不能直接用來創(chuàng)建對象,其實(shí)現(xiàn)類是FutureTask,JDK1.8修改了FutureTask的實(shí)現(xiàn),JKD1.8不再依賴AQS來實(shí)現(xiàn),而是通過一個volatile變量state以及CAS操作來實(shí)現(xiàn)。FutureTask結(jié)構(gòu)如下所示:
我們來看一下 FutureTask 的代碼實(shí)現(xiàn):
public class FutureTask implements RunnableFuture {...}
可以看到,它實(shí)現(xiàn)了一個接口,這個接口叫作 RunnableFuture。
RunnableFuture接口
我們來看一下 RunnableFuture 接口的代碼實(shí)現(xiàn):
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
既然 RunnableFuture 繼承了 Runnable 接口和 Future 接口,而 FutureTask 又實(shí)現(xiàn)了 RunnableFuture 接口,所以 FutureTask 既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值。
FutureTask源碼分析
成員變量
/* * 當(dāng)前任務(wù)運(yùn)行狀態(tài) * NEW -> COMPLETING -> NORMAL(正常結(jié)束,返回結(jié)果) * NEW -> COMPLETING -> EXCEPTIONAL(返回異常結(jié)果) * NEW -> CANCELLED(任務(wù)被取消,無結(jié)果) * NEW -> INTERRUPTING -> INTERRUPTED(任務(wù)被打斷,無結(jié)果) */ private volatile int state; private static final int NEW = 0; // 新建 0 private static final int COMPLETING = 1; // 執(zhí)行中 1 private static final int NORMAL = 2; // 正常 2 private static final int EXCEPTIONAL = 3; // 異常 3 private static final int CANCELLED = 4; // 取消 4 private static final int INTERRUPTING = 5; // 中斷中 5 private static final int INTERRUPTED = 6; // 被中斷 6 /** 將要被執(zhí)行的任務(wù) */ private Callable<V> callable; /** 存放執(zhí)行結(jié)果,用于get()方法獲取結(jié)果,也可能用于get()方法拋出異常 */ private Object outcome; // non-volatile, protected by state reads/writes /** 執(zhí)行任務(wù)Callable的線程; */ private volatile Thread runner; /** 棧結(jié)構(gòu)的等待隊列,該節(jié)點(diǎn)是棧中最頂層的節(jié)點(diǎn) */ private volatile WaitNode waiters;
為了后面更好的分析FutureTask的實(shí)現(xiàn),這里有必要解釋下各個狀態(tài)。
NEW :表示是個新的任務(wù)或者還沒被執(zhí)行完的任務(wù)。這是初始狀態(tài)。
COMPLETING :任務(wù)已經(jīng)執(zhí)行完成或者執(zhí)行任務(wù)的時候發(fā)生異常,但是任務(wù)執(zhí)行結(jié)果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務(wù)執(zhí)行結(jié)果,如果發(fā)生異常,則用來保存異常原因)的時候,狀態(tài)會從NEW變更到COMPLETING。但是這個狀態(tài)會時間會比較短,屬于中間狀態(tài)。
NORMAL :任務(wù)已經(jīng)執(zhí)行完成并且任務(wù)執(zhí)行結(jié)果已經(jīng)保存到outcome字段,狀態(tài)會從COMPLETING轉(zhuǎn)換到NORMAL。這是一個最終態(tài)。
EXCEPTIONAL :任務(wù)執(zhí)行發(fā)生異常并且異常原因已經(jīng)保存到outcome字段中后,狀態(tài)會從COMPLETING轉(zhuǎn)換到EXCEPTIONAL。這是一個最終態(tài)。
CANCELLED :任務(wù)還沒開始執(zhí)行或者已經(jīng)開始執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(false)方法取消任務(wù)且不中斷任務(wù)執(zhí)行線程,這個時候狀態(tài)會從NEW轉(zhuǎn)化為CANCELLED狀態(tài)。這是一個最終態(tài)。
INTERRUPTING :任務(wù)還沒開始執(zhí)行或者已經(jīng)執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(true)方法取消任務(wù)并且要中斷任務(wù)執(zhí)行線程但是還沒有中斷任務(wù)執(zhí)行線程之前,狀態(tài)會從NEW轉(zhuǎn)化為INTERRUPTING。這是一個中間狀態(tài)。
INTERRUPTED :調(diào)用interrupt()中斷任務(wù)執(zhí)行線程之后狀態(tài)會從INTERRUPTING轉(zhuǎn)換到INTERRUPTED。這是一個最終態(tài)。
有一點(diǎn)需要注意的是,所有值大于COMPLETING的狀態(tài)都表示任務(wù)已經(jīng)執(zhí)行完成(任務(wù)正常執(zhí)行完成,任務(wù)執(zhí)行異?;蛘呷蝿?wù)被取消)。
構(gòu)造方法
// Callable 構(gòu)造方法 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } // Runnable 構(gòu)造方法 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
Runnable的構(gòu)造器,只有一個目的,就是通過Executors.callable把入?yún)⑥D(zhuǎn)化為RunnableAdapter,主要是因?yàn)镃allable的功能比Runnable豐富,Callable有返回值,而Runnable沒有。
/** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
這是一個典型的適配模型,我們要把 Runnable 適配成 Callable,首先要實(shí)現(xiàn) Callable 的接口,接著在 Callable 的 call 方法里面調(diào)用被適配對象(Runnable)的方法。
內(nèi)部類
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
run方法
/** * run方法可以直接被調(diào)用 * 也可以開啟新的線程調(diào)用 */ public void run() { // 狀態(tài)不是任務(wù)創(chuàng)建,或者當(dāng)前任務(wù)已經(jīng)有線程在執(zhí)行了,直接返回 if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { Callable<V> c = callable; // Callable 不為空,并且已經(jīng)初始化完成 if (c != null && state == NEW) { V result; boolean ran; try { //調(diào)用執(zhí)行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false;//執(zhí)行失敗 //通過CAS算法設(shè)置返回值(COMPLETING)和狀態(tài)值(EXCEPTIONAL) setException(ex); } //執(zhí)行成功通過CAS(UNSAFE)設(shè)置返回值(COMPLETING)和狀態(tài)值(NORMAL) if (ran) //將result賦值給outcome set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //將任務(wù)runner設(shè)置為null,避免發(fā)生并發(fā)調(diào)用run()方法 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts //須重新讀取任務(wù)狀態(tài),避免不可達(dá)(泄漏)的中斷 int s = state; //確保cancle(ture)操作時,運(yùn)行中的任務(wù)能接收到中斷指令 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
run方法是沒有返回值的,通過給outcome屬性賦值(set(result)),get時就能從outcome屬性中拿到返回值。
FutureTask 兩種構(gòu)造器,最終都轉(zhuǎn)化成了 Callable,所以在 run 方法執(zhí)行的時候,只需要執(zhí)行 Callable 的 call 方法即可,在執(zhí)行 c.call()代碼時,如果入?yún)⑹?Runnable 的話, 調(diào)用路徑為 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入?yún)⑹?Callable 的話,直接調(diào)用。
setException(Throwable t)方法
//發(fā)生異常時,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(EXCEPTIONAL) protected void setException(Throwable t) { //調(diào)用UNSAFE類封裝的CAS算法,設(shè)置值 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //喚醒因等待返回值而阻塞的線程 finishCompletion(); } }
由于篇幅有限,更多源碼解析請查看文章擴(kuò)展鏈接
Future的使用
FutureTask可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景。通過傳入Runnable或者Callable的任務(wù)給FutureTask,直接調(diào)用其run方法或者放入線程池執(zhí)行,之后可以在外部通過FutureTask的get方法異步獲取執(zhí)行結(jié)果,因此,F(xiàn)utureTask非常適合用于耗時的計算,主線程可以在完成自己的任務(wù)后,再去獲取結(jié)果。另外,F(xiàn)utureTask還可以確保即使調(diào)用了多次run方法,它都只會執(zhí)行一次Runnable或者Callable任務(wù),或者通過cancel取消FutureTask的執(zhí)行等。
FutureTask執(zhí)行多任務(wù)計算的使用場景
利用FutureTask和ExecutorService,可以用多線程的方式提交計算任務(wù),主線程繼續(xù)執(zhí)行其他任務(wù),當(dāng)主線程需要子線程的計算結(jié)果時,在異步獲取子線程的執(zhí)行結(jié)果。
//任務(wù)正常完成,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(=NORMAL) protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
執(zhí)行結(jié)果:
生成子線程計算任務(wù): 0 生成子線程計算任務(wù): 1 生成子線程計算任務(wù): 2 生成子線程計算任務(wù): 3 生成子線程計算任務(wù): 4 生成子線程計算任務(wù): 5 生成子線程計算任務(wù): 6 生成子線程計算任務(wù): 7 生成子線程計算任務(wù): 8 生成子線程計算任務(wù): 9 所有計算任務(wù)提交完畢, 主線程接著干其他事情! 子線程計算任務(wù): 0 執(zhí)行完成! 子線程計算任務(wù): 1 執(zhí)行完成! 子線程計算任務(wù): 3 執(zhí)行完成! 子線程計算任務(wù): 4 執(zhí)行完成! 子線程計算任務(wù): 2 執(zhí)行完成! 子線程計算任務(wù): 5 執(zhí)行完成! 子線程計算任務(wù): 7 執(zhí)行完成! 子線程計算任務(wù): 9 執(zhí)行完成! 子線程計算任務(wù): 8 執(zhí)行完成! 子線程計算任務(wù): 6 執(zhí)行完成! 多任務(wù)計算后的總結(jié)果是:990
FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次
在很多高并發(fā)的環(huán)境下,往往我們只需要某些任務(wù)只執(zhí)行一次。這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設(shè)有一個帶key的連接池,當(dāng)key存在時,即直接返回key對應(yīng)的對象;當(dāng)key不存在時,則創(chuàng)建連接。對于這樣的應(yīng)用場景,通常采用的方法為使用一個Map對象來存儲key和連接池對應(yīng)的對應(yīng)關(guān)系,典型的代碼如下面所示:
//移除所有等待線程并發(fā)出信號,調(diào)用done(),以及將任務(wù)callable清空 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //循環(huán)喚醒阻塞線程,直到阻塞隊列為空 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; //一直到阻塞隊列為空,跳出循環(huán) if (next == null) break; q.next = null; // unlink to help gc 方便gc在適當(dāng)?shù)臅r候回收 q = next; } break; } } done(); callable = null; // to reduce footprint }
在上面的例子中,我們通過加鎖確保高并發(fā)環(huán)境下的線程安全,也確保了connection只創(chuàng)建一次,然而卻犧牲了性能。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,性能大大提高。
package com.niuh.future; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.ConcurrentHashMap; /** * @description: 改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,性能大大提高。 * <p> * 但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象。 * 為什么呢?因?yàn)閯?chuàng)建Connection是一個耗時操作,假設(shè)多個線程涌入getConnection方法,都發(fā)現(xiàn)key對應(yīng)的鍵不存在, * 于是所有涌入的線程都開始執(zhí)行conn=createConnection(),只不過最終只有一個線程能將connection插入到map里。 * 但是這樣以來,其它線程創(chuàng)建的的connection就沒啥價值,浪費(fèi)系統(tǒng)開銷。 */ public class FutureTaskConnection2 { private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>(); public static Connection getConnection(String key) { Connection connection = connectionPool.get(key); if (connection == null) { connection = createConnection(); //根據(jù)putIfAbsent的返回值判斷是否有線程搶先插入了 Connection returnConnection = connectionPool.putIfAbsent(key, connection); if (returnConnection != null) { connection = returnConnection; } } else { return connection; } return connection; } private static Connection createConnection() { try { return DriverManager.getConnection(""); } catch (SQLException e) { e.printStackTrace(); } return null; } }
但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象。 為什么呢?
因?yàn)閯?chuàng)建Connection是一個耗時操作,假設(shè)多個線程涌入getConnection方法,都發(fā)現(xiàn)key對應(yīng)的鍵不存在,于是所有涌入的線程都開始執(zhí)行conn=createConnection(),只不過最終只有一個線程能將connection插入到map里。但是這樣以來,其它線程創(chuàng)建的的connection就沒啥價值,浪費(fèi)系統(tǒng)開銷。
這時最需要解決的問題就是當(dāng)key不存在時,創(chuàng)建Connection的動作(conn=createConnection();)能放在connectionPool.putIfAbsent()之后執(zhí)行,這正是FutureTask發(fā)揮作用的時機(jī),基于ConcurrentHashMap和FutureTask的改造代碼如下:
package com.niuh.future; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * @description: FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次 * 這時最需要解決的問題就是當(dāng)key不存在時,創(chuàng)建Connection的動作(conn=createConnection();) * 能放在connectionPool.putIfAbsent()之后執(zhí)行,這正是FutureTask發(fā)揮作用的時機(jī), * 基于ConcurrentHashMap和FutureTask的改造代碼如下: */ public class FutureTaskConnection3 { private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>(); public static Connection getConnection(String key) { FutureTask<Connection> connectionFutureTask = connectionPool.get(key); try { if (connectionFutureTask != null) { return connectionFutureTask.get(); } else { Callable<Connection> callable = new Callable<Connection>() { @Override public Connection call() throws Exception { return createConnection(); } }; FutureTask<Connection> newTask = new FutureTask<>(callable); FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask); if (returnFt == null) { connectionFutureTask = newTask; newTask.run(); } return connectionFutureTask.get(); } } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } private static Connection createConnection() { try { return DriverManager.getConnection(""); } catch (SQLException e) { e.printStackTrace(); } return null; } }
FutureTask任務(wù)執(zhí)行完回調(diào)
FutureTask有一個方法 void done()會在每個線程執(zhí)行完成return結(jié)果時回調(diào)。 假設(shè)現(xiàn)在需要實(shí)現(xiàn)每個線程完成任務(wù)執(zhí)行后主動執(zhí)行后續(xù)任務(wù)。
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. //自旋等待cancle(true)結(jié)束(中斷結(jié)束) if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
執(zhí)行結(jié)果:
11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - 老板給我來一個月餅 11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 編號[804]月餅已打包好 11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 編號[88]月餅已打包好 11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 編號[166]月餅已打包好
“并發(fā)編程之如何理解Future&FutureTask”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。