您好,登錄后才能下訂單哦!
小編給大家分享一下Java線程池的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
我們知道創(chuàng)建線程的常用方式就是 new Thread() ,而每一次 new Thread() 都會重新創(chuàng)建一個線程,而線程的創(chuàng)建和銷毀都需要耗時的,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性。在 jdk1.5 的 JUC 包中有一個 Executors,他能使我們創(chuàng)建的線程得到復(fù)用,不會頻繁的創(chuàng)建和銷毀線程。
線程池首先創(chuàng)建一些線程,它們的集合稱為線程池。使用線程池可以很好地提高性能,線程池在系統(tǒng)啟動時即創(chuàng)建大量空閑的線程,程序?qū)⒁粋€任務(wù)傳給線程池,線程池就會啟動一條線程來執(zhí)行這個任務(wù),執(zhí)行結(jié)束以后,該線程并不會死亡,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個任務(wù)。
先不管它到底是個啥,先看看使用線程池和 new Thread() 的耗時情況:
public class ThreadPoolTest { static CountDownLatch latch = new CountDownLatch(100000); static ExecutorService es = Executors.newFixedThreadPool(4); public static void main(String[] args) throws InterruptedException { long timeStart = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { newThread(); //executors(); } latch.await(); System.out.println(System.currentTimeMillis() - timeStart); es.shutdown(); } /** * 使用線程池 */ public static void executors() { es.submit(() -> { latch.countDown(); }); } /** * 直接new */ public static void newThread() { new Thread(() -> { latch.countDown(); }).start(); } }
對于 10 萬個線程同時跑,如果使用 new 的方式耗時:
使用線程池耗時:
總得來說,合理的使用線程池可以帶來以下幾個好處:
1.降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的消耗。
2.提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
3.增加線程的可管理性。線程是稀缺資源,使用線程池可以進(jìn)行統(tǒng)一分配,調(diào)優(yōu)和監(jiān)控。
我們先了解線程池的思路,哪怕你重來沒了解過什么是線程池,所以不會一上來就給你講一堆線程池的參數(shù)。我嘗試多種想法來解釋它的設(shè)計思路,但都過于官方,但在查找資料的時候在博客上看到了非常通俗易懂的描述,它是這樣描述的,先假想一個工廠的生產(chǎn)流程:
工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當(dāng)訂單增加,正式工人已經(jīng)忙不過來了,工廠會將生產(chǎn)原料暫時堆積在倉庫中,等有空閑的工人時再處理(因為工人空閑了也不會主動處理倉庫中的生產(chǎn)任務(wù),所以需要調(diào)度員實時調(diào)度)。倉庫堆積滿了后,訂單還在增加怎么辦?工廠只能臨時擴招一批工人來應(yīng)對生產(chǎn)高峰,而這批工人高峰結(jié)束后是要清退的,所以稱為臨時工。當(dāng)時臨時工也以招滿后(受限于工位限制,臨時工數(shù)量有上限),后面的訂單只能忍痛拒絕了。
和線程池的映射如下:
工廠——線程池
訂單——任務(wù)(Runnable)
正式工人——核心線程
臨時工——普通線程
倉庫——任務(wù)隊列
調(diào)度員——getTask()
getTask()是一個方法,將任務(wù)隊列中的任務(wù)調(diào)度給空閑線程,源碼分析再去了解。
映射后,形成線程池流程圖如下:
了解了線程池設(shè)計思路,我們可以總結(jié)一下線程池的工作機制:
在線程池的編程模式下,任務(wù)是提交給整個線程池,而不是直接提交給某個線程,線程池在拿到任務(wù)后, 在內(nèi)部尋找是否有空閑的線程 ,如果有,則將任務(wù)交給某個空閑的線程。如果不存在空閑線程,即線程池中的線程數(shù)大于核心線程 corePoolSize ,則將任務(wù)添加到任務(wù)隊列中 workQueue ,如果任務(wù)隊列有界且滿了之后則會判斷線程池中的線程數(shù)是否大于最大線程數(shù) maximumPoolSize ,如果小于則會創(chuàng)建新的線程來執(zhí)行任務(wù),否則在沒有空閑線程的情況下就會執(zhí)行決絕策略 handler 。
注意:線程池中剛開始沒有線程,當(dāng)一個任務(wù)提交給線程池后,線程池會創(chuàng)建一個新線程來執(zhí)行任務(wù)。一個線程同時只能執(zhí)行一個任務(wù),但可以同時向一個線程池提交多個任務(wù)。
線程池的真正實現(xiàn)類是 ThreadPoolExecutor ,類的集成關(guān)系如下:
ThreadPoolExecutor的構(gòu)造方法有幾個,掌握最主要的即可,其中包含 7 個參數(shù):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize(必需),線程池中的核心線程數(shù)。
當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于 corePoolSize。
如果當(dāng)前線程數(shù)小于 corePoolSize,此時存在 空閑線程 ,提交的任務(wù)會創(chuàng)建一個新線程來執(zhí)行該任務(wù)。
如果當(dāng)前線程數(shù)等于 corePoolSize,則繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行。
如果執(zhí)行了線程池 prestartAllCoreThreads() 方法,線程池會提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize(必需),線程池中允許的最大線程數(shù)。
當(dāng)隊列滿了,且 已創(chuàng)建的線程數(shù)小于 maximumPoolSize ,則線程池會創(chuàng)建新的線程來執(zhí)行任務(wù)。另外,對于無界隊列,可忽略該參數(shù)。
keepAliveTime(必需),線程存活保持時間。
當(dāng)線程沒有任務(wù)執(zhí)行時,繼續(xù)存活的時間。默認(rèn)情況下,該參數(shù)只在線程數(shù)大于 corePoolSize 時才有用,即當(dāng)非核心線程處于空閑狀態(tài)的時間超過這個時間后,該線程將被回收。將 allowCoreThreadTimeOut 參數(shù)設(shè)置為 true 后,核心線程也會被回收。
unit(必需),keepAliveTime 的時間單位。
workQueue(必需),任務(wù)隊列。
用于保存等待執(zhí)行的任務(wù)的阻塞隊列。workQueue 必須是 BlockingQueue 阻塞隊列。當(dāng)線程池中的線程數(shù)超過它的 corePoolSize 的時候,線程會進(jìn)入阻塞隊列進(jìn)行阻塞等待。
一般來說,我們應(yīng)該盡量使用有界隊列,因為使用無界隊列作為工作隊列會對線程池帶來如下影響。
當(dāng)線程池中的線程數(shù)達(dá)到 corePoolSize 后,新任務(wù)將在無界隊列中等待,因此線程池中的線程數(shù)不會超過 corePoolSize。
由于 1,使用無界隊列時 maximumPoolSize 將是一個無效參數(shù)。
由于 1 和 2,使用無界隊列時 keepAliveTime 將是一個無效參數(shù)。
更重要的,使用無界 queue 可能會耗盡系統(tǒng)資源,有界隊列則有助于防止資源耗盡,同時即使使用有界隊列,也要盡量控制隊列的大小在一個合適的范圍。一般使用, ArrayBlockingQueue 、 LinkedBlockingQueue 、 SynchronousQueue 、 PriorityBlockingQueue 等。
threadFactory(可選),創(chuàng)建線程的工廠。
通過自定義的線程工廠可以給每個新建的線程設(shè)置一個具有識別度的 線程名 ,threadFactory 創(chuàng)建的線程也是采用 new Thread() 方式,threadFactory 創(chuàng)建的線程名都具有統(tǒng)一的風(fēng)格: pool-m-thread-n (m 為線程池的編號,n 為線程池內(nèi)的線程編號)。
handler(可選),線程飽和策略。
當(dāng)阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了 四種策略:
AbortPolicy,直接拋出異常,默認(rèn)策略。
CallerRunsPolicy,用調(diào)用者所在的線程來執(zhí)行任務(wù)。
DiscardOldestPolicy,丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy,直接丟棄任務(wù)。
當(dāng)然也可以根據(jù)應(yīng)用場景實現(xiàn) RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。
ThreadPoolExecutor 使用 int 的高 3 位來表示線程池狀態(tài),低 29 位表示線程數(shù)量:
源碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3;//29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//約5億 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
至于為什么這么設(shè)計,我覺得主要原因是為了避免額外的開銷,如果使用 2 個變量來分別表示狀態(tài)和線程數(shù)量,為了保證原子性必須進(jìn)行額外的加鎖操作,而 ctl 則通過原子類就解決了該問題,在通過位運算就能得到狀態(tài)和線程數(shù)量。
可以使用兩個方法向線程池提交任務(wù),分別為 execute() 和 submit() 方法。
execute(),用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功。
submit(),用于提交需要返回值的任務(wù)。線程池會返回一個 future 類型的對象,通過這個 future 對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過 future 的 get() 方法來獲取返回值, get() 方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用 get(long timeout,TimeUnit unit) 方法則會阻塞當(dāng)前線程一段時間后立即返回,這 時候有可能任務(wù)沒有執(zhí)行完。
此外, ExecutorService 還提供了兩個提交任務(wù)的方法, invokeAny() 和 invokeAll() 。
invokeAny(),提交所有任務(wù),哪個任務(wù)先成功執(zhí)行完畢,返回此任務(wù)執(zhí)行結(jié)果,其它任務(wù)取消。
invokeAll(),提交所有的任務(wù)且必須全部執(zhí)行完成。
corePoolSize 和 maximumPoolSize
測試核心線程數(shù)為 1 ,最大線程數(shù)為 2,任務(wù)隊列為 1。
@Slf4j(topic = "ayue") public class ThreadExecutorPoolTest1 { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); for (int i = 1; i < 4; i++) { //執(zhí)行任務(wù) executor.execute(new MyTask(i)); } } //任務(wù) static class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { log.debug("線程名稱:{},正在執(zhí)行task:{}", Thread.currentThread().getName(), taskNum); try { //模擬其他操作 Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("task{}執(zhí)行完畢", taskNum); } } }
輸出:
<code data-type="codeline">11:07:04.377 [pool-1-thread-2] DEBUG ayue - 線程名稱:pool-1-thread-2,正在執(zhí)行task:3</code><code data-type="codeline">11:07:04.377 [pool-1-thread-1] DEBUG ayue - 線程名稱:pool-1-thread-1,正在執(zhí)行task:1</code><code data-type="codeline">11:07:05.384 [pool-1-thread-2] DEBUG ayue - task3執(zhí)行完畢</code><code data-type="codeline">11:07:05.384 [pool-1-thread-1] DEBUG ayue - task1執(zhí)行完畢</code><code data-type="codeline">11:07:05.384 [pool-1-thread-2] DEBUG ayue - 線程名稱:pool-1-thread-2,正在執(zhí)行task:2</code><code data-type="codeline">11:07:06.397 [pool-1-thread-2] DEBUG ayue - task2執(zhí)行完畢</code>
當(dāng)有 3 個線程通過線程池執(zhí)行任務(wù)時,由于核心線程只有一個,且任務(wù)隊列為 1,所以當(dāng)?shù)?3 個線程到來的時候, 會重新開啟一個新的線程 pool-1-thread-2 來執(zhí)行任務(wù)。
當(dāng)然,這里可能有人問核心線程會不會大于最大線程?當(dāng)然不會,如果 corePoolSize > maximumPoolSize ,則程序啟動會直接報錯。
任務(wù)隊列是基于阻塞隊列實現(xiàn)的,即采用生產(chǎn)者消費者模式,在 Java 中需要實現(xiàn) BlockingQueue 接口。但 Java 已經(jīng)為我們提供了 7 種阻塞隊列的實現(xiàn):
1.ArrayBlockingQueue:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
2.LinkedBlockingQueue: 一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列,在未指明容量時,容量默認(rèn)為 Integer.MAX_VALUE 。
3.PriorityBlockingQueue: 一個支持優(yōu)先級排序的無界阻塞隊列,對元素沒有要求,可以實現(xiàn) Comparable 接口也可以提供 Comparator 來對隊列中的元素進(jìn)行比較。跟時間沒有任何關(guān)系,僅僅是 按照優(yōu)先級取任務(wù) 。
4.DelayQueue:類似于 PriorityBlockingQueue,是二叉堆實現(xiàn)的無界優(yōu)先級阻塞隊列。要求元素都實現(xiàn) Delayed 接口,通過執(zhí)行時延從隊列中提取任務(wù),時間沒到任務(wù)取不出來。
5.SynchronousQueue: 一個不存儲元素的阻塞隊列,消費者線程調(diào)用 take() 方法的時候就會發(fā)生阻塞,直到有一個生產(chǎn)者線程生產(chǎn)了一個元素,消費者線程就可以拿到這個元素并返回;生產(chǎn)者線程調(diào)用 put() 方法的時候也會發(fā)生阻塞,直到有一個消費者線程消費了一個元素,生產(chǎn)者才會返回。
6.LinkedBlockingDeque: 使用雙向隊列實現(xiàn)的有界雙端阻塞隊列。雙端意味著可以像普通隊列一樣 FIFO(先進(jìn)先出),也可以像棧一樣 FILO(先進(jìn)后出)。
7.LinkedTransferQueue: 它是 ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的結(jié)合體,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行為一致,但是是無界的阻塞隊列。
線程工廠默認(rèn)創(chuàng)建的線程名: pool-m-thread-n ,在 Executors.defaultThreadFactory() 可以看到:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { //線程名:namePrefix + threadNumber.getAndIncrement() Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
我們也可以通過 ThreadPoolExecutor 自定義線程名:
@Slf4j(topic = "ayue") public class ThreadExecutorPoolTest1 { public static void main(String[] args) { //自增線程id AtomicInteger threadNumber = new AtomicInteger(1); ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "javatv-" + threadNumber.getAndIncrement()); } }); for (int i = 1; i < 4; i++) { executor.execute(new MyTask(i)); } } static class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { log.debug("線程名稱:{},正在執(zhí)行task:{}", Thread.currentThread().getName(), taskNum); try { //模擬其他操作 Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("task{}執(zhí)行完畢", taskNum); } } }
輸出:
<code data-type="codeline">14:08:07.166 [javatv-1] DEBUG ayue - 線程名稱:javatv-1,正在執(zhí)行task:1</code><code data-type="codeline">14:08:07.166 [javatv-2] DEBUG ayue - 線程名稱:javatv-2,正在執(zhí)行task:3</code><code data-type="codeline">14:08:08.170 [javatv-1] DEBUG ayue - task1執(zhí)行完畢</code><code data-type="codeline">14:08:08.170 [javatv-2] DEBUG ayue - task3執(zhí)行完畢</code><code data-type="codeline">14:08:08.170 [javatv-1] DEBUG ayue - 線程名稱:javatv-1,正在執(zhí)行task:2</code><code data-type="codeline">14:08:09.172 [javatv-1] DEBUG ayue - task2執(zhí)行完畢</code>
線程池提供了 四種策略:
1.AbortPolicy,直接拋出異常,默認(rèn)策略。
2.CallerRunsPolicy,用調(diào)用者所在的線程來執(zhí)行任務(wù)。
3.DiscardOldestPolicy,丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。
4.DiscardPolicy,直接丟棄任務(wù)。、
把上面代碼的循環(huán)次數(shù)改為 4 次,則會拋出 java.util.concurrent.RejectedExecutionException 異常。
for (int i = 1; i < 5; i++) { executor.execute(new MyTask(i)); }
可以通過調(diào)用線程池的 shutdown 或 shutdownNow 方法來關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的 interrupt 方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。但是它們存在一定的區(qū)別, shutdownNow 首先將線程池的狀態(tài)設(shè)置成 STOP ,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而 shutdown 只是將線程池的狀態(tài)設(shè)置成 SHUTDOWN 狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。 簡單來說:
shutdown():線程池狀態(tài)變?yōu)?SHUTDOWN,不會接收新任務(wù),但已提交任務(wù)會執(zhí)行完,不會阻塞調(diào)用線程的執(zhí)行 。
shutdownNow():線程池狀態(tài)變?yōu)?STOP,會接收新任務(wù),會將隊列中的任務(wù)返回,并用 interrupt 的方式中斷正在執(zhí)行的任務(wù)。
只要調(diào)用了這兩個關(guān)閉方法中的任意一個, isShutdown 方法就會返回 true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用 isTerminaed 方法會返回 true。至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用 shutdown 方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用 shutdownNow 方法。
Executors,提供了一系列靜態(tài)工廠方法用于創(chuàng)建各種類型的線程池,基于 ThreadPoolExecutor。
1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
特點:核心線程數(shù)等于最大線程數(shù),因此也無需超時時間,執(zhí)行完立即回收,阻塞隊列是無界的,可以放任意數(shù)量的任務(wù)。
場景:適用于任務(wù)量已知,相對耗時的任務(wù)。
2.newCachedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可根據(jù)需要創(chuàng)建新線程的線程池,如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中,如果有被使用完但是還沒銷毀的線程,就復(fù)用該線程。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。這種線程池比較靈活, 對于執(zhí)行很多短期異步任務(wù)的程序而言,這些線程池通常可提高程序性能 。
特點:核心線程數(shù)是 0, 最大線程數(shù)是 Integer.MAX_VALUE ,全部都是空閑線程 60s 后回收。
場景:執(zhí)行大量、耗時少的任務(wù)。
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
特點:單線程線程池。希望多個任務(wù)排隊執(zhí)行,線程數(shù)固定為 1,任務(wù)數(shù)多于 1 時,會放入無界隊列排隊,任務(wù)執(zhí)行完畢,這唯一的線程也不會被釋放。
場景:區(qū)別于自己創(chuàng)建一個單線程串行執(zhí)行任務(wù),如果使用 new Thread 任務(wù)執(zhí)行失敗而終止那么沒有任何補救措施,而線程池還會新建一個線程,保證池的正常工作。
4.ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲之后運行任務(wù),或者定期執(zhí)行任務(wù)。ScheduledThreadPoolExecuto 的功能與 Timer 類似,但 ScheduledThreadPoolExecutor 功能更強大、更靈活。Timer 對應(yīng)的是單個后臺線程,而 ScheduledThreadPoolExecutor 可以在構(gòu)造函數(shù)中指定多個對應(yīng)的后臺線程數(shù)。
特點:核心線程數(shù)量固定,非核心線程數(shù)量無限,執(zhí)行完閑置 10ms 后回收,任務(wù)隊列為延時阻塞隊列。
場景:執(zhí)行定時或周期性的任務(wù)。
需要針對具體情況而具體處理,不同的任務(wù)類別應(yīng)采用不同規(guī)模的線程池,任務(wù)類別可劃分為 CPU 密集型任務(wù)、IO 密集型任務(wù)和混合型任務(wù)。
CPU 密集型任務(wù):線程池中線程個數(shù)應(yīng)盡量少,不應(yīng)大于 CPU 核心數(shù);
IO 密集型任務(wù):由于 IO 操作速度遠(yuǎn)低于 CPU 速度,那么在運行這類任務(wù)時,CPU 絕大多數(shù)時間處于空閑狀態(tài),那么線程池可以配置盡量多些的線程,以提高 CPU 利用率;
混合型任務(wù):可以拆分為 CPU 密集型任務(wù)和 IO 密集型任務(wù),當(dāng)這兩類任務(wù)執(zhí)行時間相差無幾時,通過拆分再執(zhí)行的吞吐率高于串行執(zhí)行的吞吐率,但若這兩類任務(wù)執(zhí)行時間有數(shù)據(jù)級的差距,那么沒有拆分的意義。
如果在系統(tǒng)中大量使用線程池,則有必要對線程池進(jìn)行監(jiān)控,方便在出現(xiàn)問題時,可以根據(jù)線程池的使用狀況快速定位問題。利用線程池提供的參數(shù)進(jìn)行監(jiān)控,參數(shù)如下:
taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
completedTaskCount:線程池在運行過程中已完成的任務(wù)數(shù)量,小于或等于 taskCount。
largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量,通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。
getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
getActiveCount:獲取活動的線程數(shù)。
通過擴展線程池進(jìn)行監(jiān)控:繼承線程池并重寫線程池的 beforeExecute() , afterExecute() 和 terminated() 方法,可以在任務(wù)執(zhí)行前、后和線程池關(guān)閉前自定義行為。如監(jiān)控任務(wù)的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等。
在使用線程池的時候,我其實有一些問題也隨之而來,比如線程池的線程怎么創(chuàng)建?任務(wù)怎么執(zhí)行?任務(wù)怎么分配?線程執(zhí)行完后怎么辦?是存活還是死亡?什么時候死亡?為什么要使用阻塞隊列等等問題。帶著這些問題,我們?nèi)プx讀源碼,讀源碼怎么入手?通過 ThreadPoolExecutor 的 execute() 方法。submit 底層也是調(diào)用了 execute() 。
public void execute(Runnable command) { //如果沒有任務(wù)直接拋出異常 if (command == null) throw new NullPointerException(); //獲取當(dāng)前線程的狀態(tài)+線程個數(shù) int c = ctl.get(); /** * workerCountOf,線程池當(dāng)前線程數(shù),并判斷是否小于核心線程數(shù) */ if (workerCountOf(c) < corePoolSize) {//如果小于 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 這里是向任務(wù)隊列投放任務(wù)成功,對線程池的運行中狀態(tài)做二次檢查 // 如果線程池二次檢查狀態(tài)是非運行中狀態(tài),則從任務(wù)隊列移除當(dāng)前的任務(wù)調(diào)用拒絕策略處理(也就是移除前面成功入隊的任務(wù)實例) int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); /* 走到下面的else if分支,說明有以下的前提: * 1、待執(zhí)行的任務(wù)已經(jīng)成功加入任務(wù)隊列 * 2、線程池可能是RUNNING狀態(tài) * 3、傳入的任務(wù)可能從任務(wù)隊列中移除失?。ㄒ瞥〉奈ㄒ豢赡芫褪侨蝿?wù)已經(jīng)被執(zhí)行了) * * 如果當(dāng)前工作線程數(shù)量為0,則創(chuàng)建一個非核心線程并且傳入的任務(wù)對象為null - 返回 * 也就是創(chuàng)建的非核心線程不會馬上運行,而是等待獲取任務(wù)隊列的任務(wù)去執(zhí)行 * 如果前工作線程數(shù)量不為0,原來應(yīng)該是最后的else分支,但是可以什么也不做, * 因為任務(wù)已經(jīng)成功入隊列,總會有合適的時機分配其他空閑線程去執(zhí)行它。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* 走到這里說明有以下的前提: * 1、線程池中的工作線程總數(shù)已經(jīng)大于等于corePoolSize(簡單來說就是核心線程已經(jīng)全部懶創(chuàng)建完畢) * 2、線程池可能不是RUNNING狀態(tài) * 3、線程池可能是RUNNING狀態(tài)同時任務(wù)隊列已經(jīng)滿了 * * 如果向任務(wù)隊列投放任務(wù)失敗,則會嘗試創(chuàng)建非核心線程傳入任務(wù)執(zhí)行 * 創(chuàng)建非核心線程失敗,此時需要拒絕執(zhí)行任務(wù) */ else if (!addWorker(command, false)) reject(command); }
第一個 if 判斷線程池當(dāng)前線程數(shù)是否小于核心線程數(shù)。
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
如果小于,則進(jìn)入 addWorker 方法:
private boolean addWorker(Runnable firstTask, boolean core) { retry: //外層循環(huán):判斷線程池狀態(tài) for (;;) { int c = ctl.get(); //獲取線程池狀態(tài) int rs = runStateOf(c); // 檢查線程池的狀態(tài)是否存活. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //內(nèi)層循環(huán):線程池添加核心線程并返回是否添加成功的結(jié)果 for (;;) { //線程數(shù)量 int wc = workerCountOf(c); //線程數(shù)量超過容量,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS增加線程數(shù)量,若成功跳出外層循環(huán) if (compareAndIncrementWorkerCount(c)) break retry; //否則失敗,并更新c c = ctl.get(); // Re-read ctl //如果這時的線程池狀態(tài)發(fā)生變化,重新對外層循環(huán)進(jìn)行自旋 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //如果CAS成功了,則繼續(xù)往下走 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //創(chuàng)建一個Worker,這個Worker實現(xiàn)了Runable,把它看成一個任務(wù)單元 w = new Worker(firstTask); //這個Thread就是當(dāng)前的任務(wù)單元Worker,即this final Thread t = w.thread; if (t != null) { //加鎖,因為可能有多個線程來調(diào)用 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次檢查線程池的狀態(tài),避免在獲取鎖前調(diào)用shutdown方法 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果t線程已經(jīng)啟動尚未終止,則拋出異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //否則,加入線程池 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //加入線程池后,啟動該線程,上面已經(jīng)設(shè)置為true if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果線程啟動失敗,則調(diào)用addWorkerFailed,回滾操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker 是 ThreadPoolExecutor 的內(nèi)部類,繼承了 AQS 并且實現(xiàn)了 Runnable。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; //構(gòu)造方法 Worker(Runnable firstTask) { //在調(diào)用runWorker前禁止中斷 //當(dāng)其它線程調(diào)用了線程池的 shutdownNow 時候,如果 worker 狀態(tài) >= 0 則會中斷該線程 //具體方法在 interruptIfStarted() 中可以看到 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } //省略其他代碼... }
可以看到,在 Worker 的構(gòu)造方法可以知道,其中的 thread 屬性就是通過 this 去創(chuàng)建的,所以線程池核心線程的創(chuàng)建主要是 run 方法中的 runWorker 方法:
runWorker 核心線程執(zhí)行邏輯。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 調(diào)用unlock()是為了讓外部可以中斷 w.unlock(); // allow interrupts // 線程退出的原因,true是任務(wù)導(dǎo)致,false是線程正常退出 boolean completedAbruptly = true; try { // 1. 如果firstTask不為null,則執(zhí)行firstTask // 2. 如果firstTask為null,則調(diào)用getTask()從隊列獲取任務(wù) // 3. 阻塞隊列的特性就是:當(dāng)隊列為空時,當(dāng)前線程會被阻塞等待 while (task != null || (task = getTask()) != null) { w.lock(); // 判斷線程池的狀態(tài),如果線程池正在停止,則對當(dāng)前線程進(jìn)行中斷操作 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();//中斷 try { //該方法里面沒有內(nèi)容,可以自己擴展實現(xiàn),比如上面提到的線程池的監(jiān)控 beforeExecute(wt, task); Throwable thrown = null; try { //執(zhí)行具體的任務(wù) task.run(); } catch (RuntimeException x) {//線程異常后操作 thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //同 beforeExecute() afterExecute(task, thrown); } } finally { task = null;//help gc //統(tǒng)計當(dāng)前worker完成了多少個任務(wù) w.completedTasks++; //釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { // 處理線程退出,completedAbruptly為true說明由于任務(wù)異常導(dǎo)致線程非正常退出 processWorkerExit(w, completedAbruptly); } }
而對于其中的 getTask() 方法,任務(wù)隊列中的任務(wù)調(diào)度給空閑線程,該方法是非常重要的,為什么重要?其中就涉及到面試官常問的 線程池如何保證核心線程不會被銷毀,而空閑線程會被銷毀?
private Runnable getTask() { //判斷最新一次的poll是否超時 //poll:取走BlockingQueue里排在首位的對象 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 條件1:線程池狀態(tài)SHUTDOWN、STOP、TERMINATED狀態(tài) * 條件2:線程池STOP、TERMINATED狀態(tài)或workQueue為空 * 條件1與條件2同時為true,則workerCount-1,并且返回null * 注:條件2是考慮到SHUTDOWN狀態(tài)的線程池不會接受任務(wù),但仍會處理任務(wù)(前面也講到了) */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /* * 該屬性的作用是判斷當(dāng)前線程是否允許超時: * 1.allowCoreThreadTimeOut * 如果為 false(默認(rèn)),核心線程即使在空閑時也保持活動狀態(tài)。 * 如果為 true,則核心線程使用 keepAliveTime 超時等待工作。 * 2.wc > corePoolSize * 當(dāng)前線程是否已經(jīng)超過核心線程數(shù)量。 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * 判斷當(dāng)前線程是否可以退出: * 1.wc > maximumPoolSize || (timed && timedOut) * wc > maximumPoolSize = true,說明當(dāng)前的工作線程總數(shù)大于線程池最大線程數(shù)。 * timed && timedOut = true,說明當(dāng)前線程允許超時并且已經(jīng)超時。 * 2.wc > 1 || workQueue.isEmpty() * 工作線程總數(shù)大于1或者任務(wù)隊列為空,則通過CAS把線程數(shù)減去1,同時返回null */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 1.poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象, * 如果在指定時間內(nèi),隊列一旦有數(shù)據(jù)可取,則立即返回隊列中的數(shù)據(jù)。否則直到時間超時還沒有數(shù)據(jù)可取,返回失敗。 * * 2.take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入。 * * * 如果timed為true,通過poll()方法做超時拉取,keepAliveTime時間內(nèi)沒有等待到有效的任務(wù),則返回null。 * * 如果timed為false,通過take()做阻塞拉取,會阻塞到有下一個有效的任務(wù)時候再返回(一般不會是null)。 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //通過poll()方法從任務(wù)隊列中拉取任務(wù)為null timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
① 對于 getTask() 下面的這段代碼,這段邏輯大多數(shù)情況下是針對非核心線程:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
② 我們這樣來閱讀這段代碼,當(dāng)工作線程數(shù)大于核心線程 corePoolSize ,此時進(jìn)入 execute() 方法中的第二個 if 語句:
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
此時線程池總數(shù)已經(jīng)超過了 corePoolSize 但小于 maximumPoolSize ,當(dāng)任務(wù)隊列已經(jīng)滿了的時候,會通過 addWorker(task,false) 添加非核心線程。
而在高并發(fā)的情況下,肯定會產(chǎn)生多余的線程,也就是出現(xiàn) ① 中的情況 wc > maximumPoolSize ,而這些多余的線程怎么辦,是不是會被回收?如果 workQueue.poll 沒有獲取到有效的任務(wù),那么①中的邏輯剛好與 addWorker(task,false) 相反,通過 CAS 減少非核心線程,使得工作線程總數(shù)趨向于 corePoolSize 。
如果對于非核心線程,上一輪循環(huán)獲取任務(wù)對象為 null ,在默認(rèn)情況下 allowCoreThreadTimeOut = false ,因此, getTask() 中 timed = true ,如果沒有獲取到任務(wù),此時 timedOut = true ,這一輪循環(huán)很容易滿足 timed && timedOut 為 true,這個時候 getTask() 返回 null 會導(dǎo)致 Worker#runWorker() 方法跳出死循環(huán),之后執(zhí)行 processWorkerExit() 方法處理后續(xù)工作,而該非核心線程對應(yīng)的 Worker 則變成 游離對象 ,等待被 JVM 回收。
當(dāng) allowCoreThreadTimeOut 設(shè)置為 true 的時候,這里分析的非核心線程的生命周期終結(jié)邏輯同時會適用于核心線程。
由此推出一個面試題: 線程池有多個線程同時沒取到任務(wù),會全部回收嗎?
舉個例子:線程池核心線程數(shù)是 5,最大線程數(shù)為 5,當(dāng)前工作線程數(shù)為 6(6>5,意味著當(dāng)前可以觸發(fā)線程回收),如果此時有 3 個線程同時超時沒有獲取到任務(wù),這 3 個線程會都被回收銷毀嗎?
思路:這道題的核心點在于有多個線程同時超時獲取不到任務(wù)。正常情況下,此時會觸發(fā)線程回收的流程。但是我們知道,正常不設(shè)置 allowCoreThreadTimeOut 變量時,線程池即使沒有任務(wù)處理,也會保持核心線程數(shù)的線程。如果這邊 3 個線程被全部回收,那此時線程數(shù)就變成了 3 個,不符合核心線程數(shù) 5 個,所以這邊我們可以首先得出答案:不會被全部回收。這個時候面試官肯定會問為什么?
根據(jù)答案不難推測,為了防止本題的這種并發(fā)回收問題的出現(xiàn),線程回收的流程必然會有并發(fā)控制。compareAndDecrementWorkerCount(c) 用的是 CAS 方法,如果 CAS 失敗就 continue,進(jìn)入下一輪循環(huán),重新判斷。
像上述例子,其中一條線程會 CAS 失敗,然后重新進(jìn)入循環(huán),發(fā)現(xiàn)工作線程數(shù)已經(jīng)只有 5 了, timed = false , 這條線程就不會被銷毀,可以一直阻塞了,此時就會調(diào)用 workQueue.take() 阻塞等待下一次的任務(wù),也就是說核心線程并不會死亡。
從這里也可以看出,雖然有核心線程數(shù),但線程并沒有區(qū)分是核心還是非核心,并不是先創(chuàng)建的就是核心,超過核心線程數(shù)后創(chuàng)建的就是非核心,最終保留哪些線程,完全隨機。
然后可以回答出前面的問題,線程池如何保證核心線程不會被銷毀,而空閑線程會被銷毀?
核心線程是因為調(diào)用了阻塞方法而不會被銷毀,空閑線程調(diào)用了超時方法在下次執(zhí)行時獲取不到任務(wù)而死亡。
這樣回答其實是可以的,但是這可能顯示出你是背得八股文,所以你應(yīng)該回答核心線程不僅僅是因為調(diào)用了阻塞方法而不會被銷毀,同時利用了 CAS 來保證。
還可以得出 getTask() 返回 null 的情況 :
1.線程池的狀態(tài)已經(jīng)為 STOP,TIDYING, TERMINATED,或者是 SHUTDOWN 且工作隊列為空。
2.工作線程數(shù)大于最大線程數(shù)或當(dāng)前工作線程已超時,且,其存在工作線程或任務(wù)隊列為空。
runWorker 的流程:
在 runWorker 的 finally 塊中,當(dāng)任務(wù)執(zhí)行之后,要對其做處理,作線程在執(zhí)行完 processWorkerExit() 方法才算真正的終結(jié),該方法如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 因為拋出用戶異常導(dǎo)致線程終結(jié),直接使工作線程數(shù)減1即可 // 如果沒有任何異常拋出的情況下是通過getTask()返回null引導(dǎo)線程正常跳出runWorker()方法的while死循環(huán)從而正常終結(jié),這種情況下,在getTask()中已經(jīng)把線程數(shù)減1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 全局的已完成任務(wù)記錄數(shù)加上此將要終結(jié)的Worker中的已完成任務(wù)數(shù) completedTaskCount += w.completedTasks; // 工作線程集合中移除此將要終結(jié)的Worker workers.remove(w); } finally { mainLock.unlock(); } // 見下一小節(jié)分析,用于根據(jù)當(dāng)前線程池的狀態(tài)判斷是否需要進(jìn)行線程池terminate處理 tryTerminate(); int c = ctl.get(); // 如果線程池的狀態(tài)小于STOP,也就是處于RUNNING或者SHUTDOWN狀態(tài)的前提下: // 1.如果線程不是由于拋出用戶異常終結(jié),如果允許核心線程超時,則保持線程池中至少存在一個工作線程 // 2.如果線程由于拋出用戶異常終結(jié),或者當(dāng)前工作線程數(shù),那么直接添加一個新的非核心線程 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { // 如果允許核心線程超時,最小值為0,否則為corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果最小值為0,同時任務(wù)隊列不空,則更新最小值為1 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 工作線程數(shù)大于等于最小值,直接返回不新增非核心線程 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
代碼的后面部分區(qū)域,會判斷線程池的狀態(tài),如果線程池是 RUNNING 或者 SHUTDOWN 狀態(tài)的前提下,如果當(dāng)前的工作線程由于拋出異常被終結(jié),那么會新創(chuàng)建一個非核心線程。如果當(dāng)前的工作線程并不是拋出用戶異常被終結(jié)(正常情況下的終結(jié)),那么會這樣處理:
allowCoreThreadTimeOut 為 true,也就是允許核心線程超時的前提下,如果任務(wù)隊列空,則會通過創(chuàng)建一個非核心線程保持線程池中至少有一個工作線程。
allowCoreThreadTimeOut 為 false,如果工作線程總數(shù)大于 corePoolSize 則直接返回,否則創(chuàng)建一個非核心線程,也就是會趨向于保持線程池中的工作線程數(shù)量趨向于 corePoolSize 。
processWorkerExit() 執(zhí)行完畢之后,意味著該工作線程的生命周期已經(jīng)完結(jié)。
以上是“Java線程池的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。