溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Netty 源碼(ChannelHandler 死磕)

發(fā)布時間:2020-07-07 10:28:07 來源:網(wǎng)絡 閱讀:956 作者:瘋狂創(chuàng)客圈 欄目:web開發(fā)

精進篇:netty源碼死磕5? - 揭開 ChannelHandler 的神秘面紗

目錄

1. 前言
2. Handler在經(jīng)典Reactor中的角色
3. Handler在Netty中的坐標位置
4. Netty中Handler的類型

1.1. ChannelInboundHandler入站處理器
1.2. ChannelOutboundHandler出站處理器
5. 揭開Pipeline的神秘面紗
6. Handler的上下文環(huán)境

7. Handler的注冊
7.1. 第一步:包裹
7.2. 加入鏈表并注冊完成回調(diào)事件
7.3. 回調(diào)添加完成事件
8. 小結


1. 前言

Reactor模式是Netty的基礎和靈魂,掌握了經(jīng)典的Reactor模式實現(xiàn),徹底掌握Netty就事半功倍了。《Reactor模式(netty源碼死磕3)》對Reactor模式的經(jīng)典實現(xiàn),進行了詳細介紹。作為本文的閱讀準備,可以去溫習一下。

Reactor模式的兩個重要的組件,一個是Reactor反應器,在Netty中的對應的實現(xiàn)是EventLoop,在文章《EventLoop(netty源碼死磕4)》中,已經(jīng)有了非常詳細的介紹。

此文聚焦于Reactor模式的另一個重要的組成部分Handler。

2. Handler在經(jīng)典Reactor中的角色

在Reactor經(jīng)典模型中,Reactor查詢到NIO就緒的事件后,分發(fā)到Handler,由Handler完成NIO操作和計算的操作。

Netty 源碼(ChannelHandler 死磕)

Handler主要的操作為Channel緩存讀、數(shù)據(jù)解碼、業(yè)務處理、寫Channel緩存,然后由Channel(代表client)發(fā)送到最終的連接終端。

3. Handler在Netty中的坐標

經(jīng)典的Reactor模式,更多在于演示和說明,僅僅是有一種濃縮和抽象。

由于Netty更多用于生產(chǎn),在實際開發(fā)中的業(yè)務處理這塊,主要通過Handler來實現(xiàn),所以Netty中在Handler的組織設計這塊,遠遠比經(jīng)典的Reactor模式實現(xiàn),要紛繁復雜得多。

Netty 源碼(ChannelHandler 死磕)


在分析Handler之前,首先回顧一下Netty中的Channel。在《EventLoop(netty源碼死磕4)》中,已經(jīng)有詳細的說明。一個Netty Channel對應于一個Client連接,內(nèi)部封裝了一個Java NIO SelectableChannel 可查詢通道。

再回到Handler。

Hander的根本使命,就是處理Channel的就緒事件,根據(jù)就緒事件,完成NIO處理和業(yè)務操作。比方Channel讀就緒時,Hander就開始讀;Channel寫就緒時,Hander就開始寫。

4. Netty中Handler的類型

從應用程序開發(fā)人員的角度來看,Netty的主要組件是ChannelHandler,所以,對ChannelHandler的分類,也是從應用開發(fā)的角度來的。

Netty 源碼(ChannelHandler 死磕)

從應用程序開發(fā)人員的角度來看,數(shù)據(jù)有入站和出站兩種類型。

這里的出站和入站,不是網(wǎng)絡通信方面的入站和出站。而是相對于Netty Channel與Java NIO Channel而言的。

數(shù)據(jù)入站,指的是數(shù)據(jù)從底層的Java NIO channel到Netty的Channel。數(shù)據(jù)出站,指的是通過Netty的Channel來操作底層的 Java NIO chanel。

從入站和出戰(zhàn)的角度出發(fā),Netty中的ChannelHandler主要由兩種類型,ChannelInboundHandler和ChannelOutboundHandler。

1.1. ChannelInboundHandler入站處理器

當Java NIO事件進站到Channel時,產(chǎn)生一的一系列事件將由ChannelHandler所對應的API處理。

當查詢到Java NIO底層Channel的就緒事件時,通過一系列的ChannelInboundHandler處理器,完成底層就緒事件的處理。比方說底層連接建立事件、底層連接斷開事件、從底層讀寫就緒事件等等。

Netty 源碼(ChannelHandler 死磕)


啰嗦一下,入站(inbound)處理通常由底層Java NIO channel觸發(fā),主要事件如下:

1. 注冊事件 fireChannelRegistered。

2. 連接建立事件 fireChannelActive。

3. 讀事件和讀完成事件 fireChannelRead、fireChannelReadComplete。

4. 異常通知事件 fireExceptionCaught。

5. 用戶自定義事件 fireUserEventTriggered。

6. Channel 可寫狀態(tài)變化事件 fireChannelWritabilityChanged。

7. 連接關閉事件 fireChannelInactive。


1.2. ChannelOutboundHandler出站處理器


當需要Netty Channel需要操作Java NIO底層Channel時,通過一系列的ChannelOutboundHandler處理器,完成底層操作。比方說建立底層連接、斷開底層連接、從底層Java NIO通道讀入、寫入底層Java NIO通道等。ChannelOutboundHandler是一個接口,主要操作如下圖所示:

Netty 源碼(ChannelHandler 死磕)



啰嗦一下,出站(inbound) Handler通常是Netty channel操作底層Java NIO channel,主要操作如下:

1. 端口綁定 bind。

2. 連接服務端 connect。

3. 寫事件 write。

4. 刷新時間 flush。

5. 讀事件 read。

6. 主動斷開連接 disconnect。

7. 關閉 channel 事件 close。

至此,Netty中的兩大處理器的類型,就已經(jīng)說得很清楚了。

再說說Handler和Channel的關系。

打個比方,如果Hander是太陽系的行星,那么Channel就是太陽系的恒星。Hander的服務對象和公轉的軸心,就是Channel。

這可能是最為不恰當?shù)囊粋€比方,但是說的是事實。


5. 揭開Pipeline的神秘面紗

一個Channel在數(shù)量上,肯定不止擁有一個Handler。 如何將雜亂無章的Handler,有序的組織起來呢?

來了一個Handler的裝配器——Pipeline。

Pipeline是何方神圣呢?

先揭一下神秘面紗:

Netty中, 使用一個雙向鏈表,將屬于一個Channel的所有Handler組織起來,并且給這個雙向鏈表封裝在一個類中,再給這個類取了一個非常牛逼的名字,叫做ChannelPipeline。

為什么這個名字很牛逼呢?

實際上這里用了Java中一種非常重要的設計模式,Pipeline設計模式。后面將用專門的文章,來介紹這種牛逼模式。

回到主題:

一個Channel,僅僅一個ChannelPipeline。該pipeline在Channel被創(chuàng)建的時候創(chuàng)建。ChannelPipeline相當于是ChannelHandler的容器,它包含了一個ChannelHander形成的列表,且所有ChannelHandler都會注冊到ChannelPipeline中。


6. Handler的上下文環(huán)境

在Netty的設計中,Handler是無狀態(tài)的,不保存和Channel有關的信息。打個不恰當?shù)谋确剑琀andler就像國際雇傭軍一樣,誰給錢,給誰打仗。Handler的目標,是將自己的處理邏輯做得很完成,可以給不同的Channel使用。

與之不同的是,Pipeline是有狀態(tài)的,保存了Channel的關系。

于是乎,Handler和Pipeline之間,需要一個中間角色,把他們聯(lián)系起來。這個中間角色是誰呢?

它就是——ChannelHandlerContext 。

所以,ChannelPipeline 中維護的,是一個由 ChannelHandlerContext 組成的雙向鏈表。這個鏈表的頭是 HeadContext, 鏈表的尾是 TailContext。而無狀態(tài)的Handler,作為Context的成員,關聯(lián)在ChannelHandlerContext 中。在對應關系上,每個 ChannelHandlerContext 中僅僅關聯(lián)著一個 ChannelHandler。

Netty 源碼(ChannelHandler 死磕)Netty 源碼(ChannelHandler 死磕)



我們繼續(xù)用源碼說話。

Context的雙向鏈表的主要代碼,在 AbstractChannelHandlerContext類中。該類主要包含一個雙向鏈表節(jié)點的前置和后置節(jié)點引用 prev、next,以及數(shù)據(jù)引用 handler,相當于鏈表數(shù)據(jù)結構中的 Node 節(jié)點。

部分關鍵源碼節(jié)選如下:

// ChannelHandler 首位指針
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    // pipeline 所屬 channel
    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
 succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
head = new HeadContext(this);

        head.next = tail;
tail.prev = head;
    }

7. Handler的注冊

Handler是如何注冊到Pipeline中的呢?

1.3. 第一步:包裹

加入到Pipeline之前,在Pipeline的基類DefaultChannelPipeline中,首先對Handler進行包裹。

代碼如下:

// 使用 AbstractChannelHandlerContext 包裹 ChannelHandler

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
1.4. 加入鏈表并注冊完成回調(diào)事件

1. 構建了 AbstractChannelHandlerContext 節(jié)點,并加入到了鏈表尾部。

2. 如果 channel 尚未注冊到 EventLoop,就添加一個任務到 PendingHandlerCallback 上,后續(xù)channel 注冊完畢,再調(diào)用 ChannelHandler.handlerAdded。

3. 如果已經(jīng)注冊,馬上調(diào)用 callHandlerAdded0 方法來執(zhí)行 ChannelHandler.handlerAdded 注冊完成的回調(diào)函數(shù)。

代碼如下:

@Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 檢查,若不是 Sharable,而且已經(jīng)被添加到其他 pipeline,則拋出異常
            checkMultiplicity(handler);
            // 構建 AbstractChannelHandlerContext 節(jié)點
            newCtx = newContext(group, filterName(name, handler), handler);
            // 添加到鏈表尾部
            addLast0(newCtx);

            // registered 為 false 表示 channel 尚未注冊到 EventLoop 上。
            // 添加一個任務到 PendingHandlerCallback 上,后續(xù)注冊完畢,再調(diào)用 ChannelHandler.handlerAdded
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            // registered 為 true,則立即調(diào)用 ChannelHandler.handlerAdded
            EventExecutor executor = newCtx.executor();
            // inEvent 用于判斷當前線程是否是 EventLoop 線程。執(zhí)行 ChannelHandler 時,必須在對應的 EventLoop 線程池中執(zhí)行。
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

 @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 檢查,若不是 Sharable,而且已經(jīng)被添加到其他 pipeline,則拋出異常
            checkMultiplicity(handler);
            // 構建 AbstractChannelHandlerContext 節(jié)點
            newCtx = newContext(group, filterName(name, handler), handler);
            // 添加到鏈表尾部
            addLast0(newCtx);

            // registered 為 false 表示 channel 尚未注冊到 EventLoop 上。
            // 添加一個任務到 PendingHandlerCallback 上,后續(xù)注冊完畢,再調(diào)用 ChannelHandler.handlerAdded
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

     // registered 為 true,則立即調(diào)用 ChannelHandler.handlerAdded
            EventExecutor executor = newCtx.executor();
            // inEvent 用于判斷當前線程是否是 EventLoop 線程。執(zhí)行 ChannelHandler 時,必須在對應的 EventLoop 線程池中執(zhí)行。
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
1.5. 回調(diào)添加完成事件

添加完成后,執(zhí)行回調(diào)方法如下:

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
           …….
    }

會執(zhí)行handler的handlerAdded 方法,這是一個回調(diào)方法。添加完成后的回調(diào)代碼,基本上寫在這里。



8. 小結


至此,牛逼的Netty Handler和Netty Reactor 介紹完了。

對于Pipeline模式和基于Pipeline的Netty 入站和出站的事件傳輸機制,【瘋狂創(chuàng)客圈】在后面的系列死磕文章,會做一個非常精彩的介紹。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI