溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

深度解讀 java 線程池設(shè)計思想及源碼實現(xiàn)

發(fā)布時間:2020-06-21 05:14:02 來源:網(wǎng)絡(luò) 閱讀:465 作者:Java月亮呀 欄目:編程語言

前言

我相信大家都看過很多的關(guān)于線程池的文章,基本上也是面試必問的,好像我寫這篇文章其實是沒有什么意義的,不過,我相信你也和我一樣,看了很多文章還是一知半解,甚至可能看了很多瞎說的文章。希望大家看過這篇文章以后,就可以完全掌握 Java 線程池了。

本文一大重點是源碼解析,不過線程池設(shè)計思想以及作者實現(xiàn)過程中的一些巧妙用法是我想傳達(dá)給讀者的。本文還是會一行行關(guān)鍵代碼進(jìn)行分析,目的是為了讓那些自己看源碼不是很理解的同學(xué)可以得到參考。

線程池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術(shù)水平。

本文略長,建議在 pc 上閱讀,邊看文章邊翻源碼(Java7 和 Java8 都一樣),建議想好好看的讀者抽出至少 15 至 30 分鐘的整塊時間來閱讀。當(dāng)然,如果讀者僅為面試準(zhǔn)備,可以直接滑到最后的總結(jié)部分。

總覽

開篇來一些廢話。下圖是 java 線程池幾個相關(guān)類的繼承結(jié)構(gòu):
深度解讀 java 線程池設(shè)計思想及源碼實現(xiàn)

先簡單說說這個繼承結(jié)構(gòu),Executor 位于最頂層,也是最簡單的,就一個 execute(Runnable runnable) 接口方法定義。

ExecutorService 也是接口,在 Executor 接口的基礎(chǔ)上添加了很多的接口方法,所以一般來說我們會使用這個接口。

然后再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這里實現(xiàn)了非常有用的一些方法供子類直接使用,之后我們再細(xì)說。

然后才到我們的重點部分 ThreadPoolExecutor 類,這個類提供了關(guān)于線程池所需的非常豐富的功能。

另外,我們還涉及到下圖中的這些類:
深度解讀 java 線程池設(shè)計思想及源碼實現(xiàn)

同在并發(fā)包中的 Executors 類,類名中帶字母 s,我們猜到這個是工具類,里面的方法都是靜態(tài)方法,如以下我們最常用的用于生成 ThreadPoolExecutor 的實例的一些方法:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

另外,由于線程池支持獲取線程執(zhí)行的結(jié)果,所以,引入了 Future 接口,RunnableFuture 繼承自此接口,然后我們最需要關(guān)心的就是它的實現(xiàn)類 FutureTask。到這里,記住這個概念,在線程池的使用過程中,我們是往線程池提交任務(wù)(task),使用過線程池的都知道,我們提交的每個任務(wù)是實現(xiàn)了 Runnable 接口的,其實就是先將 Runnable 的任務(wù)包裝成 FutureTask,然后再提交到線程池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(wù)(Task),然后具有 Future 接口的語義,即可以在將來(Future)得到執(zhí)行的結(jié)果。

當(dāng)然,線程池中的 BlockingQueue 也是非常重要的概念,如果線程數(shù)達(dá)到 corePoolSize,我們的每個任務(wù)會提交到等待隊列中,等待線程池中的線程來取任務(wù)并執(zhí)行。這里的 BlockingQueue 通常我們使用其實現(xiàn)類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個實現(xiàn)類都有不同的特征,使用場景之后會慢慢分析。

把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務(wù)實現(xiàn)類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor,用于實現(xiàn)定時執(zhí)行。不過本文不會介紹它的實現(xiàn),我相信讀者看完本文后可以比較容易地看懂它的源碼。

以上就是本文要介紹的知識,廢話不多說,開始進(jìn)入正文。

Executor 接口

/* 
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {
    void execute(Runnable command);
}

我們可以看到 Executor 接口非常簡單,就一個 void execute(Runnable command) 方法,代表提交一個任務(wù)。為了讓大家理解 java 線程池的整個設(shè)計方案,我會按照 Doug Lea 的設(shè)計思路來多說一些相關(guān)的東西。

我們經(jīng)常這樣啟動一個線程:

new Thread(new Runnable(){
  // do something
}).start();

用了線程池 Executor 后就可以像下面這么使用:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

如果我們希望線程池同步執(zhí)行每一個任務(wù),我們可以這么實現(xiàn)這個接口:

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();// 這里不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的線程。
    }
}

我們希望每個任務(wù)提交進(jìn)來后,直接啟動一個新的線程來執(zhí)行這個任務(wù),我們可以這么實現(xiàn):

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();  // 每個任務(wù)都用一個新的線程來執(zhí)行
    }
}

我們再來看下怎么組合兩個 Executor 來使用,下面這個實現(xiàn)是將所有的任務(wù)都加到一個 queue 中,然后從 queue 中取任務(wù),交給真正的執(zhí)行器執(zhí)行,這里采用 synchronized 進(jìn)行并發(fā)控制:

class SerialExecutor implements Executor {
    // 任務(wù)隊列
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    // 這個才是真正的執(zhí)行器
    final Executor executor;
    // 當(dāng)前正在執(zhí)行的任務(wù)
    Runnable active;

    // 初始化的時候,指定執(zhí)行器
    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    // 添加任務(wù)到線程池: 將任務(wù)添加到任務(wù)隊列,scheduleNext 觸發(fā)執(zhí)行器去任務(wù)隊列取任務(wù)
    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            // 具體的執(zhí)行轉(zhuǎn)給真正的執(zhí)行器 executor
            executor.execute(active);
        }
    }
}

當(dāng)然了,Executor 這個接口只有提交任務(wù)的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執(zhí)行結(jié)果、我們想知道當(dāng)前線程池有多少個線程活著、已經(jīng)完成了多少任務(wù)等等,這些都是這個接口的不足的地方。接下來我們要介紹的是繼承自 Executor 接口的 ExecutorService 接口,這個接口提供了比較豐富的功能,也是我們最常使用到的接口。

ExecutorService

一般我們定義一個線程池的時候,往往都是使用這個接口:


ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);

因為這個接口中定義的一系列方法大部分情況下已經(jīng)可以滿足我們的需要了。

那么我們簡單初略地來看一下這個接口中都有哪些方法:

public interface ExecutorService extends Executor {

    // 關(guān)閉線程池,已提交的任務(wù)繼續(xù)執(zhí)行,不接受繼續(xù)提交新任務(wù)
    void shutdown();

    // 關(guān)閉線程池,嘗試停止正在執(zhí)行的所有任務(wù),不接受繼續(xù)提交新任務(wù)
    // 它和前面的方法相比,加了一個單詞“now”,區(qū)別在于它會去停止當(dāng)前正在進(jìn)行的任務(wù)
    List<Runnable> shutdownNow();

    // 線程池是否已關(guān)閉
    boolean isShutdown();

    // 如果調(diào)用了 shutdown() 或 shutdownNow() 方法后,所有任務(wù)結(jié)束了,那么返回true
    // 這個方法必須在調(diào)用shutdown或shutdownNow方法之后調(diào)用才會返回true
    boolean isTerminated();

    // 等待所有任務(wù)完成,并設(shè)置超時時間
    // 我們這么理解,實際應(yīng)用中是,先調(diào)用 shutdown 或 shutdownNow,
    // 然后再調(diào)這個方法等待所有的線程真正地完成,返回值意味著有沒有超時
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;

    // 提交一個 Callable 任務(wù)
    <T> Future<T> submit(Callable<T> task);

    // 提交一個 Runnable 任務(wù),第二個參數(shù)將會放到 Future 中,作為返回值,
    // 因為 Runnable 的 run 方法本身并不返回任何東西
    <T> Future<T> submit(Runnable task, T result);

    // 提交一個 Runnable 任務(wù)
    Future<?> submit(Runnable task);

    // 執(zhí)行所有任務(wù),返回 Future 類型的一個 list
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;

    // 也是執(zhí)行所有任務(wù),但是這里設(shè)置了超時時間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;

    // 只有其中的一個任務(wù)結(jié)束了,就可以返回,返回執(zhí)行完的那個任務(wù)的結(jié)果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;

    // 同上一個方法,只有其中的一個任務(wù)結(jié)束了,就可以返回,返回執(zhí)行完的那個任務(wù)的結(jié)果,
    // 不過這個帶超時,超過指定的時間,拋出 TimeoutException 異常
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

這些方法都很好理解,一個簡單的線程池主要就是這些功能,能提交任務(wù),能獲取結(jié)果,能關(guān)閉線程池,這也是為什么我們經(jīng)常用這個接口的原因。

FutureTask

在繼續(xù)往下層介紹 ExecutorService 的實現(xiàn)類之前,我們先來說說相關(guān)的類 FutureTask。

Future   -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture

FutureTask 通過 RunnableFuture 間接實現(xiàn)了 Runnable 接口,
所以每個 Runnable 通常都先包裝成 FutureTask,
然后調(diào)用 executor.execute(Runnable command) 將其提交給線程池

我們知道,Runnable 的 void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會在 submit 中指定第二個參數(shù)作為返回值:

&lt;T&gt; Future&lt;T&gt; submit(Runnable task, T result);
其實到時候會通過這兩個參數(shù),將其包裝成 Callable。

Callable 也是因為線程池的需要,所以才有了這個接口。它和 Runnable 的區(qū)別在于 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時,如果運行出現(xiàn)異常,call() 方法會拋出異常。

public interface Callable<V> {

    V call() throws Exception;
}

在這里,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了,這里我們需要知道怎么用就行了。

下面,我們來看看 ExecutorService 的抽象實現(xiàn) AbstractExecutorService 。

AbstractExecutorService

AbstractExecutorService 抽象類派生自 ExecutorService 接口,然后在其基礎(chǔ)上實現(xiàn)了幾個實用的方法,這些方法提供給子類進(jìn)行調(diào)用。

這個抽象類實現(xiàn)了 invokeAny 方法和 invokeAll 方法,這里的兩個 newTaskFor 方法也比較有用,用于將任務(wù)包裝成 FutureTask。定義于最上層接口 Executor中的 void execute(Runnable command) 由于不需要獲取結(jié)果,不會進(jìn)行 FutureTask 的包裝。

需要獲取結(jié)果(FutureTask),用 submit 方法,不需要獲取結(jié)果,可以用 execute 方法。

下面,我將一行一行源碼地來分析這個類,跟著源碼來看看其實現(xiàn)吧:

Tips: invokeAny 和 invokeAll 方法占了這整個類的絕大多數(shù)篇幅,讀者可以選擇適當(dāng)跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟后的作用,不用擔(dān)心會漏掉什么導(dǎo)致看不懂后面的代碼。

public abstract class AbstractExecutorService implements ExecutorService {

    // RunnableFuture 是用于獲取執(zhí)行結(jié)果的,我們常用它的子類 FutureTask
    // 下面兩個 newTaskFor 方法用于將我們的任務(wù)包裝成 FutureTask 提交到線程池中執(zhí)行
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    // 提交任務(wù)
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 1. 將任務(wù)包裝成 FutureTask
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 2. 交給執(zhí)行器執(zhí)行,execute 方法由具體的子類來實現(xiàn)
        // 前面也說了,F(xiàn)utureTask 間接實現(xiàn)了Runnable 接口。
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // 1. 將任務(wù)包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // 2. 交給執(zhí)行器執(zhí)行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 1. 將任務(wù)包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        // 2. 交給執(zhí)行器執(zhí)行
        execute(ftask);
        return ftask;
    }

    // 此方法目的:將 tasks 集合中的任務(wù)提交到線程池執(zhí)行,任意一個線程執(zhí)行完后就可以結(jié)束了
    // 第二個參數(shù) timed 代表是否設(shè)置超時機(jī)制,超時時間為第三個參數(shù),
    // 如果 timed 為 true,同時超時了還沒有一個線程返回結(jié)果,那么拋出 TimeoutException 異常
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        // 任務(wù)數(shù)
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);

        // ExecutorCompletionService 不是一個真正的執(zhí)行器,參數(shù) this 才是真正的執(zhí)行器
        // 它對執(zhí)行器進(jìn)行了包裝,每個任務(wù)結(jié)束后,將結(jié)果保存到內(nèi)部的一個 completionQueue 隊列中
        // 這也是為什么這個類的名字里面有個 Completion 的原因吧。
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        try {
            // 用于保存異常信息,此方法如果沒有得到任何有效的結(jié)果,那么我們可以拋出最后得到的一個異常
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 首先先提交一個任務(wù),后面的任務(wù)到下面的 for 循環(huán)一個個提交
            futures.add(ecs.submit(it.next()));
            // 提交了一個任務(wù),所以任務(wù)數(shù)量減 1
            --ntasks;
            // 正在執(zhí)行的任務(wù)數(shù)(提交的時候 +1,任務(wù)結(jié)束的時候 -1)
            int active = 1;

            for (;;) {
                // ecs 上面說了,其內(nèi)部有一個 completionQueue 用于保存執(zhí)行完成的結(jié)果
                // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊列為空
                Future<T> f = ecs.poll();
                // 為 null,說明剛剛提交的第一個線程還沒有執(zhí)行完成
                // 在前面先提交一個任務(wù),加上這里做一次檢查,也是為了提高性能
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 這里是 else if,不是 if。這里說明,沒有任務(wù)了,同時 active 為 0 說明
                    // 任務(wù)都執(zhí)行完成了。其實我也沒理解為什么這里做一次 break?
                    // 因為我認(rèn)為 active 為 0 的情況,必然從下面的 f.get() 返回了

                    // 2018-02-23 感謝讀者 newmicro 的 comment,
                    //  這里的 active == 0,說明所有的任務(wù)都執(zhí)行失敗,那么這里是 for 循環(huán)出口
                    else if (active == 0)
                        break;
                    // 這里也是 else if。這里說的是,沒有任務(wù)了,但是設(shè)置了超時時間,這里檢測是否超時
                    else if (timed) {
                        // 帶等待的 poll 方法
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        // 如果已經(jīng)超時,拋出 TimeoutException 異常,這整個方法就結(jié)束了
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    // 這里是 else。說明,沒有任務(wù)需要提交,但是池中的任務(wù)沒有完成,還沒有超時(如果設(shè)置了超時)
                    // take() 方法會阻塞,直到有元素返回,說明有任務(wù)結(jié)束了
                    else
                        f = ecs.take();
                }
                /*
                 * 我感覺上面這一段并不是很好理解,這里簡單說下。
                 * 1. 首先,這在一個 for 循環(huán)中,我們設(shè)想每一個任務(wù)都沒那么快結(jié)束,
                 *     那么,每一次都會進(jìn)到第一個分支,進(jìn)行提交任務(wù),直到將所有的任務(wù)都提交了
                 * 2. 任務(wù)都提交完成后,如果設(shè)置了超時,那么 for 循環(huán)其實進(jìn)入了“一直檢測是否超時”
                       這件事情上
                 * 3. 如果沒有設(shè)置超時機(jī)制,那么不必要檢測超時,那就會阻塞在 ecs.take() 方法上,
                       等待獲取第一個執(zhí)行結(jié)果
                 * 4. 如果所有的任務(wù)都執(zhí)行失敗,也就是說 future 都返回了,
                       但是 f.get() 拋出異常,那么從 active == 0 分支出去(感謝 newmicro 提出)
                         // 當(dāng)然,這個需要看下面的 if 分支。
                 */

                // 有任務(wù)結(jié)束了
                if (f != null) {
                    --active;
                    try {
                        // 返回執(zhí)行結(jié)果,如果有異常,都包裝成 ExecutionException
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }// 注意看 for 循環(huán)的范圍,一直到這里

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // 方法退出之前,取消其他的任務(wù)
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    // 執(zhí)行所有的任務(wù),返回任務(wù)結(jié)果。
    // 先不要看這個方法,我們先想想,其實我們自己提交任務(wù)到線程池,也是想要線程池執(zhí)行所有的任務(wù)
    // 只不過,我們是每次 submit 一個任務(wù),這里以一個集合作為參數(shù)提交
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 這個很簡單
            for (Callable<T> t : tasks) {
                // 包裝成 FutureTask
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                // 提交任務(wù)
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        // 這是一個阻塞方法,直到獲取到值,或拋出了異常
                        // 這里有個小細(xì)節(jié),其實 get 方法簽名上是會拋出 InterruptedException 的
                        // 可是這里沒有進(jìn)行處理,而是拋給外層去了。此異常發(fā)生于還沒執(zhí)行完的任務(wù)被取消了
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            // 這個方法返回,不像其他的場景,返回 List<Future>,其實執(zhí)行結(jié)果還沒出來
            // 這個方法返回是真正的返回,任務(wù)都結(jié)束了
            return futures;
        } finally {
            // 為什么要這個?就是上面說的有異常的情況
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

    // 帶超時的 invokeAll,我們找不同吧
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            Iterator<Future<T>> it = futures.iterator();
            // 每提交一個任務(wù),檢測一次是否超時
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                // 超時
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        // 調(diào)用帶超時的 get 方法,這里的參數(shù) nanos 是剩余的時間,
                        // 因為上面其實已經(jīng)用掉了一些時間了
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

}

到這里,我們發(fā)現(xiàn),這個抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟線程來執(zhí)行任務(wù),它們都只是在方法內(nèi)部調(diào)用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法還沒出現(xiàn),需要等具體執(zhí)行器來實現(xiàn)這個最重要的部分,這里我們要說的就是 ThreadPoolExecutor 類了。

鑒于本文的篇幅,我覺得看到這里的讀者應(yīng)該已經(jīng)不多了,快餐文化使然??!我寫的每篇文章都力求讓讀者可以通過我的一篇文章而記住所有的相關(guān)知識點,所以篇幅不免長了些。其實,工作了很多年的話,會有一個感覺,比如說線程池,即使看了 20 篇各種總結(jié),也不如一篇長文實實在在講解清楚每一個知識點,有點少即是多,多即是少的意味了。

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的線程池實現(xiàn),這個類實現(xiàn)了一個線程池需要的各個方法,它實現(xiàn)了任務(wù)提交、線程管理、監(jiān)控等等方法。

我們可以基于它來進(jìn)行業(yè)務(wù)上的擴(kuò)展,以實現(xiàn)我們需要的其他功能,比如實現(xiàn)定時任務(wù)的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。當(dāng)然,這不是本文關(guān)注的重點,下面,還是趕緊進(jìn)行源碼分析吧。

首先,我們來看看線程池實現(xiàn)中的幾個概念和處理流程。

我們先回顧下提交任務(wù)的幾個方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

一個最基本的概念是,submit 方法中,參數(shù)是 Runnable 類型(也有Callable 類型),這個參數(shù)不是用于 new Thread(runnable).start() 中的,此處的這個參數(shù)不是用于啟動線程的,這里指的是任務(wù),任務(wù)要做的事情是 run() 方法里面定義的或 Callable 中的 call() 方法里面定義的。

初學(xué)者往往會搞混這個,因為 Runnable 總是在各個地方出現(xiàn),經(jīng)常把一個 Runnable 包到另一個 Runnable 中。請把它想象成有個 Task 接口,這個接口里面有一個 run() 方法(我想作者只是不想因為這個再定義一個完全可以用 Runnable 來代替的接口,Callable 的出現(xiàn),完全是因為 Runnable 不能滿足需要)。

我們回過神來繼續(xù)往下看,我畫了一個簡單的示意圖來描述線程池中的一些主要的構(gòu)件:
深度解讀 java 線程池設(shè)計思想及源碼實現(xiàn)

當(dāng)然,上圖沒有考慮隊列是否有界,提交任務(wù)時隊列滿了怎么辦?什么情況下會創(chuàng)建新的線程?提交任務(wù)時線程池滿了怎么辦?空閑線程怎么關(guān)掉?這些問題下面我們會一一解決。

我們經(jīng)常會使用 Executors 這個工具類來快速構(gòu)造一個線程池,對于初學(xué)者而言,這種工具類是很有用的,開發(fā)者不需要關(guān)注太多的細(xì)節(jié),只要知道自己需要一個線程池,僅僅提供必需的參數(shù)就可以了,其他參數(shù)都采用作者提供的默認(rèn)值。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

這里先不說有什么區(qū)別,它們最終都會導(dǎo)向這個構(gòu)造方法:


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 這幾個參數(shù)都是必須要有的
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();

        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

基本上,上面的構(gòu)造方法中列出了我們最需要關(guān)心的幾個屬性了,下面逐個介紹下構(gòu)造方法中出現(xiàn)的這幾個屬性:

  • corePoolSize

核心線程數(shù),不要摳字眼,反正先記著有這么個屬性就可以了。

  • maximumPoolSize

最大線程數(shù),線程池允許創(chuàng)建的最大線程數(shù)。

  • workQueue

任務(wù)隊列,BlockingQueue 接口的某個實現(xiàn)(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。

  • keepAliveTime

空閑線程的?;顣r間,如果某線程的空閑時間超過這個值都沒有任務(wù)給它做,那么可以被關(guān)閉了。注意這個值并不會對所有線程起作用,如果線程池中的線程數(shù)少于等于核心線程數(shù) corePoolSize,那么這些線程不會因為空閑太長時間而被關(guān)閉,當(dāng)然,也可以通過調(diào)用 allowCoreThreadTimeOut(true)使核心線程數(shù)內(nèi)的線程也可以被回收。

  • threadFactory

用于生成線程,一般我們可以用默認(rèn)的就可以了。通常,我們可以通過它將我們的線程的名字設(shè)置得比較可讀一些,如 Message-Thread-1, Message-Thread-2 類似這樣。

  • handler:

當(dāng)線程池已經(jīng)滿了,但是又有新的任務(wù)提交的時候,該采取什么策略由這個來指定。有幾種方式可供選擇,像拋出異常、直接拒絕然后返回等,也可以自己實現(xiàn)相應(yīng)的接口實現(xiàn)自己的邏輯,這個之后再說。

除了上面幾個屬性外,我們再看看其他重要的屬性。

Doug Lea 采用一個 32 位的整數(shù)來存放線程池的狀態(tài)和當(dāng)前池中的線程數(shù),其中高 3 位用于存放線程池狀態(tài),低 29 位表示線程數(shù)(即使只有 29 位,也已經(jīng)不小了,大概 5 億多,現(xiàn)在還沒有哪個機(jī)器能起這么多線程的吧)。我們知道,java 語言在整數(shù)編碼上是統(tǒng)一的,都是采用補(bǔ)碼的形式,下面是簡單的移位操作和布爾操作,都是挺簡單的。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 這里 COUNT_BITS 設(shè)置為 29(32-3),意味著前三位用于存放線程狀態(tài),后29位用于存放線程數(shù)
// 很多初學(xué)者很喜歡在自己的代碼中寫很多 29 這種數(shù)字,或者某個特殊的字符串,然后分布在各個地方,這是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111
// 這里得到的是 29 個 1,也就是說線程池的最大線程數(shù)是 2^29-1=536870911
// 以我們現(xiàn)在計算機(jī)的實際情況,這個數(shù)量還是夠用的
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 我們說了,線程池的狀態(tài)存放在高 3 位中
// 運算結(jié)果為 111跟29個0:111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 將整數(shù) c 的低 29 位修改為 0,就得到了線程池的狀態(tài)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 將整數(shù) c 的高 3 為修改為 0,就得到了線程池中的線程數(shù)
private static int workerCountOf(int c)  { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

上面就是對一個整數(shù)的簡單的位操作,幾個操作方法將會在后面的源碼中一直出現(xiàn),所以讀者最好把方法名字和其代表的功能記住,看源碼的時候也就不需要來來回回翻了。

在這里,介紹下線程池中的各個狀態(tài)和狀態(tài)變化的轉(zhuǎn)換過程:

  • RUNNING:這個沒什么好說的,這是最正常的狀態(tài):接受新的任務(wù),處理等待隊列中的任務(wù)
  • SHUTDOWN:不接受新的任務(wù)提交,但是會繼續(xù)處理等待隊列中的任務(wù)
  • STOP:不接受新的任務(wù)提交,不再處理等待隊列中的任務(wù),中斷正在執(zhí)行任務(wù)的線程
  • TIDYING:所有的任務(wù)都銷毀了,workCount 為 0。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時,會執(zhí)行鉤子方法 terminated()
  • TERMINATED:terminated() 方法結(jié)束后,線程池的狀態(tài)就會變成這個

    RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,所以等于 0 的時候不能提交任務(wù),大于 0 的話,連正在執(zhí)行的任務(wù)也需要中斷。

看了這幾種狀態(tài)的介紹,讀者大體也可以猜到十之八九的狀態(tài)轉(zhuǎn)換了,各個狀態(tài)的轉(zhuǎn)換過程有以下幾種:

  • RUNNING -> SHUTDOWN:當(dāng)調(diào)用了 shutdown() 后,會發(fā)生這個狀態(tài)轉(zhuǎn)換,這也是最重要的
  • (RUNNING or SHUTDOWN) -> STOP:當(dāng)調(diào)用 shutdownNow() 后,會發(fā)生這個狀態(tài)轉(zhuǎn)換,這下要清楚 shutDown() 和 shutDownNow() 的區(qū)別了
  • SHUTDOWN -> TIDYING:當(dāng)任務(wù)隊列和線程池都清空后,會由 SHUTDOWN 轉(zhuǎn)換為 TIDYING
  • STOP -> TIDYING:當(dāng)任務(wù)隊列清空后,發(fā)生這個轉(zhuǎn)換
  • TIDYING -> TERMINATED:這個前面說了,當(dāng) terminated() 方法結(jié)束后
    上面的幾個記住核心的就可以了,尤其第一個和第二個。

另外,我們還要看看一個內(nèi)部類 Worker,因為 Doug Lea 把線程池中的線程包裝成了一個個 Worker,翻譯成工人,就是線程池中做任務(wù)的線程。所以到這里,我們知道任務(wù)是 Runnable(內(nèi)部叫 task 或 command),線程是 Worker。

Worker 這里又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在并發(fā)中真的是到處出現(xiàn),而且非常容易使用,寫少量的代碼就能實現(xiàn)自己需要的同步方式。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    // 這個是真正的線程,任務(wù)靠你啦
    final Thread thread;

    // 前面說了,這里的 Runnable 是任務(wù)。為什么叫 firstTask?因為在創(chuàng)建線程的時候,如果同時指定了
    // 這個線程起來以后需要執(zhí)行的第一個任務(wù),那么第一個任務(wù)就是存放在這里的(線程可不止執(zhí)行這一個任務(wù))
    // 當(dāng)然了,也可以為 null,這樣線程起來了,自己到任務(wù)隊列(BlockingQueue)中取任務(wù)(getTask 方法)就行了
    Runnable firstTask;

    // 用于存放此線程完全的任務(wù)數(shù),注意了,這里用了 volatile,保證可見性
    volatile long completedTasks;

    // Worker 只有這一個構(gòu)造方法,傳入 firstTask,也可以傳 null
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程
        this.thread = getThreadFactory().newThread(this);
    }

    // 這里調(diào)用了外部類的 runWorker 方法
    public void run() {
        runWorker(this);
    }

    ...// 其他幾個方法沒什么好看的,就是用 AQS 操作,來獲取這個線程的執(zhí)行權(quán),用了獨占鎖
}

前面雖然啰嗦,但是簡單。有了上面的這些基礎(chǔ)后,我們終于可以看看 ThreadPoolExecutor 的 execute 方法了,前面源碼分析的時候也說了,各種方法都最終依賴于 execute 方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // 前面說的那個表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
    int c = ctl.get();

    // 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù),
    // 創(chuàng)建一個新的線程,并把當(dāng)前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任務(wù)成功,那么就結(jié)束了。提交任務(wù)嘛,線程池已經(jīng)接受了這個任務(wù),這個方法也就可以返回了
        // 至于執(zhí)行的結(jié)果,到時候會包裝到 FutureTask 中。
        // 返回 false 代表線程池不允許提交任務(wù)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到這里說明,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了

    // 如果線程池處于 RUNNING 狀態(tài),把這個任務(wù)添加到任務(wù)隊列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 這里面說的是,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開啟新的線程
         * 因為線程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程
         * 如果線程數(shù)已經(jīng)大于等于 corePoolSize,那么將任務(wù)添加到隊列中,然后進(jìn)到這里
         */
        int recheck = ctl.get();
        // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊的這個任務(wù),并且執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
        // 到這里,我們知道了,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊列中了,但是線程都關(guān)閉了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 隊列滿了,那么進(jìn)入到這個分支
    // 以 maximumPoolSize 為界創(chuàng)建新的 worker,
    // 如果失敗,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

對創(chuàng)建線程的錯誤理解:如果線程數(shù)少于 corePoolSize,創(chuàng)建一個線程,如果線程數(shù)在 [corePoolSize, maximumPoolSize] 之間那么可以創(chuàng)建線程或復(fù)用空閑線程,keepAliveTime 對這個區(qū)間的線程有效。

從上面的幾個分支,我們就可以看出,上面的這段話是錯誤的。

上面這些一時半會也不可能全部消化搞定,我們先繼續(xù)往下吧,到時候再回頭看幾遍。

這個方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎么創(chuàng)建新的線程的:

// 第一個參數(shù)是準(zhǔn)備提交給這個線程執(zhí)行的任務(wù),之前說了,可以為 null
// 第二個參數(shù)為 true 代表使用核心線程數(shù) corePoolSize 作為創(chuàng)建線程的界線,也就說創(chuàng)建這個線程的時候,
//         如果線程池中的線程總數(shù)已經(jīng)達(dá)到 corePoolSize,那么不能響應(yīng)這次創(chuàng)建線程的請求
//         如果是 false,代表使用最大線程數(shù) maximumPoolSize 作為界線
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 這個非常不好理解
        // 如果線程池已關(guān)閉,并滿足以下條件之一,那么不創(chuàng)建新的 worker:
        // 1. 線程池狀態(tài)大于 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED
        // 2. firstTask != null
        // 3. workQueue.isEmpty()
        // 簡單分析下:
        // 還是狀態(tài)控制的問題,當(dāng)線程池處于 SHUTDOWN 的時候,不允許提交任務(wù),但是已有的任務(wù)繼續(xù)執(zhí)行
        // 當(dāng)狀態(tài)大于 SHUTDOWN 時,不允許提交任務(wù),且中斷正在執(zhí)行的任務(wù)
        // 多說一句:如果線程池處于 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那么是允許創(chuàng)建 worker 的
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 如果成功,那么就是所有創(chuàng)建線程前的條件校驗都滿足了,準(zhǔn)備創(chuàng)建線程執(zhí)行任務(wù)了
            // 這里失敗的話,說明有其他線程也在嘗試往線程池中創(chuàng)建線程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 由于有并發(fā),重新再讀取一下 ctl
            c = ctl.get();
            // 正常如果是 CAS 失敗的話,進(jìn)到下一個里層的for循環(huán)就可以了
            // 可是如果是因為其他線程的操作,導(dǎo)致線程池的狀態(tài)發(fā)生了變更,如有其他線程關(guān)閉了這個線程池
            // 那么需要回到外層的for循環(huán)
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /* 
     * 到這里,我們認(rèn)為在當(dāng)前這個時刻,可以開始創(chuàng)建線程來執(zhí)行任務(wù)了,
     * 因為該校驗的都校驗了,至于以后會發(fā)生什么,那是以后的事,至少當(dāng)前是滿足條件的
     */

    // worker 是否已經(jīng)啟動
    boolean workerStarted = false;
    // 是否已將這個 worker 添加到 workers 這個 HashSet 中
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 把 firstTask 傳給 worker 的構(gòu)造方法
        w = new Worker(firstTask);
        // 取 worker 中的線程對象,之前說了,Worker的構(gòu)造方法會調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程
        final Thread t = w.thread;
        if (t != null) {
            // 這個是整個類的全局鎖,持有這個鎖才能讓下面的操作“順理成章”,
            // 因為關(guān)閉一個線程池需要這個鎖,至少我持有鎖的期間,線程池不會被關(guān)閉
            mainLock.lock();
            try {

                int c = ctl.get();
                int rs = runStateOf(c);

                // 小于 SHUTTDOWN 那就是 RUNNING,這個自不必說,是最正常的情況
                // 如果等于 SHUTDOWN,前面說了,不接受新的任務(wù),但是會繼續(xù)執(zhí)行等待隊列中的任務(wù)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker 里面的 thread 可不能是已經(jīng)啟動的
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 加到 workers 這個 HashSet 中
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于記錄 workers 中的個數(shù)的最大值
                    // 因為 workers 是不斷增加減少的,通過這個值可以知道線程池的大小曾經(jīng)達(dá)到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功的話,啟動這個線程
            if (workerAdded) {
                // 啟動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果線程沒有啟動,需要做一些清理工作,如前面 workCount 加了 1,將其減掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回線程是否啟動成功
    return workerStarted;
}

簡單看下 addWorkFailed 的處理:

// workers 中刪除掉相應(yīng)的 worker
// workCount 減 1
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        // rechecks for termination, in case the existence of this worker was holding up termination
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

回過頭來,繼續(xù)往下走。我們知道,worker 中的線程 start 后,其 run 方法會調(diào)用 runWorker 方法:

// Worker 類的 run() 方法
public void run() {
    runWorker(this);
}

繼續(xù)往下看 runWorker 方法:

// 此方法由 worker 線程啟動后調(diào)用,這里用一個 while 循環(huán)來不斷地從等待隊列中獲取任務(wù)并執(zhí)行
// 前面說了,worker 在初始化的時候,可以指定 firstTask,那么第一個任務(wù)也就可以不需要從隊列中獲取
final void runWorker(Worker w) {
    // 
    Thread wt = Thread.currentThread();
    // 該線程的第一個任務(wù)(如果有的話)
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循環(huán)調(diào)用 getTask 獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();          
            // 如果線程池狀態(tài)大于等于 STOP,那么意味著該線程也要中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 這是一個鉤子方法,留給需要的子類實現(xiàn)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 到這里終于可以執(zhí)行任務(wù)了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // 這里不允許拋出 Throwable,所以轉(zhuǎn)換為 Error
                    thrown = x; throw new Error(x);
                } finally {
                    // 也是一個鉤子方法,將 task 和異常作為參數(shù),留給需要的子類實現(xiàn)
                    afterExecute(task, thrown);
                }
            } finally {
                // 置空 task,準(zhǔn)備 getTask 獲取下一個任務(wù)
                task = null;
                // 累加完成的任務(wù)數(shù)
                w.completedTasks++;
                // 釋放掉 worker 的獨占鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 如果到這里,需要執(zhí)行線程關(guān)閉:
        // 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結(jié)束了,執(zhí)行關(guān)閉
        // 2. 任務(wù)執(zhí)行過程中發(fā)生了異常
        // 第一種情況,已經(jīng)在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中會說
        // 第二種情況,workCount 沒有進(jìn)行處理,所以需要在 processWorkerExit 中處理
        // 限于篇幅,我不準(zhǔn)備分析這個方法了,感興趣的讀者請自行分析源碼
        processWorkerExit(w, completedAbruptly);
    }
}

我們看看 getTask() 是怎么獲取任務(wù)的,這個方法寫得真的很好,每一行都很簡單,組合起來卻所有的情況都想好了:

// 此方法有三種可能:
// 1. 阻塞直到獲取到任務(wù)返回。我們知道,默認(rèn) corePoolSize 之內(nèi)的線程是不會被回收的,
//      它們會一直等待任務(wù)
// 2. 超時退出。keepAliveTime 起作用的時候,也就是如果這么多時間內(nèi)都沒有任務(wù),那么應(yīng)該執(zhí)行關(guān)閉
// 3. 如果發(fā)生了以下條件,此方法必須返回 null:
//    - 池中有大于 maximumPoolSize 個 workers 存在(通過調(diào)用 setMaximumPoolSize 進(jìn)行設(shè)置)
//    - 線程池處于 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務(wù)
//    - 線程池處于 STOP,不僅不接受新的線程,連 workQueue 中的線程也不再執(zhí)行
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 兩種可能
        // 1. rs == SHUTDOWN && workQueue.isEmpty()
        // 2. rs >= STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // CAS 操作,減少工作線程數(shù)
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?
        for (;;) {
            int wc = workerCountOf(c);
            // 允許核心線程數(shù)內(nèi)的線程回收,或當(dāng)前線程數(shù)超過了核心線程數(shù),那么有可能發(fā)生超時關(guān)閉
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 這里 break,是為了不往下執(zhí)行后一個 if (compareAndDecrementWorkerCount(c))
            // 兩個 if 一起看:如果當(dāng)前線程數(shù) wc > maximumPoolSize,或者超時,都返回 null
            // 那這里的問題來了,wc > maximumPoolSize 的情況,為什么要返回 null?
            //    換句話說,返回 null 意味著關(guān)閉線程。
            // 那是因為有可能開發(fā)者調(diào)用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調(diào)小了
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            // compareAndDecrementWorkerCount(c) 失敗,線程池中的線程數(shù)發(fā)生了改變
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        // wc <= maximumPoolSize 同時沒有超時
        try {
            // 到 workQueue 中獲取任務(wù)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果此 worker 發(fā)生了中斷,采取的方案是重試
            // 解釋下為什么會發(fā)生中斷,這個讀者要去看 setMaximumPoolSize 方法,
            // 如果開發(fā)者將 maximumPoolSize 調(diào)小了,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量,
            // 那么意味著超出的部分線程要被關(guān)閉。重新進(jìn)入 for 循環(huán),自然會有部分線程會返回 null
            timedOut = false;
        }
    }
}

到這里,基本上也說完了整個流程,讀者這個時候應(yīng)該回到 execute(Runnable command) 方法,看看各個分支,我把代碼貼過來一下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // 前面說的那個表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
    int c = ctl.get();

    // 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù),
    // 創(chuàng)建一個新的線程,并把當(dāng)前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任務(wù)成功,那么就結(jié)束了。提交任務(wù)嘛,線程池已經(jīng)接受了這個任務(wù),這個方法也就可以返回了
        // 至于執(zhí)行的結(jié)果,到時候會包裝到 FutureTask 中。
        // 返回 false 代表線程池不允許提交任務(wù)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到這里說明,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了

    // 如果線程池處于 RUNNING 狀態(tài),把這個任務(wù)添加到任務(wù)隊列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 這里面說的是,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開啟新的線程
         * 因為線程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程
         * 如果線程數(shù)已經(jīng)大于等于 corePoolSize,那么將任務(wù)添加到隊列中,然后進(jìn)到這里
         */
        int recheck = ctl.get();
        // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊的這個任務(wù),并且執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
        // 到這里,我們知道了,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊列中了,但是線程都關(guān)閉了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 隊列滿了,那么進(jìn)入到這個分支
    // 以 maximumPoolSize 為界創(chuàng)建新的 worker,
    // 如果失敗,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

上面各個分支中,有兩種情況會調(diào)用 reject(command) 來處理任務(wù),因為按照正常的流程,線程池此時不能接受這個任務(wù),所以需要執(zhí)行我們的拒絕策略。接下來,我們說一說 ThreadPoolExecutor 中的拒絕策略。

final void reject(Runnable command) {
    // 執(zhí)行拒絕策略
    handler.rejectedExecution(command, this);
}

此處的 handler 我們需要在構(gòu)造線程池的時候就傳入這個參數(shù),它是 RejectedExecutionHandler 的實例。

RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個已經(jīng)定義好的實現(xiàn)類可供我們直接使用,當(dāng)然,我們也可以實現(xiàn)自己的策略,不過一般也沒有必要。


// 只要線程池沒有被關(guān)閉,那么由提交任務(wù)的線程自己來執(zhí)行這個任務(wù)。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

// 不管怎樣,直接拋出 RejectedExecutionException 異常
// 這個是默認(rèn)的策略,如果我們構(gòu)造線程池的時候不傳相應(yīng)的 handler 的話,那就會指定使用這個
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

//       不做任何處理,直接忽略掉這個任務(wù)
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

// 這個相對霸道一點,如果線程池沒有被關(guān)閉的話,
// 把隊列隊頭的任務(wù)(也就是等待了最長時間的)直接扔掉,然后提交這個任務(wù)到等待隊列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

到這里,ThreadPoolExecutor 的源碼算是分析結(jié)束了。單純從源碼的難易程度來說,ThreadPoolExecutor 的源碼還算是比較簡單的,只是需要我們靜下心來好好看看罷了。

Executors

這節(jié)其實也不是分析 Executors 這個類,因為它僅僅是工具類,它的所有方法都是 static 的。

  • 生成一個固定大小的線程池:
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
    }

    最大線程數(shù)設(shè)置為與核心線程數(shù)相等,此時 keepAliveTime 設(shè)置為 0(因為這里它是沒用的,即使不為 0,線程池默認(rèn)也不會回收 corePoolSize 內(nèi)的線程),任務(wù)隊列采用 LinkedBlockingQueue,×××隊列。

過程分析:剛開始,每提交一個任務(wù)都創(chuàng)建一個 worker,當(dāng) worker 的數(shù)量達(dá)到 nThreads 后,不再創(chuàng)建新的線程,而是把任務(wù)提交到 LinkedBlockingQueue 中,而且之后線程數(shù)始終為 nThreads。

  • 生成只有一個線程的固定線程池,這個更簡單,和上面的一樣,只要設(shè)置線程數(shù)為 1 就可以了:
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
    }
  • 生成一個需要的時候就創(chuàng)建新的線程,同時可以復(fù)用之前創(chuàng)建的線程(如果這個線程當(dāng)前沒有任務(wù))的線程池:
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
    }

    核心線程數(shù)為 0,最大線程數(shù)為 Integer.MAX_VALUE,keepAliveTime 為 60 秒,任務(wù)隊列采用 SynchronousQueue。

這種線程池對于任務(wù)可以比較快速地完成的情況有比較好的性能。如果線程空閑了 60 秒都沒有任務(wù),那么將關(guān)閉此線程并從線程池中移除。所以如果線程池空閑了很長時間也不會有問題,因為隨著所有的線程都會被關(guān)閉,整個線程池不會占用任何的系統(tǒng)資源。

過程分析:我把 execute 方法的主體黏貼過來,讓大家看得明白些。鑒于 corePoolSize 是 0,那么提交任務(wù)的時候,直接將任務(wù)提交到隊列中,由于采用了 SynchronousQueue,所以如果是第一個任務(wù)提交的時候,offer 方法肯定會返回 false,因為此時沒有任何 worker 對這個任務(wù)進(jìn)行接收,那么將進(jìn)入到最后一個分支來創(chuàng)建第一個 worker。之后再提交任務(wù)的話,取決于是否有空閑下來的線程對任務(wù)進(jìn)行接收,如果有,會進(jìn)入到第二個 if 語句塊中,否則就是和第一個任務(wù)一樣,進(jìn)到最后的 else if 分支創(chuàng)建新線程。

int c = ctl.get();
// corePoolSize 為 0,所以不會進(jìn)到這個 if 分支
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// offer 如果有空閑線程剛好可以接收此任務(wù),那么返回 true,否則返回 false
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

SynchronousQueue 是一個比較特殊的 BlockingQueue,其本身不儲存任何元素,它有一個虛擬隊列(或虛擬棧),不管讀操作還是寫操作,如果當(dāng)前隊列中存儲的是與當(dāng)前操作相同模式的線程,那么當(dāng)前操作也進(jìn)入隊列中等待;如果是相反模式,則配對成功,從當(dāng)前隊列中取隊頭節(jié)點。

總結(jié)

我一向不喜歡寫總結(jié),因為我把所有需要表達(dá)的都寫在正文中了,寫小篇幅的總結(jié)并不能真正將話說清楚,本文的總結(jié)部分為準(zhǔn)備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時間看完全文的讀者。

java 線程池有哪些關(guān)鍵屬性?

corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler
corePoolSize 到 maximumPoolSize 之間的線程會被回收,當(dāng)然 corePoolSize 的線程也可以通過設(shè)置而得到回收(allowCoreThreadTimeOut(true))。
workQueue 用于存放任務(wù),添加任務(wù)的時候,如果當(dāng)前線程數(shù)超過了 corePoolSize,那么往該隊列中插入任務(wù),線程池中的線程會負(fù)責(zé)到隊列中拉取任務(wù)。
keepAliveTime 用于設(shè)置空閑時間,如果線程數(shù)超出了 corePoolSize,并且有些線程的空閑時間超過了這個值,會執(zhí)行關(guān)閉這些線程的操作
rejectedExecutionHandler 用于處理當(dāng)線程池不能執(zhí)行此任務(wù)時的情況,默認(rèn)有拋出 RejectedExecutionException 異常、忽略任務(wù)、使用提交任務(wù)的線程來執(zhí)行此任務(wù)和將隊列中等待最久的任務(wù)刪除,然后提交此任務(wù)這四種策略,默認(rèn)為拋出異常。

說說線程池中的線程創(chuàng)建時機(jī)?

如果當(dāng)前線程數(shù)少于 corePoolSize,那么提交任務(wù)的時候創(chuàng)建一個新的線程,并由這個線程執(zhí)行這個任務(wù);
如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 corePoolSize,那么將提交的任務(wù)添加到隊列中,等待線程池中的線程去隊列中取任務(wù);
如果隊列已滿,那么創(chuàng)建新的線程來執(zhí)行任務(wù),需要保證池中的線程數(shù)不會超過 maximumPoolSize,如果此時線程數(shù)超過了 maximumPoolSize,那么執(zhí)行拒絕策略。

  • 注意:如果將隊列設(shè)置為×××隊列,那么線程數(shù)達(dá)到 corePoolSize 后,其實線程數(shù)就不會再增長了。

Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 構(gòu)造出來的線程池有什么差別?

細(xì)說太長,往上滑一點點,在 Executors 的小節(jié)進(jìn)行了詳盡的描述。

任務(wù)執(zhí)行過程中發(fā)生異常怎么處理?

如果某個任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會被關(guān)閉,而不是繼續(xù)接收其他任務(wù)。然后會啟動一個新的線程來代替它。

什么時候會執(zhí)行拒絕策略?

  • workers 的數(shù)量達(dá)到了 corePoolSize(任務(wù)此時需要進(jìn)入任務(wù)隊列),任務(wù)入隊成功,與此同時線程池被關(guān)閉了,而且關(guān)閉線程池并沒有將這個任務(wù)出隊,那么執(zhí)行拒絕策略。這里說的是非常邊界的問題,入隊和關(guān)閉線程池并發(fā)執(zhí)行,讀者仔細(xì)看看 execute 方法是怎么進(jìn)到第一個 reject(command) 里面的。
  • workers 的數(shù)量大于等于 corePoolSize,將任務(wù)加入到任務(wù)隊列,可是隊列滿了,任務(wù)入隊失敗,那么準(zhǔn)備開啟新的線程,可是線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,那么執(zhí)行拒絕策略。
    因為本文實在太長了,所以我沒有說執(zhí)行結(jié)果是怎么獲取的,也沒有說關(guān)閉線程池相關(guān)的部分,這個就留給讀者吧。

本文篇幅是有點長,如果讀者發(fā)現(xiàn)什么不對的地方,或者有需要補(bǔ)充的地方,請不吝提出,謝謝。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI