您好,登錄后才能下訂單哦!
這篇文章主要介紹“JUC的CyclicBarrier怎么實(shí)現(xiàn)”,在日常操作中,相信很多人在JUC的CyclicBarrier怎么實(shí)現(xiàn)問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”JUC的CyclicBarrier怎么實(shí)現(xiàn)”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
前面介紹的 CountDownLatch 同步器是基于 AQS 實(shí)現(xiàn)的,而本文要介紹的 CyclicBarrier 則沒(méi)有直接繼承 AQS 的 AbstractQueuedSynchronizer 抽象類(lèi),而是基于 ReentrantLock 鎖進(jìn)行實(shí)現(xiàn)。首先來(lái)看一下 CyclicBarrier 的字段定義,如下:
public class CyclicBarrier { /** 支撐 CyclicBarrier 的重入鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 條件隊(duì)列,已經(jīng)到達(dá)屏障的線程會(huì)在條件隊(duì)列中等待其它線程 */ private final Condition trip = lock.newCondition(); /** 參與的線程數(shù) */ private final int parties; /** 當(dāng)所有線程都到達(dá)屏障時(shí)的回調(diào)函數(shù) */ private final Runnable barrierCommand; /** 當(dāng)前年代對(duì)象 */ private Generation generation = new Generation(); /** 當(dāng)前剩余未完成的線程數(shù) */ private int count; // ... 省略方法定義 }
上述各個(gè)字段的含義如代碼注釋?zhuān)@里我們進(jìn)一步解釋一下 generation 字段,該字段為 Generation 類(lèi)型,用于表示當(dāng)前 CyclicBarrier 同步器的年代信息。Generation 內(nèi)部類(lèi)定義如下:
private static class Generation { boolean broken = false; }
當(dāng)新建一個(gè) CyclicBarrier 對(duì)象時(shí)會(huì)初始化 CyclicBarrier#generation
字段。此外,當(dāng)所有參與的線程都到達(dá)屏障后(也稱(chēng) tripped),或者 CyclicBarrier 被重置(即調(diào)用 CyclicBarrier#reset
方法)時(shí),會(huì)新建一個(gè) Generation 對(duì)象賦值給 CyclicBarrier#generation
字段,表示年代的更替。
Generation 定義的 Generation#broken
屬性用于標(biāo)識(shí)當(dāng)前屏障是否被打破。當(dāng) CyclicBarrier 被重置,或者參與到該屏障的某個(gè)線程被中斷、等待超時(shí),亦或是執(zhí)行回調(diào)函數(shù)發(fā)生異常,都會(huì)導(dǎo)致屏障被打破。破損的屏障(即 broken=true
)會(huì)導(dǎo)致當(dāng)前參與等待的線程,以及已經(jīng)處于等待狀態(tài)的線程拋出 BrokenBarrierException 異常,并退出當(dāng)前屏障進(jìn)程。
因?yàn)?CyclicBarrier 的復(fù)用性,導(dǎo)致在程序運(yùn)行期間可能并存多個(gè)年代信息,但是任何時(shí)刻只有一個(gè)年代對(duì)象是活躍的,剩余的年代對(duì)象對(duì)應(yīng)的 CyclicBarrier 要么是已經(jīng)用完的(tripped),要么就是已經(jīng)破損的。
介紹完了字段定義,下面來(lái)分析一下 CyclicBarrier 的方法實(shí)現(xiàn),首先來(lái)看一下構(gòu)造方法。CyclicBarrier 定義了兩個(gè)構(gòu)造方法,實(shí)現(xiàn)如下:
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) { throw new IllegalArgumentException(); } this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中 parties 參數(shù)用于指定當(dāng)前參與的線程數(shù),參數(shù) barrierAction 用于指定當(dāng)所有參與的線程都到達(dá)屏障時(shí)的回調(diào)邏輯。你可能有些疑問(wèn),既然設(shè)置了 parties 字段,為什么還要設(shè)置一個(gè) count 字段呢?
考慮 CyclicBarrier 是可重用的,所以需要有一個(gè)字段記錄參與線程的數(shù)目,即 parties 字段,而 count 字段初始值等于 parties 字段值,但是在運(yùn)行期間其值是會(huì)隨著參與線程逐一到達(dá)屏障而遞減的,所以 count 值始終記錄的是當(dāng)前未到達(dá)屏障的線程數(shù)。當(dāng) CyclicBarrier 被重置時(shí),我們需要依據(jù) parties 字段值來(lái)重置 count 字段值。
繼續(xù)來(lái)看一下 CyclicBarrier 除構(gòu)造方法以外的剩余方法實(shí)現(xiàn),主要分析一下 CyclicBarrier#await
方法和 CyclicBarrier#reset
方法。首先來(lái)看一下 CyclicBarrier#reset
方法,當(dāng)我們希望復(fù)用 CyclicBarrier 對(duì)象時(shí)可以調(diào)用該方法,用于重置 count 值、年代信息,并喚醒所有位于條件隊(duì)列中等待的線程。方法實(shí)現(xiàn)如下:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { this.breakBarrier(); // break the current generation this.nextGeneration(); // start a new generation } finally { lock.unlock(); } } private void breakBarrier() { // 標(biāo)識(shí)當(dāng)前屏障被打破 generation.broken = true; // 重置 count 字段值 count = parties; // 喚醒所有等待的線程 trip.signalAll(); } private void nextGeneration() { // 喚醒所有等待的線程 trip.signalAll(); // 重置 count 值 count = parties; generation = new Generation(); }
再來(lái)看一下 CyclicBarrier#await
方法,該方法用于阻塞當(dāng)前線程,以在屏障處等待其它線程到達(dá),CyclicBarrier 還為該方法定義了超時(shí)等待版本。當(dāng)一個(gè)線程因調(diào)用 CyclicBarrier#await
方法進(jìn)入等待狀態(tài)時(shí),該線程將會(huì)在滿足以下條件之一時(shí)退出等待狀態(tài):
所有參與的線程都已經(jīng)到達(dá)了屏障。
當(dāng)前線程被中斷,或者其它處于等待狀態(tài)的線程被中斷。
如果啟用了超時(shí)機(jī)制,并且某個(gè)參與的線程等待超時(shí)。
CyclicBarrier 被重置。
方法 CyclicBarrier#await
的普通版本和超時(shí)版本在實(shí)現(xiàn)上都是直接委托給 CyclicBarrier#dowait
方法執(zhí)行,所以下面主要來(lái)分析一下該方法,實(shí)現(xiàn)如下:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 獲取當(dāng)前年代信息 final Generation g = generation; // 當(dāng)前屏障被打破,拋出異常 if (g.broken) { throw new BrokenBarrierException(); } // 當(dāng)前線程被中斷,打破屏障,并喚醒所有等待的線程 if (Thread.interrupted()) { this.breakBarrier(); throw new InterruptedException(); } int index = --count; // 如果 count 值為 0,說(shuō)明所有的線程都已經(jīng)到達(dá)屏障 if (index == 0) { // tripped boolean ranAction = false; try { // 如果設(shè)置了回調(diào),則執(zhí)行 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; // 喚醒所有等待的線程,并重置屏障 this.nextGeneration(); return 0; } finally { // 如果執(zhí)行回調(diào)異常 if (!ranAction) { this.breakBarrier(); } } } // count 值不為 0,說(shuō)明存在還未到達(dá)屏障的線程,則進(jìn)入條件隊(duì)列等待 // loop until tripped, broken, interrupted, or timed out for (; ; ) { try { if (!timed) { // 進(jìn)入條件隊(duì)列等待 trip.await(); } else if (nanos > 0L) { // 進(jìn)入條件隊(duì)列超時(shí)等待 nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { // 當(dāng)前線程被中斷,響應(yīng)中斷 if (g == generation && !g.broken) { this.breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not been interrupted, // so this interrupt is deemed to "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 屏障被打破 if (g.broken) { throw new BrokenBarrierException(); } // 當(dāng)前 CyclicBarrier 已經(jīng)被重置 if (g != generation) { return index; } // 等待超時(shí) if (timed && nanos <= 0L) { this.breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
由上述實(shí)現(xiàn)我們可以總結(jié)線程在調(diào)用 CyclicBarrier#await
方法時(shí)的整體執(zhí)行流程。如果當(dāng)前線程不是最后一個(gè)到達(dá)屏障的線程(遞減 count 值之后仍然大于 0),則調(diào)用 Condition#await
方法(或超時(shí)版本)將當(dāng)前線程添加到條件隊(duì)列中等待。如果當(dāng)前線程是最后一個(gè)到達(dá)屏障的線程(遞減 count 值之后為 0),則在線程到達(dá)屏障后執(zhí)行:
如果指定了回調(diào)邏輯,則執(zhí)行該回調(diào),如果期間發(fā)生任何異常,則打破屏障、重置 count 值,并喚醒條件隊(duì)列中所有等待的線程;
否則,繼續(xù)調(diào)用 CyclicBarrier#nextGeneration
方法喚醒條件隊(duì)列中所有等待的線程,并重置 count 值和年代信息。
在上述過(guò)程中如果當(dāng)前線程或處于等待狀態(tài)的線程被中斷、屏障被打破、年代信息發(fā)生變化,或者等待超時(shí)(如果允許的話),則線程將會(huì)從 Condition#await
方法中退出,即當(dāng)前屏障失效。
到此,關(guān)于“JUC的CyclicBarrier怎么實(shí)現(xiàn)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。