溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

JUC并發(fā)編程LinkedBlockingQueue隊(duì)列源碼分析

發(fā)布時(shí)間:2023-05-08 16:26:01 來(lái)源:億速云 閱讀:134 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了JUC并發(fā)編程LinkedBlockingQueue隊(duì)列源碼分析的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇JUC并發(fā)編程LinkedBlockingQueue隊(duì)列源碼分析文章都會(huì)有所收獲,下面我們一起來(lái)看看吧。

LinkedBlockingQueue介紹

在JUC包下關(guān)于線程安全的隊(duì)列實(shí)現(xiàn)有很多,那么此篇文章講解LinkedBlockingQueue的實(shí)現(xiàn)原理,相信各位讀者在線程池中能看到LinkedBlockingQueue或者SynchronousQueue隊(duì)列來(lái)作為儲(chǔ)存任務(wù)和消費(fèi)任務(wù)的通道。一個(gè)并發(fā)安全的隊(duì)列,在多線程中充當(dāng)著安全的傳輸任務(wù)的責(zé)任。

既然是介紹LinkedBlockingQueue,那么從構(gòu)造方法入手最合適不過(guò)。

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化一個(gè)偽節(jié)點(diǎn),讓head和last都指向這個(gè)偽節(jié)點(diǎn)
    // 為什么需要偽節(jié)點(diǎn)的存在?
    // 因?yàn)榭梢员WC不會(huì)發(fā)生極端情況(假設(shè)沒(méi)有偽節(jié)點(diǎn),并且只存在一個(gè)節(jié)點(diǎn)的情況下,生產(chǎn)者和消費(fèi)者并發(fā)執(zhí)行就可能出現(xiàn)極端情況)
    last = head = new Node<E>(null);
}

JUC并發(fā)編程LinkedBlockingQueue隊(duì)列源碼分析

為什么需要存在偽節(jié)點(diǎn),因?yàn)榭梢员WC不會(huì)發(fā)生極端情況(假設(shè)沒(méi)有偽節(jié)點(diǎn),并且只存在一個(gè)節(jié)點(diǎn)的情況下,生產(chǎn)者和消費(fèi)者并發(fā)執(zhí)行就可能出現(xiàn)極端情況,用偽節(jié)點(diǎn)就能很好的解決這個(gè)極端問(wèn)題)

/**
 * 因?yàn)槭顷?duì)列,用鏈表實(shí)現(xiàn),所以頭尾指針肯定不可少。
 * */
transient Node<E> head;
private transient Node<E> last;
/**
 * 我們可以很清楚的看到,這里使用了2套R(shí)eentrantLock和對(duì)應(yīng)的condition條件等待隊(duì)列。
 * 目的也很明顯,讓生產(chǎn)者和消費(fèi)者并行。
 * */
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

2套R(shí)eentrantLock和對(duì)應(yīng)的condition條件等待隊(duì)列,很明顯目的是為了讓生產(chǎn)者和消費(fèi)者并行,所以就需要一個(gè)偽節(jié)點(diǎn)處理極端并發(fā)情況。

為了,一些沒(méi)有接觸過(guò)隊(duì)列的讀者,所以這里還是介紹一下API把

API用途注意事項(xiàng)
offer生產(chǎn)者不會(huì)阻塞,如果插入失敗,或者隊(duì)列已經(jīng)滿了,直接返回
poll消費(fèi)者不會(huì)阻塞,如果消費(fèi)失敗,或者隊(duì)列當(dāng)前為空,直接返回
put生產(chǎn)者會(huì)阻塞,如果插入失敗或者隊(duì)列已經(jīng)滿了,阻塞直到插入成功
take消費(fèi)者會(huì)阻塞,如果消費(fèi)失敗或者當(dāng)前隊(duì)列為空,阻塞直到消費(fèi)成功

put方法-生產(chǎn)者

public void put(E e) throws InterruptedException {
    // 不能插入null
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 創(chuàng)建插入的節(jié)點(diǎn)。
    Node<E> node = new Node<E>(e);
    // 拿到生產(chǎn)者的鎖對(duì)象
    final ReentrantLock putLock = this.putLock;
    // 拿到全局計(jì)數(shù)器,注意這里用的是AtomicInteger,所以自增的原子性已經(jīng)保證。
    final AtomicInteger count = this.count;
    // 上的是可響應(yīng)中斷鎖。
    putLock.lockInterruptibly();
    try {
        // 如果當(dāng)前隊(duì)列已經(jīng)滿了,此時(shí)我們就要去阻塞,等待隊(duì)列被消費(fèi),我們要被喚醒,醒來(lái)生產(chǎn)節(jié)點(diǎn)。
        while (count.get() == capacity) {
            // 進(jìn)入條件等待隊(duì)列阻塞。
            // 注意,只要阻塞,是會(huì)釋放鎖的,其他生產(chǎn)者線程可以搶到鎖。
            notFull.await();
        }
        // 插入到隊(duì)列尾部
        enqueue(node);
        // 因?yàn)椴迦肓斯?jié)點(diǎn),所以全局計(jì)數(shù)需要+1
        // 但是這里請(qǐng)注意細(xì)節(jié),getAndIncrement方法返回的是舊值。
        c = count.getAndIncrement();
        // 這里是一個(gè)很sao的點(diǎn)
        // 注意,這里只要當(dāng)前隊(duì)列沒(méi)滿,喚醒的是生產(chǎn)者的條件等待隊(duì)列。
        // 為什么要這么做?
        // 很簡(jiǎn)單,首先需要考慮,生產(chǎn)者和消費(fèi)者是并發(fā)執(zhí)行了。 
        // 其次,只要隊(duì)列沒(méi)滿就能一直生產(chǎn),那么隊(duì)列一旦滿了后,后來(lái)的線程就都去條件隊(duì)列阻塞,所以線程生產(chǎn)完一個(gè)節(jié)點(diǎn)就有必要去喚醒等待的同胞(不管有沒(méi)有同胞在阻塞,這是義務(wù))
        if (c + 1 < capacity)
            // 喚醒條件等待隊(duì)列中頭部節(jié)點(diǎn)。
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 這里也是一個(gè)很sao的點(diǎn)
    // 再次強(qiáng)調(diào),getAndIncrement方法是返回的舊值
    // 所以當(dāng)前生產(chǎn)者如果生產(chǎn)的是第一個(gè)節(jié)點(diǎn),那么c ==0
    // 而隊(duì)列中沒(méi)有節(jié)點(diǎn),消費(fèi)者是要阻塞的
    // 也即,這里給隊(duì)列生產(chǎn)了一個(gè)節(jié)點(diǎn),要喚醒消費(fèi)者去消費(fèi)節(jié)點(diǎn)。
    if (c == 0)
        signalNotEmpty();
}
// 插入到隊(duì)列尾部
// 因?yàn)镽eentrantLock保證了整體的原子性,所以這里細(xì)節(jié)部分不需要保證原子性了。
private void enqueue(Node<E> node) {
    // 插入到尾部
    last = last.next = node;
}

第一次看到這個(gè)代碼難免會(huì)發(fā)生震撼,為什么在生產(chǎn)者代碼里面喚醒生產(chǎn)者?不是正常寫的生產(chǎn)者消費(fèi)者模型,不都是生產(chǎn)者生產(chǎn)一個(gè)喚醒消費(fèi)者消費(fèi)嗎?怎么這里不一樣??????

因?yàn)檫@里生產(chǎn)者和消費(fèi)者并行處理,當(dāng)隊(duì)列滿了以后,后來(lái)的生產(chǎn)者線程都會(huì)去阻塞,所以生產(chǎn)者線程生產(chǎn)完一個(gè)節(jié)點(diǎn)就有必要去喚醒等待的同胞(不管有沒(méi)有同胞在阻塞,這是義務(wù))

大致流程如下:

  • 創(chuàng)建Node節(jié)點(diǎn)

  • 上生產(chǎn)者鎖

  • 如果隊(duì)列已經(jīng)滿了,就去生產(chǎn)者條件隊(duì)列阻塞

  • 如果沒(méi)滿,或者喚醒后,就插入到last指針的后面

  • 全局節(jié)點(diǎn)計(jì)數(shù)器+1

  • 如果當(dāng)前隊(duì)列還有空間,就喚醒在阻塞的同胞。

  • 釋放鎖

  • 如果在生產(chǎn)之前隊(duì)列為空,本次生產(chǎn)后就需要喚醒在阻塞的消費(fèi)者線程,讓他們醒來(lái)消費(fèi)我剛生產(chǎn)的節(jié)點(diǎn)

take方法-消費(fèi)者

public E take() throws InterruptedException {
    E x;
    int c = -1;
    // 全局計(jì)數(shù)器
    final AtomicInteger count = this.count;
    // 消費(fèi)者的鎖對(duì)象
    final ReentrantLock takeLock = this.takeLock;
    // 可響應(yīng)中斷鎖。
    takeLock.lockInterruptibly();
    try {
        // 如果當(dāng)前隊(duì)列中沒(méi)有節(jié)點(diǎn),此時(shí)消費(fèi)者需要去阻塞,因?yàn)椴蛔枞粫?huì)浪費(fèi)CPU性能,又消費(fèi)不到節(jié)點(diǎn)。
        while (count.get() == 0) {
            // 去消費(fèi)者的條件隊(duì)列阻塞。
            notEmpty.await();
        }
        // 醒來(lái)后,去消費(fèi)節(jié)點(diǎn)。
        x = dequeue();
        // 給全局計(jì)數(shù)器-1,但是這里也要注意,返回的是舊值
        c = count.getAndDecrement();
        // 如果隊(duì)列中還有節(jié)點(diǎn)就喚醒其他消費(fèi)者去消費(fèi)節(jié)點(diǎn)。
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 這里也是一個(gè)sao點(diǎn)
    // 請(qǐng)注意,這里的c是舊值,因?yàn)間etAndDecrement返回的是舊值
    // 所以,如果當(dāng)前消費(fèi)線程消費(fèi)節(jié)點(diǎn)之前隊(duì)列是滿的,當(dāng)消費(fèi)完畢后,我有必要去喚醒因?yàn)殛?duì)列滿了而阻塞等待的生產(chǎn)者,因?yàn)楫?dāng)前已經(jīng)空出一個(gè)空間了。
    if (c == capacity)
        // 喚醒生產(chǎn)者
        signalNotFull();
    return x;
}
// 消費(fèi)者消費(fèi)節(jié)點(diǎn)
// 所以需要HelpGC 
// 不過(guò)這里要注意,head都是指向偽節(jié)點(diǎn)。
private E dequeue() {
    // 拿到頭節(jié)點(diǎn),
    Node<E> h = head;
    // 拿到頭節(jié)點(diǎn)的next節(jié)點(diǎn),next節(jié)點(diǎn)作為下一個(gè)head節(jié)點(diǎn)。
    // 因?yàn)閔ead節(jié)點(diǎn)是指向偽節(jié)點(diǎn),所以head.next節(jié)點(diǎn)就是當(dāng)前要消費(fèi)的節(jié)點(diǎn)。
    Node<E> first = h.next;
    // 將當(dāng)前的頭結(jié)點(diǎn)的next指向自己。
    h.next = h; // help GC
    // 設(shè)置新的頭結(jié)點(diǎn),也即把當(dāng)前消費(fèi)的節(jié)點(diǎn)做為下次的偽節(jié)點(diǎn)
    // head節(jié)點(diǎn)指向的都是偽節(jié)點(diǎn)
    head = first;
    // 拿到當(dāng)前消費(fèi)者想要的數(shù)據(jù)
    E x = first.item;
    first.item = null;
    return x;
}

這里跟put生產(chǎn)者基本思想一致,只不過(guò)這里是消費(fèi)者,因?yàn)槭巧a(chǎn)者消費(fèi)者并行,所以這里也是喚醒同胞,因?yàn)楫?dāng)隊(duì)列為空所有的消費(fèi)者都會(huì)阻塞,所以每次消費(fèi)者線程消費(fèi)完節(jié)點(diǎn)后 ,有義務(wù)喚醒同胞。

大致流程如下:

  • 拿到全局計(jì)數(shù)器

  • 上消費(fèi)者鎖

  • 如果當(dāng)前隊(duì)列為空,當(dāng)前消費(fèi)者線程就要去阻塞

  • 如果不為空,或者被喚醒以后消費(fèi)節(jié)點(diǎn),把消費(fèi)的節(jié)點(diǎn)作為下一次的偽節(jié)點(diǎn),也即作為head節(jié)點(diǎn)

  • 全局計(jì)數(shù)器-1

  • 喚醒同胞

  • 釋放鎖

  • 如果在消費(fèi)之前隊(duì)列已經(jīng)滿了,那么可能會(huì)有生產(chǎn)者線程在阻塞,所以我有義務(wù)去喚醒他們

關(guān)于“JUC并發(fā)編程LinkedBlockingQueue隊(duì)列源碼分析”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“JUC并發(fā)編程LinkedBlockingQueue隊(duì)列源碼分析”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI