您好,登錄后才能下訂單哦!
這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)JAVA并發(fā)容器ConcurrentHashMap 1.7和1.8 源碼怎么寫,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
HashMap是一個線程不安全的類,在并發(fā)情況下會產(chǎn)生很多問題,詳情可以參考HashMap 源碼解析;HashTable是線程安全的類,但是它使用的是synchronized來保證線程安全,線程競爭激烈的情況下效率非常低下。在jdk1.5的時候引入了ConcurrentHashMap,這也是一個線程安全的類,它使用了分段鎖的技術(shù)來提升并發(fā)訪問效率。
HashTable容器在競爭激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因是所有訪問HashTable的 線程都必須競爭同一把鎖,假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù),那么 當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時,線程間就不會存在鎖競爭,從而可以有效提高并 發(fā)訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術(shù)。
在jdk1.7及以前ConcurrentHashMap采用Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成,之后采用的是和HashMap一樣的結(jié)構(gòu)。
采用Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成,Segment數(shù)組的大小就是ConcurrentHashMap的并發(fā)度。Segment繼承自ReentrantLock,所以他本身就是一個鎖。Segment數(shù)組一旦初始化后就不會再進(jìn)行擴(kuò)容,這也是jdk1.8去掉他的原因。Segment里面又包含了一個table數(shù)組,這個數(shù)組是可以擴(kuò)容的。
如圖我們在定位數(shù)據(jù)的時候需要對key的hash值進(jìn)行兩次尋址操作,第一次找到在Segment數(shù)組的位置,第二次找到在table數(shù)組中的位置。
// 直接繼承自ReentrantLock,所以一個Segment本身就是一個鎖 static final class Segment<K,V> extends ReentrantLock implements Serializable { ... // table數(shù)組 transient volatile HashEntry<K,V>[] table; // 一個Segment內(nèi)的元素個數(shù) transient int count; // 擴(kuò)容閾值 transient int threshold; // 擴(kuò)容因子 final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } ...
我們發(fā)現(xiàn)Segment直接繼承自ReentrantLock,所以一個Segment本身就是一個鎖。所以Segment數(shù)組的長度大小直接影響了ConcurrentHashMap的并發(fā)度。還有每個Segment單獨(dú)維護(hù)了擴(kuò)容閾值,擴(kuò)容因子,所以每個Segment的擴(kuò)容操作時完全獨(dú)立互不干擾的。
static final class HashEntry<K,V> { // 不可變 final int hash; final K key; // volatile保證可見性,這樣我們在get操作時就不用加鎖了 volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } ... }
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 參數(shù)校驗(yàn) if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 并發(fā)度控制,最大是65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 等于ssize從1向左移位的 次數(shù) int sshift = 0; int ssize = 1; // 找出最接近c(diǎn)oncurrencyLevel的2的n次冪的數(shù)值 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 這里之所 以用32是因?yàn)镃oncurrentHashMap里的hash()方法輸出的最大數(shù)是32位的 this.segmentShift = 32 - sshift; // 散列運(yùn)算的掩碼,等于ssize減1 this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 里HashEntry數(shù)組的長度度,它等于initialCapacity除以ssize的倍數(shù)c,如果c大于1,就會取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。 int cap = MIN_SEGMENT_TABLE_CAPACITY; // 保證HashEntry數(shù)組大小一定是2的n次冪 while (cap < c) cap <<= 1; // create segments and segments[0] // 初始化Segment數(shù)組,并實(shí)際只填充Segment數(shù)組的第0個元素。 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
通過代碼我們可以看出,在構(gòu)造ConcurrentHashMap的時候我們就會完成以下件事情:
確認(rèn)ConcurrentHashMap的并發(fā)度,也就是Segment數(shù)組長度,并保證它是2的n次冪
確認(rèn)HashEntry數(shù)組的初始化長度,并保證它是2的n次冪
將Segment數(shù)組初始化好并且只填充第0個元素
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 1. 先獲取key的hash值 int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 2. 定位到Segment s = ensureSegment(j); // 3.調(diào)用Segment的put方法 return s.put(key, hash, value, false); }
主要流程是:
先獲取key的hash值
定位到Segment
調(diào)用Segment的put方法
private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash42((String) k); } h ^= k.hashCode(); // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
這個方法大致思路是:先拿到key的hashCode,然后對這個值進(jìn)行再散列。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 1. 加鎖 HashEntry<K,V> node = tryLock() ? null : // scanAndLockForPut在沒有獲取到鎖的情況下,去查詢key是否存在,如果不存在就新建一個Node scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; // 確定元素在tabl數(shù)組上的位置 int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; // 如果原來位置上有值并且key相同,那么直接替換原來的value if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); // 元素總數(shù)加一 int c = count + 1; // 判斷是否需要擴(kuò)容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
大致過程是:
加鎖
定位key在tabl數(shù)組上的索引位置index
,獲取到頭結(jié)點(diǎn)
判斷是否有hash沖突
如果沒有沖突直接將新節(jié)點(diǎn)node
添加到數(shù)組index
索引位
如果有沖突,先判斷是否有相同key
有相同key直接替換對應(yīng)node的value值
沒有添加新元素到鏈表尾部
解鎖
這里需要注意的是scanAndLockForPut方法,他在沒有獲取到鎖的時候不僅會通過自旋獲取鎖,還會做一些其他的查找或新增節(jié)點(diǎn)的工,以此來提升put性能。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //定位HashEntry數(shù)組位置,獲取第一個節(jié)點(diǎn) HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; //掃描次數(shù),循環(huán)標(biāo)記位 int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below // 表示遍歷鏈表還沒有結(jié)束 if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 完成新節(jié)點(diǎn)初始化 node = new HashEntry<K,V>(hash, key, value, null); // 完成鏈表的遍歷,還是沒有找到相同key的節(jié)點(diǎn) retries = 0; } // 有hash沖突,開始查找是否有相同的key else if (key.equals(e.key)) retries = 0; else e = e.next; } // 斷循環(huán)次數(shù)是否大于最大掃描次數(shù) else if (++retries > MAX_SCAN_RETRIES) { // 自旋獲取鎖 lock(); break; } // 每間隔一次循環(huán),檢查一次first節(jié)點(diǎn)是否改變 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { // 首節(jié)點(diǎn)有變動,更新first,重新掃描 e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
scanAndLockForPut方法在當(dāng)前線程獲取不到segment鎖的情況下,完成查找或新建節(jié)點(diǎn)的工作。當(dāng)獲取到鎖后直接將該節(jié)點(diǎn)加入鏈表即可,提升了put操作的性能。大致過程:
定位key在HashEntry數(shù)組的索引位,并獲取第一個節(jié)點(diǎn)
嘗試獲取鎖,如果成功直接返回,否則進(jìn)入自旋
判斷是否有hash沖突,沒有就直接完成新節(jié)點(diǎn)的初始化
有hash沖突,開始遍歷鏈表查找是否有相同key
如果沒找到相同key,那么就完成新節(jié)點(diǎn)的初始化
如果找到相同key,判斷循環(huán)次數(shù)是否大于最大掃描次數(shù)
如果循環(huán)次數(shù)是否大于最大掃描次數(shù),就直接CAS拿鎖(阻塞式)
如果循環(huán)次數(shù)不大于最大掃描次數(shù),判斷頭結(jié)點(diǎn)是否有變化
進(jìn)入下次循環(huán)
private void rehash(HashEntry<K,V> node) { // 復(fù)制老數(shù)組 HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // table數(shù)組擴(kuò)容2倍 int newCapacity = oldCapacity << 1; // 擴(kuò)容閾值也增加兩倍 threshold = (int)(newCapacity * loadFactor); // 創(chuàng)建新數(shù)組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 計算新的掩碼 int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 計算新的索引位 int idx = e.hash & sizeMask; // 轉(zhuǎn)移數(shù)據(jù) if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 將新的節(jié)點(diǎn)加到對應(yīng)索引位 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
在這里我們可以發(fā)現(xiàn)每次擴(kuò)容是針對一個單獨(dú)的Segment的,在擴(kuò)容完成之前中不會對擴(kuò)容前的數(shù)組進(jìn)行修改,這樣就可以保證get()
不被擴(kuò)容影響。大致過程是:
新建擴(kuò)容后的數(shù)組,容量是原來的兩倍
遍歷擴(kuò)容前的數(shù)組
通過e.hash & sizeMask;
計算key新的索引位
轉(zhuǎn)移數(shù)據(jù)
將擴(kuò)容后的數(shù)組指向成員變量table
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); // 計算出Segment的索引位 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 以原子的方式獲取Segment if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // 原子方式獲取HashEntry for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; // key相同 if ((k = e.key) == key || (e.hash == h && key.equals(k))) // value是volatile所以可以不加鎖直接取值返回 return e.value; } } return null; }
我們可以看到get方法是沒有加鎖的,因?yàn)镠ashEntry的value和next屬性是volatile的,volatile直接保證了可見性,所以讀的時候可以不加鎖。Java中Unsafe類可以參考這篇博客。
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; // true表示size溢出32位(大于Integer.MAX_VALUE) boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { // retries 如果retries等于2則對所有Segment加鎖 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; // 統(tǒng)計每個Segment元素個數(shù) for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { // 解鎖 if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } // 如果size大于Integer.MAX_VALUE值則直接返貨Integer.MAX_VALUE return overflow ? Integer.MAX_VALUE : size; }
size的核心思想是先進(jìn)性兩次不加鎖統(tǒng)計,如果兩次的值一樣則直接返回,否則第三個統(tǒng)計的時候會將所有segment全部鎖定,再進(jìn)行size統(tǒng)計,所以size()盡量少用。因?yàn)檫@是在并發(fā)情況下,size其他線程也會改變size大小,所以size()的返回值只能表示當(dāng)前線程、當(dāng)時的一個狀態(tài),可以算其實(shí)是一個預(yù)估值。
public boolean isEmpty() { long sum = 0L; final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { // 只要有一個Segment的元素個數(shù)不為0則表示不為null if (seg.count != 0) return false; // 統(tǒng)計操作總數(shù) sum += seg.modCount; } } if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } // 說明在統(tǒng)計過程中ConcurrentHashMap又被操作過, // 因?yàn)樯厦媾袛嗔薈oncurrentHashMap不可能會有元素,所以這里如果有操作一定是新增節(jié)點(diǎn) if (sum != 0L) return false; } return true; }
先判斷Segment里面是否有元素,如果有直接返回,如果沒有則統(tǒng)計操作總數(shù);
為了保證在統(tǒng)計過程中ConcurrentHashMap里面的元素沒有發(fā)生變化,再對所有的Segment的操作數(shù)做了統(tǒng)計;
最后 sum==0 表示ConcurrentHashMap里面確實(shí)沒有元素返回true,否則一定進(jìn)行過新增元素返回false。
和size方法一樣這個方法也是一個若一致方法,最后的結(jié)果也是一個預(yù)估值。
這個結(jié)構(gòu)和HashMap一樣
//最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; //初始容量 private static final int DEFAULT_CAPACITY = 16; //數(shù)組最大容量 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //默認(rèn)并發(fā)度,兼容1.7及之前版本 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //加載/擴(kuò)容因子,實(shí)際使用n - (n >>> 2) private static final float LOAD_FACTOR = 0.75f; //鏈表轉(zhuǎn)紅黑樹的節(jié)點(diǎn)數(shù)閥值 static final int TREEIFY_THRESHOLD = 8; //紅黑樹轉(zhuǎn)鏈表的節(jié)點(diǎn)數(shù)閥值 static final int UNTREEIFY_THRESHOLD = 6; //當(dāng)數(shù)組長度還未超過64,優(yōu)先數(shù)組的擴(kuò)容,否則將鏈表轉(zhuǎn)為紅黑樹 static final int MIN_TREEIFY_CAPACITY = 64; //擴(kuò)容時任務(wù)的最小轉(zhuǎn)移節(jié)點(diǎn)數(shù) private static final int MIN_TRANSFER_STRIDE = 16; //sizeCtl中記錄stamp的位數(shù) private static int RESIZE_STAMP_BITS = 16; //幫助擴(kuò)容的最大線程數(shù) private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //size在sizeCtl中的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // ForwardingNode標(biāo)記節(jié)點(diǎn)的hash值(表示正在擴(kuò)容) static final int MOVED = -1; // hash for forwarding nodes // TreeBin節(jié)點(diǎn)的hash值,它是對應(yīng)桶的根節(jié)點(diǎn) static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash //存放Node元素的數(shù)組,在第一次插入數(shù)據(jù)時初始化 transient volatile Node<K,V>[] table; //一個過渡的table表,只有在擴(kuò)容的時候才會使用 private transient volatile Node<K,V>[] nextTable; //基礎(chǔ)計數(shù)器值(size = baseCount + CounterCell[i].value) private transient volatile long baseCount; /** * 控制table數(shù)組的初始化和擴(kuò)容,不同的值有不同的含義: * -1:表示正在初始化 * -n:表示正在擴(kuò)容 * 0:表示還未初始化,默認(rèn)值 * 大于0:表示下一次擴(kuò)容的閾值 */ private transient volatile int sizeCtl; //節(jié)點(diǎn)轉(zhuǎn)移時下一個需要轉(zhuǎn)移的table索引 private transient volatile int transferIndex; //元素變化時用于控制自旋 private transient volatile int cellsBusy; // 保存table中的每個節(jié)點(diǎn)的元素個數(shù) 長度是2的冪次方,初始化是2,每次擴(kuò)容為原來的2倍 // size = baseCount + CounterCell[i].value private transient volatile CounterCell[] counterCells;
其他的參考HashMap 源碼解析
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } ...
鏈表節(jié)點(diǎn),保存著key和value的值。
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } ...
紅黑樹節(jié)點(diǎn),包含了樹的信息。
TreeBins中使用的節(jié)點(diǎn)
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; // 鎖的持有者 volatile Thread waiter; // 鎖狀態(tài) volatile int lockState; // values for lockState // 表示持有寫鎖 static final int WRITER = 1; // set while holding write lock // 表示等待 static final int WAITER = 2; // set when waiting for write lock // 表示讀鎖的增量值 static final int READER = 4; // increment value for setting read lock ...
與HashMap有點(diǎn)區(qū)別的是,他不直接使用TreeNode作為數(shù)的根節(jié)點(diǎn),而是使用TreeBins對其做了裝飾后成為了根節(jié)點(diǎn);同時它還記錄了鎖的狀態(tài);需要注意的是:
TreeBins節(jié)點(diǎn)的hash值是 -2
我們對紅黑樹添加節(jié)點(diǎn)后,紅黑樹的根節(jié)點(diǎn)有可能會因?yàn)樾D(zhuǎn)而發(fā)生變化,所以我們在添加樹節(jié)點(diǎn)的時候在putTreeVal()
方法里面我們使用cas在加了一次鎖。
/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; // 1. 判斷新的數(shù)組是否是null, // 2. 如果不為NULL給那就找到對應(yīng)索引位上的頭結(jié)點(diǎn) // 3. 判斷頭節(jié)點(diǎn)是否為NULL if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; // 自旋找節(jié)點(diǎn) for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { // 如果又變成了ForwardingNode標(biāo)記節(jié)點(diǎn),那說明有發(fā)生了擴(kuò)容,需要跳出循環(huán)從新查找 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
ForwardingNode 節(jié)點(diǎn)是一個擴(kuò)容標(biāo)記節(jié)點(diǎn),只要在數(shù)組上發(fā)現(xiàn)對應(yīng)索引位上是ForwardingNode 節(jié)點(diǎn)時,表示正在擴(kuò)容。當(dāng)get方法調(diào)用時,如果遇到ForwardingNode 節(jié)點(diǎn),那么它將會到擴(kuò)容后的數(shù)據(jù)上查找數(shù)據(jù),否則還是在擴(kuò)容前的數(shù)組上查找數(shù)據(jù)。這個要注意兩點(diǎn):
這個節(jié)點(diǎn)的hash值是 -1
這個節(jié)點(diǎn)的find方法是在對擴(kuò)容后的數(shù)組進(jìn)行查找
public ConcurrentHashMap18() { }
與HashMap一樣,構(gòu)造函數(shù)啥都沒干,初始化操作是在第一次put完成的。
public V put(K key, V value) { return putVal(key, value, false); }
啥都么有
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
計算key.hashCode()并將更高位的散列擴(kuò)展(XOR)降低。采用位運(yùn)算主要是是加快計算速度。
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 計算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 判斷是否需要初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 找出key對應(yīng)的索引位上的第一個節(jié)點(diǎn) else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果該索引位為null,則直接將數(shù)據(jù)放到該索引位 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 正在擴(kuò)容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 加內(nèi)置鎖鎖定一個數(shù)組的索引位,并添加節(jié)點(diǎn) synchronized (f) { if (tabAt(tab, i) == f) { // 表示鏈表節(jié)點(diǎn) if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // key相同直接替換value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 將新節(jié)點(diǎn)添加到鏈表尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 表示樹節(jié)點(diǎn) else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; // 添加數(shù)節(jié)點(diǎn) if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 嘗試將鏈表轉(zhuǎn)換成紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
主要流程:
計算key的hash值
判斷是否需要初始化,如果是則調(diào)用initTable()
方法完成初始化
判斷是否有hash沖突,如沒有直接設(shè)置新節(jié)點(diǎn)到對飲索引位,如果有獲取頭結(jié)點(diǎn)
根據(jù)頭結(jié)點(diǎn)的hash值判斷是否正在擴(kuò)容,如果是則幫助擴(kuò)容
如果沒有擴(kuò)容則對頭結(jié)點(diǎn)加鎖,添加新節(jié)點(diǎn)
fh >= 0
根據(jù)頭結(jié)點(diǎn)hash值判斷是否是鏈表節(jié)點(diǎn),如果是新增鏈表節(jié)點(diǎn),否則新增樹節(jié)點(diǎn)
新增樹節(jié)點(diǎn)putTreeVal()
需要注意,紅黑樹的根節(jié)點(diǎn)有可能會因?yàn)樾D(zhuǎn)而發(fā)生變化,所以我們在添加節(jié)點(diǎn)的時候還需要對根節(jié)點(diǎn)使用cas在加了一次鎖。
判斷是否需要嘗試由鏈表轉(zhuǎn)換成樹結(jié)構(gòu)
addCount(1L, binCount);
新增count數(shù),并判斷是否需要擴(kuò)容或者幫助擴(kuò)容
-1:表示正在初始化
-n:表示正在擴(kuò)容
0:表示還未初始化,默認(rèn)值
大于0:表示下一次擴(kuò)容的閾值
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 正在初始化 if ((sc = sizeCtl) < 0) // 讓出CPU執(zhí)行權(quán),然后自旋 Thread.yield(); // lost initialization race; just spin // CAS替換標(biāo)志位(相當(dāng)于獲取鎖) else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 二次判斷 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 相當(dāng)于sc=n*3/4 sc = n - (n >>> 2); } } finally { // 擴(kuò)容閾值 sizeCtl = sc; } break; } } return tab; }
主要過程:
根據(jù)sizeCtl判斷是否正在初始化
如果其他線程正在初始化就讓出CPU執(zhí)行權(quán),進(jìn)入下一次CPU執(zhí)行權(quán)的競爭Thread.yield();
如果沒有進(jìn)行初始化的線程則,CAS替換sizeCtl標(biāo)志位(相當(dāng)于獲取鎖)
獲取到鎖后再次判斷是否初始化
如果沒有則初始化Node數(shù)組,并設(shè)置sizeCtl值為下一次擴(kuò)容閾值
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // ForwardingNode標(biāo)記節(jié)點(diǎn),表示正在擴(kuò)容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
判斷是否正在擴(kuò)容,如果是就幫助擴(kuò)容。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {\ // n原來數(shù)組長度 int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 判斷是發(fā)起擴(kuò)容的線程還是幫助擴(kuò)容的線程,如果是發(fā)起擴(kuò)容的需要初始化新數(shù)組 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 擴(kuò)容期間的數(shù)據(jù)節(jié)點(diǎn)(用于標(biāo)志位,hash值是-1) ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 當(dāng)advance == true時,表明該節(jié)點(diǎn)已經(jīng)處理過了 boolean advance = true; // 在擴(kuò)容完成之前保證get不被影響 boolean finishing = false; // to ensure sweep before committing nextTab // 1. 從右往左找到第一個有數(shù)據(jù)的索引位節(jié)點(diǎn)(有hash沖突的桶) // 2. 如果找到的節(jié)點(diǎn)是NULL節(jié)點(diǎn)(沒有hash沖突的節(jié)點(diǎn)),那么將該索引位的NULL替換成ForwardingNode標(biāo)記節(jié)點(diǎn),這個節(jié)點(diǎn)的hash是-1 // 3. 如果找到不為NULL的節(jié)點(diǎn)(有hash沖突的桶),則對這個節(jié)點(diǎn)進(jìn)行加鎖 // 4. 開始進(jìn)進(jìn)移動節(jié)點(diǎn)數(shù)據(jù) for (int i = 0, bound = 0;;) { //f:當(dāng)前處理i位置的node(頭結(jié)點(diǎn)或者根節(jié)點(diǎn)); Node<K,V> f; int fh; // 通過while循環(huán)獲取本次需要移動的節(jié)點(diǎn)索引i while (advance) { // nextIndex:下一個要處理的節(jié)點(diǎn)索引; nextBound:下一個需要處理的節(jié)點(diǎn)的索引邊界 int nextIndex, nextBound; // i是老數(shù)組索引位,通過--i來講索引位往前一個索引位移動,直到0索引位 if (--i >= bound || finishing) advance = false; // 節(jié)點(diǎn)已全部轉(zhuǎn)移 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // transferIndex(初值為最后一個節(jié)點(diǎn)的索引),表示從transferIndex開始后面所有的節(jié)點(diǎn)都已分配, // 每次線程領(lǐng)取擴(kuò)容任務(wù)后,需要更新transferIndex的值(transferIndex-stride)。 // CAS修改transferIndex,并更新索引邊界 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; // 老數(shù)組最后一個索引位置 i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; // 已經(jīng)完成所有節(jié)點(diǎn)復(fù)制了 if (finishing) { nextTable = null; table = nextTab; // sizeCtl閾值為原來的1.5倍 sizeCtl = (n << 1) - (n >>> 1); // 結(jié)束自旋 return; } // CAS 更新擴(kuò)容閾值,在這里面sizectl值減一,說明新加入一個線程參與到擴(kuò)容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 將以前老數(shù)組上為NULL的節(jié)點(diǎn)(還沒有元素的桶或者說成沒有hash沖突的數(shù)據(jù)節(jié)點(diǎn)),用ForwardingNode標(biāo)記節(jié)點(diǎn)補(bǔ)齊 // 主要作用是:其他線程在put元素,發(fā)現(xiàn)找到的索引位是fwd節(jié)點(diǎn)則表示正在擴(kuò)容,那么該線程會來幫助擴(kuò)通,而不是在那里等待 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 表示處理過該節(jié)點(diǎn)了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 對應(yīng)索引位加鎖 synchronized (f) { // 再次校驗(yàn)一下老數(shù)組對應(yīng)索引位節(jié)點(diǎn)是否是我們找到的節(jié)點(diǎn)f if (tabAt(tab, i) == f) { // 低索引位頭節(jié)點(diǎn)(i位), 高位索引位頭節(jié)點(diǎn)(i+tab.length) Node<K,V> ln, hn; // fh >=0 表示鏈表節(jié)點(diǎn),TreeBin節(jié)點(diǎn)的hash值-2 if (fh >= 0) { // fh & n算法可以算出新的節(jié)點(diǎn)該分配到那個索引位(runBit要么為0放低位ln,要么為n放高位hn), // runBit表示鏈表中最后一個元素的hash值&n的值 int runBit = fh & n; // lastRun表示鏈表中最后一個元素 Node<K,V> lastRun = f; // 找到鏈表中最后一個節(jié)點(diǎn),并賦值給lastRun for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 判斷原來的最后一個節(jié)點(diǎn)應(yīng)該添加到高位還是低位 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // f表示頭結(jié)點(diǎn),如果p不是尾節(jié)點(diǎn),則轉(zhuǎn)移節(jié)點(diǎn) // 如果以前節(jié)點(diǎn)順序是 1 2 3 4 轉(zhuǎn)移后就是 3 2 1 4 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) // 轉(zhuǎn)移節(jié)點(diǎn)時都是新建節(jié)點(diǎn),以免破壞原來數(shù)組結(jié)構(gòu)影響get方法 ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 設(shè)置新數(shù)組低索引位頭節(jié)點(diǎn)(i位) setTabAt(nextTab, i, ln); // 設(shè)置新數(shù)組高位索引位頭節(jié)點(diǎn)(i+tab.length) setTabAt(nextTab, i + n, hn); // 設(shè)置老數(shù)組i位為標(biāo)記節(jié)點(diǎn),表示已經(jīng)處理過了 setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 設(shè)置新數(shù)組低索引位頭節(jié)點(diǎn)(i位) setTabAt(nextTab, i, ln); // 設(shè)置新數(shù)組高位索引位頭節(jié)點(diǎn)(i+tab.length) setTabAt(nextTab, i + n, hn); // 設(shè)置老數(shù)組i位為標(biāo)記節(jié)點(diǎn),表示已經(jīng)處理過了 setTabAt(tab, i, fwd); advance = true; } } } } } }
主要過程:
tab
為擴(kuò)容前的數(shù)組
判斷是否是第一個發(fā)起擴(kuò)容的線程,如果是需要初始化擴(kuò)容后的數(shù)組nextTable
fwd = new ForwardingNode<K,V>(nextTab)
初始化擴(kuò)容標(biāo)記節(jié)點(diǎn)
進(jìn)入擴(kuò)容循環(huán)
在擴(kuò)容前的數(shù)組tab
上從右往左(從高索引位到低索引位)遍歷所有頭結(jié)點(diǎn),索引位為i
如果找到的頭結(jié)點(diǎn)是NULL(沒有hash沖突)則tab[i]=fwd
。
找到的頭結(jié)點(diǎn)不為NULL則(有hash沖突)則鎖定頭結(jié)點(diǎn)synchronized (f)
再次校驗(yàn)頭結(jié)點(diǎn)是否發(fā)生改變,如果改變直接結(jié)束
初始化高索引位和第索引位的頭結(jié)點(diǎn)
移動節(jié)點(diǎn)到相應(yīng)索引位
設(shè)置擴(kuò)容后的數(shù)組低索引位頭節(jié)點(diǎn)(i位)
設(shè)置擴(kuò)容后的數(shù)組高位索引位頭節(jié)點(diǎn)(i+tab.length)
設(shè)置擴(kuò)容前的數(shù)組i位為標(biāo)記節(jié)點(diǎn)(tab[i]=fwd
),表示已經(jīng)處理過了
進(jìn)入第3步直到完成
注意:
第5點(diǎn)有
tab[i]=fwd
有兩層含義:1,表示對應(yīng)索引位已經(jīng)處理過了;2,當(dāng)其他線程拿到該頭結(jié)點(diǎn)的時候能知曉正在擴(kuò)容,這時在put的時候幫助擴(kuò)容,在get的時候去擴(kuò)容后的數(shù)組上找相應(yīng)的key
int runBit = fh & n;
算法可以算出新的節(jié)點(diǎn)該分配到那個索引位(runBit要么為0放低位ln,要么為n放高位hn)如果是鏈表節(jié)點(diǎn),以前節(jié)點(diǎn)順序是 1 2 3 4 擴(kuò)容后會變成 3 2 1 4
擴(kuò)容的大致過程圖解:
發(fā)起擴(kuò)容,擴(kuò)容前數(shù)組tab
在擴(kuò)容前的數(shù)組tab
上從右往左(從高索引位到低索引位)遍歷所有頭結(jié)點(diǎn),索引位為i
,如果找到的頭結(jié)點(diǎn)是NULL則直接賦值成````fwd``` 標(biāo)記節(jié)點(diǎn)。
3. 擴(kuò)容前的數(shù)組上找到不為NULL的節(jié)點(diǎn),則還是移動節(jié)點(diǎn)到擴(kuò)容后的額數(shù)組
private final void addCount(long x, int check) { // CounterCell[] as;使用計數(shù)器數(shù)組因該是為了提升并發(fā)量,減小沖突概率 CounterCell[] as; long b, s; // 計數(shù)器表不為NULL(counterCells當(dāng)修改baseCount有沖突時,需要將size增量放到這個計數(shù)器數(shù)組里面) if ((as = counterCells) != null || // 使用CAS更新baseCount的值(+1)如果失敗說明存在競爭 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // CounterCell是否存在競爭的標(biāo)記位 boolean uncontended = true; // CounterCell[] as為NULL表示as沒有競爭 if (as == null || (m = as.length - 1) < 0 || // 隨機(jī)一個數(shù)組位置來驗(yàn)證是否為NULL,如果a是null表示沒有競爭,隨機(jī)也是為了減小沖突概率 (a = as[ThreadLocalRandom.getProbe() & m]) == null || // CAS替換a的value,如果失敗表示存在競爭 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 將size增量值存到as上 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 統(tǒng)計size s = sumCount(); } // 檢查是否需要擴(kuò)容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // size大于閾值sizeCtl,tab數(shù)組長度小于最大值1<<30 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // 表示正在擴(kuò)容 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 幫助擴(kuò)容 transfer(tab, nt); } // sc = (rs << RESIZE_STAMP_SHIFT) + 2,移位后是負(fù)數(shù) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 發(fā)起擴(kuò)容,此時nextTable=null transfer(tab, null); s = sumCount(); } } }
在put()
方法執(zhí)行最后會對當(dāng)前Map的size+1,ConcurrentHashMap中size由baseCount
和CounterCell[] as
組成,size=baseCount+as[i].value
。addCount的大致過程如下:
CAS替換baseCount值,如果失敗說明對size的增量(size++
)存在競爭
如果存在競爭,我們會使用到CounterCell[] as
數(shù)組
as[ThreadLocalRandom.getProbe() & m]
隨機(jī)取一個索引位,使用CAS完成size++
如果as[i]
也存在競爭會調(diào)用fullAddCount(x, uncontended);
方法完成size++
size++
完成后通過size=baseCount+as[i].value
公式計算出元素總數(shù)
判斷是否需要擴(kuò)容
如果需要擴(kuò)容,在判斷一下是幫助擴(kuò)容還是發(fā)起擴(kuò)容
注意:
CounterCell[] as:這個的只要目的是分散對
baseCount
的單一競爭,提示size++
的并發(fā)率,這里和table
數(shù)組一樣使用了鎖分離技術(shù),as的長度也是2的n次方,初始長度是2在第三步中使用隨機(jī)數(shù)也是為了提升并發(fā)效率,
ThreadLocalRandom
類是JDK7在JUC包下新增的隨機(jī)數(shù)生成器,它解決了Random類在多線程下,多個線程競爭內(nèi)部唯一的原子性種子變量,而導(dǎo)致大量線程自旋重試的不足
fullAddCount(x, uncontended);
方法里面完成了as
的初始化和擴(kuò)容元素總數(shù)的計算公式是
size=baseCount+as[i].value
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
元素總數(shù)的計算公式是size=baseCount+as[i].value
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); // table 不為NULL并且對飲索引位不為NULL if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { // 頭節(jié)點(diǎn)key相同 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 樹節(jié)點(diǎn)或者ForwardingNode標(biāo)記節(jié)點(diǎn) else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 鏈表節(jié)點(diǎn) while ((e = e.next) != null) { // key相同 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
主要流程:
判斷table和key對應(yīng)索引位是否為NULL
判斷頭節(jié)點(diǎn)是否是要找的節(jié)點(diǎn)
eh < 0表示是樹節(jié)點(diǎn)或ForwardingNode標(biāo)記節(jié)點(diǎn),直接通過find方法找對應(yīng)的key
否則是鏈表節(jié)點(diǎn),挨個鏈表節(jié)點(diǎn)找相應(yīng)的key
返回結(jié)果
注意:
get 方法沒有加鎖,原因是節(jié)點(diǎn)的value是volatile的,已經(jīng)保證了可見性,只要value有更新,那么我們一定能讀到最新數(shù)據(jù)。
e.find(h, key)
這里:如果對應(yīng)索引位頭結(jié)點(diǎn)是ForwardingNode節(jié)點(diǎn),那么會直接去擴(kuò)通后的數(shù)組找對應(yīng)的key,可以參見上面ForwardingNode.find()
方法
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }
get方法和containsKey方法都是遍歷對應(yīng)索引位上所有節(jié)點(diǎn),來判斷是否存在key相同的節(jié)點(diǎn)以及獲得該節(jié)點(diǎn)的value。但由于遍歷過程中其他線程可能對鏈表結(jié)構(gòu)做了調(diào)整,因此get和containsKey返回的可能是過時的數(shù)據(jù),這一點(diǎn)是ConcurrentHashMap在弱一致性上的體現(xiàn)。
去掉了Segment 數(shù)組:這樣做鎖的粒度更小,減少了并發(fā)沖突的概率;查找數(shù)據(jù)時不用計算兩次hash;
存儲數(shù)據(jù)是采用了鏈表+紅黑樹的形式:當(dāng)一個桶內(nèi)數(shù)據(jù)量很大的時候,紅黑樹的查詢效率遠(yuǎn)高于鏈表。
1.8直接使用了內(nèi)置鎖synchronized:簡化了加鎖操作
1.8的初始化是在第一次put時完成的,1.7的時候再構(gòu)造的時候完成的
在put過程中當(dāng)發(fā)現(xiàn)正在擴(kuò)容,1.8的線程會幫助擴(kuò)容,1.7的只是會檢查key是否存在或者完成新節(jié)點(diǎn)的初始化工作
1.8的hash值計算更簡單了
1.8擴(kuò)容過程中會修改擴(kuò)容前的數(shù)組,1.7擴(kuò)容過程中不會修改原來數(shù)組
1.8在get()
時如果判斷到當(dāng)前索引位正在擴(kuò)容,那么直接在擴(kuò)容后的數(shù)組中去找對應(yīng)的key
1.7的size計算使用的三次計算的方式,1.8使用了鎖分離技術(shù)
package com.xiaolyuh; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author yuhao.wang3 * @since 2019/7/26 17:58 */ public class HashMapTest { public static void main(String[] args) { AtomicInteger integer = new AtomicInteger(); ExecutorService cachedThreadPool = Executors.newFixedThreadPool(50); Map<User, Integer> map = new ConcurrentHashMap<>(); for (int i = 0; i < 10000; i++) { cachedThreadPool.execute(() -> { User user = new User(1); map.put(user, 1); }); } } static class User { int age; public User(int age) { this.age = age; } @Override public int hashCode() { return age; } @Override public String toString() { return "" + age; } } }
為監(jiān)控而生的多級緩存框架 layering-cache這是我開源的一個多級緩存框架的實(shí)現(xiàn)
上述就是小編為大家分享的JAVA并發(fā)容器ConcurrentHashMap 1.7和1.8 源碼怎么寫了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。