溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的

發(fā)布時間:2021-10-25 11:45:57 來源:億速云 閱讀:166 作者:iii 欄目:編程語言

這篇文章主要講解了“定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的”吧!

1 簡介

ScheduledThreadPoolExecutor即定時線程池,是用來執(zhí)行延遲任務(wù)或周期性任務(wù)的。相比于Timer的單線程,定時線程池在遇到任務(wù)拋出異常的時候不會關(guān)閉整個線程池,更加健壯(需要提一下的是:ScheduledThreadPoolExecutor和ThreadPoolExecutor一樣,如果執(zhí)行任務(wù)的過程中拋異常的話,這個任務(wù)是會被丟棄的。所以在任務(wù)的執(zhí)行過程中需要對異常做捕獲處理,有必要的話需要做補償措施)。

傳進來的任務(wù)會被包裝為ScheduledFutureTask,其繼承于FutureTask,提供異步執(zhí)行的能力,并且可以返回執(zhí)行結(jié)果。同時實現(xiàn)了Delayed接口,可以通過getDelay方法來獲取延遲時間。

相比于ThreadPoolExecutor,ScheduledThreadPoolExecutor中使用的隊列是DelayedWorkQueue,是一個無界的隊列。所以在定時線程池中,最大線程數(shù)是沒有意義的(最大線程數(shù)會固定為int的最大值,且不會作為定時線程池的參數(shù))。在ThreadPoolExecutor中,如果當前線程數(shù)小于核心線程數(shù)就直接創(chuàng)建核心線程來執(zhí)行任務(wù),大于等于核心線程數(shù)的話才往阻塞隊列中放入任務(wù);而在ScheduledThreadPoolExecutor中卻不是這種邏輯。ScheduledThreadPoolExecutor中上來就會把任務(wù)放進延遲隊列中,然后再去等待執(zhí)行。

1.1 小頂堆

DelayedWorkQueue的實現(xiàn)有些特殊,是基于小頂堆構(gòu)建的(與DelayQueue和PriorityQueue類似)。因為要保證每次從延遲隊列中拿取到的任務(wù)是距現(xiàn)在最近的一個,所以使用小頂堆結(jié)構(gòu)來構(gòu)建是再適合不過了(堆結(jié)構(gòu)也常常用來解決前N小和前N大的問題)。小頂堆保證每個節(jié)點的值不小于其父節(jié)點的值,而不大于其孩子節(jié)點的值,而對于同級節(jié)點來說則沒有什么限制。這樣在小頂堆中值最小的點永遠保證是在根節(jié)點處。如果用數(shù)組來構(gòu)建小頂堆的話,值最小的點就在數(shù)組中的第一個位置處。

定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的

圖中紅色的數(shù)字代表節(jié)點在數(shù)組中的索引位置,由此可以看出堆的另一條性質(zhì)是:假設(shè)當前節(jié)點的索引是k,那么其父節(jié)點的索引是:(k-1)/2;左孩子節(jié)點的索引是:k* 2+1;而右孩子節(jié)點的索引是k*2+2。

構(gòu)建堆的兩個核心方法是 siftUpsiftDown,siftUp方法用于添加節(jié)點時的上溯過程;而siftDown方法用于刪除節(jié)點時的下溯過程。具體的實現(xiàn)源碼會在下面進行分析,這里就畫圖來理解一下(下面只會分析經(jīng)典的小頂堆添加和刪除節(jié)點的實現(xiàn),而在源碼中的實現(xiàn)略有不同,但核心都是一樣的):

1.1.1 添加節(jié)點

定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的

如果在上面的siftUp過程中,發(fā)現(xiàn)某一次當前節(jié)點的值就已經(jīng)大于了父節(jié)點的值,siftUp過程也就會提前終止了。同時可以看出:在上面的siftUp以及下面將要講的siftDown操作過程中,每次都只會比較并交換當前節(jié)點和其父子節(jié)點的值,而不是整個堆都發(fā)生變動,降低了時間復(fù)雜度。

1.1.2 刪除節(jié)點

刪除節(jié)點分為三種情況,首先來看一下 刪除根節(jié)點的情況

定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的

然后是 刪除最后一個節(jié)點的情況。刪除最后一個節(jié)點是最簡單的,只需要進行刪除就行了,因為這并不影響小頂堆的結(jié)構(gòu),不需要進行調(diào)整。這里就不再展示了(注意:刪除除了最后一個節(jié)點的其他葉子節(jié)點并不屬于當前這種情況,而是屬于下面第三種情況。也就是說刪除這些葉子節(jié)點并不能簡單地刪除它們就完了的,因為堆結(jié)構(gòu)首先得保證是一顆完全二叉樹)。

最后是 刪除既不是根節(jié)點又不是最后一個節(jié)點的情況

定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的

在刪除既不是根節(jié)點又不是最后一個節(jié)點的時候,可以看到執(zhí)行了一次siftDown并伴隨了一次siftUp的過程。但是這個siftUp過程并不是會一定觸發(fā)的,只有滿足最后一個節(jié)點的值比要刪除節(jié)點的父節(jié)點的值還要小的時候才會觸發(fā)siftUp操作(這個很好推理:在小頂堆中如果最后一個節(jié)點值比要刪除節(jié)點的父節(jié)點值要小的話,那么要刪除節(jié)點的左右孩子節(jié)點值也必然是都大于最后一個節(jié)點值的(不考慮值相等的情況),那么此時就不會發(fā)生siftDown操作;而如果發(fā)生了siftDown操作,就說明最后一個節(jié)點值至少要比要刪除節(jié)點的左右孩子節(jié)點中的一個要大(如果有左右孩子節(jié)點的話)。而孫子節(jié)點值是肯定要大于爺爺節(jié)點值的(不考慮值相等的情況),所以也就是說發(fā)生了siftDown操作的時候,最后一個節(jié)點值是比要刪除節(jié)點的父節(jié)點值大的。這個時候?qū)O子節(jié)點和最后一個節(jié)點siftDown交換后,依然是滿足小頂堆性質(zhì)的,所以就不需要附加的siftUp操作;還有一種情況是最后一個節(jié)點值是介于要刪除節(jié)點的父節(jié)點值和要刪除節(jié)點的左右孩子節(jié)點值中的較小者,那么這個時候既不會發(fā)生siftDown,也不會發(fā)生siftUp)。

而源碼中的實現(xiàn)和上面的經(jīng)典實現(xiàn)最大的不同就是不會有節(jié)點彼此交換的操作。在siftUp和siftDown的經(jīng)典實現(xiàn)中,如果需要變動節(jié)點時,都會來一次父子節(jié)點的互相交換操作(包括刪除節(jié)點時首先做的要刪除節(jié)點和最后一個節(jié)點之間的交換操作也是如此)。如果仔細思考的話,就會發(fā)現(xiàn)這其實是多余的。在需要交換節(jié)點的時候,只需要siftUp操作時的父節(jié)點或siftDown時的孩子節(jié)點重新移到當前需要比較的節(jié)點位置上,而比較節(jié)點是不需要移動到它們的位置上的。此時直接進入到下一次的判斷中,重復(fù)siftUp或siftDown過程,直到最后找到了比較節(jié)點的插入位置后,才會將其插入進去。這樣做的好處是可以省去一半的節(jié)點賦值的操作,提高了執(zhí)行的效率。同時這也就意味著,需要將要比較的節(jié)點作為參數(shù)保存起來,而源碼中也正是這么實現(xiàn)的。

1.2 Leader-Follower模式

ScheduledThreadPoolExecutor中使用了Leader-Follower模式。這是一種設(shè)計思想,假如說現(xiàn)在有一堆等待執(zhí)行的任務(wù)(一般是存放在一個隊列中排好序),而所有的工作線程中只會有一個是leader線程,其他的線程都是follower線程。只有l(wèi)eader線程能執(zhí)行任務(wù),而剩下的follower線程則不會執(zhí)行任務(wù),它們會處在休眠中的狀態(tài)。當leader線程拿到任務(wù)后執(zhí)行任務(wù)前,自己會變成follower線程,同時會選出一個新的leader線程,然后才去執(zhí)行任務(wù)。如果此時有下一個任務(wù),就是這個新的leader線程來執(zhí)行了,并以此往復(fù)這個過程。當之前那個執(zhí)行任務(wù)的線程執(zhí)行完畢再回來時,會判斷如果此時已經(jīng)沒任務(wù)了,又或者有任務(wù)但是有其他的線程作為leader線程,那么自己就休眠了;如果此時有任務(wù)但是沒有l(wèi)eader線程,那么自己就會重新成為leader線程來執(zhí)行任務(wù)。

不像ThreadPoolExecutor是需要立即執(zhí)行任務(wù)的,ScheduledThreadPoolExecutor中的任務(wù)是延遲執(zhí)行的,而拿取任務(wù)也是延遲拿取的。所以并不需要所有的線程都處于運行狀態(tài)延時等待獲取任務(wù)。而如果這么做的話,最后也只會有一個線程能執(zhí)行當前任務(wù),其他的線程還是會被再次休眠的(這里只是在說單任務(wù)多線程的情況,但對于多任務(wù)來說也是一樣的,總結(jié)來說就是 Leader-Follower模式只會喚醒真正需要“干事”的線程)。這是很沒有必要的,而且浪費資源。所以使用Leader-Follower模式的好處是:避免沒必要的喚醒和阻塞的操作,這樣會更加有效,且節(jié)省資源。

2 構(gòu)造器

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */ 4 public ScheduledThreadPoolExecutor(int corePoolSize) {
 5     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 6             new DelayedWorkQueue());
 7 }
 8 9 /**
10  * ThreadPoolExecutor:
11  */12 public ThreadPoolExecutor(int corePoolSize,13                           int maximumPoolSize,14                           long keepAliveTime,15                           TimeUnit unit,16                           BlockingQueue<Runnable> workQueue) {17     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,18             Executors.defaultThreadFactory(), defaultHandler);19 }

可以看到:ScheduledThreadPoolExecutor的構(gòu)造器是調(diào)用了父類ThreadPoolExecutor的構(gòu)造器來實現(xiàn)的,而父類的構(gòu)造器以及之中的所有參數(shù)我在之前分析ThreadPoolExecutor的源碼文章中講過,這里就不再贅述了。

3 schedule方法

execute方法和submit方法內(nèi)部都是調(diào)用的schedule方法,所以來看一下其實現(xiàn):

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */ 4 public ScheduledFuture<?> schedule(Runnable command,
 5                                    long delay,
 6                                    TimeUnit unit) {
 7     //非空校驗 8     if (command == null || unit == null)
 9         throw new NullPointerException();10     //包裝任務(wù)11     RunnableScheduledFuture<?> t = decorateTask(command,12             new ScheduledFutureTask<Void>(command, null,13                     triggerTime(delay, unit)));14     //延遲執(zhí)行15     delayedExecute(t);16     return t;17 }1819 /**
20  * 第13行代碼處:
21  * 延遲操作的觸發(fā)時間
22  */23 private long triggerTime(long delay, TimeUnit unit) {24     //delay非負處理25     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));26 }2728 long triggerTime(long delay) {29     /*
30     now方法內(nèi)部就一句話:“System.nanoTime();”,也就是獲取當前時間。這里也就是獲取
31     當前時間加上延遲時間后的結(jié)果。如果延遲時間超過了上限,會在overflowFree方法中處理
32      */33     return now() +34             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));35 }3637 private long overflowFree(long delay) {38     //獲取隊頭節(jié)點(不移除)39     Delayed head = (Delayed) super.getQueue().peek();40     if (head != null) {41         //獲取隊頭的剩余延遲時間42         long headDelay = head.getDelay(NANOSECONDS);43         /*
44         能走進本方法中,就說明delay是一個接近long最大值的數(shù)。此時判斷如果headDelay小于0
45         就說明延遲時間已經(jīng)到了或過期了但是還沒有執(zhí)行,并且delay和headDelay的差值小于0,說明headDelay
46         和delay的差值已經(jīng)超過了long的范圍
47          */48         if (headDelay < 0 && (delay - headDelay < 0))49             //此時更新一下delay的值,確保其和headDelay的差值在long的范圍內(nèi),同時delay也會重新變成一個正數(shù)50             delay = Long.MAX_VALUE + headDelay;51     }52     return delay;53 }5455 /**
56  * 第39行代碼處:
57  * 調(diào)用DelayedWorkQueue中覆寫的peek方法來獲取隊頭節(jié)點
58  */59 public RunnableScheduledFuture<?> peek() {60     final ReentrantLock lock = this.lock;61     lock.lock();62     try {63         return queue[0];64     } finally {65         lock.unlock();66     }67 }6869 /**
70  * 第42行代碼處:
71  * 可以看到本方法就是獲取延遲時間和當前時間的差值
72  */73 public long getDelay(TimeUnit unit) {74     return unit.convert(time - now(), NANOSECONDS);75 }

4 包裝任務(wù)

上面第11行和第12行代碼處會進行任務(wù)的包裝:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */ 4 ScheduledFutureTask(Runnable r, V result, long ns) {
 5     //調(diào)用父類FutureTask的構(gòu)造器 6     super(r, result);
 7     //這里會將延遲時間賦值給this.time 8     this.time = ns;
 9     //period用來表示任務(wù)的類型,為0表示延遲任務(wù),否則表示周期性任務(wù)10     this.period = 0;11     //這里會給每一個任務(wù)賦值一個唯一的序列號。當延遲時間相同時,會以該序列號來進行判斷。序列號小的會出隊12     this.sequenceNumber = sequencer.getAndIncrement();13 }1415 /**
16  * schedule方法第11行代碼處:
17  * 包裝任務(wù),這里只是返回task而已,子類可以覆寫本方法中的邏輯
18  */19 protected <V> RunnableScheduledFuture<V> decorateTask(20         Runnable runnable, RunnableScheduledFuture<V> task) {21     return task;22 }

5 delayedExecute方法

在schedule方法的第15行代碼處會執(zhí)行延遲任務(wù),添加任務(wù)和補充工作線程:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  */  4 private void delayedExecute(RunnableScheduledFuture<?> task) {
  5     if (isShutdown())
  6         /*
  7         這里會調(diào)用父類ThreadPoolExecutor的isShutdown方法來判斷當前線程池是否處于關(guān)閉或正在關(guān)閉的狀態(tài),
  8         如果是的話就執(zhí)行具體的拒絕策略
  9          */ 10         reject(task);
 11     else {
 12         //否則就往延遲隊列中添加當前任務(wù) 13         super.getQueue().add(task);
 14         /*
 15         添加后繼續(xù)判斷當前線程池是否處于關(guān)閉或正在關(guān)閉的狀態(tài),如果是的話就判斷此時是否還能繼續(xù)執(zhí)行任務(wù),
 16         如果不能的話就刪除上面添加的任務(wù)
 17          */ 18         if (isShutdown() &&
 19                 !canRunInCurrentRunState(task.isPeriodic()) &&
 20                 remove(task))
 21             //同時會取消此任務(wù)的執(zhí)行 22             task.cancel(false);
 23         else 24             //否則,說明線程池是可以繼續(xù)執(zhí)行任務(wù)的,就去判斷此時是否需要補充工作線程 25             ensurePrestart();
 26     }
 27 }
 28 29 /**
 30  * 第19行代碼處:
 31  * 傳進來的periodic表示任務(wù)是否是周期性任務(wù),如果是的話就是true(通過“period != 0”進行判斷)
 32  */ 33 boolean canRunInCurrentRunState(boolean periodic) {
 34     return isRunningOrShutdown(periodic ?
 35             //關(guān)閉線程池時判斷是否需要繼續(xù)執(zhí)行周期性任務(wù) 36             continueExistingPeriodicTasksAfterShutdown :
 37             //關(guān)閉線程池時判斷是否需要繼續(xù)執(zhí)行延遲任務(wù) 38             executeExistingDelayedTasksAfterShutdown);
 39 }
 40 41 /**
 42  * ThreadPoolExecutor:
 43  */ 44 final boolean isRunningOrShutdown(boolean shutdownOK) {
 45     //獲取當前線程池的運行狀態(tài) 46     int rs = runStateOf(ctl.get());
 47     //如果是RUNNING狀態(tài)的,或者是SHUTDOWN狀態(tài)并且是能繼續(xù)執(zhí)行任務(wù)的,就返回true 48     return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
 49 }
 50 51 /**
 52  * ScheduledThreadPoolExecutor:
 53  * 上面第20行代碼處的remove方法會調(diào)用ThreadPoolExecutor的remove方法,而該方法我在之前的
 54  * ThreadPoolExecutor的源碼分析文章中已經(jīng)分析過了。但是其中會調(diào)用延遲隊列覆寫的remove邏輯,
 55  * 也就是本方法(同時第130行代碼處也會調(diào)用到這里)
 56  */ 57 public boolean remove(Object x) {
 58     final ReentrantLock lock = this.lock;
 59     //加鎖 60     lock.lock();
 61     try {
 62         //獲取當前節(jié)點的堆索引位 63         int i = indexOf(x);
 64         if (i < 0)
 65             //如果找不到的話,就直接返回false 66             return false;
 67 68         //將當前節(jié)點的索引位設(shè)置為-1,因為下面要進行刪除了 69         setIndex(queue[i], -1);
 70         //size-1 71         int s = --size;
 72         //獲取小頂堆的最后一個節(jié)點,用于替換 73         RunnableScheduledFuture<?> replacement = queue[s];
 74         //將最后一個節(jié)點置為null 75         queue[s] = null;
 76         //如果要刪除的節(jié)點本身就是最后一個節(jié)點的話,就可以直接返回true了,因為不影響小頂堆的結(jié)構(gòu) 77         if (s != i) {
 78             /*
 79             否則執(zhí)行一次siftDown下溯過程,將最后一個節(jié)點的值重新插入到小頂堆中
 80             這其中會刪除i位置處的節(jié)點(siftDown方法后面會再次調(diào)用,到時候再來詳細分析該方法的實現(xiàn))
 81              */ 82             siftDown(i, replacement);
 83             /*
 84             經(jīng)過上面的siftDown的操作后,如果最后一個節(jié)點的延遲時間本身就比要刪除的節(jié)點的小的話,
 85             那么就會直接將最后一個節(jié)點放在要刪除節(jié)點的位置上。此時從刪除節(jié)點到其下面的節(jié)點都是滿足
 86             小頂堆結(jié)構(gòu)的,但是不能保證replacement也就是當前刪除后的替換節(jié)點和其父節(jié)點之間滿足小頂堆
 87             結(jié)構(gòu),也就是說可能出現(xiàn)replacement節(jié)點的延遲時間比其父節(jié)點的還小的情況
 88              */ 89             if (queue[i] == replacement)
 90                 //那么此時就調(diào)用一次siftUp上溯操作,再次調(diào)整replacement節(jié)點其上的小頂堆的結(jié)構(gòu)即可 91                 siftUp(i, replacement);
 92         }
 93         return true;
 94     } finally {
 95         //釋放鎖 96         lock.unlock();
 97     }
 98 }
 99100 /**
101  * 第63行代碼處:
102  */103 private int indexOf(Object x) {104     if (x != null) {105         if (x instanceof ScheduledFutureTask) {106             //如果當前節(jié)點是ScheduledFutureTask類型的,就獲取它的堆索引位107             int i = ((ScheduledFutureTask) x).heapIndex;108             //大于等于0和小于size說明當前節(jié)點還在小頂堆中,并且當前節(jié)點還在延遲隊列中的話,就直接返回該索引位109             if (i >= 0 && i < size && queue[i] == x)110                 return i;111         } else {112             //否則就按照普通遍歷的方式查找是否有相等的節(jié)點,如果有的話就返回索引位113             for (int i = 0; i < size; i++)114                 if (x.equals(queue[i]))115                     return i;116         }117     }118     //找不到的話就返回-1119     return -1;120 }121122 /**
123  * 第22行代碼處:
124  */125 public boolean cancel(boolean mayInterruptIfRunning) {126     //調(diào)用FutureTask的cancel方法來嘗試取消此任務(wù)的執(zhí)行127     boolean cancelled = super.cancel(mayInterruptIfRunning);128     //如果取消成功了,并且允許刪除節(jié)點,并且當前節(jié)點存在于小頂堆中的話,就刪除它129     if (cancelled && removeOnCancel && heapIndex >= 0)130         remove(this);131     return cancelled;132 }133134 /**
135  * ThreadPoolExecutor:
136  * 第25行代碼處:
137  */138 void ensurePrestart() {139     //獲取當前線程池的工作線程數(shù)140     int wc = workerCountOf(ctl.get());141     if (wc < corePoolSize)142         /*
143         如果小于核心線程數(shù),就添加一個核心線程,之前我在分析ThreadPoolExecutor的源碼文章中講過,
144         addWorker方法的執(zhí)行中會同時啟動運行線程。這里傳入的firstTask參數(shù)為null,因為不需要立即執(zhí)行任務(wù),
145         而是從延遲隊列中拿取任務(wù)
146          */147         addWorker(null, true);148     else if (wc == 0)149         //如果當前沒有工作線程,就去添加一個非核心線程,然后運行它。保證至少要有一個線程150         addWorker(null, false);151     /*
152     從這里可以看出,如果當前的工作線程數(shù)已經(jīng)達到了核心線程數(shù)后,就不會再創(chuàng)建工作線程了
153     定時線程池最多只有“核心線程數(shù)”個線程,也就是通過構(gòu)造器傳進來的參數(shù)大小
154      */155 }

6 添加任務(wù)

因為延遲隊列是用小頂堆構(gòu)建的,所以添加的時候會涉及到 小頂堆的調(diào)整:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  * 這里會調(diào)用DelayedWorkQueue的add方法
  4  */  5 public boolean add(Runnable e) {
  6     return offer(e);
  7 }
  8  9 public boolean offer(Runnable x) {
 10     //非空校驗 11     if (x == null)
 12         throw new NullPointerException();
 13     //強轉(zhuǎn)類型 14     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
 15     final ReentrantLock lock = this.lock;
 16     //加鎖 17     lock.lock();
 18     try {
 19         //獲取當前的任務(wù)數(shù)量 20         int i = size;
 21         //判斷是否需要擴容(初始容量為16) 22         if (i >= queue.length)
 23             grow();
 24         //size+1 25         size = i + 1;
 26         if (i == 0) {
 27             //如果當前是第一個任務(wù)的話,就直接放在小頂堆的根節(jié)點位置處就行了(隊列第一個位置) 28             queue[0] = e;
 29             //同時設(shè)置一下當前節(jié)點的堆索引位為0 30             setIndex(e, 0);
 31         } else {
 32             //否則就用siftUp的方式來插入到應(yīng)該插入的位置 33             siftUp(i, e);
 34         }
 35         //經(jīng)過上面的插入過程之后,如果小頂堆的根節(jié)點還是當前新添加節(jié)點的話,說明新添加節(jié)點的延遲時間是最短的 36         if (queue[0] == e) {
 37             //那么此時不管有沒有l(wèi)eader線程,都得將其置為null 38             leader = null;
 39             /*
 40             并且重新將條件隊列上的一個節(jié)點轉(zhuǎn)移到CLH隊列中(如果當前只有一個節(jié)點的時候也會進入到signal方法中
 41             但無妨,因為此時條件隊列中還沒有節(jié)點,所以并不會做什么)需要提一點的是:如果真的看過signal方法內(nèi)部實現(xiàn)
 42             的話就會知道,signal方法在常規(guī)情況下并不是在做喚醒線程的工作,喚醒是在下面的unlock方法中實現(xiàn)的
 43              */ 44             available.signal();
 45         }
 46     } finally {
 47         /*
 48         釋放鎖(注意,這里只會喚醒CLH隊列中的head節(jié)點的下一個節(jié)點,可能是上面被鎖住的添加任務(wù)的其他線程、
 49         也可能是上次執(zhí)行完任務(wù)后準備再次拿取任務(wù)的線程,還有可能是等待被喚醒的follower線程,又或者有其他的
 50         情況。但不管是哪個,只要能保證喚醒動作是一直能被傳播下去的就行。ReentrantLock和阻塞隊列的執(zhí)行細節(jié)
 51         詳見我之前對AQS源碼進行分析的文章)
 52          */ 53         lock.unlock();
 54     }
 55     return true;
 56 }
 57 58 /**
 59  * 第23行代碼處:
 60  */ 61 private void grow() {
 62     int oldCapacity = queue.length;
 63     //可以看到這里的擴容策略是*1.5的方式 64     int newCapacity = oldCapacity + (oldCapacity >> 1);
 65     //如果擴容后的新容量溢出了,就將其恢復(fù)為int的最大值 66     if (newCapacity < 0)
 67         newCapacity = Integer.MAX_VALUE;
 68     //使用Arrays.copyOf(System.arraycopy)的方式來進行數(shù)組的拷貝 69     queue = Arrays.copyOf(queue, newCapacity);
 70 }
 71 72 /**
 73  * 第30行、第99行和第109行代碼處:
 74  * 設(shè)置f節(jié)點在小頂堆中的索引位為idx,這樣在最后的刪除節(jié)點時可以通過index是否大于0來判斷當前節(jié)點是否仍在小頂堆中
 75  */ 76 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
 77     if (f instanceof ScheduledFutureTask)
 78         ((ScheduledFutureTask) f).heapIndex = idx;
 79 }
 80 81 /**
 82  * 第33行代碼處:
 83  * 堆排序的精髓就在于siftUp和siftDown方法,但本實現(xiàn)與常規(guī)的實現(xiàn)略有不同,多了一個入?yún)ey
 84  * key代表當前要插入節(jié)點中的任務(wù)
 85  */ 86 private void siftUp(int k, RunnableScheduledFuture<?> key) {
 87     //當k<=0的時候說明已經(jīng)上溯到根節(jié)點了 88     while (k > 0) {
 89         //獲取父節(jié)點的索引((當前節(jié)點索引位-1)/2的方式) 90         int parent = (k - 1) >>> 1;
 91         //獲取父節(jié)點的任務(wù) 92         RunnableScheduledFuture<?> e = queue[parent];
 93         //如果當前要插入節(jié)點中的任務(wù)延遲時間大于父節(jié)點的延遲時間的話,就停止上溯過程,說明找到了插入的位置 94         if (key.compareTo(e) >= 0)
 95             break;
 96         //否則就需要將父節(jié)點的內(nèi)容賦值給當前節(jié)點 97         queue[k] = e;
 98         //同時設(shè)置一下父節(jié)點的堆索引位為當前節(jié)點處 99         setIndex(e, k);100         //然后將父節(jié)點賦值給當前節(jié)點,繼續(xù)下一次的上溯過程101         k = parent;102     }103     /*
104     走到這里說明有兩種情況:<1>已經(jīng)結(jié)束了上溯的過程,但最后一次的父節(jié)點還沒有賦值,這里就是進行賦值的操作;
105     <2>如果本方法進來的時候要添加的最后一個節(jié)點本身就滿足小頂堆條件的話,那么該處就是在給最后一個節(jié)點進行賦值
106      */107     queue[k] = key;108     //同時設(shè)置一下要插入節(jié)點的堆索引位109     setIndex(key, k);110 }111112 /**
113  * 第94行代碼處:
114  */115 public int compareTo(Delayed other) {116     //如果比較的就是當前對象,就直接返回0相等117     if (other == this)118         return 0;119     if (other instanceof ScheduledFutureTask) {120         //如果需要比較的任務(wù)也是ScheduledFutureTask類型的話,就首先強轉(zhuǎn)一下類型121         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;122         //計算當前任務(wù)和需要比較的任務(wù)之間的延遲時間差123         long diff = time - x.time;124         if (diff < 0)125             //小于0說明當前任務(wù)的延遲時間更短,就返回-1126             return -1;127         else if (diff > 0)128             //大于0說明需要比較的任務(wù)的延遲時間更短,就返回1129             return 1;130         //如果兩者相等的話,就比較序列號,誰的序列號更小(序列號是唯一的),就應(yīng)該先被執(zhí)行131         else if (sequenceNumber < x.sequenceNumber)132             return -1;133         else134             return 1;135     }136     //如果需要比較的任務(wù)不是ScheduledFutureTask類型的話,就通過getDelay的方式來進行比較137     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);138     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;139 }

7 拿取任務(wù)

在上面的ensurePrestart方法中會調(diào)用到addWorker方法,以此來補充工作線程。之前我對ThreadPoolExecutor源碼進行分析的文章中說到過 ,addWorker方法會調(diào)用到getTask方法來從隊列中拿取任務(wù):

  1 /**
  2  * ThreadPoolExecutor:
  3  */  4 private Runnable getTask() {
  5     //...  6     /*
  7     這里的allowCoreThreadTimeOut默認為false(為true表示空閑的核心線程也是要超時銷毀的),
  8     而上面說過定時線程池最多只有“核心線程數(shù)”個線程,所以timed為false
  9      */ 10     boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 11     //... 12     //因為timed為false,所以這里會走take方法中的邏輯 13     Runnable r = timed ?
 14             workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 15             workQueue.take();
 16     //... 17 }
 18 19 /**
 20  * ScheduledThreadPoolExecutor:
 21  * 第15行代碼處:
 22  * 上面的take方法會調(diào)用到DelayedWorkQueue的take方法,而該方法也就是用來實現(xiàn)延遲拿取任務(wù)的
 23  */ 24 public RunnableScheduledFuture<?> take() throws InterruptedException {
 25     final ReentrantLock lock = this.lock;
 26     //加鎖(響應(yīng)中斷模式) 27     lock.lockInterruptibly();
 28     try {
 29         for (; ; ) {
 30             //獲取隊頭節(jié)點 31             RunnableScheduledFuture<?> first = queue[0];
 32             if (first == null)
 33                 /*
 34                 如果當前延遲隊列中沒有延遲任務(wù),就在這里阻塞當前線程(通過AQS中條件隊列的方式),等待有任務(wù)時被喚醒
 35                 另外,當線程執(zhí)行完任務(wù)后也會再次走到getTask方法中的本方法中。如果此時沒任務(wù)了,就會在此被阻塞休眠住
 36                 (我在之前AQS源碼分析的文章中說過:await方法中會釋放掉所有的ReentrantLock鎖資源,然后才會被阻塞?。?
 37                  */ 38                 available.await();
 39             else {
 40                 //否則就獲取隊頭的剩余延遲時間 41                 long delay = first.getDelay(NANOSECONDS);
 42                 //如果延遲時間已經(jīng)到了的話,就刪除并返回隊頭,表示拿取到了任務(wù) 43                 if (delay <= 0)
 44                     return finishPoll(first);
 45                 /*
 46                 這里將隊頭節(jié)點的引用置為null,如果不置為null的話,可能有多個等待著的線程同時持有著隊頭節(jié)點的
 47                 first引用,這樣如果要刪除隊頭節(jié)點的話,因為其還有其他線程的引用,所以不能被及時回收,造成內(nèi)存泄漏
 48                  */ 49                 first = null;
 50                 /*
 51                 如果leader不為null,說明有其他的線程已經(jīng)成為了leader線程,正在延遲等待著
 52                 同時此時沒有新的延遲時間最短的節(jié)點進入到延遲隊列中
 53                  */ 54                 if (leader != null)
 55                     /*
 56                     那么當前線程就變成了follower線程,需要被阻塞住,等待被喚醒(同上,其中會釋放掉所有的鎖資源)
 57                     線程執(zhí)行完任務(wù)后也會再次走到本方法中拿取任務(wù),如果走到這里發(fā)現(xiàn)已經(jīng)有別的leader線程了,
 58                     那么當前線程也會被阻塞休眠??;否則就會在下面的else分支中再次成為leader線程
 59                      */ 60                     available.await();
 61                 else {
 62                     /*
 63                     leader為null,可能是上一個leader線程拿取到任務(wù)后喚醒的下一個線程,也有可能
 64                     是一個新的延遲時間最短的節(jié)點進入到延遲隊列中,從而將leader置為null
 65 
 66                     此時獲取當前線程
 67                      */ 68                     Thread thisThread = Thread.currentThread();
 69                     //并將leader置為當前線程,也就是當前線程成為了leader線程 70                     leader = thisThread;
 71                     try {
 72                         /*
 73                         這里也就是在做具體的延時等待delay納秒的操作了,具體涉及到AQS中條件隊列的相關(guān)操作
 74                         如果被喚醒的話可能是因為到達了延遲時間從而醒來;也有可能是被別的線程signal喚醒了;
 75                         還有可能是中斷被喚醒。正常情況下是等到達了延遲時間后,這里會醒來并進入到下一次循環(huán)中的
 76                         finishPoll方法中,剔除隊頭節(jié)點并最終返回(awaitNanos方法和await方法類似,其中會釋放掉
 77                         所有的鎖資源;不一樣的是在被喚醒時會把當前節(jié)點從條件隊列中“轉(zhuǎn)移”到CLH隊列中。這里可以認為
 78                         是轉(zhuǎn)移,因為在條件隊列中的該節(jié)點狀態(tài)已經(jīng)改為了0,相當于是個垃圾節(jié)點,后續(xù)會進行刪除)
 79                          */ 80                         available.awaitNanos(delay);
 81                     } finally {
 82                         /*
 83                         不管awaitNanos是如何被喚醒的,此時會判斷當前的leader線程是否還是當前線程
 84                         如果是的話就將leader置為null,也就是當前線程不再是leader線程了
 85                          */ 86                         if (leader == thisThread)
 87                             leader = null;
 88                     }
 89                 }
 90             }
 91         }
 92     } finally {
 93         //在退出本方法之前,判斷如果leader線程為null并且刪除隊頭后的延遲隊列仍然不為空的話(說明此時有其他的延遲任務(wù)) 94         if (leader == null && queue[0] != null)
 95             //就將條件隊列上的一個節(jié)點轉(zhuǎn)移到CLH隊列中(同時會剔除上面的垃圾條件節(jié)點) 96             available.signal();
 97         /*
 98         釋放鎖(同offer方法中的邏輯,這里只會喚醒CLH隊列中的head節(jié)點的下一個節(jié)點。這里就體現(xiàn)了
 99         Leader-Follower模式:當leader線程拿取到任務(wù)后準備要執(zhí)行時,會首先喚醒剩下線程中的一個,
100         它將會成為新的leader線程,并以此往復(fù)。保證在任何時間都只有一個leader線程,避免不必要的喚醒與睡眠)
101          */102         lock.unlock();103     }104 }105106 /**
107  * 第44行代碼處:
108  */109 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {110     //size-1111     int s = --size;112     //獲取隊列中的最后一個節(jié)點113     RunnableScheduledFuture<?> x = queue[s];114     //并置空它,便于GC,這里也就是在刪除最后一個節(jié)點115     queue[s] = null;116     //如果刪除前延遲隊列中有不止一個節(jié)點的話,就進入到siftDown方法中,將小頂堆中的根節(jié)點刪除,并且重新維護小頂堆117     if (s != 0)118         siftDown(0, x);119     //同時設(shè)置一下刪除前的根節(jié)點的堆索引位為-1,表示其不存在于小頂堆中了120     setIndex(f, -1);121     //最后將其返回出去122     return f;123 }124125 /**
126  * 第118行代碼處:
127  * 方法參數(shù)中的key代表刪除的最后一個節(jié)點中的任務(wù)
128  */129 private void siftDown(int k, RunnableScheduledFuture<?> key) {130     /*
131     這里會取數(shù)組長度的一半half(注意這里的size是已經(jīng)刪除最后一個節(jié)點后的size),
132     而half也就是在指向最后一個非葉子節(jié)點的下一個節(jié)點
133      */134     int half = size >>> 1;135     //從這里可以看出下溯的終止條件是k大于等于half,也就是此時遍歷到已經(jīng)沒有了非葉子節(jié)點,自然不需要進行調(diào)整136     while (k < half) {137         //獲取左孩子節(jié)點的索引位138         int child = (k << 1) + 1;139         //獲取左孩子節(jié)點的任務(wù)140         RunnableScheduledFuture<?> c = queue[child];141         //獲取右孩子節(jié)點的索引位142         int right = child + 1;143         //如果右孩子節(jié)點的索引位小于size,也就是在說當前節(jié)點含有右子樹。并且左孩子節(jié)點的任務(wù)延遲時間大于右孩子節(jié)點的話144         if (right < size && c.compareTo(queue[right]) > 0)145             //就將c重新指向為右孩子節(jié)點146             c = queue[child = right];147         /*
148         走到這里說明c指向的是左右子節(jié)點中、任務(wù)延遲時間較小的那個節(jié)點。此時判斷如果最后一個節(jié)點的
149         任務(wù)延遲時間小于等于這個較小節(jié)點的話,就可以停止下溯了,說明找到了插入的位置
150          */151         if (key.compareTo(c) <= 0)152             break;153         //否則就把較小的那個節(jié)點賦值給當前節(jié)點處154         queue[k] = c;155         //同時設(shè)置一下延遲時間較小的那個節(jié)點的堆索引位為當前節(jié)點處156         setIndex(c, k);157         //然后將當前節(jié)點指向那個較小的節(jié)點,繼續(xù)下一次循環(huán)158         k = child;159     }160     /*
161     同siftUp方法一樣,走到這里說明有兩種情況:<1>已經(jīng)結(jié)束了下溯的過程,但最后一次的子節(jié)點還沒有賦值,
162     這里會把其賦值為之前刪除的最后一個節(jié)點;
163     <2>如果根節(jié)點的左右子節(jié)點中、任務(wù)延遲時間較小的那個節(jié)點本身的延遲時間就比之前刪除節(jié)點大的話,
164     就會把根節(jié)點替換為之前刪除的最后一個節(jié)點
165     所以本方法加上finishPoll方法,實際上并沒有將最后一個節(jié)點刪除,最后一個節(jié)點中的任務(wù)一直都是保留著的
166     (也就是key),而是變相地將堆的根節(jié)點刪除了(在第一種情況中根節(jié)點在第一次賦值為左右子節(jié)點中、
167     任務(wù)延遲時間較小的那個節(jié)點時,就已經(jīng)被覆蓋了)
168      */169     queue[k] = key;170     //同時設(shè)置一下最后一個節(jié)點現(xiàn)在新的堆索引位171     setIndex(key, k);172 }

8 執(zhí)行延遲任務(wù)

拿取到任務(wù)之后,就是具體的執(zhí)行任務(wù)了。addWorker方法具體的執(zhí)行邏輯我在之前ThreadPoolExecutor的源碼分析文章中已經(jīng)講過了,其中執(zhí)行任務(wù)的時候會調(diào)用task的run方法,也就是這里包裝為ScheduledFutureTask的run方法:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */ 4 public void run() {
 5     //判斷是否是周期性任務(wù) 6     boolean periodic = isPeriodic();
 7     if (!canRunInCurrentRunState(periodic)) { 8         //如果此時不能繼續(xù)執(zhí)行任務(wù)的話,就嘗試取消此任務(wù)的執(zhí)行 9         cancel(false);10     } else if (!periodic)11         /*
12         如果是延遲任務(wù),就調(diào)用ScheduledFutureTask父類FutureTask的run方法,
13         其中會通過call方法來最終調(diào)用到使用者具體寫的任務(wù)
14          */15         ScheduledFutureTask.super.run();16     else if (ScheduledFutureTask.super.runAndReset()) {17         //周期性任務(wù)的執(zhí)行放在下一節(jié)中進行分析18         setNextRunTime();19         reExecutePeriodic(outerTask);20     }21 }

9 scheduleAtFixedRate & scheduleWithFixedDelay方法

scheduleAtFixedRate方法是以上次的延遲時間點開始,延遲指定時間后再次執(zhí)行當前任務(wù);而scheduleWithFixedDelay方法是以上個周期任務(wù)執(zhí)行完畢后的時間點開始,延遲指定時間后再次執(zhí)行當前任務(wù)。因為這兩個方法的實現(xiàn)絕大部分都是一樣的,所以合在一起來進行分析:

  1 /**
  2  * ScheduledThreadPoolExecutor:
  3  * scheduleAtFixedRate方法
  4  */  5 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  6                                               long initialDelay,
  7                                               long period,
  8                                               TimeUnit unit) {
  9     //非空校驗 10     if (command == null || unit == null)
 11         throw new NullPointerException();
 12     //非負校驗 13     if (period <= 0)
 14         throw new IllegalArgumentException();
 15     //包裝任務(wù) 16     ScheduledFutureTask<Void> sft =
 17             new ScheduledFutureTask<Void>(command,
 18                     null,
 19                     triggerTime(initialDelay, unit),
 20                     unit.toNanos(period));
 21     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 22     //把任務(wù)賦值給ScheduledFutureTask的outerTask屬性 23     sft.outerTask = t;
 24     //延遲執(zhí)行 25     delayedExecute(t);
 26     return t;
 27 }
 28 29 /**
 30  * scheduleWithFixedDelay方法
 31  */ 32 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 33                                                  long initialDelay,
 34                                                  long delay,
 35                                                  TimeUnit unit) {
 36     //非空校驗 37     if (command == null || unit == null)
 38         throw new NullPointerException();
 39     //非負校驗 40     if (delay <= 0)
 41         throw new IllegalArgumentException();
 42     //包裝任務(wù) 43     ScheduledFutureTask<Void> sft =
 44             new ScheduledFutureTask<Void>(command,
 45                     null,
 46                     triggerTime(initialDelay, unit),
 47                     unit.toNanos(-delay));
 48     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 49     //把任務(wù)賦值給ScheduledFutureTask的outerTask屬性 50     sft.outerTask = t;
 51     //延遲執(zhí)行 52     delayedExecute(t);
 53     return t;
 54 }
 55 56 /**
 57  * 第17行和第44行代碼處:
 58  */ 59 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 60     super(r, result);
 61     this.time = ns;
 62     /*
 63     可以看到這里與schedule方法中調(diào)用ScheduledFutureTask構(gòu)造器的區(qū)別是多了一個period入?yún)?
 64     在schedule方法中this.period賦值為0,而這里會賦值為周期時間。其他的代碼都是一樣的
 65     如果細心的話可以看出:在上面scheduleAtFixedRate方法傳入的period是一個大于0的數(shù),而
 66     scheduleWithFixedDelay方法傳入的period是一個小于0的數(shù),以此來進行區(qū)分
 67      */ 68     this.period = period;
 69     this.sequenceNumber = sequencer.getAndIncrement();
 70 }

10 執(zhí)行周期性任務(wù)

周期性任務(wù)和延遲任務(wù)的拿取任務(wù)邏輯都是一樣的,而在下面具體運行任務(wù)時有所不同,下面就來看一下其實現(xiàn)的差異:

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  */ 4 public void run() {
 5     boolean periodic = isPeriodic();
 6     if (!canRunInCurrentRunState(periodic))
 7         cancel(false);
 8     else if (!periodic)
 9         ScheduledFutureTask.super.run();10     /*
11     前面都是之前分析過的,而周期性任務(wù)會走下面的分支中
12
13     FutureTask的runAndReset方法相比于run方法來說,區(qū)別在于可以重復(fù)計算(run方法不能復(fù)用)
14     因為runAndReset方法在計算完成后不會修改狀態(tài),狀態(tài)一直都是NEW
15      */16     else if (ScheduledFutureTask.super.runAndReset()) {17         //設(shè)置下次的運行時間點18         setNextRunTime();19         //重新添加任務(wù)20         reExecutePeriodic(outerTask);21     }22 }2324 /**
25  * 第18行代碼處:
26  */27 private void setNextRunTime() {28     /*
29     這里會獲取period,也就是之前設(shè)置的周期時間。上面說過,通過period的正負就可以區(qū)分出到底調(diào)用的是
30     scheduleAtFixedRate方法還是scheduleWithFixedDelay方法
31      */32     long p = period;33     if (p > 0)34         /*
35         如果調(diào)用的是scheduleAtFixedRate方法,下一次的周期任務(wù)時間點就是起始的延遲時間加上周期時間,需要注意的是:
36         如果任務(wù)執(zhí)行的時間大于周期時間period的話,那么定時線程池就不會按照原先設(shè)計的延遲時間進行執(zhí)行,而是會按照近似于
37         任務(wù)執(zhí)行的時間來作為延遲的間隔(不管核心線程有多少個都是如此,因為任務(wù)是放在延遲隊列中的、是線性執(zhí)行的)
38          */39         time += p;40     else41         /*
42         triggerTime方法之前分析過是獲取當前時間+延遲時間后的結(jié)果,而此時是在執(zhí)行完任務(wù)后,也就是說:
43         如果調(diào)用的是scheduleWithFixedDelay方法,下一次的周期任務(wù)時間點就是執(zhí)行完上次任務(wù)后的時間點加上周期時間
44         由此可以看出,scheduleAtFixedRate方法和scheduleWithFixedDelay方法的區(qū)別就在于下一次time設(shè)置的不同而已
45          */46         time = triggerTime(-p);47     //time屬性會記錄到節(jié)點中,在小頂堆中通過compareTo方法來進行排序48 }4950 /**
51  * 第20行代碼處:
52  */53 void reExecutePeriodic(RunnableScheduledFuture<?> task) {54     //判斷此時是否還能繼續(xù)執(zhí)行任務(wù)55     if (canRunInCurrentRunState(true)) {56         /*
57         這里也就是重新往延遲隊列中添加任務(wù),以此達到周期執(zhí)行的效果。添加之后在getTask方法中的take方法中
58         就又可以拿到這個任務(wù)。設(shè)置下次的執(zhí)行時間,然后再添加任務(wù)...周而復(fù)始
59          */60         super.getQueue().add(task);61         //添加后繼續(xù)判斷此時是否還能繼續(xù)執(zhí)行任務(wù),如果不能的話就刪除上面添加的任務(wù)62         if (!canRunInCurrentRunState(true) && remove(task))63             //同時會取消此任務(wù)的執(zhí)行64             task.cancel(false);65         else66             //否則,說明線程池是可以繼續(xù)執(zhí)行任務(wù)的,就去判斷此時是否需要補充工作線程67             ensurePrestart();68     }69 }

注意:網(wǎng)上的一種說法是: scheduleAtFixedRate方法是以上一個任務(wù)開始的時間計時,period時間過去后,檢測上一個任務(wù)是否執(zhí)行完畢。如果上一個任務(wù)執(zhí)行完畢,則當前任務(wù)立即執(zhí)行;如果上一個任務(wù)沒有執(zhí)行完畢,則需要等上一個任務(wù)執(zhí)行完畢后立即執(zhí)行。實際上這種說法是錯誤的,盡管它的表象是對的。正確的說法是: 如果任務(wù)的執(zhí)行時間小于周期時間的話,則會以上次任務(wù)執(zhí)行開始時間加上周期時間后,再去執(zhí)行下一次任務(wù);而如果任務(wù)的執(zhí)行時間大于周期時間的話,則會等到上次任務(wù)執(zhí)行完畢后立即(近似于)執(zhí)行下次任務(wù)。這兩種說法的區(qū)別就在于任務(wù)的執(zhí)行時間大于周期時間的時候,檢測上一個任務(wù)是否完畢的時機不同。實際上在period時間過去后,根本不會有任何的檢測機制。因為只有等上次任務(wù)執(zhí)行完畢后才會往延遲隊列中添加下一次任務(wù),從而觸發(fā)各種后續(xù)的動作。所以在period時間點時,當前線程還在執(zhí)行任務(wù)中,而其他的線程因為延遲隊列中為空會處于休眠的狀態(tài)(假如就只有一個周期任務(wù)的話)。所以根本不會有所謂的“檢測”的說法,這種說法也只能說是想當然了。還是那句話:“Talk is cheap. Show me the code.”

既然都說到這里了,那么現(xiàn)在就想來嘗試分析一下如果任務(wù)的執(zhí)行時間大于周期時間的話,具體是怎樣的一個執(zhí)行流程?

為了便于分析,假設(shè)現(xiàn)在是只有一個周期任務(wù)的場景,那么延遲隊列中的任務(wù)數(shù)量最多就只會有1個:拿取到任務(wù),延遲隊列中就變?yōu)榭?。?zhí)行完任務(wù)的時候,就又會往隊列中放一個任務(wù)。這樣其他搶不到任務(wù)的線程就會被休眠住。而添加任務(wù)的時候因為每次重新添加的任務(wù)都是小頂堆的根節(jié)點(從無到有),即添加的這個任務(wù)就是此時延遲時間最短的任務(wù),所以同時會觸發(fā)嘗試喚醒線程的動作。

同時在添加下一個任務(wù)前會修改下一次的時間點。在setNextRunTime方法中,scheduleAtFixedRate方法是以上一次的延遲時間點加上周期時間來作為下一次的延遲時間點的,并不是scheduleWithFixedDelay方法獲取當前時間加上周期時間的方式。在當前這種情況下周期時間是要小于任務(wù)的執(zhí)行時間的,也就是說會造成下一次的延遲時間點會賦值為一個已經(jīng)過期的時間。且隨著周期的增加,下一次的延遲時間點會離當前時間點越來越遠。既然下一次的延遲時間點已經(jīng)過期了,那么就會去立馬執(zhí)行任務(wù)。

所以總結(jié)一下:需要被喚醒的線程和上次執(zhí)行完任務(wù)的線程就會去爭搶鎖資源(喚醒線程會把當前節(jié)點放進CLH隊列中,上次執(zhí)行完任務(wù)的線程也會再次走到lockInterruptibly方法中(在它重新放任務(wù)的時候也會經(jīng)歷一次lock),同時因為是ReentrantLock非公平鎖,這樣在調(diào)用unlock解鎖時就會出現(xiàn)在CLH隊列上的搶資源現(xiàn)象了),搶到的就會立馬去執(zhí)行下一次的周期任務(wù),而不會有任何的延時,造成的表象就是會以一個近似于任務(wù)執(zhí)行時間為間隔的周期來執(zhí)行任務(wù)。

11 shutdown方法

 1 /**
 2  * ScheduledThreadPoolExecutor:
 3  * 可以看到,定時線程池的shutdown方法是使用的父類ThreadPoolExecutor的shutdown方法,
 4  * 而該方法我在之前的ThreadPoolExecutor的源碼分析文章中已經(jīng)分析過了。但是其中會調(diào)用
 5  * onShutdown的鉤子方法,也就是在ScheduledThreadPoolExecutor中的實現(xiàn)
 6  */ 7 public void shutdown() {
 8     super.shutdown();
 9 }1011 @Override12 void onShutdown() {13     //獲取延遲隊列14     BlockingQueue<Runnable> q = super.getQueue();15     //關(guān)閉線程池時判斷是否需要繼續(xù)執(zhí)行延遲任務(wù)16     boolean keepDelayed =17             getExecuteExistingDelayedTasksAfterShutdownPolicy();18     //關(guān)閉線程池時判斷是否需要繼續(xù)執(zhí)行周期性任務(wù)19     boolean keepPeriodic =20             getContinueExistingPeriodicTasksAfterShutdownPolicy();21     if (!keepDelayed && !keepPeriodic) {22         //如果都不需要的話,就將延遲隊列中的任務(wù)逐個取消(并刪除)23         for (Object e : q.toArray())24             if (e instanceof RunnableScheduledFuture<?>)25                 ((RunnableScheduledFuture<?>) e).cancel(false);26         //最后做清理工作27         q.clear();28     } else {29         for (Object e : q.toArray()) {30             if (e instanceof RunnableScheduledFuture) {31                 //否則就判斷如果任務(wù)是RunnableScheduledFuture類型的,就強轉(zhuǎn)一下類型32                 RunnableScheduledFuture<?> t =33                         (RunnableScheduledFuture<?>) e;34                 //如果關(guān)閉線程池時不需要繼續(xù)執(zhí)行任務(wù),又或者需要繼續(xù)執(zhí)行但是任務(wù)已經(jīng)取消了35                 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||36                         t.isCancelled()) {37                     //就刪除當前節(jié)點38                     if (q.remove(t))39                         //同時取消任務(wù)40                         t.cancel(false);41                 }42             }43         }44     }45     //根據(jù)線程池狀態(tài)來判斷是否應(yīng)該結(jié)束線程池46     tryTerminate();47 }4849 /**
50  * 第27行代碼處:
51  */52 public void clear() {53     final ReentrantLock lock = this.lock;54     //加鎖55     lock.lock();56     try {57         for (int i = 0; i < size; i++) {58             //遍歷獲得延遲隊列中的每一個節(jié)點59             RunnableScheduledFuture<?> t = queue[i];60             if (t != null) {61                 //將節(jié)點置為null62                 queue[i] = null;63                 //同時將索引位置為-1(recheck)64                 setIndex(t, -1);65             }66         }67         //size賦為初始值068         size = 0;69     } finally {70         //釋放鎖71         lock.unlock();72     }73 }

感謝各位的閱讀,以上就是“定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對定時線程池是怎么實現(xiàn)延遲執(zhí)行和周期執(zhí)行的這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI