溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎樣深度解讀java線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn)

發(fā)布時(shí)間:2021-12-18 17:33:53 來源:億速云 閱讀:121 作者:柒染 欄目:編程語言

這篇文章給大家介紹怎樣深度解讀java線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn),內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

前言

線程池是非常重要的工具,如果你要成為一個(gè)好的工程師,還是得比較好地掌握這個(gè)知識(shí),很多線上問題都是因?yàn)闆]有用好線程池導(dǎo)致的。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術(shù)水平。

總覽

開篇來一些廢話。下圖是 java 線程池幾個(gè)相關(guān)類的繼承結(jié)構(gòu):

怎樣深度解讀java線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn)

先簡單說說這個(gè)繼承結(jié)構(gòu),Executor 位于最頂層,也是最簡單的,就一個(gè) execute(Runnable runnable) 接口方法定義。

ExecutorService 也是接口,在 Executor 接口的基礎(chǔ)上添加了很多的接口方法,所以一般來說我們會(huì)使用這個(gè)接口。

然后再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這里實(shí)現(xiàn)了非常有用的一些方法供子類直接使用,之后我們?cè)偌?xì)說。

然后才到我們的重點(diǎn)部分 ThreadPoolExecutor 類,這個(gè)類提供了關(guān)于線程池所需的非常豐富的功能。

另外,我們還涉及到下圖中的這些類:

怎樣深度解讀java線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn)

同在并發(fā)包中的 Executors 類,類名中帶字母 s,我們猜到這個(gè)是工具類,里面的方法都是靜態(tài)方法,如以下我們最常用的用于生成 ThreadPoolExecutor 的實(shí)例的一些方法:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

另外,由于線程池支持獲取線程執(zhí)行的結(jié)果,所以,引入了 Future 接口,RunnableFuture 繼承自此接口,然后我們最需要關(guān)心的就是它的實(shí)現(xiàn)類 FutureTask。到這里,記住這個(gè)概念,在線程池的使用過程中,我們是往線程池提交任務(wù)(task),使用過線程池的都知道,我們提交的每個(gè)任務(wù)是實(shí)現(xiàn)了 Runnable 接口的,其實(shí)就是先將 Runnable 的任務(wù)包裝成 FutureTask,然后再提交到線程池。這樣,讀者才能比較容易記住 FutureTask 這個(gè)類名:它首先是一個(gè)任務(wù)(Task),然后具有 Future 接口的語義,即可以在將來(Future)得到執(zhí)行的結(jié)果。

當(dāng)然,線程池中的 BlockingQueue 也是非常重要的概念,如果線程數(shù)達(dá)到 corePoolSize,我們的每個(gè)任務(wù)會(huì)提交到等待隊(duì)列中,等待線程池中的線程來取任務(wù)并執(zhí)行。這里的 BlockingQueue 通常我們使用其實(shí)現(xiàn)類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個(gè)實(shí)現(xiàn)類都有不同的特征,使用場景之后會(huì)慢慢分析。想要詳細(xì)了解各個(gè) BlockingQueue 的讀者,可以參考我的前面的一篇對(duì) BlockingQueue 的各個(gè)實(shí)現(xiàn)類進(jìn)行詳細(xì)分析的文章。

把事情說完整:除了上面說的這些類外,還有一個(gè)很重要的類,就是定時(shí)任務(wù)實(shí)現(xiàn)類 ScheduledThreadPoolExecutor,它繼承自本文要重點(diǎn)講解的 ThreadPoolExecutor,用于實(shí)現(xiàn)定時(shí)執(zhí)行。不過本文不會(huì)介紹它的實(shí)現(xiàn),我相信讀者看完本文后可以比較容易地看懂它的源碼。

以上就是本文要介紹的知識(shí),廢話不多說,開始進(jìn)入正文。

Executor 接口

/* 
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {
    void execute(Runnable command);
}

我們可以看到 Executor 接口非常簡單,就一個(gè) void execute(Runnable command) 方法,代表提交一個(gè)任務(wù)。為了讓大家理解 java 線程池的整個(gè)設(shè)計(jì)方案,我會(huì)按照 Doug Lea 的設(shè)計(jì)思路來多說一些相關(guān)的東西。

我們經(jīng)常這樣啟動(dòng)一個(gè)線程:

new Thread(new Runnable(){
  // do something
}).start();

用了線程池 Executor 后就可以像下面這么使用:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

如果我們希望線程池同步執(zhí)行每一個(gè)任務(wù),我們可以這么實(shí)現(xiàn)這個(gè)接口:

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();// 這里不是用的new Thread(r).start(),也就是說沒有啟動(dòng)任何一個(gè)新的線程。
    }
}

我們希望每個(gè)任務(wù)提交進(jìn)來后,直接啟動(dòng)一個(gè)新的線程來執(zhí)行這個(gè)任務(wù),我們可以這么實(shí)現(xiàn):

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();  // 每個(gè)任務(wù)都用一個(gè)新的線程來執(zhí)行
    }
}

我們?cè)賮砜聪略趺唇M合兩個(gè) Executor 來使用,下面這個(gè)實(shí)現(xiàn)是將所有的任務(wù)都加到一個(gè) queue 中,然后從 queue 中取任務(wù),交給真正的執(zhí)行器執(zhí)行,這里采用 synchronized 進(jìn)行并發(fā)控制:

class SerialExecutor implements Executor {
    // 任務(wù)隊(duì)列
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    // 這個(gè)才是真正的執(zhí)行器
    final Executor executor;
    // 當(dāng)前正在執(zhí)行的任務(wù)
    Runnable active;
    // 初始化的時(shí)候,指定執(zhí)行器
    SerialExecutor(Executor executor) {
        this.executor = executor;
    }
    // 添加任務(wù)到線程池: 將任務(wù)添加到任務(wù)隊(duì)列,scheduleNext 觸發(fā)執(zhí)行器去任務(wù)隊(duì)列取任務(wù)
    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            // 具體的執(zhí)行轉(zhuǎn)給真正的執(zhí)行器 executor
            executor.execute(active);
        }
    }
}

當(dāng)然了,Executor 這個(gè)接口只有提交任務(wù)的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執(zhí)行結(jié)果、我們想知道當(dāng)前線程池有多少個(gè)線程活著、已經(jīng)完成了多少任務(wù)等等,這些都是這個(gè)接口的不足的地方。接下來我們要介紹的是繼承自 Executor 接口的 ExecutorService 接口,這個(gè)接口提供了比較豐富的功能,也是我們最常使用到的接口。

ExecutorService

一般我們定義一個(gè)線程池的時(shí)候,往往都是使用這個(gè)接口:

ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);

因?yàn)檫@個(gè)接口中定義的一系列方法大部分情況下已經(jīng)可以滿足我們的需要了。

那么我們簡單初略地來看一下這個(gè)接口中都有哪些方法:

public interface ExecutorService extends Executor {
    // 關(guān)閉線程池,已提交的任務(wù)繼續(xù)執(zhí)行,不接受繼續(xù)提交新任務(wù)
    void shutdown();
    // 關(guān)閉線程池,嘗試停止正在執(zhí)行的所有任務(wù),不接受繼續(xù)提交新任務(wù)
    // 它和前面的方法相比,加了一個(gè)單詞“now”,區(qū)別在于它會(huì)去停止當(dāng)前正在進(jìn)行的任務(wù)
    List<Runnable> shutdownNow();
    // 線程池是否已關(guān)閉
    boolean isShutdown();
    // 如果調(diào)用了 shutdown() 或 shutdownNow() 方法后,所有任務(wù)結(jié)束了,那么返回true
    // 這個(gè)方法必須在調(diào)用shutdown或shutdownNow方法之后調(diào)用才會(huì)返回true
    boolean isTerminated();
    // 等待所有任務(wù)完成,并設(shè)置超時(shí)時(shí)間
    // 我們這么理解,實(shí)際應(yīng)用中是,先調(diào)用 shutdown 或 shutdownNow,
    // 然后再調(diào)這個(gè)方法等待所有的線程真正地完成,返回值意味著有沒有超時(shí)
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    // 提交一個(gè) Callable 任務(wù)
    <T> Future<T> submit(Callable<T> task);
    // 提交一個(gè) Runnable 任務(wù),第二個(gè)參數(shù)將會(huì)放到 Future 中,作為返回值,
    // 因?yàn)?nbsp;Runnable 的 run 方法本身并不返回任何東西
    <T> Future<T> submit(Runnable task, T result);
    // 提交一個(gè) Runnable 任務(wù)
    Future<?> submit(Runnable task);
    // 執(zhí)行所有任務(wù),返回 Future 類型的一個(gè) list
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    // 也是執(zhí)行所有任務(wù),但是這里設(shè)置了超時(shí)時(shí)間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;
    // 只有其中的一個(gè)任務(wù)結(jié)束了,就可以返回,返回執(zhí)行完的那個(gè)任務(wù)的結(jié)果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    // 同上一個(gè)方法,只有其中的一個(gè)任務(wù)結(jié)束了,就可以返回,返回執(zhí)行完的那個(gè)任務(wù)的結(jié)果,
    // 不過這個(gè)帶超時(shí),超過指定的時(shí)間,拋出 TimeoutException 異常
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

這些方法都很好理解,一個(gè)簡單的線程池主要就是這些功能,能提交任務(wù),能獲取結(jié)果,能關(guān)閉線程池,這也是為什么我們經(jīng)常用這個(gè)接口的原因。

FutureTask

在繼續(xù)往下層介紹 ExecutorService 的實(shí)現(xiàn)類之前,我們先來說說相關(guān)的類 FutureTask。

Future      Runnable
   \           /
    \         /
   RunnableFuture
          |
          |
      FutureTask
FutureTask 通過 RunnableFuture 間接實(shí)現(xiàn)了 Runnable 接口,
所以每個(gè) Runnable 通常都先包裝成 FutureTask,
然后調(diào)用 executor.execute(Runnable command) 將其提交給線程池

我們知道,Runnable 的 void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會(huì)在 submit 中指定第二個(gè)參數(shù)作為返回值:

<T> Future<T> submit(Runnable task, T result);

其實(shí)到時(shí)候會(huì)通過這兩個(gè)參數(shù),將其包裝成 Callable。它和 Runnable 的區(qū)別在于 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時(shí),如果運(yùn)行出現(xiàn)異常,call() 方法會(huì)拋出異常。

public interface Callable<V> {
    V call() throws Exception;
}

在這里,就不展開說 FutureTask 類了,因?yàn)楸疚钠緛砭蛪虼罅?,這里我們需要知道怎么用就行了。

下面,我們來看看 ExecutorService 的抽象實(shí)現(xiàn) AbstractExecutorService 。

AbstractExecutorService

AbstractExecutorService 抽象類派生自 ExecutorService 接口,然后在其基礎(chǔ)上實(shí)現(xiàn)了幾個(gè)實(shí)用的方法,這些方法提供給子類進(jìn)行調(diào)用。

這個(gè)抽象類實(shí)現(xiàn)了 invokeAny 方法和 invokeAll 方法,這里的兩個(gè) newTaskFor 方法也比較有用,用于將任務(wù)包裝成 FutureTask。定義于最上層接口 Executor中的 void execute(Runnable command) 由于不需要獲取結(jié)果,不會(huì)進(jìn)行 FutureTask 的包裝。

需要獲取結(jié)果(FutureTask),用 submit 方法,不需要獲取結(jié)果,可以用 execute 方法。

下面,我將一行一行源碼地來分析這個(gè)類,跟著源碼來看看其實(shí)現(xiàn)吧:

Tips: invokeAny 和 invokeAll 方法占了這整個(gè)類的絕大多數(shù)篇幅,讀者可以選擇適當(dāng)跳過,因?yàn)樗鼈兛赡茉谀愕膶?shí)踐中使用的頻次比較低,而且它們不帶有承前啟后的作用,不用擔(dān)心會(huì)漏掉什么導(dǎo)致看不懂后面的代碼。

public abstract class AbstractExecutorService implements ExecutorService {
    // RunnableFuture 是用于獲取執(zhí)行結(jié)果的,我們常用它的子類 FutureTask
    // 下面兩個(gè) newTaskFor 方法用于將我們的任務(wù)包裝成 FutureTask 提交到線程池中執(zhí)行
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    // 提交任務(wù)
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 1\. 將任務(wù)包裝成 FutureTask
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 2\. 交給執(zhí)行器執(zhí)行,execute 方法由具體的子類來實(shí)現(xiàn)
        // 前面也說了,F(xiàn)utureTask 間接實(shí)現(xiàn)了Runnable 接口。
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // 1\. 將任務(wù)包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // 2\. 交給執(zhí)行器執(zhí)行
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 1\. 將任務(wù)包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        // 2\. 交給執(zhí)行器執(zhí)行
        execute(ftask);
        return ftask;
    }
    // 此方法目的:將 tasks 集合中的任務(wù)提交到線程池執(zhí)行,任意一個(gè)線程執(zhí)行完后就可以結(jié)束了
    // 第二個(gè)參數(shù) timed 代表是否設(shè)置超時(shí)機(jī)制,超時(shí)時(shí)間為第三個(gè)參數(shù),
    // 如果 timed 為 true,同時(shí)超時(shí)了還沒有一個(gè)線程返回結(jié)果,那么拋出 TimeoutException 異常
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        // 任務(wù)數(shù)
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        // ExecutorCompletionService 不是一個(gè)真正的執(zhí)行器,參數(shù) this 才是真正的執(zhí)行器
        // 它對(duì)執(zhí)行器進(jìn)行了包裝,每個(gè)任務(wù)結(jié)束后,將結(jié)果保存到內(nèi)部的一個(gè) completionQueue 隊(duì)列中
        // 這也是為什么這個(gè)類的名字里面有個(gè) Completion 的原因吧。
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        try {
            // 用于保存異常信息,此方法如果沒有得到任何有效的結(jié)果,那么我們可以拋出最后得到的一個(gè)異常
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();
            // 首先先提交一個(gè)任務(wù),后面的任務(wù)到下面的 for 循環(huán)一個(gè)個(gè)提交
            futures.add(ecs.submit(it.next()));
            // 提交了一個(gè)任務(wù),所以任務(wù)數(shù)量減 1
            --ntasks;
            // 正在執(zhí)行的任務(wù)數(shù)(提交的時(shí)候 +1,任務(wù)結(jié)束的時(shí)候 -1)
            int active = 1;
            for (;;) {
                // ecs 上面說了,其內(nèi)部有一個(gè) completionQueue 用于保存執(zhí)行完成的結(jié)果
                // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊(duì)列為空
                Future<T> f = ecs.poll();
                // 為 null,說明剛剛提交的第一個(gè)線程還沒有執(zhí)行完成
                // 在前面先提交一個(gè)任務(wù),加上這里做一次檢查,也是為了提高性能
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 這里是 else if,不是 if。這里說明,沒有任務(wù)了,同時(shí) active 為 0 說明
                    // 任務(wù)都執(zhí)行完成了。其實(shí)我也沒理解為什么這里做一次 break?
                    // 因?yàn)槲艺J(rèn)為 active 為 0 的情況,必然從下面的 f.get() 返回了
                    // 2018-02-23 感謝讀者 newmicro 的 comment,
                    //  這里的 active == 0,說明所有的任務(wù)都執(zhí)行失敗,那么這里是 for 循環(huán)出口
                    else if (active == 0)
                        break;
                    // 這里也是 else if。這里說的是,沒有任務(wù)了,但是設(shè)置了超時(shí)時(shí)間,這里檢測是否超時(shí)
                    else if (timed) {
                        // 帶等待的 poll 方法
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        // 如果已經(jīng)超時(shí),拋出 TimeoutException 異常,這整個(gè)方法就結(jié)束了
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    // 這里是 else。說明,沒有任務(wù)需要提交,但是池中的任務(wù)沒有完成,還沒有超時(shí)(如果設(shè)置了超時(shí))
                    // take() 方法會(huì)阻塞,直到有元素返回,說明有任務(wù)結(jié)束了
                    else
                        f = ecs.take();
                }
                /*
                 * 我感覺上面這一段并不是很好理解,這里簡單說下。
                 * 1\. 首先,這在一個(gè) for 循環(huán)中,我們?cè)O(shè)想每一個(gè)任務(wù)都沒那么快結(jié)束,
                 *     那么,每一次都會(huì)進(jìn)到第一個(gè)分支,進(jìn)行提交任務(wù),直到將所有的任務(wù)都提交了
                 * 2\. 任務(wù)都提交完成后,如果設(shè)置了超時(shí),那么 for 循環(huán)其實(shí)進(jìn)入了“一直檢測是否超時(shí)”
                       這件事情上
                 * 3\. 如果沒有設(shè)置超時(shí)機(jī)制,那么不必要檢測超時(shí),那就會(huì)阻塞在 ecs.take() 方法上,
                       等待獲取第一個(gè)執(zhí)行結(jié)果
                 * 4\. 如果所有的任務(wù)都執(zhí)行失敗,也就是說 future 都返回了,
                       但是 f.get() 拋出異常,那么從 active == 0 分支出去(感謝 newmicro 提出)
                         // 當(dāng)然,這個(gè)需要看下面的 if 分支。
                 */
                // 有任務(wù)結(jié)束了
                if (f != null) {
                    --active;
                    try {
                        // 返回執(zhí)行結(jié)果,如果有異常,都包裝成 ExecutionException
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }// 注意看 for 循環(huán)的范圍,一直到這里
            if (ee == null)
                ee = new ExecutionException();
            throw ee;
        } finally {
            // 方法退出之前,取消其他的任務(wù)
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    // 執(zhí)行所有的任務(wù),返回任務(wù)結(jié)果。
    // 先不要看這個(gè)方法,我們先想想,其實(shí)我們自己提交任務(wù)到線程池,也是想要線程池執(zhí)行所有的任務(wù)
    // 只不過,我們是每次 submit 一個(gè)任務(wù),這里以一個(gè)集合作為參數(shù)提交
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 這個(gè)很簡單
            for (Callable<T> t : tasks) {
                // 包裝成 FutureTask
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                // 提交任務(wù)
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        // 這是一個(gè)阻塞方法,直到獲取到值,或拋出了異常
                        // 這里有個(gè)小細(xì)節(jié),其實(shí) get 方法簽名上是會(huì)拋出 InterruptedException 的
                        // 可是這里沒有進(jìn)行處理,而是拋給外層去了。此異常發(fā)生于還沒執(zhí)行完的任務(wù)被取消了
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            // 這個(gè)方法返回,不像其他的場景,返回 List<Future>,其實(shí)執(zhí)行結(jié)果還沒出來
            // 這個(gè)方法返回是真正的返回,任務(wù)都結(jié)束了
            return futures;
        } finally {
            // 為什么要這個(gè)?就是上面說的有異常的情況
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
    // 帶超時(shí)的 invokeAll,我們找不同吧
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            long lastTime = System.nanoTime();
            Iterator<Future<T>> it = futures.iterator();
            // 每提交一個(gè)任務(wù),檢測一次是否超時(shí)
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                // 超時(shí)
                if (nanos <= 0)
                    return futures;
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        // 調(diào)用帶超時(shí)的 get 方法,這里的參數(shù) nanos 是剩余的時(shí)間,
                        // 因?yàn)樯厦嫫鋵?shí)已經(jīng)用掉了一些時(shí)間了
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
}

到這里,我們發(fā)現(xiàn),這個(gè)抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟線程來執(zhí)行任務(wù),它們都只是在方法內(nèi)部調(diào)用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法還沒出現(xiàn),需要等具體執(zhí)行器來實(shí)現(xiàn)這個(gè)最重要的部分,這里我們要說的就是 ThreadPoolExecutor 類了。

鑒于本文的篇幅,我覺得看到這里的讀者應(yīng)該已經(jīng)不多了,大家都習(xí)慣了快餐文化。我寫的每篇文章都力求讓讀者可以通過我的一篇文章而對(duì)相關(guān)內(nèi)容有全面的了解,所以篇幅不免長了些。

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的線程池實(shí)現(xiàn),這個(gè)類實(shí)現(xiàn)了一個(gè)線程池需要的各個(gè)方法,它實(shí)現(xiàn)了任務(wù)提交、線程管理、監(jiān)控等等方法。

我們可以基于它來進(jìn)行業(yè)務(wù)上的擴(kuò)展,以實(shí)現(xiàn)我們需要的其他功能,比如實(shí)現(xiàn)定時(shí)任務(wù)的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。當(dāng)然,這不是本文關(guān)注的重點(diǎn),下面,還是趕緊進(jìn)行源碼分析吧。

首先,我們來看看線程池實(shí)現(xiàn)中的幾個(gè)概念和處理流程。

我們先回顧下提交任務(wù)的幾個(gè)方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

一個(gè)最基本的概念是,submit 方法中,參數(shù)是 Runnable 類型(也有Callable 類型),這個(gè)參數(shù)不是用于 new Thread(runnable).start() 中的,此處的這個(gè)參數(shù)不是用于啟動(dòng)線程的,這里指的是任務(wù),任務(wù)要做的事情是 run() 方法里面定義的或 Callable 中的 call() 方法里面定義的。

初學(xué)者往往會(huì)搞混這個(gè),因?yàn)?Runnable 總是在各個(gè)地方出現(xiàn),經(jīng)常把一個(gè) Runnable 包到另一個(gè) Runnable 中。請(qǐng)把它想象成有個(gè) Task 接口,這個(gè)接口里面有一個(gè) run() 方法。

我們回過神來繼續(xù)往下看,我畫了一個(gè)簡單的示意圖來描述線程池中的一些主要的構(gòu)件:

怎樣深度解讀java線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn)

當(dāng)然,上圖沒有考慮隊(duì)列是否有界,提交任務(wù)時(shí)隊(duì)列滿了怎么辦?什么情況下會(huì)創(chuàng)建新的線程?提交任務(wù)時(shí)線程池滿了怎么辦?空閑線程怎么關(guān)掉?這些問題下面我們會(huì)一一解決。

我們經(jīng)常會(huì)使用 Executors 這個(gè)工具類來快速構(gòu)造一個(gè)線程池,對(duì)于初學(xué)者而言,這種工具類是很有用的,開發(fā)者不需要關(guān)注太多的細(xì)節(jié),只要知道自己需要一個(gè)線程池,僅僅提供必需的參數(shù)就可以了,其他參數(shù)都采用作者提供的默認(rèn)值。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

這里先不說有什么區(qū)別,它們最終都會(huì)導(dǎo)向這個(gè)構(gòu)造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 這幾個(gè)參數(shù)都是必須要有的
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

基本上,上面的構(gòu)造方法中列出了我們最需要關(guān)心的幾個(gè)屬性了,下面逐個(gè)介紹下構(gòu)造方法中出現(xiàn)的這幾個(gè)屬性:

  • corePoolSize

    核心線程數(shù),不要摳字眼,反正先記著有這么個(gè)屬性就可以了。

  • maximumPoolSize

    最大線程數(shù),線程池允許創(chuàng)建的最大線程數(shù)。

  • workQueue

    任務(wù)隊(duì)列,BlockingQueue 接口的某個(gè)實(shí)現(xiàn)(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。

  • keepAliveTime

    空閑線程的?;顣r(shí)間,如果某線程的空閑時(shí)間超過這個(gè)值都沒有任務(wù)給它做,那么可以被關(guān)閉了。注意這個(gè)值并不會(huì)對(duì)所有線程起作用,如果線程池中的線程數(shù)少于等于核心線程數(shù) corePoolSize,那么這些線程不會(huì)因?yàn)榭臻e太長時(shí)間而被關(guān)閉,當(dāng)然,也可以通過調(diào)用 allowCoreThreadTimeOut(true)使核心線程數(shù)內(nèi)的線程也可以被回收。

  • threadFactory

    用于生成線程,一般我們可以用默認(rèn)的就可以了。通常,我們可以通過它將我們的線程的名字設(shè)置得比較可讀一些,如 Message-Thread-1, Message-Thread-2 類似這樣。

  • handler:

    當(dāng)線程池已經(jīng)滿了,但是又有新的任務(wù)提交的時(shí)候,該采取什么策略由這個(gè)來指定。有幾種方式可供選擇,像拋出異常、直接拒絕然后返回等,也可以自己實(shí)現(xiàn)相應(yīng)的接口實(shí)現(xiàn)自己的邏輯,這個(gè)之后再說。

除了上面幾個(gè)屬性外,我們?cè)倏纯雌渌匾膶傩浴?/p>

Doug Lea 采用一個(gè) 32 位的整數(shù)來存放線程池的狀態(tài)和當(dāng)前池中的線程數(shù),其中高 3 位用于存放線程池狀態(tài),低 29 位表示線程數(shù)(即使只有 29 位,也已經(jīng)不小了,大概 5 億多,現(xiàn)在還沒有哪個(gè)機(jī)器能起這么多線程的吧)。我們知道,java 語言在整數(shù)編碼上是統(tǒng)一的,都是采用補(bǔ)碼的形式,下面是簡單的移位操作和布爾操作,都是挺簡單的。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 這里 COUNT_BITS 設(shè)置為 29(32-3),意味著前三位用于存放線程狀態(tài),后29位用于存放線程數(shù)
// 很多初學(xué)者很喜歡在自己的代碼中寫很多 29 這種數(shù)字,或者某個(gè)特殊的字符串,然后分布在各個(gè)地方,這是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 這里得到的是 29 個(gè) 1,也就是說線程池的最大線程數(shù)是 2^29-1=536870911
// 以我們現(xiàn)在計(jì)算機(jī)的實(shí)際情況,這個(gè)數(shù)量還是夠用的
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 我們說了,線程池的狀態(tài)存放在高 3 位中
// 運(yùn)算結(jié)果為 111跟29個(gè)0:111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;
// 將整數(shù) c 的低 29 位修改為 0,就得到了線程池的狀態(tài)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 將整數(shù) c 的高 3 為修改為 0,就得到了線程池中的線程數(shù)
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

上面就是對(duì)一個(gè)整數(shù)的簡單的位操作,幾個(gè)操作方法將會(huì)在后面的源碼中一直出現(xiàn),所以讀者最好把方法名字和其代表的功能記住,看源碼的時(shí)候也就不需要來來回回翻了。

在這里,介紹下線程池中的各個(gè)狀態(tài)和狀態(tài)變化的轉(zhuǎn)換過程:

  • RUNNING:這個(gè)沒什么好說的,這是最正常的狀態(tài):接受新的任務(wù),處理等待隊(duì)列中的任務(wù)

  • SHUTDOWN:不接受新的任務(wù)提交,但是會(huì)繼續(xù)處理等待隊(duì)列中的任務(wù)

  • STOP:不接受新的任務(wù)提交,不再處理等待隊(duì)列中的任務(wù),中斷正在執(zhí)行任務(wù)的線程

  • TIDYING:所有的任務(wù)都銷毀了,workCount 為 0。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時(shí),會(huì)執(zhí)行鉤子方法 terminated()

  • TERMINATED:terminated() 方法結(jié)束后,線程池的狀態(tài)就會(huì)變成這個(gè)

RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,所以等于 0 的時(shí)候不能提交任務(wù),大于 0 的話,連正在執(zhí)行的任務(wù)也需要中斷。

看了這幾種狀態(tài)的介紹,讀者大體也可以猜到十之八九的狀態(tài)轉(zhuǎn)換了,各個(gè)狀態(tài)的轉(zhuǎn)換過程有以下幾種:

  • RUNNING -> SHUTDOWN:當(dāng)調(diào)用了 shutdown() 后,會(huì)發(fā)生這個(gè)狀態(tài)轉(zhuǎn)換,這也是最重要的

  • (RUNNING or SHUTDOWN) -> STOP:當(dāng)調(diào)用 shutdownNow() 后,會(huì)發(fā)生這個(gè)狀態(tài)轉(zhuǎn)換,這下要清楚 shutDown() 和 shutDownNow() 的區(qū)別了

  • SHUTDOWN -> TIDYING:當(dāng)任務(wù)隊(duì)列和線程池都清空后,會(huì)由 SHUTDOWN 轉(zhuǎn)換為 TIDYING

  • STOP -> TIDYING:當(dāng)任務(wù)隊(duì)列清空后,發(fā)生這個(gè)轉(zhuǎn)換

  • TIDYING -> TERMINATED:這個(gè)前面說了,當(dāng) terminated() 方法結(jié)束后

上面的幾個(gè)記住核心的就可以了,尤其第一個(gè)和第二個(gè)。

另外,我們還要看看一個(gè)內(nèi)部類 Worker,因?yàn)?Doug Lea 把線程池中的線程包裝成了一個(gè)個(gè) Worker,翻譯成工人,就是線程池中做任務(wù)的線程。所以到這里,我們知道任務(wù)是 Runnable(內(nèi)部變量名叫 task 或 command),線程是 Worker

Worker 這里又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在并發(fā)中真的是到處出現(xiàn),而且非常容易使用,寫少量的代碼就能實(shí)現(xiàn)自己需要的同步方式(對(duì) AQS 源碼感興趣的讀者請(qǐng)參看我之前寫的幾篇文章)。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
    // 這個(gè)是真正的線程,任務(wù)靠你啦
    final Thread thread;
    // 前面說了,這里的 Runnable 是任務(wù)。為什么叫 firstTask?因?yàn)樵趧?chuàng)建線程的時(shí)候,如果同時(shí)指定了
    // 這個(gè)線程起來以后需要執(zhí)行的第一個(gè)任務(wù),那么第一個(gè)任務(wù)就是存放在這里的(線程可不止執(zhí)行這一個(gè)任務(wù))
    // 當(dāng)然了,也可以為 null,這樣線程起來了,自己到任務(wù)隊(duì)列(BlockingQueue)中取任務(wù)(getTask 方法)就行了
    Runnable firstTask;
    // 用于存放此線程完成的任務(wù)數(shù),注意了,這里用了 volatile,保證可見性
    volatile long completedTasks;
    // Worker 只有這一個(gè)構(gòu)造方法,傳入 firstTask,也可以傳 null
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 調(diào)用 ThreadFactory 來創(chuàng)建一個(gè)新的線程
        this.thread = getThreadFactory().newThread(this);
    }
    // 這里調(diào)用了外部類的 runWorker 方法
    public void run() {
        runWorker(this);
    }
    ...// 其他幾個(gè)方法沒什么好看的,就是用 AQS 操作,來獲取這個(gè)線程的執(zhí)行權(quán),用了獨(dú)占鎖
}

前面雖然啰嗦,但是簡單。有了上面的這些基礎(chǔ)后,我們終于可以看看 ThreadPoolExecutor 的 execute 方法了,前面源碼分析的時(shí)候也說了,各種方法都最終依賴于 execute 方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 前面說的那個(gè)表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
    int c = ctl.get();
    // 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個(gè) worker 來執(zhí)行任務(wù),
    // 創(chuàng)建一個(gè)新的線程,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任務(wù)成功,那么就結(jié)束了。提交任務(wù)嘛,線程池已經(jīng)接受了這個(gè)任務(wù),這個(gè)方法也就可以返回了
        // 至于執(zhí)行的結(jié)果,到時(shí)候會(huì)包裝到 FutureTask 中。
        // 返回 false 代表線程池不允許提交任務(wù)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到這里說明,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了
    // 如果線程池處于 RUNNING 狀態(tài),把這個(gè)任務(wù)添加到任務(wù)隊(duì)列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 這里面說的是,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開啟新的線程
         * 因?yàn)榫€程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程
         * 如果線程數(shù)已經(jīng)大于等于 corePoolSize,那么將任務(wù)添加到隊(duì)列中,然后進(jìn)到這里
         */
        int recheck = ctl.get();
        // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊(duì)的這個(gè)任務(wù),并且執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
        // 到這里,我們知道了,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊(duì)列中了,但是線程都關(guān)閉了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 隊(duì)列滿了,那么進(jìn)入到這個(gè)分支
    // 以 maximumPoolSize 為界創(chuàng)建新的 worker,
    // 如果失敗,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

對(duì)創(chuàng)建線程的錯(cuò)誤理解:如果線程數(shù)少于 corePoolSize,創(chuàng)建一個(gè)線程,如果線程數(shù)在 [corePoolSize, maximumPoolSize] 之間那么可以創(chuàng)建線程或復(fù)用空閑線程,keepAliveTime 對(duì)這個(gè)區(qū)間的線程有效。

從上面的幾個(gè)分支,我們就可以看出,上面的這段話是錯(cuò)誤的。

上面這些一時(shí)半會(huì)也不可能全部消化搞定,我們先繼續(xù)往下吧,到時(shí)候再回頭看幾遍。

這個(gè)方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎么創(chuàng)建新的線程的:

// 第一個(gè)參數(shù)是準(zhǔn)備提交給這個(gè)線程執(zhí)行的任務(wù),之前說了,可以為 null
// 第二個(gè)參數(shù)為 true 代表使用核心線程數(shù) corePoolSize 作為創(chuàng)建線程的界限,也就說創(chuàng)建這個(gè)線程的時(shí)候,
//         如果線程池中的線程總數(shù)已經(jīng)達(dá)到 corePoolSize,那么不能響應(yīng)這次創(chuàng)建線程的請(qǐng)求
//         如果是 false,代表使用最大線程數(shù) maximumPoolSize 作為界限
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 這個(gè)非常不好理解
        // 如果線程池已關(guān)閉,并滿足以下條件之一,那么不創(chuàng)建新的 worker:
        // 1\. 線程池狀態(tài)大于 SHUTDOWN,其實(shí)也就是 STOP, TIDYING, 或 TERMINATED
        // 2\. firstTask != null
        // 3\. workQueue.isEmpty()
        // 簡單分析下:
        // 還是狀態(tài)控制的問題,當(dāng)線程池處于 SHUTDOWN 的時(shí)候,不允許提交任務(wù),但是已有的任務(wù)繼續(xù)執(zhí)行
        // 當(dāng)狀態(tài)大于 SHUTDOWN 時(shí),不允許提交任務(wù),且中斷正在執(zhí)行的任務(wù)
        // 多說一句:如果線程池處于 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那么是允許創(chuàng)建 worker 的
        // 這是因?yàn)?nbsp;SHUTDOWN 的語義:不允許提交新的任務(wù),但是要把已經(jīng)進(jìn)入到 workQueue 的任務(wù)執(zhí)行完,所以在滿足條件的基礎(chǔ)上,是允許創(chuàng)建新的 Worker 的
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 如果成功,那么就是所有創(chuàng)建線程前的條件校驗(yàn)都滿足了,準(zhǔn)備創(chuàng)建線程執(zhí)行任務(wù)了
            // 這里失敗的話,說明有其他線程也在嘗試往線程池中創(chuàng)建線程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 由于有并發(fā),重新再讀取一下 ctl
            c = ctl.get();
            // 正常如果是 CAS 失敗的話,進(jìn)到下一個(gè)里層的for循環(huán)就可以了
            // 可是如果是因?yàn)槠渌€程的操作,導(dǎo)致線程池的狀態(tài)發(fā)生了變更,如有其他線程關(guān)閉了這個(gè)線程池
            // 那么需要回到外層的for循環(huán)
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    /* 
     * 到這里,我們認(rèn)為在當(dāng)前這個(gè)時(shí)刻,可以開始創(chuàng)建線程來執(zhí)行任務(wù)了,
     * 因?yàn)樵撔r?yàn)的都校驗(yàn)了,至于以后會(huì)發(fā)生什么,那是以后的事,至少當(dāng)前是滿足條件的
     */
    // worker 是否已經(jīng)啟動(dòng)
    boolean workerStarted = false;
    // 是否已將這個(gè) worker 添加到 workers 這個(gè) HashSet 中
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 把 firstTask 傳給 worker 的構(gòu)造方法
        w = new Worker(firstTask);
        // 取 worker 中的線程對(duì)象,之前說了,Worker的構(gòu)造方法會(huì)調(diào)用 ThreadFactory 來創(chuàng)建一個(gè)新的線程
        final Thread t = w.thread;
        if (t != null) {
            // 這個(gè)是整個(gè)線程池的全局鎖,持有這個(gè)鎖才能讓下面的操作“順理成章”,
            // 因?yàn)殛P(guān)閉一個(gè)線程池需要這個(gè)鎖,至少我持有鎖的期間,線程池不會(huì)被關(guān)閉
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
                // 小于 SHUTTDOWN 那就是 RUNNING,這個(gè)自不必說,是最正常的情況
                // 如果等于 SHUTDOWN,前面說了,不接受新的任務(wù),但是會(huì)繼續(xù)執(zhí)行等待隊(duì)列中的任務(wù)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker 里面的 thread 可不能是已經(jīng)啟動(dòng)的
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 加到 workers 這個(gè) HashSet 中
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于記錄 workers 中的個(gè)數(shù)的最大值
                    // 因?yàn)?nbsp;workers 是不斷增加減少的,通過這個(gè)值可以知道線程池的大小曾經(jīng)達(dá)到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功的話,啟動(dòng)這個(gè)線程
            if (workerAdded) {
                // 啟動(dòng)線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果線程沒有啟動(dòng),需要做一些清理工作,如前面 workCount 加了 1,將其減掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回線程是否啟動(dòng)成功
    return workerStarted;
}

簡單看下 addWorkFailed 的處理:

// workers 中刪除掉相應(yīng)的 worker
// workCount 減 1
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        // rechecks for termination, in case the existence of this worker was holding up termination
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

回過頭來,繼續(xù)往下走。我們知道,worker 中的線程 start 后,其 run 方法會(huì)調(diào)用 runWorker 方法:

// Worker 類的 run() 方法
public void run() {
    runWorker(this);
}

繼續(xù)往下看 runWorker 方法:

// 此方法由 worker 線程啟動(dòng)后調(diào)用,這里用一個(gè) while 循環(huán)來不斷地從等待隊(duì)列中獲取任務(wù)并執(zhí)行
// 前面說了,worker 在初始化的時(shí)候,可以指定 firstTask,那么第一個(gè)任務(wù)也就可以不需要從隊(duì)列中獲取
final void runWorker(Worker w) {
    // 
    Thread wt = Thread.currentThread();
    // 該線程的第一個(gè)任務(wù)(如果有的話)
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循環(huán)調(diào)用 getTask 獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();          
            // 如果線程池狀態(tài)大于等于 STOP,那么意味著該線程也要中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 這是一個(gè)鉤子方法,留給需要的子類實(shí)現(xiàn)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 到這里終于可以執(zhí)行任務(wù)了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // 這里不允許拋出 Throwable,所以轉(zhuǎn)換為 Error
                    thrown = x; throw new Error(x);
                } finally {
                    // 也是一個(gè)鉤子方法,將 task 和異常作為參數(shù),留給需要的子類實(shí)現(xiàn)
                    afterExecute(task, thrown);
                }
            } finally {
                // 置空 task,準(zhǔn)備 getTask 獲取下一個(gè)任務(wù)
                task = null;
                // 累加完成的任務(wù)數(shù)
                w.completedTasks++;
                // 釋放掉 worker 的獨(dú)占鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 如果到這里,需要執(zhí)行線程關(guān)閉:
        // 1\. 說明 getTask 返回 null,也就是說,隊(duì)列中已經(jīng)沒有任務(wù)需要執(zhí)行了,執(zhí)行關(guān)閉
        // 2\. 任務(wù)執(zhí)行過程中發(fā)生了異常
        // 第一種情況,已經(jīng)在代碼處理了將 workCount 減 1,這個(gè)在 getTask 方法分析中會(huì)說
        // 第二種情況,workCount 沒有進(jìn)行處理,所以需要在 processWorkerExit 中處理
        // 限于篇幅,我不準(zhǔn)備分析這個(gè)方法了,感興趣的讀者請(qǐng)自行分析源碼
        processWorkerExit(w, completedAbruptly);
    }
}

我們看看 getTask() 是怎么獲取任務(wù)的,這個(gè)方法寫得真的很好,每一行都很簡單,組合起來卻所有的情況都想好了:

// 此方法有三種可能:
// 1\. 阻塞直到獲取到任務(wù)返回。我們知道,默認(rèn) corePoolSize 之內(nèi)的線程是不會(huì)被回收的,
//      它們會(huì)一直等待任務(wù)
// 2\. 超時(shí)退出。keepAliveTime 起作用的時(shí)候,也就是如果這么多時(shí)間內(nèi)都沒有任務(wù),那么應(yīng)該執(zhí)行關(guān)閉
// 3\. 如果發(fā)生了以下條件,此方法必須返回 null:
//    - 池中有大于 maximumPoolSize 個(gè) workers 存在(通過調(diào)用 setMaximumPoolSize 進(jìn)行設(shè)置)
//    - 線程池處于 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務(wù)
//    - 線程池處于 STOP,不僅不接受新的線程,連 workQueue 中的線程也不再執(zhí)行
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 兩種可能
        // 1\. rs == SHUTDOWN && workQueue.isEmpty()
        // 2\. rs >= STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // CAS 操作,減少工作線程數(shù)
            decrementWorkerCount();
            return null;
        }
        boolean timed;      // Are workers subject to culling?
        for (;;) {
            int wc = workerCountOf(c);
            // 允許核心線程數(shù)內(nèi)的線程回收,或當(dāng)前線程數(shù)超過了核心線程數(shù),那么有可能發(fā)生超時(shí)關(guān)閉
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 這里 break,是為了不往下執(zhí)行后一個(gè) if (compareAndDecrementWorkerCount(c))
            // 兩個(gè) if 一起看:如果當(dāng)前線程數(shù) wc > maximumPoolSize,或者超時(shí),都返回 null
            // 那這里的問題來了,wc > maximumPoolSize 的情況,為什么要返回 null?
            //    換句話說,返回 null 意味著關(guān)閉線程。
            // 那是因?yàn)橛锌赡荛_發(fā)者調(diào)用了 setMaximumPoolSize() 將線程池的 maximumPoolSize 調(diào)小了,那么多余的 Worker 就需要被關(guān)閉
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            // compareAndDecrementWorkerCount(c) 失敗,線程池中的線程數(shù)發(fā)生了改變
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        // wc <= maximumPoolSize 同時(shí)沒有超時(shí)
        try {
            // 到 workQueue 中獲取任務(wù)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果此 worker 發(fā)生了中斷,采取的方案是重試
            // 解釋下為什么會(huì)發(fā)生中斷,這個(gè)讀者要去看 setMaximumPoolSize 方法。
            // 如果開發(fā)者將 maximumPoolSize 調(diào)小了,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量,
            // 那么意味著超出的部分線程要被關(guān)閉。重新進(jìn)入 for 循環(huán),自然會(huì)有部分線程會(huì)返回 null
            timedOut = false;
        }
    }
}

到這里,基本上也說完了整個(gè)流程,讀者這個(gè)時(shí)候應(yīng)該回到 execute(Runnable command) 方法,看看各個(gè)分支,我把代碼貼過來一下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 前面說的那個(gè)表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
    int c = ctl.get();
    // 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個(gè) worker 來執(zhí)行任務(wù),
    // 創(chuàng)建一個(gè)新的線程,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任務(wù)成功,那么就結(jié)束了。提交任務(wù)嘛,線程池已經(jīng)接受了這個(gè)任務(wù),這個(gè)方法也就可以返回了
        // 至于執(zhí)行的結(jié)果,到時(shí)候會(huì)包裝到 FutureTask 中。
        // 返回 false 代表線程池不允許提交任務(wù)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到這里說明,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了
    // 如果線程池處于 RUNNING 狀態(tài),把這個(gè)任務(wù)添加到任務(wù)隊(duì)列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 這里面說的是,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開啟新的線程
         * 因?yàn)榫€程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程
         * 如果線程數(shù)已經(jīng)大于等于 corePoolSize,那么將任務(wù)添加到隊(duì)列中,然后進(jìn)到這里
         */
        int recheck = ctl.get();
        // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊(duì)的這個(gè)任務(wù),并且執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
        // 到這里,我們知道了,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊(duì)列中了,但是線程都關(guān)閉了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 隊(duì)列滿了,那么進(jìn)入到這個(gè)分支
    // 以 maximumPoolSize 為界創(chuàng)建新的 worker,
    // 如果失敗,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

上面各個(gè)分支中,有兩種情況會(huì)調(diào)用 reject(command) 來處理任務(wù),因?yàn)榘凑照5牧鞒?,線程池此時(shí)不能接受這個(gè)任務(wù),所以需要執(zhí)行我們的拒絕策略。接下來,我們說一說 ThreadPoolExecutor 中的拒絕策略。

final void reject(Runnable command) {
    // 執(zhí)行拒絕策略
    handler.rejectedExecution(command, this);
}

此處的 handler 我們需要在構(gòu)造線程池的時(shí)候就傳入這個(gè)參數(shù),它是 RejectedExecutionHandler 的實(shí)例。

RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個(gè)已經(jīng)定義好的實(shí)現(xiàn)類可供我們直接使用,當(dāng)然,我們也可以實(shí)現(xiàn)自己的策略,不過一般也沒有必要。

// 只要線程池沒有被關(guān)閉,那么由提交任務(wù)的線程自己來執(zhí)行這個(gè)任務(wù)。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
// 不管怎樣,直接拋出 RejectedExecutionException 異常
// 這個(gè)是默認(rèn)的策略,如果我們構(gòu)造線程池的時(shí)候不傳相應(yīng)的 handler 的話,那就會(huì)指定使用這個(gè)
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
// 不做任何處理,直接忽略掉這個(gè)任務(wù)
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
// 這個(gè)相對(duì)霸道一點(diǎn),如果線程池沒有被關(guān)閉的話,
// 把隊(duì)列隊(duì)頭的任務(wù)(也就是等待了最長時(shí)間的)直接扔掉,然后提交這個(gè)任務(wù)到等待隊(duì)列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

到這里,ThreadPoolExecutor 的源碼算是分析結(jié)束了。單純從源碼的難易程度來說,ThreadPoolExecutor 的源碼還算是比較簡單的,只是需要我們靜下心來好好看看罷了。

Executors

這節(jié)其實(shí)也不是分析 Executors 這個(gè)類,因?yàn)樗鼉H僅是工具類,它的所有方法都是 static 的。

  • 生成一個(gè)固定大小的線程池:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

最大線程數(shù)設(shè)置為與核心線程數(shù)相等,此時(shí) keepAliveTime 設(shè)置為 0(因?yàn)檫@里它是沒用的,即使不為 0,線程池默認(rèn)也不會(huì)回收 corePoolSize 內(nèi)的線程),任務(wù)隊(duì)列采用 LinkedBlockingQueue,無界隊(duì)列。

過程分析:剛開始,每提交一個(gè)任務(wù)都創(chuàng)建一個(gè) worker,當(dāng) worker 的數(shù)量達(dá)到 nThreads 后,不再創(chuàng)建新的線程,而是把任務(wù)提交到 LinkedBlockingQueue 中,而且之后線程數(shù)始終為 nThreads。

  • 生成只有一個(gè)線程的固定線程池,這個(gè)更簡單,和上面的一樣,只要設(shè)置線程數(shù)為 1 就可以了:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 生成一個(gè)需要的時(shí)候就創(chuàng)建新的線程,同時(shí)可以復(fù)用之前創(chuàng)建的線程(如果這個(gè)線程當(dāng)前沒有任務(wù))的線程池:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

核心線程數(shù)為 0,最大線程數(shù)為 Integer.MAX_VALUE,keepAliveTime 為 60 秒,任務(wù)隊(duì)列采用 SynchronousQueue。

這種線程池對(duì)于任務(wù)可以比較快速地完成的情況有比較好的性能。如果線程空閑了 60 秒都沒有任務(wù),那么將關(guān)閉此線程并從線程池中移除。所以如果線程池空閑了很長時(shí)間也不會(huì)有問題,因?yàn)殡S著所有的線程都會(huì)被關(guān)閉,整個(gè)線程池不會(huì)占用任何的系統(tǒng)資源。

過程分析:我把 execute 方法的主體黏貼過來,讓大家看得明白些。鑒于 corePoolSize 是 0,那么提交任務(wù)的時(shí)候,直接將任務(wù)提交到隊(duì)列中,由于采用了 SynchronousQueue,所以如果是第一個(gè)任務(wù)提交的時(shí)候,offer 方法肯定會(huì)返回 false,因?yàn)榇藭r(shí)沒有任何 worker 對(duì)這個(gè)任務(wù)進(jìn)行接收,那么將進(jìn)入到最后一個(gè)分支來創(chuàng)建第一個(gè) worker。之后再提交任務(wù)的話,取決于是否有空閑下來的線程對(duì)任務(wù)進(jìn)行接收,如果有,會(huì)進(jìn)入到第二個(gè) if 語句塊中,否則就是和第一個(gè)任務(wù)一樣,進(jìn)到最后的 else if 分支創(chuàng)建新線程。

int c = ctl.get();
// corePoolSize 為 0,所以不會(huì)進(jìn)到這個(gè) if 分支
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// offer 如果有空閑線程剛好可以接收此任務(wù),那么返回 true,否則返回 false
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

SynchronousQueue 是一個(gè)比較特殊的 BlockingQueue,其本身不儲(chǔ)存任何元素,它有一個(gè)虛擬隊(duì)列(或虛擬棧),不管讀操作還是寫操作,如果當(dāng)前隊(duì)列中存儲(chǔ)的是與當(dāng)前操作相同模式的線程,那么當(dāng)前操作也進(jìn)入隊(duì)列中等待;如果是相反模式,則配對(duì)成功,從當(dāng)前隊(duì)列中取隊(duì)頭節(jié)點(diǎn)。具體的信息,可以看我的另一篇關(guān)于 BlockingQueue 的文章。

總結(jié)

我一向不喜歡寫總結(jié),因?yàn)槲野阉行枰磉_(dá)的都寫在正文中了,寫小篇幅的總結(jié)并不能真正將話說清楚,本文的總結(jié)部分為準(zhǔn)備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時(shí)間看完全文的讀者。

  1. java 線程池有哪些關(guān)鍵屬性?

    corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler

    corePoolSize 到 maximumPoolSize 之間的線程會(huì)被回收,當(dāng)然 corePoolSize 的線程也可以通過設(shè)置而得到回收(allowCoreThreadTimeOut(true))。

    workQueue 用于存放任務(wù),添加任務(wù)的時(shí)候,如果當(dāng)前線程數(shù)超過了 corePoolSize,那么往該隊(duì)列中插入任務(wù),線程池中的線程會(huì)負(fù)責(zé)到隊(duì)列中拉取任務(wù)。

    keepAliveTime 用于設(shè)置空閑時(shí)間,如果線程數(shù)超出了 corePoolSize,并且有些線程的空閑時(shí)間超過了這個(gè)值,會(huì)執(zhí)行關(guān)閉這些線程的操作

    rejectedExecutionHandler 用于處理當(dāng)線程池不能執(zhí)行此任務(wù)時(shí)的情況,默認(rèn)有拋出 RejectedExecutionException 異常、忽略任務(wù)使用提交任務(wù)的線程來執(zhí)行此任務(wù)將隊(duì)列中等待最久的任務(wù)刪除,然后提交此任務(wù)這四種策略,默認(rèn)為拋出異常。

  2. 說說線程池中的線程創(chuàng)建時(shí)機(jī)?

    • 注意:如果將隊(duì)列設(shè)置為無界隊(duì)列,那么線程數(shù)達(dá)到 corePoolSize 后,其實(shí)線程數(shù)就不會(huì)再增長了。因?yàn)楹竺娴娜蝿?wù)直接往隊(duì)列塞就行了,此時(shí) maximumPoolSize 參數(shù)就沒有什么意義。

    1. 如果當(dāng)前線程數(shù)少于 corePoolSize,那么提交任務(wù)的時(shí)候創(chuàng)建一個(gè)新的線程,并由這個(gè)線程執(zhí)行這個(gè)任務(wù);

    2. 如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 corePoolSize,那么將提交的任務(wù)添加到隊(duì)列中,等待線程池中的線程去隊(duì)列中取任務(wù);

    3. 如果隊(duì)列已滿,那么創(chuàng)建新的線程來執(zhí)行任務(wù),需要保證池中的線程數(shù)不會(huì)超過 maximumPoolSize,如果此時(shí)線程數(shù)超過了 maximumPoolSize,那么執(zhí)行拒絕策略。

  3. Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 構(gòu)造出來的線程池有什么差別?

    細(xì)說太長,往上滑一點(diǎn)點(diǎn),在 Executors 的小節(jié)進(jìn)行了詳盡的描述。

  4. 任務(wù)執(zhí)行過程中發(fā)生異常怎么處理?

    如果某個(gè)任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會(huì)被關(guān)閉,而不是繼續(xù)接收其他任務(wù)。然后會(huì)啟動(dòng)一個(gè)新的線程來代替它。

  5. 什么時(shí)候會(huì)執(zhí)行拒絕策略?

    1. workers 的數(shù)量達(dá)到了 corePoolSize(任務(wù)此時(shí)需要進(jìn)入任務(wù)隊(duì)列),任務(wù)入隊(duì)成功,與此同時(shí)線程池被關(guān)閉了,而且關(guān)閉線程池并沒有將這個(gè)任務(wù)出隊(duì),那么執(zhí)行拒絕策略。這里說的是非常邊界的問題,入隊(duì)和關(guān)閉線程池并發(fā)執(zhí)行,讀者仔細(xì)看看 execute 方法是怎么進(jìn)到第一個(gè) reject(command) 里面的。

    2. workers 的數(shù)量大于等于 corePoolSize,將任務(wù)加入到任務(wù)隊(duì)列,可是隊(duì)列滿了,任務(wù)入隊(duì)失敗,那么準(zhǔn)備開啟新的線程,可是線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,那么執(zhí)行拒絕策略。

關(guān)于怎樣深度解讀java線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI