溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!

發(fā)布時間:2020-07-19 04:08:52 來源:網(wǎng)絡(luò) 閱讀:513 作者:架構(gòu)師追風(fēng) 欄目:編程語言

線程池

“線程池”,顧名思義就是一個線程緩存,線程是稀缺資源,如果被無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,因此Java中提供線程池對線程進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!

線程池介紹

在web開發(fā)中,服務(wù)器需要接受并處理請求,所以會為一個請求來分配一個線程來進行處理。如果每次請求都新創(chuàng)建一個線程的話實現(xiàn)起來非常簡便,但是存在一個問題:

如果并發(fā)的請求數(shù)量非常多,但每個線程執(zhí)行的時間很短,這樣就會頻繁的創(chuàng)建和銷毀線程,如此一來會大大降低系統(tǒng)的效率??赡艹霈F(xiàn)服務(wù)器在為每個請求創(chuàng)建新線程和銷毀線程上花費的時間和消耗的系統(tǒng)資源要比處理實際的用戶請求的時間和資源更多。

那么有沒有一種辦法使執(zhí)行完一個任務(wù),并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?

這就是線程池的目的了。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分攤到了多個任務(wù)上。

什么時候使用線程池?

  • 單個任務(wù)處理時間比較短

  • 需要處理的任務(wù)數(shù)量很大

線程池優(yōu)勢

  • 重用存在的線程,減少線程創(chuàng)建,消亡的開銷,提高性能

  • 提高響應(yīng)速度。當任務(wù)到達時,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。

  • 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。

線程的實現(xiàn)方式

Runnable,Thread,Callable

//?實現(xiàn)Runnable接口的類將被Thread執(zhí)行,表示一個基本的任務(wù)

public?interface?Runnable?{

//?run方法就是它所有的內(nèi)容,就是實際執(zhí)行的任務(wù)

public?abstract?void?run();

}

//Callable同樣是任務(wù),與Runnable接口的區(qū)別在于它接收泛型,同時它執(zhí)行任務(wù)后帶有返回內(nèi)容

public?interface?Callable<V>?{

//?相對于run方法的帶有返回值的call方法

V?call()?throws?Exception;

}

Executor框架

Executor接口是線程池框架中最基礎(chǔ)的部分,定義了一個用于執(zhí)行Runnable的execute方法。

下圖為它的繼承與實現(xiàn)

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!


從圖中可以看出Executor下有一個重要子接口ExecutorService,其中定義了線程池的具體行為

1,execute(Runnable command):履行Ruannable類型的任務(wù),

2,submit(task):可用來提交Callable或Runnable任務(wù),并返回代表此任務(wù)的Future對象

3,shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù),

4,shutdownNow():停止所有正在履行的任務(wù)并封閉辦事。

5,isTerminated():測試是否所有任務(wù)都履行完畢了。

6,isShutdown():測試是否該ExecutorService已被關(guān)閉。

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!

線程池重點屬性

private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));

private?static?final?int?COUNT_BITS?=?Integer.SIZE?-?3;

private?static?final?int?CAPACITY?=?(1?<<?COUNT_BITS)?-?1;

ctl 是對線程池的運行狀態(tài)和線程池中有效線程的數(shù)量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。


ctl相關(guān)方法

private?static?int?runStateOf(int?c)?{?return?c?&?~CAPACITY;?}

private?static?int?workerCountOf(int?c)?{?return?c?&?CAPACITY;?}

private?static?int?ctlOf(int?rs,?int?wc)?{?return?rs?|?wc;?}
  • runStateOf:獲取運行狀態(tài);

  • workerCountOf:獲取活動線程數(shù);

  • ctlOf:獲取運行狀態(tài)和活動線程數(shù)的值。

線程池存在5種狀態(tài)

RUNNING?=?-1?<<?COUNT_BITS;?//高3位為111

SHUTDOWN?=?0?<<?COUNT_BITS;?//高3位為000

STOP?=?1?<<?COUNT_BITS;?//高3位為001

TIDYING?=?2?<<?COUNT_BITS;?//高3位為010

TERMINATED?=?3?<<?COUNT_BITS;?//高3位為011

1、RUNNING

(1) 狀態(tài)說明:線程池處在RUNNING狀態(tài)時,能夠接收新任務(wù),以及對已添加的任務(wù)進行處理。

(02) 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話說,線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0!

2、 SHUTDOWN

(1) 狀態(tài)說明:線程池處在SHUTDOWN狀態(tài)時,不接收新任務(wù),但能處理已添加的任務(wù)。

(2) 狀態(tài)切換:調(diào)用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。

3、STOP

(1) 狀態(tài)說明:線程池處在STOP狀態(tài)時,不接收新任務(wù),不處理已添加的任務(wù),并且會中斷正在處理的任務(wù)。

(2) 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING

(1) 狀態(tài)說明:當所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會變?yōu)門IDYING狀態(tài)。當線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時,進行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實現(xiàn)。

(2) 狀態(tài)切換:當線程池在SHUTDOWN狀態(tài)下,阻塞隊列為空并且線程池中執(zhí)行的任務(wù)也為空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態(tài)下,線程池中執(zhí)行的任務(wù)為空時,就會由STOP -> TIDYING。

5、 TERMINATED

(1) 狀態(tài)說明:線程池徹底終止,就變成TERMINATED狀態(tài)。

(2) 狀態(tài)切換:線程池處在TIDYING狀態(tài)時,執(zhí)行完terminated()之后,就會由 TIDYING -> TERMINATED。

進入TERMINATED的條件如下:

  • 線程池不是RUNNING狀態(tài);

  • 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);

  • 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;

  • workerCount為0;

  • 設(shè)置TIDYING狀態(tài)成功。

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!

線程池的具體實現(xiàn)

ThreadPoolExecutor 默認線程池

ScheduledThreadPoolExecutor 定時線程池

ThreadPoolExecutor線程池的創(chuàng)建

public?ThreadPoolExecutor(int?corePoolSize,
??????????????????????????int?maximumPoolSize,
??????????????????????????long?keepAliveTime,
??????????????????????????TimeUnit?unit,
??????????????????????????BlockingQueue<Runnable>?workQueue,
??????????????????????????ThreadFactory?threadFactory,
??????????????????????????RejectedExecutionHandler?handler)

任務(wù)提交

1、public?void?execute()?//提交任務(wù)無返回值
2、public?Future<?>?submit()?//任務(wù)執(zhí)行完成后有返回值


參數(shù)解釋

corePoolSize

線程池中的核心線程數(shù),當提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當前線程數(shù)等于corePoolSize;如果當前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。

maximumPoolSize

線程池中允許的最大線程數(shù)。如果當前阻塞隊列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當前線程數(shù)小于maximumPoolSize;

keepAliveTime

線程池維護線程所允許的空閑時間。當線程池中的線程數(shù)量大于corePoolSize的時候,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;

unit

keepAliveTime的單位;

workQueue

用來保存等待被執(zhí)行的任務(wù)的阻塞隊列,且任務(wù)必須實現(xiàn)Runable接口,在JDK中提供了如下阻塞隊列:

  • 1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù);

  • 2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;

  • 3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene;

  • 4、priorityBlockingQuene:具有優(yōu)先級的***阻塞隊列;

threadFactory

它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認使用Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認的ThreadFactory來創(chuàng)建線程時,會使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護線程,同時也設(shè)置了線程的名稱。歡迎大家關(guān)注我的公種浩【程序員追風(fēng)】,整理了2019年多家公司java面試題資料100多頁pdf文檔,文章都會在里面更新,整理的資料也會放在里面。

handler

線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:

  • 1、AbortPolicy:直接拋出異常,默認策略;

  • 2、CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);

  • 3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當前任務(wù);

  • 4、DiscardPolicy:直接丟棄任務(wù);

上面的4種策略都是ThreadPoolExecutor的內(nèi)部類。

當然也可以根據(jù)應(yīng)用場景實現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!

線程池監(jiān)控

public?long?getTaskCount()?//線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù)
public?long?getCompletedTaskCount()?//已完成的任務(wù)數(shù)
public?int?getPoolSize()?//線程池當前的線程數(shù)
public?int?getActiveCount()?//線程池中正在執(zhí)行任務(wù)的線程數(shù)量

線程池原理

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!


源碼分析

execute方法

public?void?execute(Runnable?command)?{
????if?(command?==?null)
????????throw?new?NullPointerException();
/*
?*?clt記錄著runState和workerCount
?*/
????int?c?=?ctl.get();
/*
?*?workerCountOf方法取出低29位的值,表示當前活動的線程數(shù);
?*?如果當前活動線程數(shù)小于corePoolSize,則新建一個線程放入線程池中;
?*?并把任務(wù)添加到該線程中。
?*/
????if?(workerCountOf(c)?<?corePoolSize)?{
????????/*
?????????*?addWorker中的第二個參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來判斷還是maximumPoolSize來判斷;
?????????*?如果為true,根據(jù)corePoolSize來判斷;
?????????*?如果為false,則根據(jù)maximumPoolSize來判斷
?????????*/
????????if?(addWorker(command,?true))
????????????return;
/*
?*?如果添加失敗,則重新獲取ctl值
?*/
????????c?=?ctl.get();
????}
/*
?*?如果當前線程池是運行狀態(tài)并且任務(wù)添加到隊列成功
?*/
????if?(isRunning(c)?&&?workQueue.offer(command))?{
????????//?重新獲取ctl值
????????int?recheck?=?ctl.get();
?????????//?再次判斷線程池的運行狀態(tài),如果不是運行狀態(tài),由于之前已經(jīng)把command添加到workQueue中了,
????????//?這時需要移除該command
????????//?執(zhí)行過后通過handler使用拒絕策略對該任務(wù)進行處理,整個方法返回
????????if?(!?isRunning(recheck)?&&?remove(command))
????????????reject(command);
????????/*
?????????*?獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法
?????????*?這里傳入的參數(shù)表示:
?????????*?1.?第一個參數(shù)為null,表示在線程池中創(chuàng)建一個線程,但不去啟動;
?????????*?2.?第二個參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize,添加線程時根據(jù)maximumPoolSize來判斷;
?????????*?如果判斷workerCount大于0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執(zhí)行。
?????????*/
????????else?if?(workerCountOf(recheck)?==?0)
????????????addWorker(null,?false);
????}
/*
?*?如果執(zhí)行到這里,有兩種情況:
?*?1.?線程池已經(jīng)不是RUNNING狀態(tài);
?*?2.?線程池是RUNNING狀態(tài),但workerCount?>=?corePoolSize并且workQueue已滿。
?*?這時,再次調(diào)用addWorker方法,但第二個參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize;
?*?如果失敗則拒絕該任務(wù)
?*/
????else?if?(!addWorker(command,?false))
????????reject(command);
}

簡單來說,在執(zhí)行execute()方法時如果狀態(tài)一直是RUNNING時,的執(zhí)行過程如下:

  1. 如果workerCount < corePoolSize,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù);

  2. 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊列未滿,則將任務(wù)添加到該阻塞隊列中;

  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù);

  4. 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認的處理方式是直接拋異常。

這里要注意一下addWorker(null, false);,也就是創(chuàng)建一個線程,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時候,會直接從workQueue中獲取任務(wù)。所以,在workerCountOf(recheck) == 0時執(zhí)行addWorker(null, false);也是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。

execute方法執(zhí)行流程如下:

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!


addWorker方法

addWorker方法的主要工作是在線程池中創(chuàng)建一個新的線程并執(zhí)行,firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個任務(wù),core參數(shù)為true表示在新增線程時會判斷當前活動線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當前活動線程數(shù)是否少于maximumPoolSize,代碼如下:

private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
????retry:
????for?(;;)?{
????????int?c?=?ctl.get();
????//?獲取運行狀態(tài)
????????int?rs?=?runStateOf(c);
????/*
?????*?這個if判斷
?????*?如果rs?>=?SHUTDOWN,則表示此時不再接收新任務(wù);
?????*?接著判斷以下3個條件,只要有1個不滿足,則返回false:
?????*?1.?rs?==?SHUTDOWN,這時表示關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊列中已保存的任務(wù)
?????*?2.?firsTask為空
?????*?3.?阻塞隊列不為空
?????*?
?????*?首先考慮rs?==?SHUTDOWN的情況
?????*?這種情況下不會接受新提交的任務(wù),所以在firstTask不為空的時候會返回false;
?????*?然后,如果firstTask為空,并且workQueue也為空,則返回false,
?????*?因為隊列中已經(jīng)沒有任務(wù)了,不需要再添加線程了
?????*/
?????//?Check?if?queue?empty?only?if?necessary.
????????if?(rs?>=?SHUTDOWN?&&
????????????????!?(rs?==?SHUTDOWN?&&
????????????????????????firstTask?==?null?&&
????????????????????????!?workQueue.isEmpty()))
????????????return?false;
????????for?(;;)?{
????????????//?獲取線程數(shù)
????????????int?wc?=?workerCountOf(c);
????????????//?如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
????????????//?這里的core是addWorker方法的第二個參數(shù),如果為true表示根據(jù)corePoolSize來比較,
????????????//?如果為false則根據(jù)maximumPoolSize來比較。
????????????//?
????????????if?(wc?>=?CAPACITY?||
????????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
????????????????return?false;
????????????//?嘗試增加workerCount,如果成功,則跳出第一個for循環(huán)
????????????if?(compareAndIncrementWorkerCount(c))
????????????????break?retry;
????????????//?如果增加workerCount失敗,則重新獲取ctl的值
????????????c?=?ctl.get();??//?Re-read?ctl
????????????//?如果當前的運行狀態(tài)不等于rs,說明狀態(tài)已被改變,返回第一個for循環(huán)繼續(xù)執(zhí)行
????????????if?(runStateOf(c)?!=?rs)
????????????????continue?retry;
????????????//?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
????????}
????}
????boolean?workerStarted?=?false;
????boolean?workerAdded?=?false;
????Worker?w?=?null;
????try?{
?????//?根據(jù)firstTask來創(chuàng)建Worker對象
????????w?=?new?Worker(firstTask);
?????//?每一個Worker對象都會創(chuàng)建一個線程
????????final?Thread?t?=?w.thread;
????????if?(t?!=?null)?{
????????????final?ReentrantLock?mainLock?=?this.mainLock;
????????????mainLock.lock();
????????????try?{
????????????????int?rs?=?runStateOf(ctl.get());
????????????????//?rs?<?SHUTDOWN表示是RUNNING狀態(tài);
????????????????//?如果rs是RUNNING狀態(tài)或者rs是SHUTDOWN狀態(tài)并且firstTask為null,向線程池中添加線程。
????????????????//?因為在SHUTDOWN時不會在添加新的任務(wù),但還是會執(zhí)行workQueue中的任務(wù)
????????????????if?(rs?<?SHUTDOWN?||
????????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
????????????????????if?(t.isAlive())?//?precheck?that?t?is?startable
????????????????????????throw?new?IllegalThreadStateException();
????????????????????//?workers是一個HashSet
????????????????????workers.add(w);
????????????????????int?s?=?workers.size();
????????????????????//?largestPoolSize記錄著線程池中出現(xiàn)過的最大線程數(shù)量
????????????????????if?(s?>?largestPoolSize)
????????????????????????largestPoolSize?=?s;
????????????????????workerAdded?=?true;
????????????????}
????????????}?finally?{
????????????????mainLock.unlock();
????????????}
????????????if?(workerAdded)?{
????????????????//?啟動線程
????????????????t.start();
????????????????workerStarted?=?true;
????????????}
????????}
????}?finally?{
????????if?(!?workerStarted)
????????????addWorkerFailed(w);
????}
????return?workerStarted;
}

Worker類

線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象,請參見JDK源碼。

Worker類繼承了AQS,并實現(xiàn)了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。

在調(diào)用構(gòu)造方法時,需要把任務(wù)傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數(shù)是this,因為Worker本身繼承了Runnable接口,也就是一個線程,所以一個Worker對象在啟動的時候會調(diào)用Worker類中的run方法。

Worker繼承了AQS,使用AQS來實現(xiàn)獨占鎖的功能。為什么不使用ReentrantLock來實現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:

  1. lock方法一旦獲取了獨占鎖,表示當前線程正在執(zhí)行任務(wù)中;

  2. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;

  3. 如果該線程現(xiàn)在不是獨占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務(wù),這時可以對該線程進行中斷;

  4. 線程池在執(zhí)行shutdown方法或tryTerminate方法時會調(diào)用interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態(tài);

  5. 之所以設(shè)置為不可重入,是因為我們不希望任務(wù)在調(diào)用像setCorePoolSize這樣的線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果在任務(wù)中調(diào)用了如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。

所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。

此外,在構(gòu)造方法中執(zhí)行了setState(-1);,把state變量設(shè)置為-1,為什么這么做呢?是因為AQS中默認的state是0,如果剛創(chuàng)建了一個Worker對象,還沒有執(zhí)行任務(wù)時,這時就不應(yīng)該被中斷,看一下tryAquire方法:

protected?boolean?tryAcquire(int?unused)?{
????//cas修改state,不可重入
????if?(compareAndSetState(0,?1))?{?
????????setExclusiveOwnerThread(Thread.currentThread());
????????return?true;
????}
????return?false;
}

tryAcquire方法是根據(jù)state是否是0來判斷的,所以,setState(-1);將state設(shè)置為-1是為了禁止在執(zhí)行任務(wù)前對線程進行中斷。

正因為如此,在runWorker方法中會先調(diào)用Worker對象的unlock方法將state設(shè)置為0。

runWorker方法

在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的代碼如下:

final?void?runWorker(Worker?w)?{
????Thread?wt?=?Thread.currentThread();
????//?獲取第一個任務(wù)
????Runnable?task?=?w.firstTask;
????w.firstTask?=?null;
????//?允許中斷
????w.unlock();?//?allow?interrupts
????//?是否因為異常退出循環(huán)
????boolean?completedAbruptly?=?true;
????try?{
????????//?如果task為空,則通過getTask來獲取任務(wù)
????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
????????????w.lock();
????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
????????????????????(Thread.interrupted()?&&
????????????????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
????????????????????!wt.isInterrupted())
????????????????wt.interrupt();
????????????try?{
????????????????beforeExecute(wt,?task);
????????????????Throwable?thrown?=?null;
????????????????try?{
????????????????????task.run();
????????????????}?catch?(RuntimeException?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?catch?(Error?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?catch?(Throwable?x)?{
????????????????????thrown?=?x;?throw?new?Error(x);
????????????????}?finally?{
????????????????????afterExecute(task,?thrown);
????????????????}
????????????}?finally?{
????????????????task?=?null;
????????????????w.completedTasks++;
????????????????w.unlock();
????????????}
????????}
????????completedAbruptly?=?false;
????}?finally?{
????????processWorkerExit(w,?completedAbruptly);
????}
}

這里說明一下第一個if判斷,目的是:

  • 如果線程池正在停止,那么要保證當前線程是中斷狀態(tài);

  • 如果不是的話,則要保證當前線程不是中斷狀態(tài);

這里要考慮在執(zhí)行該if語句期間可能也執(zhí)行了shutdownNow方法,shutdownNow方法會把狀態(tài)設(shè)置為STOP,回顧一下STOP狀態(tài):

不能接受新任務(wù),也不處理隊列中的任務(wù),會中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時,調(diào)用 shutdownNow() 方法會使線程池進入到該狀態(tài)。

STOP狀態(tài)要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態(tài)時線程是非中斷狀態(tài)的,因為Thread.interrupted()方法會復(fù)位中斷的狀態(tài)。

總結(jié)一下runWorker方法的執(zhí)行過程:

  1. while循環(huán)不斷地通過getTask()方法獲取任務(wù);

  2. getTask()方法從阻塞隊列中取任務(wù);

  3. 如果線程池正在停止,那么要保證當前線程是中斷狀態(tài),否則要保證當前線程不是中斷狀態(tài);

  4. 調(diào)用task.run()執(zhí)行任務(wù);

  5. 如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;

  6. runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。

這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實現(xiàn)。

completedAbruptly變量來表示在執(zhí)行任務(wù)過程中是否出現(xiàn)了異常,在processWorkerExit方法中會對該變量的值進行判斷。


getTask方法

getTask方法用來從阻塞隊列中取任務(wù),代碼如下:

private?Runnable?getTask()?{
????//?timeOut變量的值表示上次從阻塞隊列中取任務(wù)時是否超時
????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?
????for?(;;)?{
????????int?c?=?ctl.get();
????????int?rs?=?runStateOf(c);
????????//?Check?if?queue?empty?only?if?necessary.
????/*
?????*?如果線程池狀態(tài)rs?>=?SHUTDOWN,也就是非RUNNING狀態(tài),再進行以下判斷:
?????*?1.?rs?>=?STOP,線程池是否正在stop;
?????*?2.?阻塞隊列是否為空。
?????*?如果以上條件滿足,則將workerCount減1并返回null。
?????*?因為如果當前線程池狀態(tài)的值是SHUTDOWN或以上時,不允許再向阻塞隊列中添加任務(wù)。
?????*/
????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
????????????decrementWorkerCount();
????????????return?null;
????????}
????????int?wc?=?workerCountOf(c);
????????//?Are?workers?subject?to?culling?
????????//?timed變量用于判斷是否需要進行超時控制。
????????//?allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
????????//?wc?>?corePoolSize,表示當前線程池中的線程數(shù)量大于核心線程數(shù)量;
????????//?對于超過核心線程數(shù)量的這些線程,需要進行超時控制
????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;

????/*
?????*?wc?>?maximumPoolSize的情況是因為可能在此方法執(zhí)行階段同時執(zhí)行了setMaximumPoolSize方法;
?????*?timed?&&?timedOut?如果為true,表示當前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務(wù)發(fā)生了超時
?????*?接下來判斷,如果有效線程數(shù)量大于1,或者阻塞隊列是空的,那么嘗試將workerCount減1;
?????*?如果減1失敗,則返回重試。
?????*?如果wc?==?1時,也就說明當前線程是線程池中唯一的一個線程了。
?????*/
????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
????????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
????????????if?(compareAndDecrementWorkerCount(c))
????????????????return?null;
????????????continue;
????????}
????????try?{
????????/*
?????????*?根據(jù)timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內(nèi)沒有獲取到任務(wù),則返回null;
?????????*?否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。
?????????*
?????????*/
????????????Runnable?r?=?timed??
????????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
????????????????????workQueue.take();
????????????if?(r?!=?null)
????????????????return?r;
????????????//?如果?r?==?null,說明已經(jīng)超時,timedOut設(shè)置為true
????????????timedOut?=?true;
????????}?catch?(InterruptedException?retry)?{
????????????//?如果獲取任務(wù)時當前線程發(fā)生了中斷,則設(shè)置timedOut為false并返回循環(huán)重試
????????????timedOut?=?false;
????????}
????}
}

這里重要的地方是第二個if判斷,目的是控制線程池的有效線程數(shù)量。由上文中的分析可以知道,在執(zhí)行execute方法時,如果當前線程池的線程數(shù)量超過了corePoolSize且小于maximumPoolSize,并且workQueue已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務(wù),也就是timedOut為true的情況,說明workQueue已經(jīng)為空了,也就說明了當前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。

什么時候會銷毀?當然是runWorker方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由JVM自動回收。

getTask方法返回null時,在runWorker方法中會跳出while循環(huán),然后會執(zhí)行processWorkerExit方法。


processWorkerExit方法

private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
????//?如果completedAbruptly值為true,則說明線程執(zhí)行時出現(xiàn)了異常,需要將workerCount減1;
????//?如果線程執(zhí)行時沒有出現(xiàn)異常,說明在getTask()方法中已經(jīng)已經(jīng)對workerCount進行了減1操作,這里就不必再減了。??
????if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
????????decrementWorkerCount();
????final?ReentrantLock?mainLock?=?this.mainLock;
????mainLock.lock();
????try?{
????????//統(tǒng)計完成的任務(wù)數(shù)
????????completedTaskCount?+=?w.completedTasks;
????????//?從workers中移除,也就表示著從線程池中移除了一個工作線程
????????workers.remove(w);
????}?finally?{
????????mainLock.unlock();
????}
????//?根據(jù)線程池狀態(tài)進行判斷是否結(jié)束線程池
????tryTerminate();
????int?c?=?ctl.get();
/*
?*?當線程池是RUNNING或SHUTDOWN狀態(tài)時,如果worker是異常結(jié)束,那么會直接addWorker;
?*?如果allowCoreThreadTimeOut=true,并且等待隊列有任務(wù),至少保留一個worker;
?*?如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
?*/
????if?(runStateLessThan(c,?STOP))?{
????????if?(!completedAbruptly)?{
????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
????????????if?(min?==?0?&&?!?workQueue.isEmpty())
????????????????min?=?1;
????????????if?(workerCountOf(c)?>=?min)
????????????????return;?//?replacement?not?needed
????????}
????????addWorker(null,?false);
????}
}

至此,processWorkerExit執(zhí)行完之后,工作線程被銷毀,以上就是整個工作線程的生命周期,從execute方法開始,Worker使用ThreadFactory創(chuàng)建新的工作線程,runWorker通過getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進入processWorkerExit方法,整個線程結(jié)束,如圖所示:

Java線程池原理與源碼詳細解讀,再也不怕面試問線程池了!


總結(jié)

  • 分析了線程的創(chuàng)建,任務(wù)的提交,狀態(tài)的轉(zhuǎn)換以及線程池的關(guān)閉;

  • 這里通過execute方法來展開線程池的工作流程,execute方法通過corePoolSize,maximumPoolSize以及阻塞隊列的大小來判斷決定傳入的任務(wù)應(yīng)該被立即執(zhí)行,還是應(yīng)該添加到阻塞隊列中,還是應(yīng)該拒絕任務(wù)。

  • 介紹了線程池關(guān)閉時的過程,也分析了shutdown方法與getTask方法存在競態(tài)條件;

  • 在獲取任務(wù)時,要通過線程池的狀態(tài)來判斷應(yīng)該結(jié)束工作線程還是阻塞線程等待新的任務(wù),也解釋了為什么關(guān)閉線程池時要中斷工作線程以及為什么每一個worker都需要lock。

最后

歡迎大家一起交流,喜歡文章記得點個贊喲,感謝支持!


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI