您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)ReentrantLock中條件鎖是什么,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
問題
(1)條件鎖是什么?
(2)條件鎖適用于什么場(chǎng)景?
(3)條件鎖的await()是在其它線程signal()的時(shí)候喚醒的嗎?
簡(jiǎn)介
條件鎖,是指在獲取鎖之后發(fā)現(xiàn)當(dāng)前業(yè)務(wù)場(chǎng)景自己無法處理,而需要等待某個(gè)條件的出現(xiàn)才可以繼續(xù)處理時(shí)使用的一種鎖。
比如,在阻塞隊(duì)列中,當(dāng)隊(duì)列中沒有元素的時(shí)候是無法彈出一個(gè)元素的,這時(shí)候就需要阻塞在條件notEmpty上,等待其它線程往里面放入一個(gè)元素后,喚醒這個(gè)條件notEmpty,當(dāng)前線程才可以繼續(xù)去做“彈出一個(gè)元素”的行為。
注意,這里的條件,必須是在獲取鎖之后去等待,對(duì)應(yīng)到ReentrantLock的條件鎖,就是獲取鎖之后才能調(diào)用condition.await()方法。
在java中,條件鎖的實(shí)現(xiàn)都在AQS的ConditionObject類中,ConditionObject實(shí)現(xiàn)了Condition接口,下面我們通過一個(gè)例子來進(jìn)入到條件鎖的學(xué)習(xí)中。
使用示例
public class ReentrantLockTest { public static void main(String[] args) throws InterruptedException { // 聲明一個(gè)重入鎖 ReentrantLock lock = new ReentrantLock(); // 聲明一個(gè)條件鎖 Condition condition = lock.newCondition(); new Thread(()->{ try { lock.lock(); // 1 try { System.out.println("before await"); // 2 // 等待條件 condition.await(); // 3 System.out.println("after await"); // 10 } finally { lock.unlock(); // 11 } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 這里睡1000ms是為了讓上面的線程先獲取到鎖 Thread.sleep(1000); lock.lock(); // 4 try { // 這里睡2000ms代表這個(gè)線程執(zhí)行業(yè)務(wù)需要的時(shí)間 Thread.sleep(2000); // 5 System.out.println("before signal"); // 6 // 通知條件已成立 condition.signal(); // 7 System.out.println("after signal"); // 8 } finally { lock.unlock(); // 9 } } }
上面的代碼很簡(jiǎn)單,一個(gè)線程等待條件,另一個(gè)線程通知條件已成立,后面的數(shù)字代表代碼實(shí)際運(yùn)行的順序,如果你能把這個(gè)順序看懂基本條件鎖掌握得差不多了。
源碼分析
ConditionObject的主要屬性
public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; }
可以看到條件鎖中也維護(hù)了一個(gè)隊(duì)列,為了和AQS的隊(duì)列區(qū)分,我這里稱為條件隊(duì)列,firstWaiter是隊(duì)列的頭節(jié)點(diǎn),lastWaiter是隊(duì)列的尾節(jié)點(diǎn),它們是干什么的呢?接著看。
lock.newCondition()方法
新建一個(gè)條件鎖。
// ReentrantLock.newCondition() public Condition newCondition() { return sync.newCondition(); } // ReentrantLock.Sync.newCondition() final ConditionObject newCondition() { return new ConditionObject(); } // AbstractQueuedSynchronizer.ConditionObject.ConditionObject() public ConditionObject() { }
新建一個(gè)條件鎖最后就是調(diào)用的AQS中的ConditionObject類來實(shí)例化條件鎖。
condition.await()方法
condition.await()方法,表明現(xiàn)在要等待條件的出現(xiàn)。
// AbstractQueuedSynchronizer.ConditionObject.await() public final void await() throws InterruptedException { // 如果線程中斷了,拋出異常 if (Thread.interrupted()) throw new InterruptedException(); // 添加節(jié)點(diǎn)到Condition的隊(duì)列中,并返回該節(jié)點(diǎn) Node node = addConditionWaiter(); // 完全釋放當(dāng)前線程獲取的鎖 // 因?yàn)殒i是可重入的,所以這里要把獲取的鎖全部釋放 int savedState = fullyRelease(node); int interruptMode = 0; // 是否在同步隊(duì)列中 while (!isOnSyncQueue(node)) { // 阻塞當(dāng)前線程 LockSupport.park(this); // 上面部分是調(diào)用await()時(shí)釋放自己占有的鎖,并阻塞自己等待條件的出現(xiàn) // *************************分界線************************* // // 下面部分是條件已經(jīng)出現(xiàn),嘗試去獲取鎖 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 嘗試獲取鎖,注意第二個(gè)參數(shù),這是上一章分析過的方法 // 如果沒獲取到會(huì)再次阻塞(這個(gè)方法這里就不貼出來了,有興趣的翻翻上一章的內(nèi)容) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除取消的節(jié)點(diǎn) if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 線程中斷相關(guān) if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter private Node addConditionWaiter() { Node t = lastWaiter; // 如果條件隊(duì)列的尾節(jié)點(diǎn)已取消,從頭節(jié)點(diǎn)開始清除所有已取消的節(jié)點(diǎn) if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); // 重新獲取尾節(jié)點(diǎn) t = lastWaiter; } // 新建一個(gè)節(jié)點(diǎn),它的等待狀態(tài)是CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果尾節(jié)點(diǎn)為空,則把新節(jié)點(diǎn)賦值給頭節(jié)點(diǎn)(相當(dāng)于初始化隊(duì)列) // 否則把新節(jié)點(diǎn)賦值給尾節(jié)點(diǎn)的nextWaiter指針 if (t == null) firstWaiter = node; else t.nextWaiter = node; // 尾節(jié)點(diǎn)指向新節(jié)點(diǎn) lastWaiter = node; // 返回新節(jié)點(diǎn) return node; } // AbstractQueuedSynchronizer.fullyRelease final int fullyRelease(Node node) { boolean failed = true; try { // 獲取狀態(tài)變量的值,重復(fù)獲取鎖,這個(gè)值會(huì)一直累加 // 所以這個(gè)值也代表著獲取鎖的次數(shù) int savedState = getState(); // 一次性釋放所有獲得的鎖 if (release(savedState)) { failed = false; // 返回獲取鎖的次數(shù) return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } // AbstractQueuedSynchronizer.isOnSyncQueue final boolean isOnSyncQueue(Node node) { // 如果等待狀態(tài)是CONDITION,或者前一個(gè)指針為空,返回false // 說明還沒有移到AQS的隊(duì)列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果next指針有值,說明已經(jīng)移到AQS的隊(duì)列中了 if (node.next != null) // If has successor, it must be on queue return true; // 從AQS的尾節(jié)點(diǎn)開始往前尋找看是否可以找到當(dāng)前節(jié)點(diǎn),找到了也說明已經(jīng)在AQS的隊(duì)列中了 return findNodeFromTail(node); }
這里有幾個(gè)難理解的點(diǎn):
(1)Condition的隊(duì)列和AQS的隊(duì)列不完全一樣;
AQS的隊(duì)列頭節(jié)點(diǎn)是不存在任何值的,是一個(gè)虛節(jié)點(diǎn);
Condition的隊(duì)列頭節(jié)點(diǎn)是存儲(chǔ)著實(shí)實(shí)在在的元素值的,是真實(shí)節(jié)點(diǎn)。
(2)各種等待狀態(tài)(waitStatus)的變化;
首先,在條件隊(duì)列中,新建節(jié)點(diǎn)的初始等待狀態(tài)是CONDITION(-2);
其次,移到AQS的隊(duì)列中時(shí)等待狀態(tài)會(huì)更改為0(AQS隊(duì)列節(jié)點(diǎn)的初始等待狀態(tài)為0);
然后,在AQS的隊(duì)列中如果需要阻塞,會(huì)把它上一個(gè)節(jié)點(diǎn)的等待狀態(tài)設(shè)置為SIGNAL(-1);
最后,不管在Condition隊(duì)列還是AQS隊(duì)列中,已取消的節(jié)點(diǎn)的等待狀態(tài)都會(huì)設(shè)置為CANCELLED(1);
另外,后面我們?cè)诠蚕礞i的時(shí)候還會(huì)講到另外一種等待狀態(tài)叫PROPAGATE(-3)。
(3)相似的名稱;
AQS中下一個(gè)節(jié)點(diǎn)是next,上一個(gè)節(jié)點(diǎn)是prev;
Condition中下一個(gè)節(jié)點(diǎn)是nextWaiter,沒有上一個(gè)節(jié)點(diǎn)。
如果弄明白了這幾個(gè)點(diǎn),看懂上面的代碼還是輕松加愉快的,如果沒弄明白,彤哥這里指出來了,希望您回頭再看看上面的代碼。
下面總結(jié)一下await()方法的大致流程:
(1)新建一個(gè)節(jié)點(diǎn)加入到條件隊(duì)列中去;
(2)完全釋放當(dāng)前線程占有的鎖;
(3)阻塞當(dāng)前線程,并等待條件的出現(xiàn);
(4)條件已出現(xiàn)(此時(shí)節(jié)點(diǎn)已經(jīng)移到AQS的隊(duì)列中),嘗試獲取鎖;
也就是說await()方法內(nèi)部其實(shí)是先釋放鎖->等待條件->再次獲取鎖的過程。
condition.signal()方法
condition.signal()方法通知條件已經(jīng)出現(xiàn)。
// AbstractQueuedSynchronizer.ConditionObject.signal public final void signal() { // 如果不是當(dāng)前線程占有著鎖,調(diào)用這個(gè)方法拋出異常 // 說明signal()也要在獲取鎖之后執(zhí)行 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 條件隊(duì)列的頭節(jié)點(diǎn) Node first = firstWaiter; // 如果有等待條件的節(jié)點(diǎn),則通知它條件已成立 if (first != null) doSignal(first); } // AbstractQueuedSynchronizer.ConditionObject.doSignal private void doSignal(Node first) { do { // 移到條件隊(duì)列的頭節(jié)點(diǎn)往后一位 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 相當(dāng)于把頭節(jié)點(diǎn)從隊(duì)列中出隊(duì) first.nextWaiter = null; // 轉(zhuǎn)移節(jié)點(diǎn)到AQS隊(duì)列中 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // AbstractQueuedSynchronizer.transferForSignal final boolean transferForSignal(Node node) { // 把節(jié)點(diǎn)的狀態(tài)更改為0,也就是說即將移到AQS隊(duì)列中 // 如果失敗了,說明節(jié)點(diǎn)已經(jīng)被改成取消狀態(tài)了 // 返回false,通過上面的循環(huán)可知會(huì)尋找下一個(gè)可用節(jié)點(diǎn) if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 調(diào)用AQS的入隊(duì)方法把節(jié)點(diǎn)移到AQS的隊(duì)列中 // 注意,這里enq()的返回值是node的上一個(gè)節(jié)點(diǎn),也就是舊尾節(jié)點(diǎn) Node p = enq(node); // 上一個(gè)節(jié)點(diǎn)的等待狀態(tài) int ws = p.waitStatus; // 如果上一個(gè)節(jié)點(diǎn)已取消了,或者更新狀態(tài)為SIGNAL失敗(也是說明上一個(gè)節(jié)點(diǎn)已經(jīng)取消了) // 則直接喚醒當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的線程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 如果更新上一個(gè)節(jié)點(diǎn)的等待狀態(tài)為SIGNAL成功了 // 則返回true,這時(shí)上面的循環(huán)不成立了,退出循環(huán),也就是只通知了一個(gè)節(jié)點(diǎn) // 此時(shí)當(dāng)前節(jié)點(diǎn)還是阻塞狀態(tài) // 也就是說調(diào)用signal()的時(shí)候并不會(huì)真正喚醒一個(gè)節(jié)點(diǎn) // 只是把節(jié)點(diǎn)從條件隊(duì)列移到AQS隊(duì)列中 return true; }
signal()方法的大致流程為:
(1)從條件隊(duì)列的頭節(jié)點(diǎn)開始尋找一個(gè)非取消狀態(tài)的節(jié)點(diǎn);
(2)把它從條件隊(duì)列移到AQS隊(duì)列;
(3)且只移動(dòng)一個(gè)節(jié)點(diǎn);
注意,這里調(diào)用signal()方法后并不會(huì)真正喚醒一個(gè)節(jié)點(diǎn),那么,喚醒一個(gè)節(jié)點(diǎn)是在啥時(shí)候呢?
還記得開頭例子嗎?倒回去再好好看看,signal()方法后,最終會(huì)執(zhí)行l(wèi)ock.unlock()方法,此時(shí)才會(huì)真正喚醒一個(gè)節(jié)點(diǎn),喚醒的這個(gè)節(jié)點(diǎn)如果曾經(jīng)是條件節(jié)點(diǎn)的話又會(huì)繼續(xù)執(zhí)行await()方法“分界線”下面的代碼。
結(jié)束了,仔細(xì)體會(huì)下^^
如果非要用一個(gè)圖來表示的話,我想下面這個(gè)圖可以大致表示一下(這里是用時(shí)序圖畫的,但是實(shí)際并不能算作一個(gè)真正的時(shí)序圖哈,了解就好):
總結(jié)
(1)重入鎖是指可重復(fù)獲取的鎖,即一個(gè)線程獲取鎖之后再嘗試獲取鎖時(shí)會(huì)自動(dòng)獲取鎖;
(2)在ReentrantLock中重入鎖是通過不斷累加state變量的值實(shí)現(xiàn)的;
(3)ReentrantLock的釋放要跟獲取匹配,即獲取了幾次也要釋放幾次;
(4)ReentrantLock默認(rèn)是非公平模式,因?yàn)榉枪侥J叫矢撸?/p>
(5)條件鎖是指為了等待某個(gè)條件出現(xiàn)而使用的一種鎖;
(6)條件鎖比較經(jīng)典的使用場(chǎng)景就是隊(duì)列為空時(shí)阻塞在條件notEmpty上;
(7)ReentrantLock中的條件鎖是通過AQS的ConditionObject內(nèi)部類實(shí)現(xiàn)的;
(8)await()和signal()方法都必須在獲取鎖之后釋放鎖之前使用;
(9)await()方法會(huì)新建一個(gè)節(jié)點(diǎn)放到條件隊(duì)列中,接著完全釋放鎖,然后阻塞當(dāng)前線程并等待條件的出現(xiàn);
(10)signal()方法會(huì)尋找條件隊(duì)列中第一個(gè)可用節(jié)點(diǎn)移到AQS隊(duì)列中;
(11)在調(diào)用signal()方法的線程調(diào)用unlock()方法才真正喚醒阻塞在條件上的節(jié)點(diǎn)(此時(shí)節(jié)點(diǎn)已經(jīng)在AQS隊(duì)列中);
(12)之后該節(jié)點(diǎn)會(huì)再次嘗試獲取鎖,后面的邏輯與lock()的邏輯基本一致了。
關(guān)于“ReentrantLock中條件鎖是什么”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。