溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

JUC-Future與FutureTask原理是什么

發(fā)布時間:2021-10-25 15:45:02 來源:億速云 閱讀:147 作者:iii 欄目:web開發(fā)

本篇內(nèi)容主要講解“JUC-Future與FutureTask原理是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“JUC-Future與FutureTask原理是什么”吧!

1 Future

Future  表示一個任務(wù)的生命周期,是一個可取消的異步運算。提供了相應(yīng)的方法來判斷任務(wù)狀態(tài)(完成或取消),以及獲取任務(wù)的結(jié)果和取消任務(wù)等。適合具有可取消性和執(zhí)行時間較長的異步任務(wù)。

并發(fā)包中許多異步任務(wù)類都繼承自Future,其中最典型的就是 FutureTask

1.1 介紹

JUC-Future與FutureTask原理是什么

Future  表示異步計算的結(jié)果。它提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結(jié)果。計算完成后只能使用get方法來獲取結(jié)果,如有必要,計算完成前可以阻塞此方法。取消則由  cancel 方法來執(zhí)行。還提供了其他方法,以確定任務(wù)是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future  但又不提供可用的結(jié)果,則可以聲明 Future 形式類型、并返回 null 作為底層任務(wù)的結(jié)果。

也就是說Future具有這樣的特性

  • 異步執(zhí)行,可用 get 方法獲取執(zhí)行結(jié)果

  • 如果計算還沒完成,get 方法是會阻塞的,如果完成了,是可以多次獲取并立即得到結(jié)果的

  • 如果計算還沒完成,是可以取消計算的

  • 可以查詢計算的執(zhí)行狀態(tài)

2 FutureTask

FutureTask 為 Future  提供了基礎(chǔ)實現(xiàn),如獲取任務(wù)執(zhí)行結(jié)果(get)和取消任務(wù)(cancel)等。如果任務(wù)尚未完成,獲取任務(wù)執(zhí)行結(jié)果時將會阻塞。一旦執(zhí)行結(jié)束,任務(wù)就不能被重啟或取消(除非使用runAndReset執(zhí)行計算)。

FutureTask 常用來封裝 Callable 和 Runnable,也可作為一個任務(wù)提交到線程池中執(zhí)行。除了作為一個獨立的類,此類也提供創(chuàng)建自定義  task 類使用。FutureTask 的線程安全由CAS保證。

FutureTask 內(nèi)部維護了一個由volatile修飾的int型變量—state,代表當前任務(wù)的運行狀態(tài)

JUC-Future與FutureTask原理是什么

  • NEW:新建

  • COMPLETING:完成

  • NORMAL:正常運行

  • EXCEPTIONAL:異常退出

  • CANCELLED:任務(wù)取消

  • INTERRUPTING:線程中斷中

  • INTERRUPTED:線程已中斷

在這七種狀態(tài)中,有四種任務(wù)終止狀態(tài):NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各種狀態(tài)的轉(zhuǎn)化如下:

JUC-Future與FutureTask原理是什么

數(shù)據(jù)結(jié)構(gòu)及核心參數(shù)

JUC-Future與FutureTask原理是什么

JUC-Future與FutureTask原理是什么

//內(nèi)部持有的callable任務(wù),運行完畢后置空 private Callable<V> callable;  //從get()中返回的結(jié)果或拋出的異常 private Object outcome; // non-volatile, protected by state reads/writes  //運行callable的線程,在 run 時進行 CAS 操作 private volatile Thread runner;  //使用Treiber棧保存等待線程 private volatile WaitNode waiters;

FutureTask  繼承了Runnale和Future,本身也作為一個線程運行,可以提交給線程池執(zhí)行。維護了一個內(nèi)部類WaitNode,使用簡單的Treiber棧(無鎖并發(fā)棧)實現(xiàn),用于存儲等待線程。FutureTask  只有一個自定義的同步器 Sync 的屬性,所有的方法都是委派給此同步器來實現(xiàn)。這也是JUC里使用AQS的通用模式。

源碼解析

FutureTask 的同步器 由于Future在任務(wù)完成后,可以多次自由獲取結(jié)果,因此,用于控制同步的AQS使用共享模式。

FutureTask 底層任務(wù)的執(zhí)行狀態(tài)保存在AQS的狀態(tài)里。AQS是否允許線程獲取(是否阻塞)是取決于任務(wù)是否執(zhí)行完成,而不是具體的狀態(tài)值。

private final class Sync extends AbstractQueuedSynchronizer {     // 定義表示任務(wù)執(zhí)行狀態(tài)的常量。由于使用了位運算進行判斷,所以狀態(tài)值分別是2的冪。      // 表示任務(wù)已經(jīng)準備好了,可以執(zhí)行     private static final int READY     = 0;      // 表示任務(wù)正在執(zhí)行中     private static final int RUNNING   = 1;      // 表示任務(wù)已執(zhí)行完成     private static final int RAN       = 2;      // 表示任務(wù)已取消     private static final int CANCELLED = 4;       // 底層的表示任務(wù)的可執(zhí)行對象     private final Callable<V> callable;      // 表示任務(wù)執(zhí)行結(jié)果,用于get方法返回。     private V result;      // 表示任務(wù)執(zhí)行中的異常,用于get方法調(diào)用時拋出。     private Throwable exception;       /*      * 用于執(zhí)行任務(wù)的線程。在 set/cancel 方法后置為空,表示結(jié)果可獲取。      * 必須是 volatile的,用于確保完成后(result和exception)的可見性。      * (如果runner不是volatile,則result和exception必須都是volatile的)      */     private volatile Thread runner;        /**      * 已完成或已取消 時成功獲取      */     protected int tryAcquireShared( int ignore) {         return innerIsDone() ? 1 : -1;     }      /**      * 在設(shè)置最終完成狀態(tài)后讓AQS總是通知,通過設(shè)置runner線程為空。      * 這個方法并沒有更新AQS的state屬性,      * 所以可見性是通過對volatile的runner的寫來保證的。      */     protected boolean tryReleaseShared( int ignore) {         runner = null;         return true;     }        // 執(zhí)行任務(wù)的方法     void innerRun() {         // 用于確保任務(wù)不會重復(fù)執(zhí)行         if (!compareAndSetState(READY, RUNNING))             return;          // 由于Future一般是異步執(zhí)行,所以runner一般是線程池里的線程。         runner = Thread.currentThread();          // 設(shè)置執(zhí)行線程后再次檢查,在執(zhí)行前檢查是否被異步取消         // 由于前面的CAS已把狀態(tài)設(shè)置RUNNING,         if (getState() == RUNNING) { // recheck after setting thread             V result;             //             try {                 result = callable.call();             } catch (Throwable ex) {                 // 捕獲任務(wù)執(zhí)行過程中拋出的所有異常                 setException(ex);                 return;             }             set(result);         } else {       // 釋放等待的線程             releaseShared(0); // cancel         }     }      // 設(shè)置結(jié)果     void innerSet(V v) {         // 放在循環(huán)里進行是為了失敗后重試。         for (;;) {             // AQS初始化時,狀態(tài)值默認是 0,對應(yīng)這里也就是 READY 狀態(tài)。             int s = getState();              // 已完成任務(wù)不能設(shè)置結(jié)果             if (s == RAN)                 return;              // 已取消 的任務(wù)不能設(shè)置結(jié)果             if (s == CANCELLED) {                 // releaseShared 會設(shè)置runner為空,                 // 這是考慮到與其他的取消請求線程 競爭中斷 runner                 releaseShared(0);                 return;             }              // 先設(shè)置已完成,免得多次設(shè)置             if (compareAndSetState(s, RAN)) {                 result = v;                 releaseShared(0); // 此方法會更新 runner,保證result的可見性                 done();                 return;             }         }     }      // 獲取異步計算的結(jié)果     V innerGet() throws InterruptedException, ExecutionException {         acquireSharedInterruptibly(0);// 獲取共享,如果沒有完成則會阻塞。          // 檢查是否被取消         if (getState() == CANCELLED)             throw new CancellationException();          // 異步計算過程中出現(xiàn)異常         if (exception != null)             throw new ExecutionException(exception);          return result;     }      // 取消執(zhí)行任務(wù)     boolean innerCancel( boolean mayInterruptIfRunning) {         for (;;) {             int s = getState();              // 已完成或已取消的任務(wù)不能再次取消             if (ranOrCancelled(s))                 return false;              // 任務(wù)處于 READY 或 RUNNING             if (compareAndSetState(s, CANCELLED))                 break;         }         // 任務(wù)取消后,中斷執(zhí)行線程         if (mayInterruptIfRunning) {             Thread r = runner;             if (r != null)                 r.interrupt();         }         releaseShared(0); // 釋放等待的訪問結(jié)果的線程         done();         return true;     }      /**      * 檢查任務(wù)是否處于完成或取消狀態(tài)      */     private boolean ranOrCancelled( int state) {         return (state & (RAN | CANCELLED)) != 0;     }       // 其他方法省略 }

從 innerCancel  方法可知,取消操作只是改變了任務(wù)對象的狀態(tài)并可能會中斷執(zhí)行線程。如果任務(wù)的邏輯代碼沒有響應(yīng)中斷,則會一直異步執(zhí)行直到完成,只是最終的執(zhí)行結(jié)果不會被通過get方法返回,計算資源的開銷仍然是存在的。

總的來說,F(xiàn)uture 是線程間協(xié)調(diào)的一種工具。

AbstractExecutorService.submit(Callable task)

FutureTask  內(nèi)部實現(xiàn)方法都很簡單,先從線程池的submit分析。submit方法默認實現(xiàn)在AbstractExecutorService,幾種實現(xiàn)源碼如下:

public Future<?> submit(Runnable task) {     if (task == null) throw new NullPointerException();     RunnableFuture<Void> ftask = newTaskFor(task, null);     execute(ftask);     return ftask; } public <T> Future<T> submit(Runnable task, T result) {     if (task == null) throw new NullPointerException();     RunnableFuture<T> ftask = newTaskFor(task, result);     execute(ftask);     return ftask; } public <T> Future<T> submit(Callable<T> task) {     if (task == null) throw new NullPointerException();     RunnableFuture<T> ftask = newTaskFor(task);     execute(ftask);     return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {     return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) {     this.callable = Executors.callable(runnable, result);     this.state = NEW;       // ensure visibility of callable }

首先調(diào)用newTaskFor方法構(gòu)造FutureTask,然后調(diào)用execute把任務(wù)放進線程池中,返回FutureTask

FutureTask.run()

JUC-Future與FutureTask原理是什么

public void run() {     //新建任務(wù),CAS替換runner為當前線程     if (state != NEW ||         !UNSAFE.compareAndSwapObject(this, runnerOffset,                                      null, Thread.currentThread()))         return;     try {         Callable<V> c = callable;         if (c != null && state == NEW) {             V result;             boolean ran;             try {                 result = c.call();                 ran = true;             } catch (Throwable ex) {                 result = null;                 ran = false;                 setException(ex);             }             if (ran)                 set(result);//設(shè)置執(zhí)行結(jié)果         }     } finally {         // runner must be non-null until state is settled to         // prevent concurrent calls to run()         runner = null;         // state must be re-read after nulling runner to prevent         // leaked interrupts         int s = state;         if (s >= INTERRUPTING)             handlePossibleCancellationInterrupt(s);//處理中斷邏輯     } }

運行任務(wù),如果任務(wù)狀態(tài)為NEW狀態(tài),則利用CAS修改為當前線程。執(zhí)行完畢調(diào)用set(result)方法設(shè)置執(zhí)行結(jié)果。 set(result)源碼如下

JUC-Future與FutureTask原理是什么

首先利用cas修改state狀態(tài)為

JUC-Future與FutureTask原理是什么

設(shè)置返回結(jié)果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式設(shè)置state狀態(tài)為

JUC-Future與FutureTask原理是什么

結(jié)果設(shè)置完畢后,調(diào)用finishCompletion()喚醒等待線程

JUC-Future與FutureTask原理是什么

private void finishCompletion() {     for (WaitNode q; (q = waiters) != null;) {         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待線程             for (;;) {//自旋遍歷等待線程                 Thread t = q.thread;                 if (t != null) {                     q.thread = null;                     LockSupport.unpark(t);//喚醒等待線程                 }                 WaitNode next = q.next;                 if (next == null)                     break;                 q.next = null; // unlink to help gc                 q = next;             }             break;         }     }     //任務(wù)完成后調(diào)用函數(shù),自定義擴展     done();     callable = null;        // to reduce footprint }

回到run方法,如果在 run 期間被中斷,此時需要調(diào)用handlePossibleCancellationInterrupt處理中斷邏輯,確保任何中斷(例如cancel(true))只停留在當前run或runAndReset的任務(wù)中

JUC-Future與FutureTask原理是什么

private void handlePossibleCancellationInterrupt(int s) {     //在中斷者中斷線程之前可能會延遲,所以我們只需要讓出CPU時間片自旋等待     if (s == INTERRUPTING)         while (state == INTERRUPTING)             Thread.yield(); // wait out pending interrupt }

FutureTask.runAndReset()

JUC-Future與FutureTask原理是什么

runAndReset是  FutureTask另外一個任務(wù)執(zhí)行的方法,它不會返回執(zhí)行結(jié)果,而且在任務(wù)執(zhí)行完之后會重置stat的狀態(tài)為NEW,使任務(wù)可以多次執(zhí)行。  runAndReset的典型應(yīng)用是在 ScheduledThreadPoolExecutor 中,周期性的執(zhí)行任務(wù)。

FutureTask.get()

JUC-Future與FutureTask原理是什么

FutureTask 通過get()獲取任務(wù)執(zhí)行結(jié)果。如果任務(wù)處于未完成的狀態(tài)(state <=  COMPLETING),就調(diào)用awaitDone等待任務(wù)完成。任務(wù)完成后,通過report獲取執(zhí)行結(jié)果或拋出執(zhí)行期間的異常。

JUC-Future與FutureTask原理是什么

awaitDone(boolean timed, long nanos)

JUC-Future與FutureTask原理是什么

private int awaitDone(boolean timed, long nanos)     throws InterruptedException {     final long deadline = timed ? System.nanoTime() + nanos : 0L;     WaitNode q = null;     boolean queued = false;     for (;;) {//自旋         if (Thread.interrupted()) {//獲取并清除中斷狀態(tài)             removeWaiter(q);//移除等待WaitNode             throw new InterruptedException();         }          int s = state;         if (s > COMPLETING) {             if (q != null)                 q.thread = null;//置空等待節(jié)點的線程             return s;         }         else if (s == COMPLETING) // cannot time out yet             Thread.yield();         else if (q == null)             q = new WaitNode();         else if (!queued)             //CAS修改waiter             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                  q.next = waiters, q);         else if (timed) {             nanos = deadline - System.nanoTime();             if (nanos <= 0L) {                 removeWaiter(q);//超時,移除等待節(jié)點                 return state;             }             LockSupport.parkNanos(this, nanos);//阻塞當前線程         }         else             LockSupport.park(this);//阻塞當前線程     } }

awaitDone用于等待任務(wù)完成,或任務(wù)因為中斷或超時而終止。返回任務(wù)的完成狀態(tài)。

1.如果線程被中斷,首先清除中斷狀態(tài),調(diào)用removeWaiter移除等待節(jié)點,然后拋InterruptedException。removeWaiter源碼如下:

JUC-Future與FutureTask原理是什么

private void removeWaiter(WaitNode node) {     if (node != null) {         node.thread = null;//首先置空線程         retry:         for (;;) {          // restart on removeWaiter race             //依次遍歷查找             for (WaitNode pred = null, q = waiters, s; q != null; q = s) {                 s = q.next;                 if (q.thread != null)                     pred = q;                 else if (pred != null) {                     pred.next = s;                     if (pred.thread == null) // check for race                         continue retry;                 }                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替換                     continue retry;             }             break;         }     } }

2.如果當前為結(jié)束態(tài)(state>COMPLETING),則根據(jù)需要置空等待節(jié)點的線程,并返回 Future 狀態(tài)

3.如果當前為正在完成(COMPLETING),說明此時 Future 還不能做出超時動作,為任務(wù)讓出CPU執(zhí)行時間片

4.如果state為NEW,先新建一個WaitNode,然后CAS修改當前waiters

5.如果等待超時,則調(diào)用removeWaiter移除等待節(jié)點,返回任務(wù)狀態(tài);如果設(shè)置了超時時間但是尚未超時,則park阻塞當前線程

6.其他情況直接阻塞當前線程

FutureTask.cancel(boolean mayInterruptIfRunning)

  1. public boolean cancel(boolean mayInterruptIfRunning) { 

  2.     //如果當前Future狀態(tài)為NEW,根據(jù)參數(shù)修改Future狀態(tài)為INTERRUPTING或CANCELLED 

  3.     if (!(state == NEW && 

  4.           UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 

  5.               mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 

  6.         return false; 

  7.     try {    // in case call to interrupt throws exception 

  8.         if (mayInterruptIfRunning) {//可以在運行時中斷 

  9.             try { 

  10.                 Thread t = runner; 

  11.                 if (t != null) 

  12.                     t.interrupt(); 

  13.             } finally { // final state 

  14.                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 

  15.             } 

  16.         } 

  17.     } finally { 

  18.         finishCompletion();//移除并喚醒所有等待線程 

  19.     } 

  20.     return true; 


說明:嘗試取消任務(wù)。如果任務(wù)已經(jīng)完成或已經(jīng)被取消,此操作會失敗。如果當前Future狀態(tài)為NEW,根據(jù)參數(shù)修改Future狀態(tài)為INTERRUPTING或CANCELLED。如果當前狀態(tài)不為NEW,則根據(jù)參數(shù)mayInterruptIfRunning決定是否在任務(wù)運行中也可以中斷。中斷操作完成后,調(diào)用finishCompletion移除并喚醒所有等待線程。

示例

JUC-Future與FutureTask原理是什么

JUC-Future與FutureTask原理是什么

到此,相信大家對“JUC-Future與FutureTask原理是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI