您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“java線程池源碼分析”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“java線程池源碼分析”吧!
當(dāng)提交一個(gè)任務(wù)時(shí),如果當(dāng)前線程數(shù)小于corePoolSize,就會(huì)創(chuàng)建一個(gè)線程。即使其他有可用的空閑線程。
用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。 可以選擇以下幾個(gè)阻塞隊(duì)列:
1.ArrayBlockingQueue 是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。
2.LinkedBlockingQueue 一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列。
3.SynchronousQueue 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等上一個(gè)元素被移除之后,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列。
4.PriorityBlockingQueue 一個(gè)具有優(yōu)先級(jí)的無(wú)限阻塞隊(duì)列。
不同的runnableTaskQueue對(duì)線程池運(yùn)行邏輯有很大影響
線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是如果使用了無(wú)界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒(méi)什么效果。
線程執(zhí)行結(jié)束后,保持存活的時(shí)間。 當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字。
線程池隊(duì)列飽和之后的執(zhí)行策略,默認(rèn)是采用AbortPolicy。JDK提供四種實(shí)現(xiàn)方式:
AbortPolicy:直接拋出異常
CallerRunsPolicy :只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)
DiscardOldestPolicy 丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)
DiscardPolicy : 不處理,丟棄掉
keepalive的時(shí)間單位,可選的單位有天(DAYS),小時(shí)(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
我們來(lái)看看 Executors.newCachedThreadPool() 里面的構(gòu)造:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
corePoolSize 為 0,意味著核心線程數(shù)是 0。
maximumPoolSize 是 Integer.MAX_VALUE ,意味這可以一直往線程池提交任務(wù),不會(huì)執(zhí)行 reject 策略。
keepAliveTime 和 unit 決定了線程的存活時(shí)間是 60s,意味著一個(gè)線程空閑60s后才會(huì)被回收。
reject 策略是默認(rèn)的 AbortPolicy,當(dāng)線程池超出最大限制時(shí)拋出異常。不過(guò)這里 CacheThreadPool 的沒(méi)有最大線程數(shù)限制,所以 reject 策略沒(méi)用。
runnableTaskQueue 是 SynchronousQueue。該隊(duì)列的特點(diǎn)是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。使用該隊(duì)列是實(shí)現(xiàn) CacheThreadPool 的關(guān)鍵之一。
SynchronousQueue 的詳細(xì)原理參考這里
我們看看 CacheThreadPool 的注釋介紹,大意是說(shuō)當(dāng)有任務(wù)提交進(jìn)來(lái),會(huì)優(yōu)先使用線程池里可用的空閑線程來(lái)執(zhí)行任務(wù),但是如果沒(méi)有可用的線程會(huì)直接創(chuàng)建線程。空閑的線程會(huì)保留 60s,之后才會(huì)被回收。這些特性決定了,當(dāng)需要執(zhí)行很多短時(shí)間的任務(wù)時(shí),CacheThreadPool 的線程復(fù)用率比較高, 會(huì)顯著的提高性能。而且線程60s后會(huì)回收,意味著即使沒(méi)有任務(wù)進(jìn)來(lái),CacheThreadPool 并不會(huì)占用很多資源。
那么問(wèn)題來(lái)了:
CacheThreadPool 如何實(shí)現(xiàn)線程保留60s。
CacheThreadPool 如何實(shí)現(xiàn)線程復(fù)用。
首先我們向線程池提交任務(wù)一般用 execute() 方法,我們就從這里入手:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 1.返回包含線程數(shù)以及線程狀態(tài)Integer類型的數(shù)值 int c = ctl.get(); // 如果工作線程數(shù)小于核心線程數(shù),則創(chuàng)建線程并執(zhí)行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 如果失敗,防止外部已經(jīng)在線程池中加入新任務(wù),重新獲取下 c = ctl.get(); } // 2.只有線程處于RUNNING狀態(tài),才執(zhí)行后半句:置入隊(duì)列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果線程池不是RUNNING狀態(tài),則將剛加入的移除 if (! isRunning(recheck) && remove(command)) reject(command); // 如果之前的線程已經(jīng)被消費(fèi)完,則新建一個(gè)線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); // 沒(méi)有被消費(fèi)完,只將任務(wù)放入隊(duì)列 } // 3.如果task不能加入到隊(duì)列,會(huì)嘗試創(chuàng)建一個(gè)新線程。 else if (!addWorker(command, false)) // 如果創(chuàng)建失敗,走reject流程 reject(command);
第一步比較簡(jiǎn)單,如果當(dāng)前運(yùn)行的線程少于核心線程,調(diào)用 addWorker(),創(chuàng)建一個(gè)線程。但是因?yàn)?CacheThreadPool 的 corePoolSize 是0,所以會(huì)跳過(guò)這步,并不會(huì)創(chuàng)建核心線程。
關(guān)鍵在第二步,首先判斷了線程池是否運(yùn)行狀態(tài),緊接著調(diào)用 workQueue.offer() 往對(duì)列添加 task 。 workQueue 是一個(gè) BlockingQueue ,我們知道 BlockingQueue.offer() 方法是向隊(duì)列插入元素,如果成功返回 true ,如果隊(duì)列沒(méi)有可用空間返回 false 。
CacheThreadPool 用的是 SynchronousQueue ,前面了解過(guò) SynchronousQueue 的特性,添加到 SynchronousQueue 的元素必須被其他線程取出,才能塞入下一個(gè)元素。等會(huì)我們?cè)賮?lái)看看哪里是從 SynchronousQueue 取出元素。
這里當(dāng)任務(wù)入隊(duì)列成功后,再次檢查了線程池狀態(tài),還是運(yùn)行狀態(tài)就繼續(xù)。然后檢查當(dāng)前運(yùn)行線程數(shù)量,如果當(dāng)前沒(méi)有運(yùn)行中的線程,調(diào)用 addWorker() ,第一個(gè)參數(shù)為 null 第二個(gè)參數(shù)是 false ,標(biāo)明了非核心線程。
為什么這里 addWorker() 第一個(gè)方法要用null?帶著這個(gè)疑問(wèn),我們來(lái)看看 addWorker() 方法:
大概翻譯了下
檢查是否可以添加新 worker ,在線程池狀態(tài)和給定的邊界(核心數(shù)或最大數(shù))。
如果可以,則計(jì)數(shù)線程數(shù),并且創(chuàng)建并啟動(dòng)新工作程序,以firstTask作為其運(yùn)行第一項(xiàng)任務(wù)。
如果池已停止或有資格關(guān)閉,則此方法返回false。
如果線程工廠在詢問(wèn)時(shí)無(wú)法創(chuàng)建線程,它也會(huì)返回false。
如果線程創(chuàng)建失敗,則由于線程工廠返回null,或者由于異常(通常是Thread.start()中的OutOfMemoryError),我們干凈地回滾。
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * 使用 corePoolSize 綁定做校驗(yàn)為 true,maximumPoolSize 綁定做校驗(yàn)為 false, * @param core if true use corePoolSize as bound, else maximumPoolSize. * * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { // continue retry 快速推多層循環(huán)嵌套 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 當(dāng)前線程數(shù)量+1 if (compareAndIncrementWorkerCount(c)) break retry; // 獲取當(dāng)前線程數(shù) c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 創(chuàng)建線程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖。持有主鎖防止干擾。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // 將任務(wù)包裝成 worker 對(duì)象,用線程安全的方式添加到當(dāng)前工作 HashSet()里 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 線程 start 并執(zhí)行 run方法處理 runWorker() 執(zhí)行 task t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 創(chuàng)建失敗減去線程數(shù) addWorkerFailed(w); } return workerStarted; }
源代碼比較長(zhǎng),這里省略了一部分。過(guò)程主要分成兩步, 第一步是一段 cas 代碼通過(guò)雙重循環(huán)檢查狀態(tài)并為當(dāng)前線程數(shù)擴(kuò)容 +1, 第二部是將任務(wù)包裝成 worker 對(duì)象,用線程安全的方式添加到當(dāng)前工作 HashSet() 里,并開(kāi)始執(zhí)行線程。 終于讀到線程開(kāi)始執(zhí)行的地方了,里程碑式的勝利啊同志們!
但是我們注意到,task 為 null ,Worker 里面的 firstTask 是 null ,那么 wokrer thread 里面是怎么工作下去的呢?
繼續(xù)跟蹤代碼,Worker 類繼承 Runnable 接口,因此 worker thread start 后,走的是 worker.run()方法:
public void run() { runWorker(this); }
繼續(xù)進(jìn)入 runWorker() 方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 獲取task Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // getTask() 獲取任務(wù) while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 退出自旋,進(jìn)入finally代碼塊。調(diào)用processWorkerExit方法,注銷當(dāng)前Worker,實(shí)現(xiàn)worker的銷毀 processWorkerExit(w, completedAbruptly); } }
可以看到這里判斷了 firstTask 如果為空,就調(diào)用 getTask() 方法。getTask() 方法是從 workQueue 拉取任務(wù)。 所以到這里之前的疑問(wèn)就解決了,調(diào)用 addWorker(null,false) 的目的是啟動(dòng)一個(gè)線程,然后再 workQueue 拉取任務(wù)執(zhí)行。
繼續(xù)跟蹤 getTask() 方法:
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 當(dāng)allowCoreThreadTimeout(運(yùn)行空閑核心線程超時(shí)) // 或 wc>corePoolSize(當(dāng)前線程數(shù)量大于核心線程數(shù)量) 時(shí),timed會(huì)標(biāo)識(shí)為true,表示需要進(jìn)行超時(shí)判斷。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 當(dāng)wc(當(dāng)前工作者數(shù)量)大于 最大線程數(shù) 或 空閑線程的空閑時(shí)間大于keepAliveTime(timed && timeout), // 以及wc>1或(workQueue)任務(wù)隊(duì)列為空時(shí),會(huì)進(jìn)入compareAndDecrementWorkerCount方法,對(duì)wc的值減1。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 當(dāng)compareAndDecrementWorkerCount方法返回true時(shí),則getTask方法會(huì)返回null,終止getTask方法的自旋。 // 這時(shí)候回到runWorker方法,就會(huì)進(jìn)入到processWorkerExit方法,進(jìn)行銷毀worker。 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // timed 為 true 時(shí),進(jìn)行poll處理,超時(shí)后線程就會(huì)會(huì)被回收 Runnable r = timed ? // poll(time):取走BlockingQueue里排在首位的對(duì)象, // 若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,取不到時(shí)返回null workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // take():取走BlockingQueue里排在首位的對(duì)象, // 若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到Blocking有新的對(duì)象被加入為止 workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
終于看到從 workQueue 拉取元素了。
CacheThreadPool 構(gòu)造的時(shí)候 corePoolSize 是 0,allowCoreThreadTimeOut 默認(rèn)是 false ,因此 timed 一直為 true ,會(huì)調(diào)用 workQueue.poll() 從隊(duì)列拉取一個(gè)任務(wù),等待 60s, 60s后超時(shí),線程就會(huì)會(huì)被回收。
如果 60s 內(nèi),進(jìn)來(lái)一個(gè)任務(wù),會(huì)發(fā)生什么情況?任務(wù)在 execute() 方法里,會(huì)被 offer() 進(jìn) workQueue ,因?yàn)槟壳瓣?duì)列是空的,所以 offer 進(jìn)來(lái)后,馬上會(huì)被阻塞的 worker.poll() 拉取出來(lái),然后在 runWorker() 方法里執(zhí)行,因?yàn)榫€程沒(méi)有新建所以達(dá)到了線程的復(fù)用。
至此,我們已經(jīng)明白了線程復(fù)用的秘密,以及線程保留 60s 的實(shí)現(xiàn)方法?;氐?execute() 方法,還有剩下一個(gè)邏輯 如果task不能加入到隊(duì)列,會(huì)嘗試創(chuàng)建線程。如果創(chuàng)建失敗,走reject流程
else if (!addWorker(command, false)) reject(command);
因?yàn)?CacheThreadPool 用的 SynchronousQueue ,所以沒(méi)有空閑線程, SynchronousQueue 有一個(gè)元素正在被阻塞,那么就不能加入到隊(duì)列里。會(huì)走到 addWorker(commond,false) 這里,這個(gè)時(shí)候因?yàn)榫蜁?huì)新建線程來(lái)執(zhí)行任務(wù)。如果 addWorker() 返回 false 才會(huì)走 reject 策略。
那么什么時(shí)候 addWorker() 什么時(shí)候會(huì)返回false呢?我們看代碼:
private boolean addWorker(Runnable firstTask, boolean core){ retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 1.線程池已經(jīng)shutdown,或者提交進(jìn)來(lái)task為ull且隊(duì)列也是空,返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 2.如果需要?jiǎng)?chuàng)建核心線程但是當(dāng)前線程已經(jīng)大于corePoolSize 返回false, // 如果是非核心線程但是已經(jīng)超出maximumPoolSize,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; //省略代碼。。。 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //省略代碼。。。 } } } //省略代碼。。。 }
addWorker() 有以下情況會(huì)返回 false :
線程池已經(jīng) shutdown,或者提交進(jìn)來(lái) task 為ull且同時(shí)任務(wù)隊(duì)列也是空,返回 false。
如果需要?jiǎng)?chuàng)建核心線程但是當(dāng)前線程已經(jīng)大于 corePoolSize 返回 false,
如果是非核心線程但是已經(jīng)超出 maximumPoolSize ,返回 false。
創(chuàng)建線程后,檢查是否已經(jīng)啟動(dòng)。
我們逐條檢查。 第一點(diǎn)只有線程池被 shutDown() 才會(huì)出現(xiàn)。 第二點(diǎn)由于 CacheThreadPool 的 corePoolSize 是 0 , maximumPoolSize 是 Intger.MAX_VALUE ,所以也不會(huì)出現(xiàn)。 第三點(diǎn)是保護(hù)性錯(cuò)誤,我猜因?yàn)榫€程允許通過(guò)外部的 ThreadFactory 創(chuàng)建,所以檢查了一下是否外部已經(jīng) start,如果開(kāi)發(fā)者編碼規(guī)范,一般這種情況也不會(huì)出現(xiàn)。
綜上,在線程池沒(méi)有 shutDown 的情況下,addWorker() 不會(huì)返回 false ,不會(huì)走reject流程,所以理論上 CacheThreadPool 可以一直提交任務(wù),符合CacheThreadPool注釋里的描述。
Executors 還提供了這么一個(gè)方法 Executors.newFixedThreadPool(4) 來(lái)創(chuàng)建一個(gè)有固定線程數(shù)量的線程池,我們看看創(chuàng)建的參數(shù):
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
參數(shù)中核心線程和最大線程一樣,線程保留時(shí)間 0 ,使用 LinkedBlockingQueue 作為任務(wù)隊(duì)列,這樣的線程池有什么樣的特性呢?我們看看注釋說(shuō)明,大意是說(shuō)這是一個(gè)有著固定線程數(shù)量且使用無(wú)界隊(duì)列作為線程隊(duì)列的線程池。如果有新的任務(wù)提交,但是沒(méi)有線程可用,這個(gè)任務(wù)會(huì)一直等待直到有可用的線程。如果一個(gè)線程因?yàn)楫惓=K止了,當(dāng)線程不夠用的時(shí)候會(huì)再創(chuàng)建一個(gè)出來(lái)。線程會(huì)一直保持,直到線程池 shutDown。
和 CacheThreadPool 相比,F(xiàn)ixedThreadPool 注釋里描述的特性有幾個(gè)不同的地方。
因?yàn)?corePoolSize == maximumPoolSize ,所以FixedThreadPool只會(huì)創(chuàng)建核心線程。
在 getTask() 方法,如果隊(duì)列里沒(méi)有任務(wù)可取,線程會(huì)一直阻塞在 LinkedBlockingQueue.take() ,線程不會(huì)被回收。
由于線程不會(huì)被回收,會(huì)一直卡在阻塞,所以沒(méi)有任務(wù)的情況下, FixedThreadPool 占用資源更多。
FixedThreadPool 和 CacheThreadPool 也有相同點(diǎn),都使用無(wú)界隊(duì)列,意味著可用一直向線程池提交任務(wù),不會(huì)觸發(fā) reject 策略。
到此,相信大家對(duì)“java線程池源碼分析”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。