您好,登錄后才能下訂單哦!
這篇文章主要講解了“Java編程生產(chǎn)者消費(fèi)者實(shí)現(xiàn)的方法有哪些”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Java編程生產(chǎn)者消費(fèi)者實(shí)現(xiàn)的方法有哪些”吧!
實(shí)現(xiàn)生產(chǎn)者消費(fèi)者的四種方式
一、最基礎(chǔ)的
二、java.util.concurrent.lock 中的 Lock 框架
三、阻塞隊(duì)列BlockingQueue的實(shí)現(xiàn)
Blockqueue 接口的一些方法
四、信號(hào)量 Semaphore 的實(shí)現(xiàn)
利用 wait() 和 notify() 方法實(shí)現(xiàn),當(dāng)緩沖區(qū)滿或?yàn)榭諘r(shí)都調(diào)用 wait() 方法等待,當(dāng)生產(chǎn)者生產(chǎn)了一個(gè)產(chǎn)品或消費(fèi)者消費(fèi)了一個(gè)產(chǎn)品后會(huì)喚醒所有線程;
package com.practice; public class testMain { private static Integer count = 0; private static final Integer FULL = 10; private static String LOCK = "lock"; public static void main(String[] args) { testMain testMain = new testMain(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } synchronized (LOCK){ while(count == FULL){//緩存空間滿了 try{ LOCK.wait();//線程阻塞 }catch (Exception e){ e.printStackTrace(); } } count++;//生產(chǎn)者 System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有"+count); LOCK.notifyAll();//喚醒所有線程 } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i < 10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } synchronized (LOCK){ while(count == 0){ try{ LOCK.wait(); }catch (Exception e){ } } count--; System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有 "+count); LOCK.notifyAll();//喚醒所有線程 } } } } }
通過(guò)對(duì) lock 的 lock() 方法和 unlock() 方法實(shí)現(xiàn)對(duì)鎖的顯示控制,而 synchronize()
則是對(duì)鎖的隱形控制,可重入鎖也叫做遞歸鎖,指的是同一個(gè)線程外層函數(shù)獲得鎖之后,內(nèi)層遞歸函數(shù)仍然有獲取該鎖的代碼,但不受影響;
簡(jiǎn)單來(lái)說(shuō),該鎖維護(hù)這一個(gè)與獲取鎖相關(guān)的計(jì)數(shù)器,如果擁有鎖的某個(gè)線程再次得到鎖,那么獲計(jì)數(shù)器就加1,函數(shù)調(diào)用結(jié)束計(jì)數(shù)器就減1,然后鎖需要釋放兩次才能獲得真正釋放,已經(jīng)獲取鎖的線程進(jìn)入其他需要相同鎖的同步代碼塊不會(huì)被阻塞
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockTest { private static Integer count = 0; private static Integer FULL = 10; //創(chuàng)建一個(gè)鎖對(duì)象 private Lock lock = new ReentrantLock(); //創(chuàng)建兩個(gè)條件變量,一個(gè)為緩沖非滿,一個(gè)緩沖區(qū)非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args){ ReentrantLockTest testMain = new ReentrantLockTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } // 獲取鎖 lock.lock(); try { while (count == FULL) { try{ notFull.await(); }catch(InterruptedException e){ e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有" + count); }finally { lock.unlock(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); } catch (Exception e){ e.printStackTrace(); } lock.lock(); try{ while(count==0){ try{ notEmpty.await(); }catch (InterruptedException e){ e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有 " + count); }finally { lock.unlock();//解鎖 } } } } }
被阻塞的情況主要分為如下兩種,BlockingQueue 是線程安全的
1,當(dāng)隊(duì)列滿了的時(shí)候進(jìn)行入隊(duì)操作;
2,當(dāng)隊(duì)列空的時(shí)候進(jìn)行出隊(duì)操作
四類方法分別對(duì)應(yīng)于:
1,ThrowsException,如果操作不能馬上進(jìn)行,則拋出異常;
2,SpecialValue 如果操作不能馬上進(jìn)行,將會(huì)返回一個(gè)特殊的值,true或false;
3,Blocks 操作被阻塞;
4,TimeOut 指定時(shí)間未執(zhí)行返回一個(gè)特殊值 true 或 false
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 使用 BlockQueue 實(shí)現(xiàn)生產(chǎn)者消費(fèi)模型 */ public class BlockQueueTest { public static Integer count = 0; //創(chuàng)建一個(gè)阻塞隊(duì)列 final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { BlockQueueTest testMain = new BlockQueueTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } try{ blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有 " + count); }catch (InterruptedException e){ e.printStackTrace(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ blockingQueue.take();//消費(fèi) count--; System.out.println(Thread.currentThread().getName() + " 消費(fèi)者消費(fèi),目前總共有 "+ count); }catch (InterruptedException e){ e.printStackTrace(); } } } } }
Semaphore (信號(hào)量) 用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,它通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。Java中的 Semaphone 維護(hù)了一個(gè)許可集,一開始設(shè)定這個(gè)許可集的數(shù)量,使用 acquire()
方法獲得一個(gè)許可,當(dāng)許可不足時(shí)會(huì)被阻塞,release()
添加一個(gè)許可。
下面代碼中,還加入了 mutex
信號(hào)量,維護(hù)消費(fèi)者和生產(chǎn)者之間的同步關(guān)系,保證生產(chǎn)者消費(fèi)者之間的交替進(jìn)行
import java.util.concurrent.Semaphore; public class SemaphoreTest { private static Integer count = 0; //創(chuàng)建三個(gè)信號(hào)量 final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1);//互斥鎖,控制共享數(shù)據(jù)的互斥訪問(wèn) public static void main(String[] args) { SemaphoreTest testMain = new SemaphoreTest(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); new Thread(testMain.new Producer()).start(); new Thread(testMain.new Consumer()).start(); } class Producer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch (InterruptedException e){ e.printStackTrace(); } try{ notFull.acquire();//獲取一個(gè)信號(hào)量 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有 "+count); } catch (InterruptedException e){ e.printStackTrace(); } finally { mutex.release();//添加 notEmpty.release(); } } } } class Consumer implements Runnable{ @Override public void run(){ for (int i = 0; i <10; i++) { try{ Thread.sleep(3000); }catch(InterruptedException e){ e.printStackTrace(); } try{ notEmpty.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有"+count); }catch (InterruptedException e){ e.printStackTrace(); }finally { mutex.release(); notFull.release(); } } } } }
感謝各位的閱讀,以上就是“Java編程生產(chǎn)者消費(fèi)者實(shí)現(xiàn)的方法有哪些”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Java編程生產(chǎn)者消費(fèi)者實(shí)現(xiàn)的方法有哪些這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。