溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java并發(fā)編程中CountDownLatch源碼分析

發(fā)布時(shí)間:2022-02-28 10:49:52 來(lái)源:億速云 閱讀:122 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹了Java并發(fā)編程中CountDownLatch源碼分析的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Java并發(fā)編程中CountDownLatch源碼分析文章都會(huì)有所收獲,下面我們一起來(lái)看看吧。

一、前言

CountDownLatch維護(hù)了一個(gè)計(jì)數(shù)器(還是是state字段),調(diào)用countDown方法會(huì)將計(jì)數(shù)器減1,調(diào)用await方法會(huì)阻塞線程直到計(jì)數(shù)器變?yōu)???梢杂糜趯?shí)現(xiàn)一個(gè)線程等待所有子線程任務(wù)完成之后再繼續(xù)執(zhí)行的邏輯,也可以實(shí)現(xiàn)類似簡(jiǎn)易CyclicBarrier的功能,達(dá)到讓多個(gè)線程等待同時(shí)開(kāi)始執(zhí)行某一段邏輯目的。

二、使用

  • 一個(gè)線程等待其它線程執(zhí)行完再繼續(xù)執(zhí)行

	......
	CountDownLatch cdl = new CountDownLatch(10);
	ExecutorService es = Executors.newFixedThreadPool(10);
	for (int i = 0; i < 10; i++) {
		es.execute(() -> {
			//do something
			cdl.countDown();
		});
	}
	cdl.await();
	......
  • 實(shí)現(xiàn)類似CyclicBarrier的功能,先await,再countDown

	......
        CountDownLatch cdl = new CountDownLatch(1);
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            es.execute(() -> {
                cdl.await();
                //do something
            });
        }
        Thread.sleep(10000L);
        cdl.countDown();
        ......

三、源碼分析

CountDownLatch的結(jié)構(gòu)和ReentrantLock、Semaphore的結(jié)構(gòu)類似,也是使用的內(nèi)部類Sync繼承AQS的方式,并且重寫了tryAcquireShared和tryReleaseShared方法。

還是首先來(lái)看構(gòu)造函數(shù):

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

需要傳入一個(gè)大于0的count,代表CountDownLatch計(jì)數(shù)器的初始值,通過(guò)Sync的構(gòu)造函數(shù)最終賦值給父類AQS的state字段??梢粋€(gè)看到這個(gè)state字段用法多多,在ReentrantLock中使用0和1來(lái)標(biāo)識(shí)鎖的狀態(tài),Semaphore中用來(lái)標(biāo)識(shí)信號(hào)量,此處又用來(lái)表示計(jì)數(shù)器。

CountDownLatch要通過(guò)await方法完成阻塞,先來(lái)看看這個(gè)方法是如何實(shí)現(xiàn)的:

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

調(diào)用的是sync的acquireSharedInterruptibly方法,該方法定義在AQS中,Semaphore也調(diào)用的這個(gè)方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

這個(gè)方法的邏輯前面在解析SemaPhore的時(shí)候細(xì)說(shuō)過(guò)了,這里不再贅述,主要就是兩個(gè)方法的調(diào)用,先通過(guò)tryAcquireShared方法嘗試獲取"許可",返回值代表此次獲取后的剩余量,如果大于等于0表示獲取成功,否則表示失敗。如果失敗,那么就會(huì)進(jìn)入doAcquireSharedInterruptibly方法執(zhí)行入隊(duì)阻塞的邏輯。這里我們主要到CountDownLatch中看看tryAcquireShared方法的實(shí)現(xiàn):

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

和Semaphore的實(shí)現(xiàn)中每次將state減去requires不同,這里直接判斷state是否為0,如果為0那么返回1,表示獲取"許可"成功;如果不為0,表示失敗,則需要入隊(duì)阻塞。從這個(gè)tryAcquireShared方法就能看出CountDownLatch的邏輯了:等到state變?yōu)榱?,那么所有線程都能獲取運(yùn)行許可。

那么我們接下來(lái)來(lái)到countDown方法:

public void countDown() {
        sync.releaseShared(1);
    }

調(diào)用的是sync的releaseShared方法,該方法定義在父類AQS中,Semaphore使用的也是這個(gè)方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
        	//當(dāng)state從非
            doReleaseShared();
            return true;
        }
        return false;
    }

前面提到了CountDownLatch也重寫了tryReleaseShared方法:

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                	//如果state等于0了直接返回false
                	//保證在并發(fā)情況下,最多只會(huì)有一個(gè)線程返回true
                	//也包括調(diào)用countDown的次數(shù)超過(guò)state的初始值
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                	//如果返回true,表示state從非0變?yōu)榱?
                	//那么后續(xù)需要喚醒阻塞線程
                    return nextc == 0;
            }
        }

Semaphore在釋放信號(hào)量的時(shí)候,是將獲取的許可歸還到state中,但是CountDownLatch沒(méi)有獲取許可的邏輯(獲取許可的時(shí)候是判斷state是否等于0),所以在countDown的時(shí)候沒(méi)有釋放的邏輯,就是將state減1,然后根據(jù)state減1之后的值是否為0判斷release是否成功,如果state本來(lái)大于0,經(jīng)過(guò)減1之后變?yōu)榱?,那么返回true。tryReleaseShared方法的返回值決定了后續(xù)需不需要調(diào)用doReleaseShared方法喚醒阻塞線程。

這里有個(gè)邏輯:如果state已經(jīng)為0,那么返回false。這個(gè)主要應(yīng)對(duì)兩種情況:

  • 調(diào)用countDown的次數(shù)超過(guò)了state的初始值多

  • 線程并發(fā)調(diào)用的時(shí)候保證只有一個(gè)線程去完成阻塞線程的喚醒操作

可以看到CountDownLatch沒(méi)有鎖的概念,countDown方法可以被一個(gè)線程重復(fù)調(diào)用,只需要對(duì)state做reduce操作,而不用關(guān)心是誰(shuí)做的reduce。如果tryReleaseShared返回true,那么表示需要在后面進(jìn)入doReleaseShared方法,該方法和Semaphore中調(diào)用的方法是同一個(gè),主要是喚醒阻塞線程或者設(shè)置PROPAGAGE狀態(tài),這里也不再贅述~

阻塞線程被喚醒之后,會(huì)在doAcquireSharedInterruptibly方法中繼續(xù)循環(huán),雖然和Semaphore調(diào)用的是同樣的方法,但是這里有不一樣的地方,所以還是提一句。我們首先回到doAcquireSharedInterruptibly方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                	//如果head.next被unpark喚醒,說(shuō)明此時(shí)state==0
                	//那么tryAcquireShared會(huì)返回1
                    int r = tryAcquireShared(arg);
                    //r==1
                    if (r >= 0) {
                    	//node節(jié)點(diǎn)被喚醒后,還會(huì)繼續(xù)喚醒node.next
                    	//這樣依次傳遞,因?yàn)樵谶@里的r一定為1
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

當(dāng)head.next線程被unpark喚醒后,會(huì)進(jìn)入tryAcquireShared方法判斷,由于此時(shí)state已經(jīng)為0(只有當(dāng)state變?yōu)?時(shí),才會(huì)unpark喚醒線程),而前面提到了在CountDownLatch重寫的tryAcquireShared中,如果state==0,那么會(huì)返回1,所以會(huì)進(jìn)入setHeadAndPropagate方法:

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

該方法在Semaphore中詳細(xì)介紹過(guò),這里我們就站在CountDownLatch的角度來(lái)看看。其實(shí)很簡(jiǎn)單了,注意此時(shí)該方法的propagate參數(shù)值是1,那么就會(huì)進(jìn)入到下面的if邏輯里,繼續(xù)喚醒下一個(gè)node。當(dāng)下一個(gè)node對(duì)應(yīng)的線程被喚醒后,同樣會(huì)進(jìn)入setHeadAndPropagate方法,propagage同樣為1,那么繼續(xù)喚醒下一個(gè)node,就這樣依次將整個(gè)CLH隊(duì)列的節(jié)點(diǎn)都喚醒。

關(guān)于“Java并發(fā)編程中CountDownLatch源碼分析”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Java并發(fā)編程中CountDownLatch源碼分析”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI