溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java從同步容器到并發(fā)容器的操作過程

發(fā)布時間:2020-09-26 08:55:49 來源:腳本之家 閱讀:122 作者:knock_小新 欄目:編程語言

引言

容器是Java基礎類庫中使用頻率最高的一部分,Java集合包中提供了大量的容器類來幫組我們簡化開發(fā),我前面的文章中對Java集合包中的關鍵容器進行過一個系列的分析,但這些集合類都是非線程安全的,即在多線程的環(huán)境下,都需要其他額外的手段來保證數據的正確性,最簡單的就是通過synchronized關鍵字將所有使用到非線程安全的容器代碼全部同步執(zhí)行。這種方式雖然可以達到線程安全的目的,但存在幾個明顯的問題:首先編碼上存在一定的復雜性,相關的代碼段都需要添加鎖。其次這種一刀切的做法在高并發(fā)情況下性能并不理想,基本相當于串行執(zhí)行。JDK1.5中為我們提供了一系列的并發(fā)容器,集中在java.util.concurrent包下,用來解決這兩個問題,先從同步容器說起。

同步容器Vector和HashTable

為了簡化代碼開發(fā)的過程,早期的JDK在java.util包中提供了Vector和HashTable兩個同步容器,這兩個容器的實現和早期的ArrayList和HashMap代碼實現基本一樣,不同在于Vector和HashTable在每個方法上都添加了synchronized關鍵字來保證同一個實例同時只有一個線程能訪問,部分源碼如下:

//Vector
public synchronized int size() {};
public synchronized E get(int index) {};
//HashTable 
public synchronized V put(K key, V value) {};
public synchronized V remove(Object key) {};

通過對每個方法添加synchronized,保證了多次操作的串行。這種方式雖然使用起來方便了,但并沒有解決高并發(fā)下的性能問題,與手動鎖住ArrayList和HashMap并沒有什么區(qū)別,不論讀還是寫都會鎖住整個容器。其次這種方式存在另一個問題:當多個線程進行復合操作時,是線程不安全的??梢酝ㄟ^下面的代碼來說明這個問題:

public static void deleteVector(){
 int index = vectors.size() - 1;
 vectors.remove(index);
}

代碼中對Vector進行了兩步操作,首先獲取size,然后移除最后一個元素,多線程情況下如果兩個線程交叉執(zhí)行,A線程調用size后,B線程移除最后一個元素,這時A線程繼續(xù)remove將會拋出索引超出的錯誤。

那么怎么解決這個問題呢?最直接的修改方案就是對代碼塊加鎖來防止多線程同時執(zhí)行:

public static void deleteVector(){
 synchronized (vectors) {
  int index = vectors.size() - 1;
  vectors.remove(index);
 }
}

如果上面的問題通過加鎖來解決沒有太直觀的影響,那么來看看對vectors進行迭代的情況:

public static void foreachVector(){
 synchronized (vectors) {
  for (int i = 0; i < vectors.size(); i++) {
   System.out.println(vectors.get(i).toString());
  }
 }
}

為了避免多線程情況下在迭代的過程中其他線程對vectors進行了修改,就不得不對整個迭代過程加鎖,想象這么一個場景,如果迭代操作非常頻繁,或者vectors元素很大,那么所有的修改和讀取操作將不得不在鎖外等待,這將會對多線程性能造成極大的影響。那么有沒有什么方式能夠很好的對容器的迭代操作和修改操作進行分離,在修改時不影響容器的迭代操作呢?這就需要java.util.concurrent包中的各種并發(fā)容器了出場了。

并發(fā)容器CopyOnWrite

CopyOnWrite--寫時復制容器是一種常用的并發(fā)容器,它通過多線程下讀寫分離來達到提高并發(fā)性能的目的,和前面我們講解StampedLock時所用的解決方案類似:任何時候都可以進行讀操作,寫操作則需要加鎖。不同的是,在CopyOnWrite中,對容器的修改操作加鎖后,通過copy一個新的容器來進行修改,修改完畢后將容器替換為新的容器即可。

這種方式的好處顯而易見:通過copy一個新的容器來進行修改,這樣讀操作就不需要加鎖,可以并發(fā)讀,因為在讀的過程中是采用的舊的容器,即使新容器做了修改對舊容器也沒有影響,同時也很好的解決了迭代過程中其他線程修改導致的并發(fā)問題。

JDK中提供的并發(fā)容器包括CopyOnWriteArrayList和CopyOnWriteArraySet,下面通過CopyOnWriteArrayList的部分源碼來理解這種思想:

//添加元素
public boolean add(E e) {
 //獨占鎖
 final ReentrantLock lock = this.lock;
 lock.lock();
 try {
  Object[] elements = getArray();
  int len = elements.length;
  //復制一個新的數組newElements
  Object[] newElements = Arrays.copyOf(elements, len + 1);
  newElements[len] = e;
  //修改后指向新的數組
  setArray(newElements);
  return true;
 } finally {
  lock.unlock();
 }
}
public E get(int index) {
 //未加鎖,直接獲取
 return get(getArray(), index);
}

代碼很簡單,在add操作中通過一個共享的ReentrantLock來獲取鎖,這樣可以防止多線程下多個線程同時修改容器內容。獲取鎖后通過Arrays.copyOf復制了一個新的容器,然后對新的容器進行了修改,最后直接通過setArray將原數組引用指向了新的數組,避免了在修改過程中迭代數據出現錯誤。get操作由于是讀操作,未加鎖,直接讀取就行。

CopyOnWriteArraySet類似,這里不做過多講解。

CopyOnWrite容器雖然在多線程下使用是安全的,相比較Vector也大大提高了讀寫的性能,但它也有自身的問題。

首先就是性能,在講解ArrayList的文章中提到過,ArrayList的擴容由于使用了Arrays.copyOf每次都需要申請更大的空間以及復制現有的元素到新的數組,對性能存在一定影響。CopyOnWrite容器也不例外,每次修改操作都會申請新的數組空間,然后進行替換。所以在高并發(fā)頻繁修改容器的情況下,會不斷申請新的空間,同時會造成頻繁的GC,這時使用CopyOnWrite容器并不是一個好的選擇。

其次還有一個數據一致性問題,由于在修改中copy了新的數組進行替換,同時舊數組如果還在被使用,那么新的數據就不能被及時讀取到,這樣就造成了數據不一致,如果需要強數據一致性,CopyOnWrite容器也不太適合。

并發(fā)容器ConcurrentHashMap

ConcurrentHashMap容器相較于CopyOnWrite容器在并發(fā)加鎖粒度上有了更大一步的優(yōu)化,它通過修改對單個hash桶元素加鎖的達到了更細粒度的并發(fā)控制。在了解ConcurrentHashMap容器之前,推薦大家先閱讀我之前對HashMap源碼分析的文章--Java集合(5)一 HashMap與HashSet,因為在底層數據結構上,ConcurrentHashMap和HashMap都使用了數組+鏈表+紅黑樹的方式,只是在HashMap的基礎上添加了并發(fā)相關的一些控制,所以這里只對ConcurrentHashMap中并發(fā)相關代碼做一些分析。

Java從同步容器到并發(fā)容器的操作過程

 還是先從ConcurrentHashMap的寫操作開始,這里就是put方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {
 if (key == null || value == null) throw new NullPointerException();
 int hash = spread(key.hashCode()); //計算桶的hash值
 int binCount = 0;
 //循環(huán)插入元素,避免并發(fā)插入失敗
 for (Node<K,V>[] tab = table;;) {
  Node<K,V> f; int n, i, fh;
  if (tab == null || (n = tab.length) == 0)
   tab = initTable();
  else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
   //如果當前桶無元素,則通過cas操作插入新節(jié)點
   if (casTabAt(tab, i, null,
       new Node<K,V>(hash, key, value, null)))
    break;     
  }
  //如果當前桶正在擴容,則協(xié)助擴容
  else if ((fh = f.hash) == MOVED)
   tab = helpTransfer(tab, f);
  else {
   V oldVal = null;
   //hash沖突時鎖住當前需要添加節(jié)點的頭元素,可能是鏈表頭節(jié)點或者紅黑樹的根節(jié)點
   synchronized (f) { 
    if (tabAt(tab, i) == f) {
     if (fh >= 0) {
      binCount = 1;
      for (Node<K,V> e = f;; ++binCount) {
       K ek;
       if (e.hash == hash &&
        ((ek = e.key) == key ||
         (ek != null && key.equals(ek)))) {
        oldVal = e.val;
        if (!onlyIfAbsent)
         e.val = value;
        break;
       }
       Node<K,V> pred = e;
       if ((e = e.next) == null) {
        pred.next = new Node<K,V>(hash, key,
               value, null);
        break;
       }
      }
     }
     else if (f instanceof TreeBin) {
      Node<K,V> p;
      binCount = 2;
      if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
              value)) != null) {
       oldVal = p.val;
       if (!onlyIfAbsent)
        p.val = value;
      }
     }
    }
   }
   if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
     treeifyBin(tab, i);
    if (oldVal != null)
     return oldVal;
    break;
   }
  }
 }
 addCount(1L, binCount);
 return null;
}

在put元素的過程中,有幾個并發(fā)處理的關鍵點:

•如果當前桶對應的節(jié)點還沒有元素插入,通過典型的無鎖cas操作嘗試插入新節(jié)點,減少加鎖的概率,并發(fā)情況下如果插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)。

•如果當前桶正在擴容,則協(xié)助擴容((fh = f.hash) == MOVED)。這里是一個重點,ConcurrentHashMap的擴容和HashMap不一樣,它在多線程情況下或使用多個線程同時擴容,每個線程擴容指定的一部分hash桶,當前線程擴容完指定桶之后會繼續(xù)獲取下一個擴容任務,直到擴容全部完成。擴容的大小和HashMap一樣,都是翻倍,這樣可以有效減少移動的元素數量,也就是使用2的冪次方的原因,在HashMap中也一樣。

•在發(fā)生hash沖突時僅僅只鎖住當前需要添加節(jié)點的頭元素即可,可能是鏈表頭節(jié)點或者紅黑樹的根節(jié)點,其他桶節(jié)點都不需要加鎖,大大減小了鎖粒度。

通過ConcurrentHashMap添加元素的過程,知道了ConcurrentHashMap容器是通過CAS + synchronized一起來實現并發(fā)控制的。這里有個額外的問題:為什么使用synchronized而不使用ReentrantLock?前面我的文章也對synchronized以及ReentrantLock的實現方式和性能做過分析,在這里我的理解是synchronized在后期優(yōu)化空間上比ReentrantLock更大。

并發(fā)容器ConcurrentSkipListMap

java.util中對應的容器在java.util.concurrent包中基本都可以找到對應的并發(fā)容器:List和Set有對應的CopyOnWriteArrayList與CopyOnWriteArraySet,HashMap有對應的ConcurrentHashMap,但是有序的TreeMap或并沒有對應的ConcurrentTreeMap。

為什么沒有ConcurrentTreeMap呢?這是因為TreeMap內部使用了紅黑樹來實現,紅黑樹是一種自平衡的二叉樹,當樹被修改時,需要重新平衡,重新平衡操作可能會影響樹的大部分節(jié)點,如果并發(fā)量非常大的情況下,這就需要在許多樹節(jié)點上添加互斥鎖,那并發(fā)就失去了意義。所以提供了另外一種并發(fā)下的有序map實現:ConcurrentSkipListMap。

ConcurrentSkipListMap內部使用跳表(SkipList)這種數據結構來實現,他的結構相對紅黑樹來說非常簡單理解,實現起來也相對簡單,而且在理論上它的查找、插入、刪除時間復雜度都為log(n)。在并發(fā)上,ConcurrentSkipListMap采用無鎖的CAS+自旋來控制。

跳表簡單來說就是一個多層的鏈表,底層是一個普通的鏈表,然后逐層減少,通常通過一個簡單的算法實現每一層元素是下一層的元素的二分之一,這樣當搜索元素時從最頂層開始搜索,可以說是另一種形式的二分查找。

一個簡單的獲取跳表層數概率算法實現如下:

int random_level() { 
 K = 1; 
 while (random(0,1)) 
  K++; 
 
 return K; 
} 

通過簡單的0和1獲取概率,1層的概率為50%,2層的概率為25%,3層的概率為12.5%,這樣逐級遞減。

一個三層的跳表添加元素的過程如下:

Java從同步容器到并發(fā)容器的操作過程

 插入值為15的節(jié)點:

Java從同步容器到并發(fā)容器的操作過程

 插入后:

Java從同步容器到并發(fā)容器的操作過程

 維基百科中有一個添加節(jié)點的動圖,這里也貼出來方便理解:

Java從同步容器到并發(fā)容器的操作過程

通過分析ConcurrentSkipListMap的put方法來理解跳表以及CAS自旋并發(fā)控制:

