您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
package com.shi.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * 阻塞隊(duì)列 * @author shiye * */ public class TestBlockQueue { public static void main(String[] args) throws InterruptedException { //定義一個(gè)長(zhǎng)度為3的阻塞隊(duì)列 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println("----------------拋出異常的 情況----------------------"); // blockingQueue.add("aa"); // blockingQueue.add("bb"); // blockingQueue.add("cc"); //blockingQueue.add("dd");//如果隊(duì)列滿了 Exception java.lang.IllegalStateException: Queue full // System.out.println(blockingQueue.element());//檢查隊(duì)列頭的信息 : aa // blockingQueue.remove(); // blockingQueue.remove(); // blockingQueue.remove(); //blockingQueue.remove();//如果隊(duì)列為空 Exception java.util.NoSuchElementException //System.out.println(blockingQueue.element());//如果隊(duì)列為空 Exception java.util.NoSuchElementException System.out.println("----------------返回true/false----------------------"); // System.out.println(blockingQueue.offer("11"));//插入隊(duì)列 true // System.out.println(blockingQueue.offer("22"));//插入隊(duì)列 true // System.out.println(blockingQueue.offer("33"));//插入隊(duì)列 true // System.out.println(blockingQueue.offer("44"));//插入隊(duì)列 false // // System.out.println(blockingQueue.peek());//檢查隊(duì)列頭元素 11 // // System.out.println(blockingQueue.poll());//輸出隊(duì)列 11 // System.out.println(blockingQueue.poll());//輸出隊(duì)列 22 // System.out.println(blockingQueue.poll());//輸出隊(duì)列 33 // System.out.println(blockingQueue.poll());//輸出隊(duì)列 null System.out.println("----------------隊(duì)列阻塞等待----------------------"); // blockingQueue.put("zhangsan"); // blockingQueue.put("lisi"); // blockingQueue.put("wangwu"); // //blockingQueue.put("shiye");//線程一直等待無(wú)法關(guān)閉 // // blockingQueue.take(); // blockingQueue.take(); // blockingQueue.take(); //blockingQueue.take();//線程一直等待 無(wú)法響應(yīng) System.out.println("----------------隊(duì)列等待一定時(shí)間之后就退出----------------------"); System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//false 等待2s鐘之后就退出 } }
SynchronousQueue
package com.shi.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; /** * 不存儲(chǔ)數(shù)據(jù)的隊(duì)列,即生產(chǎn)一個(gè)消費(fèi)一個(gè)的隊(duì)列 * @author shiye * *結(jié)果: AA 存放1 ... BB get 1 AA 存放2 ... BB get 2 AA 存放3 ... BB get 3 */ public class TestSynchroniousQueue { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(()-> { try { System.out.println(Thread.currentThread().getName()+ "\t 存放1 ..." ); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName()+ "\t 存放2 ..." ); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName()+ "\t 存放3 ..." ); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"AA").start(); new Thread(()-> { try { Thread.sleep(5000);//睡眠5秒 System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take()); Thread.sleep(5000);//睡眠5秒 System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take()); Thread.sleep(5000);//睡眠5秒 System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"BB").start(); } }
綜合案例(使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問(wèn)題)
package com.shi.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 通過(guò)阻塞隊(duì)列的方式 實(shí)現(xiàn) 生產(chǎn)者 消費(fèi)者 問(wèn)題 * @author shiye * 使用到的技術(shù): * countDownLatch:閉鎖 * volatile 自旋鎖 * AtomicInteger 原子整型 * BlockingQueue 阻塞隊(duì)列 * */ public class TestProducterAndConsumterByQueue { public static void main(String[] args) throws InterruptedException { //閉鎖 final CountDownLatch countDownLatch = new CountDownLatch(11); Check check = new Check(new ArrayBlockingQueue<>(3)); //創(chuàng)建線程生產(chǎn) (啟動(dòng)10個(gè)線程去生產(chǎn)) for (int i = 0; i < 10; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName() + "\t 生產(chǎn)者啟動(dòng)..."); try { check.productor("aaa"); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown();//線程數(shù)量減一 },"AA-"+i).start(); } //創(chuàng)建1 個(gè)線程消費(fèi) new Thread(()->{ System.out.println(Thread.currentThread().getName() + "\t 消費(fèi)者啟動(dòng)..."); try { check.consumter(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown();//線程數(shù)量減一 },"BB").start(); Thread.sleep(5000);//等待5秒 停止 check.stop(); countDownLatch.await();//等待上面的線程全部執(zhí)行完畢,才檢查產(chǎn)品數(shù)量 System.out.println("5s之后線程停止,總共生產(chǎn)了:"+ check.getTotle() +"件產(chǎn)品"); } } //店員 class Check{ private volatile boolean FLAG = true;//標(biāo)志位 private AtomicInteger atomicInteger = new AtomicInteger();//統(tǒng)計(jì)總數(shù)的變量 private BlockingQueue<Object> blockingQueue = null;//定義一個(gè)阻塞隊(duì)列 public Check(BlockingQueue<Object> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println("創(chuàng)建一個(gè) "+blockingQueue.getClass().getName()+" 實(shí)例"); } //生產(chǎn)者 public void productor(String num) throws InterruptedException { while(FLAG) { System.out.println( Thread.currentThread().getName() + "\t 生產(chǎn)者生產(chǎn)數(shù)據(jù):" + num + "到隊(duì)列中..."); blockingQueue.offer(num,2l,TimeUnit.SECONDS); //延遲2s插入數(shù)據(jù)到隊(duì)列中。。 Thread.sleep(1000);//線程睡眠1s atomicInteger.getAndIncrement();//讓總數(shù)自加1 } } //消費(fèi)者 public void consumter() throws InterruptedException { while(FLAG) { Object object = blockingQueue.poll(2, TimeUnit.SECONDS);//最多消費(fèi)延遲2s if(object != null) { System.out.println( Thread.currentThread().getName() + "\t 消費(fèi)者消費(fèi)數(shù)據(jù):" + object); } } } //停止 public void stop() { FLAG = false; } public int getTotle() { return atomicInteger.get(); } }
關(guān)于使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。