溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

手寫AQS非公平鎖的示例分析

發(fā)布時間:2021-09-18 10:30:36 來源:億速云 閱讀:125 作者:柒染 欄目:編程語言

手寫AQS非公平鎖的示例分析,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

1. Unsafe工具類

package com.shi.flink.unsafeTest;

import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
 * @author shiye
 * @create 2021-03-30 17:03
 */
public class UnsafeUtil {
    public static Unsafe getInstance() {
        Field field = null;
        try {
            field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return null;
    }
}

2. 手寫AQS抽象類

package com.shi.flink.shilock;

import com.shi.flink.unsafeTest.UnsafeUtil;
import sun.misc.Unsafe;

import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;

/**
 * 自己寫抽象AQS實現(xiàn)
 *
 * @author shiye
 * @create 2021-03-30 14:10
 */
public abstract class ShiAQS extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * 頭指針
     */
    private transient volatile Node head;

    /**
     * 尾指針
     */
    private transient volatile Node tail;

    /**
     * 狀態(tài)值:
     * 0:空閑,
     * 1:正在有人使用
     */
    private volatile int state;

    /**
     * 獲取當前狀態(tài)
     *
     * @return
     */
    protected final int getState() {
        return state;
    }

    /**
     * 設(shè)置當前鎖的狀態(tài)
     *
     * @param state
     */
    public void setState(int state) {
        this.state = state;
    }

    /**
     * 使用unsafe類來初始化一些參數(shù)值
     */
    private static final Unsafe unsafe = UnsafeUtil.getInstance();
    private static long stateOffset;
    private static long headOffset;
    private static long tailOffset;
    private static long waitStatusOffset;
    private static long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        }
    }

    /**
     * 設(shè)置狀態(tài)
     *
     * @param expect
     * @param update
     * @return
     */
    protected boolean compareAndSetState(int expect, int update) {
        //讀取傳入對象o在內(nèi)存中偏移量為offset位置的值與期望值expected作比較
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * 設(shè)置頭指針
     *
     * @param update
     * @return
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * 如果pre節(jié)點得waitStatus值為ws,
     * 則把signal賦值給waitStatus
     *
     * @param pre
     * @param ws
     * @param signal
     * @return
     */
    private static boolean compareAndSetWaitStatus(Node pre, int ws, int signal) {
        return unsafe.compareAndSwapInt(pre, waitStatusOffset, ws, signal);
    }

    /**
     * 設(shè)置尾指針
     *
     * @param expect
     * @param update
     * @return
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * 設(shè)置下一個節(jié)點
     *
     * @param node
     * @param expect
     * @param update
     * @return
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    /**
     * 解鎖方法
     *
     * @param arg
     */
    protected void release(int arg) throws Exception {
        //嘗試去釋放占用鎖得線程
        boolean tryRelease = tryRelease(arg);

        if (tryRelease) {
            Node h = head;
            if (h != null && h.waitStatus != 0) {
                unparkSuccessor(h);
            }
        }
    }

    /**
     * 嘗試去釋放占用鎖得線程
     *
     * @param arg
     * @return
     * @throws Exception
     */
    protected boolean tryRelease(int arg) throws Exception {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            //如果當前線程不是占用鎖得線程就拋出異常
            throw new Exception("解鎖失敗,當前線程不是占用鎖得線程無法解鎖");
        } else {
            setExclusiveOwnerThread(null);
            this.setState(0);
            return true;
        }
    }

    /**
     * 打斷某個線程
     *
     * @return
     */
    protected boolean interruptThread(Thread thread) throws Exception {
        Thread ownerThread = getExclusiveOwnerThread();
        if (ownerThread == thread) {
            //如果是正在運行得線程
            compareAndSetState(1, 0);
            setExclusiveOwnerThread(null);
        } else if (head != null) {
            //再對類中查找當前線程,并且取消排隊
            for (Node next1 = head.next; next1 != null; next1 = next1.next) {
                if (next1.thread == thread) {
                    compareAndSetWaitStatus(next1, next1.waitStatus, 1);
                }
            }
        }
        //解鎖
        thread.interrupt();
        System.out.println(thread.getName() + " 已經(jīng)中斷了 ====> ");
        unparkSuccessor(head);
        System.out.println(thread.getName() + " 已經(jīng)結(jié)束了 ====> ");
        return false;
    }

    /**
     * 自定義一個內(nèi)部類Node節(jié)點
     */
    static final class Node {

        //共享模式標記
        static final Node shared = new Node();

        //獨占鎖標記
        static final Node excusive = null;

        //waitStatus值,指示線程已取消
        static final int cancelled = 1;

        //waitStatus值,用于指示后續(xù)線程需要解除等待狀態(tài)
        static final int signal = -1;

        //waitStatus值,指示線程正在等待條件
        static final int condition = -2;

        //waitStatus值,指示下一個acquireShared應(yīng)該 無條件傳播
        static final int propagate = -3;

        //鎖的等待狀態(tài)
        volatile int waitStatus;

        //前指針
        volatile Node prev;

        //后指針
        volatile Node next;

        //線程
        volatile Thread thread;

        Node nextWaiter;

        //是否是共享鎖
        final boolean isShared() {
            return nextWaiter == shared;
        }

        //無參構(gòu)造
        public Node() {
        }

        public Node(Node nextWaiter, Thread thread) {
            this.nextWaiter = nextWaiter;
            this.thread = thread;
        }

        /**
         * 獲取前節(jié)點
         *
         * @return
         */
        public Node getPrev() {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException("前節(jié)點不能為空");
            }
            return p;
        }
    }


    /**
     * 獲得
     *
     * @param arg
     */
    public void acquire(int arg) {
        //1.嘗試去排隊
        boolean tryAcquire = tryAcquire(arg);
        if (!tryAcquire) {
            //2.如果搶占鎖失敗,就去排隊
            Node node = addWaiter(Node.excusive);

            //3.對已經(jīng)再隊列中的節(jié)點,進行休眠等侯
            acquireQueued(node, arg);
        }
    }

    /**
     * 先嘗試去排隊
     * 1.先獲取鎖得狀態(tài),如果狀態(tài)為0,就嘗試去占用一次鎖
     * 否則返回占用失敗
     *
     * @param arg
     * @return true:表示搶占鎖成功
     * false:表示搶占所失敗
     */
    public final boolean tryAcquire(int arg) {
        Thread current = Thread.currentThread();
        int state = getState();
        if (state == 0) {
            //如果空閑了,就嘗試去占用一次鎖
            if (compareAndSetState(0, arg)) {
                //搶占成功就返回true,并設(shè)置線程
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) {
            //如果當前當前線程多次搶占鎖,就將狀態(tài)+arg
            int nextState = state + arg;
            if (nextState < 0) {
                throw new Error("超過最大鎖計數(shù)");
            }
            setState(nextState);
            return true;
        }
        return false;
    }

    /**
     * 添加等待隊列
     *
     * @param mode
     */
    public Node addWaiter(Node mode) {
        Node node = new Node(mode, Thread.currentThread());
        Node temp = tail;
        if (temp == null) {
            //入隊
            enQueue(node);
            return node;
        } else {
            //如果隊列中不為空,就把當前節(jié)點添加到尾節(jié)點中
            node.prev = temp;
            if (compareAndSetTail(temp, node)) {
                temp.next = node;
                return node;
            }
        }
        return node;
    }

    /**
     * 入隊
     * 把node節(jié)點添加到隊列中,
     * 如果隊列為null就初始化一個隊列并且把node節(jié)點添加到尾節(jié)點中
     *
     * @param node 返回當前節(jié)點
     */
    public Node enQueue(Node node) {
        while (true) {
            Node temp = tail;
            if (temp == null) {
                //創(chuàng)建一個頭指針
                compareAndSetHead(new Node());
                //讓尾指針也指向頭指針(空節(jié)點)
                tail = head;
            } else {
                node.prev = temp;
                if (compareAndSetTail(temp, node)) {
                    temp.next = node;
                    return node;
                }
            }
        }
    }

    /**
     * @param node 當前正在侯隊中得節(jié)點
     * @param arg
     * @return
     */
    protected boolean acquireQueued(Node node, int arg) {
        boolean failed = true;

        try {
            //是否被打斷,默認false
            boolean interrupted = false;
            while (true) {
                final Node p = node.getPrev();
                if (p == head && tryAcquire(arg)) {
                    //如果是他的頭節(jié)點是head,并且嘗試搶占鎖成功就出隊,讓當前線程運行
                    setHead(node);
                    p.next = null;//利于gc回收
                    failed = false;
                    return interrupted;
                }

                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                    //pre節(jié)點得waitStatus 設(shè)置成-1,并且讓當前線程阻塞,打斷當前線程
                    interrupted = true;
                }
            }

        } finally {
            if (failed) cancelAcquire(node);
        }
    }

    protected final void cancelAcquire(Node node) {
        if (node == null) return;

        node.thread = null;
        Node pre = node.prev;

        while (pre.waitStatus > 0) {
            node.prev = pre = pre.prev;
        }
        Node predNext = pre.next;
        node.waitStatus = Node.cancelled;

        if (node == tail && compareAndSetTail(node, pre)) {
            compareAndSetNext(pre, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pre != head &&
                    ((ws = pre.waitStatus) == Node.signal ||
                            (ws <= 0 && compareAndSetWaitStatus(pre, ws, Node.signal))) &&
                    pre.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pre, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }

    }

    /**
     * 解鎖必須成功
     *
     * @param node
     */
    protected final void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }

        /**
         * AQS源碼是這樣實現(xiàn)得
         * 如果當前節(jié)點不為空,并且用戶取消了,就從尾節(jié)點往前遍歷一個,直到找到最前面得一個節(jié)點,解鎖當前線程
         */
        Node next1 = node.next;
//        if (next1 == null || next1.waitStatus > 0) {
//            next1 = null;
//            for (Node t = tail; t != null && t != node; t = t.prev) {
//                if (t.waitStatus <= 0) {
//                    next1 = t;
//                }
//            }
//        }

        /**
         * 我自己實現(xiàn),從前往后找
         */
        if (next1 != null && next1.waitStatus > 0) {
            for (next1 = next1.next; next1 != null; next1 = next1.next) {
                if (next1.waitStatus <= 0) {
                    break;
                }
            }
        }

        if (next1 != null) {
            //喚醒下一個線程
            System.out.println(next1.thread.getName() + " 開始喚醒了 ====> ");
            LockSupport.unpark(next1.thread);
            System.out.println(next1.thread.getName() + " 已經(jīng)喚醒了 ====> ");

        }
    }

    /**
     * 將pre節(jié)點得waitStatus 設(shè)置成-1
     *
     * @param pre
     * @param node
     * @return
     */
    protected static boolean shouldParkAfterFailedAcquire(Node pre, Node node) {
        //獲取node節(jié)點得前一個節(jié)點得狀態(tài)
        int ws = pre.waitStatus;

        //如果是-1 就返回true
        if (ws == Node.signal) {
            return true;
        }

        if (ws > 0) {
            do {
                pre = pre.prev;
                node.prev = pre;
            } while (pre.waitStatus > 0);
            pre.next = node;
        } else {
            //設(shè)置成-1
            boolean flag = compareAndSetWaitStatus(pre, ws, Node.signal);
//            System.out.println("設(shè)置成-1是否成功:" + flag);
        }
        return false;
    }

    /**
     * 阻塞當前線程,并且返回當前線程得打斷狀態(tài)
     *
     * @return true: 打斷線程成功
     */
    protected final boolean parkAndCheckInterrupt() {
        //打斷線程,讓線程阻塞
        LockSupport.park(this);
        return Thread.interrupted();
    }

    public Node getHead() {
        return head;
    }

    public void setHead(Node head) {
        this.head = head;
    }
}

3.非公平所實現(xiàn)

package com.shi.flink.shilock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 自定義非公平鎖
 * @author shiye
 * @create 2021-03-30 11:02
 */
public class ShiNonfairLock extends ShiAQS implements Lock, java.io.Serializable {

    private static final long serialVersionUID = 7373984872572414699L;

    @Override
    public void lock() {
        if(compareAndSetState(0, 1)){
            //如果搶到了鎖,就把當前線程設(shè)置進去
            setExclusiveOwnerThread(Thread.currentThread());
        }else{
//            否則就去排隊
            acquire(1);
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        try {
            super.release(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }


    /**
     *  打斷某個線程(自己瞎寫的,有bug)
     * @param thread
     * @return
     */
    public boolean interruptThread(Thread thread) throws Exception {
        return super.interruptThread(thread);
    }
}

4.測試

package com.shi.flink.shilock;

import java.util.concurrent.TimeUnit;

/**
 * @author shiye
 * @create 2021-03-31 17:09
 */
public class MyLockTest {

    public static void main(String[] args) throws Exception {
        ShiNonfairLock lock = new ShiNonfairLock();

        new Thread(() -> {
            try {
                System.out.println("A 線程進入到...加鎖過程");
                lock.lock();
                System.out.println("A 已經(jīng)搶占到鎖...休眠10s后運行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("A線程運行完成,開始解鎖....");
                lock.unlock();
            }

        }, "A").start();


        TimeUnit.SECONDS.sleep(1);
        Thread B = new Thread(() -> {
            try {
                System.out.println("B 線程進入到...加鎖過程");
                lock.lock();
                System.out.println("B 已經(jīng)搶占到鎖...休眠10s后運行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("B線程運行完成,開始解鎖....");
                lock.unlock();
            }

        }, "B");
        B.start();

        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            try {
                System.out.println("C 線程進入到...加鎖過程");
                lock.lock();
                System.out.println("C 已經(jīng)搶占到鎖...休眠10s后運行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("C線程運行完成,開始解鎖....");
                lock.unlock();
            }

        }, "C").start();

        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            try {
                System.out.println("D 線程進入到...加鎖過程");
                lock.lock();
                System.out.println("D 已經(jīng)搶占到鎖...休眠10s后運行......");
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("D線程運行完成,開始解鎖....");
                lock.unlock();
            }

        }, "D").start();

//        TimeUnit.SECONDS.sleep(1);
//        System.out.println("強制讓 " + B.getName() + " 線程中斷...");
//        lock.interruptThread(B);
    }
}

看完上述內(nèi)容,你們掌握手寫AQS非公平鎖的示例分析的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

aqs
AI