private V doPut(K key, V value, boolean onlyIfAbsent) {
 Node<K,V> z;    // added node
 if (key == null)
  throw new NullPointerException();
 Comparator<? super K> cmp = comparator;
 outer: for (;;) {
  for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前繼節(jié)點
   if (n != null) { //查找到前繼節(jié)點
    Object v; int c;
    Node<K,V> f = n.next; //獲取后繼節(jié)點的后繼節(jié)點
    if (n != b.next) //發(fā)生競爭,兩次節(jié)點獲取不一致,并發(fā)導致
     break;
    if ((v = n.value) == null) { // 節(jié)點已經被刪除
     n.helpDelete(b, f);
     break;
    }
    if (b.value == null || v == n) 
     break;
    if ((c = cpr(cmp, key, n.key)) > 0) { //進行下一輪查找,比當前key大
     b = n;
     n = f;
     continue;
    }
    if (c == 0) { //相等時直接cas修改值
     if (onlyIfAbsent || n.casValue(v, value)) {
      @SuppressWarnings("unchecked") V vv = (V)v;
      return vv;
     }
     break; // restart if lost race to replace value
    }
    // else c < 0; fall through
   }
   z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
   if (!b.casNext(n, z)) //cas修改值 
    break;   // restart if lost race to append to b
   break outer;
  }
 }
 int rnd = ThreadLocalRandom.nextSecondarySeed(); //獲取隨機數
 if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
  int level = 1, max;
  while (((rnd >>>= 1) & 1) != 0) // 獲取跳表層級
   ++level;
  Index<K,V> idx = null;
  HeadIndex<K,V> h = head;
  if (level <= (max = h.level)) { //如果獲取的調表層級小于等于當前最大層級,則直接添加,并將它們組成一個上下的鏈表
   for (int i = 1; i <= level; ++i)
    idx = new Index<K,V>(z, idx, null);
  }
  else { // try to grow by one level //否則增加一層level,在這里體現為Index<K,V>數組
   level = max + 1; // hold in array and later pick the one to use
   @SuppressWarnings("unchecked")Index<K,V>[] idxs =
    (Index<K,V>[])new Index<?,?>[level+1];
   for (int i = 1; i <= level; ++i)
    idxs[i] = idx = new Index<K,V>(z, idx, null);
   for (;;) {
    h = head;
    int oldLevel = h.level;
    if (level <= oldLevel) // lost race to add level
     break;
    HeadIndex<K,V> newh = h;
    Node<K,V> oldbase = h.node;
    for (int j = oldLevel+1; j <= level; ++j) //新添加的level層的具體數據
     newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
    if (casHead(h, newh)) {
     h = newh;
     idx = idxs[level = oldLevel];
     break;
    }
   }
  }
  // 逐層插入數據過程
  splice: for (int insertionLevel = level;;) {
   int j = h.level;
   for (Index<K,V> q = h, r = q.right, t = idx;;) {
    if (q == null || t == null)
     break splice;
    if (r != null) {
     Node<K,V> n = r.node;
     // compare before deletion check avoids needing recheck
     int c = cpr(cmp, key, n.key);
     if (n.value == null) {
      if (!q.unlink(r))
       break;
      r = q.right;
      continue;
     }
     if (c > 0) {
      q = r;
      r = r.right;
      continue;
     }
    }
    if (j == insertionLevel) {
     if (!q.link(r, t))
      break; // restart
     if (t.node.value == null) {
      findNode(key);
      break splice;
     }
     if (--insertionLevel == 0)
      break splice;
    }
    if (--j >= insertionLevel && j < level)
     t = t.down;
    q = q.down;
    r = q.right;
   }
  }
 }
 return null;
}

這里的插入方法很復雜,可以分為3大步來理解:第一步獲取前繼節(jié)點后通過CAS來插入節(jié)點;第二步對level層數進行判斷,如果大于最大層數,則插入一層;第三步插入對應層的數據。整個插入過程全部通過CAS自旋的方式保證并發(fā)情況下的數據正確性。

總結

JDK中提供了豐富的并發(fā)容器供我們使用,文章中介紹的也并不全面,重點是要通過了解各種并發(fā)容器的原理,明白他們各自獨特的使用場景。這里簡單做個總結:當并發(fā)讀遠多于修改的場景下需要使用List和Set時,可以考慮使用CopyOnWriteArrayList和CopyOnWriteArraySet;當需要并發(fā)使用<Key, Value>鍵值對存取數據時,可以使用ConcurrentHashMap;當要保證并發(fā)<Key, Value>鍵值對有序時可以使用ConcurrentSkipListMap。

以上所述是小編給大家介紹的Java從同步容器到并發(fā)容器的操作過程,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對億速云網站的支持!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI