溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java阻塞隊(duì)列的實(shí)現(xiàn)原理是什么

發(fā)布時(shí)間:2021-08-06 16:02:44 來源:億速云 閱讀:165 作者:Leah 欄目:編程語言

本篇文章給大家分享的是有關(guān)Java阻塞隊(duì)列的實(shí)現(xiàn)原理是什么,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

BlockingQueue接口提供了3個(gè)添加元素方法:

  • add:添加元素到隊(duì)列里,添加成功返回true,由于容量滿了添加失敗會(huì)拋出IllegalStateException異常;

  • offer:添加元素到隊(duì)列里,添加成功返回true,添加失敗返回false;

  • put:添加元素到隊(duì)列里,如果容量滿了會(huì)阻塞直到容量不滿。

3個(gè)刪除方法:

  • poll:刪除隊(duì)列頭部元素,如果隊(duì)列為空,返回null。否則返回元素;

  • remove:基于對(duì)象找到對(duì)應(yīng)的元素,并刪除。刪除成功返回true,否則返回false;

  • take:刪除隊(duì)列頭部元素,如果隊(duì)列為空,一直阻塞到隊(duì)列有元素并刪除。

常用的阻塞隊(duì)列具體類有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。

本文以ArrayBlockingQueue和LinkedBlockingQueue為例,分析它們的實(shí)現(xiàn)原理。

ArrayBlockingQueue

ArrayBlockingQueue的原理就是使用一個(gè)可重入鎖和這個(gè)鎖生成的兩個(gè)條件對(duì)象進(jìn)行并發(fā)控制(classic two-condition  algorithm)。

ArrayBlockingQueue是一個(gè)帶有長(zhǎng)度的阻塞隊(duì)列,初始化的時(shí)候必須要指定隊(duì)列長(zhǎng)度,且指定長(zhǎng)度之后不允許進(jìn)行修改。

它帶有的屬性如下:

// 存儲(chǔ)隊(duì)列元素的數(shù)組,是個(gè)循環(huán)數(shù)組  final Object[] items;     // 拿數(shù)據(jù)的索引,用于take,poll,peek,remove方法  int takeIndex;     // 放數(shù)據(jù)的索引,用于put,offer,add方法  int putIndex;     // 元素個(gè)數(shù)  int count;     // 可重入鎖  final ReentrantLock lock;  // notEmpty條件對(duì)象,由lock創(chuàng)建  private final Condition notEmpty;  // notFull條件對(duì)象,由lock創(chuàng)建  private final Condition notFull;

數(shù)據(jù)的添加

ArrayBlockingQueue有不同的幾個(gè)數(shù)據(jù)添加方法,add、offer、put方法。

add方法:

public boolean add(E e) {      if (offer(e))          return true;      else          throw new IllegalStateException("Queue full");  }

add方法內(nèi)部調(diào)用offer方法如下:

public boolean offer(E e) {      checkNotNull(e); // 不允許元素為空      final ReentrantLock lock = this.lock;      lock.lock(); // 加鎖,保證調(diào)用offer方法的時(shí)候只有1個(gè)線程      try {          if (count == items.length) // 如果隊(duì)列已滿              return false; // 直接返回false,添加失敗          else {              insert(e); // 數(shù)組沒滿的話調(diào)用insert方法              return true; // 返回true,添加成功          }      } finally {          lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用offer方法      }  }

insert方法如下:

private void insert(E x) {      items[putIndex] = x; // 元素添加到數(shù)組里      putIndex = inc(putIndex); // 放數(shù)據(jù)索引+1,當(dāng)索引滿了變成0      ++count; // 元素個(gè)數(shù)+1      notEmpty.signal(); // 使用條件對(duì)象notEmpty通知,比如使用take方法的時(shí)候隊(duì)列里沒有數(shù)據(jù),被阻塞。這個(gè)時(shí)候隊(duì)列insert了一條數(shù)據(jù),需要調(diào)用signal進(jìn)行通知  }

put方法:

public void put(E e) throws InterruptedException {      checkNotNull(e); // 不允許元素為空      final ReentrantLock lock = this.lock;      lock.lockInterruptibly(); // 加鎖,保證調(diào)用put方法的時(shí)候只有1個(gè)線程      try {          while (count == items.length) // 如果隊(duì)列滿了,阻塞當(dāng)前線程,并加入到條件對(duì)象notFull的等待隊(duì)列里              notFull.await(); // 線程阻塞并被掛起,同時(shí)釋放鎖          insert(e); // 調(diào)用insert方法      } finally {          lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用put方法      }  }

ArrayBlockingQueue的添加數(shù)據(jù)方法有add,put,offer這3個(gè)方法,總結(jié)如下:

add方法內(nèi)部調(diào)用offer方法,如果隊(duì)列滿了,拋出IllegalStateException異常,否則返回true

offer方法如果隊(duì)列滿了,返回false,否則返回true

add方法和offer方法不會(huì)阻塞線程,put方法如果隊(duì)列滿了會(huì)阻塞線程,直到有線程消費(fèi)了隊(duì)列里的數(shù)據(jù)才有可能被喚醒。

這3個(gè)方法內(nèi)部都會(huì)使用可重入鎖保證原子性。

數(shù)據(jù)的刪除

ArrayBlockingQueue有不同的幾個(gè)數(shù)據(jù)刪除方法,poll、take、remove方法。

poll方法:

public E poll() {      final ReentrantLock lock = this.lock;      lock.lock(); // 加鎖,保證調(diào)用poll方法的時(shí)候只有1個(gè)線程      try {          return (count == 0) ? null : extract(); // 如果隊(duì)列里沒元素了,返回null,否則調(diào)用extract方法      } finally {          lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用poll方法      }  }

poll方法內(nèi)部調(diào)用extract方法:

private E extract() {      final Object[] items = this.items;      E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素      items[takeIndex] = null; // 對(duì)應(yīng)取索引上的數(shù)據(jù)清空      takeIndex = inc(takeIndex); // 取數(shù)據(jù)索引+1,當(dāng)索引滿了變成0      --count; // 元素個(gè)數(shù)-1      notFull.signal(); // 使用條件對(duì)象notFull通知,比如使用put方法放數(shù)據(jù)的時(shí)候隊(duì)列已滿,被阻塞。這個(gè)時(shí)候消費(fèi)了一條數(shù)據(jù),隊(duì)列沒滿了,就需要調(diào)用signal進(jìn)行通知      return x; // 返回元素  }

take方法:

public E take() throws InterruptedException {      final ReentrantLock lock = this.lock;      lock.lockInterruptibly(); // 加鎖,保證調(diào)用take方法的時(shí)候只有1個(gè)線程      try {          while (count == 0) // 如果隊(duì)列空,阻塞當(dāng)前線程,并加入到條件對(duì)象notEmpty的等待隊(duì)列里              notEmpty.await(); // 線程阻塞并被掛起,同時(shí)釋放鎖          return extract(); // 調(diào)用extract方法      } finally {          lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用take方法      }  }

remove方法:

public boolean remove(Object o) {      if (o == null) return false;      final Object[] items = this.items;      final ReentrantLock lock = this.lock;      lock.lock(); // 加鎖,保證調(diào)用remove方法的時(shí)候只有1個(gè)線程      try {          for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素              if (o.equals(items[i])) { // 兩個(gè)對(duì)象相等的話                  removeAt(i); // 調(diào)用removeAt方法                  return true; // 刪除成功,返回true              }          }          return false; // 刪除成功,返回false      } finally {          lock.unlock(); // 釋放鎖,讓其他線程可以調(diào)用remove方法      }  }

removeAt方法:

void removeAt(int i) {      final Object[] items = this.items;      if (i == takeIndex) { // 如果要?jiǎng)h除數(shù)據(jù)的索引是取索引位置,直接刪除取索引位置上的數(shù)據(jù),然后取索引+1即可          items[takeIndex] = null;          takeIndex = inc(takeIndex);      } else { // 如果要?jiǎng)h除數(shù)據(jù)的索引不是取索引位置,移動(dòng)元素元素,更新取索引和放索引的值          for (;;) {              int nexti = inc(i);              if (nexti != putIndex) {                  items[i] = items[nexti];                  i = nexti;              } else {                  items[i] = null;                  putIndex = i;                  break;              }          }      }      --count; // 元素個(gè)數(shù)-1      notFull.signal(); // 使用條件對(duì)象notFull通知,比如使用put方法放數(shù)據(jù)的時(shí)候隊(duì)列已滿,被阻塞。這個(gè)時(shí)候消費(fèi)了一條數(shù)據(jù),隊(duì)列沒滿了,就需要調(diào)用signal進(jìn)行通知   }

ArrayBlockingQueue的刪除數(shù)據(jù)方法有poll,take,remove這3個(gè)方法,總結(jié)如下:

poll方法對(duì)于隊(duì)列為空的情況,返回null,否則返回隊(duì)列頭部元素。

remove方法取的元素是基于對(duì)象的下標(biāo)值,刪除成功返回true,否則返回false。

poll方法和remove方法不會(huì)阻塞線程。

take方法對(duì)于隊(duì)列為空的情況,會(huì)阻塞并掛起當(dāng)前線程,直到有數(shù)據(jù)加入到隊(duì)列中。

這3個(gè)方法內(nèi)部都會(huì)調(diào)用notFull.signal方法通知正在等待隊(duì)列滿情況下的阻塞線程。

LinkedBlockingQueue

LinkedBlockingQueue是一個(gè)使用鏈表完成隊(duì)列操作的阻塞隊(duì)列。鏈表是單向鏈表,而不是雙向鏈表。

內(nèi)部使用放鎖和拿鎖,這兩個(gè)鎖實(shí)現(xiàn)阻塞(“two lock queue” algorithm)。

它帶有的屬性如下:

// 容量大小  private final int capacity;     // 元素個(gè)數(shù),因?yàn)橛?個(gè)鎖,存在競(jìng)態(tài)條件,使用AtomicInteger  private final AtomicInteger count = new AtomicInteger(0);     // 頭結(jié)點(diǎn)  private transient Node<E> head;     // 尾節(jié)點(diǎn)  private transient Node<E> last;     // 拿鎖  private final ReentrantLock takeLock = new ReentrantLock();     // 拿鎖的條件對(duì)象  private final Condition notEmpty = takeLock.newCondition();     // 放鎖  private final ReentrantLock putLock = new ReentrantLock();     // 放鎖的條件對(duì)象  private final Condition notFull = putLock.newCondition();

ArrayBlockingQueue只有1個(gè)鎖,添加數(shù)據(jù)和刪除數(shù)據(jù)的時(shí)候只能有1個(gè)被執(zhí)行,不允許并行執(zhí)行。

而LinkedBlockingQueue有2個(gè)鎖,放鎖和拿鎖,添加數(shù)據(jù)和刪除數(shù)據(jù)是可以并行進(jìn)行的,當(dāng)然添加數(shù)據(jù)和刪除數(shù)據(jù)的時(shí)候只能有1個(gè)線程各自執(zhí)行。

數(shù)據(jù)的添加

LinkedBlockingQueue有不同的幾個(gè)數(shù)據(jù)添加方法,add、offer、put方法。

add方法內(nèi)部調(diào)用offer方法:

public boolean offer(E e) {      if (e == null) throw new NullPointerException(); // 不允許空元素      final AtomicInteger count = this.count;      if (count.get() == capacity) // 如果容量滿了,返回false          return false;      int c = -1;      Node<E> node = new Node(e); // 容量沒滿,以新元素構(gòu)造節(jié)點(diǎn)      final ReentrantLock putLock = this.putLock;      putLock.lock(); // 放鎖加鎖,保證調(diào)用offer方法的時(shí)候只有1個(gè)線程      try {          if (count.get() < capacity) { // 再次判斷容量是否已滿,因?yàn)榭赡苣面i在進(jìn)行消費(fèi)數(shù)據(jù),沒滿的話繼續(xù)執(zhí)行              enqueue(node); // 節(jié)點(diǎn)添加到鏈表尾部              c = count.getAndIncrement(); // 元素個(gè)數(shù)+1              if (c + 1 < capacity) // 如果容量還沒滿                  notFull.signal(); // 在放鎖的條件對(duì)象notFull上喚醒正在等待的線程,表示可以再次往隊(duì)列里面加數(shù)據(jù)了,隊(duì)列還沒滿          }      } finally {          putLock.unlock(); // 釋放放鎖,讓其他線程可以調(diào)用offer方法      }      if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費(fèi)數(shù)據(jù),count會(huì)變化。這里的if條件表示如果隊(duì)列中還有1條數(shù)據(jù)          signalNotEmpty(); // 在拿鎖的條件對(duì)象notEmpty上喚醒正在等待的1個(gè)線程,表示隊(duì)列里還有1條數(shù)據(jù),可以進(jìn)行消費(fèi)      return c >= 0; // 添加成功返回true,否則返回false  }

put方法:

public void put(E e) throws InterruptedException {      if (e == null) throw new NullPointerException(); // 不允許空元素      int c = -1;      Node<E> node = new Node(e); // 以新元素構(gòu)造節(jié)點(diǎn)      final ReentrantLock putLock = this.putLock;      final AtomicInteger count = this.count;      putLock.lockInterruptibly(); // 放鎖加鎖,保證調(diào)用put方法的時(shí)候只有1個(gè)線程      try {          while (count.get() == capacity) { // 如果容量滿了              notFull.await(); // 阻塞并掛起當(dāng)前線程          }          enqueue(node); // 節(jié)點(diǎn)添加到鏈表尾部          c = count.getAndIncrement(); // 元素個(gè)數(shù)+1          if (c + 1 < capacity) // 如果容量還沒滿              notFull.signal(); // 在放鎖的條件對(duì)象notFull上喚醒正在等待的線程,表示可以再次往隊(duì)列里面加數(shù)據(jù)了,隊(duì)列還沒滿      } finally {          putLock.unlock(); // 釋放放鎖,讓其他線程可以調(diào)用put方法      }      if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費(fèi)數(shù)據(jù),count會(huì)變化。這里的if條件表示如果隊(duì)列中還有1條數(shù)據(jù)          signalNotEmpty(); // 在拿鎖的條件對(duì)象notEmpty上喚醒正在等待的1個(gè)線程,表示隊(duì)列里還有1條數(shù)據(jù),可以進(jìn)行消費(fèi)  }

LinkedBlockingQueue的添加數(shù)據(jù)方法add,put,offer跟ArrayBlockingQueue一樣,不同的是它們的底層實(shí)現(xiàn)不一樣。

ArrayBlockingQueue中放入數(shù)據(jù)阻塞的時(shí)候,需要消費(fèi)數(shù)據(jù)才能喚醒。

而LinkedBlockingQueue中放入數(shù)據(jù)阻塞的時(shí)候,因?yàn)樗鼉?nèi)部有2個(gè)鎖,可以并行執(zhí)行放入數(shù)據(jù)和消費(fèi)數(shù)據(jù),不僅在消費(fèi)數(shù)據(jù)的時(shí)候進(jìn)行喚醒插入阻塞的線程,同時(shí)在插入的時(shí)候如果容量還沒滿,也會(huì)喚醒插入阻塞的線程。

數(shù)據(jù)的刪除

LinkedBlockingQueue有不同的幾個(gè)數(shù)據(jù)刪除方法,poll、take、remove方法。

poll方法:

public E poll() {      final AtomicInteger count = this.count;      if (count.get() == 0) // 如果元素個(gè)數(shù)為0          return null; // 返回null      E x = null;      int c = -1;      final ReentrantLock takeLock = this.takeLock;      takeLock.lock(); // 拿鎖加鎖,保證調(diào)用poll方法的時(shí)候只有1個(gè)線程      try {          if (count.get() > 0) { // 判斷隊(duì)列里是否還有數(shù)據(jù)              x = dequeue(); // 刪除頭結(jié)點(diǎn)              c = count.getAndDecrement(); // 元素個(gè)數(shù)-1              if (c > 1) // 如果隊(duì)列里還有元素                  notEmpty.signal(); // 在拿鎖的條件對(duì)象notEmpty上喚醒正在等待的線程,表示隊(duì)列里還有數(shù)據(jù),可以再次消費(fèi)          }      } finally {          takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調(diào)用poll方法      }      if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數(shù)據(jù),count會(huì)變化。這里的if條件表示如果隊(duì)列中還可以再插入數(shù)據(jù)          signalNotFull(); // 在放鎖的條件對(duì)象notFull上喚醒正在等待的1個(gè)線程,表示隊(duì)列里還能再次添加數(shù)據(jù)                  return x;  }

take方法:

public E take() throws InterruptedException {      E x;      int c = -1;      final AtomicInteger count = this.count;      final ReentrantLock takeLock = this.takeLock;      takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調(diào)用take方法的時(shí)候只有1個(gè)線程      try {          while (count.get() == 0) { // 如果隊(duì)列里已經(jīng)沒有元素了              notEmpty.await(); // 阻塞并掛起當(dāng)前線程          }          x = dequeue(); // 刪除頭結(jié)點(diǎn)          c = count.getAndDecrement(); // 元素個(gè)數(shù)-1          if (c > 1) // 如果隊(duì)列里還有元素              notEmpty.signal(); // 在拿鎖的條件對(duì)象notEmpty上喚醒正在等待的線程,表示隊(duì)列里還有數(shù)據(jù),可以再次消費(fèi)      } finally {          takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調(diào)用take方法      }      if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數(shù)據(jù),count會(huì)變化。這里的if條件表示如果隊(duì)列中還可以再插入數(shù)據(jù)          signalNotFull(); // 在放鎖的條件對(duì)象notFull上喚醒正在等待的1個(gè)線程,表示隊(duì)列里還能再次添加數(shù)據(jù)      return x;  }

remove方法:

public boolean remove(Object o) {      if (o == null) return false;      fullyLock(); // remove操作要移動(dòng)的位置不固定,2個(gè)鎖都需要加鎖      try {          for (Node<E> trail = head, p = trail.next; // 從鏈表頭結(jié)點(diǎn)開始遍歷               p != null;               trail = p, p = p.next) {              if (o.equals(p.item)) { // 判斷是否找到對(duì)象                  unlink(p, trail); // 修改節(jié)點(diǎn)的鏈接信息,同時(shí)調(diào)用notFull的signal方法                  return true;              }          }          return false;      } finally {          fullyUnlock(); // 2個(gè)鎖解鎖      }  }

LinkedBlockingQueue的take方法對(duì)于沒數(shù)據(jù)的情況下會(huì)阻塞,poll方法刪除鏈表頭結(jié)點(diǎn),remove方法刪除指定的對(duì)象。

需要注意的是remove方法由于要?jiǎng)h除的數(shù)據(jù)的位置不確定,需要2個(gè)鎖同時(shí)加鎖。

以上就是Java阻塞隊(duì)列的實(shí)現(xiàn)原理是什么,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI