溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么在Java項(xiàng)目中實(shí)現(xiàn)一個(gè)ReentrantLock鎖

發(fā)布時(shí)間:2021-01-18 15:18:54 來源:億速云 閱讀:181 作者:Leah 欄目:開發(fā)技術(shù)

今天就跟大家聊聊有關(guān)怎么在Java項(xiàng)目中實(shí)現(xiàn)一個(gè)ReentrantLock鎖,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

ReentrantLock鎖

ReentrantLock是Java中常用的鎖,屬于樂觀鎖類型,多線程并發(fā)情況下。能保證共享數(shù)據(jù)安全性,線程間有序性
ReentrantLock通過原子操作和阻塞實(shí)現(xiàn)鎖原理,一般使用lock獲取鎖,unlock釋放鎖,
下面說一下鎖的基本使用和底層基本實(shí)現(xiàn)原理,lock和unlock底層

lock的時(shí)候可能被其他線程獲得所,那么此線程會(huì)阻塞自己,關(guān)鍵原理底層用到Unsafe類的API: CAS和park

使用

java.util.concurrent.locks.ReentrantLock

在多線程環(huán)境下使用,創(chuàng)建鎖對(duì)象,調(diào)用lock()獲取鎖開始處理邏輯,處理完unlock()釋放鎖。注意使用的時(shí)候lock和unlock必須成對(duì)出現(xiàn),不然可能出現(xiàn)死鎖或者嚴(yán)重堵塞的情況

unlock

//創(chuàng)建鎖對(duì)象
ReentrantLock lock = new ReentrantLock();
lock.lock(); //獲取鎖(鎖定)
System.out.println("一段需要上鎖的代碼")
lock.unlock(); //鎖釋放

執(zhí)行完代碼后,釋放鎖,讓其他線程去獲取,需要注意的是,多個(gè)線程使用的鎖對(duì)象必須是同一個(gè)。

什么情況需要上鎖,就是在多線程不安全的情況下,多個(gè)線程操作同一個(gè)對(duì)象。
如多個(gè)線程同時(shí)操作一個(gè)隊(duì)列,offer()添加對(duì)象,兩個(gè)線程同時(shí)offer,因?yàn)椴皇窃硬僮鳎芸赡芤粋€(gè)線程添加成功,另一個(gè)線程添加失敗,延伸到一些業(yè)務(wù)中是要杜絕的問題。

可以用鎖解決問題,我們可以定義一個(gè)隊(duì)列同一時(shí)間只能被一個(gè)拿到鎖的線程操作,即保證offer這種非原子操作完成后,釋放鎖,再讓其他線程拿到鎖后,才能offer,保證有序的offer,不會(huì)丟失信息。

示例

為了體現(xiàn)鎖的作用,這里sleep睡眠0.1秒,增加哪個(gè)線程獲取鎖的隨機(jī)性
因?yàn)榫€程喚醒后,會(huì)開始嘗試獲取鎖,多個(gè)線程下競(jìng)爭(zhēng)一把鎖是隨機(jī)的

package javabasis.threads;
import java.util.concurrent.locks.ReentrantLock;

public class LockTest implements Runnable {
  
	public static ReentrantLock lock = new ReentrantLock();//創(chuàng)建鎖對(duì)象
	private int thold;
  
	public LockTest(int h) {
		this.thold = h;
	}
	
	public static void main(String[] args) {
		for (int i = 10; i < 15; i++) {
			new Thread(new LockTest(i),"name-" + i).start();
		}
	}

	@Override
	public void run() {
		try {
			Thread.sleep(100);
			lock.lock(); //獲取鎖
			System.out.println("lock threadName:" + Thread.currentThread().getName());
			{
				System.out.print(" writeStart ");
				for (int i = 0; i < 15; i++) {
						Thread.sleep(100);
					System.out.print(thold+",");
				}
				System.out.println(" writeEnd");
			}
			System.out.println("unlock threadName:" + Thread.currentThread().getName() + "\r\n");
			lock.unlock(); //鎖釋放 
		} catch (InterruptedException e) {	
		}		
	}	
}

運(yùn)行main方法輸出結(jié)果:

lock threadName:name-10
 writeStart 10,10,10,10,10,10,10,10,10,10,10,10,10,10,10, writeEnd
unlock threadName:name-10

lock threadName:name-14
 writeStart 14,14,14,14,14,14,14,14,14,14,14,14,14,14,14, writeEnd
unlock threadName:name-14

lock threadName:name-13
 writeStart 13,13,13,13,13,13,13,13,13,13,13,13,13,13,13, writeEnd
unlock threadName:name-13

lock threadName:name-11
 writeStart 11,11,11,11,11,11,11,11,11,11,11,11,11,11,11, writeEnd
unlock threadName:name-11

lock threadName:name-12
 writeStart 12,12,12,12,12,12,12,12,12,12,12,12,12,12,12, writeEnd
unlock threadName:name-12

這體現(xiàn)在多線程情況下,鎖能做到讓線程之間有序運(yùn)行,

如果沒有鎖,情況可能是 12,13,13,10,10,10,12,沒有鎖其他線程可能插隊(duì)執(zhí)行System.out.print

將上鎖的代碼注釋后輸出結(jié)果:

lock threadName:name-11
lock threadName:name-12
 writeStart lock threadName:name-10
 writeStart lock threadName:name-13
 writeStart lock threadName:name-14
 writeStart writeStart 14,12,10,11,13,11,12,14,10,13,10,13,14,12,11,10,14,12,11,13,14,11,13,12,10,13,10,12,14,11,11,13,10,12,14,14,10,12,11,13,11,14,13,12,10,14,10,11,13,12,14,12,11,13,10,14,10,11,12,13,12,14,11,13,10,11,10,14,13,12,11, writeEnd
unlock threadName:name-11

13,12, writeEnd
unlock threadName:name-12

 writeEnd
unlock threadName:name-13

14, writeEnd
unlock threadName:name-14

10, writeEnd
unlock threadName:name-10

原理

ReentrantLock主要用到unsafe的CAS和park兩個(gè)功能實(shí)現(xiàn)鎖(CAS + park )

多個(gè)線程同時(shí)操作一個(gè)數(shù)N,使用原子(CAS)操作,原子操作能保證同一時(shí)間只能被一個(gè)線程修改,而修改數(shù)N成功后,返回true,其他線程修改失敗,返回false,
這個(gè)原子操作可以定義線程是否拿到鎖,返回true代表獲取鎖,返回false代表為沒有拿到鎖。

拿到鎖的線程,自然是繼續(xù)執(zhí)行后續(xù)邏輯代碼,而沒有拿到鎖的線程,則調(diào)用park,將線程(自己)阻塞。

線程阻塞需要其他線程喚醒,ReentrantLock中用到了鏈表用于存放等待或者阻塞的線程,每次線程阻塞,先將自己的線程信息放入鏈表尾部,再阻塞自己;之后需要拿到鎖的線程,在調(diào)用unlock 釋放鎖時(shí),從鏈表中獲取阻塞線程,調(diào)用unpark 喚醒指定線程

Unsafe

sun.misc.Unsafe是關(guān)鍵類,提供大量偏底層的API 包括CAS park
sun.misc.Unsafe 此類在openjdk中可以查看

CAS 原子操作

compare and swapz(CAS)比較并交換,是原子性操作,
原理:當(dāng)修改一個(gè)(內(nèi)存中的)變量o的值N的時(shí)候,首先有個(gè)期望值expected,和一個(gè)更新值x,先比較N是否等于expected,等于,那么更新內(nèi)存中的值為x值,否則不更新。

public final native boolean compareAndSwapInt(Object o, long offset,
                       int expected,
                       int x);

這里offset據(jù)了解,是對(duì)象的成員變量在內(nèi)存中的偏移地址,
即底層一個(gè)對(duì)象object存放在內(nèi)存中,讀取的地址是0x2110,此對(duì)象的一個(gè)成員變量state的值也在內(nèi)存中,但內(nèi)存地址肯定不是0x2110

java中的CAS使用

java.util.concurrent.locks.AbstractQueuedSynchronizer

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
static {
    try {
      stateOffset = unsafe.objectFieldOffset
        (AbstractQueuedSynchronizer.class.getDeclaredField("state")); //獲取成員變量state在內(nèi)存中的偏移量

    } catch (Exception ex) { throw new Error(ex); }
  }
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  }

在Java中,compareAndSetState這個(gè)操作如果更新成功,返回true,失敗返回false,通過這個(gè)機(jī)制,可以定義鎖(樂觀鎖)。
如三個(gè)線程A,B,C,在目標(biāo)值為0的情況下,同時(shí)執(zhí)行compareAndSetState(0,1) 去修改它
期望值是0,更新值是1,因?yàn)槭窃硬僮鳎诘谝粋€(gè)線程操作成功之后目標(biāo)值變?yōu)?,返回true
所以另外兩個(gè)線程就因?yàn)槠谕禐?不等于1,返回false。
我們可以理解為,返回true的線程拿到了鎖。

最終調(diào)用的Java類是sun.misc.Unsafe

park 阻塞

Java中可以通過unsafe.park()去阻塞(停止)一個(gè)線程,也可以通過unsafe.unpark()讓一個(gè)阻塞線程恢復(fù)繼續(xù)執(zhí)行

unsafe.park()

阻塞(停止)當(dāng)前線程

public native void park(boolean isAbsolute, long time);

根據(jù)debug測(cè)試,此方法能停止線程自己,最后通過其他線程喚醒

unsafe.unpark()

取消阻塞(喚醒)線程

public native void unpark(Object thread);

根據(jù)debug測(cè)試,此方法可以喚醒其他被park調(diào)用阻塞的線程

park與interrupt的區(qū)別

interrupt是Thread類的的API,park是Unsafe類的API,兩者是有區(qū)別的。
測(cè)試了解,Thread.currentThread().interrupt(),線程會(huì)繼續(xù)運(yùn)行,而Unsafe.park(Thread.currentThread())就是直接阻塞線程,不繼續(xù)運(yùn)行代碼。

獲取鎖

線程cas操作失敗,可以park阻塞自己,讓其他擁有鎖的線程在unlock的時(shí)候釋放自己,達(dá)到鎖的效果

java.util.concurrent.locks.ReentrantLock的lock方法是

public void lock() {
    sync.lock();
  }

而sync的實(shí)現(xiàn)類其中一個(gè)是java.util.concurrent.locks.ReentrantLock.NonfairSync 不公平鎖,它的邏輯比較直接

/**
NonfairSync
*/
final void lock() {
  if (compareAndSetState(0, 1))//cas操作,如果true 則表示操作成功,獲取鎖
    setExclusiveOwnerThread(Thread.currentThread()); //設(shè)置獲取鎖擁有者為當(dāng)前線程
  else
    acquire(1);//獲取鎖失敗,鎖住線程(自己)
}

獲取失敗后阻塞線程

如果獲取鎖失敗,會(huì)再嘗試一次,失敗后,將線程(自己)阻塞

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
  }
protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
    }
final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) { 
			//如果期望值為0,內(nèi)存值也為0,再次嘗試獲取鎖(此時(shí)其他線程也可能嘗試獲取鎖)
        if (compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current); //第二次獲取成功,放回true
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false; //沒有獲取到鎖,返回false,則 !tryAcquire(arg) 為true,執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    }

獲取鎖失敗,線程會(huì)進(jìn)入循環(huán),acquireQueued 方法中for是個(gè)無限循環(huán),除非獲取鎖成功后,才會(huì)return。

//獲取鎖失敗后,準(zhǔn)備阻塞線程(自己)
//阻塞之前,添加節(jié)點(diǎn)存放到鏈表,其他線程可以通過這個(gè)鏈表喚醒此線程
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); 
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {//cas操作
        pred.next = node;
        return node;
      }
    }
    enq(node);
    return node;
  }

// 在此方法直到獲取鎖成功才會(huì)跳出循環(huán)
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return interrupted; //獲取鎖成功之后才會(huì)return跳出此方法
        }
        if (shouldParkAfterFailedAcquire(p, node) && //如果滿足阻塞條件
          parkAndCheckInterrupt()) 
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  }

  private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//停止線程(自己)
    return Thread.interrupted();
  }

釋放鎖

一個(gè)線程拿到鎖之后,執(zhí)行完關(guān)鍵代碼,必須unlock釋放鎖的,否則其他線程永遠(yuǎn)拿不到鎖

public void unlock() {
    sync.release(1);
  }

public final boolean release(int arg) {
    if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
      return true;
    }
    return false;
  }
//java.util.concurrent.locks.ReentrantLock.Sync 的tryRelease
 protected final boolean tryRelease(int releases) {
      int c = getState() - releases; //這里一般是 1 - 1 = 0
      if (Thread.currentThread() != getExclusiveOwnerThread()) //只能是鎖的擁有者釋放鎖
        throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
      }
      setState(c); //設(shè)置state為0,相當(dāng)于釋放鎖,讓其他線程compareAndSetState(0, 1)可能成功
			
      return free;
    }

protected final void setState(int newState) {
    state = newState; //沒有cas操作
  }

setState不做cas操作是因?yàn)?,只有擁有鎖的線程才調(diào)用unlock,不存才并發(fā)混亂問題

其他線程沒拿到鎖不會(huì)設(shè)值成功,其他線程在此線程設(shè)置state為0之前,compareAndSetState(0, 1)都會(huì)失敗,拿不到鎖,此線程設(shè)置state為0之后,其他線程compareAndSetState(0, 1)才有可能成功,返回true從而拿到鎖

釋放線程

線程在獲取鎖失敗后,有可能阻塞線程(自己),在阻塞之前把阻塞線程信息放入鏈表的
釋放鎖之后,線程會(huì)嘗試通過鏈表釋放其他線程(一個(gè)),讓一個(gè)阻塞線程恢復(fù)運(yùn)行

阻塞線程被取消阻塞后如何拿到鎖(ReentrantLock中)

有時(shí)候線程被中斷后,喚醒繼續(xù)執(zhí)行后面的代碼,
線程沒有拿到鎖之后主動(dòng)阻塞自己的,但所還沒拿到,被喚醒之后怎么去嘗試重新獲取鎖呢? 里面有一個(gè)for循環(huán)

final void lock() {
      if (compareAndSetState(0, 1)) 
        setExclusiveOwnerThread(Thread.currentThread());//拿到鎖
      else
        acquire(1); //沒有拿到鎖
    }
// 上鎖失敗,會(huì)添加一個(gè)節(jié)點(diǎn),節(jié)點(diǎn)包含線程信息,將此節(jié)點(diǎn)放入隊(duì)列
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
  }

// 存好節(jié)點(diǎn)后,將線程(自己)中斷,等其他線程喚醒(自己)
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {//循環(huán) 被喚醒后線程還是在此處循環(huán)
        
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {//嘗試獲取鎖
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return interrupted; //如果拿到鎖了,才會(huì)return
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt()) //沒拿到鎖時(shí),主動(dòng)中斷Thread.currentThread()
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  }

被喚醒后繼續(xù)執(zhí)行compareAndSetState(0, 1)返回false沒拿到鎖,則繼續(xù)循環(huán)或阻塞

compareAndSetState(0, 1) 這個(gè)操作是獲取鎖的關(guān)鍵

看完上述內(nèi)容,你們對(duì)怎么在Java項(xiàng)目中實(shí)現(xiàn)一個(gè)ReentrantLock鎖有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI