溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Netty分布式客戶端接入流程是什么

發(fā)布時間:2022-03-28 09:23:40 來源:億速云 閱讀:146 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Netty分布式客戶端接入流程是什么的相關(guān)知識,內(nèi)容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Netty分布式客戶端接入流程是什么文章都會有所收獲,下面我們一起來看看吧。

第一節(jié):初始化NioSockectChannelConfig

創(chuàng)建channel

在剖析接入流程之前我們首先補充下第一章有關(guān)創(chuàng)建channel的知識:

我們在第一章剖析過channel的創(chuàng)建, 其中NioServerSocketChannel中有個構(gòu)造方法:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

當(dāng)時我們并沒有剖析config相關(guān)知識, 在這一章首先對此做一個補充, 這里我們看到每一個NioServerSocketChannel都擁有一個config屬性, 這個屬性存放著NioServerSocketChannel的相關(guān)配置, 這里創(chuàng)建一個NioServerSocketChannelConfig對象, 并將當(dāng)前channel, 和channel對應(yīng)的java底層的socket對象進行了傳入, NioServerSocketChannelConfig其實是NioServerSocketChannel的內(nèi)部類

我們跟到NioServerSocketChannelConfig類的構(gòu)造方法中:

private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
    super(channel, javaSocket);
}

我們繼續(xù)跟入其父類DefaultServerSocketChannelConfig的構(gòu)造方法中:

public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
    super(channel);
    if (javaSocket == null) {
        throw new NullPointerException("javaSocket");
    }
    this.javaSocket = javaSocket;
}

這里繼續(xù)調(diào)用了其父類的構(gòu)造方法, 并保存了jdk底層的socket對象, 并且調(diào)用其父類DefaultChannelConfig的構(gòu)造方法

跟到其父類DefaultChannelConfig的構(gòu)造方法中

public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

這里調(diào)用了自身的構(gòu)造方法, 傳入了channel和一個AdaptiveRecvByteBufAllocator對象

AdaptiveRecvByteBufAllocator是一個緩沖區(qū)分配器, 用于分配一個緩沖區(qū)Bytebuf的, 有關(guān)Bytebuf的相關(guān)內(nèi)容會在后面的章節(jié)詳細講解, 這里可以簡單介紹作為了解, 就當(dāng)對于之后知識的預(yù)習(xí)

Bytebuf相當(dāng)于jdk的ByetBuffer, Netty對其做了重新的封裝, 用于讀寫channel中的字節(jié)流, 熟悉Nio的同學(xué)對此應(yīng)該并不陌生, AdaptiveRecvByteBufAllocator就是用于分配netty中ByetBuff的緩沖區(qū)分配器, 根據(jù)名字, 我們不難看出這個緩沖區(qū)是一個可變大小的字節(jié)緩沖區(qū)

我們跟到AdaptiveRecvByteBufAllocator的構(gòu)造方法中:

public AdaptiveRecvByteBufAllocator() {
    //DEFAULT_MINIMUM:最小緩沖區(qū)長度64字節(jié)
    //DEFAULT_INITIAL:初始容量1024字節(jié)
    //最大容量65536字節(jié)
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

這里調(diào)用自身的構(gòu)造方法并且傳入了三個屬性, 這三個屬性的含義分別為:

DEFAULT_MINIMUM:代表要分配的緩沖區(qū)長度最少為64個字節(jié)

DEFAULT_INITIAL:代表要分配的緩沖區(qū)的初始容量為1024個字節(jié)

DEFAULT_MAXIMUM:代表要分配的緩沖區(qū)最大容量為65536個字節(jié)

我們跟到this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM)方法中

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    //忽略驗證代碼
    //最小容量在table中的下標(biāo)
    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }
    //最大容量在table中的下標(biāo)
    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }
    this.initial = initial;
}

其中這里初始化了三個屬性, 分別是:

minIndex:最小容量在size_table中的下標(biāo)

maxIndex:最大容量在table中的下標(biāo)

initial:初始容量1024個字節(jié)

這里的size_table就是一個數(shù)組, 里面盛放著byteBuf可分配的內(nèi)存大小的集合, 分配的bytebuf無論是擴容還是收縮, 內(nèi)存大小都屬于size_table中的元素, 那么這個數(shù)組是如何初始化的, 我們跟到這個屬性中:

private static final int[] SIZE_TABLE;

我們看到是一個final修飾的靜態(tài)成員變量, 我們跟到static塊中看它的初始化過程:

static {
    //List集合
    List<Integer> sizeTable = new ArrayList<Integer>();
    //從16開始, 每遞增16添加到List中, 直到大于等于512
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }
    //從512開始, 倍增添加到List中, 直到內(nèi)存溢出
    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }
    //初始化數(shù)組
    SIZE_TABLE = new int[sizeTable.size()];
    //將list的內(nèi)容放入數(shù)組中
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

首先創(chuàng)建一個Integer類型的list用于盛放內(nèi)存元素

這里通過兩組循環(huán)為list添加元素

首先看第一組循環(huán):

for (int i = 16; i < 512; i += 16) {
    sizeTable.add(i);
}

這里是通過16平移的方式, 直到512個字節(jié), 將每次平移之后的內(nèi)存大小添加到list中

再看第二組循環(huán)

for (int i = 512; i > 0; i <<= 1) {
    sizeTable.add(i);
}

超過512之后, 再通過倍增的方式循環(huán), 直到int類型內(nèi)存溢出, 將每次倍增之后大小添加到list中

最后初始化SIZE_TABLE數(shù)組, 將list中的元素按下表存放到數(shù)組中

這樣就初始化了內(nèi)存數(shù)組

再回到AdaptiveRecvByteBufAllocator的構(gòu)造方法中

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    //忽略驗證代碼
    //最小容量在table中的下標(biāo)
    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }
    //最大容量在table中的下標(biāo)
    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }
    this.initial = initial;
}

這里分別根據(jù)傳入的最小和最大容量去SIZE_TABLE中獲取其下標(biāo)

我們跟到getSizeTableIndex(minimum)中:

private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }
        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else {
            return mid + 1;
        }
    }
}

這里是通過二分查找去獲取其下表

if (SIZE_TABLE[minIndex] < minimum)這里判斷最小容量下標(biāo)所屬的內(nèi)存大小是否小于最小值, 如果小于最小值則下標(biāo)+1

最大容量的下標(biāo)獲取原理同上, 判斷最大容量下標(biāo)所屬內(nèi)存大小是否大于最大值, 如果是則下標(biāo)-1

我們回到DefaultChannelConfig的構(gòu)造方法:

public DefaultChannelConfig(Channel channel) {
    this(channel, new AdaptiveRecvByteBufAllocator());
}

剛才我們剖析過了AdaptiveRecvByteBufAllocator()的創(chuàng)建過程, 我們繼續(xù)跟到this()中:

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
    setRecvByteBufAllocator(allocator, channel.metadata());
    this.channel = channel;
}

我們看到這里初始化了channel, 在channel初始化之前, 調(diào)用了setRecvByteBufAllocator(allocator, channel.metadata())方法, 顧名思義, 這是用于設(shè)置緩沖區(qū)分配器的方法, 第一個參數(shù)是我們剛剛分析過的新建的AdaptiveRecvByteBufAllocator對象, 第二個傳入的是與channel綁定的ChannelMetadata對象, ChannelMetadata對象是什么?

我們跟進到metadata()方法當(dāng)中, 由于是channel是NioServerSocketChannel, 所以調(diào)用到了NioServerSocketChannel的metadata()方法:

public ChannelMetadata metadata() {
    return METADATA;
}

這里返回了一個成員變量METADATA, 跟到這個成員變量中:

private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

這里創(chuàng)建了一個ChannelMetadata對象, 并在構(gòu)造方法中傳入false和16

繼續(xù)跟到ChannelMetadata的構(gòu)造方法中

public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) {
    //省略驗證代碼
    //false
    this.hasDisconnect = hasDisconnect;
    //16
    this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
}

這里做的事情非常簡單, 只初始化了兩個屬性:

hasDisconnect=false

defaultMaxMessagesPerRead=16

defaultMaxMessagesPerRead=16代表在讀取對方的鏈接或者channel的字節(jié)流時(無論server還是client), 最多只循環(huán)16次, 后面的講解將會看到

剖析完了ChannelMetadata對象的創(chuàng)建, 我們回到DefaultChannelConfig的構(gòu)造方法:

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
    setRecvByteBufAllocator(allocator, channel.metadata());
    this.channel = channel;
}

跟到setRecvByteBufAllocator(allocator, channel.metadata())方法中:

private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
    if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
        ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
    } else if (allocator == null) {
        throw new NullPointerException("allocator");
    }
    rcvBufAllocator = allocator;
}

首先會判斷傳入的緩沖區(qū)分配器是不是MaxMessagesRecvByteBufAllocator類型的, 因為AdaptiveRecvByteBufAllocator實現(xiàn)了MaxMessagesRecvByteBufAllocator接口, 所以此條件成立

之后將其轉(zhuǎn)換成MaxMessagesRecvByteBufAllocator類型,

然后調(diào)用其maxMessagesPerRead(metadata.defaultMaxMessagesPerRead())方法,

這里會走到其子類DefaultMaxMessagesRecvByteBufAllocator的maxMessagesPerRead(int maxMessagesPerRead)方法中,

其中參數(shù)metadata.defaultMaxMessagesPerRead()返回就是ChannelMetadata的屬性defaultMaxMessagesPerRead,

也就是16

跟到maxMessagesPerRead(int maxMessagesPerRead)方法中:

public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
    //忽略驗證代碼

    //初始化為16
    this.maxMessagesPerRead = maxMessagesPerRead;
    return this;
}

這里將自身屬性maxMessagesPerRead設(shè)置為16, 然后返回自身

回到DefaultChannelConfig的構(gòu)造方法

private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
    if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
        ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
    } else if (allocator == null) {
        throw new NullPointerException("allocator");
    }
    rcvBufAllocator = allocator;
}

設(shè)置完了內(nèi)存分配器的maxMessagesPerRead屬性, 最后將DefaultChannelConfig自身的成員變量rcvBufAllocator設(shè)置成我們初始化完畢的allocator對象

關(guān)于“Netty分布式客戶端接入流程是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“Netty分布式客戶端接入流程是什么”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI