溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

多線程(二十二、異步執(zhí)行-Futrue模式)

發(fā)布時間:2020-06-19 04:44:21 來源:網(wǎng)絡 閱讀:532 作者:shayang88 欄目:編程語言

Future簡介

如果一個任務需要返回執(zhí)行結果,一般我們會實現(xiàn)一個Callable任務,并創(chuàng)建一個線程來執(zhí)行任務。對于執(zhí)行時間比較長的任務,顯然我們同步的等待結果再去執(zhí)行后續(xù)的業(yè)務是不現(xiàn)實的,那么,F(xiàn)uture模式是怎樣解決這個問題的呢?

Future模式,可以讓調(diào)用方立即返回,然后它自己會在后面慢慢處理,此時調(diào)用者拿到的僅僅是一個憑證,調(diào)用者可以先去處理其它任務,在真正需要用到調(diào)用結果的場合,再使用憑證去獲取調(diào)用結果。這個憑證就是這里的Future。

Future接口的定義:

public interface Future<V> {
      // 取消任務
    boolean cancel(boolean mayInterruptIfRunning);
      // 任務是否取消
    boolean isCancelled();
      // 標記任務是否執(zhí)行完成
    boolean isDone();
      // 阻塞獲取任務結果
    V get() throws InterruptedException, ExecutionException;
      // 超時獲取任務結果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

Future模式中,最重要的就是FutureTask類

多線程(二十二、異步執(zhí)行-Futrue模式)

FutureTask一共給任務定義了7種狀態(tài)

1、NEW:表示任務的初始化狀態(tài);
2、COMPLETING:表示任務已執(zhí)行完成(正常完成或異常完成),但任務結果或異常原因還未設置完成,屬于中間狀態(tài);
3、NORMAL:表示任務已經(jīng)執(zhí)行完成(正常完成),且任務結果已設置完成,屬于最終狀態(tài);
4、EXCEPTIONAL:表示任務已經(jīng)執(zhí)行完成(異常完成),且任務異常已設置完成,屬于最終狀態(tài);
5、CANCELLED:表示任務還沒開始執(zhí)行就被取消(非中斷方式),屬于最終狀態(tài);
6、INTERRUPTING:表示任務還沒開始執(zhí)行就被取消(中斷方式),正式被中斷前的過渡狀態(tài),屬于中間狀態(tài);
7、INTERRUPTED:表示任務還沒開始執(zhí)行就被取消(中斷方式),且已被中斷,屬于最終狀態(tài)。

各個狀態(tài)之間的流轉(zhuǎn):

多線程(二十二、異步執(zhí)行-Futrue模式)

FutureTask構造

FutureTask在構造時可以接受Runnable或Callable任務,如果是Runnable,則最終包裝成Callable:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
                // 包裝Runnable成為Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask成員

private volatile int state;//任務狀態(tài)
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

private Callable<V> callable;       // 真正的任務
private volatile Thread runner;     // 保存正在執(zhí)行任務的線程

/**
 * 記錄結果或異常
 */
private Object outcome;

/**
 * 無鎖棧(Treiber stack)
 * 保存等待線程
 */
private volatile WaitNode waiters;

當調(diào)用FutureTask的get方法時,如果任務沒有完成,則調(diào)用線程會被阻塞,其實就是將線程包裝成WaitNode結點保存到waiters指向的棧中。

static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

任務執(zhí)行run

public void run() {
    // 僅當任務為NEW狀態(tài)時, 才能執(zhí)行任務
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                            //執(zhí)行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                                //設置異常
                setException(ex);
            }
            if (ran)
                            //設置任務執(zhí)行結果outcome
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set方法:

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;//存儲結果值
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

任務取消

public boolean cancel(boolean mayInterruptIfRunning) {
    // 僅NEW狀態(tài)下可以取消任務
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;

    try {   
        if (mayInterruptIfRunning) {    // 中斷任務
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
            //釋放所有在棧上等待的線程
        finishCompletion();
    }
    return true;
}

任務取消后,最終調(diào)用finishCompletion方法,釋放所有在棧上等待的線程

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) { //自旋釋放所有等待線程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//喚醒線程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

獲取結果

FutureTask可以通過get方法獲取任務結果,如果需要限時等待,可以調(diào)用get(long timeout, TimeUnit unit)

public V get() throws InterruptedException, ExecutionException {
    int s = state;
        //當前任務的狀態(tài)是NEW或COMPLETING,會調(diào)用awaitDone阻塞線程
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);   // 任務執(zhí)行結果
}
/**
 * 返回執(zhí)行結果.
 */
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable) x);
}

ScheduledFutureTask

1、ScheduledFutureTask在普通FutureTask的基礎上增加了周期執(zhí)行/延遲執(zhí)行的功能
2、ScheduledFutureTask是ScheduledThreadPoolExecutor這個線程池的默認調(diào)度任務類,通過繼承FutureTask和Delayed接口來實現(xiàn)周期/延遲功能的。
多線程(二十二、異步執(zhí)行-Futrue模式)

ScheduledFutureTask的源碼非常簡單,基本都是委托FutureTask來實現(xiàn)的

任務運行
public void run() {
    // 是否是周期任務
    boolean periodic = isPeriodic();  
        //// 能否運行任務
    if (!canRunInCurrentRunState(periodic))      
        cancel(false);
    else if (!periodic)  // 非周期任務:調(diào)用FutureTask的run方法運行
        ScheduledFutureTask.super.run();
                // 周期任務:調(diào)用FutureTask的runAndReset方法運行
    else if (ScheduledFutureTask.super.runAndReset()) { 
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

FutureTask的runAndReset方法與run方法的區(qū)別就是當任務正常執(zhí)行完成后,不會設置任務的最終狀態(tài)(即保持NEW狀態(tài)),以便任務重復執(zhí)行:

protected boolean runAndReset() {
    // 僅NEW狀態(tài)的任務可以執(zhí)行
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return false;

    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); //不設置執(zhí)行結果
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;//重新設置任務狀態(tài)為NEW,繼續(xù)重復執(zhí)行
}
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI