您好,登錄后才能下訂單哦!
基本概念
本章,我們會(huì)講解“線程獲取公平鎖”的原理;在講解之前,需要了解幾個(gè)基本概念。后面的內(nèi)容,都是基于這些概念的;這些概念可能比較枯燥,但從這些概念中,能窺見(jiàn)“java鎖”的一些架構(gòu),這對(duì)我們了解鎖是有幫助的。
1. AQS -- 指AbstractQueuedSynchronizer類。
AQS是java中管理“鎖”的抽象類,鎖的許多公共方法都是在這個(gè)類中實(shí)現(xiàn)。AQS是獨(dú)占鎖(例如,ReentrantLock)和共享鎖(例如,Semaphore)的公共父類。
2. AQS鎖的類別 -- 分為“獨(dú)占鎖”和“共享鎖”兩種。
(01) 獨(dú)占鎖 -- 鎖在一個(gè)時(shí)間點(diǎn)只能被一個(gè)線程鎖占有。根據(jù)鎖的獲取機(jī)制,它又劃分為“公平鎖”和“非公平鎖”。公平鎖,是按照通過(guò)CLH等待線程按照先來(lái)先得的規(guī)則,公平的獲取鎖;而非公平鎖,則當(dāng)線程要獲取鎖時(shí),它會(huì)無(wú)視CLH等待隊(duì)列而直接獲取鎖。獨(dú)占鎖的典型實(shí)例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是獨(dú)占鎖。
(02) 共享鎖 -- 能被多個(gè)線程同時(shí)擁有,能被共享的鎖。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享鎖。這些鎖的用途和原理,在以后的章節(jié)再詳細(xì)介紹。
3. CLH隊(duì)列 -- Craig, Landin, and Hagersten lock queue
CLH隊(duì)列是AQS中“等待鎖”的線程隊(duì)列。在多線程中,為了保護(hù)競(jìng)爭(zhēng)資源不被多個(gè)線程同時(shí)操作而起來(lái)錯(cuò)誤,我們常常需要通過(guò)鎖來(lái)保護(hù)這些資源。在獨(dú)占鎖中,競(jìng)爭(zhēng)資源在一個(gè)時(shí)間點(diǎn)只能被一個(gè)線程鎖訪問(wèn);而其它線程則需要等待。CLH就是管理這些“等待鎖”的線程的隊(duì)列。
CLH是一個(gè)非阻塞的 FIFO 隊(duì)列。也就是說(shuō)往里面插入或移除一個(gè)節(jié)點(diǎn)的時(shí)候,在并發(fā)條件下不會(huì)阻塞,而是通過(guò)自旋鎖和 CAS 保證節(jié)點(diǎn)插入和移除的原子性。
4. CAS函數(shù) -- Compare And Swap
CAS函數(shù),是比較并交換函數(shù),它是原子操作函數(shù);即,通過(guò)CAS操作的數(shù)據(jù)都是以原子方式進(jìn)行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函數(shù)。它們共同的特點(diǎn)是,這些函數(shù)所執(zhí)行的動(dòng)作是以原子的方式進(jìn)行的。
本章是圍繞“公平鎖”如何獲取鎖而層次展開(kāi)?!肮芥i”涉及到的知識(shí)點(diǎn)比較多,但總的來(lái)說(shuō),不是特別難;如果讀者能讀懂AQS和ReentrantLock.java這兩個(gè)類的大致意思,理解鎖的原理和機(jī)制也就不成問(wèn)題了。本章只是作者本人對(duì)鎖的一點(diǎn)點(diǎn)理解,希望這部分知識(shí)能幫助您了解“公平鎖”的獲取過(guò)程,認(rèn)識(shí)“鎖”的框架。
ReentrantLock數(shù)據(jù)結(jié)構(gòu)
ReentrantLock的UML類圖
從圖中可以看出:
(01) ReentrantLock實(shí)現(xiàn)了Lock接口。
(02) ReentrantLock與sync是組合關(guān)系。ReentrantLock中,包含了Sync對(duì)象;而且,Sync是AQS的子類;更重要的是,Sync有兩個(gè)子類FairSync(公平鎖)和NonFairSync(非公平鎖)。ReentrantLock是一個(gè)獨(dú)占鎖,至于它到底是公平鎖還是非公平鎖,就取決于sync對(duì)象是"FairSync的實(shí)例"還是"NonFairSync的實(shí)例"。
獲取公平鎖(基于JDK1.7.0_40)
通過(guò)前面“Java多線程系列--“JUC鎖”02之 互斥鎖ReentrantLock”的“示例1”,我們知道,獲取鎖是通過(guò)lock()函數(shù)。下面,我們以lock()對(duì)獲取公平鎖的過(guò)程進(jìn)行展開(kāi)。
1. lock()
lock()在ReentrantLock.java的FairSync類中實(shí)現(xiàn),它的源碼如下:
final void lock() { acquire(1); }
說(shuō)明:“當(dāng)前線程”實(shí)際上是通過(guò)acquire(1)獲取鎖的。
這里說(shuō)明一下“1”的含義,它是設(shè)置“鎖的狀態(tài)”的參數(shù)。對(duì)于“獨(dú)占鎖”而言,鎖處于可獲取狀態(tài)時(shí),它的狀態(tài)值是0;鎖被線程初次獲取到了,它的狀態(tài)值就變成了1。
由于ReentrantLock(公平鎖/非公平鎖)是可重入鎖,所以“獨(dú)占鎖”可以被單個(gè)線程多此獲取,每獲取1次就將鎖的狀態(tài)+1。也就是說(shuō),初次獲取鎖時(shí),通過(guò)acquire(1)將鎖的狀態(tài)值設(shè)為1;再次獲取鎖時(shí),將鎖的狀態(tài)值設(shè)為2;依次類推...這就是為什么獲取鎖時(shí),傳入的參數(shù)是1的原因了。
可重入就是指鎖可以被單個(gè)線程多次獲取。
2. acquire()
acquire()在AQS中實(shí)現(xiàn)的,它的源碼如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(01) “當(dāng)前線程”首先通過(guò)tryAcquire()嘗試獲取鎖。獲取成功的話,直接返回;嘗試失敗的話,進(jìn)入到等待隊(duì)列排序等待(前面還有可能有需要線程在等待該鎖)。
(02) “當(dāng)前線程”嘗試失敗的情況下,先通過(guò)addWaiter(Node.EXCLUSIVE)來(lái)將“當(dāng)前線程”加入到"CLH隊(duì)列(非阻塞的FIFO隊(duì)列)"末尾。CLH隊(duì)列就是線程等待隊(duì)列。
(03) 再執(zhí)行完addWaiter(Node.EXCLUSIVE)之后,會(huì)調(diào)用acquireQueued()來(lái)獲取鎖。由于此時(shí)ReentrantLock是公平鎖,它會(huì)根據(jù)公平性原則來(lái)獲取鎖。
(04) “當(dāng)前線程”在執(zhí)行acquireQueued()時(shí),會(huì)進(jìn)入到CLH隊(duì)列中休眠等待,直到獲取鎖了才返回!如果“當(dāng)前線程”在休眠等待過(guò)程中被中斷過(guò),acquireQueued會(huì)返回true,此時(shí)"當(dāng)前線程"會(huì)調(diào)用selfInterrupt()來(lái)自己給自己產(chǎn)生一個(gè)中斷。至于為什么要自己給自己產(chǎn)生一個(gè)中斷,后面再介紹。
上面是對(duì)acquire()的概括性說(shuō)明。下面,我們將該函數(shù)分為4部分來(lái)逐步解析。
一. tryAcquire()
1. tryAcquire()
公平鎖的tryAcquire()在ReentrantLock.java的FairSync類中實(shí)現(xiàn),源碼如下:
protected final boolean tryAcquire(int acquires) { // 獲取“當(dāng)前線程” final Thread current = Thread.currentThread(); // 獲取“獨(dú)占鎖”的狀態(tài) int c = getState(); // c=0意味著“鎖沒(méi)有被任何線程鎖擁有”, if (c == 0) { // 若“鎖沒(méi)有被任何線程鎖擁有”, // 則判斷“當(dāng)前線程”是不是CLH隊(duì)列中的第一個(gè)線程線程, // 若是的話,則獲取該鎖,設(shè)置鎖的狀態(tài),并切設(shè)置鎖的擁有者為“當(dāng)前線程”。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果“獨(dú)占鎖”的擁有者已經(jīng)為“當(dāng)前線程”, // 則將更新鎖的狀態(tài)。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
說(shuō)明:根據(jù)代碼,我們可以分析出,tryAcquire()的作用就是嘗試去獲取鎖。注意,這里只是嘗試!
嘗試成功的話,返回true;嘗試失敗的話,返回false,后續(xù)再通過(guò)其它辦法來(lái)獲取該鎖。后面我們會(huì)說(shuō)明,在嘗試失敗的情況下,是如何一步步獲取鎖的。
2. hasQueuedPredecessors()
hasQueuedPredecessors()在AQS中實(shí)現(xiàn),源碼如下:
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
說(shuō)明: 通過(guò)代碼,能分析出,hasQueuedPredecessors() 是通過(guò)判斷"當(dāng)前線程"是不是在CLH隊(duì)列的隊(duì)首,來(lái)返回AQS中是不是有比“當(dāng)前線程”等待更久的線程。下面對(duì)head、tail和Node進(jìn)行說(shuō)明。
3. Node的源碼
Node就是CLH隊(duì)列的節(jié)點(diǎn)。Node在AQS中實(shí)現(xiàn),它的數(shù)據(jù)結(jié)構(gòu)如下:
private transient volatile Node head; // CLH隊(duì)列的隊(duì)首 private transient volatile Node tail; // CLH隊(duì)列的隊(duì)尾 // CLH隊(duì)列的節(jié)點(diǎn) static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 線程已被取消,對(duì)應(yīng)的waitStatus的值 static final int CANCELLED = 1; // “當(dāng)前線程的后繼線程需要被unpark(喚醒)”,對(duì)應(yīng)的waitStatus的值。 // 一般發(fā)生情況是:當(dāng)前線程的后繼線程處于阻塞狀態(tài),而當(dāng)前線程被release或cancel掉,因此需要喚醒當(dāng)前線程的后繼線程。 static final int SIGNAL = -1; // 線程(處在Condition休眠狀態(tài))在等待Condition喚醒,對(duì)應(yīng)的waitStatus的值 static final int CONDITION = -2; // (共享鎖)其它線程獲取到“共享鎖”,對(duì)應(yīng)的waitStatus的值 static final int PROPAGATE = -3; // waitStatus為“CANCELLED, SIGNAL, CONDITION, PROPAGATE”時(shí)分別表示不同狀態(tài), // 若waitStatus=0,則意味著當(dāng)前線程不屬于上面的任何一種狀態(tài)。 volatile int waitStatus; // 前一節(jié)點(diǎn) volatile Node prev; // 后一節(jié)點(diǎn) volatile Node next; // 節(jié)點(diǎn)所對(duì)應(yīng)的線程 volatile Thread thread; // nextWaiter是“區(qū)別當(dāng)前CLH隊(duì)列是 ‘獨(dú)占鎖'隊(duì)列 還是 ‘共享鎖'隊(duì)列 的標(biāo)記” // 若nextWaiter=SHARED,則CLH隊(duì)列是“獨(dú)占鎖”隊(duì)列; // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),則CLH隊(duì)列是“共享鎖”隊(duì)列。 Node nextWaiter; // “共享鎖”則返回true,“獨(dú)占鎖”則返回false。 final boolean isShared() { return nextWaiter == SHARED; } // 返回前一節(jié)點(diǎn) final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } // 構(gòu)造函數(shù)。thread是節(jié)點(diǎn)所對(duì)應(yīng)的線程,mode是用來(lái)表示thread的鎖是“獨(dú)占鎖”還是“共享鎖”。 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 構(gòu)造函數(shù)。thread是節(jié)點(diǎn)所對(duì)應(yīng)的線程,waitStatus是線程的等待狀態(tài)。 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
說(shuō)明:
Node是CLH隊(duì)列的節(jié)點(diǎn),代表“等待鎖的線程隊(duì)列”。
(01) 每個(gè)Node都會(huì)一個(gè)線程對(duì)應(yīng)。
(02) 每個(gè)Node會(huì)通過(guò)prev和next分別指向上一個(gè)節(jié)點(diǎn)和下一個(gè)節(jié)點(diǎn),這分別代表上一個(gè)等待線程和下一個(gè)等待線程。
(03) Node通過(guò)waitStatus保存線程的等待狀態(tài)。
(04) Node通過(guò)nextWaiter來(lái)區(qū)分線程是“獨(dú)占鎖”線程還是“共享鎖”線程。如果是“獨(dú)占鎖”線程,則nextWaiter的值為EXCLUSIVE;如果是“共享鎖”線程,則nextWaiter的值是SHARED。
4. compareAndSetState()
compareAndSetState()在AQS中實(shí)現(xiàn)。它的源碼如下:
protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
說(shuō)明: compareAndSwapInt() 是sun.misc.Unsafe類中的一個(gè)本地方法。對(duì)此,我們需要了解的是 compareAndSetState(expect, update) 是以原子的方式操作當(dāng)前線程;若當(dāng)前線程的狀態(tài)為expect,則設(shè)置它的狀態(tài)為update。
5. setExclusiveOwnerThread()
setExclusiveOwnerThread()在AbstractOwnableSynchronizer.java中實(shí)現(xiàn),它的源碼如下:
// exclusiveOwnerThread是當(dāng)前擁有“獨(dú)占鎖”的線程 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; }
說(shuō)明:setExclusiveOwnerThread()的作用就是,設(shè)置線程t為當(dāng)前擁有“獨(dú)占鎖”的線程。
6. getState(), setState()
getState()和setState()都在AQS中實(shí)現(xiàn),源碼如下:
// 鎖的狀態(tài) private volatile int state; // 設(shè)置鎖的狀態(tài) protected final void setState(int newState) { state = newState; } // 獲取鎖的狀態(tài) protected final int getState() { return state; }
說(shuō)明:state表示鎖的狀態(tài),對(duì)于“獨(dú)占鎖”而已,state=0表示鎖是可獲取狀態(tài)(即,鎖沒(méi)有被任何線程鎖持有)。由于java中的獨(dú)占鎖是可重入的,state的值可以>1。
小結(jié):tryAcquire()的作用就是讓“當(dāng)前線程”嘗試獲取鎖。獲取成功返回true,失敗則返回false。
二. addWaiter(Node.EXCLUSIVE)
addWaiter(Node.EXCLUSIVE)的作用是,創(chuàng)建“當(dāng)前線程”的Node節(jié)點(diǎn),且Node中記錄“當(dāng)前線程”對(duì)應(yīng)的鎖是“獨(dú)占鎖”類型,并且將該節(jié)點(diǎn)添加到CLH隊(duì)列的末尾。
1.addWaiter()
addWaiter()在AQS中實(shí)現(xiàn),源碼如下:
private Node addWaiter(Node mode) { // 新建一個(gè)Node節(jié)點(diǎn),節(jié)點(diǎn)對(duì)應(yīng)的線程是“當(dāng)前線程”,“當(dāng)前線程”的鎖的模型是mode。 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若CLH隊(duì)列不為空,則將“當(dāng)前線程”添加到CLH隊(duì)列末尾 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若CLH隊(duì)列為空,則調(diào)用enq()新建CLH隊(duì)列,然后再將“當(dāng)前線程”添加到CLH隊(duì)列中。 enq(node); return node; }
說(shuō)明:對(duì)于“公平鎖”而言,addWaiter(Node.EXCLUSIVE)會(huì)首先創(chuàng)建一個(gè)Node節(jié)點(diǎn),節(jié)點(diǎn)的類型是“獨(dú)占鎖”(Node.EXCLUSIVE)類型。然后,再將該節(jié)點(diǎn)添加到CLH隊(duì)列的末尾。
2. compareAndSetTail()
compareAndSetTail()在AQS中實(shí)現(xiàn),源碼如下:
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
說(shuō)明:compareAndSetTail也屬于CAS函數(shù),也是通過(guò)“本地方法”實(shí)現(xiàn)的。compareAndSetTail(expect, update)會(huì)以原子的方式進(jìn)行操作,它的作用是判斷CLH隊(duì)列的隊(duì)尾是不是為expect,是的話,就將隊(duì)尾設(shè)為update。
3. enq()
enq()在AQS中實(shí)現(xiàn),源碼如下:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
說(shuō)明: enq()的作用很簡(jiǎn)單。如果CLH隊(duì)列為空,則新建一個(gè)CLH表頭;然后將node添加到CLH末尾。否則,直接將node添加到CLH末尾。
小結(jié):addWaiter()的作用,就是將當(dāng)前線程添加到CLH隊(duì)列中。這就意味著將當(dāng)前線程添加到等待獲取“鎖”的等待線程隊(duì)列中了。
三. acquireQueued()
前面,我們已經(jīng)將當(dāng)前線程添加到CLH隊(duì)列中了。而acquireQueued()的作用就是逐步的去執(zhí)行CLH隊(duì)列的線程,如果當(dāng)前線程獲取到了鎖,則返回;否則,當(dāng)前線程進(jìn)行休眠,直到喚醒并重新獲取鎖了才返回。下面,我們看看acquireQueued()的具體流程。
1. acquireQueued()
acquireQueued()在AQS中實(shí)現(xiàn),源碼如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // interrupted表示在CLH隊(duì)列的調(diào)度中, // “當(dāng)前線程”在休眠時(shí),有沒(méi)有被中斷過(guò)。 boolean interrupted = false; for (;;) { // 獲取上一個(gè)節(jié)點(diǎn)。 // node是“當(dāng)前線程”對(duì)應(yīng)的節(jié)點(diǎn),這里就意味著“獲取上一個(gè)等待鎖的線程”。 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
說(shuō)明:acquireQueued()的目的是從隊(duì)列中獲取鎖。
2. shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()在AQS中實(shí)現(xiàn),源碼如下:
// 返回“當(dāng)前線程是否應(yīng)該阻塞” private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前繼節(jié)點(diǎn)的狀態(tài) int ws = pred.waitStatus; // 如果前繼節(jié)點(diǎn)是SIGNAL狀態(tài),則意味這當(dāng)前線程需要被unpark喚醒。此時(shí),返回true。 if (ws == Node.SIGNAL) return true; // 如果前繼節(jié)點(diǎn)是“取消”狀態(tài),則設(shè)置 “當(dāng)前節(jié)點(diǎn)”的 “當(dāng)前前繼節(jié)點(diǎn)” 為 “‘原前繼節(jié)點(diǎn)'的前繼節(jié)點(diǎn)”。 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前繼節(jié)點(diǎn)為“0”或者“共享鎖”狀態(tài),則設(shè)置前繼節(jié)點(diǎn)為SIGNAL狀態(tài)。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
說(shuō)明:
(01) 關(guān)于waitStatus請(qǐng)參考下表(中擴(kuò)號(hào)內(nèi)為waitStatus的值),更多關(guān)于waitStatus的內(nèi)容,可以參考前面的Node類的介紹。
CANCELLED[1] -- 當(dāng)前線程已被取消
SIGNAL[-1] -- “當(dāng)前線程的后繼線程需要被unpark(喚醒)”。一般發(fā)生情況是:當(dāng)前線程的后繼線程處于阻塞狀態(tài),而當(dāng)前線程被release或cancel掉,因此需要喚醒當(dāng)前線程的后繼線程。
CONDITION[-2] -- 當(dāng)前線程(處在Condition休眠狀態(tài))在等待Condition喚醒
PROPAGATE[-3] -- (共享鎖)其它線程獲取到“共享鎖”
[0] -- 當(dāng)前線程不屬于上面的任何一種狀態(tài)。
(02) shouldParkAfterFailedAcquire()通過(guò)以下規(guī)則,判斷“當(dāng)前線程”是否需要被阻塞。
規(guī)則1:如果前繼節(jié)點(diǎn)狀態(tài)為SIGNAL,表明當(dāng)前節(jié)點(diǎn)需要被unpark(喚醒),此時(shí)則返回true。
規(guī)則2:如果前繼節(jié)點(diǎn)狀態(tài)為CANCELLED(ws>0),說(shuō)明前繼節(jié)點(diǎn)已經(jīng)被取消,則通過(guò)先前回溯找到一個(gè)有效(非CANCELLED狀態(tài))的節(jié)點(diǎn),并返回false。
規(guī)則3:如果前繼節(jié)點(diǎn)狀態(tài)為非SIGNAL、非CANCELLED,則設(shè)置前繼的狀態(tài)為SIGNAL,并返回false。
如果“規(guī)則1”發(fā)生,即“前繼節(jié)點(diǎn)是SIGNAL”狀態(tài),則意味著“當(dāng)前線程”需要被阻塞。接下來(lái)會(huì)調(diào)用parkAndCheckInterrupt()阻塞當(dāng)前線程,直到當(dāng)前先被喚醒才從parkAndCheckInterrupt()中返回。
3. parkAndCheckInterrupt())
parkAndCheckInterrupt()在AQS中實(shí)現(xiàn),源碼如下:
private final boolean parkAndCheckInterrupt() { // 通過(guò)LockSupport的park()阻塞“當(dāng)前線程”。 LockSupport.park(this); // 返回線程的中斷狀態(tài)。 return Thread.interrupted(); }
說(shuō)明:parkAndCheckInterrupt()的作用是阻塞當(dāng)前線程,并且返回“線程被喚醒之后”的中斷狀態(tài)。
它會(huì)先通過(guò)LockSupport.park()阻塞“當(dāng)前線程”,然后通過(guò)Thread.interrupted()返回線程的中斷狀態(tài)。
這里介紹一下線程被阻塞之后如何喚醒。一般有2種情況:
第1種情況:unpark()喚醒。“前繼節(jié)點(diǎn)對(duì)應(yīng)的線程”使用完鎖之后,通過(guò)unpark()方式喚醒當(dāng)前線程。
第2種情況:中斷喚醒。其它線程通過(guò)interrupt()中斷當(dāng)前線程。
補(bǔ)充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用類似,是阻塞/喚醒。
它們的用法不同,park(),unpark()是輕量級(jí)的,而wait(),notify()是必須先通過(guò)Synchronized獲取同步鎖。
關(guān)于LockSupport,我們會(huì)在之后的章節(jié)再專門進(jìn)行介紹!
4. 再次tryAcquire()
了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函數(shù)之后。我們接著分析acquireQueued()的for循環(huán)部分。
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
說(shuō)明:
(01) 通過(guò)node.predecessor()獲取前繼節(jié)點(diǎn)。predecessor()就是返回node的前繼節(jié)點(diǎn),若對(duì)此有疑惑可以查看下面關(guān)于Node類的介紹。
(02) p == head && tryAcquire(arg)
首先,判斷“前繼節(jié)點(diǎn)”是不是CHL表頭。如果是的話,則通過(guò)tryAcquire()嘗試獲取鎖。
其實(shí),這樣做的目的是為了“讓當(dāng)前線程獲取鎖”,但是為什么需要先判斷p==head呢?理解這個(gè)對(duì)理解“公平鎖”的機(jī)制很重要,因?yàn)檫@么做的原因就是為了保證公平性!
(a) 前面,我們?cè)趕houldParkAfterFailedAcquire()我們判斷“當(dāng)前線程”是否需要阻塞;
(b) 接著,“當(dāng)前線程”阻塞的話,會(huì)調(diào)用parkAndCheckInterrupt()來(lái)阻塞線程。當(dāng)線程被解除阻塞的時(shí)候,我們會(huì)返回線程的中斷狀態(tài)。而線程被解決阻塞,可能是由于“線程被中斷”,也可能是由于“其它線程調(diào)用了該線程的unpark()函數(shù)”。
(c) 再回到p==head這里。如果當(dāng)前線程是因?yàn)槠渌€程調(diào)用了unpark()函數(shù)而被喚醒,那么喚醒它的線程,應(yīng)該是它的前繼節(jié)點(diǎn)所對(duì)應(yīng)的線程(關(guān)于這一點(diǎn),后面在“釋放鎖”的過(guò)程中會(huì)看到)。 OK,是前繼節(jié)點(diǎn)調(diào)用unpark()喚醒了當(dāng)前線程!
此時(shí),再來(lái)理解p==head就很簡(jiǎn)單了:當(dāng)前繼節(jié)點(diǎn)是CLH隊(duì)列的頭節(jié)點(diǎn),并且它釋放鎖之后;就輪到當(dāng)前節(jié)點(diǎn)獲取鎖了。然后,當(dāng)前節(jié)點(diǎn)通過(guò)tryAcquire()獲取鎖;獲取成功的話,通過(guò)setHead(node)設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn),并返回。
總之,如果“前繼節(jié)點(diǎn)調(diào)用unpark()喚醒了當(dāng)前線程”并且“前繼節(jié)點(diǎn)是CLH表頭”,此時(shí)就是滿足p==head,也就是符合公平性原則的。否則,如果當(dāng)前線程是因?yàn)椤熬€程被中斷”而喚醒,那么顯然就不是公平了。這就是為什么說(shuō)p==head就是保證公平性!
小結(jié):acquireQueued()的作用就是“當(dāng)前線程”會(huì)根據(jù)公平性原則進(jìn)行阻塞等待,直到獲取鎖為止;并且返回當(dāng)前線程在等待過(guò)程中有沒(méi)有并中斷過(guò)。
四. selfInterrupt()
selfInterrupt()是AQS中實(shí)現(xiàn),源碼如下:
private static void selfInterrupt() { Thread.currentThread().interrupt(); }
說(shuō)明:selfInterrupt()的代碼很簡(jiǎn)單,就是“當(dāng)前線程”自己產(chǎn)生一個(gè)中斷。但是,為什么需要這么做呢?
這必須結(jié)合acquireQueued()進(jìn)行分析。如果在acquireQueued()中,當(dāng)前線程被中斷過(guò),則執(zhí)行selfInterrupt();否則不會(huì)執(zhí)行。
在acquireQueued()中,即使是線程在阻塞狀態(tài)被中斷喚醒而獲取到cpu執(zhí)行權(quán)利;但是,如果該線程的前面還有其它等待鎖的線程,根據(jù)公平性原則,該線程依然無(wú)法獲取到鎖。它會(huì)再次阻塞! 該線程再次阻塞,直到該線程被它的前面等待鎖的線程鎖喚醒;線程才會(huì)獲取鎖,然后“真正執(zhí)行起來(lái)”!
也就是說(shuō),在該線程“成功獲取鎖并真正執(zhí)行起來(lái)”之前,它的中斷會(huì)被忽略并且中斷標(biāo)記會(huì)被清除! 因?yàn)樵趐arkAndCheckInterrupt()中,我們線程的中斷狀態(tài)時(shí)調(diào)用了Thread.interrupted()。該函數(shù)不同于Thread的isInterrupted()函數(shù),isInterrupted()僅僅返回中斷狀態(tài),而interrupted()在返回當(dāng)前中斷狀態(tài)之后,還會(huì)清除中斷狀態(tài)。 正因?yàn)橹暗闹袛酄顟B(tài)被清除了,所以這里需要調(diào)用selfInterrupt()重新產(chǎn)生一個(gè)中斷!
小結(jié):selfInterrupt()的作用就是當(dāng)前線程自己產(chǎn)生一個(gè)中斷。
總結(jié)
再回過(guò)頭看看acquire()函數(shù),它最終的目的是獲取鎖!
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(01) 先是通過(guò)tryAcquire()嘗試獲取鎖。獲取成功的話,直接返回;嘗試失敗的話,再通過(guò)acquireQueued()獲取鎖。
(02) 嘗試失敗的情況下,會(huì)先通過(guò)addWaiter()來(lái)將“當(dāng)前線程”加入到"CLH隊(duì)列"末尾;然后調(diào)用acquireQueued(),在CLH隊(duì)列中排序等待獲取鎖,在此過(guò)程中,線程處于休眠狀態(tài)。直到獲取鎖了才返回。 如果在休眠等待過(guò)程中被中斷過(guò),則調(diào)用selfInterrupt()來(lái)自己產(chǎn)生一個(gè)中斷。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。