溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Java中怎么使用ConcurrentHashMap實(shí)現(xiàn)線(xiàn)程安全的Map

發(fā)布時(shí)間:2023-04-27 10:28:03 來(lái)源:億速云 閱讀:88 作者:iii 欄目:開(kāi)發(fā)技術(shù)

本文小編為大家詳細(xì)介紹“Java中怎么使用ConcurrentHashMap實(shí)現(xiàn)線(xiàn)程安全的Map”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“Java中怎么使用ConcurrentHashMap實(shí)現(xiàn)線(xiàn)程安全的Map”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來(lái)學(xué)習(xí)新知識(shí)吧。

jdk1.7版本

數(shù)據(jù)結(jié)構(gòu)

    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;

可以看到主要就是一個(gè)Segment數(shù)組,注釋也寫(xiě)了,每個(gè)都是一個(gè)特殊的hash table。

來(lái)看一下Segment是什么東西。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    	......
            /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;
        transient int threshold;
        final float loadFactor;
    	// 構(gòu)造函數(shù)
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
  		......
    }

上面是部分代碼,可以看到Segment繼承了ReentrantLock,所以其實(shí)每個(gè)Segment就是一個(gè)鎖。

里面存放著HashEntry數(shù)組,該變量用volatile修飾。HashEntry和hashmap的節(jié)點(diǎn)類(lèi)似,也是一個(gè)鏈表的節(jié)點(diǎn)。

來(lái)看看具體的代碼,可以看到和hashmap里面稍微不同的是,他的成員變量有用volatile修飾。

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
        ......
    }

所以ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)差不多是下圖這種樣子的。

Java中怎么使用ConcurrentHashMap實(shí)現(xiàn)線(xiàn)程安全的Map

在構(gòu)造的時(shí)候,Segment 的數(shù)量由所謂的 concurrentcyLevel 決定,默認(rèn)是 16,也可以在相應(yīng)構(gòu)造函數(shù)直接指定。注意,Java 需要它是 2 的冪數(shù)值,如果輸入是類(lèi)似 15 這種非冪值,會(huì)被自動(dòng)調(diào)整到 16 之類(lèi) 2 的冪數(shù)值。

來(lái)看看源碼,先從簡(jiǎn)單的get方法開(kāi)始

get()

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // 通過(guò)unsafe獲取Segment數(shù)組的元素
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // 還是通過(guò)unsafe獲取HashEntry數(shù)組的元素
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

get的邏輯很簡(jiǎn)單,就是找到Segment對(duì)應(yīng)下標(biāo)的HashEntry數(shù)組,再找到HashEntry數(shù)組對(duì)應(yīng)下標(biāo)的鏈表頭,再遍歷鏈表獲取數(shù)據(jù)。

這個(gè)獲取數(shù)組中的數(shù)據(jù)是使用UNSAFE.getObjectVolatile(segments, u),unsafe提供了像c語(yǔ)言的可以直接訪(fǎng)問(wèn)內(nèi)存的能力。該方法可以獲取對(duì)象的相應(yīng)偏移量的數(shù)據(jù)。u就是計(jì)算好的一個(gè)偏移量,所以等同于segments[i],只是效率更高。

put()

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

而對(duì)于 put 操作,是以 Unsafe 調(diào)用方式,直接獲取相應(yīng)的 Segment,然后進(jìn)行線(xiàn)程安全的 put 操作:

主要邏輯在Segment內(nèi)部的put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // scanAndLockForPut會(huì)去查找是否有key相同Node
            // 無(wú)論如何,確保獲取鎖
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        // 更新已有value...
                    }
                    else {
                        // 放置HashEntry到特定位置,如果超過(guò)閾值,進(jìn)行rehash
                        // ...
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

size()

來(lái)看一下主要的代碼,

for (;;) {
    // 如果重試次數(shù)等于默認(rèn)的2,就鎖住所有的segment,來(lái)計(jì)算值
    if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            sum += seg.modCount;
            int c = seg.count;
            if (c < 0 || (size += c) < 0)
                overflow = true;
        }
    }
    // 如果sum不再變化,就表示得到了一個(gè)確切的值
    if (sum == last)
        break;
    last = sum;
}

這里其實(shí)就是計(jì)算所有segment的數(shù)量和,如果數(shù)量和跟上次獲取到的值相等,就表示map沒(méi)有進(jìn)行操作,這個(gè)值是相對(duì)正確的。如果重試兩次之后還是沒(méi)法得到一個(gè)統(tǒng)一的值,就鎖住所有的segment,再來(lái)獲取值。

擴(kuò)容

private void rehash(HashEntry<K,V> node) {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
    		// 新表的大小是原來(lái)的兩倍
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        // 如果有多個(gè)節(jié)點(diǎn)
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        // 這里操作就是找到末尾的一段索引值都相同的鏈表節(jié)點(diǎn),這段的頭結(jié)點(diǎn)是lastRun.
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // 然后將lastRun結(jié)點(diǎn)賦值給數(shù)組位置,這樣lastRun后面的節(jié)點(diǎn)也跟著過(guò)去了。
                        newTable[lastIdx] = lastRun;
                        // 之后就是復(fù)制開(kāi)頭到lastRun之間的節(jié)點(diǎn)
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

jdk1.8版本

數(shù)據(jù)結(jié)構(gòu)

1.8的版本的ConcurrentHashmap整體上和Hashmap有點(diǎn)像,但是去除了segment,而是使用node的數(shù)組。

transient volatile Node<K,V>[] table;

1.8中還是有Segment這個(gè)內(nèi)部類(lèi),但是存在的意義只是為了序列化兼容,實(shí)際已經(jīng)不使用了。

來(lái)看一下node節(jié)點(diǎn)

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ......
    }

和HashMap中的node節(jié)點(diǎn)類(lèi)似,也是實(shí)現(xiàn)Map.Entry,不同的是val和next加上了volatile修飾來(lái)保證可見(jiàn)性。

put()

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 利用CAS去進(jìn)行無(wú)鎖線(xiàn)程安全操作,如果bin是空的
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                     // 細(xì)粒度的同步修改操作... 
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 找到相同key就更新
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 沒(méi)有相同的就新增
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果是樹(shù)節(jié)點(diǎn),進(jìn)行樹(shù)的操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // Bin超過(guò)閾值,進(jìn)行樹(shù)化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

可以看到,在同步邏輯上,它使用的是 synchronized,而不是通常建議的 ReentrantLock 之類(lèi),這是為什么呢?現(xiàn)在 JDK1.8 中,synchronized 已經(jīng)被不斷優(yōu)化,可以不再過(guò)分擔(dān)心性能差異,另外,相比于 ReentrantLock,它可以減少內(nèi)存消耗,這是個(gè)非常大的優(yōu)勢(shì)。

與此同時(shí),更多細(xì)節(jié)實(shí)現(xiàn)通過(guò)使用 Unsafe 進(jìn)行了優(yōu)化,例如 tabAt 就是直接利用 getObjectAcquire,避免間接調(diào)用的開(kāi)銷(xiāo)。

那么,再來(lái)看看size是怎么操作的?

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

這里就是獲取成員變量counterCells,遍歷獲取總數(shù)。

其實(shí),對(duì)于 CounterCell 的操作,是基于 java.util.concurrent.atomic.LongAdder 進(jìn)行的,是一種 JVM 利用空間換取更高效率的方法,利用了Striped64內(nèi)部的復(fù)雜邏輯。這個(gè)東西非常小眾,大多數(shù)情況下,建議還是使用 AtomicLong,足以滿(mǎn)足絕大部分應(yīng)用的性能需求。

擴(kuò)容

 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
		......
        // 初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
     	// 是否繼續(xù)處理下一個(gè)
        boolean advance = true;
     	// 是否完成
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 首次循環(huán)才會(huì)進(jìn)來(lái)這里
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //擴(kuò)容結(jié)束后做后續(xù)工作
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //每當(dāng)一條線(xiàn)程擴(kuò)容結(jié)束就會(huì)更新一次 sizeCtl 的值,進(jìn)行減 1 操作
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果是null,設(shè)置fwd
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 說(shuō)明該位置已經(jīng)被處理過(guò)了,不需要再處理
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                // 真正的處理邏輯
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 樹(shù)節(jié)點(diǎn)操作
                        else if (f instanceof TreeBin) {
                            ......
                        }
                    }
                }
            }
        }
    }
     }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 樹(shù)節(jié)點(diǎn)操作
                    else if (f instanceof TreeBin) {
                        ......
                    }
                }
            }
        }
    }
}

核心邏輯和HashMap一樣也是創(chuàng)建兩個(gè)鏈表,只是多了獲取lastRun的操作。

讀到這里,這篇“Java中怎么使用ConcurrentHashMap實(shí)現(xiàn)線(xiàn)程安全的Map”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過(guò)才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI