您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“ConcurrentHashMap有什么用”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“ConcurrentHashMap有什么用”吧!
??
HashTable
的put
,get
,remove
等方法是通過(guò)synchronized
來(lái)修飾保證其線程安全性的。HashTable
是 不允許key跟value為null的。問(wèn)題是 synchronized
是個(gè)關(guān)鍵字級(jí)別的重量鎖,在get數(shù)據(jù)的時(shí)候任何寫(xiě)入操作都不允許。相對(duì)來(lái)說(shuō)性能不好。因此目前主要用的ConcurrentHashMap
來(lái)保證線程安全性。
ConcurrentHashMap
主要分為JDK<=7跟JDK>=8的兩個(gè)版本,ConcurrentHashMap
的空間利用率更低一般只有10%~20%,接下來(lái)分別介紹。
先宏觀說(shuō)下JDK7中的大致組成,ConcurrentHashMap由Segment
數(shù)組結(jié)構(gòu)和HashEntry
數(shù)組組成。Segment是一種可重入鎖,是一種數(shù)組和鏈表的結(jié)構(gòu),一個(gè)Segment中包含一個(gè)HashEntry數(shù)組,每個(gè)HashEntry又是一個(gè)鏈表結(jié)構(gòu)。正是通過(guò)Segment分段鎖,ConcurrentHashMap實(shí)現(xiàn)了高效率的并發(fā)。缺點(diǎn)是并發(fā)程度是有segment數(shù)組來(lái)決定的,并發(fā)度一旦初始化無(wú)法擴(kuò)容。先繪制個(gè)ConcurrentHashMap
的形象直觀圖。要想理解currentHashMap
,可以簡(jiǎn)單的理解為將數(shù)據(jù)「分表分庫(kù)」。ConcurrentHashMap
是由 Segment
數(shù)組 結(jié)構(gòu)和HashEntry
數(shù)組 結(jié)構(gòu)組成。
??
Segment 是一種可重入鎖 ReentrantLock
的子類 ,在ConcurrentHashMap
里扮演鎖的角色,HashEntry
則用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)。ConcurrentHashMap
里包含一個(gè)Segment
數(shù)組來(lái)實(shí)現(xiàn)鎖分離,Segment
的結(jié)構(gòu)和HashMap
類似,一個(gè)Segment
里包含一個(gè)HashEntry
數(shù)組,每個(gè)HashEntry
是一個(gè)鏈表結(jié)構(gòu)的元素, 每個(gè)Segment
守護(hù)者一個(gè)HashEntry
數(shù)組里的元素,當(dāng)對(duì)HashEntry
數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí),必須首先獲得它對(duì)應(yīng)的Segment
鎖。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table; //包含一個(gè)HashMap 可以理解為
}
可以理解為我們的每個(gè)segment
都是實(shí)現(xiàn)了Lock
功能的HashMap
。如果我們同時(shí)有多個(gè)segment
形成了segment
數(shù)組那我們就可以實(shí)現(xiàn)并發(fā)咯。
currentHashMap
的構(gòu)造函數(shù),先總結(jié)幾點(diǎn)。
構(gòu)造函數(shù)詳解:
//initialCapacity 是我們保存所以KV數(shù)據(jù)的初始值
//loadFactor這個(gè)就是HashMap的負(fù)載因子
// 我們segment數(shù)組的初始化大小
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS) // 最大允許segment的個(gè)數(shù),不能超過(guò) 1< 24
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0; // 類似擾動(dòng)函數(shù)
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1; // 確保segment一定是2次冪
}
this.segmentShift = 32 - sshift;
//有點(diǎn)類似與擾動(dòng)函數(shù),跟下面的參數(shù)配合使用實(shí)現(xiàn) 當(dāng)前元素落到那個(gè)segment上面。
this.segmentMask = ssize - 1; // 為了 取模 專用
if (initialCapacity > MAXIMUM_CAPACITY) //不能大于 1< 30
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; //總的數(shù)組大小 被 segment 分散后 需要多少個(gè)table
if (c * ssize < initialCapacity)
++c; //確保向上取值
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 每個(gè)table初始化大小為2
while (cap < c) // 單獨(dú)的一個(gè)segment[i] 對(duì)應(yīng)的table 容量大小。
cap <<= 1;
// 將table的容量初始化為2的次冪
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
// 負(fù)載因子,閾值,每個(gè)segment的初始化大小。跟hashmap 初始值類似。
// 并且segment的初始化是懶加載模式,剛開(kāi)始只有一個(gè)s0,其余的在需要的時(shí)候才會(huì)增加。
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
// 整體思想就是通過(guò)多次不同方式的位運(yùn)算來(lái)努力將數(shù)據(jù)均勻的分不到目標(biāo)table中,都是些擾動(dòng)函數(shù)
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash42((String) k);
}
h ^= k.hashCode();
// single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
hash
找到對(duì)應(yīng)的
segment
,繼續(xù)通過(guò)
hash
找到對(duì)應(yīng)的
table
,然后就是遍歷這個(gè)鏈表看是否可以找到,并且要注意
get
的時(shí)候是沒(méi)有加鎖的。 public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key); // JDK7中標(biāo)準(zhǔn)的hash值獲取算法
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到對(duì)應(yīng)的segment上
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
// 無(wú)非就是獲得hash值對(duì)應(yīng)的segment 是否存在,
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
// 看下這個(gè)hash值對(duì)應(yīng)的是segment(HashEntry)中的具體位置。然后遍歷查詢?cè)撴湵?br/> K k;
if ((k = e.key) key || (e.hash h && key.equals(k)))
return e.value;
}
}
return null;
}
hash
值對(duì)應(yīng)的
segment
位置,然后看該
segment
位置是否初始化了(因?yàn)閟egment是懶加載模式)。選擇性初始化,最終執(zhí)行put操作。 @SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value null)
throw new NullPointerException();
int hash = hash(key);// 還是獲得最終hash值
int j = (hash >>> segmentShift) & segmentMask; // hash值位操作對(duì)應(yīng)的segment數(shù)組位置
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) null)
s = ensureSegment(j);
// 初始化時(shí)候因?yàn)橹挥械谝粋€(gè)segment,如果落在了其余的segment中 則需要現(xiàn)初始化。
return s.put(key, hash, value, false);
// 直接在數(shù)據(jù)中執(zhí)行put操作。
}
其中put
操作基本思路跟HashMap
幾乎一樣,只是在開(kāi)始跟結(jié)束進(jìn)行了加鎖的操作tryLock and unlock
,然后JDK7中都是先擴(kuò)容再添加數(shù)據(jù)的,并且獲得不到鎖也會(huì)進(jìn)行自旋的tryLock或者lock阻塞排隊(duì)進(jìn)行等待(同時(shí)獲得鎖前提前new出新數(shù)據(jù))。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往該 segment 寫(xiě)入前,需要先獲取該 segment 的獨(dú)占鎖,獲取失敗嘗試獲取自旋鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// segment 內(nèi)部的數(shù)組
HashEntry<K,V>[] tab = table;
// 利用 hash 值,求應(yīng)該放置的數(shù)組下標(biāo)
int index = (tab.length - 1) & hash;
// first 是數(shù)組該位置處的鏈表的表頭
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) key ||
(e.hash hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆蓋舊值
e.value = value;
++modCount;
}
break;
}
// 繼續(xù)順著鏈表走
e = e.next;
}
else {
// node 是不是 null,這個(gè)要看獲取鎖的過(guò)程。沒(méi)獲得鎖的線程幫我們創(chuàng)建好了節(jié)點(diǎn),直接頭插法
// 如果不為 null,那就直接將它設(shè)置為鏈表表頭;如果是 null,初始化并設(shè)置為鏈表表頭。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超過(guò)了該 segment 的閾值,這個(gè) segment 需要擴(kuò)容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 擴(kuò)容
else
// 沒(méi)有達(dá)到閾值,將 node 放到數(shù)組 tab 的 index 位置,
// 將新的結(jié)點(diǎn)設(shè)置成原鏈表的表頭
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
}
如果加鎖失敗了調(diào)用scanAndLockForPut
,完成查找或新建節(jié)點(diǎn)的工作。當(dāng)獲取到鎖后直接將該節(jié)點(diǎn)加入鏈表即可,「提升」了put操作的性能,這里涉及到自旋。大致過(guò)程:
??
在我獲取不到鎖的時(shí)候我進(jìn)行tryLock,準(zhǔn)備好new的數(shù)據(jù),同時(shí)還有一定的次數(shù)限制,還要考慮別的已經(jīng)獲得線程的節(jié)點(diǎn)修改該頭節(jié)點(diǎn)。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循環(huán)獲取鎖
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e null) {
if (node null) // speculatively create node
// 進(jìn)到這里說(shuō)明數(shù)組該位置的鏈表是空的,沒(méi)有任何元素
// 當(dāng)然,進(jìn)到這里的另一個(gè)原因是 tryLock() 失敗,所以該槽存在并發(fā),不一定是該位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 順著鏈表往下走
e = e.next;
}
// 重試次數(shù)如果超過(guò) MAX_SCAN_RETRIES(單核 1 次多核 64 次),那么不搶了,進(jìn)入到阻塞隊(duì)列等待鎖
// lock() 是阻塞方法,直到獲取鎖后返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) 0 &&
// 進(jìn)入這里,說(shuō)明有新的元素進(jìn)到了鏈表,并且成為了新的表頭
// 這邊的策略是,重新執(zhí)行 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
Size
這個(gè)size方法比較有趣,他是先無(wú)鎖的統(tǒng)計(jì)下所有的數(shù)據(jù)量看下前后兩次是否數(shù)據(jù)一樣,如果一樣則返回?cái)?shù)據(jù),如果不一樣則要把全部的segment進(jìn)行加鎖,統(tǒng)計(jì),解鎖。并且size方法只是返回一個(gè)統(tǒng)計(jì)性的數(shù)字,因此size謹(jǐn)慎使用哦。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ RETRIES_BEFORE_LOCK) { // 超過(guò)2次則全部加鎖
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // 直接對(duì)全部segment加鎖消耗性太大
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount; // 統(tǒng)計(jì)的是modCount,涉及到增刪該都會(huì)加1
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum last) // 每一個(gè)前后的修改次數(shù)一樣 則認(rèn)為一樣,但凡有一個(gè)不一樣則直接break。
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
segment
數(shù)組初始化后就不可變了,也就是說(shuō)
「并發(fā)性不可變」,不過(guò)
segment
里的
table
可以擴(kuò)容為2倍,該方法沒(méi)有考慮并發(fā),因?yàn)閳?zhí)行該方法之前已經(jīng)獲取了鎖。其中JDK7中的
rehash
思路跟JDK8 中擴(kuò)容后處理鏈表的思路一樣,個(gè)人不過(guò)感覺(jué)沒(méi)有8寫(xiě)的精髓好看。// 方法參數(shù)上的 node 是這次擴(kuò)容后,需要添加到新的數(shù)組中的數(shù)據(jù)。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 創(chuàng)建新數(shù)組
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩碼,如從 16 擴(kuò)容到 32,那么 sizeMask 為 31,對(duì)應(yīng)二進(jìn)制 ‘000...00011111’
int sizeMask = newCapacity - 1;
// 遍歷原數(shù)組,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個(gè)位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是鏈表的第一個(gè)元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 計(jì)算應(yīng)該放置在新數(shù)組中的位置,
// 假設(shè)原數(shù)組長(zhǎng)度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask; // 新位置
if (next null) // 該位置處只有一個(gè)元素
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是鏈表表頭
HashEntry<K,V> lastRun = e;
// idx 是當(dāng)前鏈表的頭結(jié)點(diǎn) e 的新位置
int lastIdx = idx;
// for 循環(huán)找到一個(gè) lastRun 結(jié)點(diǎn),這個(gè)結(jié)點(diǎn)之后的所有元素是將要放到一起的
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 將 lastRun 及其之后的所有結(jié)點(diǎn)組成的這個(gè)鏈表放到 lastIdx 這個(gè)位置
newTable[lastIdx] = lastRun;
// 下面的操作是處理 lastRun 之前的結(jié)點(diǎn),
//這些結(jié)點(diǎn)可能分配在另一個(gè)鏈表中,也可能分配到上面的那個(gè)鏈表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 將新來(lái)的 node 放到新數(shù)組中剛剛的 兩個(gè)鏈表之一 的 頭部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
ConcurrentHashMap
中通過(guò)原子操作
sun.misc.Unsafe
查找元素、替換元素和設(shè)置元素。通過(guò)這樣的硬件級(jí)別獲得數(shù)據(jù)可以保證及時(shí)是多線程我也每次獲得的數(shù)據(jù)是最新的。這些原子操作起著非常關(guān)鍵的作用,你可以在所有
ConcurrentHashMap
的基本功能中看到,隨機(jī)距離如下: final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
??
ConcurrentHashMap
允許多個(gè)修改操作并發(fā)進(jìn)行,其關(guān)鍵在于使用了鎖分離技術(shù)。它使用了多個(gè)鎖來(lái)控制對(duì)hash表的不同部分進(jìn)行的修改。內(nèi)部使用段(Segment
)來(lái)表示這些不同的部分,每個(gè)段其實(shí)就是一個(gè)小的HashTable
,只要多個(gè)修改操作發(fā)生在不同的段上,它們就可以并發(fā)進(jìn)行。
?用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)的
?HashEntry
,在設(shè)計(jì)上它的成員變量value跟next
都是volatile
類型的,這樣就保證別的線程對(duì)value值的修改,get方法可以馬上看到。
ConcurrentHashMap的弱一致性體現(xiàn)在迭代器,clear和get方法,原因在于沒(méi)有加鎖。
JDK8相比與JDK7主要區(qū)別如下:
??
取消了segment數(shù)組,直接用table保存數(shù)據(jù),鎖的粒度更小,減少并發(fā)沖突的概率。采用table數(shù)組元素作為鎖,從而實(shí)現(xiàn)了對(duì)每一行數(shù)據(jù)進(jìn)行加鎖,進(jìn)一步減少并發(fā)沖突的概率,并發(fā)控制使用Synchronized和CAS來(lái)操作。 存儲(chǔ)數(shù)據(jù)時(shí)采用了數(shù)組+ 鏈表+紅黑樹(shù)的形式。
?private static final int MAXIMUM_CAPACITY = 1 << 30; // 數(shù)組的最大值
private static final int DEFAULT_CAPACITY = 16; // 默認(rèn)數(shù)組長(zhǎng)度
static final int TREEIFY_THRESHOLD = 8; // 鏈表轉(zhuǎn)紅黑樹(shù)的一個(gè)條件
static final int UNTREEIFY_THRESHOLD = 6; // 紅黑樹(shù)轉(zhuǎn)鏈表的一個(gè)條件
static final int MIN_TREEIFY_CAPACITY = 64; // 鏈表轉(zhuǎn)紅黑樹(shù)的另一個(gè)條件
static final int MOVED = -1; // 表示正在擴(kuò)容轉(zhuǎn)移
static final int TREEBIN = -2; // 表示已經(jīng)轉(zhuǎn)換成樹(shù)
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // 獲得hash值的輔助參數(shù)
transient volatile Node<K,V>[] table;// 默認(rèn)沒(méi)初始化的數(shù)組,用來(lái)保存元素
private transient volatile Node<K,V>[] nextTable; // 轉(zhuǎn)移的時(shí)候用的數(shù)組
static final int NCPU = Runtime.getRuntime().availableProcessors();// 獲取可用的CPU個(gè)數(shù)
private transient volatile Node<K,V>[] nextTable; // 連接表,用于哈希表擴(kuò)容,擴(kuò)容完成后會(huì)被重置為 null
private transient volatile long baseCount;保存著整個(gè)哈希表中存儲(chǔ)的所有的結(jié)點(diǎn)的個(gè)數(shù)總和,有點(diǎn)類似于 HashMap 的 size 屬性。private transient volatile int
sizeCtl
;負(fù)數(shù):表示進(jìn)行初始化或者擴(kuò)容,-1:表示正在初始化,-N:表示有 N-1 個(gè)線程正在進(jìn)行擴(kuò)容 正數(shù):0 表示還沒(méi)有被初始化,> 0的數(shù):初始化或者是下一次進(jìn)行擴(kuò)容的閾值,有點(diǎn)類似HashMap中的
?threshold
,不過(guò)功能「更強(qiáng)大」。
Node
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; // key的hash值
final K key; // key
volatile V val; // value
volatile Node<K,V> next;
//表示鏈表中的下一個(gè)節(jié)點(diǎn)
}
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
// 紅黑樹(shù)的父親節(jié)點(diǎn)
TreeNode<K,V> left;
// 左節(jié)點(diǎn)
TreeNode<K,V> right;
// 右節(jié)點(diǎn)
TreeNode<K,V> prev;
// 前節(jié)點(diǎn)
boolean red;
// 是否為紅點(diǎn)
}
ForwardingNode
的構(gòu)造方法中,可以看到此變量的hash =
「-1」 ,類中還存儲(chǔ)
nextTable
的引用。該初始化方法只在
transfer
方法被調(diào)用,如果一個(gè)類被設(shè)置成此種情況并且hash = -1 則說(shuō)明該節(jié)點(diǎn)不需要resize了。static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
//注意這里
super(MOVED, null, null, null);
this.nextTable = tab;
}
//.....
}
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
}
整體的構(gòu)造情況基本跟HashMap類似,并且為了跟原來(lái)的JDK7中的兼容性還可以傳入并發(fā)度。不過(guò)JDK8中并發(fā)度已經(jīng)有table的具體長(zhǎng)度來(lái)控制了。
??
ConcurrentHashMap():創(chuàng)建一個(gè)帶有默認(rèn)初始容量 (16)、加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 ConcurrentHashMap(int):創(chuàng)建一個(gè)帶有指定初始容量 tableSizeFor
、默認(rèn)加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射ConcurrentHashMap(Map<? extends K, ? extends V> m):構(gòu)造一個(gè)與給定映射具有相同映射關(guān)系的新映射 ConcurrentHashMap(int initialCapacity, float loadFactor):創(chuàng)建一個(gè)帶有指定初始容量、加載因子和默認(rèn) concurrencyLevel (1) 的新的空映射 ConcurrentHashMap(int, float, int):創(chuàng)建一個(gè)帶有指定初始容量、加載因子和并發(fā)級(jí)別的新的空映射。
假設(shè)table已經(jīng)初始化完成,put操作采用 CAS + synchronized 實(shí)現(xiàn)并發(fā)插入或更新操作,具體實(shí)現(xiàn)如下。
??
做一些邊界處理,然后獲得hash值。 沒(méi)初始化就初始化,初始化后看下對(duì)應(yīng)的桶是否為空,為空就原子性的嘗試插入。 如果當(dāng)前節(jié)點(diǎn)正在擴(kuò)容還要去幫忙擴(kuò)容,騷操作。 用 syn
來(lái)加鎖當(dāng)前節(jié)點(diǎn),然后操作幾乎跟就跟hashmap一樣了。
// Node 節(jié)點(diǎn)的 hash值在HashMap中存儲(chǔ)的就是hash值,在currenthashmap中可能有多種情況哦!
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key null || value null) throw new NullPointerException(); //邊界處理
int hash = spread(key.hashCode());// 最終hash值計(jì)算
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //循環(huán)表
Node<K,V> f; int n, i, fh;
if (tab null || (n = tab.length) 0)
tab = initTable(); // 初始化表 如果為空,懶漢式
else if ((f = tabAt(tab, i = (n - 1) & hash)) null) {
// 如果對(duì)應(yīng)桶位置為空
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
// CAS 原子性的嘗試插入
break;
}
else if ((fh = f.hash) MOVED)
// 如果當(dāng)前節(jié)點(diǎn)正在擴(kuò)容。還要幫著去擴(kuò)容。
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) { // 桶存在數(shù)據(jù) 加鎖操作進(jìn)行處理
if (tabAt(tab, i) f) {
if (fh >= 0) { // 如果存儲(chǔ)的是鏈表 存儲(chǔ)的是節(jié)點(diǎn)的hash值
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 遍歷鏈表去查找,如果找到key一樣則選擇性
if (e.hash hash &&
((ek = e.key) key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) null) {// 找到尾部插入
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {// 如果桶節(jié)點(diǎn)類型為TreeBin
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// 嘗試紅黑樹(shù)插入,同時(shí)也要防止節(jié)點(diǎn)本來(lái)就有,選擇性覆蓋
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { // 如果鏈表數(shù)量
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 鏈表轉(zhuǎn)紅黑樹(shù)哦!
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 統(tǒng)計(jì)大小 并且檢查是否要擴(kuò)容。
return null;
}
涉及到重要函數(shù)initTable
、tabAt
、casTabAt
、helpTransfer
、putTreeVal
、treeifyBin
、addCount
函數(shù)。
「只允許一個(gè)線程」對(duì)表進(jìn)行初始化,如果不巧有其他線程進(jìn)來(lái)了,那么會(huì)讓其他線程交出 CPU 等待下次系統(tǒng)調(diào)度Thread.yield
。這樣,保證了表同時(shí)只會(huì)被一個(gè)線程初始化,對(duì)于table的大小,會(huì)根據(jù)sizeCtl
的值進(jìn)行設(shè)置,如果沒(méi)有設(shè)置szieCtl的值,那么默認(rèn)生成的table大小為16,否則,會(huì)根據(jù)sizeCtl
的大小設(shè)置table大小。
// 容器初始化 操作
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) null || tab.length 0) {
if ((sc = sizeCtl) < 0) // 如果正在初始化-1,-N 正在擴(kuò)容。
Thread.yield(); // 進(jìn)行線程讓步等待
// 讓掉當(dāng)前線程 CPU 的時(shí)間片,使正在運(yùn)行中的線程重新變成就緒狀態(tài),并重新競(jìng)爭(zhēng) CPU 的調(diào)度權(quán)。
// 它可能會(huì)獲取到,也有可能被其他線程獲取到。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 比較sizeCtl的值與sc是否相等,相等則用 -1 替換,這表明我這個(gè)線程在進(jìn)行初始化了!
try {
if ((tab = table) null || tab.length 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默認(rèn)為16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); // sc = 0.75n
}
} finally {
sizeCtl = sc; //設(shè)置sizeCtl 類似threshold
}
break;
}
}
return tab;
}
在ConcurrentHashMap
中使用了unSafe
方法,通過(guò)直接操作內(nèi)存的方式來(lái)保證并發(fā)處理的安全性,使用的是硬件的安全機(jī)制。
// 用來(lái)返回節(jié)點(diǎn)數(shù)組的指定位置的節(jié)點(diǎn)的原子操作
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// cas原子操作,在指定位置設(shè)定值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 原子操作,在指定位置設(shè)定值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
// 比較table數(shù)組下標(biāo)為i的結(jié)點(diǎn)是否為c,若為c,則用v交換操作。否則,不進(jìn)行交換操作。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
可以看到獲得table[i]數(shù)據(jù)是通過(guò)Unsafe
對(duì)象通過(guò)反射獲取的,取數(shù)據(jù)直接table[index]不可以么,為什么要這么復(fù)雜?在java內(nèi)存模型中,我們已經(jīng)知道每個(gè)線程都有一個(gè)工作內(nèi)存,里面存儲(chǔ)著table的「副本」,雖然table是volatile
修飾的,但不能保證線程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接獲取指定內(nèi)存的數(shù)據(jù),「保證了每次拿到數(shù)據(jù)都是最新的」。
// 可能有多個(gè)線程在同時(shí)幫忙運(yùn)行helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// table不是空 且 node節(jié)點(diǎn)是轉(zhuǎn)移類型,并且轉(zhuǎn)移類型的nextTable 不是空 說(shuō)明還在擴(kuò)容ing
int rs = resizeStamp(tab.length);
// 根據(jù) length 得到一個(gè)前16位的標(biāo)識(shí)符,數(shù)組容量大小。
// 確定新table指向沒(méi)有變,老table數(shù)據(jù)也沒(méi)變,并且此時(shí) sizeCtl小于0 還在擴(kuò)容ing
while (nextTab nextTable && table tab && (sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc rs + 1 || sc rs + MAX_RESIZERS || transferIndex <= 0)
// 1. sizeCtl 無(wú)符號(hào)右移16位獲得高16位如果不等 rs 標(biāo)識(shí)符變了
// 2. 如果擴(kuò)容結(jié)束了 這里可以看 trePresize 函數(shù)第一次擴(kuò)容操作:
// 默認(rèn)第一個(gè)線程設(shè)置 sc = rs 左移 16 位 + 2,當(dāng)?shù)谝粋€(gè)線程結(jié)束擴(kuò)容了,
// 就會(huì)將 sc 減一。這個(gè)時(shí)候,sc 就等于 rs + 1。
// 3. 如果達(dá)到了最大幫助線程個(gè)數(shù) 65535個(gè)
// 4. 如果轉(zhuǎn)移下標(biāo)調(diào)整ing 擴(kuò)容已經(jīng)結(jié)束了
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 如果以上都不是, 將 sizeCtl + 1,增加一個(gè)線程來(lái)擴(kuò)容
transfer(tab, nextTab); // 進(jìn)行轉(zhuǎn)移
break;// 結(jié)束循環(huán)
}
}
return nextTab;
}
return table;
}
?該方法的作用是「返回?zé)o符號(hào)整型i的最高非零位前面的0的個(gè)數(shù)」,包括符號(hào)位在內(nèi);如果i為負(fù)數(shù),這個(gè)方法將會(huì)返回0,符號(hào)位為1. 比如說(shuō),10的二進(jìn)制表示為 0000 0000 0000 0000 0000 0000 0000 1010 java的整型長(zhǎng)度為32位。那么這個(gè)方法返回的就是28
?
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
//RESIZE_STAMP_BITS = 16
}
主要就2件事:一是更新 baseCount,二是判斷是否需要擴(kuò)容。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 首先如果沒(méi)有并發(fā) 此時(shí)countCells is null, 此時(shí)嘗試CAS設(shè)置數(shù)據(jù)值。
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 如果 counterCells不為空以為此時(shí)有并發(fā)的設(shè)置 或者 CAS設(shè)置 baseCount 失敗了
CounterCell a; long v; int m;
boolean uncontended = true;
if (as null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 1. 如果沒(méi)出現(xiàn)并發(fā) 此時(shí)計(jì)數(shù)盒子為 null
// 2. 隨機(jī)取出一個(gè)數(shù)組位置發(fā)現(xiàn)為空
// 3. 出現(xiàn)并發(fā)后修改這個(gè)cellvalue 失敗了
// 執(zhí)行funAddCount
fullAddCount(x, uncontended);// 死循環(huán)操作
return;
}
if (check <= 1)
return;
s = sumCount(); // 吧counterCells數(shù)組中的每一個(gè)數(shù)據(jù)進(jìn)行累加給baseCount。
}
// 如果需要擴(kuò)容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);// 獲得高位標(biāo)識(shí)符
if (sc < 0) { // 是否需要幫忙去擴(kuò)容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc rs + 1 ||
sc rs + MAX_RESIZERS || (nt = nextTable) null || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
} // 第一次擴(kuò)容
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
ConcurrentHashMap
提供了
baseCount
、
counterCells
兩個(gè)輔助變量和一個(gè)
CounterCell
輔助內(nèi)部類。sumCount() 就是迭代
counterCells
來(lái)統(tǒng)計(jì) sum 的過(guò)程。put 操作時(shí),肯定會(huì)影響
size()
,在
put()
方法最后會(huì)調(diào)用
addCount()
方法。整體的思維方法跟LongAdder類似,用的思維就是借鑒的
ConcurrentHashMap
。每一個(gè)
Cell
都用Contended修飾來(lái)避免偽共享。??
JDK1.7 和 JDK1.8 對(duì) size 的計(jì)算是不一樣的。1.7 中是先不加鎖計(jì)算三次,如果三次結(jié)果不一樣在加鎖。 JDK1.8 size 是通過(guò)對(duì) baseCount 和 counterCell 進(jìn)行 CAS 計(jì)算,最終通過(guò) baseCount 和 遍歷 CounterCell 數(shù)組得出 size。 JDK 8 推薦使用mappingCount 方法,因?yàn)檫@個(gè)方法的返回值是 long 類型,不會(huì)因?yàn)?size 方法是 int 類型限制最大值。
addCount
第一次擴(kuò)容時(shí)候會(huì)有騷操作
sc=rs << RESIZE_STAMP_SHIFT) + 2)
其中
rs = resizeStamp(n)
。這里需要核心說(shuō)一點(diǎn),如果不是第一次擴(kuò)容則直接將低16位的數(shù)字 +1 即可。
這個(gè)操作幾乎跟HashMap
的操作完全一樣,核心思想就是一定要決定向左還是向右然后最終嘗試放置新數(shù)據(jù),然后balance。不同點(diǎn)就是有鎖的考慮。
這里的基本思路跟HashMap
幾乎一樣,不同點(diǎn)就是先變成TreeNode,然后是「單向鏈表」串聯(lián)。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//如果整個(gè)table的數(shù)量小于64,就擴(kuò)容至原來(lái)的一倍,不轉(zhuǎn)紅黑樹(shù)了
//因?yàn)檫@個(gè)閾值擴(kuò)容可以減少hash沖突,不必要去轉(zhuǎn)紅黑樹(shù)
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) { //鎖定當(dāng)前桶
if (tabAt(tab, index) b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
//遍歷這個(gè)鏈表然后將每個(gè)節(jié)點(diǎn)封裝成TreeNode,最終單鏈表串聯(lián)起來(lái),
// 最終 調(diào)用setTabAt 放置紅黑樹(shù)
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) null)
hd = p;
else
tl.next = p;
tl = p;
}
//通過(guò)TreeBin對(duì)象對(duì)TreeNode轉(zhuǎn)換成紅黑樹(shù)
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
主要功能就是鏈表變化為紅黑樹(shù),這個(gè)紅黑樹(shù)用TreeBin
來(lái)包裝。并且要注意 轉(zhuǎn)成紅黑樹(shù)以后以前鏈表的結(jié)構(gòu)信息還是有的,最終信息如下:
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
//創(chuàng)建空節(jié)點(diǎn) hash = -2
this.first = b;
TreeNode<K,V> r = null; // root 節(jié)點(diǎn)
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r null) {
x.parent = null;
x.red = false;
r = x; // root 節(jié)點(diǎn)設(shè)置為x
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
// x代表的是轉(zhuǎn)換為樹(shù)之前的順序遍歷到鏈表的位置的節(jié)點(diǎn),r代表的是根節(jié)點(diǎn)
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc null &&
(kc = comparableClassFor(k)) null) ||
(dir = compareComparables(kc, k, pk)) 0)
dir = tieBreakOrder(k, pk);
// 當(dāng)key不可以比較,或者相等的時(shí)候采取的一種排序措施
TreeNode<K,V> xp = p;
// 放一定是放在葉子節(jié)點(diǎn)上,如果還沒(méi)找到葉子節(jié)點(diǎn)則進(jìn)行循環(huán)往下找。
// 找到了目前葉子節(jié)點(diǎn)才會(huì)進(jìn)入 再放置數(shù)據(jù)
if ((p = (dir <= 0) ? p.left : p.right) null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
// 每次插入一個(gè)元素的時(shí)候都調(diào)用 balanceInsertion 來(lái)保持紅黑樹(shù)的平衡
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
當(dāng)數(shù)組長(zhǎng)度小于64的時(shí)候,擴(kuò)張數(shù)組長(zhǎng)度一倍,調(diào)用此函數(shù)。擴(kuò)容后容量大小的核對(duì),可能涉及到初始化容器大小。并且擴(kuò)容的時(shí)候又跟2的次冪聯(lián)系上了!,其中初始化時(shí)候傳入map會(huì)調(diào)用putAll方法直接put一個(gè)map的話,在「putAll」方法中沒(méi)有調(diào)用initTable方法去初始化table,而是直接調(diào)用了tryPresize方法,所以這里需要做一個(gè)是不是需要初始化table的判斷。
PS:默認(rèn)第一個(gè)線程設(shè)置 sc = rs 左移 16 位 + 2,當(dāng)?shù)谝粋€(gè)線程結(jié)束擴(kuò)容了,就會(huì)將 sc 減一。這個(gè)時(shí)候,sc 就等于 rs + 1,這個(gè)時(shí)候說(shuō)明擴(kuò)容完畢了。
/**
* 擴(kuò)容表為指可以容納指定個(gè)數(shù)的大?。偸?的N次方)
* 假設(shè)原來(lái)的數(shù)組長(zhǎng)度為16,則在調(diào)用tryPresize的時(shí)候,size參數(shù)的值為16<<1(32),此時(shí)sizeCtl的值為12
* 計(jì)算出來(lái)c的值為64, 則要擴(kuò)容到 sizeCtl ≥ c
* 第一次擴(kuò)容之后 數(shù)組長(zhǎng):32 sizeCtl:24
* 第三次擴(kuò)容之后 數(shù)組長(zhǎng):128 sizeCtl:96 退出
*/
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1); // 合理范圍
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab null || (n = tab.length) 0) {
// 初始化傳入map,今天putAll會(huì)直接調(diào)用這個(gè)。
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//初始化tab的時(shí)候,把 sizeCtl 設(shè)為 -1
try {
if (table tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // sc=sizeCtl = 0.75n
}
} finally {
sizeCtl = sc;
}
}
}
// 初始化時(shí)候如果 數(shù)組容量<=sizeCtl 或 容量已經(jīng)最大化了則退出
else if (c <= sc || n >= MAXIMUM_CAPACITY) {
break;//退出擴(kuò)張
}
else if (tab table) {
int rs = resizeStamp(n);
if (sc < 0) { // sc = siztCtl 如果正在擴(kuò)容Table的話,則幫助擴(kuò)容
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc rs + 1 ||
sc rs + MAX_RESIZERS || (nt = nextTable) null ||
transferIndex <= 0)
break; // 各種條件判斷是否需要加入擴(kuò)容工作。
// 幫助轉(zhuǎn)移數(shù)據(jù)的線程數(shù) + 1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 沒(méi)有在初始化或擴(kuò)容,則開(kāi)始擴(kuò)容
// 此處切記第一次擴(kuò)容 直接 +2
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) {
transfer(tab, null);
}
}
}
}
這里代碼量比較大主要分文三部分,并且感覺(jué)思路很精髓,尤其「是其他線程幫著去擴(kuò)容的騷操作」。
[bound,i]
,然后開(kāi)始 --i 來(lái)遍歷自己的任務(wù)區(qū)間,對(duì)每個(gè)桶進(jìn)行處理。如果遇到桶的頭結(jié)點(diǎn)是空的,那么使用
ForwardingNode
標(biāo)識(shí)舊table中該桶已經(jīng)被處理完成了。如果遇到已經(jīng)處理完成的桶,直接跳過(guò)進(jìn)行下一個(gè)桶的處理。如果是正常的桶,對(duì)桶首節(jié)點(diǎn)加鎖,正常的遷移即可(跟HashMap第三部分一樣思路),遷移結(jié)束后依然會(huì)將原表的該位置標(biāo)識(shí)位已經(jīng)處理。該函數(shù)中的finish= true
則說(shuō)明整張表的遷移操作已經(jīng)「全部」完成了,我們只需要重置 table
的引用并將 nextTable
賦為空即可。否則,CAS
式的將 sizeCtl
減一,表示當(dāng)前線程已經(jīng)完成了任務(wù),退出擴(kuò)容操作。如果退出成功,那么需要進(jìn)一步判斷當(dāng)前線程是否就是最后一個(gè)在執(zhí)行擴(kuò)容的。
f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
第一次擴(kuò)容時(shí)在addCount
中有寫(xiě)到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
表示當(dāng)前只有一個(gè)線程正在工作,「相對(duì)應(yīng)的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT
,說(shuō)明當(dāng)前線程就是最后一個(gè)還在擴(kuò)容的線程,那么會(huì)將 finishing 標(biāo)識(shí)為 true,并在下一次循環(huán)中退出擴(kuò)容方法。
HashMap
大致思路類似的遍歷鏈表/紅黑樹(shù)然后擴(kuò)容操作。private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE 用來(lái)控制不要占用太多CPU
stride = MIN_TRANSFER_STRIDE; // subdivide range //MIN_TRANSFER_STRIDE=16 每個(gè)CPU處理最小長(zhǎng)度個(gè)數(shù)
if (nextTab null) { // 新表格為空則直接新建二倍,別的輔助線程來(lái)幫忙擴(kuò)容則不會(huì)進(jìn)入此if條件
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; // transferIndex 指向最后一個(gè)桶,方便從后向前遍歷
}
int nextn = nextTab.length; // 新表長(zhǎng)度
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 創(chuàng)建一個(gè)fwd節(jié)點(diǎn),這個(gè)是用來(lái)控制并發(fā)的,當(dāng)一個(gè)節(jié)點(diǎn)為空或已經(jīng)被轉(zhuǎn)移之后,就設(shè)置為fwd節(jié)點(diǎn)
boolean advance = true; //是否繼續(xù)向前查找的標(biāo)志位
boolean finishing = false; // to ensure sweep(清掃) before committing nextTab,在完成之前重新在掃描一遍數(shù)組,看看有沒(méi)完成的沒(méi)
// 第一部分
// i 指向當(dāng)前桶, bound 指向當(dāng)前線程需要處理的桶結(jié)點(diǎn)的區(qū)間下限【bound,i】 這樣來(lái)跟線程劃分任務(wù)。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 這個(gè) while 循環(huán)的目的就是通過(guò) --i 遍歷當(dāng)前線程所分配到的桶結(jié)點(diǎn)
// 一個(gè)桶一個(gè)桶的處理
while (advance) {// 每一次成功處理操作都會(huì)將advance設(shè)置為true,然里來(lái)處理區(qū)間的上一個(gè)數(shù)據(jù)
int nextIndex, nextBound;
if (--i >= bound || finishing) { //通過(guò)此處進(jìn)行任務(wù)區(qū)間的遍歷
advance = false;
}
else if ((nextIndex = transferIndex) <= 0) {
i = -1;// 任務(wù)分配完了
advance = false;
}
// 更新 transferIndex
// 為當(dāng)前線程分配任務(wù),處理的桶結(jié)點(diǎn)區(qū)間為(nextBound,nextIndex)
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
// nextIndex本來(lái)等于末尾數(shù)字,
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 當(dāng)前線程所有任務(wù)完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 已經(jīng)完成轉(zhuǎn)移 則直接賦值操作
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //設(shè)置sizeCtl為擴(kuò)容后的0.75
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1 表示當(dāng)前線程任務(wù)完成。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
// 判斷當(dāng)前線程完成的線程是不是最后一個(gè)在擴(kuò)容的,思路精髓
return;
}
finishing = advance = true;// 如果是則相應(yīng)的設(shè)置參數(shù)
i = n;
}
}
else if ((f = tabAt(tab, i)) null) // 數(shù)組中把null的元素設(shè)置為ForwardingNode節(jié)點(diǎn)(hash值為MOVED[-1])
advance = casTabAt(tab, i, null, fwd); // 如果老節(jié)點(diǎn)數(shù)據(jù)是空的則直接進(jìn)行CAS設(shè)置為fwd
else if ((fh = f.hash) MOVED) //已經(jīng)是個(gè)fwd了,因?yàn)槭嵌嗑€程操作 可能別人已經(jīng)給你弄好了,
advance = true; // already processed
else {
synchronized (f) { //加鎖操作
if (tabAt(tab, i) f) {
Node<K,V> ln, hn;
if (fh >= 0) { //該節(jié)點(diǎn)的hash值大于等于0,說(shuō)明是一個(gè)Node節(jié)點(diǎn)
// 關(guān)于鏈表的操作整體跟HashMap類似不過(guò) 感覺(jué)好像更擾一些。
int runBit = fh & n; // fh= f.hash first hash的意思,看第一個(gè)點(diǎn) 放老位置還是新位置
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n; //n的值為擴(kuò)張前的數(shù)組的長(zhǎng)度
if (b != runBit) {
runBit = b;
lastRun = p;//最后導(dǎo)致發(fā)生變化的節(jié)點(diǎn)
}
}
if (runBit 0) { //看最后一個(gè)變化點(diǎn)是新還是舊 舊
ln = lastRun;
hn = null;
}
else {
hn = lastRun; //看最后一個(gè)變化點(diǎn)是新還是舊 舊
ln = null;
}
/*
* 構(gòu)造兩個(gè)鏈表,順序大部分和原來(lái)是反的,不過(guò)順序也有差異
* 分別放到原來(lái)的位置和新增加的長(zhǎng)度的相同位置(i/n+i)
*/
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) 0)
/*
* 假設(shè)runBit的值為0,
* 則第一次進(jìn)入這個(gè)設(shè)置的時(shí)候相當(dāng)于把舊的序列的最后一次發(fā)生hash變化的節(jié)點(diǎn)(該節(jié)點(diǎn)后面可能還有hash計(jì)算后同為0的節(jié)點(diǎn))設(shè)置到舊的table的第一個(gè)hash計(jì)算后為0的節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)
* 并且把自己返回,然后在下次進(jìn)來(lái)的時(shí)候把它自己設(shè)置為后面節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
*/
ln = new Node<K,V>(ph, pk, pv, ln);
else
/*
* 假設(shè)runBit的值不為0,
* 則第一次進(jìn)入這個(gè)設(shè)置的時(shí)候相當(dāng)于把舊的序列的最后一次發(fā)生hash變化的節(jié)點(diǎn)(該節(jié)點(diǎn)后面可能還有hash計(jì)算后同不為0的節(jié)點(diǎn))設(shè)置到舊的table的第一個(gè)hash計(jì)算后不為0的節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)
* 并且把自己返回,然后在下次進(jìn)來(lái)的時(shí)候把它自己設(shè)置為后面節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
*/
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 該節(jié)點(diǎn)hash值是個(gè)負(fù)數(shù)否則的話是一個(gè)樹(shù)節(jié)點(diǎn)
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null; // 舊 頭尾
TreeNode<K,V> hi = null, hiTail = null; //新頭圍
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) 0) {
if ((p.prev = loTail) null)
lo = p;
else
loTail.next = p; //舊頭尾設(shè)置
loTail = p;
++lc;
}
else { // 新頭圍設(shè)置
if ((p.prev = hiTail) null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//ln 如果老位置數(shù)字<=6 則要對(duì)老位置鏈表進(jìn)行紅黑樹(shù)降級(jí)到鏈表,否則就看是否還需要對(duì)老位置數(shù)據(jù)進(jìn)行新建紅黑樹(shù)
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd); //老表中i位置節(jié)點(diǎn)設(shè)置下
advance = true;
}
}
}
}
}
}
這個(gè)就很簡(jiǎn)單了,獲得hash值,然后判斷存在與否,遍歷鏈表即可,注意get沒(méi)有任何鎖操作!
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 計(jì)算key的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 表不為空并且表的長(zhǎng)度大于0并且key所在的桶不為空
if ((eh = e.hash) h) { // 表中的元素的hash值與key的hash值相等
if ((ek = e.key) key || (ek != null && key.equals(ek))) // 鍵相等
// 返回值
return e.val;
}
else if (eh < 0) // 是個(gè)TreeBin hash = -2
// 在紅黑樹(shù)中查找,因?yàn)榧t黑樹(shù)中也保存這一個(gè)鏈表順序
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 對(duì)于結(jié)點(diǎn)hash值大于0的情況鏈表
if (e.hash h &&
((ek = e.key) key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
關(guān)于清空也相對(duì)簡(jiǎn)單 ,無(wú)非就是遍歷桶數(shù)組,然后通過(guò)CAS來(lái)置空。
public void clear() {
long delta = 0L;
int i = 0;
Node<K,V>[] tab = table;
while (tab != null && i < tab.length) {
int fh;
Node<K,V> f = tabAt(tab, i);
if (f null)
++i; //這個(gè)桶是空的直接跳過(guò)
else if ((fh = f.hash) MOVED) { // 這個(gè)桶的數(shù)據(jù)還在擴(kuò)容中,要去擴(kuò)容同時(shí)等待。
tab = helpTransfer(tab, f);
i = 0; // restart
}
else {
synchronized (f) { // 真正的刪除
if (tabAt(tab, i) f) {
Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
//循環(huán)到鏈表/者紅黑樹(shù)的尾部
while (p != null) {
--delta; // 記錄刪除了多少個(gè)
p = p.next;
}
//利用CAS無(wú)鎖置null
setTabAt(tab, i++, null);
}
}
}
}
if (delta != 0L)
addCount(delta, -1); //調(diào)整count
}
ConcurrentHashMap是如果來(lái)做到「并發(fā)安全」,又是如何做到「高效」的并發(fā)的呢?
首先是讀操作,讀源碼發(fā)現(xiàn)get方法中根本沒(méi)有使用同步機(jī)制,也沒(méi)有使用unsafe
方法,所以讀操作是支持并發(fā)操作的。
寫(xiě)操作
transfer
,該方法的只有
addCount
,
helpTransfer
和
tryPresize
這三個(gè)方法來(lái)調(diào)用。
addCount是在當(dāng)對(duì)數(shù)組進(jìn)行操作,使得數(shù)組中存儲(chǔ)的元素個(gè)數(shù)發(fā)生了變化的時(shí)候會(huì)調(diào)用的方法。 helpTransfer
是在當(dāng)一個(gè)線程要對(duì)table中元素進(jìn)行操作的時(shí)候,如果檢測(cè)到節(jié)點(diǎn)的·hash·= MOVED 的時(shí)候,就會(huì)調(diào)用helpTransfer
方法,在helpTransfer
中再調(diào)用transfer
方法來(lái)幫助完成數(shù)組的擴(kuò)容??
tryPresize
是在treeIfybin
和putAll
方法中調(diào)用,treeIfybin
主要是在put
添加元素完之后,判斷該數(shù)組節(jié)點(diǎn)相關(guān)元素是不是已經(jīng)超過(guò)8個(gè)的時(shí)候,如果超過(guò)則會(huì)調(diào)用這個(gè)方法來(lái)擴(kuò)容數(shù)組或者把鏈表轉(zhuǎn)為樹(shù)。注意putAll
在初始化傳入一個(gè)大map的時(shí)候會(huì)調(diào)用?!?/section>
總結(jié)擴(kuò)容情況發(fā)生:
??
在往map中添加元素的時(shí)候,在某一個(gè)節(jié)點(diǎn)的數(shù)目已經(jīng)超過(guò)了8個(gè),同時(shí)數(shù)組的長(zhǎng)度又小于64的時(shí)候,才會(huì)觸發(fā)數(shù)組的擴(kuò)容。 當(dāng)數(shù)組中元素達(dá)到了sizeCtl的數(shù)量的時(shí)候,則會(huì)調(diào)用transfer方法來(lái)進(jìn)行擴(kuò)容
3. 擴(kuò)容時(shí)候是否可以進(jìn)行讀寫(xiě)。
?對(duì)于讀操作,因?yàn)槭菦](méi)有加鎖的所以可以的. 對(duì)于寫(xiě)操作,JDK8中已經(jīng)將鎖的范圍細(xì)膩到
?table[i]
l了,當(dāng)在進(jìn)行數(shù)組擴(kuò)容的時(shí)候,如果當(dāng)前節(jié)點(diǎn)還沒(méi)有被處理(也就是說(shuō)還沒(méi)有設(shè)置為fwd節(jié)點(diǎn)),那就可以進(jìn)行設(shè)置操作。如果該節(jié)點(diǎn)已經(jīng)被處理了,則當(dāng)前線程也會(huì)加入到擴(kuò)容的操作中去。
ConcurrentHashMap
中,同步處理主要是通過(guò)
Synchronized
和
unsafe
的硬件級(jí)別原子性 這兩種方式來(lái)完成的。??
在取得sizeCtl跟某個(gè)位置的Node的時(shí)候,使用的都是 unsafe
的方法,來(lái)達(dá)到并發(fā)安全的目的當(dāng)需要在某個(gè)位置設(shè)置節(jié)點(diǎn)的時(shí)候,則會(huì)通過(guò) Synchronized
的同步機(jī)制來(lái)鎖定該位置的節(jié)點(diǎn)。在數(shù)組擴(kuò)容的時(shí)候,則通過(guò)處理的 步長(zhǎng)
和fwd
節(jié)點(diǎn)來(lái)達(dá)到并發(fā)安全的目的,通過(guò)設(shè)置hash值為MOVED=-1。當(dāng)把某個(gè)位置的節(jié)點(diǎn)復(fù)制到擴(kuò)張后的table的時(shí)候,也通過(guò) Synchronized
的同步機(jī)制來(lái)保證線程安全
到此,相信大家對(duì)“ConcurrentHashMap有什么用”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。