溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java線程池是怎么工作的

發(fā)布時(shí)間:2022-01-26 15:27:15 來源:億速云 閱讀:121 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Java線程池是怎么工作的”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

線程池的工作原理

首先我們看下當(dāng)一個(gè)新的任務(wù)提交到線程池之后,線程池是如何處理的

1、線程池判斷核心線程池里的線程是否都在執(zhí)行任務(wù)。如果不是,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則執(zhí)行第二步。

2、線程池判斷工作隊(duì)列是否已經(jīng)滿。如果工作隊(duì)列沒有滿,則將新提交的任務(wù)存儲(chǔ)在這個(gè)工作隊(duì)列里進(jìn)行等待。如果工作隊(duì)列滿了,則執(zhí)行第三步

3、線程池判斷線程池的線程是否都處于工作狀態(tài)。如果沒有,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略來處理這個(gè)任務(wù)

線程池飽和策略

這里提到了線程池的飽和策略,那我們就簡單介紹下有哪些飽和策略:

AbortPolicy

為Java線程池默認(rèn)的阻塞策略,不執(zhí)行此任務(wù),而且直接拋出一個(gè)運(yùn)行時(shí)異常,切記ThreadPoolExecutor.execute需要try catch,否則程序會(huì)直接退出。

DiscardPolicy

直接拋棄,任務(wù)不執(zhí)行,空方法

DiscardOldestPolicy

從隊(duì)列里面拋棄head的一個(gè)任務(wù),并再次execute 此task。

CallerRunsPolicy

在調(diào)用execute的線程里面執(zhí)行此command,會(huì)阻塞入口

用戶自定義拒絕策略(最常用)

實(shí)現(xiàn)RejectedExecutionHandler,并自己定義策略模式

下我們以ThreadPoolExecutor為例展示下線程池的工作流程圖

Java線程池是怎么工作的
Java線程池是怎么工作的

1、如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

2、如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。

3、如果無法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則在非corePool中創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

4、如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步驟的總體設(shè)計(jì)思路,是為了在執(zhí)行execute()方法時(shí),盡可能地避免獲取全局鎖(那將會(huì)是一個(gè)嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。

關(guān)鍵方法源碼分析

我們看看核心方法添加到線程池方法execute的源碼如下:

    //
    //Executes the given task sometime in the future.  The task
    //may execute in a new thread or in an existing pooled thread.
    //
    // If the task cannot be submitted for execution, either because this
    // executor has been shutdown or because its capacity has been reached,
    // the task is handled by the current {@code RejectedExecutionHandler}.
    //
    // @param command the task to execute
    // @throws RejectedExecutionException at discretion of
    //         {@code RejectedExecutionHandler}, if the task
    //         cannot be accepted for execution
    // @throws NullPointerException if {@code command} is null
    //
   public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       //
        // Proceed in 3 steps:
        //
        // 1. If fewer than corePoolSize threads are running, try to
        // start a new thread with the given command as its first
        // task.  The call to addWorker atomically checks runState and
        // workerCount, and so prevents false alarms that would add
        // threads when it shouldn't, by returning false.
        // 翻譯如下:
        // 判斷當(dāng)前的線程數(shù)是否小于corePoolSize如果是,使用入?yún)⑷蝿?wù)通過addWord方法創(chuàng)建一個(gè)新的線程,
        // 如果能完成新線程創(chuàng)建exexute方法結(jié)束,成功提交任務(wù)
        // 2. If a task can be successfully queued, then we still need
        // to double-check whether we should have added a thread
        // (because existing ones died since last checking) or that
        // the pool shut down since entry into this method. So we
        // recheck state and if necessary roll back the enqueuing if
        // stopped, or start a new thread if there are none.
        // 翻譯如下:
        // 在第一步?jīng)]有完成任務(wù)提交;狀態(tài)為運(yùn)行并且能否成功加入任務(wù)到工作隊(duì)列后,再進(jìn)行一次check,如果狀態(tài)
        // 在任務(wù)加入隊(duì)列后變?yōu)榱朔沁\(yùn)行(有可能是在執(zhí)行到這里線程池shutdown了),非運(yùn)行狀態(tài)下當(dāng)然是需要
        // reject;然后再判斷當(dāng)前線程數(shù)是否為0(有可能這個(gè)時(shí)候線程數(shù)變?yōu)榱?),如是,新增一個(gè)線程;
        // 3. If we cannot queue task, then we try to add a new
        // thread.  If it fails, we know we are shut down or saturated
        // and so reject the task.
        // 翻譯如下:
        // 如果不能加入任務(wù)到工作隊(duì)列,將嘗試使用任務(wù)新增一個(gè)線程,如果失敗,則是線程池已經(jīng)shutdown或者線程池
        // 已經(jīng)達(dá)到飽和狀態(tài),所以reject這個(gè)他任務(wù)
        //
       int c = ctl.get();
       // 工作線程數(shù)小于核心線程數(shù)
       if (workerCountOf(c)

下面我們繼續(xù)看看addWorker是如何實(shí)現(xiàn)的:

 private boolean addWorker(Runnable firstTask, boolean core) {
       // java標(biāo)簽
       retry:
       // 死循環(huán)
       for (;;) {
           int c = ctl.get();
           // 獲取當(dāng)前線程狀態(tài)
           int rs = runStateOf(c);
           // Check if queue empty only if necessary.
           // 這個(gè)邏輯判斷有點(diǎn)繞可以改成
           // rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
           // 邏輯判斷成立可以分為以下幾種情況均不接受新任務(wù)
           // 1、rs > shutdown:--不接受新任務(wù)
           // 2、rs >= shutdown && firstTask != null:--不接受新任務(wù)
           // 3、rs >= shutdown && workQueue.isEmppty:--不接受新任務(wù)
           // 邏輯判斷不成立
           // 1、rs==shutdown&&firstTask != null:此時(shí)不接受新任務(wù),但是仍會(huì)執(zhí)行隊(duì)列中的任務(wù)
           // 2、rs==shotdown&&firstTask == null:會(huì)執(zhí)行addWork(null,false)
           //  防止了SHUTDOWN狀態(tài)下沒有活動(dòng)線程了,但是隊(duì)列里還有任務(wù)沒執(zhí)行這種特殊情況。
           //  添加一個(gè)null任務(wù)是因?yàn)镾HUTDOWN狀態(tài)下,線程池不再接受新任務(wù)
           if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
               return false;
           // 死循環(huán)
           // 如果線程池狀態(tài)為RUNNING并且隊(duì)列中還有需要執(zhí)行的任務(wù)
           for (;;) {
               // 獲取線程池中線程數(shù)量
               int wc = workerCountOf(c);
               // 如果超出容量或者最大線程池容量不在接受新任務(wù)
               if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                   return false;
               // 線程安全增加工作線程數(shù)
               if (compareAndIncrementWorkerCount(c))
                   // 跳出retry
                   break retry;
               c = ctl.get();  // Re-read ctl
               // 如果線程池狀態(tài)發(fā)生變化,重新循環(huán)
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
           }
       }

       // 走到這里說明工作線程數(shù)增加成功
       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           final ReentrantLock mainLock = this.mainLock;
           w = new Worker(firstTask);
           final Thread t = w.thread;
           if (t != null) {
               // 加鎖
               mainLock.lock();
               try {
                   // Recheck while holding lock.
                   // Back out on ThreadFactory failure or if                   // shut down before lock acquired.
                   int c = ctl.get();
                   int rs = runStateOf(c);
                   // RUNNING狀態(tài) || SHUTDONW狀態(tài)下清理隊(duì)列中剩余的任務(wù)
                   if (rs if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       // 將新啟動(dòng)的線程添加到線程池中
                       workers.add(w);
                       // 更新線程池線程數(shù)且不超過最大值
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true;
                   }
               } finally {
                   mainLock.unlock();
               }
               // 啟動(dòng)新添加的線程,這個(gè)線程首先執(zhí)行firstTask,然后不停的從隊(duì)列中取任務(wù)執(zhí)行
               if (workerAdded) {
                   //執(zhí)行ThreadPoolExecutor的runWoker方法
                   t.start();
                   workerStarted = true;
               }
           }
       } finally {
           // 線程啟動(dòng)失敗,則從wokers中移除w并遞減wokerCount
           if (! workerStarted)
               // 遞減wokerCount會(huì)觸發(fā)tryTerminate方法
               addWorkerFailed(w);
       }
       return workerStarted;
   }

addWorker之后是runWorker,第一次啟動(dòng)會(huì)執(zhí)行初始化傳進(jìn)來的任務(wù)firstTask;然后會(huì)從workQueue中取任務(wù)執(zhí)行,如果隊(duì)列為空則等待keepAliveTime這么長時(shí)間

final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = null;
       // 允許中斷
       w.unlock(); // allow interrupts
       boolean completedAbruptly = true;
       try {
           // 如果getTask返回null那么getTask中會(huì)將workerCount遞減,如果異常了這個(gè)遞減操作會(huì)在processWorkerExit中處理
           while (task != null || (task = getTask()) != null) {
               w.lock();
               // If pool is stopping, ensure thread is interrupted;
               // if not, ensure thread is not interrupted.  This
               // requires a recheck in second case to deal with
               // shutdownNow race while clearing interrupt
               if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                     runStateAtLeast(ctl.get(), STOP))) &&
                   !wt.isInterrupted())
                   wt.interrupt();
               try {
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       task.run();
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; throw new Error(x);
                   } finally {
                       afterExecute(task, thrown);
                   }
               } finally {
                   task = null;
                   w.completedTasks++;
                   w.unlock();
               }
           }
           completedAbruptly = false;
       } finally {
           processWorkerExit(w, completedAbruptly);
       }
   }

我們看下getTask是如何執(zhí)行的

private Runnable getTask() {
       boolean timedOut = false; // Did the last poll() time out?
       // 死循環(huán)
       retry: for (;;) {
           // 獲取線程池狀態(tài)
           int c = ctl.get();
           int rs = runStateOf(c);
           // Check if queue empty only if necessary.
           // 1.rs > SHUTDOWN 所以rs至少等于STOP,這時(shí)不再處理隊(duì)列中的任務(wù)
           // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,這時(shí)還需要處理隊(duì)列中的任務(wù)除非隊(duì)列為空
           // 這兩種情況都會(huì)返回null讓runWoker退出while循環(huán)也就是當(dāng)前線程結(jié)束了,所以必須要decrement
           if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
               // 遞減workerCount值
               decrementWorkerCount();
               return null;
           }
           // 標(biāo)記從隊(duì)列中取任務(wù)時(shí)是否設(shè)置超時(shí)時(shí)間
           boolean timed; // Are workers subject to culling?
           // 1.RUNING狀態(tài)
           // 2.SHUTDOWN狀態(tài),但隊(duì)列中還有任務(wù)需要執(zhí)行
           for (;;) {
               int wc = workerCountOf(c);
               // 1.core thread允許被超時(shí),那么超過corePoolSize的的線程必定有超時(shí)
               // 2.allowCoreThreadTimeOut == false && wc >
               // corePoolSize時(shí),一般都是這種情況,core thread即使空閑也不會(huì)被回收,只要超過的線程才會(huì)
               timed = allowCoreThreadTimeOut || wc > corePoolSize;
               // 從addWorker可以看到一般wc不會(huì)大于maximumPoolSize,所以更關(guān)心后面半句的情形:
               // 1. timedOut == false 第一次執(zhí)行循環(huán), 從隊(duì)列中取出任務(wù)不為null方法返回 或者
               // poll出異常了重試
               // 2.timeOut == true && timed ==
               // false:看后面的代碼workerQueue.poll超時(shí)時(shí)timeOut才為true,
               // 并且timed要為false,這兩個(gè)條件相悖不可能同時(shí)成立(既然有超時(shí)那么timed肯定為true)
               // 所以超時(shí)不會(huì)繼續(xù)執(zhí)行而是return null結(jié)束線程。
               if (wc break;
               // workerCount遞減,結(jié)束當(dāng)前thread
               if (compareAndDecrementWorkerCount(c))
                   return null;
               c = ctl.get(); // Re-read ctl
               // 需要重新檢查線程池狀態(tài),因?yàn)樯鲜霾僮鬟^程中線程池可能被SHUTDOWN
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
           }
           try {
               // 1.以指定的超時(shí)時(shí)間從隊(duì)列中取任務(wù)
               // 2.core thread沒有超時(shí)
               Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
               if (r != null)
                   return r;
               timedOut = true;// 超時(shí)
           } catch (InterruptedException retry) {
               timedOut = false;// 線程被中斷重試
           }
       }
   }

下面我們看下processWorkerExit是如何工作的

private void processWorkerExit(Worker w, boolean completedAbruptly) {
       // 正常的話再runWorker的getTask方法workerCount已經(jīng)被減一了
       if (completedAbruptly)
           decrementWorkerCount();
       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           // 累加線程的completedTasks
           completedTaskCount += w.completedTasks;
           // 從線程池中移除超時(shí)或者出現(xiàn)異常的線程
           workers.remove(w);
       } finally {
           mainLock.unlock();
       }
       // 嘗試停止線程池
       tryTerminate();
       int c = ctl.get();
       // runState為RUNNING或SHUTDOWN
       if (runStateLessThan(c, STOP)) {
           // 線程不是異常結(jié)束
           if (!completedAbruptly) {
               // 線程池最小空閑數(shù),允許core thread超時(shí)就是0,否則就是corePoolSize
               int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
               // 如果min == 0但是隊(duì)列不為空要保證有1個(gè)線程來執(zhí)行隊(duì)列中的任務(wù)
               if (min == 0 && !workQueue.isEmpty())
                   min = 1;
               // 線程池還不為空那就不用擔(dān)心了
               if (workerCountOf(c) >= min)
                   return; // replacement not needed
           }
           // 1.線程異常退出
           // 2.線程池為空,但是隊(duì)列中還有任務(wù)沒執(zhí)行,看addWoker方法對這種情況的處理
           addWorker(null, false);
       }
   }

tryTerminate

processWorkerExit方法中會(huì)嘗試調(diào)用tryTerminate來終止線程池。這個(gè)方法在任何可能導(dǎo)致線程池終止的動(dòng)作后執(zhí)行:比如減少wokerCount或SHUTDOWN狀態(tài)下從隊(duì)列中移除任務(wù)。

final void tryTerminate() {
       for (;;) {
           int c = ctl.get();
           // 以下狀態(tài)直接返回:
           // 1.線程池還處于RUNNING狀態(tài)
           // 2.SHUTDOWN狀態(tài)但是任務(wù)隊(duì)列非空
           // 3.runState >= TIDYING 線程池已經(jīng)停止了或在停止了
           if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
               return;
           // 只能是以下情形會(huì)繼續(xù)下面的邏輯:結(jié)束線程池。
           // 1.SHUTDOWN狀態(tài),這時(shí)不再接受新任務(wù)而且任務(wù)隊(duì)列也空了
           // 2.STOP狀態(tài),當(dāng)調(diào)用了shutdownNow方法
           // workerCount不為0則還不能停止線程池,而且這時(shí)線程都處于空閑等待的狀態(tài)
           // 需要中斷讓線程“醒”過來,醒過來的線程才能繼續(xù)處理shutdown的信號(hào)。
           if (workerCountOf(c) != 0) { // Eligible to terminate
               // runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷。
               // ONLY_ONE:這里只需要中斷1個(gè)線程去處理shutdown信號(hào)就可以了。
               interruptIdleWorkers(ONLY_ONE);
               return;
           }
           final ReentrantLock mainLock = this.mainLock;
           mainLock.lock();
           try {
               // 進(jìn)入TIDYING狀態(tài)
               if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                   try {
                       // 子類重載:一些資源清理工作
                       terminated();
                   } finally {
                       // TERMINATED狀態(tài)
                       ctl.set(ctlOf(TERMINATED, 0));
                       // 繼續(xù)awaitTermination
                       termination.signalAll();
                   }
                   return;
               }
           } finally {
               mainLock.unlock();
           }
           // else retry on failed CAS
       }
   }

shutdown這個(gè)方法會(huì)將runState置為SHUTDOWN,會(huì)終止所有空閑的線程。shutdownNow方法將runState置為STOP。和shutdown方法的區(qū)別,這個(gè)方法會(huì)終止所有的線程。主要區(qū)別在于shutdown調(diào)用的是interruptIdleWorkers這個(gè)方法,而shutdownNow實(shí)際調(diào)用的是Worker類的interruptIfStarted方法:

他們的實(shí)現(xiàn)如下:

public void shutdown() {
       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           checkShutdownAccess();
           // 線程池狀態(tài)設(shè)為SHUTDOWN,如果已經(jīng)至少是這個(gè)狀態(tài)那么則直接返回
           advanceRunState(SHUTDOWN);
           // 注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進(jìn)入processWorkerExit →
           // tryTerminate方法中會(huì)保證隊(duì)列中剩余的任務(wù)得到執(zhí)行。
           interruptIdleWorkers();
           onShutdown(); // hook for ScheduledThreadPoolExecutor
       } finally {
           mainLock.unlock();
       }
       tryTerminate();
   }
public List shutdownNow() {
   List tasks;
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       checkShutdownAccess();
       // STOP狀態(tài):不再接受新任務(wù)且不再執(zhí)行隊(duì)列中的任務(wù)。
       advanceRunState(STOP);
       // 中斷所有線程
       interruptWorkers();
       // 返回隊(duì)列中還沒有被執(zhí)行的任務(wù)。
       tasks = drainQueue();
   }
   finally {
       mainLock.unlock();
   }
   tryTerminate();
   return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       for (Worker w : workers) {
           Thread t = w.thread;
           // w.tryLock能獲取到鎖,說明該線程沒有在運(yùn)行,因?yàn)閞unWorker中執(zhí)行任務(wù)會(huì)先lock,
           // 因此保證了中斷的肯定是空閑的線程。
           if (!t.isInterrupted() && w.tryLock()) {
               try {
                   t.interrupt();
               } catch (SecurityException ignore) {
               } finally {
                   w.unlock();
               }
           }
           if (onlyOne)
               break;
       }
   }
   finally {
       mainLock.unlock();
   }
}
void interruptIfStarted() {
   Thread t;
   // 初始化時(shí)state == -1
   if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
       try {
           t.interrupt();
       } catch (SecurityException ignore) {
       }
   }
}

線程池的使用

線程池的創(chuàng)建

我們可以通過ThreadPoolExecutor來創(chuàng)建一個(gè)線程池

   /**
    * @param corePoolSize 線程池基本大小,核心線程池大小,活動(dòng)線程小于corePoolSize則直接創(chuàng)建,大于等于則先加到workQueue中,
    * 隊(duì)列滿了才創(chuàng)建新的線程。當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,
    * 等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,
    * 線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。
    * @param maximumPoolSize 最大線程數(shù),超過就reject;線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,
    * 并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)
    * @param keepAliveTime
    * 線程池的工作線程空閑后,保持存活的時(shí)間。所以,如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,可以調(diào)大時(shí)間,提高線程的利用率
    * @param unit  線程活動(dòng)保持時(shí)間的單位):可選的單位有天(DAYS)、小時(shí)(HOURS)、分鐘(MINUTES)、
    * 毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)
    * @param workQueue 工作隊(duì)列,線程池中的工作線程都是從這個(gè)工作隊(duì)列源源不斷的獲取任務(wù)進(jìn)行執(zhí)行
    */
   public ThreadPoolExecutor(int corePoolSize,
              int maximumPoolSize,
              long keepAliveTime,
              TimeUnit unit,
              BlockingQueue workQueue) {
       // threadFactory用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個(gè)創(chuàng)建出來的線程設(shè)置更有意義的名字
       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
               Executors.defaultThreadFactory(), defaultHandler);
   }
向線程池提交任務(wù)

可以使用兩個(gè)方法向線程池提交任務(wù),分別為execute()和submit()方法。execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功。通過以下代碼可知execute()方法輸入的任務(wù)是一個(gè)Runnable類的實(shí)例。

threadsPool.execute(new Runnable() {
       @Override
       public void run() {
       }
   });

submit()方法用于提交需要返回值的任務(wù)。線程池會(huì)返回一個(gè)future類型的對象,通過這個(gè)future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout,TimeUnit unit)方法則會(huì)阻塞當(dāng)前線程一段時(shí)間后立即返回,這時(shí)候有可能任務(wù)沒有執(zhí)行完。

Future future = executor.submit(harReturnValuetask);
 try
   {
       Object s = future.get();
   }catch(
   InterruptedException e)
   {
       // 處理中斷異常
   }catch(
   ExecutionException e)
   {
       // 處理無法執(zhí)行任務(wù)異常
   }finally
   {
       // 關(guān)閉線程池
       executor.shutdown();
   }
關(guān)閉線程池

可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。

只要調(diào)用了這兩個(gè)關(guān)閉方法中的任意一個(gè),isShutdown方法就會(huì)返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。

合理的配置線程池

要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來分析。

1、任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。

2、任務(wù)的優(yōu)先級(jí):高、中和低。

3、任務(wù)的執(zhí)行時(shí)間:長、中和短。

4、任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。

性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置Ncpu+1個(gè)線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒必要進(jìn)行分解??梢酝ㄟ^Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)。優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來處理。它可以讓優(yōu)先級(jí)高的任務(wù)先執(zhí)行

如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級(jí)隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行。依賴數(shù)據(jù)庫連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時(shí)間越長,則CPU空閑時(shí)間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU。

建議使用有界隊(duì)列。有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn)兒,比如幾千。有時(shí)候我們系統(tǒng)里后臺(tái)任務(wù)線程池的隊(duì)列和線程池全滿了,不斷拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因?yàn)楹笈_(tái)任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞,任務(wù)積壓在線程池里。如果當(dāng)時(shí)我們設(shè)置成無界隊(duì)列,那么線程池的隊(duì)列就會(huì)越來越多,有可能會(huì)撐滿內(nèi)存,導(dǎo)致整個(gè)系統(tǒng)不可用,而不只是后臺(tái)任務(wù)出現(xiàn)問題。當(dāng)然,我們的系統(tǒng)所有的任務(wù)是用單獨(dú)的服務(wù)器部署的,我們使用不同規(guī)模的線程池完成不同類型的任務(wù),但是出現(xiàn)這樣問題時(shí)也會(huì)影響到其他任務(wù)。

線程池的監(jiān)控

如果在系統(tǒng)中大量使用線程池,則有必要對線程池進(jìn)行監(jiān)控,方便在出現(xiàn)問題時(shí),可以根據(jù)線程池的使用狀況快速定位問題??梢酝ㄟ^線程池提供的參數(shù)進(jìn)行監(jiān)控,在監(jiān)控線程池的時(shí)候可以使用以下屬性

  • taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
  • completedTaskCount:線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量,小于或等于taskCount。
  • largestPoolSize:線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個(gè)數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過。
  • getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會(huì)自動(dòng)銷毀,所以這個(gè)大小只增不減。
  • getActiveCount:獲取活動(dòng)的線程數(shù)。

通過擴(kuò)展線程池進(jìn)行監(jiān)控??梢酝ㄟ^繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來進(jìn)行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時(shí)間、最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等。

“Java線程池是怎么工作的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI