溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

ReentrantLock是怎么基于AQS實(shí)現(xiàn)的

發(fā)布時間:2022-03-24 16:32:49 來源:億速云 閱讀:181 作者:iii 欄目:web開發(fā)

這篇文章主要介紹了ReentrantLock是怎么基于AQS實(shí)現(xiàn)的的相關(guān)知識,內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇ReentrantLock是怎么基于AQS實(shí)現(xiàn)的文章都會有所收獲,下面我們一起來看看吧。

  ReentrantLock是一個可重入的互斥鎖,基于AQS實(shí)現(xiàn),它具有與使用 synchronized 方法和語句相同的一些基本行為和語義,但功能更強(qiáng)大。

  lock和unlock

  ReentrantLock 中進(jìn)行同步操作都是從lock方法開始。lock獲取鎖,進(jìn)行一系列的業(yè)務(wù)操作,結(jié)束后使用unlock釋放鎖。

  private final ReentrantLock lock = new ReentrantLock();

  public void sync(){

  lock.lock();

  try {

  // … method body

  } finally {

  lock.unlock()

  }

  }

  lock

  ReentrantLock 中l(wèi)ock的實(shí)現(xiàn)是通過調(diào)用AQS的 AbstractQueuedSynchronizer#acquire 方法實(shí)現(xiàn)。

  public final void acquire(int arg) {

  //嘗試獲取鎖

  if (!tryAcquire(arg) &&

  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

  selfInterrupt();

  }

  根據(jù)之前介紹的模板方法模式,對于鎖的獲取tryAcquire是在ReentrantLock中實(shí)現(xiàn)的。而非公平鎖中的實(shí)際實(shí)現(xiàn)方法為nonfairTryAcquire。

  ReentrantLock#nonfairTryAcquire

  protected final boolean tryAcquire(int acquires) {

  return nonfairTryAcquire(acquires);

  }

  final boolean nonfairTryAcquire(int acquires) {

  final Thread current = Thread.currentThread();

  int c = getState();

  if (c == 0) {

  if (compareAndSetState(0, acquires)) {

  setExclusiveOwnerThread(current);

  return true;

  }

  }

  else if (current == getExclusiveOwnerThread()) {

  int nextc = c + acquires;

  if (nextc < 0) // overflow

  throw new Error("Maximum lock count exceeded");

  setState(nextc);

  return true;

  }

  return false;

  }

  在獲取鎖的邏輯中首先是嘗試以cas方式獲取鎖,如果獲取失敗則表示鎖已經(jīng)被線程持有。

  再判斷持有該鎖的線程是否為當(dāng)前線程,如果是當(dāng)前線程就將state的值加1,在釋放鎖是也需要釋放多次。這就是可重入鎖的實(shí)現(xiàn)。

  如果持有鎖的線程并非當(dāng)前線程則這次加鎖失敗,返回false。加鎖失敗后將調(diào)用 AbstractQueuedSynchronizer#acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 。

  首先會調(diào)用addWaiter方法將該線程入隊(duì)。

  private Node addWaiter(Node mode) {

  Node node = new Node(Thread.currentThread(), mode);

  Node pred = tail;

  if (pred != null) {

  node.prev = pred;

  if (compareAndSetTail(pred, node)) {

  pred.next = node;

  return node;

  }

  }

  enq(node);

  return node;

  }

  mode是指以何種模式的節(jié)點(diǎn)入隊(duì),這里傳入的是Node.EXCLUSIVE(獨(dú)占鎖)。首先將當(dāng)前線程包裝為node節(jié)點(diǎn)。然后判斷等待隊(duì)列的尾節(jié)點(diǎn)是否為空,如果不為空則通過cas的方式將當(dāng)前節(jié)點(diǎn)接在隊(duì)尾。如果tail為空則執(zhí)行enq方法。

  AbstractQueuedSynchronizer#enq

  private Node enq(final Node node) {

  for (;;) {

  Node t = tail;

  if (t == null) { // Must initialize

  if (compareAndSetHead(new Node()))

  tail = head;

  } else {

  node.prev = t;

  if (compareAndSetTail(t, node)) {

  t.next = node;

  return t;

  }

  }

  }

  }

  enq方法通過for(;;)無限循環(huán)的方式將node節(jié)點(diǎn)設(shè)置到等待隊(duì)列的隊(duì)尾(隊(duì)列為空時head和tail都指向當(dāng)前節(jié)點(diǎn))。

  綜上可知addWaiter方法的作用是將競爭鎖失敗的節(jié)點(diǎn)放到等待隊(duì)列的隊(duì)尾。

  等待隊(duì)列中的節(jié)點(diǎn)也并不是什么都不做,這些節(jié)點(diǎn)也會不斷的嘗試獲取鎖,邏輯在acquireQueued中實(shí)現(xiàn)。

  AbstractQueuedSynchronizer#acquireQueued

  final boolean acquireQueued(final Node node, int arg) {

  boolean failed = true;

  try {

  boolean interrupted = false;

  for (;;) {

  final Node p = node.predecessor();

  if (p == head && tryAcquire(arg)) {

  setHead(node);

  p.next = null; // help GC

  failed = false;

  return interrupted;

  }

  if (shouldParkAfterFailedAcquire(p, node) &&

  parkAndCheckInterrupt())

  interrupted = true;

  }

  } finally {

  if (failed)

  cancelAcquire(node);

  }

  }

  可以看到該方法也是使用for(;;)無限循環(huán)的方式來嘗試獲取鎖。首先判斷當(dāng)前節(jié)點(diǎn)是否為頭結(jié)點(diǎn)的下一個節(jié)點(diǎn),如果是則再次調(diào)用tryAcquire嘗試獲取鎖。當(dāng)然這個過程并不是一定不停進(jìn)行的,這樣的話多線程競爭下cpu切換也極耗費(fèi)資源。

  shouldParkAfterFailedAcquire會判斷是否對當(dāng)前節(jié)點(diǎn)進(jìn)行阻塞,阻塞之后只有當(dāng)unpark后節(jié)點(diǎn)才會繼續(xù)假如爭奪鎖的行列。

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

  int ws = pred.waitStatus;

  if (ws == Node.SIGNAL)

  return true;

  if (ws > 0) {

  do {

  node.prev = pred = pred.prev;

  } while (pred.waitStatus > 0);

  pred.next = node;

  } else {

  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

  }

  return false;

  }

  判斷一個節(jié)點(diǎn)是否需要被阻塞是通過該節(jié)點(diǎn)的前繼節(jié)點(diǎn)的狀態(tài)判斷的。

  如果前繼節(jié)點(diǎn)狀態(tài)為 singal ,則表示前繼節(jié)點(diǎn)還在等待,當(dāng)前節(jié)點(diǎn)需要繼續(xù)被阻塞。返回true。

  如果前繼節(jié)點(diǎn)大于0,則表示前繼節(jié)點(diǎn)為取消狀態(tài)。取消狀態(tài)的節(jié)點(diǎn)不參與鎖的競爭,直接跳過。返回false。

  如果前繼節(jié)點(diǎn)時其他狀態(tài)(0,PROPAGATE),不進(jìn)行阻塞,表示當(dāng)前節(jié)點(diǎn)需要重試嘗試獲取鎖。返回false。

  shouldParkAfterFailedAcquire方法如果返回true,表示需要將當(dāng)前節(jié)點(diǎn)阻塞,阻塞方法為parkAndCheckInterrupt。

  AbstractQueuedSynchronizer#parkAndCheckInterrupt

  private final boolean parkAndCheckInterrupt() {

  LockSupport.park(this);

  return Thread.interrupted();

  }

  阻塞是通過LockSupport進(jìn)行阻塞,被阻塞的節(jié)點(diǎn)不參與鎖的競爭(不在進(jìn)行循環(huán)獲取鎖),只能被unpark后才繼續(xù)競爭鎖。

  而被阻塞的節(jié)點(diǎn)要被釋放則依賴于unlock方法。

  unlock

  ReentrantLock 中unlock的實(shí)現(xiàn)是通過調(diào)用AQS的 AbstractQueuedSynchronizer#release 方法實(shí)現(xiàn)。

  public final boolean release(int arg) {

  if (tryRelease(arg)) {

  Node h = head;

  if (h != null && h.waitStatus != 0)

  unparkSuccessor(h);

  return true;

  }

  return false;

  }

  release調(diào)用tryRelease方法,tryRelease是在 ReentrantLock 中實(shí)現(xiàn)。

  ReentrantLock#tryRelease

  protected final boolean tryRelease(int releases) {

  int c = getState() - releases;

  if (Thread.currentThread() != getExclusiveOwnerThread())

  throw new IllegalMonitorStateException();

  boolean free = false;

  if (c == 0) {

  free = true;

  setExclusiveOwnerThread(null);

  }

  setState(c);

  return free;

  }

  tryRelease方法邏輯很簡單,首先減去releases(一般為1)表示釋放一個鎖,如果釋放后state=0表示釋放鎖成功,后續(xù)等待的節(jié)點(diǎn)可以獲取該鎖了。如果state!=0則表示該鎖為重入鎖,需要多次釋放。

  當(dāng)釋放鎖成功后(state=0),會對頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)進(jìn)行unpark。

  AbstractQueuedSynchronizer#unparkSuccessor

  private void unparkSuccessor(Node node) {

  int ws = node.waitStatus;

  if (ws < 0)

  compareAndSetWaitStatus(node, ws, 0);

  Node s = node.next;

  if (s == null || s.waitStatus > 0) {

  s = null;

  for (Node t = tail; t != null && t != node; t = t.prev)

  if (t.waitStatus <= 0)

  s = t;

  }

  if (s != null)

  LockSupport.unpark(s.thread);

  }

  unparkSuccessor見名知意適用于接觸后面節(jié)點(diǎn)的阻塞狀態(tài)。整個方法的邏輯就是找到傳入節(jié)點(diǎn)的后繼節(jié)點(diǎn),將其喚醒(排除掉狀態(tài)為cancel即waitStatus > 0的節(jié)點(diǎn))。

  公平鎖和非公平鎖

  ReentrantLock 的構(gòu)造方法接受一個可選的公平參數(shù)。當(dāng)設(shè)置為 true 時,在多個線程的競爭時,傾向于將鎖分配給等待時間最長的線程。

  public ReentrantLock(boolean fair) {

  sync = fair ? new FairSync() : new NonfairSync();

  }

  在多個鎖競爭統(tǒng)一資源的環(huán)境下,AQS維護(hù)了一個等待隊(duì)列,未能獲取到鎖的線程都會被掛到該隊(duì)列中。如果使用公平鎖則會從隊(duì)列的頭結(jié)點(diǎn)開始獲取該資源。

  而根據(jù)代碼在公平鎖和非公平鎖的實(shí)現(xiàn)的差別僅僅在于公平鎖多了一個檢測的方法。

  公平鎖

  protected final boolean tryAcquire(int acquires) {

  //&hellip;

  if (c == 0) {

  if (!hasQueuedPredecessors() //!hasQueuedPredecessors()便是比非公平鎖多出來的操作

  && compareAndSetState(0, acquires)) {

  setExclusiveOwnerThread(current);

  return true;

  }

  }

  //&hellip;

  return false;

  }

  hasQueuedPredecessors()

  public final boolean hasQueuedPredecessors() {

  Node t = tail; // Read fields in reverse initialization order

  Node h = head;

  Node s;

  return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());

  }

  方法邏輯很簡單,就是如果等待隊(duì)列還有節(jié)點(diǎn)并且排在首位的不是當(dāng)前線程所處的節(jié)點(diǎn)返回true表示還有等待更長時間的節(jié)點(diǎn)。

關(guān)于“ReentrantLock是怎么基于AQS實(shí)現(xiàn)的”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“ReentrantLock是怎么基于AQS實(shí)現(xiàn)的”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI