溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么解析ThreadPoolExecutor

發(fā)布時間:2021-11-02 16:20:18 來源:億速云 閱讀:143 作者:iii 欄目:web開發(fā)

這篇文章主要介紹“怎么解析ThreadPoolExecutor”,在日常操作中,相信很多人在怎么解析ThreadPoolExecutor問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么解析ThreadPoolExecutor”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

為什么要用線程池

你有沒有這樣的疑惑,為什么要用線程池呢?可能你會說,我可以復(fù)用已經(jīng)創(chuàng)建的線程呀;線程是個重量級對象,為了避免頻繁創(chuàng)建和銷毀,使用線程池來管理最好了。

沒毛病,各位都很懂哈~

不過使用線程池還有一個重要的點(diǎn):可以控制并發(fā)的數(shù)量。如果并發(fā)數(shù)量太多了,導(dǎo)致消耗的資源增多,直接把服務(wù)器給搞趴下了,肯定也是不行的

繞不過去的幾個參數(shù)

提到 ThreadPoolExecutor 那么你的小腦袋肯定會想到那么幾個參數(shù),咱們來瞅瞅源碼(我就直接放有 7 個參數(shù)的那個方法了):

public ThreadPoolExecutor(int corePoolSize,                             int maximumPoolSize,                             long keepAliveTime,                             TimeUnit unit,                             BlockingQueue<Runnable> workQueue,                             ThreadFactory threadFactory,                             RejectedExecutionHandler handler)

咱們分別來看:

  • corePoolSize :

核心線程數(shù),在線程池中有兩種線程,核心線程和非核心線程。在線程池中的核心線程,就算是它什么都不做,也會一直在線程池中,除非設(shè)置了  allowCoreThreadTimeOut 參數(shù)

  • maximumPoolSize:

線程池能夠創(chuàng)建的最大線程數(shù)。這個值 = 核心線程數(shù) + 非核心線程數(shù)

  • keepAliveTime & unit :

線程池是可以撤銷線程的,那么什么時候撤銷呢?一個線程如果在一段時間內(nèi),都沒有執(zhí)行任務(wù),那說明這個線程很閑啊,那是不是就可以把它撤銷掉了?

所以呢,如果一個線程不是核心線程,而且在 keepAliveTime & unit 這段時間內(nèi),還沒有干活,那么很抱歉,只能請你走人了  核心線程就算是很閑,也不會將它從線程池中清除,沒辦法誰讓它是 core 線程呢~

  • workQueue :

工作隊(duì)列,這個隊(duì)列維護(hù)的是等待執(zhí)行的 Runnable 任務(wù)對象

常用的幾個隊(duì)列:LinkedBlockingQueue , ArrayBlockingQueue , SynchronousQueue ,  DelayQueue

大廠的編碼規(guī)范,相信各位都知道,并不建議使用 Executors ,最重要的一個原因就是:Executors 提供的很多方法默認(rèn)使用的都是無界的  LinkedBlockingQueue ,在高負(fù)載情況下,無界隊(duì)列很容易就導(dǎo)致 OOM ,而 OOM  會讓所有請求都無法處理,所以在使用時,強(qiáng)烈建議使用有界隊(duì)列,因?yàn)槿绻闶褂玫氖怯薪珀?duì)列的話,當(dāng)線程數(shù)量太多時,它會走拒絕策略

  • threadFactory :

創(chuàng)建線程的工廠,用來批量創(chuàng)建線程的。如果不指定的話,就會創(chuàng)建一個默認(rèn)的線程工廠

  • handler :

拒絕處理策略。在 workQueue 那里說了,如果使用的是有界隊(duì)列,那么當(dāng)線程數(shù)量大于最大線程數(shù)的時候,拒絕處理策略就起到作用了

常用的有四種處理策略:

- AbortPolicy :默認(rèn)的拒絕策略,會丟棄任務(wù)并拋出 RejectedExecutionException 異常-  CallerRunsPolicy :提交任務(wù)的線程,自己去執(zhí)行這個任務(wù)- DiscardOldestPolicy :直接丟棄新來的任務(wù),也沒有任何異常拋出-  DiscardOldestPolicy :丟棄最老的任務(wù),然后將新任務(wù)加入到工作隊(duì)列中

默認(rèn)拒絕策略是 AbortPolicy ,會 throw RejectedExecutionException  異常,但是這是一個運(yùn)行時異常,對于運(yùn)行時異常編譯器不會強(qiáng)制 catch 它,所以就會比較容易忽略掉錯誤。

所以,如果線程池處理的任務(wù)非常重要,盡量自定義自己的拒絕策略

線程池的幾個狀態(tài)

在源碼中,能夠清楚地看到線程池有 5 種狀態(tài):

private static final int RUNNING    = -1 << COUNT_BITS; private static final int SHUTDOWN   =  0 << COUNT_BITS; private static final int STOP       =  1 << COUNT_BITS; private static final int TIDYING    =  2 << COUNT_BITS; private static final int TERMINATED =  3 << COUNT_BITS;

同時,使用 AtomicInteger 修飾的變量 ctl 來控制線程池的狀態(tài),而 ctl 保存了 2 個變量:一個是 rs 即 runState  ,線程池的運(yùn)行狀態(tài);一個是 wc 即 workerCount ,線程池中活動線程的數(shù)量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 線程池創(chuàng)建之后就處于 RUNNING 狀態(tài)

  • 調(diào)用 shutdown() 方法之后處于 SHUTDOWN 狀態(tài),此時線程池不再接受新的任務(wù),清除一些空閑 worker ,等待阻塞隊(duì)列的任務(wù)完成

  • 調(diào)用 shutdownNow() 方法后處于 STOP 狀態(tài),此時線程池不再接受新的任務(wù),中斷所有的線程,阻塞隊(duì)列中沒有被執(zhí)行的任務(wù)也會被全部丟棄

  • 當(dāng)線程池中執(zhí)行的任務(wù)為空時,也就是此時 ctl 的值為 0 時,線程池會變?yōu)?TIDYING 狀態(tài),接下來會執(zhí)行 terminated() 方法

  • 執(zhí)行完 terminated() 方法之后,線程池的狀態(tài)就由 TIDYING 轉(zhuǎn)到 TERMINATED 狀態(tài)

懵了?別急,有張圖呢~

怎么解析ThreadPoolExecutor

線程池處理任務(wù)

execute

做到線程復(fù)用,肯定要先 execute 起來吧

線程池處理任務(wù)的核心方法是 execute ,大概思路就是:

  • 如果 command 為 null ,沒啥說的,直接拋出異常就完事兒了

  • 如果當(dāng)前線程數(shù)小于 corePoolSize ,會新建一個核心線程執(zhí)行任務(wù)

  • 如果當(dāng)前線程數(shù)不小于 corePoolSize  ,就會將任務(wù)放到隊(duì)列中等待,如果任務(wù)排隊(duì)成功,仍然需要檢查是否應(yīng)該添加線程,所以需要重新檢查狀態(tài),并且在必要時回滾排隊(duì);如果線程池處于 running  狀態(tài),但是此時沒有線程,就會創(chuàng)建線程

  • 如果沒有辦法給任務(wù)排隊(duì),說明這個時候,緩存隊(duì)列滿了,而且線程數(shù)達(dá)到了 maximumPoolSize  或者是線程池關(guān)閉了,系統(tǒng)沒辦法再響應(yīng)新的請求,此時會執(zhí)行拒絕策略

來瞅瞅源碼具體是如何處理的:

public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();        int c = ctl.get();     // 當(dāng)前線程數(shù)小于 corePoolSize 時,調(diào)用 addWorker 創(chuàng)建核心線程來執(zhí)行任務(wù)     if (workerCountOf(c) < corePoolSize) {         if (addWorker(command, true))             return;         c = ctl.get();     }     // 當(dāng)前線程數(shù)不小于 corePoolSize ,就將任務(wù)添加到 workQueue 中     if (isRunning(c) && workQueue.offer(command)) {      // 獲取到當(dāng)前線程的狀態(tài),賦值給 recheck ,是為了重新檢查狀態(tài)         int recheck = ctl.get();         // 如果 isRunning 返回 false ,那就 remove 掉這個任務(wù),然后執(zhí)行拒絕策略,也就是回滾重新排隊(duì)         if (! isRunning(recheck) && remove(command))             reject(command);         // 線程池處于 running 狀態(tài),但是沒有線程,那就創(chuàng)建線程執(zhí)行任務(wù)         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     // 如果放入 workQueue 失敗,嘗試通過創(chuàng)建非核心線程來執(zhí)行任務(wù)     // 如果還是失敗,說明線程池已經(jīng)關(guān)閉或者已經(jīng)飽和,會拒絕執(zhí)行該任務(wù)     else if (!addWorker(command, false))         reject(command); }

在上面源碼中,判斷了兩次線程池的狀態(tài),為什么要這么做呢?

這是因?yàn)樵诙嗑€程環(huán)境下,線程池的狀態(tài)是時刻發(fā)生變化的,可能剛獲取線程池狀態(tài)之后,這個狀態(tài)就立刻發(fā)生了改變.如果沒有二次檢查的話,線程池處于非  RUNNING 狀態(tài)時, command 就永遠(yuǎn)不會執(zhí)行

有點(diǎn)兒懵?阿粉都懂你,一張圖走起~

怎么解析ThreadPoolExecutor

addWorker

從上面能夠看出來,主要是 addWorker 方法

addWorker 主要是用來創(chuàng)建核心線程的,它主要的實(shí)現(xiàn)邏輯是:

  • 判斷線程數(shù)量有沒有超過規(guī)定的數(shù)量,如果超過了就返回 false

  • 如果沒有超過,就會創(chuàng)建 worker 對象,并初始化一個 Thread 對象,然后啟動這個線程對象

接下來瞅瞅源碼:

private boolean addWorker(Runnable firstTask, boolean core) {     retry:     for (;;) {         int c = ctl.get();         int rs = runStateOf(c);          // Check if queue empty only if necessary.   // 線程池狀態(tài) >= SHUTDOWN 時,不再接受新的任務(wù),直接返回 false   // 如果 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() 同樣不接受新的任務(wù),返回 false         if (rs >= SHUTDOWN &&             ! (rs == SHUTDOWN &&                 firstTask == null &&                 ! workQueue.isEmpty()))             return false;          for (;;) {             int wc = workerCountOf(c);    // wc >= CAPACITY 說明線程數(shù)不夠,所以就返回 false    // wc >= (core ? corePoolSize : maximumPoolSize) 是在做判斷     // 如果 core 為 true ,說明要創(chuàng)建的線程是核心線程,接下來判斷 wc 是否大于 核心線程數(shù) ,如果大于返回 false     // 如果 core 為 false ,說明要創(chuàng)建的線程是非核心線程,接下來判斷 wc 是否大于 最大線程數(shù) ,如果大于返回 false             if (wc >= CAPACITY ||                 wc >= (core ? corePoolSize : maximumPoolSize))                 return false;    // CAS 操作增加 workerCount 的值,如果成功跳出循環(huán)             if (compareAndIncrementWorkerCount(c))                 break retry;             c = ctl.get();  // Re-read ctl    // 判斷線程池狀態(tài)有沒有變化,如果有變化,則重試             if (runStateOf(c) != rs)                 continue retry;             // else CAS failed due to workerCount change; retry inner loop         }     }   // workerCount 增加成功之后開始走下面的代碼     boolean workerStarted = false;     boolean workerAdded = false;     Worker w = null;     try {   // 創(chuàng)建一個 worker 對象         w = new Worker(firstTask);   // 實(shí)例化一個 Thread 對象         final Thread t = w.thread;         if (t != null) {    // 接下來的操作需要加鎖進(jìn)行             final ReentrantLock mainLock = this.mainLock;             mainLock.lock();             try {                 // Recheck while holding lock.                 // Back out on ThreadFactory failure or if                 // shut down before lock acquired.                 int rs = runStateOf(ctl.get());                  if (rs < SHUTDOWN ||                     (rs == SHUTDOWN && firstTask == null)) {                     if (t.isAlive()) // precheck that t is startable                         throw new IllegalThreadStateException();      // 將任務(wù)線程添加到線程池中                     workers.add(w);                     int s = workers.size();                     if (s > largestPoolSize)                         largestPoolSize = s;                     workerAdded = true;                 }             } finally {                 mainLock.unlock();             }             if (workerAdded) {     // 啟動任務(wù)線程,開始執(zhí)行任務(wù)                 t.start();                 workerStarted = true;             }         }     } finally {         if (! workerStarted)    // 如果任務(wù)線程啟動失敗調(diào)用 addWorkerFailed     // addWorkerFailed 方法里面主要做了兩件事:將該線程從線程池中移除;將 workerCount 的值減 1             addWorkerFailed(w);     }     return workerStarted; }

Worker 類

在 addWorker 中,主要是由 Worker 類去做一些相應(yīng)處理, worker 繼承 AQS ,實(shí)現(xiàn) Runnable  接口

線程池維護(hù)的是 HashSet,一個由 worker 對象組成的 HashSet

private final HashSet<Worker> workers = new HashSet<Worker>();

worker 繼承 AQS 主要是利用 AQS 獨(dú)占鎖機(jī)制,來標(biāo)識線程是否空閑;另外, worker 還實(shí)現(xiàn)了 Runnable  接口,所以它本身就是一個線程任務(wù),在構(gòu)造方法中創(chuàng)建了一個線程,線程的任務(wù)就是自己 this。thread =  getThreadFactory().newThread(this);

咱們瞅瞅里面的源碼:

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;         // 處理任務(wù)的線程        final Thread thread;        // worker 傳入的任務(wù)        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;         /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {         // 將 state 設(shè)為 -1 ,避免 worker 在執(zhí)行前被中斷            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;   // 創(chuàng)建一個線程,來執(zhí)行任務(wù)            this.thread = getThreadFactory().newThread(this);        }         /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }         // Lock methods        //        // The value 0 represents the unlocked state.        // The value 1 represents the locked state.         protected boolean isHeldExclusively() {            return getState() != 0;        }         protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }         protected boolean tryRelease(int unused) {            setExclusiveOwnerThread(null);            setState(0);            return true;        }         public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }         void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }            }        }    }

runWorker

worker 類在執(zhí)行 run 方法時,實(shí)際上調(diào)用的是 runWorker 方法

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        // 允許中斷        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {         // 判斷 task 是否為空,如果不為空直接執(zhí)行         // 如果 task 為空,調(diào)用 getTask() 方法,從 workQueue 中取出新的 task 執(zhí)行            while (task != null || (task = getTask()) != null) {             // 加鎖,防止被其他線程中斷                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                // 檢查線程池的狀態(tài),如果線程池處于 stop 狀態(tài),則需要中斷當(dāng)前線程                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                 // 執(zhí)行 beforeExecute                     beforeExecute(wt, task);                    Throwable thrown = null;                    try {                     // 執(zhí)行任務(wù)                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                     // 執(zhí)行 afterExecute 方法                        afterExecute(task, thrown);                    }                } finally {                 // 將 task 設(shè)置為 null ,循環(huán)操作                    task = null;                    w.completedTasks++;                    // 釋放鎖                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }

在 runWorker 方法中,首先會去執(zhí)行創(chuàng)建這個 worker 時就有的任務(wù),當(dāng)執(zhí)行完這個任務(wù)之后, worker 并不會被銷毀,而是在 while  循環(huán)中, worker 會不斷的調(diào)用 getTask 方法從阻塞隊(duì)列中獲取任務(wù)然后調(diào)用 task。run()  來執(zhí)行任務(wù),這樣就達(dá)到了復(fù)用線程的目的。通過循環(huán)條件 while (task != null || (task = getTask()) != null)  可以看出,只要 getTask 方法返回值不為 null ,就會一直循環(huán)下去,這個線程也就會一直在執(zhí)行,從而達(dá)到了線程復(fù)用的目的

getTask

咱們來看看 getTask 方法的實(shí)現(xiàn):

private Runnable getTask() {         boolean timedOut = false; // Did the last poll() time out?          for (;;) {             int c = ctl.get();             int rs = runStateOf(c);              // Check if queue empty only if necessary.             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                 decrementWorkerCount();                 return null;             }              int wc = workerCountOf(c);              // Are workers subject to culling?             // allowCoreThreadTimeOut 變量默認(rèn)為 false ,也就是核心線程就算是空閑也不會被銷毀             // 如果為 true ,核心線程在 keepAliveTime 內(nèi)是空閑的,就會被銷毀             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;              // 如果運(yùn)行線程數(shù)大于最大線程數(shù),但是緩存隊(duì)列已經(jīng)空了,此時遞減 worker 數(shù)量             // 如果有設(shè)置允許線程超時或者線程數(shù)量超過了核心線程數(shù)量,并且線程在規(guī)定時間內(nèi)沒有 poll 到任務(wù)并且隊(duì)列為空,此時也遞減 worker 數(shù)量             if ((wc > maximumPoolSize || (timed && timedOut))                 && (wc > 1 || workQueue.isEmpty())) {                 if (compareAndDecrementWorkerCount(c))                     return null;                 continue;             }              try {                 // 如果 timed 為 true ,會調(diào)用 workQueue 的 poll 方法                  // 超時時間為 keepAliveTime ,如果超過 keepAliveTime 時長的話, poll 就會返回 null                   // 如果返回為 null ,在 runWorker 中                   // while (task != null || (task = getTask()) != null) 循環(huán)條件被打破,從而跳出循環(huán),此時線程執(zhí)行完畢                 // 如果 timed 為 false ( allowCoreThreadTimeOut 為 false ,并且 wc > corePoolSize 為 false )                  // 會調(diào)用 workQueue 的 take 方法阻塞到當(dāng)前                  // 當(dāng)隊(duì)列中有任務(wù)加入時,線程被喚醒, take 方法返回任務(wù),開始執(zhí)行                 Runnable r = timed ?                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                     workQueue.take();                 if (r != null)                     return r;                 timedOut = true;             } catch (InterruptedException retry) {                 timedOut = false;             }         }     }

源碼分析到這里就差不多清楚了

線程復(fù)用主要體現(xiàn)在 runWorker 方法中的 while 循環(huán)中,在 while 循環(huán)里面, worker 會不斷的調(diào)用 getTask 方法,而在  getTask 方法里,如果任務(wù)隊(duì)列中沒有了任務(wù),此時如果線程是核心線程則會一直卡在 workQueue。take 方法,這個時候會被阻塞并掛起,不會占用  CPU 資源,直到拿到任務(wù)然后返回 true , 此時 runWorker 中得到這個任務(wù)來繼續(xù)執(zhí)行任務(wù),從而實(shí)現(xiàn)了線程復(fù)用。

到此,關(guān)于“怎么解析ThreadPoolExecutor”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI