溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么在Java多線程中實現(xiàn)一個通信模式

發(fā)布時間:2020-12-01 14:49:08 來源:億速云 閱讀:141 作者:Leah 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)怎么在Java多線程中實現(xiàn)一個通信模式,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

一、什么是Java線程的等待/通知模式

1、等待/通知模式概述

  首先先介紹下官方的一個正式的介紹:

  等待/通知機(jī)制,是指一個線程A調(diào)用了對象object的wait()方法進(jìn)入等待狀態(tài),而另一個線程B調(diào)用了對象object的notify或者notifyAll()方法,線程A收到通知后從對象O的wait()方法返回,進(jìn)而還行后續(xù)操作。

  而我的理解是(舉例說明):

  假設(shè)工廠里有兩條流水線,某個工作流程需要這兩個流水線配合完成,這兩個流水線分別是A和B,其中A負(fù)責(zé)準(zhǔn)備各種配件,B負(fù)責(zé)租裝配件之后產(chǎn)出輸出到工作臺。B的工作需要A的配件準(zhǔn)備充分,否則就會一直等待A準(zhǔn)備好配件,并且A準(zhǔn)備好配件后會通過一個開頭通知告訴B我已經(jīng)準(zhǔn)備好了,你那邊不用一直等待了,可以繼續(xù)執(zhí)行任務(wù)了。流程A與流程B就是對應(yīng)的線程A與線程B之間的通信,即可以理解為相互配合,具體也就是“”通知/等待“”機(jī)制!

2、需要注意的細(xì)節(jié)  

  那么,我們都知道超類Object有wait()方法與notify()/notifyAll()方法,在進(jìn)行正式代碼舉例之前,應(yīng)該先加深下對這三個方法的理解與一些細(xì)節(jié)(有一些細(xì)節(jié)確實容易被忽略)

  • 調(diào)用wait()方法,會釋放鎖(這一點(diǎn)我想大部分人都知道),線程狀態(tài)由RUNNING->WAITNG,當(dāng)前線程進(jìn)入對象等待隊列中;

  • 調(diào)用notify()/notifyAll()方法不會立馬釋放鎖(這一點(diǎn)我大家人也應(yīng)該知道,但是什么時候釋放鎖呢?--------請看下一條),notify()方法是將等待隊列中的線程移到同步隊列中,而notifyAll()則是全部移到同步隊列中,被移出的線程狀態(tài)WAITING-->BLOCKED;

  • 當(dāng)前調(diào)用notify()/notifyAll()的線程釋放鎖了才算釋放鎖,才有機(jī)會喚醒wait線程返回(為什么有才有機(jī)會返回呢?------繼續(xù)看下一條)

  • 從wait()返回的前提是必須獲得調(diào)用對象鎖,也就是說notify()與notifyAll()釋放鎖之后,wait()進(jìn)入BLOCKED狀態(tài),如果其他線程有競爭當(dāng)前鎖的話,wait線程繼續(xù)爭取鎖資格(不好理解的話,請看下面的代碼舉例)

  • 使用wait()、notify()、notifyAll()方法時需要先調(diào)對象加鎖(這可能是最容易忽視的點(diǎn)了,至于為什么,請先看了代碼之后,看本篇博文最后補(bǔ)充:wait()、notify()、notifyAll()加鎖的原因----防止線程即饑餓)

二、代碼舉例

1、結(jié)合代碼理解

結(jié)合上述的“工廠流程裝配配件并產(chǎn)出的例子”,我們有兩個線程(流水線)WaitThread與NotifyThread、其中WaitThread是被通知的任務(wù),完成主要的工作(組裝配件完成產(chǎn)品),需要時刻判斷標(biāo)志位(開關(guān));NotifyThread是需要通知的任務(wù),需要對WaitThread進(jìn)行“監(jiān)督通知”,兩個配合才能更好完成產(chǎn)品的組裝并輸出。

public class WaitNotify {

 static Object lock = new Object();
 static boolean flag = false;
 public static void main(String[] args) {
  new Thread(new WaitThread(), "WaitThread").start();
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  new Thread(new NotifyThread(), "NotifyThread").start();

 }

 /**
  * 流水線A,完成主要任務(wù)
  */
 static class WaitThread implements Runnable{
  @Override
  public void run() {
   // 獲取object對象鎖
   synchronized (lock){
    // 條件不滿足時一直在等,等另外的線程改變該條件,并通知該wait線程
    while (!flag){
     try {
      System.out.println(Thread.currentThread() + " is waiting, flag is "+flag);
      // wait()方法調(diào)用就會釋放鎖,當(dāng)前線程進(jìn)入等待隊列。
      lock.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    // TODO 條件已經(jīng)滿足,不繼續(xù)while,完成任務(wù)
    System.out.println(Thread.currentThread() + " is running, flag is "+flag);
   }
  }
 }
 /**
  * 流水線B,對開關(guān)進(jìn)行控制,并通知流水線A
  */
 static class NotifyThread implements Runnable{
  @Override
  public void run() {
   // 獲取等wait線程同一個object對象鎖
   synchronized (lock){
    flag = true;
    // 通知wait線程,我已經(jīng)改變了條件,你可以繼續(xù)返回執(zhí)行了(返回之后繼續(xù)判斷while)
    // 但是此時通知notify()操作并立即不會釋放鎖,而是要等當(dāng)前線程釋放鎖
    // TODO 我準(zhǔn)備好配件了,我需要通知全部的組裝流水線A.....
    lock.notifyAll();
    System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag);
   }
  }
 }
}

運(yùn)行main函數(shù),輸出:

Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[WaitThread,5,main] is running, flag is true

車床流水工作開啟,流水線的開關(guān)一開始是關(guān)閉的(flag=false),流水線B(NotifyThread)去開啟后,開始自動喚醒流水線A(WaitThread),整個流水線開始工作了......

  • Thread[WaitThread,5,main] is waiting, flag is false: 一開始流水線A發(fā)現(xiàn)自己沒有配件可租裝,所以等流水線A準(zhǔn)備好配件(這樣是不是覺得特別傻,哈哈哈,真正的流水線不會浪費(fèi)時間等的,而且會有很多條流水線B準(zhǔn)備配件的,這里只是舉例說明,望理解?。?;

  • Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true:流水線B準(zhǔn)備好了配件,開啟開關(guān)(flag=ture),并通知流水線A,讓流水線A開始工作;

  • Thread[WaitThread,5,main] is running, flag is true,流水線B收到了通知,再次檢查開關(guān)是否開啟了,開啟的話就開始返回繼續(xù)完成工作了。

其實結(jié)合上述我舉的例子還是很好理解的,下面是大概的一個粗略時序圖:

怎么在Java多線程中實現(xiàn)一個通信模式

2、擴(kuò)展理解----wait()返回的前提是獲得了鎖

上述已經(jīng)表達(dá)了這個注意的細(xì)節(jié):從wait()返回的前提是必須獲得調(diào)用對象鎖,我們再增加能競爭lock的同步代碼塊(紅字部分)。

public class WaitNotify {

 static Object lock = new Object();
 static boolean flag = false;
 public static void main(String[] args) {
  new Thread(new WaitThread(), "WaitThread").start();
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  new Thread(new NotifyThread(), "NotifyThread").start();
 }

 /**
  * 流水線A,完成主要任務(wù)
  */
 static class WaitThread implements Runnable{
  @Override
  public void run() {
   // 獲取object對象鎖
   synchronized (lock){
    // 條件不滿足時一直在等,等另外的線程改變該條件,并通知該wait線程
    while (!flag){
     try {
      System.out.println(Thread.currentThread() + " is waiting, flag is "+flag);
      // wait()方法調(diào)用就會釋放鎖,當(dāng)前線程進(jìn)入等待隊列。
      lock.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    // TODO 條件已經(jīng)滿足,不繼續(xù)while,完成任務(wù)
    System.out.println(Thread.currentThread() + " is running, flag is "+flag);
   }
  }
 }
 /**
  * 流水線B,對開關(guān)進(jìn)行控制,并通知流水線A
  */
 static class NotifyThread implements Runnable{
  @Override
  public void run() {
   // 獲取等wait線程同一個object對象鎖
   synchronized (lock){
    flag = true;
    // 通知wait線程,我已經(jīng)改變了條件,你可以繼續(xù)返回執(zhí)行了(返回之后繼續(xù)判斷while)
    // 但是此時通知notify()操作并立即不會釋放鎖,而是要等當(dāng)前線程釋放鎖
    // TODO 我準(zhǔn)備好配件了,我需要通知全部的組裝流水線A.....
    lock.notifyAll();
    System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag);
   }
   // 模擬跟流水線B競爭
   synchronized (lock){
    System.out.println(Thread.currentThread() + " hold lock again");
   }
  }
 }
}

輸出結(jié)果:

Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[NotifyThread,5,main] hold lock again
Thread[WaitThread,5,main] is running, flag is true 

其中第三條跟第四條順序可能會反著來的,這就是因為lock鎖可能被紅字部分的synchronized代碼塊競爭獲取(這樣wait()方法可能獲取不到lock鎖,不會返回),也可能被waitThread獲取從wait()方法返回。

Thread[WaitThread,5,main] is waiting, flag is false
Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true
Thread[WaitThread,5,main] is running, flag is true
Thread[NotifyThread,5,main] hold lock again

三、等待/通知模式的應(yīng)用

1、Thread.join()中源碼應(yīng)用

Thread.join()作用:當(dāng)線程A等待thread線程終止之后才從thread.join()返回, 每個線程終止的前提是前驅(qū)線程終止,每個線程等待前驅(qū)線程終止后,才從join方法返回,這里涉及了等待/通知機(jī)制(等待前驅(qū)線程結(jié)束,接收前驅(qū)線程結(jié)束通知)。

Thread.join()源碼中,使用while選好判斷前驅(qū)線程是否活著,如果前驅(qū)線程還活著就一直wait等待,當(dāng)然如果超時的話就直接返回。

public final synchronized void join(long millis)
  throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
      throw new IllegalArgumentException("timeout value is negative");
    }
    // 這里的while(){wait(millis)} 就是利用等待/通知中的等待模式,只不過加上了超時設(shè)置
    if (millis == 0) {
      // while循環(huán),當(dāng)線程還活著的時候就一直循環(huán)等待,直到線程終止
      while (isAlive()) {
        // wait等待
        wait(0);
      }
      // 條件滿足時返回
    } else {
      while (isAlive()) {
        long delay = millis - now;
        if (delay <= 0) {
          break;
        }
        wait(delay);
        now = System.currentTimeMillis() - base;
      }
    }
  }

2、其它的應(yīng)用

  線程池的本質(zhì)是使用一個線程安全的工作隊列連接工作者線程和客戶端線程,客戶端線程將任務(wù)放入工作隊列后便返回,而工作者線程則不斷地從工作隊列中取出工作并執(zhí)行。那么,在這里的等待/通知模式的應(yīng)用就是:

  工作隊列中線程job沒有的話也就是工作隊列為空的情況下,等待客戶端放入工作隊列線程任務(wù),并通知工作線程繼續(xù)從工作隊列中獲取線程執(zhí)行。

  注:關(guān)于線程池的應(yīng)用源碼這里不做介紹,因為一時也講不完(自己也還沒有完全消化),先簡單介紹下應(yīng)用到的地方還有概念。

  補(bǔ)充:其實數(shù)據(jù)庫的連接池也類似線程池這種工作流程,也會涉及等待/通知模式。

3、等待/通知范式

  介紹了那么多應(yīng)用,這種模式應(yīng)該有個統(tǒng)一的范式來套用。對的,必然是有的:

  對于等待者(也可以稱之為消費(fèi)者):

synchronized (對象lock) {
    while (條件不滿足) {
      對象.wait();
    }
    // TODO 處理邏輯
  }

  對于通知者(也可以稱之為生產(chǎn)者):

synchronized (對象lock) {
    while (條件滿足) {
      改變條件
      對象.notify();
    }
  }

  注意:實際開發(fā)中最好采用的是超時等待/通知模式,在thread.join()源碼方法中完美體現(xiàn)

四、wait()、notify()、notifyAll()使用前需要加鎖的原因----防止線程即饑餓

(1)其實根據(jù)wait()注意事項也能明白,wait()是釋放鎖的,那么不加鎖哪來釋放鎖!

(2)wait()與notify()或者notifyAll()必須是搭配一起使用的,否則線程調(diào)用object.wait()之后,沒有超時機(jī)制,也沒有調(diào)用notify()或者notifyAll()喚醒的話,就一直處于WAITING狀態(tài),造成調(diào)用wait()的線程一直都是饑餓狀態(tài)。

(3)由于第2條的,我們已知:即便我們使用了notify()或者notifyAll()去喚醒線程,但是沒有在適當(dāng)?shù)臅r機(jī)喚醒(比如調(diào)用wait()之前就喚醒了),那么仍然調(diào)用wait()線程處于WAITING狀態(tài),所以我們必須保證wait()方法要么不執(zhí)行,要么就執(zhí)行完在被喚醒。也就是下列代碼中1那里不能允許插入調(diào)用notify/notifyAll,自然而然就增加synchronized關(guān)鍵字,保證wait()操作整體執(zhí)行不被破壞!

synchronized (對象lock) {
    while (條件不滿足) {
      // 1 這里如果先執(zhí)行了notify/notifyAll方法,那么2執(zhí)行之后,該線程就一直WAITING
      對象.wait(); // 2
    }
    // TODO 處理邏輯
  }

用圖片展示執(zhí)行順序就是:

怎么在Java多線程中實現(xiàn)一個通信模式

(4)注意synchronized代碼塊中,代碼錯誤或者其它原因線程終止的話,沒有執(zhí)行到wait()方法的話,是會自動釋放鎖的,不必?fù)?dān)心會死鎖。

關(guān)于怎么在Java多線程中實現(xiàn)一個通信模式就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI