您好,登錄后才能下訂單哦!
手寫AQS非公平鎖的示例分析,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
1. Unsafe工具類
package com.shi.flink.unsafeTest; import sun.misc.Unsafe; import java.lang.reflect.Field; /** * @author shiye * @create 2021-03-30 17:03 */ public class UnsafeUtil { public static Unsafe getInstance() { Field field = null; try { field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return null; } }
2. 手寫AQS抽象類
package com.shi.flink.shilock; import com.shi.flink.unsafeTest.UnsafeUtil; import sun.misc.Unsafe; import java.util.concurrent.locks.AbstractOwnableSynchronizer; import java.util.concurrent.locks.LockSupport; /** * 自己寫抽象AQS實現(xiàn) * * @author shiye * @create 2021-03-30 14:10 */ public abstract class ShiAQS extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; /** * 頭指針 */ private transient volatile Node head; /** * 尾指針 */ private transient volatile Node tail; /** * 狀態(tài)值: * 0:空閑, * 1:正在有人使用 */ private volatile int state; /** * 獲取當前狀態(tài) * * @return */ protected final int getState() { return state; } /** * 設(shè)置當前鎖的狀態(tài) * * @param state */ public void setState(int state) { this.state = state; } /** * 使用unsafe類來初始化一些參數(shù)值 */ private static final Unsafe unsafe = UnsafeUtil.getInstance(); private static long stateOffset; private static long headOffset; private static long tailOffset; private static long waitStatusOffset; private static long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (NoSuchFieldException e) { e.printStackTrace(); } } /** * 設(shè)置狀態(tài) * * @param expect * @param update * @return */ protected boolean compareAndSetState(int expect, int update) { //讀取傳入對象o在內(nèi)存中偏移量為offset位置的值與期望值expected作比較 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /** * 設(shè)置頭指針 * * @param update * @return */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * 如果pre節(jié)點得waitStatus值為ws, * 則把signal賦值給waitStatus * * @param pre * @param ws * @param signal * @return */ private static boolean compareAndSetWaitStatus(Node pre, int ws, int signal) { return unsafe.compareAndSwapInt(pre, waitStatusOffset, ws, signal); } /** * 設(shè)置尾指針 * * @param expect * @param update * @return */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * 設(shè)置下一個節(jié)點 * * @param node * @param expect * @param update * @return */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } /** * 解鎖方法 * * @param arg */ protected void release(int arg) throws Exception { //嘗試去釋放占用鎖得線程 boolean tryRelease = tryRelease(arg); if (tryRelease) { Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } } } /** * 嘗試去釋放占用鎖得線程 * * @param arg * @return * @throws Exception */ protected boolean tryRelease(int arg) throws Exception { if (Thread.currentThread() != getExclusiveOwnerThread()) { //如果當前線程不是占用鎖得線程就拋出異常 throw new Exception("解鎖失敗,當前線程不是占用鎖得線程無法解鎖"); } else { setExclusiveOwnerThread(null); this.setState(0); return true; } } /** * 打斷某個線程 * * @return */ protected boolean interruptThread(Thread thread) throws Exception { Thread ownerThread = getExclusiveOwnerThread(); if (ownerThread == thread) { //如果是正在運行得線程 compareAndSetState(1, 0); setExclusiveOwnerThread(null); } else if (head != null) { //再對類中查找當前線程,并且取消排隊 for (Node next1 = head.next; next1 != null; next1 = next1.next) { if (next1.thread == thread) { compareAndSetWaitStatus(next1, next1.waitStatus, 1); } } } //解鎖 thread.interrupt(); System.out.println(thread.getName() + " 已經(jīng)中斷了 ====> "); unparkSuccessor(head); System.out.println(thread.getName() + " 已經(jīng)結(jié)束了 ====> "); return false; } /** * 自定義一個內(nèi)部類Node節(jié)點 */ static final class Node { //共享模式標記 static final Node shared = new Node(); //獨占鎖標記 static final Node excusive = null; //waitStatus值,指示線程已取消 static final int cancelled = 1; //waitStatus值,用于指示后續(xù)線程需要解除等待狀態(tài) static final int signal = -1; //waitStatus值,指示線程正在等待條件 static final int condition = -2; //waitStatus值,指示下一個acquireShared應(yīng)該 無條件傳播 static final int propagate = -3; //鎖的等待狀態(tài) volatile int waitStatus; //前指針 volatile Node prev; //后指針 volatile Node next; //線程 volatile Thread thread; Node nextWaiter; //是否是共享鎖 final boolean isShared() { return nextWaiter == shared; } //無參構(gòu)造 public Node() { } public Node(Node nextWaiter, Thread thread) { this.nextWaiter = nextWaiter; this.thread = thread; } /** * 獲取前節(jié)點 * * @return */ public Node getPrev() { Node p = prev; if (p == null) { throw new NullPointerException("前節(jié)點不能為空"); } return p; } } /** * 獲得 * * @param arg */ public void acquire(int arg) { //1.嘗試去排隊 boolean tryAcquire = tryAcquire(arg); if (!tryAcquire) { //2.如果搶占鎖失敗,就去排隊 Node node = addWaiter(Node.excusive); //3.對已經(jīng)再隊列中的節(jié)點,進行休眠等侯 acquireQueued(node, arg); } } /** * 先嘗試去排隊 * 1.先獲取鎖得狀態(tài),如果狀態(tài)為0,就嘗試去占用一次鎖 * 否則返回占用失敗 * * @param arg * @return true:表示搶占鎖成功 * false:表示搶占所失敗 */ public final boolean tryAcquire(int arg) { Thread current = Thread.currentThread(); int state = getState(); if (state == 0) { //如果空閑了,就嘗試去占用一次鎖 if (compareAndSetState(0, arg)) { //搶占成功就返回true,并設(shè)置線程 setExclusiveOwnerThread(current); return true; } } else if (getExclusiveOwnerThread() == current) { //如果當前當前線程多次搶占鎖,就將狀態(tài)+arg int nextState = state + arg; if (nextState < 0) { throw new Error("超過最大鎖計數(shù)"); } setState(nextState); return true; } return false; } /** * 添加等待隊列 * * @param mode */ public Node addWaiter(Node mode) { Node node = new Node(mode, Thread.currentThread()); Node temp = tail; if (temp == null) { //入隊 enQueue(node); return node; } else { //如果隊列中不為空,就把當前節(jié)點添加到尾節(jié)點中 node.prev = temp; if (compareAndSetTail(temp, node)) { temp.next = node; return node; } } return node; } /** * 入隊 * 把node節(jié)點添加到隊列中, * 如果隊列為null就初始化一個隊列并且把node節(jié)點添加到尾節(jié)點中 * * @param node 返回當前節(jié)點 */ public Node enQueue(Node node) { while (true) { Node temp = tail; if (temp == null) { //創(chuàng)建一個頭指針 compareAndSetHead(new Node()); //讓尾指針也指向頭指針(空節(jié)點) tail = head; } else { node.prev = temp; if (compareAndSetTail(temp, node)) { temp.next = node; return node; } } } } /** * @param node 當前正在侯隊中得節(jié)點 * @param arg * @return */ protected boolean acquireQueued(Node node, int arg) { boolean failed = true; try { //是否被打斷,默認false boolean interrupted = false; while (true) { final Node p = node.getPrev(); if (p == head && tryAcquire(arg)) { //如果是他的頭節(jié)點是head,并且嘗試搶占鎖成功就出隊,讓當前線程運行 setHead(node); p.next = null;//利于gc回收 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { //pre節(jié)點得waitStatus 設(shè)置成-1,并且讓當前線程阻塞,打斷當前線程 interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } protected final void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pre = node.prev; while (pre.waitStatus > 0) { node.prev = pre = pre.prev; } Node predNext = pre.next; node.waitStatus = Node.cancelled; if (node == tail && compareAndSetTail(node, pre)) { compareAndSetNext(pre, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pre != head && ((ws = pre.waitStatus) == Node.signal || (ws <= 0 && compareAndSetWaitStatus(pre, ws, Node.signal))) && pre.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pre, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * 解鎖必須成功 * * @param node */ protected final void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); } /** * AQS源碼是這樣實現(xiàn)得 * 如果當前節(jié)點不為空,并且用戶取消了,就從尾節(jié)點往前遍歷一個,直到找到最前面得一個節(jié)點,解鎖當前線程 */ Node next1 = node.next; // if (next1 == null || next1.waitStatus > 0) { // next1 = null; // for (Node t = tail; t != null && t != node; t = t.prev) { // if (t.waitStatus <= 0) { // next1 = t; // } // } // } /** * 我自己實現(xiàn),從前往后找 */ if (next1 != null && next1.waitStatus > 0) { for (next1 = next1.next; next1 != null; next1 = next1.next) { if (next1.waitStatus <= 0) { break; } } } if (next1 != null) { //喚醒下一個線程 System.out.println(next1.thread.getName() + " 開始喚醒了 ====> "); LockSupport.unpark(next1.thread); System.out.println(next1.thread.getName() + " 已經(jīng)喚醒了 ====> "); } } /** * 將pre節(jié)點得waitStatus 設(shè)置成-1 * * @param pre * @param node * @return */ protected static boolean shouldParkAfterFailedAcquire(Node pre, Node node) { //獲取node節(jié)點得前一個節(jié)點得狀態(tài) int ws = pre.waitStatus; //如果是-1 就返回true if (ws == Node.signal) { return true; } if (ws > 0) { do { pre = pre.prev; node.prev = pre; } while (pre.waitStatus > 0); pre.next = node; } else { //設(shè)置成-1 boolean flag = compareAndSetWaitStatus(pre, ws, Node.signal); // System.out.println("設(shè)置成-1是否成功:" + flag); } return false; } /** * 阻塞當前線程,并且返回當前線程得打斷狀態(tài) * * @return true: 打斷線程成功 */ protected final boolean parkAndCheckInterrupt() { //打斷線程,讓線程阻塞 LockSupport.park(this); return Thread.interrupted(); } public Node getHead() { return head; } public void setHead(Node head) { this.head = head; } }
3.非公平所實現(xiàn)
package com.shi.flink.shilock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 自定義非公平鎖 * @author shiye * @create 2021-03-30 11:02 */ public class ShiNonfairLock extends ShiAQS implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; @Override public void lock() { if(compareAndSetState(0, 1)){ //如果搶到了鎖,就把當前線程設(shè)置進去 setExclusiveOwnerThread(Thread.currentThread()); }else{ // 否則就去排隊 acquire(1); } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { try { super.release(1); } catch (Exception e) { e.printStackTrace(); } } @Override public Condition newCondition() { return null; } /** * 打斷某個線程(自己瞎寫的,有bug) * @param thread * @return */ public boolean interruptThread(Thread thread) throws Exception { return super.interruptThread(thread); } }
4.測試
package com.shi.flink.shilock; import java.util.concurrent.TimeUnit; /** * @author shiye * @create 2021-03-31 17:09 */ public class MyLockTest { public static void main(String[] args) throws Exception { ShiNonfairLock lock = new ShiNonfairLock(); new Thread(() -> { try { System.out.println("A 線程進入到...加鎖過程"); lock.lock(); System.out.println("A 已經(jīng)搶占到鎖...休眠10s后運行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("A線程運行完成,開始解鎖...."); lock.unlock(); } }, "A").start(); TimeUnit.SECONDS.sleep(1); Thread B = new Thread(() -> { try { System.out.println("B 線程進入到...加鎖過程"); lock.lock(); System.out.println("B 已經(jīng)搶占到鎖...休眠10s后運行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("B線程運行完成,開始解鎖...."); lock.unlock(); } }, "B"); B.start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { try { System.out.println("C 線程進入到...加鎖過程"); lock.lock(); System.out.println("C 已經(jīng)搶占到鎖...休眠10s后運行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("C線程運行完成,開始解鎖...."); lock.unlock(); } }, "C").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { try { System.out.println("D 線程進入到...加鎖過程"); lock.lock(); System.out.println("D 已經(jīng)搶占到鎖...休眠10s后運行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("D線程運行完成,開始解鎖...."); lock.unlock(); } }, "D").start(); // TimeUnit.SECONDS.sleep(1); // System.out.println("強制讓 " + B.getName() + " 線程中斷..."); // lock.interruptThread(B); } }
看完上述內(nèi)容,你們掌握手寫AQS非公平鎖的示例分析的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。