溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

并發(fā)編程之如何理解Future&FutureTask

發(fā)布時間:2021-10-23 09:37:08 來源:億速云 閱讀:146 作者:iii 欄目:web開發(fā)

本篇內(nèi)容介紹了“并發(fā)編程之如何理解Future&FutureTask”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

 前言

Java線程實(shí)現(xiàn)方式主要有四種:

  • 繼承Thread類

  • 實(shí)現(xiàn)Runnable接口

  • 實(shí)現(xiàn)Callable接口通過FutureTask包裝器來創(chuàng)建Thread線程

  • 使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程。

其中前兩種方式線程執(zhí)行完后都沒有返回值,后兩種是帶返回值的。

Callable 和 Runnable 接口

Runnable接口

// 實(shí)現(xiàn)Runnable接口的類將被Thread執(zhí)行,表示一個基本的任務(wù) public interface Runnable {     // run方法就是它所有的內(nèi)容,就是實(shí)際執(zhí)行的任務(wù)     public abstract void run(); }

沒有返回值

run 方法沒有返回值,雖然有一些別的方法也能實(shí)現(xiàn)返回值得效果,比如編寫日志文件或者修改共享變量等等,但是不僅容易出錯,效率也不高。

不能拋出異常

public class RunThrowExceptionDemo {      /**      * 普通方法可以在方法簽名中拋出異常      *      * @throws IOException      */     public void normalMethod() throws IOException {         throw new IOException();     }      class RunnableImpl implements Runnable {          /**          * run 方法內(nèi)無法拋出 checked Exception,除非使用 try catch 進(jìn)行處理          */         @Override         public void run() {             try {                 throw new IOException();             } catch (IOException e) {                 e.printStackTrace();             }         }     }  }

可以看到普通方法 normalMethod 可以在方法簽名上拋出異常,這樣上層接口就可以捕獲這個異常進(jìn)行處理,但是實(shí)現(xiàn) Runnable 接口的類,run  方法無法拋出 checked Exception,只能在方法內(nèi)使用 try catch 進(jìn)行處理,這樣上層就無法得知線程中的異常。

設(shè)計導(dǎo)致

其實(shí)這兩個缺陷主要原因就在于 Runnable 接口設(shè)計的 run 方法,這個方法已經(jīng)規(guī)定了 run() 方法的返回類型是  void,而且這個方法沒有聲明拋出任何異常。所以,當(dāng)實(shí)現(xiàn)并重寫這個方法時,我們既不能改返回值類型,也不能更改對于異常拋出的描述,因?yàn)樵趯?shí)現(xiàn)方法的時候,語法規(guī)定是不允許對這些內(nèi)容進(jìn)行修改的。

Runnable 為什么設(shè)計成這樣?

假設(shè) run() 方法可以返回返回值,或者可以拋出異常,也無濟(jì)于事,因?yàn)槲覀儾]有辦法在外層捕獲并處理,這是因?yàn)檎{(diào)用 run() 方法的類(比如  Thread 類和線程池)是 Java 直接提供的,而不是我們編寫的。 所以就算它能有一個返回值,我們也很難把這個返回值利用到,而 Callable  接口就是為了解決這兩個問題。

Callable接口

public interface Callable<V> {     //返回接口,或者拋出異常     V call() throws Exception; }

可以看到 Callable 和 Runnable 接口其實(shí)比較相似,都只有一個方法,也就是線程任務(wù)執(zhí)行的方法,區(qū)別就是 call 方法有返回值,而且聲明了  throws Exception。

Callable 和 Runnable 的不同之處

  • 方法名 :Callable 規(guī)定的執(zhí)行方法是 call(),而 Runnable 規(guī)定的執(zhí)行方法是 run();

  • 返回值 :Callable 的任務(wù)執(zhí)行后有返回值,而 Runnable 的任務(wù)執(zhí)行后是沒有返回值的;

  • 拋出異常 :call() 方法可拋出異常,而 run() 方法是不能拋出受檢查異常的;

與 Callable 配合的有一個 Future 接口,通過 Future 可以了解任務(wù)執(zhí)行情況,或者取消任務(wù)的執(zhí)行,還可獲取任務(wù)執(zhí)行的結(jié)果,這些功能都是  Runnable 做不到的,Callable 的功能要比 Runnable 強(qiáng)大。

Future接口

Future的作用

簡單來說就是利用線程達(dá)到異步的效果,同時還可以獲取子線程的返回值。

比如當(dāng)做一定運(yùn)算的時候,運(yùn)算過程可能比較耗時,有時會去查數(shù)據(jù)庫,或是繁重的計算,比如壓縮、加密等,在這種情況下,如果我們一直在原地等待方法返回,顯然是不明智的,整體程序的運(yùn)行效率會大大降低。

我們可以把運(yùn)算的過程放到子線程去執(zhí)行,再通過 Future  去控制子線程執(zhí)行的計算過程,最后獲取到計算結(jié)果。這樣一來就可以把整個程序的運(yùn)行效率提高,是一種異步的思想。

Future的方法

Future 接口一共有5個方法,源代碼如下:

并發(fā)編程之如何理解Future&FutureTask

public interface Future<V> {    /**    * 嘗試取消任務(wù),如果任務(wù)已經(jīng)完成、已取消或其他原因無法取消,則失敗。    * 1、如果任務(wù)還沒開始執(zhí)行,則該任務(wù)不應(yīng)該運(yùn)行    * 2、如果任務(wù)已經(jīng)開始執(zhí)行,由參數(shù)mayInterruptIfRunning來決定執(zhí)行該任務(wù)的線程是否應(yīng)該被中斷,這只是終止任務(wù)的一種嘗試。若mayInterruptIfRunning為true,則會立即中斷執(zhí)行任務(wù)的線程并返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務(wù)執(zhí)行線程。    * 3、調(diào)用這個方法后,以后對isDone方法調(diào)用都返回true。    * 4、如果這個方法返回true,以后對isCancelled返回true。    */     boolean cancel(boolean mayInterruptIfRunning);     /**     * 判斷任務(wù)是否被取消了,如果調(diào)用了cance()則返回true     */     boolean isCancelled();     /**     * 如果任務(wù)完成,則返回ture     * 任務(wù)完成包含正常終止、異常、取消任務(wù)。在這些情況下都返回true     */     boolean isDone();     /**     * 線程阻塞,直到任務(wù)完成,返回結(jié)果     * 如果任務(wù)被取消,則引發(fā)CancellationException     * 如果當(dāng)前線程被中斷,則引發(fā)InterruptedException     * 當(dāng)任務(wù)在執(zhí)行的過程中出現(xiàn)異常,則拋出ExecutionException     */     V get() throws InterruptedException, ExecutionException;     /**     * 線程阻塞一定時間等待任務(wù)完成,并返回任務(wù)執(zhí)行結(jié)果,如果則超時則拋出TimeoutException     */     V get(long timeout, TimeUnit unit)         throws InterruptedException, ExecutionException, TimeoutException; }

get方法(獲取結(jié)果)

get 方法最主要的作用就是獲取任務(wù)執(zhí)行的結(jié)果,該方法在執(zhí)行時的行為取決于 Callable 任務(wù)的狀態(tài),可能會發(fā)生以下 7 種情況。

  • 任務(wù)已經(jīng)執(zhí)行完,執(zhí)行 get 方法可以立刻返回,獲取到任務(wù)執(zhí)行的結(jié)果。

  • 任務(wù)還沒有開始執(zhí)行,比如我們往線程池中放一個任務(wù),線程池中可能積壓了很多任務(wù),還沒輪到我去執(zhí)行的時候,就去 get  了,在這種情況下,相當(dāng)于任務(wù)還沒開始,我們?nèi)フ{(diào)用 get 的時候,會當(dāng)前的線程阻塞,直到任務(wù)完成再把結(jié)果返回回來。

  • 任務(wù)正在執(zhí)行中,但是執(zhí)行過程比較長,所以我去 get 的時候,它依然在執(zhí)行的過程中。這種情況調(diào)用 get  方法也會阻塞當(dāng)前線程,直到任務(wù)執(zhí)行完返回結(jié)果。

  • 任務(wù)執(zhí)行過程中拋出異常,我們再去調(diào)用 get 的時候,就會拋出 ExecutionException 異常,不管我們執(zhí)行 call  方法時里面拋出的異常類型是什么,在執(zhí)行 get 方法時所獲得的異常都是 ExecutionException。

  • 任務(wù)被取消了,如果任務(wù)被取消,我們用 get 方法去獲取結(jié)果時則會拋出 CancellationException。

  • 任務(wù)被中斷了,如果任務(wù)被當(dāng)前線程中斷,我們用 get 方法去獲取結(jié)果時則會拋出InterruptedException。

  • 任務(wù)超時,我們知道 get 方法有一個重載方法,那就是帶延遲參數(shù)的,調(diào)用了這個帶延遲參數(shù)的 get 方法之后,如果 call  方法在規(guī)定時間內(nèi)正常順利完成了任務(wù),那么 get 會正常返回;但是如果到達(dá)了指定時間依然沒有完成任務(wù),get 方法則會拋出  TimeoutException,代表超時了。

參考示例:

package com.niuh.future;  import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;  public class FutureDemo {     public static void main(String[] args) {         ExecutorService executorService = Executors.newSingleThreadExecutor();         Future<Integer> future = executorService.submit(new FutureTask());         try {             Integer res = future.get(2000, TimeUnit.MILLISECONDS);             System.out.println("Future線程返回值:" + res);         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         }     }      static class FutureTask implements Callable<Integer> {          @Override         public Integer call() throws Exception {             Thread.sleep(new Random().nextInt(3000));             return new Random().nextInt(10);         }     } }

isDone方法(判斷是否執(zhí)行完畢)

isDone() 方法,該方法是用來判斷當(dāng)前這個任務(wù)是否執(zhí)行完畢了

package com.niuh.future;  import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;  public class FutureIsDoneDemo {     public static void main(String[] args) {         ExecutorService executorService = Executors.newSingleThreadExecutor();         Future<Integer> future = executorService.submit(new FutureTask());         try {             for (int i = 0; i < 3; i++) {                 Thread.sleep(1000);                 System.out.println("線程是否完成:" + future.isDone());             }             Integer res = future.get(2000, TimeUnit.MILLISECONDS);             System.out.println("Future 線程返回值:" + res);         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         }     }      static class FutureTask implements Callable<Integer> {          @Override         public Integer call() throws Exception {             Thread.sleep(2000);             return new Random().nextInt(10);         }     } }

執(zhí)行結(jié)果:

線程是否完成:false 線程是否完成:false 線程是否完成:true Future 線程返回值:9

可以看到前兩次 isDone 方法的返回結(jié)果是 false,因?yàn)榫€程任務(wù)還沒有執(zhí)行完成,第三次 isDone 方法的返回結(jié)果是 ture。

注意:這個方法返回 true 則代表執(zhí)行完成了,返回 false 則代表還沒完成。但返回  true,并不代表這個任務(wù)是成功執(zhí)行的,比如說任務(wù)執(zhí)行到一半拋出了異常。那么在這種情況下,對于這個 isDone 方法而言,它其實(shí)也是會返回 true  的,因?yàn)閷λ鼇碚f,雖然有異常發(fā)生了,但是這個任務(wù)在未來也不會再被執(zhí)行,它確實(shí)已經(jīng)執(zhí)行完畢了。所以 isDone 方法在返回 true  的時候,不代表這個任務(wù)是成功執(zhí)行的,只代表它執(zhí)行完畢了。

我們將上面的示例稍作修改再來看下結(jié)果,修改 FutureTask 代碼如下:

static class FutureTask implements Callable<Integer> {  @Override  public Integer call() throws Exception {   Thread.sleep(2000);   throw new Exception("故意拋出異常");     } }

執(zhí)行結(jié)果:

并發(fā)編程之如何理解Future&FutureTask

雖然拋出了異常,但是 isDone 方法的返回結(jié)果依然是 ture。

這段代碼說明了:

  • 即便任務(wù)拋出異常,isDone 方法依然會返回 true。

  • 雖然拋出的異常是 IllegalArgumentException,但是對于 get 而言,它拋出的異常依然是  ExecutionException。

  • 雖然在任務(wù)執(zhí)行到2秒的時候就拋出了異常,但是真正要等到我們執(zhí)行 get 的時候,才看到了異常。

cancel方法(取消任務(wù)的執(zhí)行)

如果不想執(zhí)行某個任務(wù)了,則可以使用 cancel 方法,會有以下三種情況:

  • 第一種情況最簡單,那就是當(dāng)任務(wù)還沒有開始執(zhí)行時,一旦調(diào)用 cancel,這個任務(wù)就會被正常取消,未來也不會被執(zhí)行,那么 cancel 方法返回  true。

  • 第二種情況也比較簡單。如果任務(wù)已經(jīng)完成,或者之前已經(jīng)被取消過了,那么執(zhí)行 cancel 方法則代表取消失敗,返回  false。因?yàn)槿蝿?wù)無論是已完成還是已經(jīng)被取消過了,都不能再被取消了。

  • 第三種情況比較特殊,就是這個任務(wù)正在執(zhí)行,這個時候執(zhí)行 cancel 方法是不會直接取消這個任務(wù)的,而是會根據(jù)我們傳入的參數(shù)做判斷。cancel  方法是必須傳入一個參數(shù),該參數(shù)叫作 mayInterruptIfRunning,它是什么含義呢?如果傳入的參數(shù)是  true,執(zhí)行任務(wù)的線程就會收到一個中斷的信號,正在執(zhí)行的任務(wù)可能會有一些處理中斷的邏輯,進(jìn)而停止,這個比較好理解。如果傳入的是 false  則就代表不中斷正在運(yùn)行的任務(wù),也就是說,本次 cancel 不會有任何效果,同時 cancel 方法會返回 false。

參考示例:

package com.niuh.future;  import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;  public class FutureCancelDemo {      static ExecutorService executorService = Executors.newSingleThreadExecutor();      public static void main(String[] args) {         // 當(dāng)任務(wù)還沒有開始執(zhí)行         // demo1();          // 如果任務(wù)已經(jīng)執(zhí)行完         // demo2();          // 如果任務(wù)正在進(jìn)行中         demo3();     }      private static void demo1() {         for (int i = 0; i < 1000; i++) {             executorService.submit(new FutureTask());         }          Future<String> future = executorService.submit(new FutureTask());         try {             boolean cancel = future.cancel(false);             System.out.println("Future 任務(wù)是否被取消:" + cancel);             String res = future.get(2000, TimeUnit.MILLISECONDS);             System.out.println("Future 線程返回值:" + res);         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         } finally {             executorService.shutdown();         }     }       private static void demo2() {         Future<String> future = executorService.submit(new FutureTask());         try {             Thread.sleep(1000);             boolean cancel = future.cancel(false);             System.out.println("Future 任務(wù)是否被取消:" + cancel);         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             executorService.shutdown();         }     }      private static void demo3() {         Future<String> future = executorService.submit(new FutureInterruptTask());         try {             Thread.sleep(1000);             boolean cancel = future.cancel(true);             System.out.println("Future 任務(wù)是否被取消:" + cancel);         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             executorService.shutdown();         }     }       static class FutureTask implements Callable<String> {          @Override         public String call() throws Exception {             return "正常返回";         }     }      static class FutureInterruptTask implements Callable<String> {          @Override         public String call() throws Exception {             while (!Thread.currentThread().isInterrupted()) {                 System.out.println("循環(huán)執(zhí)行");                 Thread.sleep(500);             }             System.out.println("線程被中斷");             return "正常返回";         }     } }

這里,我們來分析下第三種情況(任務(wù)正在進(jìn)行中),當(dāng)我們設(shè)置 true 時,線程停止

循環(huán)執(zhí)行 循環(huán)執(zhí)行 Future 任務(wù)是否被取消:true

當(dāng)我們設(shè)置 false 時,任務(wù)雖然也被取消成功,但是線程依然執(zhí)行。

循環(huán)執(zhí)行 循環(huán)執(zhí)行 Future 任務(wù)是否被取消:true 循環(huán)執(zhí)行 循環(huán)執(zhí)行 循環(huán)執(zhí)行 循環(huán)執(zhí)行 ......

那么如何選擇傳入 true 還是 false 呢?

  • 傳入 true 適用的情況是,明確知道這個任務(wù)能夠處理中斷。

  • 傳入 false 適用于什么情況呢?如果我們明確知道這個線程不能處理中斷,那應(yīng)該傳入  false。我們不知道這個任務(wù)是否支持取消(是否能響應(yīng)中斷),因?yàn)樵诖蠖鄶?shù)情況下代碼是多人協(xié)作的,對于這個任務(wù)是否支持中斷,我們不一定有十足的把握,那么在這種情況下也應(yīng)該傳入  false。如果這個任務(wù)一旦開始運(yùn)行,我們就希望它完全的執(zhí)行完畢。在這種情況下,也應(yīng)該傳入 false。

需要注意的是,雖然示例中寫了 !Thread.currentThread().isInterrupted()  方法來判斷中斷,但是實(shí)際上并不是通過我們的代碼來進(jìn)行中斷,而是 Future#cancel(true) 內(nèi)部調(diào)用 t.interrupt  方法修改線程的狀態(tài)之后,Thread.sleep 會拋出 InterruptedException 異常,線程池中會執(zhí)行異常的相關(guān)邏輯,并退出當(dāng)前任務(wù)。  sleep 和 interrupt 會產(chǎn)生意想不到的效果。

比如我們將 FutureInterruptTask 代碼修改為 while(true) 形式,調(diào)用 cancel(true)  方法線程還是會被中斷。

static class FutureInterruptTask implements Callable<String> {  @Override  public String call() throws Exception {   while (true) {             System.out.println("循環(huán)執(zhí)行");             Thread.sleep(500);   }  } }

isCancelled方法(判斷是否被取消)

isCancelled 方法,判斷是否被取消,它和 cancel 方法配合使用,比較簡單,可以參考上面的示例。

Callable 和 Future 的關(guān)系

Callable 接口相比于 Runnable 的一大優(yōu)勢是可以有返回結(jié)果,返回結(jié)果就可以用 Future 類的 get 方法來獲取 。因此,F(xiàn)uture  相當(dāng)于一個存儲器,它存儲了 Callable 的 call 方法的任務(wù)結(jié)果。

除此之外,我們還可以通過 Future 的 isDone 方法來判斷任務(wù)是否已經(jīng)執(zhí)行完畢了,還可以通過 cancel  方法取消這個任務(wù),或限時獲取任務(wù)的結(jié)果等,總之 Future 的功能比較豐富。

FutureTask

Future只是一個接口,不能直接用來創(chuàng)建對象,其實(shí)現(xiàn)類是FutureTask,JDK1.8修改了FutureTask的實(shí)現(xiàn),JKD1.8不再依賴AQS來實(shí)現(xiàn),而是通過一個volatile變量state以及CAS操作來實(shí)現(xiàn)。FutureTask結(jié)構(gòu)如下所示:

并發(fā)編程之如何理解Future&FutureTask

我們來看一下 FutureTask 的代碼實(shí)現(xiàn):

public class FutureTask implements RunnableFuture {...}

可以看到,它實(shí)現(xiàn)了一個接口,這個接口叫作 RunnableFuture。

RunnableFuture接口

我們來看一下 RunnableFuture 接口的代碼實(shí)現(xiàn):

public interface RunnableFuture<V> extends Runnable, Future<V> {     /**      * Sets this Future to the result of its computation      * unless it has been cancelled.      */     void run(); }

既然 RunnableFuture 繼承了 Runnable 接口和 Future 接口,而 FutureTask 又實(shí)現(xiàn)了 RunnableFuture  接口,所以 FutureTask 既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值。

FutureTask源碼分析

成員變量

/*  * 當(dāng)前任務(wù)運(yùn)行狀態(tài)  * NEW -> COMPLETING -> NORMAL(正常結(jié)束,返回結(jié)果)  * NEW -> COMPLETING -> EXCEPTIONAL(返回異常結(jié)果)  * NEW -> CANCELLED(任務(wù)被取消,無結(jié)果)  * NEW -> INTERRUPTING -> INTERRUPTED(任務(wù)被打斷,無結(jié)果)  */ private volatile int state; private static final int NEW          = 0; // 新建 0 private static final int COMPLETING   = 1; // 執(zhí)行中 1 private static final int NORMAL       = 2; // 正常 2 private static final int EXCEPTIONAL  = 3; // 異常 3 private static final int CANCELLED    = 4; // 取消 4 private static final int INTERRUPTING = 5; // 中斷中 5 private static final int INTERRUPTED  = 6; // 被中斷 6  /** 將要被執(zhí)行的任務(wù) */ private Callable<V> callable; /** 存放執(zhí)行結(jié)果,用于get()方法獲取結(jié)果,也可能用于get()方法拋出異常 */ private Object outcome; // non-volatile, protected by state reads/writes /** 執(zhí)行任務(wù)Callable的線程; */ private volatile Thread runner; /** 棧結(jié)構(gòu)的等待隊列,該節(jié)點(diǎn)是棧中最頂層的節(jié)點(diǎn) */ private volatile WaitNode waiters;

為了后面更好的分析FutureTask的實(shí)現(xiàn),這里有必要解釋下各個狀態(tài)。

  • NEW :表示是個新的任務(wù)或者還沒被執(zhí)行完的任務(wù)。這是初始狀態(tài)。

  • COMPLETING :任務(wù)已經(jīng)執(zhí)行完成或者執(zhí)行任務(wù)的時候發(fā)生異常,但是任務(wù)執(zhí)行結(jié)果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務(wù)執(zhí)行結(jié)果,如果發(fā)生異常,則用來保存異常原因)的時候,狀態(tài)會從NEW變更到COMPLETING。但是這個狀態(tài)會時間會比較短,屬于中間狀態(tài)。

  • NORMAL :任務(wù)已經(jīng)執(zhí)行完成并且任務(wù)執(zhí)行結(jié)果已經(jīng)保存到outcome字段,狀態(tài)會從COMPLETING轉(zhuǎn)換到NORMAL。這是一個最終態(tài)。

  • EXCEPTIONAL  :任務(wù)執(zhí)行發(fā)生異常并且異常原因已經(jīng)保存到outcome字段中后,狀態(tài)會從COMPLETING轉(zhuǎn)換到EXCEPTIONAL。這是一個最終態(tài)。

  • CANCELLED  :任務(wù)還沒開始執(zhí)行或者已經(jīng)開始執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(false)方法取消任務(wù)且不中斷任務(wù)執(zhí)行線程,這個時候狀態(tài)會從NEW轉(zhuǎn)化為CANCELLED狀態(tài)。這是一個最終態(tài)。

  • INTERRUPTING :任務(wù)還沒開始執(zhí)行或者已經(jīng)執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(true)方法取消任務(wù)并且要中斷任務(wù)執(zhí)行線程但是還沒有中斷任務(wù)執(zhí)行線程之前,狀態(tài)會從NEW轉(zhuǎn)化為INTERRUPTING。這是一個中間狀態(tài)。

  • INTERRUPTED  :調(diào)用interrupt()中斷任務(wù)執(zhí)行線程之后狀態(tài)會從INTERRUPTING轉(zhuǎn)換到INTERRUPTED。這是一個最終態(tài)。

有一點(diǎn)需要注意的是,所有值大于COMPLETING的狀態(tài)都表示任務(wù)已經(jīng)執(zhí)行完成(任務(wù)正常執(zhí)行完成,任務(wù)執(zhí)行異?;蛘呷蝿?wù)被取消)。

構(gòu)造方法

// Callable 構(gòu)造方法 public FutureTask(Callable<V> callable) {     if (callable == null)         throw new NullPointerException();     this.callable = callable;     this.state = NEW;       // ensure visibility of callable }  // Runnable 構(gòu)造方法 public FutureTask(Runnable runnable, V result) {     this.callable = Executors.callable(runnable, result);     this.state = NEW;       // ensure visibility of callable }

Runnable的構(gòu)造器,只有一個目的,就是通過Executors.callable把入?yún)⑥D(zhuǎn)化為RunnableAdapter,主要是因?yàn)镃allable的功能比Runnable豐富,Callable有返回值,而Runnable沒有。

/** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> {     final Runnable task;     final T result;     RunnableAdapter(Runnable task, T result) {         this.task = task;         this.result = result;     }     public T call() {         task.run();         return result;     } }

這是一個典型的適配模型,我們要把 Runnable 適配成 Callable,首先要實(shí)現(xiàn) Callable 的接口,接著在 Callable 的 call  方法里面調(diào)用被適配對象(Runnable)的方法。

內(nèi)部類

static final class WaitNode {  volatile Thread thread;  volatile WaitNode next;  WaitNode() { thread = Thread.currentThread(); } }

run方法

/**  * run方法可以直接被調(diào)用  * 也可以開啟新的線程調(diào)用  */ public void run() {  // 狀態(tài)不是任務(wù)創(chuàng)建,或者當(dāng)前任務(wù)已經(jīng)有線程在執(zhí)行了,直接返回     if (state != NEW ||         !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))         return;     try {         Callable<V> c = callable;         // Callable 不為空,并且已經(jīng)初始化完成         if (c != null && state == NEW) {             V result;             boolean ran;             try {              //調(diào)用執(zhí)行                 result = c.call();                 ran = true;             } catch (Throwable ex) {                 result = null;                 ran = false;//執(zhí)行失敗                 //通過CAS算法設(shè)置返回值(COMPLETING)和狀態(tài)值(EXCEPTIONAL)                 setException(ex);             }             //執(zhí)行成功通過CAS(UNSAFE)設(shè)置返回值(COMPLETING)和狀態(tài)值(NORMAL)             if (ran)              //將result賦值給outcome                 set(result);         }     } finally {         // runner must be non-null until state is settled to         // prevent concurrent calls to run()         //將任務(wù)runner設(shè)置為null,避免發(fā)生并發(fā)調(diào)用run()方法         runner = null;         // state must be re-read after nulling runner to prevent         // leaked interrupts         //須重新讀取任務(wù)狀態(tài),避免不可達(dá)(泄漏)的中斷         int s = state;         //確保cancle(ture)操作時,運(yùn)行中的任務(wù)能接收到中斷指令         if (s >= INTERRUPTING)             handlePossibleCancellationInterrupt(s);     } }
  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. run方法是沒有返回值的,通過給outcome屬性賦值(set(result)),get時就能從outcome屬性中拿到返回值。

  3. FutureTask 兩種構(gòu)造器,最終都轉(zhuǎn)化成了 Callable,所以在 run 方法執(zhí)行的時候,只需要執(zhí)行 Callable 的 call  方法即可,在執(zhí)行 c.call()代碼時,如果入?yún)⑹?Runnable 的話, 調(diào)用路徑為 c.call() ->  RunnableAdapter.call() -> Runnable.run(),如果入?yún)⑹?Callable 的話,直接調(diào)用。

setException(Throwable t)方法

//發(fā)生異常時,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(EXCEPTIONAL) protected void setException(Throwable t) {  //調(diào)用UNSAFE類封裝的CAS算法,設(shè)置值  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {      outcome = t;     UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state     //喚醒因等待返回值而阻塞的線程     finishCompletion();     } }

由于篇幅有限,更多源碼解析請查看文章擴(kuò)展鏈接

Future的使用

FutureTask可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景。通過傳入Runnable或者Callable的任務(wù)給FutureTask,直接調(diào)用其run方法或者放入線程池執(zhí)行,之后可以在外部通過FutureTask的get方法異步獲取執(zhí)行結(jié)果,因此,F(xiàn)utureTask非常適合用于耗時的計算,主線程可以在完成自己的任務(wù)后,再去獲取結(jié)果。另外,F(xiàn)utureTask還可以確保即使調(diào)用了多次run方法,它都只會執(zhí)行一次Runnable或者Callable任務(wù),或者通過cancel取消FutureTask的執(zhí)行等。

FutureTask執(zhí)行多任務(wù)計算的使用場景

利用FutureTask和ExecutorService,可以用多線程的方式提交計算任務(wù),主線程繼續(xù)執(zhí)行其他任務(wù),當(dāng)主線程需要子線程的計算結(jié)果時,在異步獲取子線程的執(zhí)行結(jié)果。

//任務(wù)正常完成,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(=NORMAL) protected void set(V v) {  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {   outcome = v;         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state         finishCompletion();     } }

執(zhí)行結(jié)果:

生成子線程計算任務(wù): 0 生成子線程計算任務(wù): 1 生成子線程計算任務(wù): 2 生成子線程計算任務(wù): 3 生成子線程計算任務(wù): 4 生成子線程計算任務(wù): 5 生成子線程計算任務(wù): 6 生成子線程計算任務(wù): 7 生成子線程計算任務(wù): 8 生成子線程計算任務(wù): 9 所有計算任務(wù)提交完畢, 主線程接著干其他事情! 子線程計算任務(wù): 0 執(zhí)行完成! 子線程計算任務(wù): 1 執(zhí)行完成! 子線程計算任務(wù): 3 執(zhí)行完成! 子線程計算任務(wù): 4 執(zhí)行完成! 子線程計算任務(wù): 2 執(zhí)行完成! 子線程計算任務(wù): 5 執(zhí)行完成! 子線程計算任務(wù): 7 執(zhí)行完成! 子線程計算任務(wù): 9 執(zhí)行完成! 子線程計算任務(wù): 8 執(zhí)行完成! 子線程計算任務(wù): 6 執(zhí)行完成! 多任務(wù)計算后的總結(jié)果是:990

FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次

在很多高并發(fā)的環(huán)境下,往往我們只需要某些任務(wù)只執(zhí)行一次。這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設(shè)有一個帶key的連接池,當(dāng)key存在時,即直接返回key對應(yīng)的對象;當(dāng)key不存在時,則創(chuàng)建連接。對于這樣的應(yīng)用場景,通常采用的方法為使用一個Map對象來存儲key和連接池對應(yīng)的對應(yīng)關(guān)系,典型的代碼如下面所示:

//移除所有等待線程并發(fā)出信號,調(diào)用done(),以及將任務(wù)callable清空 private void finishCompletion() {     // assert state > COMPLETING;     for (WaitNode q; (q = waiters) != null;) {         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {             //循環(huán)喚醒阻塞線程,直到阻塞隊列為空             for (;;) {                 Thread t = q.thread;                 if (t != null) {                     q.thread = null;                     LockSupport.unpark(t);                 }                 WaitNode next = q.next;                 //一直到阻塞隊列為空,跳出循環(huán)                 if (next == null)                     break;                 q.next = null; // unlink to help gc   方便gc在適當(dāng)?shù)臅r候回收                 q = next;             }             break;         }     }      done();      callable = null;        // to reduce footprint }

在上面的例子中,我們通過加鎖確保高并發(fā)環(huán)境下的線程安全,也確保了connection只創(chuàng)建一次,然而卻犧牲了性能。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,性能大大提高。

package com.niuh.future;  import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.ConcurrentHashMap;  /**  * @description: 改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,性能大大提高。  * <p>  * 但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象。  * 為什么呢?因?yàn)閯?chuàng)建Connection是一個耗時操作,假設(shè)多個線程涌入getConnection方法,都發(fā)現(xiàn)key對應(yīng)的鍵不存在,  * 于是所有涌入的線程都開始執(zhí)行conn=createConnection(),只不過最終只有一個線程能將connection插入到map里。  * 但是這樣以來,其它線程創(chuàng)建的的connection就沒啥價值,浪費(fèi)系統(tǒng)開銷。  */ public class FutureTaskConnection2 {     private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>();      public static Connection getConnection(String key) {         Connection connection = connectionPool.get(key);         if (connection == null) {             connection = createConnection();             //根據(jù)putIfAbsent的返回值判斷是否有線程搶先插入了             Connection returnConnection = connectionPool.putIfAbsent(key, connection);             if (returnConnection != null) {                 connection = returnConnection;             }         } else {             return connection;         }         return connection;     }      private static Connection createConnection() {         try {             return DriverManager.getConnection("");         } catch (SQLException e) {             e.printStackTrace();         }         return null;     }  }

但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象。 為什么呢?

因?yàn)閯?chuàng)建Connection是一個耗時操作,假設(shè)多個線程涌入getConnection方法,都發(fā)現(xiàn)key對應(yīng)的鍵不存在,于是所有涌入的線程都開始執(zhí)行conn=createConnection(),只不過最終只有一個線程能將connection插入到map里。但是這樣以來,其它線程創(chuàng)建的的connection就沒啥價值,浪費(fèi)系統(tǒng)開銷。

這時最需要解決的問題就是當(dāng)key不存在時,創(chuàng)建Connection的動作(conn=createConnection();)能放在connectionPool.putIfAbsent()之后執(zhí)行,這正是FutureTask發(fā)揮作用的時機(jī),基于ConcurrentHashMap和FutureTask的改造代碼如下:

package com.niuh.future;  import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;  /**  * @description: FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次  * 這時最需要解決的問題就是當(dāng)key不存在時,創(chuàng)建Connection的動作(conn=createConnection();)  * 能放在connectionPool.putIfAbsent()之后執(zhí)行,這正是FutureTask發(fā)揮作用的時機(jī),  * 基于ConcurrentHashMap和FutureTask的改造代碼如下:  */ public class FutureTaskConnection3 {     private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();      public static Connection getConnection(String key) {         FutureTask<Connection> connectionFutureTask = connectionPool.get(key);         try {             if (connectionFutureTask != null) {                 return connectionFutureTask.get();             } else {                 Callable<Connection> callable = new Callable<Connection>() {                     @Override                     public Connection call() throws Exception {                         return createConnection();                     }                 };                 FutureTask<Connection> newTask = new FutureTask<>(callable);                 FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask);                 if (returnFt == null) {                     connectionFutureTask = newTask;                     newTask.run();                 }                 return connectionFutureTask.get();             }         } catch (ExecutionException e) {             e.printStackTrace();         } catch (InterruptedException e) {             e.printStackTrace();         }         return null;     }      private static Connection createConnection() {         try {             return DriverManager.getConnection("");         } catch (SQLException e) {             e.printStackTrace();         }         return null;     } }

FutureTask任務(wù)執(zhí)行完回調(diào)

FutureTask有一個方法 void done()會在每個線程執(zhí)行完成return結(jié)果時回調(diào)。  假設(shè)現(xiàn)在需要實(shí)現(xiàn)每個線程完成任務(wù)執(zhí)行后主動執(zhí)行后續(xù)任務(wù)。

private void handlePossibleCancellationInterrupt(int s) {     // It is possible for our interrupter to stall before getting a     // chance to interrupt us.  Let's spin-wait patiently.     //自旋等待cancle(true)結(jié)束(中斷結(jié)束)     if (s == INTERRUPTING)         while (state == INTERRUPTING)              Thread.yield(); // wait out pending interrupt      // assert state == INTERRUPTED;      // We want to clear any interrupt we may have received from     // cancel(true).  However, it is permissible to use interrupts     // as an independent mechanism for a task to communicate with     // its caller, and there is no way to clear only the     // cancellation interrupt.     //     // Thread.interrupted(); }

執(zhí)行結(jié)果:

11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - 老板給我來一個月餅 11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。。。。 11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 -  編號[804]月餅已打包好 11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 -  編號[88]月餅已打包好 11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 -  編號[166]月餅已打包好

“并發(fā)編程之如何理解Future&FutureTask”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI