溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

如何構(gòu)建java高性能隊(duì)列

發(fā)布時(shí)間:2022-01-06 15:17:49 來(lái)源:億速云 閱讀:101 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“如何構(gòu)建java高性能隊(duì)列”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

隊(duì)列

隊(duì)列,是一種先進(jìn)先出(First In First Out,F(xiàn)IFO)的數(shù)據(jù)結(jié)構(gòu),類(lèi)似于實(shí)際生活場(chǎng)景中的排隊(duì),先到的人先得。

使用數(shù)組和鏈表實(shí)現(xiàn)簡(jiǎn)單的隊(duì)列,我們前面都介紹過(guò)了,這里就不再贅述了,有興趣的同學(xué)可以點(diǎn)擊以下鏈接查看:

重溫四大基礎(chǔ)數(shù)據(jù)結(jié)構(gòu):數(shù)組、鏈表、隊(duì)列和棧

說(shuō)起高性能的隊(duì)列,當(dāng)然是說(shuō)在高并發(fā)環(huán)境下也能夠工作得很好的隊(duì)列,這里的很好主要是指兩個(gè)方面:并發(fā)安全、性能好。

并發(fā)安全的隊(duì)列

在Java中,默認(rèn)地,也自帶了一些并發(fā)安全的隊(duì)列:

隊(duì)列有界性數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue有界加鎖數(shù)組
LinkedBlockingQueue可選有界加鎖鏈表
ConcurrentLinkedQueue無(wú)界無(wú)鎖鏈表
SynchronousQueue無(wú)界無(wú)鎖隊(duì)列或棧
LinkedTransferQueue無(wú)界無(wú)鎖鏈表
PriorityBlockingQueue無(wú)界加鎖
DelayQueue無(wú)界加鎖

> 這些隊(duì)列的源碼解析快捷入口:死磕 Java并發(fā)集合之終結(jié)篇

總結(jié)起來(lái),實(shí)現(xiàn)并發(fā)安全隊(duì)列的數(shù)據(jù)結(jié)構(gòu)主要有:數(shù)組、鏈表和堆,堆主要用于實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列,不具備通用性,暫且不討論。

從有界性來(lái)看,只有ArrayBlockingQueue和LinkedBlockingQueue可以實(shí)現(xiàn)有界隊(duì)列,其它的都是無(wú)界隊(duì)列。

從加鎖來(lái)看,ArrayBlockingQueue和LinkedBlockingQueue都采用了加鎖的方式,其它的都是采用的CAS這種無(wú)鎖的技術(shù)實(shí)現(xiàn)的。

從安全性的角度來(lái)說(shuō),我們一般都要選擇有界隊(duì)列,防止生產(chǎn)者速度過(guò)快導(dǎo)致內(nèi)存溢出。

從性能的角度來(lái)說(shuō),我們一般要考慮無(wú)鎖的方式,減少線(xiàn)程上下文切換帶來(lái)的性能損耗。

從JVM的角度來(lái)說(shuō),我們一般選擇數(shù)組的實(shí)現(xiàn)方式,因?yàn)殒湵頃?huì)頻繁的增刪節(jié)點(diǎn),導(dǎo)致頻繁的垃圾回收,這也是一種性能損耗。

所以,最佳的選擇就是:數(shù)組 + 有界 + 無(wú)鎖。

而JDK并沒(méi)有提供這樣的隊(duì)列,因此,很多開(kāi)源框架都自己實(shí)現(xiàn)了高性能的隊(duì)列,比如Disruptor,以及Netty中使用的jctools。

高性能隊(duì)列

我們這里不討論具體的某一個(gè)框架,只介紹實(shí)現(xiàn)高性能隊(duì)列的通用技術(shù),并自己實(shí)現(xiàn)一個(gè)。

環(huán)形數(shù)組

通過(guò)上面的討論,我們知道實(shí)現(xiàn)高性能隊(duì)列使用的數(shù)據(jù)結(jié)構(gòu)只能是數(shù)組,而數(shù)組實(shí)現(xiàn)隊(duì)列,必然要使用到環(huán)形數(shù)組。

環(huán)形數(shù)組,一般通過(guò)設(shè)置兩個(gè)指針實(shí)現(xiàn):putIndex和takeIndex,或者叫writeIndex和readIndex,一個(gè)用于寫(xiě),一個(gè)用于讀。

當(dāng)寫(xiě)指針到達(dá)數(shù)組尾端時(shí),會(huì)從頭開(kāi)始,當(dāng)然,不能越過(guò)讀指針,同理,讀指針到達(dá)數(shù)組尾端時(shí),也會(huì)從頭開(kāi)始,當(dāng)然,不能讀取未寫(xiě)入的數(shù)據(jù)。

而為了防止寫(xiě)指針和讀指針重疊的時(shí)候,無(wú)法分清隊(duì)列到底是滿(mǎn)了還是空的狀態(tài),一般會(huì)再添加一個(gè)size字段:

如何構(gòu)建java高性能隊(duì)列

所以,使用環(huán)形數(shù)組實(shí)現(xiàn)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)一般為:

public class ArrayQueue<t> {
    private T[] array;
    private long wrtieIndex;
    private long readIndex;
    private long size;
}

在單線(xiàn)程的情況下,這樣不會(huì)有任何問(wèn)題,但是,在多線(xiàn)程環(huán)境中,這樣會(huì)帶來(lái)嚴(yán)重的偽共享問(wèn)題。

偽共享

什么是共享?

在計(jì)算機(jī)中,有很多存儲(chǔ)單元,我們接觸最多的就是內(nèi)存,又叫做主內(nèi)存,此外,CPU還有三級(jí)緩存:L1、L2、L3,L1最貼近CPU,當(dāng)然,它的存儲(chǔ)空間也很小,L2比L1稍大一些,L3最大,可以同時(shí)緩存多個(gè)核心的數(shù)據(jù)。CPU取數(shù)據(jù)的時(shí)候,先從L1緩存中讀取,如果沒(méi)有再?gòu)腖2緩存中讀取,如果沒(méi)有再?gòu)腖3中讀取,如果三級(jí)緩存都沒(méi)有,最后會(huì)從內(nèi)存中讀取。離CPU核心越遠(yuǎn),則相對(duì)的耗時(shí)就越長(zhǎng),所以,如果要做一些很頻繁的操作,要盡量保證數(shù)據(jù)緩存在L1中,這樣能極大地提高性能。

緩存行

而數(shù)據(jù)在三級(jí)緩存中,也不是說(shuō)來(lái)一個(gè)數(shù)據(jù)緩存一下,而是一次緩存一批數(shù)據(jù),這一批數(shù)據(jù)又稱(chēng)作緩存行(Cache Line),通常為64字節(jié)。

如何構(gòu)建java高性能隊(duì)列

每一次,當(dāng)CPU去內(nèi)存中拿數(shù)據(jù)的時(shí)候,都會(huì)把它后面的數(shù)據(jù)一并拿過(guò)來(lái)(組成64字節(jié)),我們以long型數(shù)組為例,當(dāng)CPU取數(shù)組中一個(gè)long的時(shí)候,同時(shí)會(huì)把后續(xù)的7個(gè)long一起取到緩存行中。

這在一定程度上能夠加快數(shù)據(jù)的處理,因?yàn)?,此時(shí)在處理下標(biāo)為0的數(shù)據(jù),下一個(gè)時(shí)刻可能就要處理下標(biāo)為1的數(shù)據(jù)了,直接從緩存中取要快很多。

但是,這樣又帶來(lái)了一個(gè)新的問(wèn)題——偽共享。

偽共享

試想一下,兩個(gè)線(xiàn)程(CPU)同時(shí)在處理這個(gè)數(shù)組中的數(shù)據(jù),兩個(gè)CPU都緩存了,一個(gè)CPU在對(duì)array[0]的數(shù)據(jù)加1,另一個(gè)CPU在對(duì)array[1]的數(shù)據(jù)加1,那么,回寫(xiě)到主內(nèi)存的時(shí)候,到底以哪個(gè)緩存行的數(shù)據(jù)為準(zhǔn)(寫(xiě)回主內(nèi)存的時(shí)候也是以緩存行的形式寫(xiě)回),所以,此時(shí),就需要對(duì)這兩個(gè)緩存行“加鎖”了,一個(gè)CPU先修改數(shù)據(jù),寫(xiě)回主內(nèi)存,另一個(gè)CPU才能讀取數(shù)據(jù)并修改數(shù)據(jù),再寫(xiě)回主內(nèi)存,這樣勢(shì)必會(huì)帶來(lái)性能的損耗,出現(xiàn)的這種現(xiàn)象就叫做偽共享,這種“加鎖”的方式叫做內(nèi)存屏障,關(guān)于內(nèi)存屏障的知識(shí)我們就不展開(kāi)敘述了。

那么,怎么解決偽共享帶來(lái)的問(wèn)題呢?

以環(huán)形數(shù)組實(shí)現(xiàn)的隊(duì)列為例,writeIndex、readIndex、size現(xiàn)在是這樣處理的:

如何構(gòu)建java高性能隊(duì)列

所以,我們只需要在writeIndex和readIndex之間加7個(gè)long就可以把它們隔離開(kāi),同理,readIndex和size之間也是一樣的。

如何構(gòu)建java高性能隊(duì)列

這樣就消除了writeIndex和readIndex之間的偽共享問(wèn)題,因?yàn)閣riteIndex和readIndex肯定是在兩個(gè)不同的線(xiàn)程中更新,所以,消除偽共享之后帶來(lái)的性能提升是很明顯的。

假如有多個(gè)生產(chǎn)者,writeIndex是肯定會(huì)被爭(zhēng)用的,此時(shí),要怎么友好地修改writeIndex呢?即一個(gè)生產(chǎn)者線(xiàn)程修改了writeIndex,另一個(gè)生產(chǎn)者線(xiàn)程要立馬可見(jiàn)。

你第一時(shí)間想到的肯定是volatile,沒(méi)錯(cuò),可是光volatile還不行哦,volatile只能保證可見(jiàn)性和有序性,不能保證原子性,所以,還需要加上原子指令CAS,CAS是誰(shuí)提供的?原子類(lèi)AtomicInteger和AtomicLong都具有CAS的功能,那我們直接使用他們嗎?肯定不是,仔細(xì)觀察,發(fā)現(xiàn)他們最終都是調(diào)用Unsafe實(shí)現(xiàn)的。

OK,下面就輪到最牛逼的底層殺手登場(chǎng)了——Unsafe。

Unsafe

Unsafe不僅提供了CAS的指令,還提供很多其它操作底層的方法,比如操作直接內(nèi)存、修改私有變量的值、實(shí)例化一個(gè)類(lèi)、阻塞/喚醒線(xiàn)程、帶有內(nèi)存屏障的方法等。

> 關(guān)于Unsafe,可以看這篇文章:死磕 java魔法類(lèi)之Unsafe解析

當(dāng)然,構(gòu)建高性能隊(duì)列,主要使用的是Unsafe的CAS指令以及帶有內(nèi)存屏障的方法等:

// 原子指令
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// 以volatile的形式獲取值,相當(dāng)于給變量加了volatile關(guān)鍵字
public native long getLongVolatile(Object var1, long var2);
// 延遲更新,對(duì)變量的修改不會(huì)立即寫(xiě)回到主內(nèi)存,也就是說(shuō),另一個(gè)線(xiàn)程不會(huì)立即可見(jiàn)
public native void putOrderedLong(Object var1, long var2, long var4);

好了,底層知識(shí)介紹的差不多了,是時(shí)候展現(xiàn)真正的技術(shù)了——手寫(xiě)高性能隊(duì)列。

手寫(xiě)高性能隊(duì)列

我們假設(shè)這樣一種場(chǎng)景:有多個(gè)生產(chǎn)者(Multiple Producer),卻只有一個(gè)消費(fèi)者(Single Consumer),這是Netty中的經(jīng)典場(chǎng)景,這樣一種隊(duì)列該怎么實(shí)現(xiàn)?

直接上代碼:

/**
 * 多生產(chǎn)者單消費(fèi)者隊(duì)列
 *
 * @param <t>
 */
public class MpscArrayQueue<t> {

    long p01, p02, p03, p04, p05, p06, p07;
    // 存放元素的地方
    private T[] array;
    long p1, p2, p3, p4, p5, p6, p7;
    // 寫(xiě)指針,多個(gè)生產(chǎn)者,所以聲明為volatile
    private volatile long writeIndex;
    long p11, p12, p13, p14, p15, p16, p17;
    // 讀指針,只有一個(gè)消費(fèi)者,所以不用聲明為volatile
    private long readIndex;
    long p21, p22, p23, p24, p25, p26, p27;
    // 元素個(gè)數(shù),生產(chǎn)者和消費(fèi)者都可能修改,所以聲明為volatile
    private volatile long size;
    long p31, p32, p33, p34, p35, p36, p37;

    // Unsafe變量
    private static final Unsafe UNSAFE;
    // 數(shù)組基礎(chǔ)偏移量
    private static final long ARRAY_BASE_OFFSET;
    // 數(shù)組元素偏移量
    private static final long ARRAY_ELEMENT_SHIFT;
    // writeIndex的偏移量
    private static final long WRITE_INDEX_OFFSET;
    // readIndex的偏移量
    private static final long READ_INDEX_OFFSET;
    // size的偏移量
    private static final long SIZE_OFFSET;

    static {
        Field f = null;
        try {
            // 獲取Unsafe的實(shí)例
            f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);

            // 計(jì)算數(shù)組基礎(chǔ)偏移量
            ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(Object[].class);
            // 計(jì)算數(shù)組中元素偏移量
            // 簡(jiǎn)單點(diǎn)理解,64位系統(tǒng)中有壓縮指針占用4個(gè)字節(jié),沒(méi)有壓縮指針占用8個(gè)字節(jié)
            int scale = UNSAFE.arrayIndexScale(Object[].class);
            if (4 == scale) {
                ARRAY_ELEMENT_SHIFT = 2;
            } else if (8 == scale) {
                ARRAY_ELEMENT_SHIFT = 3;
            } else {
                throw new IllegalStateException("未知指針的大小");
            }

            // 計(jì)算writeIndex的偏移量
            WRITE_INDEX_OFFSET = UNSAFE
                    .objectFieldOffset(MpscArrayQueue.class.getDeclaredField("writeIndex"));
            // 計(jì)算readIndex的偏移量
            READ_INDEX_OFFSET = UNSAFE
                    .objectFieldOffset(MpscArrayQueue.class.getDeclaredField("readIndex"));
            // 計(jì)算size的偏移量
            SIZE_OFFSET = UNSAFE
                    .objectFieldOffset(MpscArrayQueue.class.getDeclaredField("size"));
        } catch (Exception e) {
            throw new RuntimeException();
        }
    }

    // 構(gòu)造方法
    public MpscArrayQueue(int capacity) {
        // 取整到2的N次方(未考慮越界)
        capacity = 1 &lt;&lt; (32 - Integer.numberOfLeadingZeros(capacity - 1));
        // 實(shí)例化數(shù)組
        this.array = (T[]) new Object[capacity];
    }

    // 生產(chǎn)元素
    public boolean put(T t) {
        if (t == null) {
            return false;
        }
        long size;
        long writeIndex;
        do {
            // 每次循環(huán)都重新獲取size的大小
            size = this.size;
            // 隊(duì)列滿(mǎn)了直接返回
            if (size &gt;= this.array.length) {
                return false;
            }

            // 每次循環(huán)都重新獲取writeIndex的值
            writeIndex = this.writeIndex;

            // while循環(huán)中原子更新writeIndex的值
            // 如果失敗了重新走上面的過(guò)程
        } while (!UNSAFE.compareAndSwapLong(this, WRITE_INDEX_OFFSET, writeIndex, writeIndex + 1));

        // 到這里,說(shuō)明上述原子更新成功了
        // 那么,就把元素的值放到writeIndex的位置
        // 且更新size
        long eleOffset = calcElementOffset(writeIndex, this.array.length-1);
        // 延遲更新到主內(nèi)存,讀取的時(shí)候才更新
        UNSAFE.putOrderedObject(this.array, eleOffset, t);

        // 往死里更新直到成功
        do {
            size = this.size;
        } while (!UNSAFE.compareAndSwapLong(this, SIZE_OFFSET, size, size + 1));

        return true;
    }

    // 消費(fèi)元素
    public T take() {
        long size = this.size;
        // 如果size為0,表示隊(duì)列為空,直接返回
        if (size &lt;= 0) {
            return null;
        }
        // size大于0,肯定有值
        // 只有一個(gè)消費(fèi)者,不用考慮線(xiàn)程安全的問(wèn)題
        long readIndex = this.readIndex;
        // 計(jì)算讀指針處元素的偏移量
        long offset = calcElementOffset(readIndex, this.array.length-1);
            // 獲取讀指針處的元素,使用volatile語(yǔ)法,強(qiáng)制更新生產(chǎn)者的數(shù)據(jù)到主內(nèi)存
        T e = (T) UNSAFE.getObjectVolatile(this.array, offset);

        // 增加讀指針
        UNSAFE.putOrderedLong(this, READ_INDEX_OFFSET, readIndex+1);
        // 減小size
        do {
            size = this.size;
        } while (!UNSAFE.compareAndSwapLong(this, SIZE_OFFSET, size, size-1));

        return e;
    }

    private long calcElementOffset(long index, long mask) {
        // index &amp; mask 相當(dāng)于取余數(shù),表示index到達(dá)數(shù)組尾端了從頭開(kāi)始
        return ARRAY_BASE_OFFSET + ((index &amp; mask) &lt;&lt; ARRAY_ELEMENT_SHIFT);
    }

}

“如何構(gòu)建java高性能隊(duì)列”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI