溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Netty事件監(jiān)聽和處理(下)

發(fā)布時間:2020-06-24 01:56:16 來源:網(wǎng)絡(luò) 閱讀:7449 作者:情情說 欄目:開發(fā)技術(shù)

上一篇 介紹了事件監(jiān)聽、責任鏈模型、socket接口和IO模型、線程模型等基本概念,以及Netty的整體結(jié)構(gòu),這篇就來說下Netty三大核心模塊之一:事件監(jiān)聽和處理。

前面提到,Netty是一個NIO框架,它將IO通道的建立、可讀、可寫等狀態(tài)變化,抽象成事件,以責任鏈的方式進行傳遞,可以在處理鏈上插入自定義的Handler,對感興趣的事件進行監(jiān)聽和處理。

通過介紹,你會了解到:

  • 事件監(jiān)聽和處理模型
  • 事件監(jiān)聽:EventLoop
  • 事件處理:ChannelPipeline和ChannelHandler
  • 使用Netty實現(xiàn)Websocket協(xié)議
事件監(jiān)聽和處理模型

進行網(wǎng)絡(luò)編程時,一般的編寫過程是這樣的:

  • 創(chuàng)建服務(wù)端Socket,監(jiān)聽某個端口;
  • 當有客戶端連接時,會創(chuàng)建一個新的客戶端Socket,監(jiān)聽數(shù)據(jù)的可讀、可寫狀態(tài),每一個連接請求都會創(chuàng)建一個客戶端Socket;
  • 讀取和寫入數(shù)據(jù)都會調(diào)用Socket提供的接口,接口列表在上一篇提到過;

傳統(tǒng)的模型,每個客戶端Socket會創(chuàng)建一個單獨的線程監(jiān)聽socket事件,一方面系統(tǒng)可創(chuàng)建的線程數(shù)有限,限制了并發(fā)數(shù),一方面線程過多,線程切換頻繁,導(dǎo)致性能嚴重下降。

隨著操作系統(tǒng)IO模型的發(fā)展,可以采用多路復(fù)用IO,一個線程監(jiān)聽多個Socket,另外,服務(wù)端處理客戶端連接,與客戶端Socket的監(jiān)聽,可以在不同的線程進行處理。

Netty就是采用多路復(fù)用IO進行事件監(jiān)聽,另外,使用不同的線程分別處理客戶端的連接、數(shù)據(jù)讀寫。

整個處理結(jié)構(gòu)如下圖,簡單說明下:

  • Boss EventLoopGroup主要處理客戶端的connect事件,包含多個EventLoop,每個EventLoop一個線程;
  • Worker EventLoopGroup主要處理客戶端Socket的數(shù)據(jù)read、write事件,包含多個EventLoop,每個EventLoop一個線程;
  • 無論是Boos還是Worker,事件的處理都是通過Channel Pipleline組織的,它是責任鏈模式的實現(xiàn),包含一個或多個Handler;
  • 偵聽一個端口,只會綁定到Boss EventLoopGroup中的一個Eventloop;
  • Worker EventLoopGroup中的一個Eventloop,可以監(jiān)聽多個客戶端Socket;

Netty事件監(jiān)聽和處理(下)

EventLoop

一個EventLoop其實和一個特定的線程綁定, 并且在其生命周期內(nèi), 綁定的線程都不會再改。

EventLoop肩負著兩種任務(wù):

  • 第一個是作為 IO 線程, 執(zhí)行與 Channel 相關(guān)的 IO 操作, 包括 調(diào)用select等待就緒的IO事件、讀寫數(shù)據(jù)與數(shù)據(jù)的處理等;
  • 第二個任務(wù)是作為任務(wù)隊列, 執(zhí)行 taskQueue 中的任務(wù), 例如用戶調(diào)用eventLoop.schedule提交的定時任務(wù)也是這個線程執(zhí)行的;

第一個任務(wù)比較好理解,主要解釋下第二個:從socket數(shù)據(jù)到數(shù)據(jù)處理,再到寫入響應(yīng)數(shù)據(jù),Netty都在一個線程中處理,主要是為了線程安全考慮,減少競爭和線程切換,通過任務(wù)隊列的方式,可以在用戶線程提交處理邏輯,在Eventloop中執(zhí)行。

整個EventLoop干的事情就是select -> processIO -> runAllTask,processIO處理IO事件相關(guān)的邏輯,runAllTask處理任務(wù)隊列中的任務(wù),如果執(zhí)行的任務(wù)過多,會影響IO事件的處理,所以會限制任務(wù)處理的時間,整個處理過程如下圖:

Netty事件監(jiān)聽和處理(下)

EventLoop的run代碼如下:

protected void run() {
     for (; ; ) {
         oldWakenUp = wakenUp.getAndSet(false);
         try {
             if (hasTasks()) { //如果有任務(wù),快速返回
                 selectNow();
             } else {
                 select(); //如果沒任務(wù),等待事件返回
                 if (wakenUp.get()) {
                     selector.wakeup();
                 }
             }
             cancelledKeys = 0;
             final long ioStartTime = System.nanoTime();
             needsToSelectAgain = false;

             //處理IO事件
             if (selectedKeys != null) {
                 processSelectedKeysOptimized(selectedKeys.flip());
             } else {
                 processSelectedKeysPlain(selector.selectedKeys());
             }

             //計算IO處理時間
             final long ioTime = System.nanoTime() - ioStartTime;
             final int ioRatio = this.ioRatio; //默認為50

             //處理提交的任務(wù)
             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

             if (isShuttingDown()) {
                 closeAll();
                 if (confirmShutdown()) {
                     break;
                 }
             }
         } catch (Throwable t) {
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
             }
         }
     }
 }
ChannelPipeline和ChannelHandler

ChannelPipeline是一個接口,其有一個默認的實現(xiàn)類DefaultChannelPipeline,內(nèi)部有兩個屬性:head和tail,
這兩者都實現(xiàn)了ChannelHandler接口,對應(yīng)處理鏈的頭和尾。

 protected DefaultChannelPipeline(Channel channel) {
     this.channel = ObjectUtil.checkNotNull(channel, "channel");
     succeededFuture = new SucceededChannelFuture(channel, null);
     voidPromise =  new VoidChannelPromise(channel, true);

     tail = new TailContext(this);
     head = new HeadContext(this);

     head.next = tail;
     tail.prev = head;
}

每個Channel創(chuàng)建時,會創(chuàng)建一個ChannelPipeline對象,來處理channel的各種事件,可以在運行時動態(tài)進行動態(tài)修改其中的 ChannelHandler。

ChannelHandler承載業(yè)務(wù)處理邏輯的地方,我們接觸最多的類,可以自定義Handler,加入處理鏈中,實現(xiàn)自定義邏輯。

ChannelHandler 可分為兩大類:ChannelInboundHandler 和 ChannelOutboundHandle,這兩接口分別對應(yīng)入站和出站消息的處理,對應(yīng)數(shù)據(jù)讀取和數(shù)據(jù)寫入。它提供了接口方法供我們實現(xiàn),處理各種事件。

public interface ChannelInboundHandler extends ChannelHandler {
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}

自定義Handler時,一般繼承ChannelInboundHandlerAdapter或 ChannelOutboundHandlerAdapter。

需要注意的是,不建議在 ChannelHandler 中直接實現(xiàn)耗時或阻塞的操作,因為這可能會阻塞 Netty 工作線程,導(dǎo)致 Netty 無法及時響應(yīng) IO 處理。

Netty事件監(jiān)聽和處理(下)

使用Netty實現(xiàn)Websocket協(xié)議
Websocket協(xié)議

不是本篇的重點,簡單說明下:

  • 是一種長連接協(xié)議,大部分瀏覽器都支持,通過websocket,服務(wù)端可以主動發(fā)消息給客戶端;
  • Websocket協(xié)議,在握手階段使用HTTP協(xié)議,握手完成之后,走Websocket自己的協(xié)議;
  • Websocket是一種二進制協(xié)議;
初始化

Netty提供了ChannelInitializer類方便我們初始化,創(chuàng)建WebSocketServerInitializer類,繼承ChannelInitializer類,用于添加ChannelHandler:

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    @Resource
    private CustomTextFrameHandler customTextFrameHandler;

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("codec-http", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));

        pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());
        pipeline.addLast("custome-handler", customTextFrameHandler);
    }
}

分析下這幾個Handler,都是Netty默認提供的:

  • HttpServerCodec:用于解析Http請求,主要在握手階段進行處理;
  • HttpObjectAggregator:用于合并Http請求頭和請求體,主要在握手階段進行處理;
  • WebSocketServerProtocolHandler:處理Websocket協(xié)議;
  • CustomTextFrameHandler:自定義的Handler,用于添加自己的業(yè)務(wù)邏輯。

是不是很方便,經(jīng)過WebSocketServerProtocolHandler處理后,讀取出來的就是文本數(shù)據(jù)了,不用自己處理數(shù)據(jù)合包、拆包問題。

CustomTextFrameHandler

自定義的Handler,進行業(yè)務(wù)處理:

public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        final String content = frame.text();
        System.out.println("接收到數(shù)據(jù):"+content);   

        // 回復(fù)數(shù)據(jù)
        TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的數(shù)據(jù)");
        if (ctx.channel().isWritable()) {
              ChannelFuture future = ctx.writeAndFlush(respFrame);
          }                 
    }
}

歡迎掃描下方二維碼,關(guān)注我的個人微信公眾號 ~

Netty事件監(jiān)聽和處理(下)

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI