溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java concurrency之CountDownLatch原理和示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

發(fā)布時(shí)間:2020-08-26 14:01:13 來源:腳本之家 閱讀:109 作者:mrr 欄目:編程語言

CountDownLatch簡(jiǎn)介

CountDownLatch是一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。 

CountDownLatch和CyclicBarrier的區(qū)別

(01) CountDownLatch的作用是允許1或N個(gè)線程等待其他線程完成執(zhí)行;而CyclicBarrier則是允許N個(gè)線程相互等待。

(02) CountDownLatch的計(jì)數(shù)器無法被重置;CyclicBarrier的計(jì)數(shù)器可以被重置后使用,因此它被稱為是循環(huán)的barrier。

關(guān)于CyclicBarrier的原理,后面一章再來學(xué)習(xí)。

CountDownLatch函數(shù)列表

CountDownLatch(int count)

構(gòu)造一個(gè)用給定計(jì)數(shù)初始化的 CountDownLatch。

// 使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷。
void await()
// 使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷或超出了指定的等待時(shí)間。
boolean await(long timeout, TimeUnit unit)
// 遞減鎖存器的計(jì)數(shù),如果計(jì)數(shù)到達(dá)零,則釋放所有等待的線程。
void countDown()
// 返回當(dāng)前計(jì)數(shù)。
long getCount()
// 返回標(biāo)識(shí)此鎖存器及其狀態(tài)的字符串。
String toString()

CountDownLatch數(shù)據(jù)結(jié)構(gòu)

CountDownLatch的UML類圖如下:

Java concurrency之CountDownLatch原理和示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

CountDownLatch的數(shù)據(jù)結(jié)構(gòu)很簡(jiǎn)單,它是通過"共享鎖"實(shí)現(xiàn)的。它包含了sync對(duì)象,sync是Sync類型。Sync是實(shí)例類,它繼承于AQS。  

1. CountDownLatch(int count)

public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");
  this.sync = new Sync(count);
}

說明:該函數(shù)是創(chuàng)建一個(gè)Sync對(duì)象,而Sync是繼承于AQS類。Sync構(gòu)造函數(shù)如下:

Sync(int count) {
  setState(count);
} 

setState()在AQS中實(shí)現(xiàn),源碼如下:

protected final void setState(long newState) {
  state = newState;
}

說明:在AQS中,state是一個(gè)private volatile long類型的對(duì)象。對(duì)于CountDownLatch而言,state表示的”鎖計(jì)數(shù)器“。CountDownLatch中的getCount()最終是調(diào)用AQS中的getState(),返回的state對(duì)象,即”鎖計(jì)數(shù)器“。 

2. await()

public void await() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}

說明:該函數(shù)實(shí)際上是調(diào)用的AQS的acquireSharedInterruptibly(1);

AQS中的acquireSharedInterruptibly()的源碼如下:

public final void acquireSharedInterruptibly(long arg)
    throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}

說明:acquireSharedInterruptibly()的作用是獲取共享鎖。

如果當(dāng)前線程是中斷狀態(tài),則拋出異常InterruptedException。否則,調(diào)用tryAcquireShared(arg)嘗試獲取共享鎖;嘗試成功則返回,否則就調(diào)用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()會(huì)使當(dāng)前線程一直等待,直到當(dāng)前線程獲取到共享鎖(或被中斷)才返回。

tryAcquireShared()在CountDownLatch.java中被重寫,它的源碼如下:

protected int tryAcquireShared(int acquires) {
  return (getState() == 0) ? 1 : -1;
}

說明:tryAcquireShared()的作用是嘗試獲取共享鎖。

如果"鎖計(jì)數(shù)器=0",即鎖是可獲取狀態(tài),則返回1;否則,鎖是不可獲取狀態(tài),則返回-1。

private void doAcquireSharedInterruptibly(long arg)
  throws InterruptedException {
  // 創(chuàng)建"當(dāng)前線程"的Node節(jié)點(diǎn),且Node中記錄的鎖是"共享鎖"類型;并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      // 獲取上一個(gè)節(jié)點(diǎn)。
      // 如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則"嘗試獲取共享鎖"。
      final Node p = node.predecessor();
      if (p == head) {
        long r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      // (上一節(jié)點(diǎn)不是CLH隊(duì)列的表頭) 當(dāng)前線程一直等待,直到獲取到共享鎖。
      // 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態(tài))。
      if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

說明:

(01) addWaiter(Node.SHARED)的作用是,創(chuàng)建”當(dāng)前線程“的Node節(jié)點(diǎn),且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。

(02) node.predecessor()的作用是,獲取上一個(gè)節(jié)點(diǎn)。如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則”嘗試獲取共享鎖“。

(03) shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試獲取鎖失敗之后,線程應(yīng)該等待,則返回true;否則,返回false。

(04) 當(dāng)shouldParkAfterFailedAcquire()返回ture時(shí),則調(diào)用parkAndCheckInterrupt(),當(dāng)前線程會(huì)進(jìn)入等待狀態(tài),直到獲取到共享鎖才繼續(xù)運(yùn)行。 

3. countDown()

public void countDown() {
  sync.releaseShared(1);
}

說明:該函數(shù)實(shí)際上調(diào)用releaseShared(1)釋放共享鎖。

releaseShared()在AQS中實(shí)現(xiàn),源碼如下:

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}

說明:releaseShared()的目的是讓當(dāng)前線程釋放它所持有的共享鎖。

它首先會(huì)通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。

tryReleaseShared()在CountDownLatch.java中被重寫,源碼如下:

protected boolean tryReleaseShared(int releases) {
  // Decrement count; signal when transition to zero
  for (;;) {
    // 獲取“鎖計(jì)數(shù)器”的狀態(tài)
    int c = getState();
    if (c == 0)
      return false;
    // “鎖計(jì)數(shù)器”-1
    int nextc = c-1;
    // 通過CAS函數(shù)進(jìn)行賦值。
    if (compareAndSetState(c, nextc))
      return nextc == 0;
  }
}

說明:tryReleaseShared()的作用是釋放共享鎖,將“鎖計(jì)數(shù)器”的值-1。

總結(jié):CountDownLatch是通過“共享鎖”實(shí)現(xiàn)的。在創(chuàng)建CountDownLatch中時(shí),會(huì)傳遞一個(gè)int類型參數(shù)count,該參數(shù)是“鎖計(jì)數(shù)器”的初始狀態(tài),表示該“共享鎖”最多能被count給線程同時(shí)獲取。當(dāng)某線程調(diào)用該CountDownLatch對(duì)象的await()方法時(shí),該線程會(huì)等待“共享鎖”可用時(shí),才能獲取“共享鎖”進(jìn)而繼續(xù)運(yùn)行。而“共享鎖”可用的條件,就是“鎖計(jì)數(shù)器”的值為0!而“鎖計(jì)數(shù)器”的初始值為count,每當(dāng)一個(gè)線程調(diào)用該CountDownLatch對(duì)象的countDown()方法時(shí),才將“鎖計(jì)數(shù)器”-1;通過這種方式,必須有count個(gè)線程調(diào)用countDown()之后,“鎖計(jì)數(shù)器”才為0,而前面提到的等待線程才能繼續(xù)運(yùn)行!

以上,就是CountDownLatch的實(shí)現(xiàn)原理。

CountDownLatch的使用示例

下面通過CountDownLatch實(shí)現(xiàn):"主線程"等待"5個(gè)子線程"全部都完成"指定的工作(休眠1000ms)"之后,再繼續(xù)運(yùn)行。

 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 public class CountDownLatchTest1 {
   private static int LATCH_SIZE = 5;
   private static CountDownLatch doneSignal;
   public static void main(String[] args) {
     try {
       doneSignal = new CountDownLatch(LATCH_SIZE);
       // 新建5個(gè)任務(wù)
       for(int i=0; i<LATCH_SIZE; i++)
         new InnerThread().start();
       System.out.println("main await begin.");
       // "主線程"等待線程池中5個(gè)任務(wù)的完成
       doneSignal.await();
       System.out.println("main await finished.");
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
   }
   static class InnerThread extends Thread{
     public void run() {
       try {
         Thread.sleep(1000);
         System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
         // 將CountDownLatch的數(shù)值減1
         doneSignal.countDown();
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }
   }
 }

運(yùn)行結(jié)果:

main await begin.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
main await finished.

結(jié)果說明:主線程通過doneSignal.await()等待其它線程將doneSignal遞減至0。其它的5個(gè)InnerThread線程,每一個(gè)都通過doneSignal.countDown()將doneSignal的值減1;當(dāng)doneSignal為0時(shí),main被喚醒后繼續(xù)執(zhí)行。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI