溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

ArrayBlockingQueue 1.8 源碼淺析

發(fā)布時間:2020-08-06 14:22:11 來源:網(wǎng)絡(luò) 閱讀:620 作者:wx5c78c8b1dbb1b 欄目:編程語言

[TOC]

ArrayBlockingQueue 1.8 源碼淺析

一,簡介

ArrayBlockingQueue 是一個用數(shù)組實(shí)現(xiàn)的有界隊(duì)列;此隊(duì)列按照先進(jìn)先出(FIFO)的規(guī)則對元素進(jìn)行排序;默認(rèn)情況下不保證線程公平的訪問隊(duì)列,所謂公平訪問隊(duì)列是指阻塞的線程,可以按照阻塞的先后順序的訪問隊(duì)列,即先阻塞的線程先訪問隊(duì)列;非公平性是對先等待的線程是非公平的,當(dāng)隊(duì)列可用時,阻塞的線程都可以爭奪訪問隊(duì)列的資格,有可能先阻塞的線程最后才訪問;為了保證公平性,通常會降低吞吐量。

二,類UML圖

ArrayBlockingQueue 1.8 源碼淺析

三,基本成員
    /** The queued items */
    // 記錄數(shù)據(jù)的數(shù)組
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    // 索引用于 take,poll,peek,remove 等方法
    int takeIndex;

    /** items index for next put, offer, or add */
    // 索引用于 put,offer,or add 等方法
    int putIndex;

    /** Number of elements in the queue */
    // 總數(shù)
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    // 隊(duì)列的鎖
    final ReentrantLock lock;

    /** Condition for waiting takes */
    // 用于讓線程等待,消費(fèi)時隊(duì)列為空
    private final Condition notEmpty;

    /** Condition for waiting puts */
    // 用于讓線程等待,生產(chǎn)時隊(duì)列滿
    private final Condition notFull;
四,常用方法
構(gòu)造方法

我們看下兩個構(gòu)造,其實(shí)也就是一個,注意沒有無參構(gòu)造,初始化時必須要給出容量。

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // 初始化一個ArrayBlockingQueue
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // 初始化一個數(shù)組
        this.items = new Object[capacity];
        // 初始化一個鎖
        lock = new ReentrantLock(fair);
        // 用來存放消費(fèi)者的阻塞線程
        notEmpty = lock.newCondition();
        // 用來存放生產(chǎn)者的線程
        notFull =  lock.newCondition();
    }
add 方法

可以看出add調(diào)用的是offer方法,詳情請看offer方法。

    public boolean add(E e) {
        // 調(diào)用父類的方法
        return super.add(e);
    }

    // 父類 AbstractQueue 的add方法
     public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

注意:add 插入失敗會拋異常。

offer 方法
    // offer加入元素
    public boolean offer(E e) {
        // 不能為null
        checkNotNull(e);
        // 獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果數(shù)組滿了,返回false
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    // enqueue

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        // 獲取數(shù)組
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 喚醒消費(fèi)阻塞的隊(duì)列
        notEmpty.signal();
    }

注意:offer還有一個重載方法,帶有超時時間的插入,支持中斷offer(E e, long timeout, TimeUnit unit)。

put 方法
public void put(E e) throws InterruptedException {
        // 不能為null
        checkNotNull(e);
        // 獲取鎖
        final ReentrantLock lock = this.lock;
        // 支持中斷
        lock.lockInterruptibly();
        try {
            // 等于數(shù)組的容量
            while (count == items.length)
                // 等待
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

注意:put和前面的offer要區(qū)別,offer方法隊(duì)列滿是返回false,put方法是讓線程等待,根據(jù)自己的場景用合適的方法。

poll 方法
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

注意:poll也有一個重載方法,帶有超時和中斷poll(long timeout, TimeUnit unit)。

take 方法
    // 消費(fèi)
    public E take() throws InterruptedException {
        // 獲取鎖
        final ReentrantLock lock = this.lock;
        // 支持中斷
        lock.lockInterruptibly();
        try {
            // 隊(duì)列為空
            while (count == 0)
                // 阻塞
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

注意:take和poll也是一對方法,poll隊(duì)列為空返回null,take是讓線程等待,直到喚醒。

peek 方法
// 獲取隊(duì)尾的元素 不刪除
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
size 方法
    // 統(tǒng)計(jì)個數(shù) size是準(zhǔn)確值
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
五,總結(jié)

ArrayBlockingQueue 是有界的,所以我們在初始化是容量要設(shè)計(jì)好,因?yàn)樗遣豢梢詳U(kuò)容的,還有我覺得這個隊(duì)列適合一些穩(wěn)定并發(fā)量的系統(tǒng),如果并發(fā)量突然變大,導(dǎo)致隊(duì)列滿,會造成大量的線程等待,影響系統(tǒng)的響應(yīng);我們通過閱讀源碼也發(fā)現(xiàn)隊(duì)列的源碼是很輕量的,使用起來也很簡單,讓人很好理解;使用這個隊(duì)列一定要注意put,offer,take,poll這兩組方法,根據(jù)自己的業(yè)務(wù)場景選擇是直接返回(響應(yīng)速度快)還是阻塞線程。

參考:《Java 并發(fā)編程的藝術(shù)》

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI