您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Java阻塞隊(duì)列的原理和使用場景”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
在數(shù)據(jù)結(jié)構(gòu)中,隊(duì)列遵循FIFO(先進(jìn)先出)原則。在java中,Queue
接口定義了定義了基本行為,由子類完成實(shí)現(xiàn),常見的隊(duì)列有ArrayDeque
、LinkedList
等,這些都是非線程安全的,在java 1.5中新增了阻塞隊(duì)列,當(dāng)隊(duì)列滿時(shí),添加元素的線程呈阻塞狀態(tài);當(dāng)隊(duì)列為空時(shí),獲取元素的線程呈阻塞狀態(tài)。
生產(chǎn)者將元素添加到隊(duì)列中,消費(fèi)中獲取數(shù)據(jù)后完成數(shù)據(jù)處理。兩者通過隊(duì)列解決了生產(chǎn)者和消費(fèi)者的耦合關(guān)系;當(dāng)生產(chǎn)者的生產(chǎn)速度與消費(fèi)者的消費(fèi)速度不一致時(shí),可以通過大道緩沖的目的。
線程池
在線程池中,當(dāng)工作線程數(shù)大于等于corePoolSize時(shí),后續(xù)的任務(wù)后添加到阻塞隊(duì)列中;
在java中,BlockingQueue
接口定義了阻塞隊(duì)列的行為,常用子類是ArrayBlockingQueue
和LinkedBlockingQueue
。
BlockingQueue
繼承了Queue
接口,擁有其全部特性。在BlockingQueue
的java doc中對(duì)其中的操作方法做了匯總
插入元素
add(e):當(dāng)隊(duì)列已滿時(shí),再添加元素會(huì)拋出異常IllegalStateException
offer(e):添加成功,返回true,否則返回false
put:(e):當(dāng)隊(duì)列已滿時(shí),再添加元素會(huì)使線程變?yōu)樽枞麪顟B(tài)
offer(e, time,unit):當(dāng)隊(duì)列已滿時(shí),在末尾添加數(shù)據(jù),如果在指定時(shí)間內(nèi)沒有添加成功,返回false,反之是true
刪除元素
remove(e):返回true表示已成功刪除,否則返回false
poll():如果隊(duì)列為空返回null,否則返回隊(duì)列中的第一個(gè)元素
take():獲取隊(duì)列中的第一個(gè)元素,如果隊(duì)列為空,獲取元素的線程變?yōu)樽枞麪顟B(tài)
poll(time, unit):當(dāng)隊(duì)列為空時(shí),線程被阻塞,如果超過指定時(shí)間,線程退出
檢查元素
element():獲取隊(duì)頭元素,如果元素為null,拋出NoSuchElementException
peek():獲取隊(duì)頭元素,如果隊(duì)列為空返回null,否則返回目標(biāo)元素
底層基于數(shù)組的有界阻塞隊(duì)列,在構(gòu)造此隊(duì)列時(shí)必須指定容量;
構(gòu)造函數(shù)
// 第一個(gè) public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } // 第二個(gè) public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } // 第三個(gè) public ArrayBlockingQueue(int capacity) { this(capacity, false); }
capacity:隊(duì)列的初始容量
fair:線程訪問隊(duì)列的公平性。如果為true按照FIFO的原則處理,反之;默認(rèn)為falsec:
已有元素的集合,類型于合并兩個(gè)數(shù)組
put()方法
public void put(E e) throws InterruptedException { // 檢查元素是否為null checkNotNull(e); final ReentrantLock lock = this.lock; // 獲取鎖 lock.lockInterruptibly(); try { // 如果當(dāng)前隊(duì)列為空,變?yōu)樽枞麪顟B(tài) while (count == items.length) notFull.await(); // 反之,就添加元素 enqueue(e); } finally { // 解鎖 lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 此時(shí)隊(duì)列不為空,喚醒消費(fèi)者 notEmpty.signal(); }
take()方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獲取鎖 lock.lockInterruptibly(); try { // 如果隊(duì)列為空,消費(fèi)者變?yōu)樽枞麪顟B(tài) while (count == 0) notEmpty.await(); // 不為空,就獲取數(shù)據(jù) return dequeue(); } finally { // 解鎖 lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") // 獲取隊(duì)頭元素x E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 此時(shí)隊(duì)列沒有滿,同時(shí)生產(chǎn)者繼續(xù)添加數(shù)據(jù) notFull.signal(); return x; }
底層基于單向鏈表的無界阻塞隊(duì)列,如果不指定初始容量,默認(rèn)為Integer.MAX_VALUE
,否則為指定容量
構(gòu)造函數(shù)
// 不指定容量 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 指定容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 等同于合并數(shù)組 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put()方法
public void put(E e) throws InterruptedException { // 元素為空,拋出異常 if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 獲取隊(duì)列中的數(shù)據(jù)量 final AtomicInteger count = this.count; // 獲取鎖 putLock.lockInterruptibly(); try { // 隊(duì)列滿了,變?yōu)樽枞麪顟B(tài) while (count.get() == capacity) { notFull.await(); } // 將目標(biāo)元素添加到鏈表的尾端 enqueue(node); // 總數(shù)增加 c = count.getAndIncrement(); // 隊(duì)列還沒有滿,繼續(xù)添加元素 if (c + 1 < capacity) notFull.signal(); } finally { // 解鎖 putLock.unlock(); } if (c == 0) signalNotEmpty(); }
take()方法
public E take() throws InterruptedException { E x; int c = -1; // 獲取隊(duì)列中的工作數(shù) final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 獲取鎖 takeLock.lockInterruptibly(); try { // 如果隊(duì)列為空,變?yōu)樽枞麪顟B(tài) while (count.get() == 0) { notEmpty.await(); } // 獲取隊(duì)頭元素 x = dequeue(); // 遞減 c = count.getAndDecrement(); // 通知消費(fèi)者 if (c > 1) notEmpty.signal(); } finally { // 解鎖 takeLock.unlock(); } if (c == capacity) // signalNotFull(); return x; }
相同點(diǎn)
兩者都是通過Condition通知生產(chǎn)者和消費(fèi)者完成元素的添加和獲取
都可以指定容量
不同點(diǎn)
ArrayBlockingQueue
基于數(shù)據(jù),LinkedBlockingQueue
基于鏈表
ArrayBlockingQueue
內(nèi)有一把鎖,LinkedBlockingQueue
內(nèi)有兩把鎖
自己動(dòng)手實(shí)現(xiàn)一個(gè)阻塞隊(duì)列
通過分析源碼可以知道,阻塞隊(duì)列其實(shí)是通過通知機(jī)制Condition完成生產(chǎn)者和消費(fèi)的互通。也可以通過Object
類中的wait()
和notify
、notifyAll
實(shí)現(xiàn)。下面是自己寫的一個(gè)阻塞隊(duì)列
public class BlockQueue { // 對(duì)象鎖 public static final Object LOCK = new Object(); // 控制變量的值 來通知雙方 public boolean condition; public void put() { synchronized (LOCK) { while (condition) { try { // 滿了 System.out.println("put 隊(duì)列滿了,開始阻塞"); LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } condition = true; System.out.println("put 改為true,喚醒消費(fèi)者"); LOCK.notifyAll(); } } public void take() { synchronized (LOCK) { while (!condition) { // 沒滿 System.out.println("take 隊(duì)列沒滿,開始阻塞"); try { LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } condition = false; System.out.println("take 改為false,喚醒生產(chǎn)者"); LOCK.notifyAll(); } } }
“Java阻塞隊(duì)列的原理和使用場景”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。