溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

死磕 java線程系列之線程池深入解析——未來任務(wù)執(zhí)行流程

發(fā)布時間:2020-06-16 00:26:57 來源:網(wǎng)絡(luò) 閱讀:173 作者:彤哥讀源碼 欄目:編程語言

死磕 java線程系列之線程池深入解析——未來任務(wù)執(zhí)行流程

(手機(jī)橫屏看源碼更方便)


注:java源碼分析部分如無特殊說明均基于 java8 版本。

注:線程池源碼部分如無特殊說明均指ThreadPoolExecutor類。

簡介

前面我們一起學(xué)習(xí)了線程池中普通任務(wù)的執(zhí)行流程,但其實線程池中還有一種任務(wù),叫作未來任務(wù)(future task),使用它您可以獲取任務(wù)執(zhí)行的結(jié)果,它是怎么實現(xiàn)的呢?

建議學(xué)習(xí)本章前先去看看彤哥之前寫的《死磕 java線程系列之自己動手寫一個線程池(續(xù))》,有助于理解本章的內(nèi)容,且那邊的代碼比較短小,學(xué)起來相對容易一些。

問題

(1)線程池中的未來任務(wù)是怎么執(zhí)行的?

(2)我們能學(xué)到哪些比較好的設(shè)計模式?

(3)對我們未來學(xué)習(xí)別的框架有什么幫助?

來個栗子

我們還是從一個例子入手,來講解來章的內(nèi)容。

我們定義一個線程池,并使用它提交5個任務(wù),這5個任務(wù)分別返回0、1、2、3、4,在未來的某一時刻,我們再取用它們的返回值,做一個累加操作。

public class ThreadPoolTest02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 新建一個固定5個線程的線程池
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        List<Future<Integer>> futureList = new ArrayList<>();
        // 提交5個任務(wù),分別返回0、1、2、3、4
        for (int i = 0; i < 5; i++) {
            int num = i;

            // 任務(wù)執(zhí)行的結(jié)果用Future包裝
            Future<Integer> future = threadPool.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("return: " + num);
                // 返回值
                return num;
            });

            // 把future添加到list中
            futureList.add(future);
        }

        // 任務(wù)全部提交完再從future中g(shù)et返回值,并做累加
        int sum = 0;
        for (Future<Integer> future : futureList) {
            sum += future.get();
        }

        System.out.println("sum=" + sum);
    }
}

這里我們思考兩個問題:

(1)如果這里使用普通任務(wù),要怎么寫,時間大概是多少?

如果使用普通任務(wù),那么就要把累加操作放到任務(wù)里面,而且并不是那么好寫(final的問題),總時間大概是1秒多一點。但是,這樣有一個缺點,就是累加操作跟任務(wù)本身的內(nèi)容耦合到一起了,后面如果改成累乘,還要修改任務(wù)的內(nèi)容。

(2)如果這里把future.get()放到for循環(huán)里面,時間大概是多少?

這個問題我們先不回答,先來看源碼分析。

submit()方法

submit方法,它是提交有返回值任務(wù)的一種方式,內(nèi)部使用未來任務(wù)(FutureTask)包裝,再交給execute()去執(zhí)行,最后返回未來任務(wù)本身。

public <T> Future<T> submit(Callable<T> task) {
    // 非空檢測
    if (task == null) throw new NullPointerException();
    // 包裝成FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 交給execute()方法去執(zhí)行
    execute(ftask);
    // 返回futureTask
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    // 將普通任務(wù)包裝成FutureTask
    return new FutureTask<T>(callable);
}

這里的設(shè)計很巧妙,實際上這兩個方法都是在AbstractExecutorService這個抽象類中完成的,這是模板方法的一種運用。

我們來看看FutureTask的繼承體系:

死磕 java線程系列之線程池深入解析——未來任務(wù)執(zhí)行流程

FutureTask實現(xiàn)了RunnableFuture接口,而RunnableFuture接口組合了Runnable接口和Future接口的能力,而Future接口提供了get任務(wù)返回值的能力。

問題:submit()方法返回的為什么是Future接口而不是RunnableFuture接口或者FutureTask類呢?

答:這是因為submit()返回的結(jié)果,對外部調(diào)用者只想暴露其get()的能力(Future接口),而不想暴露其run()的能力(Runaable接口)。

FutureTask類的run()方法

經(jīng)過上一章的學(xué)習(xí),我們知道execute()方法最后調(diào)用的是task的run()方法,上面我們傳進(jìn)去的任務(wù),最后被包裝成了FutureTask,也就是說execute()方法最后會調(diào)用到FutureTask的run()方法,所以我們直接看這個方法就可以了。

public void run() {
    // 狀態(tài)不為NEW,或者修改為當(dāng)前線程來運行這個任務(wù)失敗,則直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;

    try {
        // 真正的任務(wù)
        Callable<V> c = callable;
        // state必須為NEW時才運行
        if (c != null && state == NEW) {
            // 運行的結(jié)果
            V result;
            boolean ran;
            try {
                // 任務(wù)執(zhí)行的地方【本文由公從號“彤哥讀源碼”原創(chuàng)】
                result = c.call();
                // 已執(zhí)行完畢
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 處理異常
                setException(ex);
            }
            if (ran)
                // 處理結(jié)果
                set(result);
        }
    } finally {
        // 置空runner
        runner = null;
        // 處理中斷
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

可以看到代碼也比較簡單,先做狀態(tài)的檢測,再執(zhí)行任務(wù),最后處理結(jié)果或異常。

執(zhí)行任務(wù)這里沒啥問題,讓我們看看處理結(jié)果或異常的代碼。

protected void setException(Throwable t) {
    // 將狀態(tài)從NEW置為COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 返回值置為傳進(jìn)來的異常(outcome為調(diào)用get()方法時返回的)
        outcome = t;
        // 最終的狀態(tài)設(shè)置為EXCEPTIONAL
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // 調(diào)用完成方法
        finishCompletion();
    }
}
protected void set(V v) {
    // 將狀態(tài)從NEW置為COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 返回值置為傳進(jìn)來的結(jié)果(outcome為調(diào)用get()方法時返回的)
        outcome = v;
        // 最終的狀態(tài)設(shè)置為NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // 調(diào)用完成方法
        finishCompletion();
    }
}

咋一看,這兩個方法似乎差不多,不同的是出去的結(jié)果不一樣且狀態(tài)不一樣,最后都調(diào)用了finishCompletion()方法。

private void finishCompletion() {
    // 如果隊列不為空(這個隊列實際上為調(diào)用者線程)
    for (WaitNode q; (q = waiters) != null;) {
        // 置空隊列
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 調(diào)用者線程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 如果調(diào)用者線程不為空,則喚醒它
                    // 【本文由公從號“彤哥讀源碼”原創(chuàng)】
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 鉤子方法,子類重寫
    done();
    // 置空任務(wù)
    callable = null;        // to reduce footprint
}

整個run()方法總結(jié)下來:

(1)FutureTask有一個狀態(tài)state控制任務(wù)的運行過程,正常運行結(jié)束state從NEW->COMPLETING->NORMAL,異常運行結(jié)束state從NEW->COMPLETING->EXCEPTIONAL;

(2)FutureTask保存了運行任務(wù)的線程runner,它是線程池中的某個線程;

(3)調(diào)用者線程是保存在waiters隊列中的,它是什么時候設(shè)置進(jìn)去的呢?

(4)任務(wù)執(zhí)行完畢,除了設(shè)置狀態(tài)state變化之外,還要喚醒調(diào)用者線程。

調(diào)用者線程是什么時候保存在FutureTask中(waiters)的呢?查看構(gòu)造方法:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

發(fā)現(xiàn)并沒有相關(guān)信息,我們再試想一下,如果調(diào)用者不調(diào)用get()方法,那么這種未來任務(wù)是不是跟普通任務(wù)沒有什么區(qū)別?確實是的哈,所以只有調(diào)用get()方法了才有必要保存調(diào)用者線程到FutureTask中。

所以,我們來看看get()方法中是什么鬼。

FutureTask類的get()方法

get()方法調(diào)用時如果任務(wù)未執(zhí)行完畢,會阻塞直到任務(wù)結(jié)束。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果狀態(tài)小于等于COMPLETING,則進(jìn)入隊列等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // 返回結(jié)果(異常)
    return report(s);
}

是不是很清楚,如果任務(wù)狀態(tài)小于等于COMPLETING,則進(jìn)入隊列等待。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 我們這里假設(shè)不帶超時
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 處理中斷
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // 4. 如果狀態(tài)大于COMPLETING了,則跳出循環(huán)并返回
        // 這是自旋的出口
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 如果狀態(tài)等于COMPLETING,說明任務(wù)快完成了,就差設(shè)置狀態(tài)到NORMAL或EXCEPTIONAL和設(shè)置結(jié)果了
        // 這時候就讓出CPU,優(yōu)先完成任務(wù)
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 1. 如果隊列為空
        else if (q == null)
            // 初始化隊列(WaitNode中記錄了調(diào)用者線程)
            q = new WaitNode();
        // 2. 未進(jìn)入隊列
        else if (!queued)
            // 嘗試入隊
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 超時處理
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 3. 阻塞當(dāng)前線程(調(diào)用者線程)
        else
            // 【本文由公從號“彤哥讀源碼”原創(chuàng)】
            LockSupport.park(this);
    }
}

這里我們假設(shè)調(diào)用get()時任務(wù)還未執(zhí)行,也就是其狀態(tài)為NEW,我們試著按上面標(biāo)示的1、2、3、4走一遍邏輯:

(1)第一次循環(huán),狀態(tài)為NEW,直接到1處,初始化隊列并把調(diào)用者線程封裝在WaitNode中;

(2)第二次循環(huán),狀態(tài)為NEW,隊列不為空,到2處,讓包含調(diào)用者線程的WaitNode入隊;

(3)第三次循環(huán),狀態(tài)為NEW,隊列不為空,且已入隊,到3處,阻塞調(diào)用者線程;

(4)假設(shè)過了一會任務(wù)執(zhí)行完畢了,根據(jù)run()方法的分析最后會unpark調(diào)用者線程,也就是3處會被喚醒;

(5)第四次循環(huán),狀態(tài)肯定大于COMPLETING了,退出循環(huán)并返回;

問題:為什么要在for循環(huán)中控制整個流程呢,把這里的每一步單獨拿出來寫行不行?

答:因為每一次動作都需要重新檢查狀態(tài)state有沒有變化,如果拿出去寫也是可以的,只是代碼會非常冗長。這里只分析了get()時狀態(tài)為NEW,其它的狀態(tài)也可以自行驗證,都是可以保證正確的,甚至兩個線程交叉運行(斷點的技巧)。

OK,這里返回之后,再看看是怎么處理最終的結(jié)果的。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 任務(wù)正常結(jié)束
    if (s == NORMAL)
        return (V)x;
    // 被取消了
    if (s >= CANCELLED)
        throw new CancellationException();
    // 執(zhí)行異常
    throw new ExecutionException((Throwable)x);
}

還記得前面分析run的時候嗎,任務(wù)執(zhí)行異常時是把異常放在outcome里面的,這里就用到了。

(1)如果正常執(zhí)行結(jié)束,則返回任務(wù)的返回值;

(2)如果異常結(jié)束,則包裝成ExecutionException異常拋出;

通過這種方式,線程中出現(xiàn)的異常也可以返回給調(diào)用者線程了,不會像執(zhí)行普通任務(wù)那樣調(diào)用者是不知道任務(wù)執(zhí)行到底有沒有成功的。

其它

FutureTask除了可以獲取任務(wù)的返回值以外,還能夠取消任務(wù)的執(zhí)行。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

這里取消任務(wù)是通過中斷執(zhí)行線程來處理的,有興趣的同學(xué)可以自己分析一下。

回答開篇

如果這里把future.get()放到for循環(huán)里面,時間大概是多少?

答:大概會是5秒多一點,因為每提交一個任務(wù),都要阻塞調(diào)用者線程直到任務(wù)執(zhí)行完畢,每個任務(wù)執(zhí)行都是1秒多,所以總時間就是5秒多點。

總結(jié)

(1)未來任務(wù)是通過把普通任務(wù)包裝成FutureTask來實現(xiàn)的。

(2)通過FutureTask不僅能夠獲取任務(wù)執(zhí)行的結(jié)果,還有感知到任務(wù)執(zhí)行的異常,甚至還可以取消任務(wù);

(3)AbstractExecutorService中定義了很多模板方法,這是一種很重要的設(shè)計模式;

(4)FutureTask其實就是典型的異常調(diào)用的實現(xiàn)方式,后面我們學(xué)習(xí)到Netty、Dubbo的時候還會見到這種設(shè)計思想的。

彩蛋

RPC框架中異步調(diào)用是怎么實現(xiàn)的?

答:RPC框架常用的調(diào)用方式有同步調(diào)用、異步調(diào)用,其實它們本質(zhì)上都是異步調(diào)用,它們就是用FutureTask的方式來實現(xiàn)的。

一般地,通過一個線程(我們叫作遠(yuǎn)程線程)去調(diào)用遠(yuǎn)程接口,如果是同步調(diào)用,則直接讓調(diào)用者線程阻塞著等待遠(yuǎn)程線程調(diào)用的結(jié)果,待結(jié)果返回了再返回;如果是異步調(diào)用,則先返回一個未來可以獲取到遠(yuǎn)程結(jié)果的東西FutureXxx,當(dāng)然,如果這個FutureXxx在遠(yuǎn)程結(jié)果返回之前調(diào)用了get()方法一樣會阻塞著調(diào)用者線程。

有興趣的同學(xué)可以先去預(yù)習(xí)一下dubbo的異步調(diào)用(它是把Future扔到RpcContext中的)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI