溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

死磕 java同步系列之CyclicBarrier源碼解析——有圖有真相

發(fā)布時間:2020-08-14 19:43:36 來源:網(wǎng)絡(luò) 閱讀:257 作者:彤哥讀源碼 欄目:編程語言

問題

(1)CyclicBarrier是什么?

(2)CyclicBarrier具有什么特性?

(3)CyclicBarrier與CountDownLatch的對比?

簡介

CyclicBarrier,回環(huán)柵欄,它會阻塞一組線程直到這些線程同時達到某個條件才繼續(xù)執(zhí)行。它與CountDownLatch很類似,但又不同,CountDownLatch需要調(diào)用countDown()方法觸發(fā)事件,而CyclicBarrier不需要,它就像一個柵欄一樣,當(dāng)一組線程都到達了柵欄處才繼續(xù)往下走。

使用方法

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                System.out.println("before");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after");
            }).start();
        }
    }
}    

這段方法很簡單,使用一個CyclicBarrier使得三個線程保持同步,當(dāng)三個線程同時到達cyclicBarrier.await();處大家再一起往下運行。

源碼分析

主要內(nèi)部類

private static class Generation {
    boolean broken = false;
}

Generation,中文翻譯為代,一代人的代,用于控制CyclicBarrier的循環(huán)使用。

比如,上面示例中的三個線程完成后進入下一代,繼續(xù)等待三個線程達到柵欄處再一起執(zhí)行,而CountDownLatch則做不到這一點,CountDownLatch是一次性的,無法重置其次數(shù)。

主要屬性

// 重入鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件鎖,名稱為trip,絆倒的意思,可能是指線程來了先絆倒,等達到一定數(shù)量了再喚醒
private final Condition trip = lock.newCondition();
// 需要等待的線程數(shù)量
private final int parties;
// 當(dāng)喚醒的時候執(zhí)行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 當(dāng)前這一代還需要等待的線程數(shù)
private int count;

通過屬性可以看到,CyclicBarrier內(nèi)部是通過重入鎖的條件鎖來實現(xiàn)的,那么你可以腦補一下這個場景嗎?

彤哥來腦補一下:假如初始時count = parties = 3,當(dāng)?shù)谝粋€線程到達柵欄處,count減1,然后把它加入到Condition的隊列中,第二個線程到達柵欄處也是如此,第三個線程到達柵欄處,count減為0,調(diào)用Condition的signalAll()通知另外兩個線程,然后把它們加入到AQS的隊列中,等待當(dāng)前線程運行完畢,調(diào)用lock.unlock()的時候依次從AQS的隊列中喚醒一個線程繼續(xù)運行,也就是說實際上三個線程先依次(排隊)到達柵欄處,再依次往下運行。

以上純屬彤哥腦補的內(nèi)容,真實情況是不是如此呢,且往后看。

構(gòu)造方法

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 初始化parties
    this.parties = parties;
    // 初始化count等于parties
    this.count = parties;
    // 初始化都到達柵欄處執(zhí)行的命令
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

構(gòu)造方法需要傳入一個parties變量,也就是需要等待的線程數(shù)。

await()方法

每個需要在柵欄處等待的線程都需要顯式地調(diào)用await()方法等待其它線程的到來。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 調(diào)用dowait方法,不需要超時
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        // 當(dāng)前代
        final Generation g = generation;

        // 檢查
        if (g.broken)
            throw new BrokenBarrierException();

        // 中斷檢查
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // count的值減1
        int index = --count;
        // 如果數(shù)量減到0了,走這段邏輯(最后一個線程走這里)
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 如果初始化的時候傳了命令,這里執(zhí)行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 調(diào)用下一代方法
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 這個循環(huán)只有非最后一個線程可以走到
        for (;;) {
            try {
                if (!timed)
                    // 調(diào)用condition的await()方法
                    trip.await();
                else if (nanos > 0L)
                    // 超時等待方法
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            // 檢查
            if (g.broken)
                throw new BrokenBarrierException();

            // 正常來說這里肯定不相等
            // 因為上面打破柵欄的時候調(diào)用nextGeneration()方法時generation的引用已經(jīng)變化了
            if (g != generation)
                return index;

            // 超時檢查
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
private void nextGeneration() {
    // 調(diào)用condition的signalAll()將其隊列中的等待者全部轉(zhuǎn)移到AQS的隊列中
    trip.signalAll();
    // 重置count
    count = parties;
    // 進入下一代
    generation = new Generation();
}

dowait()方法里的整個邏輯分成兩部分:

(1)最后一個線程走上面的邏輯,當(dāng)count減為0的時候,打破柵欄,它調(diào)用nextGeneration()方法通知條件隊列中的等待線程轉(zhuǎn)移到AQS的隊列中等待被喚醒,并進入下一代。

(2)非最后一個線程走下面的for循環(huán)邏輯,這些線程會阻塞在condition的await()方法處,它們會加入到條件隊列中,等待被通知,當(dāng)它們喚醒的時候已經(jīng)更新?lián)Q“代”了,這時候返回。

圖解

死磕 java同步系列之CyclicBarrier源碼解析——有圖有真相

學(xué)習(xí)過前面的章節(jié),看這個圖很簡單了,看不懂的同學(xué)還需要把推薦的內(nèi)容好好看看哦^^

總結(jié)

(1)CyclicBarrier會使一組線程阻塞在await()處,當(dāng)最后一個線程到達時喚醒(只是從條件隊列轉(zhuǎn)移到AQS隊列中)前面的線程大家再繼續(xù)往下走;

(2)CyclicBarrier不是直接使用AQS實現(xiàn)的一個同步器;

(3)CyclicBarrier基于ReentrantLock及其Condition實現(xiàn)整個同步邏輯;

彩蛋

CyclicBarrier與CountDownLatch的異同?

(1)兩者都能實現(xiàn)阻塞一組線程等待被喚醒;

(2)前者是最后一個線程到達時自動喚醒;

(3)后者是通過顯式地調(diào)用countDown()實現(xiàn)的;

(4)前者是通過重入鎖及其條件鎖實現(xiàn)的,后者是直接基于AQS實現(xiàn)的;

(5)前者具有“代”的概念,可以重復(fù)使用,后者只能使用一次;

(6)前者只能實現(xiàn)多個線程到達柵欄處一起運行;

(7)后者不僅可以實現(xiàn)多個線程等待一個線程條件成立,還能實現(xiàn)一個線程等待多個線程條件成立(詳見CountDownLatch那章使用案例);

推薦閱讀

1、死磕 java同步系列之開篇

2、死磕 java魔法類之Unsafe解析

3、死磕 java同步系列之JMM(Java Memory Model)

4、死磕 java同步系列之volatile解析

5、死磕 java同步系列之synchronized解析

6、死磕 java同步系列之自己動手寫一個鎖Lock

7、死磕 java同步系列之AQS起篇

8、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

9、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

10、死磕 java同步系列之ReentrantLock VS synchronized

11、死磕 java同步系列之ReentrantReadWriteLock源碼解析

12、死磕 java同步系列之Semaphore源碼解析

13、死磕 java同步系列之CountDownLatch源碼解析

14、死磕 java同步系列之AQS終篇

15、死磕 java同步系列之StampedLock源碼解析


歡迎關(guān)注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

死磕 java同步系列之CyclicBarrier源碼解析——有圖有真相

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI