您好,登錄后才能下訂單哦!
這篇文章主要介紹了Netty分布式客戶端處理接入事件handle的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
回到上一章NioEventLoop的processSelectedKey ()方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //獲取到channel中的unsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //如果這個key不是合法的, 說明這個channel可能有問題 if (!k.isValid()) { //代碼省略 } try { //如果是合法的, 拿到key的io事件 int readyOps = k.readyOps(); //鏈接事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //寫事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //讀事件和接受鏈接事件 //如果當(dāng)前NioEventLoop是work線程的話, 這里就是op_read事件 //如果是當(dāng)前NioEventLoop是boss線程的話, 這里就是op_accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
我們看其中的if判斷:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
上一小節(jié)我們分析過, 如果當(dāng)前NioEventLoop是work線程的話, 這里就是op_read事件, 如果是當(dāng)前NioEventLoop是boss線程的話, 這里就是op_accept事件, 這里我們以boss線程為例進行分析
之前我們講過, 無論處理op_read事件還是op_accept事件, 都走的unsafe的read()方法, 這里unsafe是通過channel拿到, 我們知道如果是處理accept事件, 這里的channel是NioServerSocketChannel, 這里與之綁定的unsafe是NioMessageUnsafe
我們跟到NioMessageUnsafe的read()方法:
public void read() { //必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用 assert eventLoop().inEventLoop(); //服務(wù)端channel的config final ChannelConfig config = config(); //服務(wù)端channel的pipeline final ChannelPipeline pipeline = pipeline(); //處理服務(wù)端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設(shè)置配置 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //創(chuàng)建jdk底層的channel //readBuf用于臨時承載讀到鏈接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的鏈接進行計數(shù) allocHandle.incMessagesRead(localRead); //連接數(shù)是否超過最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條客戶端連接 for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將創(chuàng)建NioSokectChannel進行傳遞 //最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代碼省略 } finally { //代碼省略 } }
首先獲取與NioServerSocketChannel綁定config和pipeline, config我們上一小節(jié)進行分析過, pipeline我們將在下一章進行剖析
我們看這一句:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
這里通過RecvByteBufAllocator接口調(diào)用了其內(nèi)部接口Handler
public interface RecvByteBufAllocator { Handle newHandle(); interface Handle { int guess(); void reset(ChannelConfig config); void incMessagesRead(int numMessages); void lastBytesRead(int bytes); int lastBytesRead(); void attemptedBytesRead(int bytes); int attemptedBytesRead(); boolean continueReading(); void readComplete(); } }
我們看到RecvByteBufAllocator接口只有一個方法newHandle(), 顧名思義就是用于創(chuàng)建Handle對象的方法, 而Handle中的方法, 才是實際用于操作的方法
在RecvByteBufAllocator實現(xiàn)類中包含Handle的子類, 具體實現(xiàn)關(guān)系如下:
回到read()方法中再看這段代碼:
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
unsafe()返回當(dāng)前channel綁定的unsafe對象, recvBufAllocHandle()最終會調(diào)用AbstractChannel內(nèi)部類AbstractUnsafe的recvBufAllocHandle()方法
跟進AbstractUnsafe的recvBufAllocHandle()方法:
public RecvByteBufAllocator.Handle recvBufAllocHandle() { //如果不存在, 則創(chuàng)建一個recvHandle的實例 if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }
如果如果是第一次執(zhí)行到這里, 自身屬性recvHandle為空, 會創(chuàng)建一個recvHandle實例, config()返回NioServerSocketChannel綁定的ChannelConfig, getRecvByteBufAllocator()獲取其RecvByteBufAllocator對象, 這兩部分上一小節(jié)剖析過了, 這里通過newHandle()創(chuàng)建一個Handle, 這里會走到AdaptiveRecvByteBufAllocator類中的newHandle()方法中
public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); }
這里創(chuàng)建HandleImpl傳入了三個參數(shù), 這三個參數(shù)我們上一小節(jié)剖析過, minIndex為最小內(nèi)存在SIZE_TABLE中的下標(biāo), maxIndex為最大內(nèi)存在SEIZE_TABEL中的下標(biāo), initial是初始內(nèi)存, 我們跟到HandleImpl的構(gòu)造方法中:
public HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; }
初始化minIndex和maxIndex, 根據(jù)initial找到當(dāng)前的下標(biāo), nextReceiveBufferSize是根據(jù)當(dāng)前的下標(biāo)找到對應(yīng)的內(nèi)存
這樣, 我們就創(chuàng)建了個Handle對象
在這里我們需要知道, 這個handle, 是和channel唯一綁定的屬性, 而AdaptiveRecvByteBufAllocator對象是和ChannelConfig對象唯一綁定的, 間接也是和channel進行唯一綁定
public void read() { //必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用 assert eventLoop().inEventLoop(); //服務(wù)端channel的config final ChannelConfig config = config(); //服務(wù)端channel的pipeline final ChannelPipeline pipeline = pipeline(); //處理服務(wù)端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設(shè)置配置 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //創(chuàng)建jdk底層的channel //readBuf用于臨時承載讀到鏈接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的鏈接進行計數(shù) allocHandle.incMessagesRead(localRead); //連接數(shù)是否超過最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條客戶端連接 for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將創(chuàng)建NioSokectChannel進行傳遞 //最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代碼省略 } finally { //代碼省略 } }
繼續(xù)往下跟:
allocHandle.reset(config);
這個段代碼是重新設(shè)置配置, 也就是將之前的配置信息進行初始化, 最終會走到, DefaultMaxMessagesRecvByteBufAllocator中的內(nèi)部類MaxMessageHandle的reet中
public void reset(ChannelConfig config) { this.config = config; maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; }
這里僅僅對幾個屬性做了賦值, 簡單介紹下這幾個屬性:
config
:當(dāng)前channelConfig對象
maxMessagePerRead
:表示讀取消息的時候可以讀取幾次(循環(huán)次數(shù)), maxMessagesPerRead()返回的是RecvByteBufAllocator的maxMessagesPerRead屬性, 上一小節(jié)已經(jīng)做過剖析
totalMessages
:代表目前讀循環(huán)已經(jīng)讀取的消息個數(shù), 在NIO傳輸模式下也就是已經(jīng)執(zhí)行的循環(huán)次數(shù), 這里初始化為0
totalBytesRead
:代表目前已經(jīng)讀取到的消息字節(jié)總數(shù), 這里同樣也初始化為0
我們繼續(xù)往下走, 這里首先是一個do-while循環(huán), 循環(huán)體里通過int localRead = doReadMessages(readBuf)這種方式將讀取到的連接數(shù)放入到一個List集合中, 這一步我們下一小節(jié)再分析, 我們繼續(xù)往下走:
我們首先看allocHandle.incMessagesRead(localRead)這一步, 這里的localRead表示這次循環(huán)往readBuf中放入的連接數(shù), 在Nio模式下這, 如果讀取到一條連接會返回1
跟到中的MaxMessageHandle的incMessagesRead(int amt)方法中:
public final void incMessagesRead(int amt) { totalMessages += amt; }
這里將totalMessages增加amt, 也就是+1
這里totalMessage, 剛才已經(jīng)剖析過, 在NIO傳輸模式下也就是已經(jīng)執(zhí)行的循環(huán)次數(shù), 這里每次執(zhí)行一次循環(huán)都會加一
再去看循環(huán)終止條件allocHandle.continueReading()
跟到MaxMessageHandle的continueReading()方法中:
public boolean continueReading() { //config.isAutoRead()默認(rèn)返回true // totalMessages < maxMessagePerRead //totalMessages代表當(dāng)前讀到的鏈接, 默認(rèn)是1 //maxMessagePerRead每一次最大讀多少鏈接(默認(rèn)16) return config.isAutoRead() && attemptedBytesRead == lastBytesRead && totalMessages < maxMessagePerRead && totalBytesRead < Integer.MAX_VALUE; }
我們逐個分析判斷條件:
config.isAutoRead(): 這里默認(rèn)為true
attemptedBytesRead == lastBytesRead: 表示本次讀取的字節(jié)數(shù)和最后一次讀取的字節(jié)數(shù)相等, 因為到這里都沒有進行字節(jié)數(shù)組的讀取操作, 所以默認(rèn)都為0, 這里也返回true
totalMessages < maxMessagePerRead
表示當(dāng)前讀取的次數(shù)是否小于最大讀取次數(shù), 我們知道totalMessages每次循環(huán)都會自增, 而maxMessagePerRead默認(rèn)值為16, 所以這里會限制循環(huán)不能超過16次, 也就是最多一次只能讀取16條連接
totalBytesRead < Integer.MAX_VALUE
表示讀取的字節(jié)數(shù)不能超過int類型的最大值
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Netty分布式客戶端處理接入事件handle的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。