溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

ThreadPoolExecutor線(xiàn)程池原理及其execute方法(詳解)

發(fā)布時(shí)間:2020-10-17 14:47:56 來(lái)源:腳本之家 閱讀:250 作者:jingxian 欄目:編程語(yǔ)言

jdk1.7.0_79

對(duì)于線(xiàn)程池大部分人可能會(huì)用,也知道為什么用。無(wú)非就是任務(wù)需要異步執(zhí)行,再者就是線(xiàn)程需要統(tǒng)一管理起來(lái)。對(duì)于從線(xiàn)程池中獲取線(xiàn)程,大部分人可能只知道,我現(xiàn)在需要一個(gè)線(xiàn)程來(lái)執(zhí)行一個(gè)任務(wù),那我就把任務(wù)丟到線(xiàn)程池里,線(xiàn)程池里有空閑的線(xiàn)程就執(zhí)行,沒(méi)有空閑的線(xiàn)程就等待。實(shí)際上對(duì)于線(xiàn)程池的執(zhí)行原理遠(yuǎn)遠(yuǎn)不止這么簡(jiǎn)單。

Java并發(fā)包中提供了線(xiàn)程池類(lèi)——ThreadPoolExecutor,實(shí)際上更多的我們可能用到的是Executors工廠(chǎng)類(lèi)為我們提供的線(xiàn)程池newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,這三個(gè)線(xiàn)程池并不是ThreadPoolExecutor的子類(lèi),關(guān)于這幾者之間的關(guān)系,我們先來(lái)查看ThreadPoolExecutor,查看源碼發(fā)現(xiàn)其一共有4個(gè)構(gòu)造方法。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

首先就從這幾個(gè)參數(shù)開(kāi)始來(lái)了解線(xiàn)程池ThreadPoolExecutor的執(zhí)行原理。

corePoolSize:核心線(xiàn)程池的線(xiàn)程數(shù)量

maximumPoolSize:最大的線(xiàn)程池線(xiàn)程數(shù)量

keepAliveTime:線(xiàn)程活動(dòng)保持時(shí)間,線(xiàn)程池的工作線(xiàn)程空閑后,保持存活的時(shí)間。

unit:線(xiàn)程活動(dòng)保持時(shí)間的單位。

workQueue:指定任務(wù)隊(duì)列所使用的阻塞隊(duì)列

corePoolSizemaximumPoolSize都在指定線(xiàn)程池中的線(xiàn)程數(shù)量,好像平時(shí)用到線(xiàn)程池的時(shí)候最多就只需要傳遞一個(gè)線(xiàn)程池大小的參數(shù)就能創(chuàng)建一個(gè)線(xiàn)程池啊,Java為我們提供了一些常用的線(xiàn)程池類(lèi)就是上面提到的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,當(dāng)然如果我們想要自己發(fā)揮創(chuàng)建自定義的線(xiàn)程池就得自己來(lái)“配置”有關(guān)線(xiàn)程池的一些參數(shù)。

當(dāng)把一個(gè)任務(wù)交給線(xiàn)程池來(lái)處理的時(shí)候,線(xiàn)程池的執(zhí)行原理如下圖所示參考自《Java并發(fā)編程的藝術(shù)》

ThreadPoolExecutor線(xiàn)程池原理及其execute方法(詳解)

首先會(huì)判斷核心線(xiàn)程池里是否有線(xiàn)程可執(zhí)行,有空閑線(xiàn)程則創(chuàng)建一個(gè)線(xiàn)程來(lái)執(zhí)行任務(wù)。

②當(dāng)核心線(xiàn)程池里已經(jīng)沒(méi)有線(xiàn)程可執(zhí)行的時(shí)候,此時(shí)將任務(wù)丟到任務(wù)隊(duì)列中去。

③如果任務(wù)隊(duì)列(有界)也已經(jīng)滿(mǎn)了的話(huà),但運(yùn)行的線(xiàn)程數(shù)小于最大線(xiàn)程池的數(shù)量的時(shí)候,此時(shí)將會(huì)新建一個(gè)線(xiàn)程用于執(zhí)行任務(wù),但如果運(yùn)行的線(xiàn)程數(shù)已經(jīng)達(dá)到最大線(xiàn)程池的數(shù)量的時(shí)候,此時(shí)將無(wú)法創(chuàng)建線(xiàn)程執(zhí)行任務(wù)。

所以實(shí)際上對(duì)于線(xiàn)程池不僅是單純地將任務(wù)丟到線(xiàn)程池,線(xiàn)程池中有線(xiàn)程就執(zhí)行任務(wù),沒(méi)線(xiàn)程就等待。

為鞏固一下線(xiàn)程池的原理,現(xiàn)在再來(lái)了解上面提到的常用的3個(gè)線(xiàn)程池:

Executors.newFixedThreadPool創(chuàng)建一個(gè)固定數(shù)量線(xiàn)程的線(xiàn)程池。

// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

可以看到newFixedThreadPool中調(diào)用的是ThreadPoolExecutor類(lèi),傳遞的參數(shù)corePoolSize= maximumPoolSize=nThread?;仡櫨€(xiàn)程池的執(zhí)行原理,當(dāng)一個(gè)任務(wù)提交到線(xiàn)程池中,首先判斷核心線(xiàn)程池里有沒(méi)有空閑線(xiàn)程,有則創(chuàng)建線(xiàn)程,沒(méi)有則將任務(wù)放到任務(wù)隊(duì)列(這里是有界阻塞隊(duì)列LinkedBlockingQueue)中,如果任務(wù)隊(duì)列已經(jīng)滿(mǎn)了的話(huà),對(duì)于newFixedThreadPool來(lái)說(shuō),它的最大線(xiàn)程池?cái)?shù)量=核心線(xiàn)程池?cái)?shù)量,此時(shí)任務(wù)隊(duì)列也滿(mǎn)了,將不能擴(kuò)展創(chuàng)建新的線(xiàn)程來(lái)執(zhí)行任務(wù)。

Executors.newSingleThreadExecutor:創(chuàng)建只包含一個(gè)線(xiàn)程的線(xiàn)程池。  

//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

只有一個(gè)線(xiàn)程的線(xiàn)程池好像有點(diǎn)奇怪,并且并沒(méi)有直接將返回ThreadPoolExecutor,甚至也沒(méi)有直接將線(xiàn)程池?cái)?shù)量1傳遞給newFixedThreadPool返回。那就說(shuō)明這個(gè)只含有一個(gè)線(xiàn)程的線(xiàn)程池,或許并沒(méi)有只包含一個(gè)線(xiàn)程那么簡(jiǎn)單。在其源碼注釋中這么寫(xiě)到:創(chuàng)建只有一個(gè)工作線(xiàn)程的線(xiàn)程池用于操作一個(gè)無(wú)界隊(duì)列(如果由于前驅(qū)節(jié)點(diǎn)的執(zhí)行被終止結(jié)束了,一個(gè)新的線(xiàn)程將會(huì)繼續(xù)執(zhí)行后繼節(jié)點(diǎn)線(xiàn)程)任務(wù)得以繼續(xù)執(zhí)行,不同于newFixedThreadPool(1)不會(huì)有額外的線(xiàn)程來(lái)重新繼續(xù)執(zhí)行后繼節(jié)點(diǎn)。也就是說(shuō)newSingleThreadExecutor自始至終都只有一個(gè)線(xiàn)程在執(zhí)行,這和newFixedThreadPool一樣,但如果線(xiàn)程終止結(jié)束過(guò)后newSingleThreadExecutor則會(huì)重新創(chuàng)建一個(gè)新的線(xiàn)程來(lái)繼續(xù)執(zhí)行任務(wù)隊(duì)列中的線(xiàn)程,而newFixedThreaPool則不會(huì)。

Executors.newCachedThreadPool:根據(jù)需要?jiǎng)?chuàng)建新線(xiàn)程的線(xiàn)程池。

//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
  return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

可以看到newCachedThread返回的是ThreadPoolExecutor,其參數(shù)核心線(xiàn)程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,這也就是說(shuō)當(dāng)任務(wù)被提交到newCachedThread線(xiàn)程池時(shí),將會(huì)直接把任務(wù)放到SynchronousQueue任務(wù)隊(duì)列中,maximumPool從任務(wù)隊(duì)列中獲取任務(wù)。注意SynchronousQueue是一個(gè)沒(méi)有容量的隊(duì)列,也就是說(shuō)每個(gè)入隊(duì)操作必須等待另一個(gè)線(xiàn)程的對(duì)應(yīng)出隊(duì)操作,如果主線(xiàn)程提交任務(wù)的速度高于maximumPool中線(xiàn)程處理任務(wù)的速度時(shí),newCachedThreadPool會(huì)不斷創(chuàng)建線(xiàn)程,線(xiàn)程多并不是一件好事,嚴(yán)重會(huì)耗盡CPU和內(nèi)存資源。

題外話(huà):newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,這三者都直接或間接調(diào)用了ThreadPoolExecutor,為什么它們?nèi)邲](méi)有直接是其子類(lèi),而是通過(guò)Executors來(lái)實(shí)例化呢?這是所采用的靜態(tài)工廠(chǎng)方法,在java.util.Connections接口中同樣也是采用的靜態(tài)工廠(chǎng)方法來(lái)創(chuàng)建相關(guān)的類(lèi)。這樣有很多好處,靜態(tài)工廠(chǎng)方法是用來(lái)產(chǎn)生對(duì)象的,產(chǎn)生什么對(duì)象沒(méi)關(guān)系,只要返回原返回類(lèi)型或原返回類(lèi)型的子類(lèi)型都可以,降低API數(shù)目和使用難度,在《Effective Java》中的第1條就是靜態(tài)工廠(chǎng)方法。

回到ThreadPoolExecutor,首先來(lái)看它的繼承關(guān)系:

ThreadPoolExecutor線(xiàn)程池原理及其execute方法(詳解)

ThreadPoolExecutor它的頂級(jí)父類(lèi)是Executor接口,只包含了一個(gè)方法——execute,這個(gè)方法也就是線(xiàn)程池的“執(zhí)行”。

//Executor#execute
public interface Executor {
 void execute(Runnable command);
}

Executor#execute的實(shí)現(xiàn)則是在ThreadPoolExecutor中實(shí)現(xiàn)的:

//ThreadPoolExecutor#execute
public void execute(Runnable command) {
  if (command == null) 
  throw new NullPointerException();
  int c = ctl.get(); 
 …
}

一來(lái)就碰到個(gè)不知所云的ctl變量它的定義:

private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

這個(gè)變量使用來(lái)干嘛的呢?它的作用有點(diǎn)類(lèi)似我們?cè)凇?strong>ReadWriteLock接口及其實(shí)現(xiàn)ReentrantReadWriteLock》中提到的讀寫(xiě)鎖有讀、寫(xiě)兩個(gè)同步狀態(tài),而AQS則只提供了state一個(gè)int型變量,此時(shí)將state高16位表示為讀狀態(tài),低16位表示為寫(xiě)狀態(tài)。這里的clt同樣也是,它表示了兩個(gè)概念:

workerCount:當(dāng)前有效的線(xiàn)程數(shù)

runState:當(dāng)前線(xiàn)程池的五種狀態(tài),Running、Shutdown、Stop、Tidying、Terminate。

int型變量一共有32位,線(xiàn)程池的五種狀態(tài)runState至少需要3位來(lái)表示,故workCount只能有29位,所以代碼中規(guī)定線(xiàn)程池的有效線(xiàn)程數(shù)最多為229-1。

//ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE – 3;  //32-3=29,線(xiàn)程數(shù)量所占位數(shù)
private static final int CAPACITY = (1 << COUNT_BITS) – 1; //低29位表示最大線(xiàn)程數(shù),229-1
//五種線(xiàn)程池狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS; /int型變量高3位(含符號(hào)位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位000
private static final int STOP = 1 << COUNT_BITS; //高3位001
private static final int TIDYING = 2 << COUNT_BITS; //高3位010
private static final int TERMINATED = 3 << COUNT_BITS; //高3位011

再次回到ThreadPoolExecutor#execute方法:

//ThreadPoolExecutor#execute
public void execute(Runnable command) {
 if (command == null) 
  throw new NullPointerException();
   int c = ctl.get(); //由它可以獲取到當(dāng)前有效的線(xiàn)程數(shù)和線(xiàn)程池的狀態(tài)
/*1.獲取當(dāng)前正在運(yùn)行線(xiàn)程數(shù)是否小于核心線(xiàn)程池,是則新創(chuàng)建一個(gè)線(xiàn)程執(zhí)行任務(wù),否則將任務(wù)放到任務(wù)隊(duì)列中*/
 if (workerCountOf(c) < corePoolSize){
  if (addWorker(command, tre))  //在addWorker中創(chuàng)建工作線(xiàn)程執(zhí)行任務(wù)
   return ;
  c = ctl.get();
 }
/*2.當(dāng)前核心線(xiàn)程池中全部線(xiàn)程都在運(yùn)行workerCountOf(c) >= corePoolSize,所以此時(shí)將線(xiàn)程放到任務(wù)隊(duì)列中*/
 if (isRunning(c) && workQueue.offer(command)) { //線(xiàn)程池是否處于運(yùn)行狀態(tài),且是否任務(wù)插入任務(wù)隊(duì)列成功
  int recheck = ctl.get();
     if (!isRunning(recheck) && remove(command))  //線(xiàn)程池是否處于運(yùn)行狀態(tài),如果不是則使剛剛的任務(wù)出隊(duì)
       reject(command); //拋出RejectedExceptionException異常
     else if (workerCountOf(recheck) == 0)
       addWorker(null, false);
  }
/*3.插入隊(duì)列不成功,且當(dāng)前線(xiàn)程數(shù)數(shù)量小于最大線(xiàn)程池?cái)?shù)量,此時(shí)則創(chuàng)建新線(xiàn)程執(zhí)行任務(wù),創(chuàng)建失敗拋出異常*/
  else if (!addWorker(command, false)){
    reject(command); //拋出RejectedExceptionException異常
  }
}

上面代碼注釋第7行的即判斷當(dāng)前核心線(xiàn)程池里是否有空閑線(xiàn)程,有則通過(guò)addWorker方法創(chuàng)建工作線(xiàn)程執(zhí)行任務(wù)。addWorker方法較長(zhǎng),篩選出重要的代碼來(lái)解析。

//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先會(huì)再次檢查線(xiàn)程池是否處于運(yùn)行狀態(tài),核心線(xiàn)程池中是否還有空閑線(xiàn)程,都滿(mǎn)足條件過(guò)后則會(huì)調(diào)用compareAndIncrementWorkerCount先將正在運(yùn)行的線(xiàn)程數(shù)+1,數(shù)量自增成功則跳出循環(huán),自增失敗則繼續(xù)從頭繼續(xù)循環(huán)*/
  ...
  if (compareAndIncrementWorkerCount(c))
    break retry;
  ...
/*正在運(yùn)行的線(xiàn)程數(shù)自增成功后則將線(xiàn)程封裝成工作線(xiàn)程Worker*/
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    final ReentrantLock mainLock = this.mainLock;  //全局鎖
    w = new Woker(firstTask);  //將線(xiàn)程封裝為Worker工作線(xiàn)程
    final Thread t = w.thread;
    if (t != null) {
      mainLock.lock(); //獲取全局鎖
/*當(dāng)持有了全局鎖的時(shí)候,還需要再次檢查線(xiàn)程池的運(yùn)行狀態(tài)等*/
      try {
        int c = clt.get();
        int rs = runStateOf(c);  //線(xiàn)程池運(yùn)行狀態(tài)
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){  //線(xiàn)程池處于運(yùn)行狀態(tài),或者線(xiàn)程池關(guān)閉且任務(wù)線(xiàn)程為空
          if (t.isAlive()) //線(xiàn)程處于活躍狀態(tài),即線(xiàn)程已經(jīng)開(kāi)始執(zhí)行或者還未死亡,正確的應(yīng)線(xiàn)程在這里應(yīng)該是還未開(kāi)始執(zhí)行的
            throw new IllegalThreadStateException();
          workers.add(w); //private final HashSet<Worker> wokers = new HashSet<Worker>();包含線(xiàn)程池中所有的工作線(xiàn)程,只有在獲取了全局的時(shí)候才能訪(fǎng)問(wèn)它。將新構(gòu)造的工作線(xiàn)程加入到工作線(xiàn)程集合中
          int s = worker.size(); //工作線(xiàn)程數(shù)量
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true; //新構(gòu)造的工作線(xiàn)程加入成功
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start(); //在被構(gòu)造為Worker工作線(xiàn)程,且被加入到工作線(xiàn)程集合中后,執(zhí)行線(xiàn)程任務(wù),注意這里的start實(shí)際上執(zhí)行Worker中run方法,所以接下來(lái)分析Worker的run方法
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted) //未能成功創(chuàng)建執(zhí)行工作線(xiàn)程
      addWorkerFailed(w); //在啟動(dòng)工作線(xiàn)程失敗后,將工作線(xiàn)程從集合中移除
  }
  return workerStarted;
}

在上面第35代碼中,工作線(xiàn)程被成功添加到工作線(xiàn)程集合中后,則開(kāi)始start執(zhí)行,這里start執(zhí)行的是Worker工作線(xiàn)程中的run方法。

//ThreadPoolExecutor$Worker,它繼承了AQS,同時(shí)實(shí)現(xiàn)了Runnable,所以它具備了這兩者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    setState(-1); //設(shè)置AQS的同步狀態(tài)為-1,禁止中斷,直到調(diào)用runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); //通過(guò)線(xiàn)程工廠(chǎng)來(lái)創(chuàng)建一個(gè)線(xiàn)程,將自身作為Runnable傳遞傳遞
  }
  public void run() {
    runWorker(this); //運(yùn)行工作線(xiàn)程
  }
}

ThreadPoolExecutor#runWorker,在此方法中,Worker在執(zhí)行完任務(wù)后,還會(huì)循環(huán)獲取任務(wù)隊(duì)列里的任務(wù)執(zhí)行(其中的getTask方法),也就是說(shuō)Worker不僅僅是在執(zhí)行完給它的任務(wù)就釋放或者結(jié)束,它不會(huì)閑著,而是繼續(xù)從任務(wù)隊(duì)列中獲取任務(wù),直到任務(wù)隊(duì)列中沒(méi)有任務(wù)可執(zhí)行時(shí),它才退出循環(huán)完成任務(wù)。理解了以上的源碼過(guò)后,往后線(xiàn)程池執(zhí)行原理的第二步、第三步的理解實(shí)則水到渠成。

以上這篇ThreadPoolExecutor線(xiàn)程池原理及其execute方法(詳解)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持億速云。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI