溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

詳解Java線程池和Executor原理的分析

發(fā)布時(shí)間:2020-09-20 03:04:04 來(lái)源:腳本之家 閱讀:133 作者:木子_6259 欄目:編程語(yǔ)言

詳解Java線程池和Executor原理的分析

線程池作用與基本知識(shí)

在開始之前,我們先來(lái)討論下“線程池”這個(gè)概念。“線程池”,顧名思義就是一個(gè)線程緩存。它是一個(gè)或者多個(gè)線程的集合,用戶可以把需要執(zhí)行的任務(wù)簡(jiǎn)單地扔給線程池,而不用過多的糾結(jié)與執(zhí)行的細(xì)節(jié)。那么線程池有哪些作用?或者說(shuō)與直接用Thread相比,有什么優(yōu)勢(shì)?我簡(jiǎn)單總結(jié)了以下幾點(diǎn):

減小線程創(chuàng)建和銷毀帶來(lái)的消耗

對(duì)于Java Thread的實(shí)現(xiàn),我在前面的一篇blog中進(jìn)行了分析。Java Thread與內(nèi)核線程是1:1(Linux)的,再加上Thread在Java層與C++層都有不少成員數(shù)據(jù),所以Java Thread其實(shí)是比較重的。創(chuàng)建和銷毀一個(gè)Java Thread需要OS和JVM都做不少工作,因此如果將Java Thread緩存起來(lái),可以實(shí)現(xiàn)一定的效率提升。

更加方便和透明的實(shí)現(xiàn)計(jì)算資源控制

討論這一條,可能需要舉一些例子。以非常聞名的web服務(wù)器Nginx為例,Nginx以強(qiáng)大的并發(fā)能力和低資源消耗而著稱。Nginx為了實(shí)現(xiàn)這些嚴(yán)格的要求,它嚴(yán)格地限定了工作線程的數(shù)目(worker線程一般等于CPU數(shù)目)。這種設(shè)計(jì)的著眼點(diǎn)就是降低線程切換帶來(lái)的性能損失,這條優(yōu)化方式對(duì)Java同樣適用。倘若,每來(lái)一個(gè)任務(wù)就新建一個(gè)Thread來(lái)運(yùn)算,那最終的結(jié)果就是程序資源難以控制(某個(gè)功能把CPU跑滿了),而且整體的執(zhí)行速度也比較慢。 而Java線程池提供了FixedThreadPool,你可以使用它實(shí)現(xiàn)線程最大數(shù)目的控制。

上面說(shuō)了這么多的“廢話”,還是來(lái)結(jié)合Java線程池的實(shí)現(xiàn)來(lái)分析一下吧!Java的線程池有一下幾種實(shí)現(xiàn):

cached ThreadPool

緩存線程池的特點(diǎn)是它會(huì)緩存之前的線程,新提交的任務(wù)可以運(yùn)行在緩存的線程中,即實(shí)現(xiàn)了前文所述的第一個(gè)優(yōu)勢(shì)。

fixed ThreadPool

cachedThreadPool的一個(gè)特點(diǎn)是——新提交的任務(wù)沒有空閑線程可以執(zhí)行了,就會(huì)創(chuàng)建一個(gè)新的線程。而fixedThreadPool不會(huì)這樣,它會(huì)將任務(wù)保存起來(lái),等到有空閑線程再執(zhí)行。即實(shí)現(xiàn)了前文所述的第二個(gè)優(yōu)勢(shì)。

scheduled ThreadPool

scheduled ThreadPool的特點(diǎn)是可以實(shí)現(xiàn)任務(wù)的調(diào)度,比如任務(wù)的延遲執(zhí)行和周期執(zhí)行。

出了上面三種,Java還實(shí)現(xiàn)了newWorkStealingPool,這個(gè)是基于Fork/Join框架的。目前我還沒研究這個(gè),所以就先不管它了。Java的并發(fā)支持中,使用了Executor來(lái)包裝各種線程池,“執(zhí)行器”這個(gè)名稱其實(shí)挺貼切的,線程池可不就是個(gè)執(zhí)行器嘛!

1.cached ThreadPool、fixed ThreadPool的實(shí)現(xiàn)

從前文的描述就可以看出,這兩種線程池非常類似。的確是這樣,事實(shí)上它們是同時(shí)實(shí)現(xiàn)的,不行我們來(lái)看實(shí)際例子:

ThreadPoolExecutor executor1 = (ThreadPoolExecutor)Executors.newCachedThreadPool();

ThreadPoolExecutor executor2 = (ThreadPoolExecutor)Executors.newFixedThreadPool(4);

這是兩種線程池的新建方法,看起來(lái)很像吧!如果你不這么認(rèn)為,我只能讓你看看真相了。

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>());
}

是的,它們調(diào)用了同一個(gè)構(gòu)造函數(shù),只是參數(shù)略有不同。那么我們來(lái)看看這些參數(shù)的含義,以及兩組參數(shù)的區(qū)別。首先還是需要貼一下ThreadPoolExecutor的構(gòu)造函數(shù)了。

public ThreadPoolExecutor(int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), defaultHandler);
}

為了看起來(lái)清爽,再一層的構(gòu)造函數(shù)我就不貼了,而且那個(gè)構(gòu)造函數(shù)也只是簡(jiǎn)單的賦值而已。這里的函數(shù)原型已經(jīng)能給我們很多很多信息了,不得不說(shuō)JDK的代碼命名確實(shí)好,簡(jiǎn)直就像注釋一樣。

maximumPoolSize就是線程池的最大線程數(shù);對(duì)于cached ThreadPool來(lái)說(shuō),這個(gè)值是Integer.MAX_VALUE,基本相當(dāng)于無(wú)窮大了,什么樣的機(jī)器能跑幾十億線程?。?duì)于fixed ThreadPool來(lái)講,這個(gè)值就是用戶設(shè)定的線程池的數(shù)目。
keepAliveTime和unit決定了線程的緩存過期時(shí)間;對(duì)于cached ThreadPool來(lái)講,線程的緩存過期時(shí)間是一分鐘,換言之,一個(gè)工作線程如果一分鐘都無(wú)事可干,就把它撤銷掉以節(jié)省資源。fixed ThreadPool傳入的時(shí)間是0,這里的含義是fixed ThreadPool中的工作線程是永遠(yuǎn)不過期的。

corePoolSize是線程池的最小線程數(shù);對(duì)于cached ThreadPool,這個(gè)值為0,因?yàn)樵谕耆珱]有任務(wù)的情況下,cached ThreadPool的確會(huì)成為“光桿司令”。至于fixed ThreadPool,這個(gè)fixed已經(jīng)表明corePoolSize是等于線程總數(shù)的。
接下來(lái),我們根據(jù)一個(gè)簡(jiǎn)單的使用例子,來(lái)看看一下cached ThreadPool的流程。

public class Task implements Callable<String> {

private String name;
public Task(String name) {
  this.name = name;
}
@Override
public String call() throws Exception {
  System.out.printf("%s: Starting at : %s\n", this.name, new Date());
  return "hello, world";
}
public static void main(String[] args) {
  ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
  Task task = new Task("test");
  Future<String> result = executor.submit(task);
  try {
    System.out.printf("%s\n", result.get());
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }
  executor.shutdown();
  System.out.printf("Main ends at : %s\n", new Date());
}
}

首先,來(lái)看看executor.submit(task),這其實(shí)調(diào)用了ThreadPoolExecutor.execute(Runnable command)方法,這個(gè)方法的代碼如下,整段代碼的邏輯是這樣的。首先檢查線程池的線程數(shù)是否不夠corePoolSize,如果不夠就直接新建線程并把command添加進(jìn)去;如果線程數(shù)已經(jīng)夠了或者添加失?。ǘ鄠€(gè)線程增加添加的情況),就嘗試把command添加到隊(duì)列中(workQueue.offer(command)),如果添加失敗了,就reject掉cmd。大體的邏輯是這樣的,這段代碼有很多基于線程安全的設(shè)計(jì),這里為了不跑題,就先忽略細(xì)節(jié)了。

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  else if (!addWorker(command, false))
    reject(command);
}

到這里,看起來(lái)線程池實(shí)現(xiàn)的整體思路其實(shí)也沒多么復(fù)雜。但是還有一個(gè)問題——一個(gè)普通的Thread在執(zhí)行完自己的run方法后會(huì)自動(dòng)退出。那么線程池是如何實(shí)現(xiàn)Worker線程不斷的干活,甚至在沒有任務(wù)的時(shí)候。其實(shí)答案很簡(jiǎn)單,就是Worker其實(shí)在跑大循環(huán),Worker實(shí)際運(yùn)行方法如下:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    while (task != null || (task = getTask()) != null) {
      w.lock();
  /***/
      try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          task.run();
        /***/
        } finally {
          afterExecute(task, thrown);
        }
      } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

關(guān)鍵就在這個(gè)while的判斷條件,對(duì)于需要cached線程的情況下,getTask()會(huì)阻塞起來(lái),如果緩存的時(shí)間過期,就會(huì)返回一個(gè)null,然后Worker就退出了,也就結(jié)束了它的服役周期。而在有任務(wù)的情況下,Woker會(huì)把task拿出來(lái),然后調(diào)用task.run()執(zhí)行任務(wù),并通過Future通知客戶線程(即future.get()返回)。這樣一個(gè)簡(jiǎn)單的線程池使用過程就完了。。。

當(dāng)然,線程池的很多精髓知識(shí)——基于線程安全的設(shè)計(jì),我都沒有分析。有興趣可以自己分析一下,也可以和我討論。此外Scheduled ThreadPool這里也沒有分析,它的要點(diǎn)其實(shí)是調(diào)度,主要是根據(jù)時(shí)間最小堆來(lái)驅(qū)動(dòng)的。

感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持,如有疑問請(qǐng)留言,或者到本站社區(qū)交流,大家共同進(jìn)步!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI