溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java異步之call future的示例分析

發(fā)布時間:2021-06-15 09:18:41 來源:億速云 閱讀:161 作者:小新 欄目:開發(fā)技術(shù)

這篇文章給大家分享的是有關(guān)Java異步之call future的示例分析的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

一、概述

我們大家都知道,在 Java 中創(chuàng)建線程主要有三種方式:

  • 繼承 Thread 類;

  • 實現(xiàn) Runnable 接口;

  • 實現(xiàn) Callable 接口。

而后兩者的區(qū)別在于 Callable 接口中的 call() 方法可以異步地返回一個計算結(jié)果 Future,并且一般需要配合ExecutorService 來執(zhí)行。這一套操作在代碼實現(xiàn)上似乎也并不難,可是對于call()方法具體怎么(被ExecutorService)執(zhí)行的,以及 Future 這個結(jié)果是怎么獲取的,卻又不是很清楚了。

那么本篇文章,我們就一起來學(xué)習(xí)下 Callable 接口以及 Future 的使用,主要面向兩個問題:

  • 承載著具體任務(wù)的 call() 方法如何被執(zhí)行的?

  • 任務(wù)的執(zhí)行結(jié)果如何得到?

你可能會說,這兩個難道不是一個問題嗎?任務(wù)執(zhí)行了就會有返回結(jié)果,而返回結(jié)果也一定是任務(wù)執(zhí)行了才返回的,難道還能返回一個其他任務(wù)的結(jié)果么??不要著急,耐心的看下去,你就會發(fā)現(xiàn),這兩個還真的就是一個問題。

本文將分為兩個部分,第一部分分別介紹 任務(wù)執(zhí)行、以及結(jié)果這三個概念在 Java API 中的實體和各自的繼承關(guān)系,第二部分通過一個簡單的例子回顧他們的用法,再理解下這兩個問題的答案。

二、Callable、Executor 與 Future

既然是一個任務(wù)被執(zhí)行并返回結(jié)果,那么我們先來看看具體的任務(wù),也就是 Callable 接口。

2.1、任務(wù):Callable

非常簡單,只包含一個有泛型返回值的 call() 方法,需要在最后返回定義類型的結(jié)果。如果任務(wù)沒有需要返回的結(jié)果,那么將泛型 V 設(shè)為 void 并return null;就可以了。對比的是 Runnable,另一個明顯的區(qū)別則是 Callable可以拋出異常。

public interface Callable<V> {
    V call() throws Exception;
}


public interface Runnable {
    public abstract void run();
}

2.2、執(zhí)行:ExecutorService

說到線程就少不了線程池,而說到線程池肯定離不開 Executor 接口。下面這幅圖是 Executor 的框架,我們常用的是其中的兩個具體實現(xiàn)類 ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor,在 Executors 類中通過靜態(tài)方法獲取。Executors 中包含了線程池以及線程工廠的構(gòu)造,與 Executor 接口的關(guān)系類似于 Collection 接口和 Collections 類的關(guān)系。

Java異步之call future的示例分析

那么我們自頂向下,從源碼上了解一下 Executor 框架,學(xué)習(xí)學(xué)習(xí)任務(wù)是如何被執(zhí)行的。首先是 Executor 接口,其中只定義了 execute() 方法。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService 接口繼承了 Executor 接口,主要擴(kuò)展了一系列的 submit() 方法以及對 executor 的終止和判斷狀態(tài)。以第一個<T> Future<T> submit(Callable<T> task);為例,其中 task 為用戶定義的執(zhí)行的異步任務(wù),F(xiàn)uture 表示了任務(wù)的執(zhí)行結(jié)果,泛型 T 代表任務(wù)結(jié)果的類型。

public interface ExecutorService extends Executor {

    void shutdown();                // 現(xiàn)有任務(wù)完成后停止線程池
 
    List<Runnable> shutdownNow();   // 立即停止線程池

    boolean isShutdown();           // 判斷是否已停止

    boolean isTerminated();

    <T> Future<T> submit(Callable<T> task);        // 提交Callale任務(wù)

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    // 針對Callable集合的invokeAll()等方法
}

抽象類AbstractExecutorServiceThreadPoolExecutor 的基類,在下面的代碼中,它實現(xiàn)了ExecutorService 接口中的 submit() 方法。注釋中是對應(yīng)的 newTaskFor() 方法的代碼,非常簡單,就是將傳入的Callable 或 Runnable 參數(shù)封裝成一個 FutureTask 對象。

// 1.第一個重載方法,參數(shù)為Callable
public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  // return new FutureTask<T>(callable);
  execute(ftask);
  return ftask;
}

// 2.第二個重載方法,參數(shù)為Runnable
public Future<?> submit(Runnable task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<Void> ftask = newTaskFor(task, null);
  // return new FutureTask<T>(task, null);
  execute(ftask);
  return ftask;
}

// 3.第三個重載方法,參數(shù)為Runnable + 返回對象
public <T> Future<T> submit(Runnable task, T result) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task, result);
  // return new FutureTask<T>(task, result);
  execute(ftask);
  return ftask;
}

那么也就是說,無論傳入的是 Callable 還是 Runnable,submit() 方法其實就做了三件事

Java異步之call future的示例分析

具體來說,submit() 中首先生成了一個 RunnableFuture 引用的 FutureTask 實例,然后調(diào)用 execute() 方法來執(zhí)行它,那么我們可以推測 FutureTask 繼承自 RunnableFuture,而 RunnableFuture 又實現(xiàn)了 Runnable,因為execute() 的參數(shù)應(yīng)為 Runnable 類型。上面還涉及到了 FutureTask 的構(gòu)造函數(shù),也來看一下。

public FutureTask(Callable<V> callable) {
  this.callable = callable;
  this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
  this.callable = Executors.callable(runnable, result); // 通過適配器將runnable在call()中執(zhí)行并返回result
  this.state = NEW;
}

FutureTask 共有兩個構(gòu)造方法。第一個構(gòu)造方法比較簡單,對應(yīng)上面的第一個 submit(),采用組合的方式封裝Callable 并將狀態(tài)設(shè)為NEW;而第二個構(gòu)造方法對應(yīng)上面的后兩個 submit() 重載,不同之處是首先使用了Executors.callable來將 Runnable 和 result 組合成 Callable,這里采用了適配器RunnableAdapter implements Callable,巧妙地在 call() 中執(zhí)行 Runnable 并返回結(jié)果。

static final class RunnableAdapter<T> implements Callable<T> {
  final Runnable task;
  final T result;                // 返回的結(jié)果;顯然:需要在run()中賦值

  RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
  }
  public T call() {
    task.run();
    return result;
  }
}

在適配器設(shè)計模式中,通常包含**目標(biāo)接口 Target、適配器 Adapter 和被適配者 Adaptee **三類角色,其中目標(biāo)接口代表客戶端(當(dāng)前業(yè)務(wù)系統(tǒng))所需要的功能,通常為借口或抽象類;被適配者為現(xiàn)存的不能滿足使用需求的類;適配器是一個轉(zhuǎn)換器,也稱 wrapper,用于給被適配者添加目標(biāo)功能,使得客戶端可以按照目標(biāo)接口的格式正確訪問。對于 RunnableAdapter 來說,Callable 是其目標(biāo)接口,而 Runnable 則是被適配者。RunnableAdapter 通過覆蓋 call() 方法使其可按照 Callable 的要求來使用,同時其構(gòu)造方法中接收被適配者和目標(biāo)對象,滿足了 call() 方法有返回值的要求。

Java異步之call future的示例分析

那么總結(jié)一下 submit() 方法執(zhí)行的流程,就是:Callable 被封裝在 Runnable 的子類中傳入 execute() 得以執(zhí)行。

2.3、結(jié)果:Future

要說 Future 就是異步任務(wù)的執(zhí)行結(jié)果其實并不準(zhǔn)確,因為它代表了一個任務(wù)的執(zhí)行過程,有狀態(tài)、可以被取消,而 get() 方法的返回值才是任務(wù)的結(jié)果。

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

我們在上面中還提到了 RuunableFuture 和 FutureTask。從官方的注釋來看,RuunableFuture 就是一個可以 run的 future,實現(xiàn)了 Runnable 和 Future 兩個接口,在 run() 方法中執(zhí)行完計算時應(yīng)該將結(jié)果保存起來以便通過 get()獲取。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation unless it has been cancelled.
     */
    void run();
}

FutureTask 直接實現(xiàn)了 RunnableFuture 接口,作為執(zhí)行過程,共有下面這幾種狀態(tài),其中 COMPLETING 為一個暫時狀態(tài),表示正在設(shè)置結(jié)果或異常,對應(yīng)的,設(shè)置完成后狀態(tài)變?yōu)?NORMAL 或 EXCEPTIONAL;CANCELLED、INTERRUPTED 表示任務(wù)被取消或中斷。在上面的構(gòu)造方法中,將 state 初始化為 NEW。

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

然后是 FutureTask 的主要內(nèi)容,主要是 run() 和 get()。注意 outcome 的注釋,無論是否發(fā)生異常返回的都是這個 outcome,因為在執(zhí)行中如果執(zhí)行成功就將結(jié)果設(shè)置給了它(set()),而發(fā)生異常時將異常賦給了他(setException()),而在獲取結(jié)果時也都返回了 outcome(通過report())。

public class FutureTask<V> implements RunnableFuture<V> {
    
    private Callable<V> callable;         // target,待執(zhí)行的任務(wù)
    
    /** 保存執(zhí)行結(jié)果或異常,在get()方法中返回/拋出 */
    private Object outcome; // 非volatile,通過CAS保證線程安全
    
    
    public void run() {
        ......
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();            // 調(diào)用call()執(zhí)行用戶任務(wù)并獲取結(jié)果
                ran = true;                   // 執(zhí)行完成,ran置為true
            } catch (Throwable ex) {          // 調(diào)用call()出現(xiàn)異常,而run()方法繼續(xù)執(zhí)行
                 result = null;
                 ran = false;
                 setException(ex);            
                 // setException(Throwable t): compareAndSwapInt(NEW, COMPLETING);  outcome = t;      
            }
            if (ran)
                set(result);                  
            	// set(V v): compareAndSwapInt(NEW, COMPLETING);  outcome = v;
        }
    }
    
    
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);         // 加入隊列等待COMPLETING完成,可響應(yīng)超時、中斷
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 超時等待
    }
    
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)                              // 將outcome作為執(zhí)行結(jié)果返回
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);   // 將outcome作為捕獲的返回
    }
}

FutureTask 實現(xiàn)了 RunnableFuture 接口,所以有兩方面的作用。

  • 第一,作為 Runnable 傳入 execute() 方法來執(zhí)行,同時封裝 Callable 對象并在 run() 中調(diào)用其 call() 方法;

  • 第二,作為 Future 管理任務(wù)的執(zhí)行狀態(tài),將 call() 的返回值保存在 outcome 中以通過 get() 獲取。這似乎就能回答開頭的兩個問題,并且渾然天成,就好像是一個問題,除非發(fā)生異常的時候返回的不是任務(wù)的結(jié)果而是異常對象。

總結(jié)一下繼承關(guān)系:

Java異步之call future的示例分析

三、使用舉例

文章的標(biāo)題有點唬人,說到底還是講 Callable 的用法。現(xiàn)在我們知道了 Future 代表了任務(wù)執(zhí)行的過程和結(jié)果,作為 call() 方法的返回值來獲取執(zhí)行結(jié)果;而 FutureTask 是一個 Runnable 的 Future,既是任務(wù)執(zhí)行的過程和結(jié)果,又是 call 方法最終執(zhí)行的載體。下面通過一個例子看看他們在使用上的區(qū)別。

首先創(chuàng)建一個任務(wù),即定義一個任務(wù)類實現(xiàn) Callable 接口,在 call() 方法里添加我們的操作,這里用耗時三秒然后返回 100 模擬計算過程。

class MyTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("子線程開始計算...");
        for (int i=0;i<3;++i){
            Thread.sleep(1000);
            System.out.println("子線程計算中,用時 "+(i+1)+" 秒");
        }
        System.out.println("子線程計算完成,返回:100");
        return 100;
    }
}

然后呢,創(chuàng)建一個線程池,并實例化一個 MyTask 備用。

ExecutorService executor = Executors.newCachedThreadPool();
MyTask task = new MyTask();

現(xiàn)在,分別使用 Future 和 FutureTask 來獲取執(zhí)行結(jié)果,看看他們有什么區(qū)別。

3.1、使用Future

Future 一般作為 submit() 的返回值使用,并在主線程中以阻塞的方式獲取異步任務(wù)的執(zhí)行結(jié)果。

System.out.println("主線程啟動線程池");
Future<Integer> future = executor.submit(task);
System.out.println("主線程得到返回結(jié)果:"+future.get());
executor.shutdown();

看看輸出結(jié)果:

主線程啟動線程池

子線程開始計算...

子線程計算中,用時 1 秒

子線程計算中,用時 2 秒

子線程計算中,用時 3 秒

子線程計算完成,返回:100

主線程得到返回結(jié)果:100

由于 get() 方法阻塞獲取結(jié)果,所以輸出順序為子線程計算完成后主線程輸出結(jié)果。

3.2、使用FutureTask

由于 FutureTask 集任務(wù)與結(jié)果于一身,所以我們可以使用 FutureTask 自身而非返回值來管理任務(wù),這需要首先利用 Callable 對象來構(gòu)造 FutureTask,并調(diào)用不同的submit()重載方法。

System.out.println("主線程啟動線程池");
FutureTask<Integer> futureTask = new FutureTask<>(task);
executor.submit(futureTask);                                 // 作為Ruunable傳入submit()中
System.out.println("主線程得到返回結(jié)果:"+futureTask.get());    // 作為Future獲取結(jié)果
executor.shutdown();

這段程序的輸出與上面中完全相同,其實兩者在實際執(zhí)行中的區(qū)別也不大,雖然前者調(diào)用了submit(Callable<T> task)而后者調(diào)用了submit(Runnable task),但最終都通過execute(futuretask)來把任務(wù)加入線程池中。

四、總結(jié)

上面大費周章其實只是盡可能細(xì)致地講清楚了 Callable 中的任務(wù)是如何執(zhí)行的,總結(jié)起來就是:

線程池中,submit() 方法實際上將 Callable 封裝在 FutureTask 中,將其作為 Runnable 的子類傳給 execute()真正執(zhí)行;FutureTask 在 run() 中調(diào)用 Callable 對象的 call() 方法并接收返回值或捕獲異常保存在Object outcome中,同時管理執(zhí)行過程中的狀態(tài)state;FutureTask 同時作為 Future 的子類,通過 get() 返回任務(wù)的執(zhí)行結(jié)果,若未執(zhí)行完成則通過等待隊列進(jìn)行阻塞等待完成;

FutureTask 作為一個 Runnable 的 Future,其中最重要的兩個方法如下。

Java異步之call future的示例分析

感謝各位的閱讀!關(guān)于“Java異步之call future的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI