溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Netty分布式ByteBuf怎么使用命中緩存分配

發(fā)布時間:2022-03-29 09:07:50 來源:億速云 閱讀:141 作者:iii 欄目:開發(fā)技術(shù)

今天小編給大家分享一下Netty分布式ByteBuf怎么使用命中緩存分配的相關(guān)知識點(diǎn),內(nèi)容詳細(xì),邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

分析先關(guān)邏輯之前, 首先介紹緩存對象的數(shù)據(jù)結(jié)構(gòu)

回顧上一小節(jié)的內(nèi)容, 我們講到PoolThreadCache中維護(hù)了三個緩存數(shù)組(實(shí)際上是六個, 這里僅僅以Direct為例, heap類型的邏輯是一樣的): tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches分別代表tiny類型, small類型和normal類型的緩存數(shù)組

這三個數(shù)組保存在PoolThreadCache的成員變量中:

private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

其中是在構(gòu)造方法中進(jìn)行了初始化:

tinySubPageDirectCaches = createSubPageCaches(
        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
        smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
normalDirectCaches = createNormalCaches(
        normalCacheSize, maxCachedBufferCapacity, directArena);

我們以tiny類型為例跟到createSubPageCaches方法中

private static <T> MemoryRegionCache<T>[] createSubPageCaches(
        int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}

這里上面的小節(jié)已經(jīng)分析過, 這里創(chuàng)建了一個緩存數(shù)組, 這個緩存數(shù)組的長度,也就是numCaches, 在不同的類型, 這個長度不一樣, tiny類型長度是32, small類型長度為4, normal類型長度為3

我們知道, 緩存數(shù)組中每個節(jié)點(diǎn)代表一個緩存對象, 里面維護(hù)了一個隊列, 隊列大小由PooledByteBufAllocator類中的tinyCacheSize, smallCacheSize, normalCacheSize屬性決定的, 這里之前小節(jié)已經(jīng)剖析過

其中每個緩存對象, 隊列中緩存的ByteBuf大小是固定的, netty將每種緩沖區(qū)類型分成了不同長度規(guī)格, 而每個緩存中的隊列緩存的ByteBuf的長度, 都是同一個規(guī)格的長度, 而緩沖區(qū)數(shù)組的長度, 就是規(guī)格的數(shù)量

比如, 在tiny類型中, netty將其長度分成32個規(guī)格, 每個規(guī)格都是16的整數(shù)倍, 也就是包含0B, 16B, 32B, 48B, 64B, 80B, 96B......496B總共32種規(guī)格, 而在其緩存數(shù)組tinySubPageDirectCaches中, 這每一種規(guī)格代表數(shù)組中的一個緩存對象緩存的ByteBuf的大小, 我們以tinySubPageDirectCaches[1]為例(這里下標(biāo)選擇1是因?yàn)橄聵?biāo)為0代表的規(guī)格是0B, 其實(shí)就代表一個空的緩存, 這里不進(jìn)行舉例), 在tinySubPageDirectCaches[1]的緩存對象中所緩存的ByteBuf的緩沖區(qū)長度是16B, 在tinySubPageDirectCaches[2]中緩存的ByteBuf長度都為32B, 以此類推, tinySubPageDirectCaches[31]中緩存的ByteBuf長度為496B

有關(guān)類型規(guī)則的分配如下:

tiny:總共32個規(guī)格, 均是16的整數(shù)倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B

small:4種規(guī)格, 512b, 1k, 2k, 4k

nomal:3種規(guī)格, 8k, 16k, 32k

這樣, PoolThreadCache中緩存數(shù)組的數(shù)據(jù)結(jié)構(gòu)為

Netty分布式ByteBuf怎么使用命中緩存分配

大概了解緩存數(shù)組的數(shù)據(jù)結(jié)構(gòu), 我們再繼續(xù)剖析在緩沖中分配內(nèi)存的邏輯

回到PoolArena的allocate方法中

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    //規(guī)格化
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { 
        int tableIdx;
        PoolSubpage<T>[] table;
        //判斷是不是tinty
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            //緩存分配
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            //通過tinyIdx拿到tableIdx
            tableIdx = tinyIdx(normCapacity);
            //subpage的數(shù)組
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        //拿到對應(yīng)的節(jié)點(diǎn)
        final PoolSubpage<T> head = table[tableIdx];

        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            //默認(rèn)情況下, head的next也是自身
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        //首先在緩存上進(jìn)行內(nèi)存分配
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            //分配成功, 返回
            return;
        }
        //分配不成功, 做實(shí)際的內(nèi)存分配
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        //大于這個值, 就不在緩存上分配
        allocateHuge(buf, reqCapacity);
    }
}

首先通過normalizeCapacity方法進(jìn)行內(nèi)存規(guī)格化

我們跟到normalizeCapacity方法中

int normalizeCapacity(int reqCapacity) {
    if (reqCapacity < 0) {
        throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
    }
    if (reqCapacity >= chunkSize) {
        return reqCapacity;
    }
    //如果>tiny
    if (!isTiny(reqCapacity)) { // >= 512
        //找一個2的冪次方的數(shù)值, 確保數(shù)值大于等于reqCapacity
        int normalizedCapacity = reqCapacity;
        normalizedCapacity --;
        normalizedCapacity |= normalizedCapacity >>>  1;
        normalizedCapacity |= normalizedCapacity >>>  2;
        normalizedCapacity |= normalizedCapacity >>>  4;
        normalizedCapacity |= normalizedCapacity >>>  8;
        normalizedCapacity |= normalizedCapacity >>> 16;
        normalizedCapacity ++;

        if (normalizedCapacity < 0) {
            normalizedCapacity >>>= 1;
        }

        return normalizedCapacity;
    }
    //如果是16的倍數(shù)
    if ((reqCapacity & 15) == 0) {
        return reqCapacity;
    }
    //不是16的倍數(shù), 變成最大小于當(dāng)前值的值+16
    return (reqCapacity & ~15) + 16;
}

 if (!isTiny(reqCapacity)) 代表如果大于tiny類型的大小, 也就是512, 則會找一個2的冪次方的數(shù)值, 確保這個數(shù)值大于等于reqCapacity

如果是tiny, 則繼續(xù)往下

 if ((reqCapacity & 15) == 0) 這里判斷如果是16的倍數(shù), 則直接返回

如果不是16的倍數(shù), 則返回 (reqCapacity & ~15) + 16 , 也就是變成最小大于當(dāng)前值的16的倍數(shù)值

從上面規(guī)格化邏輯看出, 這里將緩存大小規(guī)格化成固定大小, 確保每個緩存對象緩存的ByteBuf容量統(tǒng)一

回到allocate方法中

 if(isTinyOrSmall(normCapacity)) 這里是根據(jù)規(guī)格化后的大小判斷是否tiny或者small類型, 我們跟到方法中:

boolean isTinyOrSmall(int normCapacity) {
    return (normCapacity & subpageOverflowMask) == 0;
}

這里是判斷如果normCapacity小于一個page的大小, 也就是8k代表其實(shí)tiny或者small

繼續(xù)看allocate方法:

如果當(dāng)前大小是tiny或者small, 則isTiny(normCapacity)判斷是否是tiny類型, 跟進(jìn)去:

static boolean isTiny(int normCapacity) {
    return (normCapacity & 0xFFFFFE00) == 0;
}

這里是判斷如果小于512, 則認(rèn)為是tiny

再繼續(xù)看allocate方法:

如果是tiny, 則通過cache.allocateTiny(this, buf, reqCapacity, normCapacity)在緩存上進(jìn)行分配

我們就以tiny類型為例, 分析在緩存上分配ByteBuf的流程

allocateTiny是緩存分配的入口

我們跟進(jìn)去, 進(jìn)入到了PoolThreadCache的allocateTiny方法中:

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

這里有個方法cacheForTiny(area, normCapacity), 這個方法的作用是根據(jù)normCapacity找到tiny類型緩存數(shù)組中的一個緩存對象

我們跟進(jìn)cacheForTiny:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { 
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

PoolArena.tinyIdx(normCapacity)是找到tiny類型緩存數(shù)組的下標(biāo)

繼續(xù)跟tinyIdx:

static int tinyIdx(int normCapacity) {
    return normCapacity >>> 4;
}

這里直接將normCapacity除以16, 通過前面的內(nèi)容我們知道, tiny類型緩存數(shù)組中每個元素規(guī)格化的數(shù)據(jù)都是16的倍數(shù), 所以通過這種方式可以找到其下標(biāo), 參考圖5-2, 如果是16B會拿到下標(biāo)為1的元素, 如果是32B則會拿到下標(biāo)為2的元素

回到acheForTiny方法中

 if (area.isDirect()) 這里判斷是否是分配堆外內(nèi)存, 因?yàn)槲覀兪前凑斩淹鈨?nèi)存進(jìn)行舉例, 所以這里為true

再繼續(xù)跟到cache(tinySubPageDirectCaches, idx)方法中:

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    } 
    return cache[idx];
}

這里我們看到直接通過下標(biāo)的方式拿到了緩存數(shù)組中的對象

回到PoolThreadCache的allocateTiny方法中:

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

拿到了緩存對象之后, 我們跟到allocate(cacheForTiny(area, normCapacity), buf, reqCapacity)方法中:

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    if (cache == null) {
        return false;
    }
    boolean allocated = cache.allocate(buf, reqCapacity);
    if (++ allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        trim();
    }
    return allocated;
}

這里通過cache.allocate(buf, reqCapacity)進(jìn)行繼續(xù)進(jìn)行分配

再繼續(xù)往里跟, 跟到內(nèi)部類MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法中:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    ++ allocations;
    return true;
}

這里首先通過queue.poll()這種方式彈出一個entry, 我們之前的小節(jié)分析過, MemoryRegionCache維護(hù)著一個隊列, 而隊列中的每一個值是一個entry

我們簡單看下Entry這個類

static final class Entry<T> {
    final Handle<Entry<?>> recyclerHandle;
    PoolChunk<T> chunk;
    long handle = -1;
    //代碼省略
}

這里重點(diǎn)關(guān)注chunk和handle的這兩個屬性, chunk代表一塊連續(xù)的內(nèi)存, 我們之前簡單介紹過, netty是通過chunk為單位進(jìn)行內(nèi)存分配的, 我們之后會對chunk進(jìn)行剖析

handle相當(dāng)于一個指針, 可以唯一定位到chunk里面的一塊連續(xù)的內(nèi)存, 之后也會詳細(xì)分析

這樣, 通過chunk和handle就可以定位ByteBuf中指定一塊連續(xù)內(nèi)存, 有關(guān)ByteBuf相關(guān)的讀寫, 都會在這塊內(nèi)存中進(jìn)行

我們回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    ++ allocations;
    return true;
}

彈出entry之后, 通過initBuf(entry.chunk, entry.handle, buf, reqCapacity)這種方式給ByteBuf初始化, 這里參數(shù)傳入我們剛才分析過的當(dāng)前Entry的chunk和hanle

因?yàn)槲覀兎治龅膖iny類型的緩存對象是SubPageMemoryRegionCache類型,所以我們繼續(xù)跟到SubPageMemoryRegionCache類的initBuf(entry.chunk, entry.handle, buf, reqCapacity)方法中:

protected void initBuf(
        PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
    chunk.initBufWithSubpage(buf, handle, reqCapacity);
}

這里的chunk調(diào)用了initBufWithSubpage(buf, handle, reqCapacity)方法, 其實(shí)就是PoolChunk類中的方法

我們繼續(xù)跟initBufWithSubpage:

void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
}

這里有關(guān)bitmapIdx(handle)相關(guān)的邏輯, 會在后續(xù)的章節(jié)進(jìn)行剖析, 這里繼續(xù)往里跟:

private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;
    int memoryMapIdx = memoryMapIdx(handle);
    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;
    buf.init(
        this, handle, 
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, 
        arena.parent.threadCache());
}

這里我們先關(guān)注init方法, 因?yàn)槲覀兪且訮ooledUnsafeDirectByteBuf為例, 所以這里走的是PooledUnsafeDirectByteBuf的init方法

跟進(jìn)init方法

void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength, 
          PoolThreadCache cache) {
    super.init(chunk, handle, offset, length, maxLength, cache);
    initMemoryAddress();
}

首先調(diào)用了父類的init方法, 再跟進(jìn)去:

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    //初始化
    assert handle >= 0;
    assert chunk != null;
    //在哪一塊內(nèi)存上進(jìn)行分配的
    this.chunk = chunk;
    //這一塊內(nèi)存上的哪一塊連續(xù)內(nèi)存
    this.handle = handle;
    memory = chunk.memory;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
    tmpNioBuf = null;
    this.cache = cache;
}

這里將PooledUnsafeDirectByteBuf的各個屬性進(jìn)行了初始化

 this.chunk = chunk 這里初始化了chunk, 代表當(dāng)前的ByteBuf是在哪一塊內(nèi)存中分配的

 this.handle = handle 這里初始化了handle, 代表當(dāng)前的ByteBuf是這塊內(nèi)存的哪個連續(xù)內(nèi)存

有關(guān)offset和length, 我們會在之后的小節(jié)進(jìn)行分析, 在這里我們只需要知道, 通過緩存分配ByteBuf, 我們只需要通過一個chunk和handle, 就可以確定一塊內(nèi)存

以上就是通過緩存分配ByteBuf對象的過程

我們回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    ++ allocations;
    return true;
}

分析完了initBuf方法, 再繼續(xù)往下看

entry.recycle()這步是將entry對象進(jìn)行回收, 因?yàn)閑ntry對象彈出之后沒有再被引用, 可能gc會將entry對象回收, netty為了將對象進(jìn)行循環(huán)利用, 就將其放在對象回收站進(jìn)行回收

我們跟進(jìn)recycle方法

void recycle() {
    chunk = null;
    handle = -1;
    recyclerHandle.recycle(this);
}

chunk = null和handle = -1表示當(dāng)前Entry不指向任何一塊內(nèi)存

 recyclerHandle.recycle(this) 將當(dāng)前entry回收, 有關(guān)對象回收站, 我們會在后面的章節(jié)詳細(xì)剖析

以上就是“Netty分布式ByteBuf怎么使用命中緩存分配”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學(xué)習(xí)更多的知識,請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI