溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java 并發(fā)工具CountDownLatch和CyclicBarrier 原理解析

發(fā)布時(shí)間:2020-07-10 08:51:27 來源:網(wǎng)絡(luò) 閱讀:970 作者:wx5c78c8b1dbb1b 欄目:編程語言

[TOC]

Java 并發(fā)工具CountDownLatch和CyclicBarrier 原理解析

一,簡(jiǎn)介

CountDownLatch 允許一個(gè)或者多個(gè)線程等待其他線程完成操作。

CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程達(dá)到一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。

二,代碼演示
CountDownLatchDemo
public class CountDownLatchDemo {

    public static final CountDownLatch count = new CountDownLatch(10);
    private static int j = 0;

    public static void main(String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {

            new Thread(
            ()-> {
                System.out.println("我是"+(++j));
                count.countDown();
                }
            ).start();

        }
        count.await();
        System.out.println("我是總數(shù)"+j+"?。?!");
    }

}

運(yùn)行結(jié)果:
我是1
我是2
我是3
我是4
我是5
我是6
我是7
我是8
我是9
我是10
我是總數(shù)10!??!
CyclicBarrierDemo
public class CyclicBarrierDemo {

    private static final CyclicBarrier c = new CyclicBarrier(6,new Thread(() ->
        System.out.println("我是最后一個(gè)")
            ));

    private static AtomicInteger index = new AtomicInteger(1);

    public static void main(String[] args) throws Exception, BrokenBarrierException {
        for (int i = 1; i <= 6; i ++) {
            new Thread(() -> {
                try {
                    c.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("我是:"+(index.getAndIncrement()));
            }) .start();
        }
    }

}

運(yùn)行結(jié)果:
我是最后一個(gè)
我是:1
我是:2
我是:3
我是:4
我是:5
我是:6
三,源碼解析
CountDownLatch 源碼

原理:

CountDownLatch 又叫做閉鎖,CountDownLatch 的構(gòu)造函數(shù)接受一個(gè)int類型的參數(shù)作為計(jì)數(shù)器,如果你想等待n個(gè)節(jié)點(diǎn)完成,那就傳入N;當(dāng)我們調(diào)用CountDownLatch 的countDown方法時(shí),N就會(huì)減1,CountDownLatch的await會(huì)阻塞當(dāng)前方法,直到N變成0;由于countDown方法可以用在任何地方,這里說的N個(gè)點(diǎn),可以是N個(gè)線程,也可以是1一個(gè)線程里面的N個(gè)步驟。

源碼:

// 構(gòu)造方法
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
   }

// 內(nèi)部類 Sync 繼承AQS
private static final class Sync extends AbstractQueuedSynchronizer {

}
countDown 方法
    public void countDown() {
        // 調(diào)用了AQS的releaseShared方法
        sync.releaseShared(1);
    }
    // 這是Sync的tryReleaseShared 
    // AQS的releaseShared會(huì)調(diào)用子類的tryReleaseShared 用來控制count
    // tryReleaseShared 共享式的釋放狀態(tài) 具體參考AQS
     protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                // 獲取的其實(shí)就是我們構(gòu)造函數(shù)的count
                int c = getState();
                // count == 0 證明整個(gè)記錄流程已經(jīng)完畢了
                if (c == 0)
                    return false;
                // 減1 
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) cas 更新
                    return nextc == 0; // 等于0,返回ture 證明計(jì)數(shù)結(jié)束了,可以去喚醒同步隊(duì)列的線程了
                                        // 喚醒是AQS的releaseShared方法
                                        // 結(jié)合CountDownLatch的await方法理解整這里 
            }
        }
await 方法
    public void await() throws InterruptedException {
        // 共享式獲取同步轉(zhuǎn)態(tài)
        sync.acquireSharedInterruptibly(1);
    }

    // 這是Sync的方法
    // await 其實(shí)是調(diào)用的AQS的acquireSharedInterruptibly 但是aqs會(huì)調(diào)用子類tryAcquireShared
    // 我們看到值有state等于0 才會(huì)返回true 成功 -1 表示失敗 失敗就要加入同步隊(duì)列
    // 所以在countDown方法里面等于0 為什么要去喚醒 ,應(yīng)為這里會(huì)進(jìn)入同步隊(duì)列
    protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
    }

通過源碼我們可以發(fā)現(xiàn)只有當(dāng)countDown 這個(gè)方法計(jì)數(shù)遞減完畢,別的線程才能執(zhí)行,因?yàn)檎{(diào)用await的線程會(huì)進(jìn)入AQS的同步隊(duì)列,然后阻塞。

CyclicBarrier 源碼

原理:

CyclicBarrier 默認(rèn)構(gòu)造方法是CyclicBarrier (int parties),器參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await告訴CyclicBarrier 我已經(jīng)到達(dá)屏障了,然后當(dāng)前線程被阻塞;CyclicBarrier 海提供了一個(gè)高級(jí)的構(gòu)造函數(shù),CyclicBarrier (int parties,Runnable barrierAction),用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction線程,方便處理更復(fù)雜的業(yè)務(wù)邏輯。

源碼:

public CyclicBarrier(int parties) {
        this(parties, null);
    }

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
await 方法
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 這一代的狀態(tài)
            final Generation g = generation;
            // 默認(rèn)為false Barrier被Broken 就會(huì)為true
            if (g.broken)
                throw new BrokenBarrierException();
            // 線程被中斷了,標(biāo)記為breakBarrier,喚醒所有線程
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 計(jì)數(shù)器減減
            int index = --count;
            // 到達(dá) trip
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 執(zhí)行構(gòu)造函數(shù)里面的線程
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 喚醒所有等待線程 然后重置
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            // 一直自旋直到發(fā)生:tripped, broken, interrupted, timed out
            for (;;) {
                try {
                    // 帶時(shí)間
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                    // 自旋過程中發(fā)生中斷
                } catch (InterruptedException ie) {
                    // 等于說明當(dāng)前被重點(diǎn)的這個(gè)線程沒有被broken
                    // 拋異常
                    if (g == generation && ! g.broken) { 
                        breakBarrier();
                        throw ie;
                    } else { // 不等于說明后來的線程已經(jīng)broken了
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        // 中斷線程 breakBarrier已經(jīng)沒有意義了
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken) // 屏蔽Broken
                    throw new BrokenBarrierException();
                // 別的線程更新了generation 不屬于當(dāng)前代
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

我們發(fā)現(xiàn)CyclicBarrier是所有線程一起阻塞,直到到達(dá)屏障點(diǎn),然后全部喚醒一起執(zhí)行。

四,總結(jié)

CountDownLatch和CyclicBarrier都可以實(shí)現(xiàn)一個(gè)線程等待一個(gè)或者多個(gè)線程到達(dá)一個(gè)點(diǎn)之后才執(zhí)行,但是這一個(gè)或者多個(gè)線程的狀態(tài)卻是不一樣的,CountDownLatch是來一個(gè)執(zhí)行一個(gè)不會(huì)阻塞,直到大家執(zhí)行完了,在執(zhí)行調(diào)用await方法的線程,CyclicBarrier是來一個(gè)阻塞一個(gè),直到大家都阻塞完畢,然后在優(yōu)先執(zhí)行構(gòu)造函數(shù)里面的線程,在喚醒所有阻塞的線程;CountDownLatch的計(jì)數(shù)器只能執(zhí)行一次,CyclicBarrier可以執(zhí)行多次,所以CyclicBarrier可以執(zhí)行復(fù)雜的業(yè)務(wù)場(chǎng)景。

參考 《Java 并發(fā)編程的藝術(shù)》

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI