溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

JUC的CyclicBarrier怎么實(shí)現(xiàn)

發(fā)布時(shí)間:2021-12-21 10:22:07 來(lái)源:億速云 閱讀:94 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“JUC的CyclicBarrier怎么實(shí)現(xiàn)”,在日常操作中,相信很多人在JUC的CyclicBarrier怎么實(shí)現(xiàn)問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”JUC的CyclicBarrier怎么實(shí)現(xiàn)”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

CyclicBarrier 實(shí)現(xiàn)內(nèi)幕

前面介紹的 CountDownLatch 同步器是基于 AQS 實(shí)現(xiàn)的,而本文要介紹的 CyclicBarrier 則沒(méi)有直接繼承 AQS 的 AbstractQueuedSynchronizer 抽象類(lèi),而是基于 ReentrantLock 鎖進(jìn)行實(shí)現(xiàn)。首先來(lái)看一下 CyclicBarrier 的字段定義,如下:

public class CyclicBarrier {

    /** 支撐 CyclicBarrier 的重入鎖 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 條件隊(duì)列,已經(jīng)到達(dá)屏障的線程會(huì)在條件隊(duì)列中等待其它線程 */
    private final Condition trip = lock.newCondition();
    /** 參與的線程數(shù) */
    private final int parties;
    /** 當(dāng)所有線程都到達(dá)屏障時(shí)的回調(diào)函數(shù) */
    private final Runnable barrierCommand;
    /** 當(dāng)前年代對(duì)象 */
    private Generation generation = new Generation();
    /** 當(dāng)前剩余未完成的線程數(shù) */
    private int count;

    // ... 省略方法定義

}

上述各個(gè)字段的含義如代碼注釋?zhuān)@里我們進(jìn)一步解釋一下 generation 字段,該字段為 Generation 類(lèi)型,用于表示當(dāng)前 CyclicBarrier 同步器的年代信息。Generation 內(nèi)部類(lèi)定義如下:

private static class Generation {
    boolean broken = false;
}

當(dāng)新建一個(gè) CyclicBarrier 對(duì)象時(shí)會(huì)初始化 CyclicBarrier#generation 字段。此外,當(dāng)所有參與的線程都到達(dá)屏障后(也稱(chēng) tripped),或者 CyclicBarrier 被重置(即調(diào)用 CyclicBarrier#reset 方法)時(shí),會(huì)新建一個(gè) Generation 對(duì)象賦值給 CyclicBarrier#generation 字段,表示年代的更替。

Generation 定義的 Generation#broken 屬性用于標(biāo)識(shí)當(dāng)前屏障是否被打破。當(dāng) CyclicBarrier 被重置,或者參與到該屏障的某個(gè)線程被中斷、等待超時(shí),亦或是執(zhí)行回調(diào)函數(shù)發(fā)生異常,都會(huì)導(dǎo)致屏障被打破。破損的屏障(即 broken=true)會(huì)導(dǎo)致當(dāng)前參與等待的線程,以及已經(jīng)處于等待狀態(tài)的線程拋出 BrokenBarrierException 異常,并退出當(dāng)前屏障進(jìn)程。

因?yàn)?CyclicBarrier 的復(fù)用性,導(dǎo)致在程序運(yùn)行期間可能并存多個(gè)年代信息,但是任何時(shí)刻只有一個(gè)年代對(duì)象是活躍的,剩余的年代對(duì)象對(duì)應(yīng)的 CyclicBarrier 要么是已經(jīng)用完的(tripped),要么就是已經(jīng)破損的。

介紹完了字段定義,下面來(lái)分析一下 CyclicBarrier 的方法實(shí)現(xiàn),首先來(lái)看一下構(gòu)造方法。CyclicBarrier 定義了兩個(gè)構(gòu)造方法,實(shí)現(xiàn)如下:

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) {
        throw new IllegalArgumentException();
    }
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

其中 parties 參數(shù)用于指定當(dāng)前參與的線程數(shù),參數(shù) barrierAction 用于指定當(dāng)所有參與的線程都到達(dá)屏障時(shí)的回調(diào)邏輯。你可能有些疑問(wèn),既然設(shè)置了 parties 字段,為什么還要設(shè)置一個(gè) count 字段呢?

考慮 CyclicBarrier 是可重用的,所以需要有一個(gè)字段記錄參與線程的數(shù)目,即 parties 字段,而 count 字段初始值等于 parties 字段值,但是在運(yùn)行期間其值是會(huì)隨著參與線程逐一到達(dá)屏障而遞減的,所以 count 值始終記錄的是當(dāng)前未到達(dá)屏障的線程數(shù)。當(dāng) CyclicBarrier 被重置時(shí),我們需要依據(jù) parties 字段值來(lái)重置 count 字段值。

繼續(xù)來(lái)看一下 CyclicBarrier 除構(gòu)造方法以外的剩余方法實(shí)現(xiàn),主要分析一下 CyclicBarrier#await 方法和 CyclicBarrier#reset 方法。首先來(lái)看一下 CyclicBarrier#reset 方法,當(dāng)我們希望復(fù)用 CyclicBarrier 對(duì)象時(shí)可以調(diào)用該方法,用于重置 count 值、年代信息,并喚醒所有位于條件隊(duì)列中等待的線程。方法實(shí)現(xiàn)如下:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        this.breakBarrier();   // break the current generation
        this.nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

private void breakBarrier() {
    // 標(biāo)識(shí)當(dāng)前屏障被打破
    generation.broken = true;
    // 重置 count 字段值
    count = parties;
    // 喚醒所有等待的線程
    trip.signalAll();
}

private void nextGeneration() {
    // 喚醒所有等待的線程
    trip.signalAll();
    // 重置 count 值
    count = parties;
    generation = new Generation();
}

再來(lái)看一下 CyclicBarrier#await 方法,該方法用于阻塞當(dāng)前線程,以在屏障處等待其它線程到達(dá),CyclicBarrier 還為該方法定義了超時(shí)等待版本。當(dāng)一個(gè)線程因調(diào)用 CyclicBarrier#await 方法進(jìn)入等待狀態(tài)時(shí),該線程將會(huì)在滿足以下條件之一時(shí)退出等待狀態(tài):

  1. 所有參與的線程都已經(jīng)到達(dá)了屏障。

  2. 當(dāng)前線程被中斷,或者其它處于等待狀態(tài)的線程被中斷。

  3. 如果啟用了超時(shí)機(jī)制,并且某個(gè)參與的線程等待超時(shí)。

  4. CyclicBarrier 被重置。

方法 CyclicBarrier#await 的普通版本和超時(shí)版本在實(shí)現(xiàn)上都是直接委托給 CyclicBarrier#dowait 方法執(zhí)行,所以下面主要來(lái)分析一下該方法,實(shí)現(xiàn)如下:

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        // 獲取當(dāng)前年代信息
        final Generation g = generation;

        // 當(dāng)前屏障被打破,拋出異常
        if (g.broken) {
            throw new BrokenBarrierException();
        }

        // 當(dāng)前線程被中斷,打破屏障,并喚醒所有等待的線程
        if (Thread.interrupted()) {
            this.breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        // 如果 count 值為 0,說(shuō)明所有的線程都已經(jīng)到達(dá)屏障
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 如果設(shè)置了回調(diào),則執(zhí)行
                final Runnable command = barrierCommand;
                if (command != null) {
                    command.run();
                }
                ranAction = true;
                // 喚醒所有等待的線程,并重置屏障
                this.nextGeneration();
                return 0;
            } finally {
                // 如果執(zhí)行回調(diào)異常
                if (!ranAction) {
                    this.breakBarrier();
                }
            }
        }

        // count 值不為 0,說(shuō)明存在還未到達(dá)屏障的線程,則進(jìn)入條件隊(duì)列等待

        // loop until tripped, broken, interrupted, or timed out
        for (; ; ) {
            try {
                if (!timed) {
                    // 進(jìn)入條件隊(duì)列等待
                    trip.await();
                } else if (nanos > 0L) {
                    // 進(jìn)入條件隊(duì)列超時(shí)等待
                    nanos = trip.awaitNanos(nanos);
                }
            } catch (InterruptedException ie) {
                // 當(dāng)前線程被中斷,響應(yīng)中斷
                if (g == generation && !g.broken) {
                    this.breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not been interrupted,
                    // so this interrupt is deemed to "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            // 屏障被打破
            if (g.broken) {
                throw new BrokenBarrierException();
            }

            // 當(dāng)前 CyclicBarrier 已經(jīng)被重置
            if (g != generation) {
                return index;
            }

            // 等待超時(shí)
            if (timed && nanos <= 0L) {
                this.breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

由上述實(shí)現(xiàn)我們可以總結(jié)線程在調(diào)用 CyclicBarrier#await 方法時(shí)的整體執(zhí)行流程。如果當(dāng)前線程不是最后一個(gè)到達(dá)屏障的線程(遞減 count 值之后仍然大于 0),則調(diào)用 Condition#await 方法(或超時(shí)版本)將當(dāng)前線程添加到條件隊(duì)列中等待。如果當(dāng)前線程是最后一個(gè)到達(dá)屏障的線程(遞減 count 值之后為 0),則在線程到達(dá)屏障后執(zhí)行:

  1. 如果指定了回調(diào)邏輯,則執(zhí)行該回調(diào),如果期間發(fā)生任何異常,則打破屏障、重置 count 值,并喚醒條件隊(duì)列中所有等待的線程;

  2. 否則,繼續(xù)調(diào)用 CyclicBarrier#nextGeneration 方法喚醒條件隊(duì)列中所有等待的線程,并重置 count 值和年代信息。

在上述過(guò)程中如果當(dāng)前線程或處于等待狀態(tài)的線程被中斷、屏障被打破、年代信息發(fā)生變化,或者等待超時(shí)(如果允許的話),則線程將會(huì)從 Condition#await 方法中退出,即當(dāng)前屏障失效。

到此,關(guān)于“JUC的CyclicBarrier怎么實(shí)現(xiàn)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI