溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么

發(fā)布時間:2021-10-12 14:28:19 來源:億速云 閱讀:164 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么

Java 7的ConcurrenHashMap的源碼我建議大家都看看,那個版本的源碼就是Java多線程編程的教科書。在Java 7的源碼中,作者對悲觀鎖的使用非常謹慎,大多都轉(zhuǎn)換為自旋鎖加volatile獲得相同的語義,即使最后迫不得已要用,作者也會通過各種技巧減少鎖的臨界區(qū)。在上一篇文章中我們也有講到,自旋鎖在臨界區(qū)比較小的時候是一個較優(yōu)的選擇是因為它避免了線程由于阻塞而切換上下文,但本質(zhì)上它也是個鎖,在自旋等待期間只有一個線程能進入臨界區(qū),其他線程只會自旋消耗CPU的時間片。Java 8中ConcurrentHashMap的實現(xiàn)通過一些巧妙的設(shè)計和技巧,避開了自旋鎖的局限,提供了更高的并發(fā)性能。如果說Java 7版本的源碼是在教我們?nèi)绾螌⒈^鎖轉(zhuǎn)換為自旋鎖,那么在Java 8中我們甚至可以看到如何將自旋鎖轉(zhuǎn)換為無鎖的方法和技巧。

把書讀薄

Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么

image

圖片來源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/

在開始本文之前,大家首先在心里還是要有這樣的一張圖,如果有同學(xué)對HashMap比較熟悉,那這張圖也應(yīng)該不會陌生。事實上在整體的數(shù)據(jù)結(jié)構(gòu)的設(shè)計上Java 8的ConcurrentHashMap和HashMap基本上是一致的。

Java 7中ConcurrentHashMap為了提升性能使用了很多的編程技巧,但是引入Segment的設(shè)計還是有很大的改進空間的,Java 7中ConcurrrentHashMap的設(shè)計有下面這幾個可以改進的點:

  1.  Segment在擴容的時候非擴容線程對本Segment的寫操作時都要掛起等待的

  2.  對ConcurrentHashMap的讀操作需要做兩次哈希尋址,在讀多寫少的情況下其實是有額外的性能損失的

  3.  盡管size()方法的實現(xiàn)中先嘗試無鎖讀,但是如果在這個過程中有別的線程做寫入操作,那調(diào)用size()的這個線程就會給整個ConcurrentHashMap加鎖,這是整個ConcurrrentHashMap唯一一個全局鎖,這點對底層的組件來說還是有性能隱患的

  4.  極端情況下(比如客戶端實現(xiàn)了一個性能很差的哈希函數(shù))get()方法的復(fù)雜度會退化到O(n)。

針對1和2,在Java 8的設(shè)計是廢棄了Segment的使用,將悲觀鎖的粒度降低至桶維度,因此調(diào)用get的時候也不需要再做兩次哈希了。size()的設(shè)計是Java 8版本中最大的亮點,我們在后面的文章中會詳細說明。至于紅黑樹,這篇文章仍然不做過多闡述。接下來的篇幅會深挖細節(jié),把書讀厚,涉及到的模塊有:初始化,put方法, 擴容方法transfer以及size()方法,而其他模塊,比如hash函數(shù)等改變較小,故不再深究。

準備知識

ForwardingNode

static final class ForwardingNode<K,V> extends Node<K,V> {      final Node<K,V>[] nextTable;      ForwardingNode(Node<K,V>[] tab) {          // MOVED = -1,F(xiàn)orwardingNode的哈希值為-1          super(MOVED, null, null, null);          this.nextTable = tab;      }  }

除了普通的Node和TreeNode之外,ConcurrentHashMap還引入了一個新的數(shù)據(jù)類型ForwardingNode,我們這里只展示他的構(gòu)造方法,F(xiàn)orwardingNode的作用有兩個:

  •  在動態(tài)擴容的過程中標志某個桶已經(jīng)被復(fù)制到了新的桶數(shù)組中

  •  如果在動態(tài)擴容的時候有g(shù)et方法的調(diào)用,則ForwardingNode將會把請求轉(zhuǎn)發(fā)到新的桶數(shù)組中,以避免阻塞get方法的調(diào)用,F(xiàn)orwardingNode在構(gòu)造的時候會將擴容后的桶數(shù)組nextTable保存下來。

UNSAFE.compareAndSwap***

這是在Java 8版本的ConcurrentHashMap實現(xiàn)CAS的工具,以int類型為例其方法定義如下:

/**  * Atomically update Java variable to <tt>x</tt> if it is currently  * holding <tt>expected</tt>.  * @return <tt>true</tt> if successful  */  public final native boolean compareAndSwapInt(Object o, long offset,                                                int expected,                                                int x);

相應(yīng)的語義為:

如果對象o起始地址偏移量為offset的值等于expected,則將該值設(shè)為x,并返回true表明更新成功,否則返回false,表明CAS失敗

初始化 

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {      if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 檢查參數(shù)          throw new IllegalArgumentException();      if (initialCapacity < concurrencyLevel)          initialCapacity = concurrencyLevel;      long size = (long)(1.0 + (long)initialCapacity / loadFactor);      int cap = (size >= (long)MAXIMUM_CAPACITY) ?          MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor,求不小于size的 2^n的算法,jdk1.8的HashMap中說過      this.sizeCtl = cap;   }

即使是最復(fù)雜的一個初始化方法代碼也是比較簡單的,這里我們只需要注意兩個點:

  •  concurrencyLevel在Java 7中是Segment數(shù)組的長度,由于在Java 8中已經(jīng)廢棄了Segment,因此concurrencyLevel只是一個保留字段,無實際意義

  •  sizeCtl這個值第一次出現(xiàn),這個值如果等于-1則表明系統(tǒng)正在初始化,如果是其他負數(shù)則表明系統(tǒng)正在擴容,在擴容時sizeCtl二進制的低十六位等于擴容的線程數(shù)加一,高十六位(除符號位之外)包含桶數(shù)組的大小信息

put方法 

public V put(K key, V value) {      return putVal(key, value, false);  }

put方法將調(diào)用轉(zhuǎn)發(fā)到putVal方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {      if (key == null || value == null) throw new NullPointerException();      int hash = spread(key.hashCode());      int binCount = 0;      for (Node<K,V>[] tab = table;;) {          Node<K,V> f; int n, i, fh;          // 【A】延遲初始化          if (tab == null || (n = tab.length) == 0)              tab = initTable();          // 【B】當前桶是空的,直接更新          else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {              if (casTabAt(tab, i, null,                              new Node<K,V>(hash, key, value, null)))                  break;                   // no lock when adding to empty bin          }         // 【C】如果當前的桶的第一個元素是一個ForwardingNode節(jié)點,則該線程嘗試加入擴容          else if ((ffh = f.hash) == MOVED)              tab = helpTransfer(tab, f);          // 【D】否則遍歷桶內(nèi)的鏈表或樹,并插入          else {              // 暫時折疊起來,后面詳細看          }      }      // 【F】流程走到此處,說明已經(jīng)put成功,map的記錄總數(shù)加一      addCount(1L, binCount);      return null;  }

從整個代碼結(jié)構(gòu)上來看流程還是比較清楚的,我用括號加字母的方式標注了幾個非常重要的步驟,put方法依然牽扯出很多的知識點

桶數(shù)組的初始化 

private final Node<K,V>[] initTable() {      Node<K,V>[] tab; int sc;      while ((tab = table) == null || tab.length == 0) {          if ((sc = sizeCtl) < 0)              // 說明已經(jīng)有線程在初始化了,本線程開始自旋              Thread.yield(); // lost initialization race; just spin          else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {              // CAS保證只有一個線程能走到這個分支              try {                  if ((tab = table) == null || tab.length == 0) {                      int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                      @SuppressWarnings("unchecked")                      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                      tabtable = tab = nt;                      // sc = n - n/4 = 0.75n                      sc = n - (n >>> 2);                  }              } finally {                  // 恢復(fù)sizeCtl > 0相當于釋放鎖                  sizeCtl = sc;              }              break;          }      }      return tab;  }

在初始化桶數(shù)組的過程中,系統(tǒng)如何保證不會出現(xiàn)并發(fā)問題呢,關(guān)鍵點在于自旋鎖的使用,當有多個線程都執(zhí)行initTable方法的時候,CAS可以保證只有一個線程能夠進入到真正的初始化分支,其他線程都是自旋等待。這段代碼中我們關(guān)注三點即可:

  •  依照前文所述,當有線程開始初始化桶數(shù)組時,會通過CAS將sizeCtl置為-1,其他線程以此為標志開始自旋等待

  •  當桶數(shù)組初始化結(jié)束后將sizeCtl的值恢復(fù)為正數(shù),其值等于0.75倍的桶數(shù)組長度,這個值的含義和之前HashMap中的THRESHOLD一致,是系統(tǒng)觸發(fā)擴容的臨界點

  •  在finally語句中對sizeCtl的操作并沒有使用CAS是因為CAS保證只有一個線程能夠執(zhí)行到這個地方

添加桶數(shù)組第一個元素 

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {      return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  }  static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,                                      Node<K,V> c, Node<K,V> v) {      return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  }

put方法的第二個分支會用tabAt判斷當前桶是否是空的,如果是則會通過CAS寫入,tabAt通過UNSAFE接口會拿到桶中的最新元素,casTabAt通過CAS保證不會有并發(fā)問題,如果CAS失敗,則通過循環(huán)再進入其他分支

判斷是否需要新增線程擴容 

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {      Node<K,V>[] nextTab; int sc;      if (tab != null && (f instanceof ForwardingNode) &&          (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {          int rs = resizeStamp(tab.length);          while (nextTab == nextTable && table == tab &&                  (sc = sizeCtl) < 0) {              // RESIZE_STAMP_SHIFT = 16              if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                  sc == rs + MAX_RESIZERS || transferIndex <= 0)                  break;              // 這里將sizeCtl的值自增1,表明參與擴容的線程數(shù)量+1              if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {                  transfer(tab, nextTab);                  break;              }          }          return nextTab;      }      return table;  }

在這個地方我們就要詳細說下sizeCtl這個標志位了,臨時變量rs由resizeStamp這個方法返回

static final int resizeStamp(int n) {      // RESIZE_STAMP_BITS = 16      return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));  }

因為入?yún)是一個int類型的值,所有Integer.numberOfLeadingZeros(n)的返回值介于0到32之間,如果轉(zhuǎn)換成二進制

  •  Integer.numberOfLeadingZeros(n)的最大值是:00000000 00000000 00000000 00100000

  •  Integer.numberOfLeadingZeros(n)的最小值是:00000000 00000000 00000000 00000000

因此resizeStampd的返回值也就介于00000000 00000000 10000000 00000000到00000000 00000000 10000000 00100000之間,從這個返回值的范圍可以看出來resizeStamp的返回值高16位全都是0,是不包含任何信息的。因此在ConcurrrentHashMap中,會把resizeStamp的返回值左移16位拼到sizeCtl中,這就是為什么sizeCtl的高16位包含整個Map大小的原理。有了這個分析,這段代碼中比較長的if判斷也就能看懂了

if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||      sc == rs + MAX_RESIZERS || transferIndex <= 0)      break;      (sc >>> RESIZE_STAMP_SHIFT) != rs保證所有線程要基于同一個舊的桶數(shù)組擴容      transferIndex <= 0已經(jīng)有線程完成擴容任務(wù)了

至于sc == rs + 1 || sc == rs + MAX_RESIZERS這兩個判斷條件如果是細心的同學(xué)一定會覺得難以理解,這個地方確實是JDK的一個BUG,這個BUG已經(jīng)在JDK 12中修復(fù),詳細情況可以參考一下Oracle的官網(wǎng):https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427,這兩個判斷條件應(yīng)該寫成這樣:sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,因為直接比較rs和sc是沒有意義的,必須要有移位操作。它表達的含義是

  •  sc == (rs << RESIZE_STAMP_SHIFT) + 1當前擴容的線程數(shù)為0,即已經(jīng)擴容完成了,就不需要再新增線程擴容

  •  sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS參與擴容的線程數(shù)已經(jīng)到了最大,就不需要再新增線程擴容

真正擴容的邏輯在transfer方法中,我們后面會詳細看,不過有個小細節(jié)可以提前注意,如果nextTable已經(jīng)初始化了,transfer會返回nextTable的的引用,后續(xù)可以直接操作新的桶數(shù)組。

插入新值

如果桶數(shù)組已經(jīng)初始化好了,該擴容的也擴容了,并且根據(jù)哈希定位到的桶中已經(jīng)有元素了,那流程就跟普通的HashMap一樣了,唯一一點不同的就是,這時候要給當前的桶加鎖,且看代碼:

final V putVal(K key, V value, boolean onlyIfAbsent) {      if (key == null || value == null) throw new NullPointerException();      int hash = spread(key.hashCode());      int binCount = 0;      for (Node<K,V>[] tab = table;;) {          Node<K,V> f; int n, i, fh;          if (tab == null || (n = tab.length) == 0)// 折疊          else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 折疊}          else if ((ffh = f.hash) == MOVED)// 折疊          else {              V oldVal = null;              synchronized (f) {                  // 要注意這里這個不起眼的判斷條件                  if (tabAt(tab, i) == f) {                      if (fh >= 0) { // fh>=0的節(jié)點是鏈表,否則是樹節(jié)點或者ForwardingNode                          binCount = 1;                          for (Node<K,V> e = f;; ++binCount) {                              K ek;                              if (e.hash == hash &&                                  ((eek = e.key) == key ||                                      (ek != null && key.equals(ek)))) {                                  oldVal = e.val; // 如果鏈表中有值了,直接更新                                  if (!onlyIfAbsent)                                      e.val = value;                                  break;                              }                              Node<K,V> pred = e;                              if ((ee = e.next) == null) {                                  // 如果流程走到這里,則說明鏈表中還沒值,直接連接到鏈表尾部                                  pred.next = new Node<K,V>(hash, key, value, null);                                  break;                              }                          }                      }                      // 紅黑樹的操作先略過                  }              }          }      }      // put成功,map的元素個數(shù)+1      addCount(1L, binCount);      return null;  }

這段代碼中要特備注意一個不起眼的判斷條件(上下文在源碼上已經(jīng)標注出來了):tabAt(tab, i) == f,這個判斷的目的是為了處理調(diào)用put方法的線程和擴容線程的競爭。因為synchronized是阻塞鎖,如果調(diào)用put方法的線程恰好和擴容線程同時操作同一個桶,且調(diào)用put方法的線程競爭鎖失敗,等到該線程重新獲取到鎖的時候,當前桶中的元素就會變成一個ForwardingNode,那就會出現(xiàn)tabAt(tab, i) != f的情況。

多線程動態(tài)擴容 

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {      int n = tab.length, stride;      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)          stride = MIN_TRANSFER_STRIDE; // subdivide range      if (nextTab == null) {            // 初始化新的桶數(shù)組          try {              @SuppressWarnings("unchecked")              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];              nextTab = nt;          } catch (Throwable ex) {      // try to cope with OOME              sizeCtl = Integer.MAX_VALUE;              return;         }          nextTabnextTable = nextTab;          transferIndex = n;      }      int nextn = nextTab.length;      ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);      boolean advance = true;      boolean finishing = false; // to ensure sweep before committing nextTab      for (int i = 0, bound = 0;;) {          Node<K,V> f; int fh;          while (advance) {              int nextIndex, nextBound;             if (--i >= bound || finishing)                  advance = false;              else if ((nextIndex = transferIndex) <= 0) {                  i = -1;                  advance = false;              }              else if (U.compareAndSwapInt                          (this, TRANSFERINDEX, nextIndex,                          nextBound = (nextIndex > stride ?                                      nextIndex - stride : 0))) {                  bound = nextBound;                  i = nextIndex - 1;                  advance = false;              }          }          if (i < 0 || i >= n || i + n >= nextn) {              int sc;              if (finishing) {                  nextTable = null;                  table = nextTab;                  sizeCtl = (n << 1) - (n >>> 1);                  return;              }              if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                  // 判斷是會否是最后一個擴容線程                  if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                      return;                  finishing = advance = true;                  i = n; // recheck before commit              }          }          else if ((f = tabAt(tab, i)) == null)              advance = casTabAt(tab, i, null, fwd);          else if ((ffh = f.hash) == MOVED) // 只有最后一個擴容線程才有機會執(zhí)行這個分支              advance = true; // already processed          else { // 復(fù)制過程與HashMap類似,這里不再贅述              synchronized (f) {                 // 折疊              }          }      }  }

在深入到源碼細節(jié)之前我們先根據(jù)下圖看一下在Java 8中ConcurrentHashMap擴容的幾個特點:

  •  新的桶數(shù)組nextTable是原先桶數(shù)組長度的2倍,這與之前HashMap一致

  •  參與擴容的線程也是分段將table中的元素復(fù)制到新的桶數(shù)組nextTable中

  •  桶一個桶數(shù)組中的元素在新的桶數(shù)組中均勻的分布在兩個桶中,桶下標相差n(舊的桶數(shù)組的長度),這一點依然與HashMap保持一致

Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么

image-20210424202636495

各個線程之間如何通力協(xié)作

先看一個關(guān)鍵的變量transferIndex,這是一個被volatile修飾的變量,這一點可以保證所有線程讀到的一定是最新的值。

private transient volatile int transferIndex;

這個值會被第一個參與擴容的線程初始化,因為只有第一個參與擴容的線程才滿足條件nextTab == null

if (nextTab == null) {            // initiating      try {          @SuppressWarnings("unchecked")          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];          nextTab = nt;      } catch (Throwable ex) {      // try to cope with OOME          sizeCtl = Integer.MAX_VALUE;          return;     }      nextTabnextTable = nextTab;      transferIndex = n;  }

在了解了transferIndex屬性的基礎(chǔ)上,上面的這個循環(huán)就好理解了

while (advance) {      int nextIndex, nextBound;        // 當bound <= i <= transferIndex的時候i自減跳出這個循環(huán)繼續(xù)干活      if (--i >= bound || finishing)          advance = false;      // 擴容的所有任務(wù)已經(jīng)被認領(lǐng)完畢,本線程結(jié)束干活      else if ((nextIndex = transferIndex) <= 0) {          i = -1;          advance = false;      }      // 否則認領(lǐng)新的一段復(fù)制任務(wù),并通過`CAS`更新transferIndex的值      else if (U.compareAndSwapInt                  (this, TRANSFERINDEX, nextIndex,                  nextBound = (nextIndex > stride ?                              nextIndex - stride : 0))) {          bound = nextBound;          i = nextIndex - 1;          advance = false;      }  }

transferIndex就像是一個游標,每個線程認領(lǐng)一段復(fù)制任務(wù)的時候都會通過CAS將其更新為transferIndex - stride, CAS可以保證transferIndex可以按照stride這個步長降到0。

最后一個擴容線程需要二次確認?

對于每一個擴容線程,for循環(huán)的變量i代表要復(fù)制的桶的在桶數(shù)組中的下標,這個值的上限和下限通過游標transferIndex和步長stride計算得來,當i減小為負數(shù),則說明當前擴容線程完成了擴容任務(wù),這時候流程會走到這個分支:

// i >= n || i + n >= nextn現(xiàn)在看來取不到  if (i < 0 || i >= n || i + n >= nextn) {      int sc;      if (finishing) { // 【A】完成整個擴容過程          nextTable = null;          table = nextTab;          sizeCtl = (n << 1) - (n >>> 1);           return;      }      // 【B】判斷是否是最后一個擴容線程,如果是,則需要重新掃描一遍桶數(shù)組,做二次確認      if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {          // (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 說明是最后一個擴容線程          if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)              return;          // 重新掃描一遍桶數(shù)組,做二次確認          finishing = advance = true;          i = n; // recheck before commit      }  }

因為變量finishing被初始化為false,所以當線程第一次進入這個if分支的話,會先執(zhí)行注釋為【B】的這個分支,同時因為sizeCtl的低16位被初始化為參與擴容的線程數(shù)加一,因此,當條件(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT滿足時,就能證明當前線程就是最后一個擴容線程了,這這時候?qū)置為n重新掃描一遍桶數(shù)組,并且將finishing置為true保證當桶數(shù)組被掃描結(jié)束后能夠進入注釋為【A】的分支結(jié)束擴容。

這里就有一個問題,按照我們前面的分析,擴容線程能夠通力協(xié)作,保證各自負責的桶數(shù)組的分段不重不漏,這里為什么還需要做二次確認么?有一個開發(fā)者在concurrency-interest這個郵件列表中也關(guān)于這件事咨詢了Doug Lea(地址:http://cs.oswego.edu/pipermail/concurrency-interest/2020-July/017171.html),他給出的回復(fù)是:

Yes, this is a valid point; thanks. The post-scan was needed in a previous version, and could be removed. It does not trigger often enough to matter though, so is for now another minor tweak that might be included next time CHM is updated.

雖然Doug在郵件中的措辭用了could be, not often enough等,但也確認了最后一個擴容線程的二次檢查是沒有必要的。具體的復(fù)制過程與HashMap類似,感興趣的讀者可以翻一下高端的面試從來不會在HashMap的紅黑樹上糾纏太多這篇文章。

size()方法

addCount()方法 

// 記錄map元素總數(shù)的成員變量  private transient volatile long baseCount;

在put方法的最后,有一個addCount方法,因為putVal執(zhí)行到此處說明已經(jīng)成功新增了一個元素,所以addCount方法的作用就是維護當前ConcurrentHashMap的元素總數(shù),在ConcurrentHashMap中有一個變量baseCount用來記錄map中元素的個數(shù),如下圖所示,如果同一時刻有n個線程通過CAS同時操作baseCount變量,有且僅有一個線程會成功,其他線程都會陷入無休止的自旋當中,那一定會帶來性能瓶頸。

Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么

image-20210420221407349

為了避免大量線程都在自旋等待寫入baseCount,ConcurrentHashMap引入了一個輔助隊列,如下圖所示,現(xiàn)在操作baseCount的線程可以分散到這個輔助隊列中去了,調(diào)用size()的時候只需要將baseCount和輔助隊列中的數(shù)值相加即可,這樣就實現(xiàn)了調(diào)用size()無需加鎖。

Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么

image-20210420222306734

輔助隊列是一個類型為CounterCell的數(shù)組:

@sun.misc.Contended static final class CounterCell {      volatile long value;      CounterCell(long x) { value = x; }  }

可以簡單理解為只是包裝了一個long型的變量value,還需要解決一個問題是,對于某個具體的線程它是如何知道操作輔助隊列中的哪個值呢?答案是下面的這個方法:

static final int getProbe() {      return UNSAFE.getInt(Thread.currentThread(), PROBE);  }

getProbe方法會返回當前線程的一個唯一身份碼,這個值是不會變的,因此可以將getProbe的返回值與輔助隊列的長度作求余運算得到具體的下標,它的返回值可能是0,如果返回0則需要調(diào)用ThreadLocalRandom.localInit()初始化。addCount方法中有兩個細節(jié)需要注意

private final void addCount(long x, int check) {      CounterCell[] as; long b, s;      // 注意這里的判斷條件,是有技巧的      if ((as = counterCells) != null ||          !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {          CounterCell a; long v; int m;          boolean uncontended = true;          if (as == null || (m = as.length - 1) < 0 ||              (a = as[ThreadLocalRandom.getProbe() & m]) == null ||              // 變量uncontended記錄著這個CAS操作是否成功              !(uncontended =                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {              fullAddCount(x, uncontended);              return;          }          if (check <= 1)              return;          s = sumCount();      }      if (check >= 0) {          // 檢查是否需要擴容,后面再詳細看      }  }

細節(jié)一:

首先我們要注意方法中剛進來的if判斷條件:

if ((as = counterCells) != null ||      !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {  }

作者在這里巧妙的運用了邏輯短路,如果(as = counterCells) != null則后面的CAS是不會執(zhí)行的,為什么要這么設(shè)置呢?作者有兩點考慮:

  1.  原因在于如果(as = counterCells) != null,則說明輔助隊列已經(jīng)初始化好了,相比于所有的線程都自旋等待baseCount這一個變量,讓線程通過CAS去操作隊列中的值有更大的可能性成功,因為輔助隊列的最大長度為大于當前處理器個數(shù)的2的正整數(shù)冪,可以支持更大的并發(fā)

  2.  如果輔助隊列還沒有初始化好,直到有必要的時候再去創(chuàng)建隊列,如何判斷“必要性”呢?就看對baseCount的CAS操作能否成功,如果失敗,就說明當前系統(tǒng)的并發(fā)已經(jīng)比較高了,需要隊列的輔助,否則直接操作baseCount

細節(jié)二:

只有當輔助隊列已存在,且由ThreadLocalRandom.getProbe()在輔助隊列中確定的位置不為null時,才對其做CAS操作,這本來是一個正常的防御性判斷,但是uncontended記錄了CAS是否成功,如果失敗,則會在fullAddCount中調(diào)用ThreadLocalRandom.advanceProbe換一個身份碼調(diào)整下當前線程在輔助隊列的位置,避免所有線程都在輔助隊列的同一個坑位自旋等待。

fullAddCount()方法 

// See LongAdder version for explanation  // wasUncontended 記錄著調(diào)用方CAS是否成功,如果失敗則換一個輔助隊列的元素繼續(xù)CAS  private final void fullAddCount(long x, boolean wasUncontended) {      int h;      if ((h = ThreadLocalRandom.getProbe()) == 0) {          ThreadLocalRandom.localInit();      // force initialization          h = ThreadLocalRandom.getProbe();          wasUncontended = true;      }      boolean collide = false;                // True if last slot nonempty      for (;;) {          CounterCell[] as; CounterCell a; int n; long v;          // 【A】如果輔助隊列已經(jīng)創(chuàng)建,則直接操作輔助隊列          if ((as = counterCells) != null && (n = as.length) > 0) {              if ((a = as[(n - 1) & h]) == null) {                  if (cellsBusy == 0) {            // Try to attach new Cell                      CounterCell r = new CounterCell(x); // Optimistic create                      if (cellsBusy == 0 &&                          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {                          boolean created = false;                          try {               // Recheck under lock                              CounterCell[] rs; int m, j;                              if ((rs = counterCells) != null &&                                  (m = rs.length) > 0 &&                                  rs[j = (m - 1) & h] == null) {                                  rs[j] = r;                                  created = true;                              }                          } finally {                             cellsBusy = 0;                          }                          if (created)                              break;                          continue;           // Slot is now non-empty                      }                  }                  collide = false;              }              else if (!wasUncontended)       // 如果調(diào)用方CAS失敗了,本輪空跑,下一個循環(huán)換下標繼續(xù)操作                  wasUncontended = true;      // Continue after rehash              else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))                  break;              else if (counterCells != as || n >= NCPU)                   // 如果輔助隊列長度已經(jīng)超過了CPU個數(shù),本輪空跑,下一個循環(huán)換下標繼續(xù)操作                  collide = false;            // At max size or stale              else if (!collide) // 如果上一次操作失敗了(CAS失敗或者新建CounterCell失敗),本輪空跑,下一個循環(huán)換下標繼續(xù)操作                  collide = true;              else if (cellsBusy == 0 && // 如果連續(xù)兩次操作輔助隊列失敗,則考慮擴容                          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {                  try {                      if (counterCells == as) {// Expand table unless stale                          CounterCell[] rs = new CounterCell[n << 1];                          for (int i = 0; i < n; ++i)                              rs[i] = as[i];                          counterCells = rs;                      }                  } finally {                      cellsBusy = 0;                  }                  collide = false;                  continue;                   // Retry with expanded table              }              // 如果上一次操作失敗或者調(diào)用方CAS失敗,都會走到這里,變換要操作的輔助隊列下標              h = ThreadLocalRandom.advanceProbe(h);          }          // 【B】如果輔助隊列還未創(chuàng)建,則加鎖創(chuàng)建          else if (cellsBusy == 0 && counterCells == as &&                      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {              boolean init = false;              try {                           // Initialize table                  if (counterCells == as) {                      CounterCell[] rs = new CounterCell[2];                      rs[h & 1] = new CounterCell(x);                      counterCells = rs;                      init = true;                  }              } finally {                  cellsBusy = 0;              }              if (init)                  break;          }          // 【C】如果輔助隊列創(chuàng)建失敗(拿鎖失敗),則嘗試直接操作`baseCount`          else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))              break;                          // Fall back on using base      }  }

因為counterCells是一個普通的數(shù)組,因此對其的寫操作,包括初始化,擴容以及元素的寫都需要加鎖,加鎖的方式是對全局變量cellsBusy的自旋鎖。先看最外層的三個分支:

  •  【B】如果輔助隊列還沒有創(chuàng)建,則加鎖創(chuàng)建

  •  【C】如果因為拿鎖失敗導(dǎo)致輔助隊列創(chuàng)建失敗,則嘗試自旋寫入變量baseCount,萬一真的成功了呢

  •  【A】如果輔助隊列已經(jīng)創(chuàng)建了,則直接去操作輔助隊列相應(yīng)的元素

注釋中標注【A】的這個分支代碼較多,其主要思路是如果通過CAS或者加鎖操作輔助隊列中的某個元素失敗,則首先通過調(diào)用ThreadLocalRandom.advanceProbe(h)換一個隊列中的元素繼續(xù)操作,這次操作是否成功會記錄在臨時變量collide中。如果下一次操作還是失敗,則說明此時的并發(fā)量比較大需要擴容了。如果輔助隊列的長度已經(jīng)超過了CPU的個數(shù),那就不再擴容,繼續(xù)換一個元素操作,因為同一時間能運行的線程數(shù)最大不會超過計算機的CPU個數(shù)。

在這個過程中有四個細節(jié)仍然需要注意:

細節(jié)一:

counterCells只是一個普通的數(shù)組,因此并不是線程安全的,所以對其寫操作需要加鎖保證并發(fā)安全

細節(jié)二:

加鎖的時候,作者做了一個double-check的動作,我看有的文章將其解讀為“類似于單例模式的double-check”,這個是不對的,作者這樣做的原因我們在上一篇文章中有講過,首先第一個檢查cellsBusy == 0是流程往下走的基礎(chǔ),如果cellsBusy == 1則直接拿鎖失敗退出,調(diào)用h = ThreadLocalRandom.advanceProbe(h);更新h后重試,如果cellsBusy == 0校驗通過,則調(diào)用CounterCell r = new CounterCell(x);初始化一個CounterCell,這樣做是為了減少自旋鎖的臨界區(qū)的大小,以此來提升并發(fā)性能

細節(jié)三:

在加鎖的時候先判斷下cellsBusy是否為0,如果為1那直接宣告拿鎖失敗,為什么這么做呢?因為相比于調(diào)用UNSAFE的CAS操作,直接讀取volatile的消耗更少,如果直接讀取cellsBusy已經(jīng)能判斷出拿鎖失敗,那就沒必要再調(diào)用耗時更多的CAS了

細節(jié)四:

對cellsBusy從0到1的更改調(diào)用了CAS但是從1置為0卻只用了賦值操作,這是因為CAS可以保證能走到這條語句的只有一個線程,因此可以用賦值操作來更改cellsBusy的值。

sumCount

前面兩個方法主要是把ConcurrentHashMap中的元素個數(shù)分散的記錄到baseCount和輔助隊列中,調(diào)用size()方法的時候只需要把這些值相加即可。

public int size() {      long n = sumCount();      return ((n < 0L) ? 0 :              (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :              (int)n);  }  final long sumCount() {      CounterCell[] as = counterCells; CounterCell a;      long sum = baseCount;      if (as != null) {          for (int i = 0; i < as.length; ++i) {              if ((a = as[i]) != null)                  sum += a.value;          }      }      return sum;  }

“Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI