溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

深入剖析線程同步工具CountDownLatch原理

發(fā)布時間:2020-07-07 09:27:53 來源:網絡 閱讀:5220 作者:kukelook 欄目:編程語言

0 CountDownLatch的作用

CountDownLatch作為一個多線程間的同步工具,它允許一個或多個線程等待其他線程(可以是多個)完成工作后,再恢復執(zhí)行。

就像下面這樣:

深入剖析線程同步工具CountDownLatch原理

1 從一個Demo說起

我們直接拿源碼中給出的Demo看一下,源碼中的這個demo可以看做模擬一個賽跑的場景。 賽跑肯定有跑得快的運動員也有跑的慢的運動員,每個運動員就表示一個線程。 運動員聽到槍聲后開始起跑,而最后一個運動員到達終點后,標志的比賽的結束。 整個過程如下圖所示:

CountDownLatch跑步模擬

源碼如下所示

public class Race {

    private static final int N = 4;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);  // 鳴槍開始信號
        CountDownLatch doneSignal = new CountDownLatch(N);   // 等待N個運動員都跑完后,比賽結束(結束信號)

        for (int i = 0; i < N; ++i) // N個運動員準備就緒,等待槍聲
            new Thread(new Runner(startSignal, doneSignal, i)).start();

        Thread.sleep(1000); // 等待所有運動員就緒
        System.out.println("所有運動員就緒");
        startSignal.countDown();      // 鳴槍,開賽
        System.out.println("比賽進行中...");
        doneSignal.await();           // 等待N個運動員全部跑完(等待doneSignal變?yōu)?)
        System.out.println("比賽結束");
    }
}

class Runner implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private int number;

    Runner(CountDownLatch startSignal, CountDownLatch doneSignal, int number) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
        this.number = number;
    }
    public void run() {
        try {
            // 等待槍聲(等待開始信號startSignal變?yōu)?)
            System.out.println(number + "號運動員準備就緒");
            startSignal.await();
            // 賽跑
            System.out.println(number + "號運動員跑步中...");
            Thread.sleep(new Random().nextInt(10) * 1000);
            // 此運動員跑到終點
            System.out.println(number + "號運動員到達終點");
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }
}

上面代碼運行后,輸出如下:

0號運動員準備就緒
3號運動員準備就緒
2號運動員準備就緒
1號運動員準備就緒
所有運動員就緒
比賽進行中...
0號運動員跑步中...
1號運動員跑步中...
2號運動員跑步中...
3號運動員跑步中...
2號運動員到達終點
1號運動員到達終點
0號運動員到達終點
3號運動員到達終點
比賽結束

下面,深入到代碼細節(jié),看一下CountDownLatch初始化、countDown方法、await方法是如何實現的。

2 CountDownLatch類圖

通過下圖來了解一下CountDownLatch的類繼承關系

CountDownLatch類圖
3 CountDownLatch的初始化
CountDownLatch只有一個構造方法:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

他會初始化一個Sync,這是他的一個內部類,類似于ReentrantLock,Sync也繼承于AbstractQueuedSynchronizer(AQS)。

AQS是個啥?可以參考筆者的另一篇文章:Java隊列同步器(AQS)到底是怎么一回事

然后看一下Sync的源碼

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // 調用AQS的setState方法,將state賦值為count的值
    Sync(int count) {
        setState(count);
    }

    // 獲取AQS state的當前值
    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

所以CountDownLatch的初始化,其實是將參數count的值賦值給AQS的state,依然是用state來控制同步狀態(tài)。

4 await方法的實現
依然用上面賽跑的例子來說明這個問題。這里我們只考慮所有運動員等待槍聲的情景。

回憶一下,賽跑的例子中,通過下面的方式創(chuàng)建了鳴槍信號:

CountDownLatch startSignal = new CountDownLatch(1);  // 鳴槍開始信號

然后創(chuàng)建了N個線程(表示N個運動員),并調用其start方法讓其開始執(zhí)行(運動員準備就緒,等待鳴槍開跑)。

然后通過在run方法中調用startSignal.await(),來實現等待鳴槍的動作(其實就是等startSignal的值降為0)。

我們來看一下他是怎么await的。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

調用了AQS的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 判斷線程是否已經被中斷
    if (Thread.interrupted())
        throw new InterruptedException();
    // 調用CountDownLatch.Sync的tryAcquireShared方法
    // 此方法判斷count的值是否==0,如果==0,返回1,否則返回-1
    // 目前我們還沒有執(zhí)行countDown,所以count肯定不等于0,這里肯定返回-1
    // 所以會執(zhí)行到AQS的doAcquireSharedInterruptibly方法中
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

AQS.doAcquireSharedInterruptibly的實現如下

/
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
// 此方法會在count>0時將當前線程加入到等待隊列中
// 由于我們目前還沒有執(zhí)行countDown,所以count會保持>0
// 啟動的N個線程會全部加入到隊列中
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 將當前線程添加到等待隊列中(SHARED模式)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 自旋獲取同步狀態(tài)
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 依然調用CountDownLatch.Sync的tryAcquireShared方法判斷
                // 如果count降為0,退出自旋
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 將node的waitStatus設置為-1(常量SIGNAL,表示需要喚醒),并阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}**
```**

假設N=4,那么4個線程全部start后,會全部加入到隊列中自旋等待,像下面這樣:

CountDownLatch.await自旋等待
5 countDown方法的實現
countDown方法實際上就是將AQS中的state的值-1。然后判斷當前state的值是否==0,如果等于0,說明所有線程都執(zhí)行結束了,需要喚醒所有等待的線程。

依然繼續(xù)上面的場景,鳴槍后,所有的運動員開跑。

鳴槍這個動作實際上就是在主線程中執(zhí)行:

startSignal.countDown();


這就相當于向剛才隊列中的所有線程發(fā)了一個恢復執(zhí)行的信號,所有線程會被喚醒,繼續(xù)執(zhí)行await后面的代碼。

countDown具體干了啥呢?

public void countDown() {
sync.releaseShared(1);
}


他會調用AQS的releaseShared方法

public final boolean releaseShared(int arg) {
// 調用CountDownLatch.Sync的tryReleaseShared方法
// 該方法嘗試將count值-1,并判斷-1后的count是否==0,如果==0,返回true,否則false
// 該方法的源碼已經在Sync的源碼中給出,可翻閱上文查看
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


由于startSignal中count的初始值==1,startSignal.countDown()后,count變?yōu)?。所以tryReleaseShared會返回true。

然后開始執(zhí)行doReleaseShared,喚醒隊列中的線程。

doReleaseShared是AQS中的方法。

/**

  • Release action for shared mode -- signals successor and ensures
  • propagation. (Note: For exclusive mode, release just amounts
  • to calling unparkSuccessor of head if it needs signal.)
    /
    private void doReleaseShared() {
    /
    • Ensure that a release propagates, even if there are other
    • in-progress acquires/releases. This proceeds in the usual
    • way of trying to unparkSuccessor of head if it needs
    • signal. But if it does not, status is set to PROPAGATE to
    • ensure that upon release, propagation continues.
    • Additionally, we must loop in case a new node is added
    • while we are doing this. Also, unlike other uses of
    • unparkSuccessor, we need to know if CAS to reset status
    • fails, if so rechecking.
      */
      for (;;) {
      Node h = head;
      if (h != null && h != tail) {
      int ws = h.waitStatus;
      // Node.SIGNAL == -1
      // 由上文可知,進入隊列的線程的waitStatus都等于-1
      // 所以這里為true
      if (ws == Node.SIGNAL) {
      // 嘗試將waitStatus從-1改為0,如果修改成功,就恢復這個線程的執(zhí)行狀態(tài)
      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
      continue; // loop to recheck cases
      unparkSuccessor(h);
      }
      else if (ws == 0 &&
      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
      continue; // loop on failed CAS
      }
      if (h == head) // loop if head changed
      break;
      }
      }

這里,被阻塞的線程又恢復執(zhí)行,恢復到哪了呢?就是剛才自旋等待的那里。

把上面的源碼直接拿下來,再說明一下(注釋部分)

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 將當前線程添加到等待隊列中(SHARED模式)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 線程被釋放后,繼續(xù)下一次循環(huán)
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 獲取頭節(jié)點,從頭結點開始釋放,由于count已經降為0,所以r >= 0為true
                // 然后會將自己摘除當前隊列,使下一個節(jié)點成為頭節(jié)點
                // 等下一個節(jié)點也恢復過來后,同樣執(zhí)行上面的過程
                // 這樣,隊列中的所有線程就被釋放了
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 將node的waitStatus設置為-1(常量SIGNAL,表示需要喚醒),并阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

6 總結

本文從源碼層面詳細說明了CountDownLatch是如何運作的。 CountDownLatch也是基于AQS實現,所以了解AQS的機制,對于理解本文至關重要。 其實,CountDownLatch最核心的就是通過控制AQS的state,來同步多個線程之間的狀態(tài)。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI