溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列

發(fā)布時(shí)間:2021-06-18 15:25:31 來(lái)源:億速云 閱讀:258 作者:Leah 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列

使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列

使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列

使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列

使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列

package com.shi.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 阻塞隊(duì)列
 * @author shiye
 *
 */
public class TestBlockQueue {

	public static void main(String[] args) throws InterruptedException {
		//定義一個(gè)長(zhǎng)度為3的阻塞隊(duì)列
		BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
		
		System.out.println("----------------拋出異常的 情況----------------------");
//		blockingQueue.add("aa");
//		blockingQueue.add("bb");
//		blockingQueue.add("cc");
		//blockingQueue.add("dd");//如果隊(duì)列滿了 Exception java.lang.IllegalStateException: Queue full
		
//		System.out.println(blockingQueue.element());//檢查隊(duì)列頭的信息 : aa
		
//		blockingQueue.remove();
//		blockingQueue.remove();
//		blockingQueue.remove();
		//blockingQueue.remove();//如果隊(duì)列為空 Exception java.util.NoSuchElementException
		
		//System.out.println(blockingQueue.element());//如果隊(duì)列為空  Exception java.util.NoSuchElementException
		
		System.out.println("----------------返回true/false----------------------");
//		System.out.println(blockingQueue.offer("11"));//插入隊(duì)列	true
//		System.out.println(blockingQueue.offer("22"));//插入隊(duì)列	true
//		System.out.println(blockingQueue.offer("33"));//插入隊(duì)列	true
//		System.out.println(blockingQueue.offer("44"));//插入隊(duì)列	false
//		
//		System.out.println(blockingQueue.peek());//檢查隊(duì)列頭元素  11
//		
//		System.out.println(blockingQueue.poll());//輸出隊(duì)列	11
//		System.out.println(blockingQueue.poll());//輸出隊(duì)列	22
//		System.out.println(blockingQueue.poll());//輸出隊(duì)列	33
//		System.out.println(blockingQueue.poll());//輸出隊(duì)列	null
		
		System.out.println("----------------隊(duì)列阻塞等待----------------------");
//		blockingQueue.put("zhangsan");
//		blockingQueue.put("lisi");
//		blockingQueue.put("wangwu");
//		//blockingQueue.put("shiye");//線程一直等待無(wú)法關(guān)閉
//		
//		blockingQueue.take();
//		blockingQueue.take();
//		blockingQueue.take();
		//blockingQueue.take();//線程一直等待  無(wú)法響應(yīng)
		
		System.out.println("----------------隊(duì)列等待一定時(shí)間之后就退出----------------------");
		System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true
		System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true
		System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//true
		System.out.println(blockingQueue.offer("aa", 2, TimeUnit.SECONDS));//false 等待2s鐘之后就退出
		
		
	}

}

SynchronousQueue

package com.shi.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

/**
 * 不存儲(chǔ)數(shù)據(jù)的隊(duì)列,即生產(chǎn)一個(gè)消費(fèi)一個(gè)的隊(duì)列
 * @author shiye
 *
 *結(jié)果:
	AA	 存放1 ...
	BB	 get 1
	AA	 存放2 ...
	BB	 get 2
	AA	 存放3 ...
	BB	 get 3
 */
public class TestSynchroniousQueue {

	public static void main(String[] args) {
		BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
		
		new Thread(()-> {
			try {
				System.out.println(Thread.currentThread().getName()+ "\t 存放1 ..." );
				blockingQueue.put("1");
				
				System.out.println(Thread.currentThread().getName()+ "\t 存放2 ..." );
				blockingQueue.put("2");
				
				System.out.println(Thread.currentThread().getName()+ "\t 存放3 ..." );
				blockingQueue.put("3");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		},"AA").start();
		
		
		new Thread(()-> {
			try {
				Thread.sleep(5000);//睡眠5秒
				System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take());
			
				Thread.sleep(5000);//睡眠5秒
				System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take());
				
				Thread.sleep(5000);//睡眠5秒
				System.out.println(Thread.currentThread().getName() + "\t get " + blockingQueue.take());

			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		},"BB").start();
		
	}

}

綜合案例(使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問(wèn)題)

package com.shi.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 	通過(guò)阻塞隊(duì)列的方式 實(shí)現(xiàn)  生產(chǎn)者 消費(fèi)者 問(wèn)題
 * @author shiye
 *	使用到的技術(shù):
 *	countDownLatch:閉鎖
 *	volatile 自旋鎖
 *	AtomicInteger 原子整型
 *	BlockingQueue 阻塞隊(duì)列
 *	
 */
public class TestProducterAndConsumterByQueue {

	public static void main(String[] args) throws InterruptedException {
		
		//閉鎖
		final CountDownLatch countDownLatch = new CountDownLatch(11);
		
		Check check = new Check(new ArrayBlockingQueue<>(3));
		
		//創(chuàng)建線程生產(chǎn) (啟動(dòng)10個(gè)線程去生產(chǎn))
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				System.out.println(Thread.currentThread().getName() + "\t 生產(chǎn)者啟動(dòng)...");
				try {
					check.productor("aaa");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				countDownLatch.countDown();//線程數(shù)量減一
			},"AA-"+i).start();
		}
		
		
		
		//創(chuàng)建1 個(gè)線程消費(fèi)
		new Thread(()->{
			System.out.println(Thread.currentThread().getName() + "\t 消費(fèi)者啟動(dòng)...");
			try {
				check.consumter();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			countDownLatch.countDown();//線程數(shù)量減一
		},"BB").start();
		
		Thread.sleep(5000);//等待5秒 停止
		check.stop();
		
		countDownLatch.await();//等待上面的線程全部執(zhí)行完畢,才檢查產(chǎn)品數(shù)量
		System.out.println("5s之后線程停止,總共生產(chǎn)了:"+ check.getTotle() +"件產(chǎn)品");
	}

}

//店員
class Check{
	private volatile boolean FLAG = true;//標(biāo)志位
	private AtomicInteger atomicInteger = new AtomicInteger();//統(tǒng)計(jì)總數(shù)的變量
	
	private BlockingQueue<Object> blockingQueue = null;//定義一個(gè)阻塞隊(duì)列
	
	public Check(BlockingQueue<Object> blockingQueue) {
		this.blockingQueue = blockingQueue;
		System.out.println("創(chuàng)建一個(gè) "+blockingQueue.getClass().getName()+" 實(shí)例");
	}
	
	//生產(chǎn)者
	public void productor(String num) throws InterruptedException {
		while(FLAG) {
			System.out.println( Thread.currentThread().getName() + "\t 生產(chǎn)者生產(chǎn)數(shù)據(jù):" + num + "到隊(duì)列中...");
			blockingQueue.offer(num,2l,TimeUnit.SECONDS); //延遲2s插入數(shù)據(jù)到隊(duì)列中。。
			Thread.sleep(1000);//線程睡眠1s
			atomicInteger.getAndIncrement();//讓總數(shù)自加1
		}
	}
	
	//消費(fèi)者
	public void consumter() throws InterruptedException {
		while(FLAG) {
			Object object = blockingQueue.poll(2, TimeUnit.SECONDS);//最多消費(fèi)延遲2s
			if(object != null) {
				System.out.println( Thread.currentThread().getName() + "\t 消費(fèi)者消費(fèi)數(shù)據(jù):" + object);
			}
		}
	}
	
	//停止
	public void stop() {
		FLAG = false;
	}
	
	public int getTotle() {
		return atomicInteger.get();
	}
}

關(guān)于使用BlockingQueue怎么實(shí)現(xiàn)阻塞隊(duì)列就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI