溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯是什么

發(fā)布時間:2022-03-29 11:47:05 來源:億速云 閱讀:141 作者:小新 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細講解有關(guān)Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯是什么,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

概述

如果Server在讀取客戶端的數(shù)據(jù)的時候, 如果一次讀取不完整, 就觸發(fā)channelRead事件, 那么Netty是如何處理這類問題的, 在這一章中, 會對此做詳細剖析

之前的章節(jié)我們學習過pipeline, 事件在pipeline中傳遞, handler可以將事件截取并對其處理, 而之后剖析的編解碼器, 其實就是一個handler, 截取byteBuf中的字節(jié), 然后組建成業(yè)務(wù)需要的數(shù)據(jù)進行繼續(xù)傳播

編碼器, 通常是OutBoundHandler, 也就是以自身為基準, 對那些對外流出的數(shù)據(jù)做處理, 所以也叫編碼器, 將數(shù)據(jù)經(jīng)過編碼發(fā)送出去

解碼器, 通常是inboundHandler, 也就是以自身為基準, 對那些流向自身的數(shù)據(jù)做處理, 所以也叫解碼器, 將對向的數(shù)據(jù)接收之后經(jīng)過解碼再進行使用

同樣, 在netty的編碼器中, 也會對半包和粘包問題做相應(yīng)的處理

什么是半包, 顧名思義, 就是不完整的數(shù)據(jù)包, 因為netty在輪詢讀事件的時候, 每次將channel中讀取的數(shù)據(jù), 不一定是一個完整的數(shù)據(jù)包, 這種情況, 就叫半包

粘包同樣也不難理解, 如果client往server發(fā)送數(shù)據(jù)包, 如果發(fā)送頻繁很有可能會將多個數(shù)據(jù)包的數(shù)據(jù)都發(fā)送到通道中, 如果在server在讀取的時候可能會讀取到超過一個完整數(shù)據(jù)包的長度, 這種情況叫粘包

有關(guān)半包和粘包, 入下圖所示:

Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯是什么

6-0-1

 netty對半包的或者粘包的處理其實也很簡單, 通過之前的學習, 我們知道, 每個handler是和channel唯一綁定的, 一個handler只對應(yīng)一個channel, 所以將channel中的數(shù)據(jù)讀取時候經(jīng)過解析, 如果不是一個完整的數(shù)據(jù)包, 則解析失敗, 將這塊數(shù)據(jù)包進行保存, 等下次解析時再和這個數(shù)據(jù)包進行組裝解析, 直到解析到完整的數(shù)據(jù)包, 才會將數(shù)據(jù)包進行向下傳遞

具體流程是在代碼中如何體現(xiàn)的呢?我們進入到源碼分析中

第一節(jié): ByteToMessageDecoder

ByteToMessageDecoder解碼器, 顧名思義, 是一個將Byte解析成消息的解碼器,

我們看他的定義

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
    //類體省略
}

這里繼承了ChannelInboundHandlerAdapter, 根據(jù)之前的學習, 我們知道, 這是個inbound類型的handler, 也就是處理流向自身事件的handler

其次, 該類通過abstract關(guān)鍵字修飾, 說明是個抽象類, 在我們實際使用的時候, 并不是直接使用這個類, 而是使用其子類, 類定義了解碼器的骨架方法, 具體實現(xiàn)邏輯交給子類, 同樣, 在半包處理中也是由該類進行實現(xiàn)的

netty中很多解碼器都實現(xiàn)了這個類, 并且, 我們也可以通過實現(xiàn)該類進行自定義解碼器

我們重點關(guān)注一下該類的一個屬性:

ByteBuf cumulation;

這個屬性, 就是有關(guān)半包處理的關(guān)鍵屬性, 從概述中我們知道, netty會將不完整的數(shù)據(jù)包進行保存, 這個數(shù)據(jù)包就是保存在這個屬性中

之前的學習我們知道, ByteBuf讀取完數(shù)據(jù)會傳遞channelRead事件, 傳播過程中會調(diào)用handler的channelRead方法, ByteToMessageDecoder的channelRead方法, 就是編碼的關(guān)鍵部分

我們看其channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //如果message是byteBuf類型
    if (msg instanceof ByteBuf) {
        //簡單當成一個arrayList, 用于盛放解析到的對象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            //當前累加器為空, 說明這是第一次從io流里面讀取數(shù)據(jù)
            first = cumulation == null;
            if (first) {
                //如果是第一次, 則將累加器賦值為剛讀進來的對象
                cumulation = data;
            } else {
                //如果不是第一次, 則把當前累加的數(shù)據(jù)和讀進來的數(shù)據(jù)進行累加
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            //調(diào)用子類的方法進行解析
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                numReads = 0;
                discardSomeReadBytes();
            }
            //記錄list長度
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            //向下傳播
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        //不是byteBuf類型則向下傳播
        ctx.fireChannelRead(msg);
    }
}

這方法比較長, 帶大家一步步剖析

首先判斷如果傳來的數(shù)據(jù)是ByteBuf, 則進入if塊中

 CodecOutputList out = CodecOutputList.newInstance() 這里就當成一個ArrayList就好, 用于盛放解碼完成的數(shù)據(jù)

 ByteBuf data = (ByteBuf) msg 這步將數(shù)據(jù)轉(zhuǎn)化成ByteBuf

 first = cumulation == null  這里表示如果cumulation == null, 說明沒有存儲板半包數(shù)據(jù), 則將當前的數(shù)據(jù)保存在屬性cumulation中

如果 cumulation != null , 說明存儲了半包數(shù)據(jù), 則通過cumulator.cumulate(ctx.alloc(), cumulation, data)將讀取到的數(shù)據(jù)和原來的數(shù)據(jù)進行累加, 保存在屬性cumulation中

我們看cumulator屬性

private Cumulator cumulator = MERGE_CUMULATOR;

這里調(diào)用了其靜態(tài)屬性MERGE_CUMULATOR, 我們跟過去:

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        //不能到過最大內(nèi)存
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1) {
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        //將當前數(shù)據(jù)buffer
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

這里創(chuàng)建了Cumulator類型的靜態(tài)對象, 并重寫了cumulate方法, 這里cumulate方法, 就是用于將ByteBuf進行拼接的方法:

方法中, 首先判斷cumulation的寫指針+in的可讀字節(jié)數(shù)是否超過了cumulation的最大長度, 如果超過了, 將對cumulation進行擴容, 如果沒超過, 則將其賦值到局部變量buffer中

然后將in的數(shù)據(jù)寫到buffer中, 將in進行釋放, 返回寫入數(shù)據(jù)后的ByteBuf

回到channelRead方法中:

最后通過callDecode(ctx, cumulation, out)方法進行解碼, 這里傳入了Context對象, 緩沖區(qū)cumulation和集合out:

我們跟到callDecode(ctx, cumulation, out)方法中:

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        //只要累加器里面有數(shù)據(jù)
        while (in.isReadable()) {
            int outSize = out.size();
            //判斷當前List是否有對象
            if (outSize > 0) {
                //如果有對象, 則向下傳播事件
                fireChannelRead(ctx, out, outSize);
                //清空當前l(fā)ist
                out.clear();
                //解碼過程中如ctx被removed掉就break
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            //當前可讀數(shù)據(jù)長度
            int oldInputLength = in.readableBytes();
            //子類實現(xiàn)
            //子類解析, 解析玩對象放到out里面
            decode(ctx, in, out);
            if (ctx.isRemoved()) {
                break;
            }
            //List解析前大小 和解析后長度一樣(什么沒有解析出來)
            if (outSize == out.size()) {
                //原來可讀的長度==解析后可讀長度
                //說明沒有讀取數(shù)據(jù)(當前累加的數(shù)據(jù)并沒有拼成一個完整的數(shù)據(jù)包)
                if (oldInputLength == in.readableBytes()) {
                    //跳出循環(huán)(下次在讀取數(shù)據(jù)才能進行后續(xù)的解析)
                    break;
                } else {
                    //沒有解析到數(shù)據(jù), 但是進行讀取了
                    continue;
                }
            }
            //out里面有數(shù)據(jù), 但是沒有從累加器讀取數(shù)據(jù)
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

這里首先循環(huán)判斷傳入的ByteBuf是否有可讀字節(jié), 如果還有可讀字節(jié)說明沒有解碼完成, 則循環(huán)繼續(xù)解碼

然后判斷集合out的大小, 如果大小大于1, 說明out中盛放了解碼完成之后的數(shù)據(jù), 然后將事件向下傳播, 并清空out

因為我們第一次解碼out是空的, 所以這里不會進入if塊, 這部分我們稍后分析, 這里繼續(xù)往下看

通過 int oldInputLength = in.readableBytes() 獲取當前ByteBuf, 其實也就是屬性cumulation的可讀字節(jié)數(shù), 這里就是一個備份用于比較, 我們繼續(xù)往下看:

decode(ctx, in, out)方法是最終的解碼操作, 這部會讀取cumulation并且將解碼后的數(shù)據(jù)放入到集合out中, 在ByteToMessageDecoder中的該方法是一個抽象方法, 讓子類進行實現(xiàn), 我們使用的netty很多的解碼都是繼承了ByteToMessageDecoder并實現(xiàn)了decode方法從而完成了解碼操作, 同樣我們也可以遵循相應(yīng)的規(guī)則進行自定義解碼器, 在之后的小節(jié)中會講解netty定義的解碼器, 并剖析相關(guān)的實現(xiàn)細節(jié), 這里我們繼續(xù)往下看:

 if (outSize == out.size()) 這個判斷表示解析之前的out大小和解析之后out大小進行比較, 如果相同, 說明并沒有解析出數(shù)據(jù), 我們進入到if塊中:

 if (oldInputLength == in.readableBytes()) 表示cumulation的可讀字節(jié)數(shù)在解析之前和解析之后是相同的, 說明解碼方法中并沒有解析數(shù)據(jù), 也就是當前的數(shù)據(jù)并不是一個完整的數(shù)據(jù)包, 則跳出循環(huán), 留給下次解析, 否則, 說明沒有解析到數(shù)據(jù), 但是讀取了, 所以跳過該次循環(huán)進入下次循環(huán)

最后判斷 if (oldInputLength == in.readableBytes()) , 這里代表out中有數(shù)據(jù), 但是并沒有從cumulation讀數(shù)據(jù), 說明這個out的內(nèi)容是非法的, 直接拋出異常

我們回到channRead方法中

我們關(guān)注finally中的內(nèi)容:

finally {
    if (cumulation != null && !cumulation.isReadable()) {
        numReads = 0;
        cumulation.release();
        cumulation = null;
    } else if (++ numReads >= discardAfterReads) {
        numReads = 0;
        discardSomeReadBytes();
    }
    //記錄list長度
    int size = out.size();
    decodeWasNull = !out.insertSinceRecycled();
    //向下傳播
    fireChannelRead(ctx, out, size);
    out.recycle();
}

首先判斷cumulation不為null, 并且沒有可讀字節(jié), 則將累加器進行釋放, 并設(shè)置為null

之后記錄out的長度, 通過fireChannelRead(ctx, out, size)將channelRead事件進行向下傳播, 并回收out對象

我們跟到fireChannelRead(ctx, out, size)方法中:

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    //遍歷List
    for (int i = 0; i < numElements; i ++) {
        //逐個向下傳遞
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

這里遍歷out集合, 并將里面的元素逐個向下傳遞

關(guān)于“Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯是什么”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI