溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

什么是ThreadPoolExecutor

發(fā)布時間:2021-10-12 11:46:27 來源:億速云 閱讀:146 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“什么是ThreadPoolExecutor”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

    ThreadPoolExecutor是一個通過使用可能幾個池線程之一來執(zhí)行每個提交任務(wù)的ExecutorService,這些線程池通常通過Executors工廠方法進(jìn)行配置。

        ThreadPoolExecutor中的線程池處理了兩個不同的問題:

        1、由于減少了每個任務(wù)調(diào)用的開銷,在執(zhí)行大量的異步任務(wù)時它們通常提供改進(jìn)的性能;

        2、它們提供了邊界和管理資源的一種手段,包括多線程,在執(zhí)行任務(wù)集合時的消耗。

        每個ThreadPoolExecutor還維護(hù)一些基本的統(tǒng)計數(shù)據(jù),例如完成任務(wù)的數(shù)量。

ThreadPoolExecutor中的重要成員變量

AtomicInteger ctl

        AtomicInteger類型的ctl代表了ThreadPoolExecutor中的控制狀態(tài),它是一個復(fù)核類型的成員變量,是一個原子整數(shù),借助高低位包裝了兩個概念:

        (1)workerCount:線程池中當(dāng)前活動的線程數(shù)量,占據(jù)ctl的低29位;

        (2)runState:線程池運行狀態(tài),占據(jù)ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀態(tài)。

//COUNT_BITS分割32位二進(jìn)制偏移量,Integer.SIZE即Integer類型長度(32),COUNT_BITS=29,高3位保存線程池的狀態(tài),低29位用來計量對象池中工作線程數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

        AtomicInteger ctl的定義如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//異或運算符  100100|111=100111
private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池理想的最大工作線程數(shù)(上限):

//(1<<COUNT_BITS)-1=0x20000000-1=0x1fffffff
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

獲取線程池當(dāng)前的工作線程數(shù):

//通過(與)運算符,示例 11001&1111=1001,CAPACITY=0x1fffffff,所以就是ctl的值(&)CAPACITY就是只獲取ctl低29位的值就是當(dāng)前線程池的工作線程數(shù)
private static int workerCountOf(int c)  { return c & CAPACITY; }
基本變量
//用以下文中workers集合操作的鎖
private final ReentrantLock mainLock = new ReentrantLock();

//用于保存任務(wù)并傳遞給工作線程的隊列
private final BlockingQueue<Runnable> workQueue;
/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 * 保存線程池中所有工作線程的集合,僅在獲取mainLock鎖權(quán)限時可操作
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

/**
 * Wait condition to support awaitTermination
   創(chuàng)建線程池的線程通過調(diào)用線程池引用.awaitTermination方法中通過termination實現(xiàn)持有鎖后釋放鎖掛起等待工作線程tryTerminate操作成功喚醒,或者超時自動喚醒中斷失敗。
 */
private final Condition termination = mainLock.newCondition();

/**
 * Tracks largest attained pool size. Accessed only under
 * mainLock.
   獲取線程池工作集合歷史最大容量,需獲得鎖
 */
private int largestPoolSize;

/**
 * Counter for completed tasks. Updated only on termination of
 * worker threads. Accessed only under mainLock.
   池完成任務(wù)數(shù),在processWorkerExit函數(shù)持鎖增量更新
 */
private long completedTaskCount;

//用以持鎖任務(wù)創(chuàng)建worker時創(chuàng)建線程的工廠類
private volatile ThreadFactory threadFactory;

/**
 * Handler called when saturated or shutdown in execute.
 * 在線程非RUNNING狀態(tài)或者池容量和隊列容器容量滿載時拒絕處理對象
 */
private volatile RejectedExecutionHandler handler;

 /**
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 * 空閑線程等待工作的超時時間(以納秒為單位)。 
 * 當(dāng)超過corePoolSize工作線程書或allowCoreThreadTimeOut為true時,線程將使用此超時。 
 * 否則,他們將永遠(yuǎn)等待新的工作
 */
private volatile long keepAliveTime;

/**
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 * 如果為false(默認(rèn)值為false),則即使處于空閑狀態(tài),核心線程也會保持活動狀態(tài)。 
 * 如果為true,則活躍線程使用keepAliveTime來超時等待工作,達(dá)到閾值就會釋放線程
 */
private volatile boolean allowCoreThreadTimeOut;

/**
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 * 除非設(shè)置allowCoreThreadTimeOut,否則核心池大小是保持活動狀態(tài)(不允許超時等)的最低數(shù)量,
 * 在這種情況下,最小值為零
 */
private volatile int corePoolSize;

/**
 * Maximum pool size. Note that the actual maximum is internally
 * bounded by CAPACITY.
 * 線程池最大工作線程書數(shù),受CAPACITY約束,最大不會超過CAPACITY
 */
private volatile int maximumPoolSize;

/**
 * The default rejected execution handler.
 * 默認(rèn)拒絕策略處理器
 */
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

線程池狀態(tài)解析:

  1. RUNNING: Accept new tasks and process queued tasks(運行狀態(tài),接受新任務(wù),并處理隊列任務(wù))

// 二進(jìn)制位 -1=0x8000001(取反碼后+1得補(bǔ)碼)=0xfffffffe+1=0xffffffff
// 右移29位后=0xe0000000即 1110 0000 0000 0000 0000 0000 0000 0000
// -536870912
private static final int RUNNING    = -1 << COUNT_BITS;
  1. SHUTDOWN: Don't accept new tasks, but process queued tasks(停止運行狀態(tài),不接受新任務(wù),但處理隊列中任務(wù))

//SHUTDOWN=0
private static final int SHUTDOWN   =  0 << COUNT_BITS;
  1. STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks(中斷線程池工作狀態(tài),不接受新任務(wù),不處理隊列中準(zhǔn)備彈出的任務(wù),但是會執(zhí)行完現(xiàn)有的工作任務(wù)(前提是在修改為STOP前,彈出隊列的任務(wù)已經(jīng)走過線程狀態(tài)判斷,執(zhí)行業(yè)務(wù)方法,若正好彈出準(zhǔn)備判斷線程狀態(tài),STOP扭轉(zhuǎn)成功,當(dāng)前任務(wù)也會被攔截))

// STOP=0x20000000=0010 0000 0000 0000 0000 0000 0000 0000 
// 536870912
private static final int STOP       =  1 << COUNT_BITS;
  1. TIDYING:All tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING.will run the terminated() hook method(任務(wù)處理結(jié)束狀態(tài),workcount=0,線程池運行狀態(tài)修改為TIDYING,并且會執(zhí)行==terminated()==鉤子函數(shù))

// TIDYING=0x40000000=0100 0000 0000 0000 0000 0000 0000 0000 
// 1073741824
private static final int TIDYING    =  2 << COUNT_BITS;
  1. TERMINATED: terminated() has completed() (terminated()執(zhí)行完成后,會修改成TERMINATED狀態(tài))

// TERMINATED=0x60000000=0110 0000 0000 0000 0000 0000 0000 0000 
// 1610612736
private static final int TERMINATED =  3 << COUNT_BITS;

線程池狀態(tài)扭轉(zhuǎn)過程

源碼注釋
    /** The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN 
     *    On invocation of shutdown(), perhaps implicitly in finalize()
         顯式調(diào)用線程池showdown()方法,或者線程池對象不被引用,被GC回收時調(diào)用finalize()函數(shù),finalize()函數(shù)中調(diào)用shutdown()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
          顯式調(diào)用shutdownNow()扭轉(zhuǎn)狀態(tài),修改線程池中workers所有工作線程為中斷狀態(tài),讓接下來隊列彈出的任務(wù)都跳過執(zhí)行任務(wù)
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
          工作線程全部執(zhí)行完成且隊列也是空,則扭轉(zhuǎn)狀態(tài)
     * STOP -> TIDYING
     *    When pool is empty
        當(dāng)沒有任務(wù)時,狀態(tài)扭轉(zhuǎn)
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
        執(zhí)行terminated(),try{terminated()}finally{扭轉(zhuǎn)狀態(tài)}
     */
獲取線程狀態(tài)
// Packing and unpacking ctl
//~CAPACITY 連同符號位反轉(zhuǎn)(即相反數(shù)-1,若不理解百度反碼和補(bǔ)碼)  得  0xe0000000 ,就是取高三位做位與計算
private static int runStateOf(int c)     { return c & ~CAPACITY; }

線程池方法分析

1.執(zhí)行任務(wù)入口解析(流程圖)
 public void execute(Runnable command) {
        //任務(wù)非空校驗
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         //獲取當(dāng)前線程池計數(shù)器值
        int c = ctl.get();
        //判斷當(dāng)前工作線程池的活動線程數(shù)是否<核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
          	//進(jìn)入addWorker函數(shù),參數(shù)true(標(biāo)識創(chuàng)建核心線程數(shù)工作線程,該函數(shù)中會對該標(biāo)識識別是當(dāng)前工作數(shù)量數(shù)比較核心線程數(shù)還是最大線程數(shù)),檢查是否可創(chuàng)建worker任務(wù)線程
            if (addWorker(command, true))
                return;
           //執(zhí)行此步,意味著進(jìn)入addWorker函數(shù),資源被其他線程爭奪,導(dǎo)致該任務(wù)沒有搶到創(chuàng)建核心工作線程的資源,二次獲取最新活動線程數(shù)
            c = ctl.get();
        }
   			//檢查線程池狀態(tài)是否為RUNNING狀態(tài),并且任務(wù)隊列是否可追加該任務(wù)
        if (isRunning(c) && workQueue.offer(command)) {
            //重新獲取線程池ctl值
            int recheck = ctl.get();
            //檢查當(dāng)前線程池狀態(tài)為非RUNNING狀態(tài),且從隊列容器中回滾該任務(wù)
            if (! isRunning(recheck) && remove(command))
                //拒絕加入任務(wù),實則調(diào)用上文中handlder的rejectedExecution()拋出異常(默認(rèn)AbortPolicy中止策略)
                reject(command);
            //獲取最后一次獲取的計量值,判斷是否工作線程均已完成任務(wù),因為很有可能之前在36行操作之前工作線程數(shù)已達(dá)最大線程數(shù)閾值,但是正好剛加入到隊列中后,線程已全部執(zhí)行完成,且釋放了,所以需要創(chuàng)建一個空任務(wù)的worker線程用以調(diào)用runWorker中從隊列中彈出任務(wù)去執(zhí)行(具體查看getTask())
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
   			//第三步則意味著第二步可能池狀態(tài)非RUNNING,當(dāng)然如果是非RUNNING狀態(tài),在addWorker判斷池狀態(tài)是否可接受新非核心任務(wù)。
   	   //也有可能是隊列滿載,該任務(wù)會插隊嘗試創(chuàng)建非核心工作線程,如果創(chuàng)建失敗,會觸發(fā)拒絕策略異常
        else if (!addWorker(command, false))
            reject(command);
    }
	 //拒絕任務(wù)
    final void reject(Runnable command) {
        //默認(rèn)拒絕策略Handler-AbortPolicy
        handler.rejectedExecution(command, this);
    }		

流程圖:

什么是ThreadPoolExecutor

2.創(chuàng)建任務(wù)線程(addWorker)(流程圖)

/**
 * firstTask:創(chuàng)建任務(wù)工作線程RunWorker-執(zhí)行的第一個任務(wù),也有可能是空任務(wù)(喚醒任務(wù))
 * core:true(核心工作線程),false(非核心工作線程) 對應(yīng)的是比較corePoolSize和maxPoolSize條件。
**/
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
						//STOP、TIDYING、TERMINATED狀態(tài)不接受新任務(wù),所以直接拒絕,創(chuàng)建任務(wù)失敗
            //SHUTDOWN狀態(tài)下,僅允許創(chuàng)建task為null的喚醒任務(wù)(前提隊列中存在任務(wù)),因為隊列中有任務(wù),否則喚醒任務(wù)創(chuàng)建線程無意義不允許創(chuàng)建工作線程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            
            for (;;) {
                int wc = workerCountOf(c);
                //根據(jù)core標(biāo)識,對應(yīng)比較閾值,首先保證不能>=(1>>29)-1,否則不允許創(chuàng)建工作線程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
              	//CAS當(dāng)前值+1替換當(dāng)前值,根絕替換返回值判斷是否替換成功,成功,直接跳出循環(huán)
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //意味著CAS替換失敗,重新取值,判斷最新池狀態(tài)是否還是RUNNING,RUNNNING狀態(tài)則繼續(xù)執(zhí)行該循環(huán)體,嘗試ctl+1操作
                //否則直接跳入外循環(huán),進(jìn)行狀態(tài)判斷是否允許創(chuàng)建任務(wù)線程
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //此次任務(wù)工作的創(chuàng)建標(biāo)記以及對應(yīng)的線程啟動標(biāo)記
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //創(chuàng)建任務(wù)工作線程,查看下文代碼的Worker源碼
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
              //嘗試持有worker集合的權(quán)限獨占鎖
                mainLock.lock();
                try {
                    //如果獲得鎖時,線程池狀態(tài)非RUNNING或SHUTDOWN狀態(tài)TASK不為空,則不允許該任務(wù)工作對象加入集合,也不允許線程啟動
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //檢查線程狀態(tài)是否可啟動
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        //記錄worker集合在某一刻的長度最大數(shù),按照配置來說,也就是同時存活存貨線程數(shù)最大也頂多就是MaxPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //開啟任務(wù)工作對象加入集合成功標(biāo)記
                        workerAdded = true;
                    }
                } finally {
                   //釋放worker集合的權(quán)限獨占鎖,因為可能同一時刻有N個任務(wù)需要創(chuàng)建對象加入workers集合
                    mainLock.unlock();
                }
                if (workerAdded) {
                   //加入工作集合成功,則需要啟動本次工作對象的內(nèi)置線程
                    t.start();
                  //工作對象線程啟動成功標(biāo)記
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
               //添加任務(wù)工作對象失敗,看下文源碼
                addWorkerFailed(w);
        }
        return workerStarted;
}
 //持MainLock鎖,讓workers集合移除添加失敗的任務(wù),以及上文中ctl的cas自增操作回滾,嘗試中止線程,最終釋放鎖
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      if (w != null)
        workers.remove(w);
      //遞歸循環(huán),直至降一操作成功
      decrementWorkerCount();
      //嘗試停止線程池,該處不詳做介紹,在runWorker中會有介紹
      tryTerminate();
    } finally {
      mainLock.unlock();
    }
}
 
  

//ThreadPoolExectutor內(nèi)部類
private final class Worker  extends AbstractQueuedSynchronizer implements Runnable
{
      //該任務(wù)工作對象內(nèi)置線程(用來處理該工作對象中的任務(wù),以及隊列中的任務(wù)),如果是ThreadFactory是異常的,則thread一定是null
      final Thread thread;
      //該任務(wù)工作對象的初始化任務(wù),有可能是NULL(喚醒任務(wù))
      Runnable firstTask;
      //該工作任務(wù)執(zhí)行完成的任務(wù)次數(shù)
      volatile long completedTasks;

      /**
                 * 構(gòu)造New實例的內(nèi)部變量
                 * state默認(rèn)為-1,在runWorker中執(zhí)行到持鎖修改state=1,才可以觸發(fā)線程中斷信號,查看下文interruptIfStarted
                 */
      Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
      }

      /** 將處理邏輯交給ThrearunWorker  */
      public void run() {
        runWorker(this);
      }

      //判斷該任務(wù)工作對象是否有線程持有鎖
      protected boolean isHeldExclusively() {
        return getState() != 0;
      }
      //線程池修改為STOP狀態(tài)時,會對worker集合中的工作對象內(nèi)置線程發(fā)送中斷信號
      void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
          try {
            t.interrupt();
          } catch (SecurityException ignore) {
          }
        }
  }
}
//上面的任務(wù)工作創(chuàng)建失敗后的回滾工作線程數(shù)自增操作
private void decrementWorkerCount() {
  //可以看到是遞歸降一操作,循環(huán)降一操作,直至成功才退出循環(huán)
  do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

流程圖:

什么是ThreadPoolExecutor

3.任務(wù)工作線程執(zhí)行解析(runWorker)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();

    //獲取工作初始化對象時的任務(wù)
    Runnable task = w.firstTask;
    //然后置空,防止重復(fù)執(zhí)行
    w.firstTask = null;
    //本處并不是釋放鎖,只是把默認(rèn)state(-1)修改為0,允許中斷。
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        //如果內(nèi)置任務(wù)是NULL,就會去從隊列中彈出任務(wù)處理,空隊列就會阻塞或者超時阻塞。
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /**
             * 兩次檢查
             * 第一次檢查 如果是>=STOP狀態(tài)
             * 第二次檢查 獲取當(dāng)前線程中斷信號(該靜態(tài)方法會清除中斷信號)且判斷是否>=STOP狀態(tài)
             * 前兩次檢查任一滿足,則繼續(xù)檢查該線程是否中斷,未中斷將中斷該線程
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try { 
                //子實現(xiàn)類執(zhí)行任務(wù)前的鉤子函數(shù)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //執(zhí)行execute傳入的任務(wù),或者execute加入到隊列中的任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                  //子實現(xiàn)類執(zhí)行任務(wù)后的鉤子函數(shù)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //每個工作對象內(nèi)部會記錄worker對應(yīng)的線程處理了多少個任務(wù)(無論任務(wù)內(nèi)部工作是否有異常),但是如果遇到某個任務(wù)拋出異常后,該線程就會釋放
                //比如隊列容量80個,池工作最大工作線程數(shù)是20個,然后隊列滿載的情況下,極有可能每個線程在執(zhí)行初始化的內(nèi)置任務(wù)zhi
                w.completedTasks++;
                w.unlock();
            }
        }
        //如果任務(wù)執(zhí)行過程中出現(xiàn)異常,不會執(zhí)行此步
        completedAbruptly = false;
    } finally {
        //線程釋放后的退出處理工作,會把此次執(zhí)行任務(wù)的結(jié)果和工作對象傳遞給該函數(shù)。
        processWorkerExit(w, completedAbruptly);
    }
}
//用于子類實現(xiàn)類的runWorker的任務(wù)執(zhí)行前置鉤子函數(shù)
protected void beforeExecute(Thread t, Runnable r) { }
//用于子類實現(xiàn)類的runWorker的任務(wù)執(zhí)行后置鉤子函數(shù)
protected void afterExecute(Thread t, Runnable r) { }

private void processWorkerExit(Worker w, boolean completedAbruptly) {
         //未完成標(biāo)記,就先回滾數(shù)量,比如工作任務(wù)執(zhí)行異常,或者開啟核心線程超時配置,指定時間未收到隊列喚醒
        if (completedAbruptly) 
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //持鎖,統(tǒng)計工作對象中完成的次數(shù),累加到線程池的累計變量
            completedTaskCount += w.completedTasks;
            //集合移除該工作對象
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //嘗試中斷,RUNNING狀態(tài)或者SHUTDOWN狀態(tài)下隊列中有任務(wù),該操作無需理會,下文有該方法詳細(xì)介紹
        tryTerminate();

        int c = ctl.get();
        /**
        	* 1.如果是STOP狀態(tài)即值以上的狀態(tài),該操作跳過
          * 2.該步操作主要是該線程處理任務(wù)結(jié)果來判斷,如果是異常退出,直接創(chuàng)建一個空任務(wù)的處理處理線程
          * 3.如果正常線程處理完成釋放的線程,判斷是allowCoreThreadTimeOut是否是true,如果是且隊列是空,則有可能是線程超時未取到任務(wù)而釋放線程的,則所有線程return返回直接釋放,無需創(chuàng)建線程,否則則查看隊列中是否有任務(wù)未處理完(如果有任務(wù)則需要最少一個線程,如果是最后一個線程,需要再創(chuàng)建一個空任務(wù)處理線程,由隊列彈出任務(wù)來自旋處理),如果是allowCoreThreadTimeOut為默認(rèn)值false,判斷是否超過核心線程數(shù)如果超過就直接釋放線程,否則需要再創(chuàng)建一個空任務(wù)的處理線程
          **/
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; 
            }
          //創(chuàng)建一個非核心的空任務(wù)線程用來處理隊列中的任務(wù)
            addWorker(null, false);
        }
    }
final void tryTerminate() {
        //自旋嘗試停止線程池,前提是非RUNNING狀態(tài)或非(SHUTDOWN狀態(tài)下隊列不為空)的情況之一,否則直接跳出循環(huán)
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //如果是活躍線程數(shù)>0,就會從工作者列表中從第一個開始取,直到?jīng)]有中斷的工作線程,然后對該線程發(fā)送中斷信號
            if (workerCountOf(c) != 0) { 
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //在持有鎖之后,則把shutdown或者stopz狀態(tài)嘗試扭轉(zhuǎn)為tidying狀態(tài)
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                         //喚醒嘗試tryTerminate過程中阻塞在condition隊列中的線程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // 繼續(xù)自旋嘗試該次操作
        }
    }

“什么是ThreadPoolExecutor”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI