您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)怎么在Java高并發(fā)中實(shí)現(xiàn)一個(gè)BlockingQueue功能,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
有界的阻塞隊(duì)列,內(nèi)部是一個(gè)數(shù)組,有邊界的意思是:容量是有限的,必須進(jìn)行初始化,指定它的容量大小,以先進(jìn)先出的方式存儲(chǔ)數(shù)據(jù),最新插入的在對(duì)尾,最先移除的對(duì)象在頭部。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 隊(duì)列元素 */ final Object[] items; /** 下一次讀取操作的位置, poll, peek or remove */ int takeIndex; /** 下一次寫入操作的位置, offer, or add */ int putIndex; /** 元素?cái)?shù)量 */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. * 它采用一個(gè) ReentrantLock 和相應(yīng)的兩個(gè) Condition 來實(shí)現(xiàn)。 */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** 指定大小 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 指定容量大小與指定訪問策略 * @param fair 指定獨(dú)占鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的線程獲取到鎖; */ public ArrayBlockingQueue(int capacity, boolean fair) {} /** * 指定容量大小、指定訪問策略與最初包含給定集合中的元素 * @param c 將此集合中的元素在構(gòu)造方法期間就先添加到隊(duì)列中 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {} }
ArrayBlockingQueue 在生產(chǎn)者放入數(shù)據(jù)和消費(fèi)者獲取數(shù)據(jù),都是共用一個(gè)鎖對(duì)象,由此也意味著兩者無法真正并行運(yùn)行。按照實(shí)現(xiàn)原理來分析, ArrayBlockingQueue 完全可以采用分離鎖,從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行。然而事實(shí)上并沒有如此,因?yàn)?ArrayBlockingQueue 的數(shù)據(jù)寫入已經(jīng)足夠輕巧,以至于引入獨(dú)立的鎖機(jī)制,除了給代碼帶來額外的復(fù)雜性外,其在性能上完全占不到任何便宜。
通過構(gòu)造函數(shù)得知,參數(shù) fair 控制對(duì)象內(nèi)部是否采用公平鎖,默認(rèn)采用非公平鎖。
items、takeIndex、putIndex、count 等屬性并沒有使用 volatile 修飾,這是因?yàn)樵L問這些變量(通過方法獲取)使用都在鎖內(nèi),并不存在可見性問題,如 size() 。
另外有個(gè)獨(dú)占鎖 lock 用來對(duì)出入對(duì)操作加鎖,這導(dǎo)致同時(shí)只有一個(gè)線程可以訪問入隊(duì)出隊(duì)。
/** 進(jìn)行入隊(duì)操作 */ public void put(E e) throws InterruptedException { //e為null,則拋出NullPointerException異常 checkNotNull(e); //獲取獨(dú)占鎖 final ReentrantLock lock = this.lock; /** * lockInterruptibly() * 獲取鎖定,除非當(dāng)前線程為interrupted * 如果鎖沒有被另一個(gè)線程占用并且立即返回,則將鎖定計(jì)數(shù)設(shè)置為1。 * 如果當(dāng)前線程已經(jīng)保存此鎖,則保持計(jì)數(shù)將遞增1,該方法立即返回。 * 如果鎖被另一個(gè)線程保持,則當(dāng)前線程將被禁用以進(jìn)行線程調(diào)度,并且處于休眠狀態(tài) * */ lock.lockInterruptibly(); try { //空隊(duì)列 while (count == items.length) //進(jìn)行條件等待處理 notFull.await(); //入隊(duì)操作 enqueue(e); } finally { //釋放鎖 lock.unlock(); } } /** 真正的入隊(duì) */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //獲取當(dāng)前元素 final Object[] items = this.items; //按下一個(gè)插入索引進(jìn)行元素添加 items[putIndex] = x; // 計(jì)算下一個(gè)元素應(yīng)該存放的下標(biāo),可以理解為循環(huán)隊(duì)列 if (++putIndex == items.length) putIndex = 0; count++; //喚起消費(fèi)者 notEmpty.signal(); }
這里由于在操作共享變量前加了鎖,所以不存在內(nèi)存不可見問題,加鎖后獲取的共享變量都是從主內(nèi)存中獲取的,而不是在CPU緩存或者寄存器里面的值,釋放鎖后修改的共享變量值會(huì)刷新到主內(nèi)存。
另外這個(gè)隊(duì)列使用循環(huán)數(shù)組實(shí)現(xiàn),所以在計(jì)算下一個(gè)元素存放下標(biāo)時(shí)候有些特殊。另外 insert 后調(diào)用 notEmpty.signal() ;是為了激活調(diào)用 notEmpty.await(); 阻塞后放入 notEmpty 條件隊(duì)列的線程。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; //這里有些特殊 if (itrs != null) //保持隊(duì)列中的元素和迭代器的元素一致 itrs.elementDequeued(); notFull.signal(); return x; }
Take 操作和 Put 操作很類似
//該類的迭代器,所有的迭代器共享數(shù)據(jù),隊(duì)列改變會(huì)影響所有的迭代器 transient Itrs itrs = null; //其存放了目前所創(chuàng)建的所有迭代器。 /** * 迭代器和它們的隊(duì)列之間的共享數(shù)據(jù),允許隊(duì)列元素被刪除時(shí)更新迭代器的修改。 */ class Itrs { void elementDequeued() { // assert lock.getHoldCount() == 1; if (count == 0) //隊(duì)列中數(shù)量為0的時(shí)候,隊(duì)列就是空的,會(huì)將所有迭代器進(jìn)行清理并移除 queueIsEmpty(); //takeIndex的下標(biāo)是0,意味著隊(duì)列從尾中取完了,又回到頭部獲取 else if (takeIndex == 0) takeIndexWrapped(); } /** * 當(dāng)隊(duì)列為空的時(shí)候做的事情 * 1. 通知所有迭代器隊(duì)列已經(jīng)為空 * 2. 清空所有的弱引用,并且將迭代器置空 */ void queueIsEmpty() {} /** * 將takeIndex包裝成0 * 并且通知所有的迭代器,并且刪除已經(jīng)過期的任何對(duì)象(個(gè)人理解是置空對(duì)象) * 也直接的說就是在Blocking隊(duì)列進(jìn)行出隊(duì)的時(shí)候,進(jìn)行迭代器中的數(shù)據(jù)同步,保持隊(duì)列中的元素和迭代器的元素是一致的。 */ void takeIndexWrapped() {} }
//從這里知道,在ArrayBlockingQueue對(duì)象中調(diào)用此方法,才會(huì)生成這個(gè)對(duì)象 //那么就可以理解為,只要并未調(diào)用此方法,則ArrayBlockingQueue對(duì)象中的Itrs對(duì)象則為空 public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { Itr() { //這里就是生產(chǎn)它的地方 //count等于0的時(shí)候,創(chuàng)建的這個(gè)迭代器是個(gè)無用的迭代器,可以直接移除,進(jìn)入detach模式。 //否則就把當(dāng)前隊(duì)列的讀取位置給迭代器當(dāng)做下一個(gè)元素,cursor存儲(chǔ)下個(gè)元素的位置。 if (count == 0) { // assert itrs == null; cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { final int takeIndex = ArrayBlockingQueue.this.takeIndex; prevTakeIndex = takeIndex; nextItem = itemAt(nextIndex = takeIndex); cursor = incCursor(takeIndex); if (itrs == null) { itrs = new Itrs(this); } else { itrs.register(this); // in this order itrs.doSomeSweeping(false); } prevCycles = itrs.cycles; // assert takeIndex >= 0; // assert prevTakeIndex == takeIndex; // assert nextIndex >= 0; // assert nextItem != null; } } }
代碼演示
package com.rumenz.task; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @className: BlockingQuqueExample * @description: TODO 類描述 * @author: mac * @date: 2021/1/20 **/ public class BlockingQueueExample { private static volatile Boolean flag=false; public static void main(String[] args) { BlockingQueue blockingQueue=new ArrayBlockingQueue(1024); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(()->{ try{ blockingQueue.put(1); Thread.sleep(2000); blockingQueue.put(3); flag=true; }catch (Exception e){ e.printStackTrace(); } }); executorService.execute(()->{ try { while (!flag){ Integer i = (Integer) blockingQueue.take(); System.out.println(i); } }catch (Exception e){ e.printStackTrace(); } }); executorService.shutdown(); } }
LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列,通 ArrayBlockingQueue 類似,其內(nèi)部也維護(hù)這一個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由一個(gè)鏈表構(gòu)成),當(dāng)生產(chǎn)者往隊(duì)列放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手上獲取數(shù)據(jù),并緩存在隊(duì)列的內(nèi)部,而生產(chǎn)者立即返回,只有當(dāng)隊(duì)列緩沖區(qū)到達(dá)最大值容量時(shí)(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值),才會(huì)阻塞隊(duì)列,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者會(huì)被喚醒,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理。
LinkedBlockingQueue 之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者和消費(fèi)者端分別采用了獨(dú)立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行的操作隊(duì)列中的數(shù)據(jù),以調(diào)高整個(gè)隊(duì)列的并發(fā)能力。
如果構(gòu)造一個(gè) LinkedBlockingQueue 對(duì)象,而沒有指定容量大小, LinkedBlockingQueue 會(huì)默認(rèn)一個(gè)類似無限大小的容量 Integer.MAX_VALUE ,這樣的話,如果生產(chǎn)者的速度一旦大于消費(fèi)者的速度,也許還沒有等到隊(duì)列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已經(jīng)被消耗殆盡了。
LinkedBlockingQueue 是一個(gè)使用鏈表完成隊(duì)列操作的阻塞隊(duì)列。鏈表是單向鏈表,而不是雙向鏈表。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //隊(duì)列的容量,指定大小或?yàn)槟J(rèn)值Integer.MAX_VALUE private final int capacity; //元素的數(shù)量 private final AtomicInteger count = new AtomicInteger(); //隊(duì)列頭節(jié)點(diǎn),始終滿足head.item==null transient Node<E> head; //隊(duì)列的尾節(jié)點(diǎn),始終滿足last.next==null private transient Node<E> last; /** Lock held by take, poll, etc */ //出隊(duì)的鎖:take, poll, peek 等讀操作的方法需要獲取到這個(gè)鎖 private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ //當(dāng)隊(duì)列為空時(shí),保存執(zhí)行出隊(duì)的線程:如果讀操作的時(shí)候隊(duì)列是空的,那么等待 notEmpty 條件 private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ //入隊(duì)的鎖:put, offer 等寫操作的方法需要獲取到這個(gè)鎖 private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ //當(dāng)隊(duì)列滿時(shí),保存執(zhí)行入隊(duì)的線程:如果寫操作的時(shí)候隊(duì)列是滿的,那么等待 notFull 條件 private final Condition notFull = putLock.newCondition(); //傳說中的無界隊(duì)列 public LinkedBlockingQueue() {} //傳說中的有界隊(duì)列 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } //傳說中的無界隊(duì)列 public LinkedBlockingQueue(Collection<? extends E> c){} /** * 鏈表節(jié)點(diǎn)類 */ static class Node<E> { E item; /** * One of: * - 真正的繼任者節(jié)點(diǎn) * - 這個(gè)節(jié)點(diǎn),意味著繼任者是head.next * - 空,意味著沒有后繼者(這是最后一個(gè)節(jié)點(diǎn)) */ Node<E> next; Node(E x) { item = x; } } }
通過其構(gòu)造函數(shù),得知其可以當(dāng)做無界隊(duì)列也可以當(dāng)做有界隊(duì)列來使用。
這里用了兩把鎖分別是 takeLock 和 putLock ,而 Condition 分別是 notEmpty 和 notFull ,它們是這樣搭配的。
takeLock
putLock
從上面的構(gòu)造函數(shù)中可以看到,這里會(huì)初始化一個(gè)空的頭結(jié)點(diǎn),那么第一個(gè)元素入隊(duì)的時(shí)候,隊(duì)列中就會(huì)有兩個(gè)元素。讀取元素時(shí),也是獲取頭結(jié)點(diǎn)后面的一個(gè)元素。count的計(jì)數(shù)值不包含這個(gè)頭結(jié)點(diǎn)。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 將指定元素插入到此隊(duì)列的尾部,如有必要,則等待空間變得可用。 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 如果你糾結(jié)這里為什么是 -1,可以看看 offer 方法。這就是個(gè)標(biāo)識(shí)成功、失敗的標(biāo)志而已。 int c = -1; //包裝成node節(jié)點(diǎn) Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //獲取鎖定 putLock.lockInterruptibly(); try { /** 如果隊(duì)列滿,等待 notFull 的條件滿足。 */ while (count.get() == capacity) { notFull.await(); } //入隊(duì) enqueue(node); //原子性自增 c = count.getAndIncrement(); // 如果這個(gè)元素入隊(duì)后,還有至少一個(gè)槽可以使用,調(diào)用 notFull.signal() 喚醒等待線程。 // 哪些線程會(huì)等待在 notFull 這個(gè) Condition 上呢? if (c + 1 < capacity) notFull.signal(); } finally { //解鎖 putLock.unlock(); } // 如果 c == 0,那么代表隊(duì)列在這個(gè)元素入隊(duì)前是空的(不包括head空節(jié)點(diǎn)), // 那么所有的讀線程都在等待 notEmpty 這個(gè)條件,等待喚醒,這里做一次喚醒操作 if (c == 0) signalNotEmpty(); } /** 鏈接節(jié)點(diǎn)在隊(duì)列末尾 */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; // 入隊(duì)的代碼非常簡(jiǎn)單,就是將 last 屬性指向這個(gè)新元素,并且讓原隊(duì)尾的 next 指向這個(gè)元素 //last.next = node; //last = node; // 這里入隊(duì)沒有并發(fā)問題,因?yàn)橹挥蝎@取到 putLock 獨(dú)占鎖以后,才可以進(jìn)行此操作 last = last.next = node; } /** * 等待PUT信號(hào) * 僅在 take/poll 中調(diào)用 * 也就是說:元素入隊(duì)后,如果需要,則會(huì)調(diào)用這個(gè)方法喚醒讀線程來讀 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal();//喚醒 } finally { putLock.unlock(); } } }
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //首先,需要獲取到 takeLock 才能進(jìn)行出隊(duì)操作 takeLock.lockInterruptibly(); try { // 如果隊(duì)列為空,等待 notEmpty 這個(gè)條件滿足再繼續(xù)執(zhí)行 while (count.get() == 0) { notEmpty.await(); } //// 出隊(duì) x = dequeue(); //count 進(jìn)行原子減 1 c = count.getAndDecrement(); // 如果這次出隊(duì)后,隊(duì)列中至少還有一個(gè)元素,那么調(diào)用 notEmpty.signal() 喚醒其他的讀線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } /** * 出隊(duì) */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } }
ArrayBlockingQueue和LinkedBlockingQueue間還有一個(gè)明顯的不同之處在于,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成一個(gè)額外的Node對(duì)象。這在長時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對(duì)于GC的影響還是存在一定的區(qū)別。
LinkedBlockingQueue 實(shí)現(xiàn)一個(gè)線程添加文件對(duì)象,四個(gè)線程讀取文件對(duì)象
package concurrent; import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class TestBlockingQueue { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容納100個(gè)文件 final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100); // 線程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("F:\\JavaLib"); // 完成標(biāo)志 final File exitFile = new File(""); // 讀個(gè)數(shù) final AtomicInteger rc = new AtomicInteger(); // 寫個(gè)數(shù) final AtomicInteger wc = new AtomicInteger(); // 讀線程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".java"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四個(gè)寫線程 for (int index = 0; index < 4; index++) { // write thread final int NO = index; Runnable write = new Runnable() { String threadName = "Write" + NO; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 隊(duì)列已經(jīng)無對(duì)象 if (file == exitFile) { // 再次添加"標(biāo)志",以讓其他線程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); } }
關(guān)于怎么在Java高并發(fā)中實(shí)現(xiàn)一個(gè)BlockingQueue功能就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。