溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

CountDownLatch的實現(xiàn)原理是什么

發(fā)布時間:2021-07-12 10:48:39 來源:億速云 閱讀:146 作者:chen 欄目:編程語言

這篇文章主要講解了“CountDownLatch的實現(xiàn)原理是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“CountDownLatch的實現(xiàn)原理是什么”吧!

 前言

CountDownLatch是多線程中一個比較重要的概念,它可以使得一個或多個線程等待其他線程執(zhí)行完畢之后再執(zhí)行。它內(nèi)部有一個計數(shù)器和一個阻塞隊列,每當(dāng)一個線程調(diào)用countDown()方法后,計數(shù)器的值減少1。當(dāng)計數(shù)器的值不為0時,調(diào)用await()方法的線程將會被加入到阻塞隊列,一直阻塞到計數(shù)器的值為0。

常用方法

public class CountDownLatch {      //構(gòu)造一個值為count的計數(shù)器     public CountDownLatch(int count);      //阻塞當(dāng)前線程直到計數(shù)器為0     public void await() throws InterruptedException;      //在單位為unit的timeout時間之內(nèi)阻塞當(dāng)前線程     public boolean await(long timeout, TimeUnit unit);      //將計數(shù)器的值減1,當(dāng)計數(shù)器的值為0時,阻塞隊列內(nèi)的線程才可以運行     public void countDown();        }

下面給一個簡單的示例:

package com.yang.testCountDownLatch;  import java.util.concurrent.CountDownLatch;  public class Main {     private static final int NUM = 3;      public static void main(String[] args) throws InterruptedException {         CountDownLatch latch = new CountDownLatch(NUM);         for (int i = 0; i < NUM; i++) {             new Thread(() -> {                 try {                     Thread.sleep(2000);                     System.out.println(Thread.currentThread().getName() + "運行完畢");                 } catch (InterruptedException e) {                     e.printStackTrace();                 } finally {                     latch.countDown();                 }             }).start();         }         latch.await();         System.out.println("主線程運行完畢");     } }

輸出如下:

CountDownLatch的實現(xiàn)原理是什么

看得出來,主線程會等到3個子線程執(zhí)行完畢才會執(zhí)行。

原理解析

類圖

CountDownLatch的實現(xiàn)原理是什么

可以看得出來,CountDownLatch里面有一個繼承AQS的內(nèi)部類Sync,其實是AQS來支持CountDownLatch的各項操作的。

CountDownLatch(int count)

new CountDownLatch(int count)用來創(chuàng)建一個AQS同步隊列,并將計數(shù)器的值賦給了AQS的state。

public CountDownLatch(int count) {     if (count < 0) throw new IllegalArgumentException("count < 0");     this.sync = new Sync(count); }  private static final class Sync extends AbstractQueuedSynchronizer {          Sync(int count) {         setState(count);     }  }

countDown()

countDown()方法會對計數(shù)器進行減1的操作,當(dāng)計數(shù)器值為0時,將會喚醒在阻塞隊列中等待的所有線程。其內(nèi)部調(diào)用了Sync的releaseShared(1)方法

public void countDown() {      sync.releaseShared(1);  }   public final boolean releaseShared(int arg) {      if (tryReleaseShared(arg)) {          //此時計數(shù)器的值為0,喚醒所有被阻塞的線程          doReleaseShared();          return true;      }      return false;  }

tryReleaseShared(arg)內(nèi)部使用了自旋+CAS操將計數(shù)器的值減1,當(dāng)減為0時,方法返回true,將會調(diào)用doReleaseShared()方法。對CAS機制不了解的同學(xué),可以先參考我的另外一篇文章淺探CAS實現(xiàn)原理

protected boolean tryReleaseShared(int releases) {       //自旋       for (;;) {           int c = getState();           if (c == 0)               //此時計數(shù)器的值已經(jīng)為0了,其他線程早就執(zhí)行完畢了,當(dāng)前線程也已經(jīng)再執(zhí)行了,不需要再次喚醒了               return false;           int nextc = c-1;           //使用CAS機制,將state的值變?yōu)閟tate-1           if (compareAndSetState(c, nextc))               return nextc == 0;       }   }

doReleaseShared()是AQS中的方法,該方法會喚醒隊列中所有被阻塞的線程。

private void doReleaseShared() {      for (;;) {          Node h = head;          if (h != null && h != tail) {              int ws = h.waitStatus;              if (ws == Node.SIGNAL) {                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                      continue;            // loop to recheck cases                  unparkSuccessor(h);              }              else if (ws == 0 &&                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                  continue;                // loop on failed CAS          }          if (h == head)                   // loop if head changed              break;      }  }

這段方法比較難理解,會另外篇幅介紹。這里只要認為該段方法會喚醒所有因調(diào)用await()方法而阻塞的線程。

await()

當(dāng)計數(shù)器的值不為0時,該方法會將當(dāng)前線程加入到阻塞隊列中,并把當(dāng)前線程掛起。

public void await() throws InterruptedException {     sync.acquireSharedInterruptibly(1); }

同樣是委托內(nèi)部類Sync,調(diào)用其

acquireSharedInterruptibly()方法

public final void acquireSharedInterruptibly(int arg)           throws InterruptedException {       if (Thread.interrupted())           throw new InterruptedException();       if (tryAcquireShared(arg) < 0)           doAcquireSharedInterruptibly(arg);   }

接著看Sync內(nèi)的tryAcquireShared()方法,如果當(dāng)前計數(shù)器的值為0,則返回1,最終將導(dǎo)致await()不會將線程阻塞。如果當(dāng)前計數(shù)器的值不為0,則返回-1。

protected int tryAcquireShared(int acquires) {         return (getState() == 0) ? 1 : -1;     }

tryAcquireShared方法返回一個負值時,將會調(diào)用AQS中的

doAcquireSharedInterruptibly()方法,將調(diào)用await()方法的線程加入到阻塞隊列中,并將此線程掛起。

private void doAcquireSharedInterruptibly(int arg)       throws InterruptedException {       //將當(dāng)前線程構(gòu)造成一個共享模式的節(jié)點,并加入到阻塞隊列中       final Node node = addWaiter(Node.SHARED);       boolean failed = true;       try {           for (;;) {               final Node p = node.predecessor();               if (p == head) {                           int r = tryAcquireShared(arg);                   if (r >= 0) {                       setHeadAndPropagate(node, r);                       p.next = null; // help GC                       failed = false;                       return;                   }               }               if (shouldParkAfterFailedAcquire(p, node) &&                   parkAndCheckInterrupt())                   throw new InterruptedException();           }       } finally {           if (failed)               cancelAcquire(node);       }   }

同樣,以上的代碼位于AQS中,在沒有了解AQS結(jié)構(gòu)的情況下去理解上述代碼,有些困難,關(guān)于AQS源碼,會另開篇幅介紹。

使用場景

CountDownLatch的使用場景很廣泛,一般用于分頭做某些事,再匯總的情景。例如:

數(shù)據(jù)報表:當(dāng)前的微服務(wù)架構(gòu)十分流行,大多數(shù)項目都會被拆成若干的子服務(wù),那么報表服務(wù)在進行統(tǒng)計時,需要向各個服務(wù)抽取數(shù)據(jù)。此時可以創(chuàng)建與服務(wù)數(shù)相同的線程數(shù),交由線程池處理,每個線程去對應(yīng)服務(wù)中抽取數(shù)據(jù),注意需要在finally語句塊中進行countDown()操作。主線程調(diào)用await()阻塞,直到所有數(shù)據(jù)抽取成功,最后主線程再進行對數(shù)據(jù)的過濾組裝等,形成直觀的報表。

風(fēng)險評估:客戶端的一個同步請求查詢用戶的風(fēng)險等級,服務(wù)端收到請求后會請求多個子系統(tǒng)獲取數(shù)據(jù),然后使用風(fēng)險評估規(guī)則模型進行風(fēng)險評估。如果使用單線程去完成這些操作,這個同步請求超時的可能性會很大,因為服務(wù)端請求多個子系統(tǒng)是依次排隊的,請求子系統(tǒng)獲取數(shù)據(jù)的時間是線性累加的。此時可以使用CountDownLatch,讓多個線程并發(fā)請求多個子系統(tǒng),當(dāng)獲取到多個子系統(tǒng)數(shù)據(jù)之后,再進行風(fēng)險評估,這樣請求子系統(tǒng)獲取數(shù)據(jù)的時間就等于最耗時的那個請求的時間,可以大大減少處理時間。

感謝各位的閱讀,以上就是“CountDownLatch的實現(xiàn)原理是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對CountDownLatch的實現(xiàn)原理是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI