您好,登錄后才能下訂單哦!
一、導(dǎo)語(yǔ)
最近在學(xué)習(xí)并發(fā)編程原理,所以準(zhǔn)備整理一下自己學(xué)到的知識(shí),先寫(xiě)一篇CountDownLatch的源碼分析,之后希望可以慢慢寫(xiě)完整個(gè)并發(fā)編程。
二、什么是CountDownLatch
CountDownLatch是java的JUC并發(fā)包里的一個(gè)工具類,可以理解為一個(gè)倒計(jì)時(shí)器,主要是用來(lái)控制多個(gè)線程之間的通信。
比如有一個(gè)主線程A,它要等待其他4個(gè)子線程執(zhí)行完畢之后才能執(zhí)行,此時(shí)就可以利用CountDownLatch來(lái)實(shí)現(xiàn)這種功能了。
三、簡(jiǎn)單使用
public static void main(String[] args){
System.out.println("主線程和他的兩個(gè)小兄弟約好去吃火鍋");
System.out.println("主線程進(jìn)入了飯店");
System.out.println("主線程想要開(kāi)始動(dòng)筷子吃飯");
//new一個(gè)計(jì)數(shù)器,初始值為2,當(dāng)計(jì)數(shù)器為0時(shí),主線程開(kāi)始執(zhí)行
CountDownLatch latch = new CountDownLatch(2);
new Thread(){
public void run() {
try {
System.out.println("子線程1——小兄弟A 正在到飯店的路上");
Thread.sleep(3000);
System.out.println("子線程1——小兄弟A 到飯店了");
//一個(gè)小兄弟到了,計(jì)數(shù)器-1
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("子線程2——小兄弟B 正在到飯店的路上");
Thread.sleep(3000);
System.out.println("子線程2——小兄弟B 到飯店了");
//另一個(gè)小兄弟到了,計(jì)數(shù)器-1
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
//主線程等待,直到其他兩個(gè)小兄弟也進(jìn)入飯店(計(jì)數(shù)器==0),主線程才能吃飯
latch.await();
System.out.println("主線程終于可以開(kāi)始吃飯了~");
}
四、源碼分析
核心代碼:
CountDownLatch latch = new CountDownLatch(1);
latch.await();
latch.countDown();
其中構(gòu)造函數(shù)的參數(shù)是計(jì)數(shù)器的值;
await()方法是用來(lái)阻塞線程,直到計(jì)數(shù)器的值為0
countDown()方法是執(zhí)行計(jì)數(shù)器-1操作
1、首先來(lái)看構(gòu)造函數(shù)的代碼
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
這段代碼很簡(jiǎn)單,首先if判斷傳入的count是否<0,如果小于0直接拋異常。
然后new一個(gè)類Sync,這個(gè)Sync是什么呢?我們一起來(lái)看下
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//嘗試獲取共享鎖
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//嘗試釋放共享鎖
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
可以看到Sync是一個(gè)內(nèi)部類,繼承了AQS,AQS是一個(gè)同步器,之后我們會(huì)詳細(xì)講。
其中有幾個(gè)核心點(diǎn):
變量 state是父類AQS里面的變量,在這里的語(yǔ)義是計(jì)數(shù)器的值
getState()方法也是父類AQS里的方法,很簡(jiǎn)單,就是獲取state的值
tryAcquireShared和tryReleaseShared也是父類AQS里面的方法,在這里CountDownLatch對(duì)他們進(jìn)行了重寫(xiě),先有個(gè)印象,之后詳講。
2、了解了CountDownLatch的構(gòu)造函數(shù)之后,我們?cè)賮?lái)看它的核心代碼,首先是await()。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可以看到,其實(shí)是通過(guò)內(nèi)部類Sync調(diào)用了父類AQS的acquireSharedInterruptibly()方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判斷線程是否是中斷狀態(tài)
if (Thread.interrupted())
throw new InterruptedException();
//嘗試獲取state的值
if (tryAcquireShared(arg) < 0)//step1
doAcquireSharedInterruptibly(arg);//step2
}
tryAcquireShared(arg)這個(gè)方法就是我們剛才在Sync內(nèi)看到的重寫(xiě)父類AQS的方法,意思就是判斷是否getState() == 0,如果state為0,返回1,則step1處不進(jìn)入if體內(nèi)acquireSharedInterruptibly(int arg)方法執(zhí)行完畢。若state!=0,則返回-1,進(jìn)入if體內(nèi)step2處。
下面我們來(lái)看acquireSharedInterruptibly(int arg)方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//step1、把當(dāng)前線程封裝為共享類型的Node,加入隊(duì)列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//step2、獲取當(dāng)前node的前一個(gè)元素
final Node p = node.predecessor();
//step3、如果前一個(gè)元素是隊(duì)首
if (p == head) {
//step4、再次調(diào)用tryAcquireShared()方法,判斷state的值是否為0
int r = tryAcquireShared(arg);
//step5、如果state的值==0
if (r >= 0) {
//step6、設(shè)置當(dāng)前node為隊(duì)首,并嘗試釋放共享鎖
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//step7、是否可以安心掛起當(dāng)前線程,是就掛起;并且判斷當(dāng)前線程是否中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//step8、如果出現(xiàn)異常,failed沒(méi)有更新為false,則把當(dāng)前node從隊(duì)列中取消
if (failed)
cancelAcquire(node);
}
}
按照代碼中的注釋,我們可以大概了解該方法的內(nèi)容,下面我們來(lái)仔細(xì)看下其中調(diào)用的一些方法是干什么的。
1、首先看addWaiter()
//step1
private Node addWaiter(Node mode) {
//把當(dāng)前線程封裝為node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//獲取當(dāng)前隊(duì)列的隊(duì)尾tail,并賦值給pred
Node pred = tail;
//如果pred!=null,即當(dāng)前隊(duì)尾不為null
if (pred != null) {
//把當(dāng)前隊(duì)尾tail,變成當(dāng)前node的前繼節(jié)點(diǎn)
node.prev = pred;
//cas更新當(dāng)前node為新的隊(duì)尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果隊(duì)尾為空,走enq方法
enq(node);//step1.1
return node;
}
//step1.1
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果隊(duì)尾tail為null,初始化隊(duì)列
if (t == null) { // Must initialize
//cas設(shè)置一個(gè)新的空node為隊(duì)首
if (compareAndSetHead(new Node()))
tail = head;
} else {
//cas把當(dāng)前node設(shè)置為新隊(duì)尾,把前隊(duì)尾設(shè)置成當(dāng)前node的前繼節(jié)點(diǎn)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2、接下來(lái)我們?cè)趤?lái)看setHeadAndPropagate()方法,看其內(nèi)部實(shí)現(xiàn)
//step6
private void setHeadAndPropagate(Node node, int propagate) {
//獲取隊(duì)首head
Node h = head; // Record old head for check below
//設(shè)置當(dāng)前node為隊(duì)首,并取消node所關(guān)聯(lián)的線程
setHead(node);
//
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果當(dāng)前node的后繼節(jié)點(diǎn)為null或者是shared類型的
if (s == null || s.isShared())
//釋放鎖,喚醒下一個(gè)線程
doReleaseShared();//step6.1
}
}
//step6.1
private void doReleaseShared() {
for (;;) {
//找到頭節(jié)點(diǎn)
Node h = head;
if (h != null && h != tail) {
//獲取頭節(jié)點(diǎn)狀態(tài)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒head節(jié)點(diǎn)的next節(jié)點(diǎn)
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3、接下來(lái)我們來(lái)看countDown()方法。
public void countDown() {
sync.releaseShared(1);
}
可以看到調(diào)用的是父類AQS的releaseShared 方法
public final boolean releaseShared(int arg) {
//state-1
if (tryReleaseShared(arg)) {//step1
//喚醒等待線程,內(nèi)部調(diào)用的是LockSupport.unpark方法
doReleaseShared();//step2
return true;
}
return false;
}
//step1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//獲取當(dāng)前state的值
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//cas操作來(lái)進(jìn)行原子減1
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
五、總結(jié)
CountDownLatch主要是通過(guò)計(jì)數(shù)器state來(lái)控制是否可以執(zhí)行其他操作,如果不能就通過(guò)LockSupport.park()方法掛起線程,直到其他線程執(zhí)行完畢后喚醒它。 下面我們通過(guò)一個(gè)簡(jiǎn)單的圖來(lái)幫助我們理解一下:
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。