溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析

發(fā)布時(shí)間:2021-05-12 11:09:30 來源:億速云 閱讀:179 作者:小新 欄目:編程語言

這篇文章將為大家詳細(xì)講解有關(guān)Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

一、兩個(gè)線程一個(gè)生產(chǎn)者一個(gè)消費(fèi)者

需求情景

兩個(gè)線程,一個(gè)負(fù)責(zé)生產(chǎn),一個(gè)負(fù)責(zé)消費(fèi),生產(chǎn)者生產(chǎn)一個(gè),消費(fèi)者消費(fèi)一個(gè)。

涉及問題

  • 同步問題:如何保證同一資源被多個(gè)線程并發(fā)訪問時(shí)的完整性。常用的同步方法是采用標(biāo)記或加鎖機(jī)制。

  • wait() / nofity() 方法是基類Object的兩個(gè)方法,也就意味著所有Java類都會(huì)擁有這兩個(gè)方法,這樣,我們就可以為任何對(duì)象實(shí)現(xiàn)同步機(jī)制。

  • wait()方法:當(dāng)緩沖區(qū)已滿/空時(shí),生產(chǎn)者/消費(fèi)者線程停止自己的執(zhí)行,放棄鎖,使自己處于等待狀態(tài),讓其他線程執(zhí)行。

  • notify()方法:當(dāng)生產(chǎn)者/消費(fèi)者向緩沖區(qū)放入/取出一個(gè)產(chǎn)品時(shí),向其他等待的線程發(fā)出可執(zhí)行的通知,同時(shí)放棄鎖,使自己處于等待狀態(tài)。

代碼實(shí)現(xiàn)(共三個(gè)類和一個(gè)main方法的測試類)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*資源序號(hào)*/
  private int number = 0;
  /*資源標(biāo)記*/
  private boolean flag = false;

  /**
   * 生產(chǎn)資源
   */
  public synchronized void create() {
    if (flag) {//先判斷標(biāo)記是否已經(jīng)生產(chǎn)了,如果已經(jīng)生產(chǎn),等待消費(fèi);
      try {
        wait();//讓生產(chǎn)線程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生產(chǎn)一個(gè)
    System.out.println(Thread.currentThread().getName() + "生產(chǎn)者------------" + number);
    flag = true;//將資源標(biāo)記為已經(jīng)生產(chǎn)
    notify();//喚醒在等待操作資源的線程(隊(duì)列)
  }

  /**
   * 消費(fèi)資源
   */
  public synchronized void destroy() {
    if (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消費(fèi)者****" + number);

    flag = false;
    notify();
  }
}

Producer.java

package com.demo.ProducerConsumer;

/**
 * 生產(chǎn)者
 * @author lixiaoxi
 *
 */
public class Producer implements Runnable{

  private Resource resource;

  public Producer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.create();
    }

  }
}

Consumer.java

package com.demo.ProducerConsumer;

/**
 * 消費(fèi)者
 * @author lixiaoxi
 *
 */
public class Consumer implements Runnable{

  private Resource resource;

  public Consumer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.destroy();
    }

  }
}

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生產(chǎn)者線程
    new Thread(new Consumer(resource)).start();//消費(fèi)者線程

  }
}

打印結(jié)果:

Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析

以上打印結(jié)果可以看出沒有任何問題。

二、多個(gè)線程,多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者的問題

需求情景

四個(gè)線程,兩個(gè)個(gè)負(fù)責(zé)生產(chǎn),兩個(gè)個(gè)負(fù)責(zé)消費(fèi),生產(chǎn)者生產(chǎn)一個(gè),消費(fèi)者消費(fèi)一個(gè)。

涉及問題

notifyAll()方法:當(dāng)生產(chǎn)者/消費(fèi)者向緩沖區(qū)放入/取出一個(gè)產(chǎn)品時(shí),向其他等待的所有線程發(fā)出可執(zhí)行的通知,同時(shí)放棄鎖,使自己處于等待狀態(tài)。

再次測試代碼

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生產(chǎn)者線程
    new Thread(new Producer(resource)).start();//生產(chǎn)者線程
    new Thread(new Consumer(resource)).start();//消費(fèi)者線程
    new Thread(new Consumer(resource)).start();//消費(fèi)者線程

  }
}

運(yùn)行結(jié)果:

Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析

Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析

通過以上打印結(jié)果發(fā)現(xiàn)問題

147生產(chǎn)了一次,消費(fèi)了兩次。169生產(chǎn)了,而沒有消費(fèi)。

原因分析

當(dāng)兩個(gè)線程同時(shí)操作生產(chǎn)者生產(chǎn)或者消費(fèi)者消費(fèi)時(shí),如果有生產(chǎn)者或消費(fèi)者的兩個(gè)線程都wait()時(shí),再次notify(),由于其中一個(gè)線程已經(jīng)改變了標(biāo)記而另外一個(gè)線程再次往下直接執(zhí)行的時(shí)候沒有判斷標(biāo)記而導(dǎo)致的。if判斷標(biāo)記,只有一次,會(huì)導(dǎo)致不該運(yùn)行的線程運(yùn)行了。出現(xiàn)了數(shù)據(jù)錯(cuò)誤的情況。

解決方案

while判斷標(biāo)記,解決了線程獲取執(zhí)行權(quán)后,是否要運(yùn)行!也就是每次wait()后再notify()時(shí)先再次判斷標(biāo)記。

代碼改進(jìn)(Resource中的 if -> while)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*資源序號(hào)*/
  private int number = 0;
  /*資源標(biāo)記*/
  private boolean flag = false;

  /**
   * 生產(chǎn)資源
   */
  public synchronized void create() {
    while (flag) {//先判斷標(biāo)記是否已經(jīng)生產(chǎn)了,如果已經(jīng)生產(chǎn),等待消費(fèi);
      try {
        wait();//讓生產(chǎn)線程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生產(chǎn)一個(gè)
    System.out.println(Thread.currentThread().getName() + "生產(chǎn)者------------" + number);
    flag = true;//將資源標(biāo)記為已經(jīng)生產(chǎn)
    notify();//喚醒在等待操作資源的線程(隊(duì)列)
  }

  /**
   * 消費(fèi)資源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消費(fèi)者****" + number);

    flag = false;
    notify();
  }
}

運(yùn)行結(jié)果:

Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析

再次發(fā)現(xiàn)問題

打印到某個(gè)值比如生產(chǎn)完187,程序運(yùn)行卡死了,好像鎖死了一樣。

原因分析

notify:只能喚醒一個(gè)線程,如果本方喚醒了本方,沒有意義。而且while判斷標(biāo)記+notify會(huì)導(dǎo)致”死鎖”。

解決方案

notifyAll解決了本方線程一定會(huì)喚醒對(duì)方線程的問題。

最后代碼改進(jìn)(Resource中的 notify() -> notifyAll())

Resource.java

package com.demo.ProducerConsumer;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*資源序號(hào)*/
  private int number = 0;
  /*資源標(biāo)記*/
  private boolean flag = false;

  /**
   * 生產(chǎn)資源
   */
  public synchronized void create() {
    while (flag) {//先判斷標(biāo)記是否已經(jīng)生產(chǎn)了,如果已經(jīng)生產(chǎn),等待消費(fèi);
      try {
        wait();//讓生產(chǎn)線程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生產(chǎn)一個(gè)
    System.out.println(Thread.currentThread().getName() + "生產(chǎn)者------------" + number);
    flag = true;//將資源標(biāo)記為已經(jīng)生產(chǎn)
    notifyAll();//喚醒在等待操作資源的線程(隊(duì)列)
  }

  /**
   * 消費(fèi)資源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消費(fèi)者****" + number);

    flag = false;
    notifyAll();
  }
}

運(yùn)行結(jié)果:

Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析

以上就大功告成了,沒有任何問題。

再來梳理一下整個(gè)流程。按照示例,生產(chǎn)者消費(fèi)者交替運(yùn)行,每次生產(chǎn)后都有對(duì)應(yīng)的消費(fèi)者,測試類創(chuàng)建實(shí)例,如果是生產(chǎn)者先運(yùn)行,進(jìn)入run()方法,進(jìn)入create()方法,flag默認(rèn)為false,number+1,生產(chǎn)者生產(chǎn)一個(gè)產(chǎn)品,flag置為true,同時(shí)調(diào)用notifyAll()方法,喚醒所有正在等待的線程,接下來如果還是生產(chǎn)者運(yùn)行呢?這是flag為true,進(jìn)入while循環(huán),執(zhí)行wait()方法,接下來如果是消費(fèi)者運(yùn)行的話,調(diào)用destroy()方法,這時(shí)flag為true,消費(fèi)者購買了一次產(chǎn)品,隨即將flag置為false,并喚醒所有正在等待的線程。這就是一次完整的多生產(chǎn)者對(duì)應(yīng)多消費(fèi)者的問題。

三、使用Lock和Condition來解決生產(chǎn)者消費(fèi)者問題

上面的代碼有一個(gè)問題,就是我們?yōu)榱吮苊馑械木€程都處于等待的狀態(tài),使用了notifyAll方法來喚醒所有的線程,即notifyAll喚醒的是自己方和對(duì)方線程。如果我需要只是喚醒對(duì)方的線程,比如:生產(chǎn)者只能喚醒消費(fèi)者的線程,消費(fèi)者只能喚醒生產(chǎn)者的線程。

在jdk1.5當(dāng)中為我們提供了多線程的升級(jí)解決方案:

1. 將同步synchronized替換成了Lock操作。

2. 將Object中的wait,notify,notifyAll方法替換成了Condition對(duì)象。

3. 可以只喚醒對(duì)方的線程。

完整代碼:

Resource1.java

package com.demo.ProducerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 資源
 * @author lixiaoxi
 *
 */
public class Resource1 {

  /*資源序號(hào)*/
  private int number = 0;
  /*資源標(biāo)記*/
  private boolean flag = false;
  
  private Lock lock = new ReentrantLock();
  //使用lock建立生產(chǎn)者的condition對(duì)象
  private Condition condition_pro = lock.newCondition(); 
  //使用lock建立消費(fèi)者的condition對(duì)象
  private Condition condition_con = lock.newCondition(); 


  /**
   * 生產(chǎn)資源
   */
  public void create() throws InterruptedException {
    
    try{
      lock.lock();
      //先判斷標(biāo)記是否已經(jīng)生產(chǎn)了,如果已經(jīng)生產(chǎn),等待消費(fèi)
      while(flag){
        //生產(chǎn)者等待
        condition_pro.await();
      }
      //生產(chǎn)一個(gè)
      number++;
      System.out.println(Thread.currentThread().getName() + "生產(chǎn)者------------" + number);
      //將資源標(biāo)記為已經(jīng)生產(chǎn)
      flag = true;
      //生產(chǎn)者生產(chǎn)完畢后,喚醒消費(fèi)者的線程(注意這里不是signalAll)
      condition_con.signal();
    }finally{
      lock.unlock();
    }
  }

  /**
   * 消費(fèi)資源
   */
  public void destroy() throws InterruptedException{

    try{
      lock.lock();
      //先判斷標(biāo)記是否已經(jīng)消費(fèi)了,如果已經(jīng)消費(fèi),等待生產(chǎn)
      while(!flag){
        //消費(fèi)者等待
        condition_con.await();
      }
      
      System.out.println(Thread.currentThread().getName() + "消費(fèi)者****" + number);
      //將資源標(biāo)記為已經(jīng)消費(fèi)
      flag = false;
      //消費(fèi)者消費(fèi)完畢后,喚醒生產(chǎn)者的線程
      condition_pro.signal();
    }finally{
      lock.unlock();
    }
  }
}

Producer1.java

package com.demo.ProducerConsumer;

/**
 * 生產(chǎn)者
 * @author lixiaoxi
 *
 */
public class Producer1 implements Runnable{

  private Resource1 resource;

  public Producer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.create();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

Consumer1.java

package com.demo.ProducerConsumer;

/**
 * 消費(fèi)者
 * @author lixiaoxi
 *
 */
public class Consumer1 implements Runnable{

  private Resource1 resource;

  public Consumer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.destroy();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  
}

ProducerConsumerTest1.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest1 {

  public static void main(String args[]) {
    Resource1 resource = new Resource1();
    new Thread(new Producer1(resource)).start();//生產(chǎn)者線程
    new Thread(new Producer1(resource)).start();//生產(chǎn)者線程
    new Thread(new Consumer1(resource)).start();//消費(fèi)者線程
    new Thread(new Consumer1(resource)).start();//消費(fèi)者線程

  }
}

運(yùn)行結(jié)果:

Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析

四、總結(jié)

1、如果生產(chǎn)者、消費(fèi)者都是1個(gè),那么flag標(biāo)記可以用if判斷。這里有多個(gè),必須用while判斷。

2、在while判斷的同時(shí),notify函數(shù)可能喚醒本類線程(如一個(gè)消費(fèi)者喚醒另一個(gè)消費(fèi)者),這會(huì)導(dǎo)致所有消費(fèi)者忙等待,程序無法繼續(xù)往下執(zhí)行。使用notifyAll函數(shù)代替notify可以解決這個(gè)問題,notifyAll可以保證非本類線程被喚醒(消費(fèi)者線程能喚醒生產(chǎn)者線程,反之也可以),解決了忙等待問題。

小心假死

生產(chǎn)者/消費(fèi)者模型最終達(dá)到的目的是平衡生產(chǎn)者和消費(fèi)者的處理能力,達(dá)到這個(gè)目的的過程中,并不要求只有一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者??梢远鄠€(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,可以一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,可以多個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者。

假死就發(fā)生在上面三種場景下。假死指的是全部線程都進(jìn)入了WAITING狀態(tài),那么程序就不再執(zhí)行任何業(yè)務(wù)功能了,整個(gè)項(xiàng)目呈現(xiàn)停滯狀態(tài)。

比方說有生產(chǎn)者A和生產(chǎn)者B,緩沖區(qū)由于空了,消費(fèi)者處于WAITING。生產(chǎn)者B處于WAITING,生產(chǎn)者A被消費(fèi)者通知生產(chǎn),生產(chǎn)者A生產(chǎn)出來的產(chǎn)品本應(yīng)該通知消費(fèi)者,結(jié)果通知了生產(chǎn)者B,生產(chǎn)者B被喚醒,發(fā)現(xiàn)緩沖區(qū)滿了,于是繼續(xù)WAITING。至此,兩個(gè)生產(chǎn)者線程處于WAITING,消費(fèi)者處于WAITING,系統(tǒng)假死。

上面的分析可以看出,假死出現(xiàn)的原因是因?yàn)閚otify的是同類,所以非單生產(chǎn)者/單消費(fèi)者的場景,可以采取兩種方法解決這個(gè)問題:

(1)synchronized用notifyAll()喚醒所有線程、ReentrantLock用signalAll()喚醒所有線程。

(2)用ReentrantLock定義兩個(gè)Condition,一個(gè)表示生產(chǎn)者的Condition,一個(gè)表示消費(fèi)者的Condition,喚醒的時(shí)候調(diào)用相應(yīng)的Condition的signal()方法就可以了。

Java的特點(diǎn)有哪些

Java的特點(diǎn)有哪些 1.Java語言作為靜態(tài)面向?qū)ο缶幊陶Z言的代表,實(shí)現(xiàn)了面向?qū)ο罄碚?,允許程序員以優(yōu)雅的思維方式進(jìn)行復(fù)雜的編程。 2.Java具有簡單性、面向?qū)ο?、分布式、安全性、平臺(tái)獨(dú)立與可移植性、動(dòng)態(tài)性等特點(diǎn)。 3.使用Java可以編寫桌面應(yīng)用程序、Web應(yīng)用程序、分布式系統(tǒng)和嵌入式系統(tǒng)應(yīng)用程序等。

關(guān)于“Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI