溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java阻塞隊(duì)列的原理和使用場景

發(fā)布時(shí)間:2021-06-16 16:46:42 來源:億速云 閱讀:361 作者:chen 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Java阻塞隊(duì)列的原理和使用場景”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

什么是阻塞隊(duì)列

在數(shù)據(jù)結(jié)構(gòu)中,隊(duì)列遵循FIFO(先進(jìn)先出)原則。在java中,Queue接口定義了定義了基本行為,由子類完成實(shí)現(xiàn),常見的隊(duì)列有ArrayDequeLinkedList等,這些都是非線程安全的,在java 1.5中新增了阻塞隊(duì)列,當(dāng)隊(duì)列滿時(shí),添加元素的線程呈阻塞狀態(tài);當(dāng)隊(duì)列為空時(shí),獲取元素的線程呈阻塞狀態(tài)。

生產(chǎn)者、消費(fèi)者模型

Java阻塞隊(duì)列的原理和使用場景

生產(chǎn)者將元素添加到隊(duì)列中,消費(fèi)中獲取數(shù)據(jù)后完成數(shù)據(jù)處理。兩者通過隊(duì)列解決了生產(chǎn)者和消費(fèi)者的耦合關(guān)系;當(dāng)生產(chǎn)者的生產(chǎn)速度與消費(fèi)者的消費(fèi)速度不一致時(shí),可以通過大道緩沖的目的。

阻塞隊(duì)列的使用場景

線程池

在線程池中,當(dāng)工作線程數(shù)大于等于corePoolSize時(shí),后續(xù)的任務(wù)后添加到阻塞隊(duì)列中;

目前有那些阻塞隊(duì)列

在java中,BlockingQueue接口定義了阻塞隊(duì)列的行為,常用子類是ArrayBlockingQueueLinkedBlockingQueue

Java阻塞隊(duì)列的原理和使用場景

BlockingQueue繼承了Queue接口,擁有其全部特性。在BlockingQueue的java doc中對(duì)其中的操作方法做了匯總

Java阻塞隊(duì)列的原理和使用場景

插入元素

  • add(e):當(dāng)隊(duì)列已滿時(shí),再添加元素會(huì)拋出異常IllegalStateException

  • offer(e):添加成功,返回true,否則返回false

  • put:(e):當(dāng)隊(duì)列已滿時(shí),再添加元素會(huì)使線程變?yōu)樽枞麪顟B(tài)

  • offer(e, time,unit):當(dāng)隊(duì)列已滿時(shí),在末尾添加數(shù)據(jù),如果在指定時(shí)間內(nèi)沒有添加成功,返回false,反之是true

刪除元素

  •  remove(e):返回true表示已成功刪除,否則返回false

  • poll():如果隊(duì)列為空返回null,否則返回隊(duì)列中的第一個(gè)元素

  • take():獲取隊(duì)列中的第一個(gè)元素,如果隊(duì)列為空,獲取元素的線程變?yōu)樽枞麪顟B(tài)

  • poll(time, unit):當(dāng)隊(duì)列為空時(shí),線程被阻塞,如果超過指定時(shí)間,線程退出

檢查元素

  •  element():獲取隊(duì)頭元素,如果元素為null,拋出NoSuchElementException

  • peek():獲取隊(duì)頭元素,如果隊(duì)列為空返回null,否則返回目標(biāo)元素

ArrayBlockingQueue

底層基于數(shù)組的有界阻塞隊(duì)列,在構(gòu)造此隊(duì)列時(shí)必須指定容量;

構(gòu)造函數(shù)

// 第一個(gè)	
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

	// 第二個(gè)
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

	// 第三個(gè)
	public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
  • capacity:隊(duì)列的初始容量

  • fair:線程訪問隊(duì)列的公平性。如果為true按照FIFO的原則處理,反之;默認(rèn)為falsec:

  • 已有元素的集合,類型于合并兩個(gè)數(shù)組

put()方法

public void put(E e) throws InterruptedException {
         // 檢查元素是否為null
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lockInterruptibly();
        try {
            // 如果當(dāng)前隊(duì)列為空,變?yōu)樽枞麪顟B(tài)
            while (count == items.length)
                notFull.await();
            // 反之,就添加元素
            enqueue(e);
        } finally {
            // 解鎖
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 此時(shí)隊(duì)列不為空,喚醒消費(fèi)者
        notEmpty.signal();
    }

take()方法

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lockInterruptibly();
        try {
            // 如果隊(duì)列為空,消費(fèi)者變?yōu)樽枞麪顟B(tài)
            while (count == 0)
                notEmpty.await();
            // 不為空,就獲取數(shù)據(jù)
            return dequeue();
        } finally {
            // 解鎖
            lock.unlock();
        }
    }

        private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 獲取隊(duì)頭元素x
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
         // 此時(shí)隊(duì)列沒有滿,同時(shí)生產(chǎn)者繼續(xù)添加數(shù)據(jù)
        notFull.signal();
        return x;
    }

LinkedBlockingQueue

底層基于單向鏈表的無界阻塞隊(duì)列,如果不指定初始容量,默認(rèn)為Integer.MAX_VALUE,否則為指定容量

構(gòu)造函數(shù)

// 不指定容量     
	public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
	// 指定容量
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

	// 等同于合并數(shù)組
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put()方法

 public void put(E e) throws InterruptedException {
        // 元素為空,拋出異常
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // 獲取隊(duì)列中的數(shù)據(jù)量
        final AtomicInteger count = this.count;
        // 獲取鎖
        putLock.lockInterruptibly();
        try {
            // 隊(duì)列滿了,變?yōu)樽枞麪顟B(tài)
            while (count.get() == capacity) {
                notFull.await();
            }
            // 將目標(biāo)元素添加到鏈表的尾端
            enqueue(node);
            // 總數(shù)增加
            c = count.getAndIncrement();
            // 隊(duì)列還沒有滿,繼續(xù)添加元素
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 解鎖
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

take()方法

public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 獲取隊(duì)列中的工作數(shù)
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 獲取鎖
        takeLock.lockInterruptibly();
        try {
            // 如果隊(duì)列為空,變?yōu)樽枞麪顟B(tài)
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 獲取隊(duì)頭元素
            x = dequeue();
            // 遞減
            c = count.getAndDecrement();
            // 通知消費(fèi)者
            if (c > 1)
                notEmpty.signal();
        } finally {
            // 解鎖
            takeLock.unlock();
        }
        if (c == capacity)
            // 
            signalNotFull();
        return x;
    }

對(duì)比

相同點(diǎn)

  • 兩者都是通過Condition通知生產(chǎn)者和消費(fèi)者完成元素的添加和獲取

  • 都可以指定容量

不同點(diǎn)

  •  ArrayBlockingQueue基于數(shù)據(jù),LinkedBlockingQueue基于鏈表

  • ArrayBlockingQueue內(nèi)有一把鎖,LinkedBlockingQueue內(nèi)有兩把鎖

Java阻塞隊(duì)列的原理和使用場景
Java阻塞隊(duì)列的原理和使用場景 

自己動(dòng)手實(shí)現(xiàn)一個(gè)阻塞隊(duì)列

通過分析源碼可以知道,阻塞隊(duì)列其實(shí)是通過通知機(jī)制Condition完成生產(chǎn)者和消費(fèi)的互通。也可以通過Object類中的wait()notifynotifyAll實(shí)現(xiàn)。下面是自己寫的一個(gè)阻塞隊(duì)列

public class BlockQueue {
    // 對(duì)象鎖
    public static final Object LOCK = new Object();
    // 控制變量的值 來通知雙方
    public boolean condition;
    
    public void put() {
        synchronized (LOCK) {
            while (condition) {
                try {
                    // 滿了
                    System.out.println("put   隊(duì)列滿了,開始阻塞");
                    LOCK.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            condition = true;
            System.out.println("put   改為true,喚醒消費(fèi)者");
            LOCK.notifyAll();
        }
    }


    public void take() {
        synchronized (LOCK) {
            while (!condition) {
                // 沒滿
                System.out.println("take   隊(duì)列沒滿,開始阻塞");
                try {
                    LOCK.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            condition = false;
            System.out.println("take   改為false,喚醒生產(chǎn)者");
            LOCK.notifyAll();
        }
    }
}

“Java阻塞隊(duì)列的原理和使用場景”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI