溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

發(fā)布時(shí)間:2021-12-21 13:54:10 來(lái)源:億速云 閱讀:176 作者:iii 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

Netty概述

Netty是一個(gè)基于異步與事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架,它支持快速與簡(jiǎn)單地開發(fā)可維護(hù)的高性能的服務(wù)器與客戶端。

所謂事件驅(qū)動(dòng)就是由通過(guò)各種事件響應(yīng)來(lái)決定程序的流程,在Netty中到處都充滿了異步與事件驅(qū)動(dòng),這種特點(diǎn)使得應(yīng)用程序可以以任意的順序響應(yīng)在任意的時(shí)間點(diǎn)產(chǎn)生的事件,它帶來(lái)了非常高的可伸縮性,讓你的應(yīng)用可以在需要處理的工作不斷增長(zhǎng)時(shí),通過(guò)某種可行的方式或者擴(kuò)大它的處理能力來(lái)適應(yīng)這種增長(zhǎng)。

Netty提供了高性能與易用性,它具有以下特點(diǎn):

  • 擁有設(shè)計(jì)良好且統(tǒng)一的API,支持NIO與OIO(阻塞IO)等多種傳輸類型,支持真正的無(wú)連接UDP Socket。

  • 簡(jiǎn)單而強(qiáng)大的線程模型,可高度定制線程(池)。(定制化的Reactor模型)

  • 良好的模塊化與解耦,支持可擴(kuò)展和靈活的事件模型,可以很輕松地分離關(guān)注點(diǎn)以復(fù)用邏輯組件(可插拔的)。

  • 性能高效,擁有比Java核心API更高的吞吐量,通過(guò)zero-copy功能以實(shí)現(xiàn)最少的內(nèi)存復(fù)制消耗。

  • 內(nèi)置了許多常用的協(xié)議編解碼器,如HTTP、SSL、WebScoket等常見協(xié)議可以通過(guò)Netty做到開箱即用。用戶也可以利用Netty簡(jiǎn)單方便地實(shí)現(xiàn)自己的應(yīng)用層協(xié)議。

大多數(shù)人使用Netty主要還是為了提高應(yīng)用的性能,而高性能則離不開非阻塞IO。Netty的非阻塞IO是基于Java NIO的,并且對(duì)其進(jìn)行了封裝(直接使用Java NIO API在高復(fù)雜度下的應(yīng)用中是一項(xiàng)非常繁瑣且容易出錯(cuò)的操作,而Netty幫你封裝了這些復(fù)雜操作)。

etty簡(jiǎn)介

讀完這一章,我們基本上可以了解到Netty所有重要的組件,對(duì)Netty有一個(gè)全面的認(rèn)識(shí),這對(duì)下一步深入學(xué)習(xí)Netty是十分重要的,而學(xué)完這一章,我們其實(shí)已經(jīng)可以用Netty解決一些常規(guī)的問(wèn)題了。

Netty都有哪些組件?

為了更好的理解和進(jìn)一步深入Netty,我們先總體認(rèn)識(shí)一下Netty用到的組件及它們?cè)谡麄€(gè)Netty架構(gòu)中是怎么協(xié)調(diào)工作的。Netty應(yīng)用中必不可少的組件:
  • Bootstrap or ServerBootstrap

  • EventLoop

  • EventLoopGroup

  • ChannelPipeline

  • Channel

  • Future or ChannelFuture

  • ChannelInitializer

  • ChannelHandler

  • Bootstrap,一個(gè)Netty應(yīng)用通常由一個(gè)Bootstrap開始,它主要作用是配置整個(gè)Netty程序,串聯(lián)起各個(gè)組件。

    Handler,為了支持各種協(xié)議和處理數(shù)據(jù)的方式,便誕生了Handler組件。Handler主要用來(lái)處理各種事件,這里的事件很廣泛,比如可以是連接、數(shù)據(jù)接收、異常、數(shù)據(jù)轉(zhuǎn)換等。

    ChannelInboundHandler,一個(gè)最常用的Handler。這個(gè)Handler的作用就是處理接收到數(shù)據(jù)時(shí)的事件,也就是說(shuō),我們的業(yè)務(wù)邏輯一般就是寫在這個(gè)Handler里面的,ChannelInboundHandler就是用來(lái)處理我們的核心業(yè)務(wù)邏輯。

    ChannelInitializer,當(dāng)一個(gè)鏈接建立時(shí),我們需要知道怎么來(lái)接收或者發(fā)送數(shù)據(jù),當(dāng)然,我們有各種各樣的Handler實(shí)現(xiàn)來(lái)處理它,那么ChannelInitializer便是用來(lái)配置這些Handler,它會(huì)提供一個(gè)ChannelPipeline,并把Handler加入到ChannelPipeline。

    ChannelPipeline,一個(gè)Netty應(yīng)用基于ChannelPipeline機(jī)制,這種機(jī)制需要依賴于EventLoop和EventLoopGroup,因?yàn)樗鼈內(nèi)齻€(gè)都和事件或者事件處理相關(guān)。

    EventLoops的目的是為Channel處理IO操作,一個(gè)EventLoop可以為多個(gè)Channel服務(wù)。

    EventLoopGroup會(huì)包含多個(gè)EventLoop。

    Channel代表了一個(gè)Socket鏈接,或者其它和IO操作相關(guān)的組件,它和EventLoop一起用來(lái)參與IO處理。

    Future,在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過(guò)一會(huì)等它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽,具體的實(shí)現(xiàn)就是通過(guò)Future和ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽,當(dāng)操作執(zhí)行成功或失敗時(shí)監(jiān)聽會(huì)自動(dòng)觸發(fā)??傊?strong>,所有的操作都會(huì)返回一個(gè)ChannelFuture。

Netty是如何處理連接請(qǐng)求和業(yè)務(wù)邏輯的呢?

Channels、Events 和 IO

Netty是一個(gè)非阻塞的、事件驅(qū)動(dòng)的、網(wǎng)絡(luò)編程框架。當(dāng)然,我們很容易理解Netty會(huì)用線程來(lái)處理IO事件,對(duì)于熟悉多線程編程的人來(lái)說(shuō),你或許會(huì)想到如何同步你的代碼,但是Netty不需要我們考慮這些,具體是這樣:

一個(gè)Channel會(huì)對(duì)應(yīng)一個(gè)EventLoop,而一個(gè)EventLoop會(huì)對(duì)應(yīng)著一個(gè)線程,也就是說(shuō),僅有一個(gè)線程在負(fù)責(zé)一個(gè)Channel的IO操作。

關(guān)于這些名詞之間的關(guān)系,可以見下圖:

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

如圖所示:當(dāng)一個(gè)連接到達(dá),Netty會(huì)注冊(cè)一個(gè)channel,然后EventLoopGroup會(huì)分配一個(gè)EventLoop綁定到這個(gè)channel,在這個(gè)channel的整個(gè)生命周期過(guò)程中,都會(huì)由綁定的這個(gè)EventLoop來(lái)為它服務(wù),而這個(gè)EventLoop就是一個(gè)線程。

說(shuō)到這里,那么EventLoops和EventLoopGroups關(guān)系是如何的呢?我們前面說(shuō)過(guò)一個(gè)EventLoopGroup包含多個(gè)Eventloop,但是我們看一下下面這幅圖,這幅圖是一個(gè)繼承樹,從這幅圖中我們可以看出,EventLoop其實(shí)繼承自EventloopGroup,也就是說(shuō),在某些情況下,我們可以把一個(gè)EventLoopGroup當(dāng)做一個(gè)EventLoop來(lái)用。

如何配置一個(gè)Netty應(yīng)用?

BootsStrapping

我們利用BootsStrap來(lái)配置netty 應(yīng)用,它有兩種類型,一種用于Client端:BootsStrap,另一種用于Server端:ServerBootstrap,要想?yún)^(qū)別如何使用它們,你僅需要記住一個(gè)用在Client端,一個(gè)用在Server端。下面我們來(lái)詳細(xì)介紹一下這兩種類型的區(qū)別:

1.第一個(gè)最明顯的區(qū)別是,ServerBootstrap用于Server端,通過(guò)調(diào)用bind()方法來(lái)綁定到一個(gè)端口監(jiān)聽連接;Bootstrap用于Client端,需要調(diào)用connect()方法來(lái)連接服務(wù)器端,但我們也可以通過(guò)調(diào)用bind()方法返回的ChannelFuture中獲取Channel從而去connect服務(wù)器端。

2.客戶端的Bootstrap一般用一個(gè)EventLoopGroup,而服務(wù)器端的ServerBootstrap會(huì)用到兩個(gè)(這兩個(gè)也可以是同一個(gè)實(shí)例)。為何服務(wù)器端要用到兩個(gè)EventLoopGroup呢?這么設(shè)計(jì)有明顯的好處,如果一個(gè)ServerBootstrap有兩個(gè)EventLoopGroup,那么就可以把第一個(gè)EventLoopGroup用來(lái)專門負(fù)責(zé)綁定到端口監(jiān)聽連接事件,而把第二個(gè)EventLoopGroup用來(lái)處理每個(gè)接收到的連接,下面我們用一幅圖來(lái)展現(xiàn)一下這種模式:

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

PS: 如果僅由一個(gè)EventLoopGroup處理所有請(qǐng)求和連接的話,在并發(fā)量很大的情況下,這個(gè)EventLoopGroup有可能會(huì)忙于處理已經(jīng)接收到的連接而不能及時(shí)處理新的連接請(qǐng)求,用兩個(gè)的話,會(huì)有專門的線程來(lái)處理連接請(qǐng)求,不會(huì)導(dǎo)致請(qǐng)求超時(shí)的情況,大大提高了并發(fā)處理能力。

我們知道一個(gè)Channel需要由一個(gè)EventLoop來(lái)綁定,而且兩者一旦綁定就不會(huì)再改變。一般情況下一個(gè)EventLoopGroup中的EventLoop數(shù)量會(huì)少于Channel數(shù)量,那么就很有可能出現(xiàn)一個(gè)多個(gè)Channel公用一個(gè)EventLoop的情況,這就意味著如果一個(gè)Channel中的EventLoop很忙的話,會(huì)影響到這個(gè)Eventloop對(duì)其它Channel的處理這也就是為什么我們不能阻塞EventLoop的原因。

當(dāng)然,我們的Server也可以只用一個(gè)EventLoopGroup,由一個(gè)實(shí)例來(lái)處理連接請(qǐng)求和IO事件,請(qǐng)看下面這幅圖:

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

Netty是如何處理數(shù)據(jù)的?

Netty核心ChannelHandler

下面我們來(lái)看一下netty中是怎樣處理數(shù)據(jù)的,回想一下我們前面講到的Handler,對(duì)了,就是它。說(shuō)到Handler我們就不得不提ChannelPipeline,ChannelPipeline負(fù)責(zé)安排Handler的順序及其執(zhí)行,下面我們就來(lái)詳細(xì)介紹一下他們:

ChannelPipeline and handlers

我們的應(yīng)用程序中用到的最多的應(yīng)該就是ChannelHandler,我們可以這么想象,數(shù)據(jù)在一個(gè)ChannelPipeline中流動(dòng),而ChannelHandler便是其中的一個(gè)個(gè)的小閥門,這些數(shù)據(jù)都會(huì)經(jīng)過(guò)每一個(gè)ChannelHandler并且被它處理。這里有一個(gè)公共接口ChannelHandler:

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

從上圖中我們可以看到,ChannelHandler有兩個(gè)子類ChannelInboundHandler和ChannelOutboundHandler,這兩個(gè)類對(duì)應(yīng)了兩個(gè)數(shù)據(jù)流向,如果數(shù)據(jù)是從外部流入我們的應(yīng)用程序,我們就看做是inbound,相反便是outbound。其實(shí)ChannelHandler和Servlet有些類似,一個(gè)ChannelHandler處理完接收到的數(shù)據(jù)會(huì)傳給下一個(gè)Handler,或者什么不處理,直接傳遞給下一個(gè)。下面我們看一下ChannelPipeline是如何安排ChannelHandler的:

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

從上圖中我們可以看到,一個(gè)ChannelPipeline可以把兩種Handler(ChannelInboundHandler和ChannelOutboundHandler)混合在一起,當(dāng)一個(gè)數(shù)據(jù)流進(jìn)入ChannelPipeline時(shí),它會(huì)從ChannelPipeline頭部開始傳給第一個(gè)ChannelInboundHandler,當(dāng)?shù)谝粋€(gè)處理完后再傳給下一個(gè),一直傳遞到管道的尾部。與之相對(duì)應(yīng)的是,當(dāng)數(shù)據(jù)被寫出時(shí),它會(huì)從管道的尾部開始,先經(jīng)過(guò)管道尾部的“最后”一個(gè)ChannelOutboundHandler,當(dāng)它處理完成后會(huì)傳遞給前一個(gè)ChannelOutboundHandler。

數(shù)據(jù)在各個(gè)Handler之間傳遞,這需要調(diào)用方法中傳遞的ChanneHandlerContext來(lái)操作, 在netty的API中提供了兩個(gè)基類分ChannelOutboundHandlerAdapterChannelInboundHandlerAdapter,他們僅僅實(shí)現(xiàn)了調(diào)用ChanneHandlerContext來(lái)把消息傳遞給下一個(gè)Handler,因?yàn)槲覀冎魂P(guān)心處理數(shù)據(jù),因此我們的程序中可以繼承這兩個(gè)基類來(lái)幫助我們做這些,而我們僅需實(shí)現(xiàn)處理數(shù)據(jù)的部分即可。

我們知道InboundHandler和OutboundHandler在ChannelPipeline中是混合在一起的,那么它們如何區(qū)分彼此呢?其實(shí)很容易,因?yàn)?strong>它們各自實(shí)現(xiàn)的是不同的接口,對(duì)于inbound event,Netty會(huì)自動(dòng)跳過(guò)OutboundHandler,相反若是outbound event,ChannelInboundHandler會(huì)被忽略掉。

當(dāng)一個(gè)ChannelHandler被加入到ChannelPipeline中時(shí),它便會(huì)獲得一個(gè)ChannelHandlerContext的引用,而ChannelHandlerContext可以用來(lái)讀寫Netty中的數(shù)據(jù)流。因此,現(xiàn)在可以有兩種方式來(lái)發(fā)送數(shù)據(jù),一種是把數(shù)據(jù)直接寫入Channel,一種是把數(shù)據(jù)寫入ChannelHandlerContext,它們的區(qū)別是寫入Channel的話,數(shù)據(jù)流會(huì)從Channel的頭開始傳遞,而如果寫入ChannelHandlerContext的話,數(shù)據(jù)流會(huì)流入管道中的下一個(gè)Handler。

如何處理我們的業(yè)務(wù)邏輯?

Encoders, Decoders and Domain Logic

Netty中會(huì)有很多Handler,具體是哪種Handler還要看它們繼承的是InboundAdapter還是OutboundAdapter。當(dāng)然,Netty中還提供了一些列的Adapter來(lái)幫助我們簡(jiǎn)化開發(fā),我們知道在Channelpipeline中每一個(gè)Handler都負(fù)責(zé)把Event傳遞給下一個(gè)Handler,如果有了這些輔助Adapter,這些額外的工作都可自動(dòng)完成,我們只需覆蓋實(shí)現(xiàn)我們真正關(guān)心的部分即可。此外,還有一些Adapter會(huì)提供一些額外的功能,比如編碼和解碼。那么下面我們就來(lái)看一下其中的三種常用的ChannelHandler:

Encoders和Decoders

因?yàn)槲覀冊(cè)诰W(wǎng)絡(luò)傳輸時(shí)只能傳輸字節(jié)流,因此,在發(fā)送數(shù)據(jù)之前,我們必須把我們的message型轉(zhuǎn)換為bytes,與之對(duì)應(yīng),我們?cè)诮邮諗?shù)據(jù)后,必須把接收到的bytes再轉(zhuǎn)換成message。我們把bytes to message這個(gè)過(guò)程稱作Decode(解碼成我們可以理解的),把message to bytes這個(gè)過(guò)程成為Encode。

Netty中提供了很多現(xiàn)成的編碼/解碼器,我們一般從他們的名字中便可知道他們的用途,如ByteToMessageDecoder、MessageToByteEncoder,如專門用來(lái)處理Google Protobuf協(xié)議的ProtobufEncoder、 ProtobufDecoder

我們前面說(shuō)過(guò),具體是哪種Handler就要看它們繼承的是InboundAdapter還是OutboundAdapter,對(duì)于Decoders,很容易便可以知道它是繼承自ChannelInboundHandlerAdapter或 ChannelInboundHandler,因?yàn)榻獯a的意思是把ChannelPipeline傳入的bytes解碼成我們可以理解的message(即Java Object),而ChannelInboundHandler正是處理Inbound Event,而Inbound Event中傳入的正是字節(jié)流。Decoder會(huì)覆蓋其中的“ChannelRead()”方法,在這個(gè)方法中來(lái)調(diào)用具體的decode方法解碼傳遞過(guò)來(lái)的字節(jié)流,然后通過(guò)調(diào)用ChannelHandlerContext.fireChannelRead(decodedMessage)方法把編碼好的Message傳遞給下一個(gè)Handler。與之類似,Encoder就不必多少了。

Domain Logic

其實(shí)我們最最關(guān)心的事情就是如何處理接收到的解碼后的數(shù)據(jù),我們真正的業(yè)務(wù)邏輯便是處理接收到的數(shù)據(jù)。Netty提供了一個(gè)最常用的基類SimpleChannelInboundHandler,其中T就是這個(gè)Handler處理的數(shù)據(jù)的類型(上一個(gè)Handler已經(jīng)替我們解碼好了),消息到達(dá)這個(gè)Handler時(shí),Netty會(huì)自動(dòng)調(diào)用這個(gè)Handler中的channelRead0(ChannelHandlerContext,T)方法,T是傳遞過(guò)來(lái)的數(shù)據(jù)對(duì)象,在這個(gè)方法中我們便可以任意寫我們的業(yè)務(wù)邏輯了。

Netty從某方面來(lái)說(shuō)就是一套NIO框架,在Java NIO基礎(chǔ)上做了封裝,所以要想學(xué)好Netty我建議先理解好Java NIO,

NIO可以稱為New IO也可以稱為Non-blocking IO,它比Java舊的阻塞IO在性能上要高效許多(如果讓每一個(gè)連接中的IO操作都單獨(dú)創(chuàng)建一個(gè)線程,那么阻塞IO并不會(huì)比NIO在性能上落后,但不可能創(chuàng)建無(wú)限多的線程,在連接數(shù)非常多的情況下會(huì)很糟糕)。

  • ByteBuffer:NIO的數(shù)據(jù)傳輸是基于緩沖區(qū)的,ByteBuffer正是NIO數(shù)據(jù)傳輸中所使用的緩沖區(qū)抽象。ByteBuffer支持在堆外分配內(nèi)存,并且嘗試避免在執(zhí)行I/O操作中的多余復(fù)制。一般的I/O操作都需要進(jìn)行系統(tǒng)調(diào)用,這樣會(huì)先切換到內(nèi)核態(tài),內(nèi)核態(tài)要先從文件讀取數(shù)據(jù)到它的緩沖區(qū),只有等數(shù)據(jù)準(zhǔn)備完畢后,才會(huì)從內(nèi)核態(tài)把數(shù)據(jù)寫到用戶態(tài),所謂的阻塞IO其實(shí)就是說(shuō)的在等待數(shù)據(jù)準(zhǔn)備好的這段時(shí)間內(nèi)進(jìn)行阻塞。如果想要避免這個(gè)額外的內(nèi)核操作,可以通過(guò)使用mmap(虛擬內(nèi)存映射)的方式來(lái)讓用戶態(tài)直接操作文件。

  • Channel:它類似于(fd)文件描述符,簡(jiǎn)單地來(lái)說(shuō)它代表了一個(gè)實(shí)體(如一個(gè)硬件設(shè)備、文件、Socket或者一個(gè)能夠執(zhí)行一個(gè)或多個(gè)不同的I/O操作的程序組件)。你可以從一個(gè)Channel中讀取數(shù)據(jù)到緩沖區(qū),也可以將一個(gè)緩沖區(qū)中的數(shù)據(jù)寫入到Channel。

  • Selector:選擇器是NIO實(shí)現(xiàn)的關(guān)鍵,NIO采用的是I/O多路復(fù)用的方式來(lái)實(shí)現(xiàn)非阻塞,Selector通過(guò)在一個(gè)線程中監(jiān)聽每個(gè)Channel的IO事件來(lái)確定有哪些已經(jīng)準(zhǔn)備好進(jìn)行IO操作的Channel,因此可以在任何時(shí)間檢查任意的讀操作或?qū)懖僮鞯耐瓿蔂顟B(tài)。這種方式避免了等待IO操作準(zhǔn)備數(shù)據(jù)時(shí)的阻塞,使用較少的線程便可以處理許多連接,減少了線程切換與維護(hù)的開銷。

    基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

了解了NIO的實(shí)現(xiàn)思想之后,我覺得還很有必要了解一下Unix中的I/O模型,Unix中擁有以下5種I/O模型:

  • 阻塞I/O(Blocking I/O)

  • 非阻塞I/O(Non-blocking I/O)

  • I/O多路復(fù)用(I/O multiplexing (select and poll))

  • 信號(hào)驅(qū)動(dòng)I/O(signal driven I/O (SIGIO))

  • 異步I/O(asynchronous I/O (the POSIX aio_functions))

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

阻塞I/O模型是最常見的I/O模型,通常我們使用的InputStream/OutputStream都是基于阻塞I/O模型。在上圖中,我們使用UDP作為例子,recvfrom()函數(shù)是UDP協(xié)議用于接收數(shù)據(jù)的函數(shù),它需要使用系統(tǒng)調(diào)用并一直阻塞到內(nèi)核將數(shù)據(jù)準(zhǔn)備好,之后再由內(nèi)核緩沖區(qū)復(fù)制數(shù)據(jù)到用戶態(tài)(即是recvfrom()接收到數(shù)據(jù)),所謂阻塞就是在等待內(nèi)核準(zhǔn)備數(shù)據(jù)的這段時(shí)間內(nèi)什么也不干。

舉個(gè)生活中的例子,阻塞I/O就像是你去餐廳吃飯,在等待飯做好的時(shí)間段中,你只能在餐廳中坐著干等(如果你在玩手機(jī)那么這就是非阻塞I/O了)。

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

在非阻塞I/O模型中,內(nèi)核在數(shù)據(jù)尚未準(zhǔn)備好的情況下回返回一個(gè)錯(cuò)誤碼EWOULDBLOCK,而recvfrom并沒(méi)有在失敗的情況下選擇阻塞休眠,而是不斷地向內(nèi)核詢問(wèn)是否已經(jīng)準(zhǔn)備完畢,在上圖中,前三次內(nèi)核都返回了EWOULDBLOCK,直到第四次詢問(wèn)時(shí),內(nèi)核數(shù)據(jù)準(zhǔn)備完畢,然后開始將內(nèi)核中緩存的數(shù)據(jù)復(fù)制到用戶態(tài)。這種不斷詢問(wèn)內(nèi)核以查看某種狀態(tài)是否完成的方式被稱為polling(輪詢)。

非阻塞I/O就像是你在點(diǎn)外賣,只不過(guò)你非常心急,每隔一段時(shí)間就要打電話問(wèn)外賣小哥有沒(méi)有到

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

I/O多路復(fù)用的思想跟非阻塞I/O是一樣的,只不過(guò)在非阻塞I/O中,是在recvfrom的用戶態(tài)(或一個(gè)線程)中去輪詢內(nèi)核,這種方式會(huì)消耗大量的CPU時(shí)間。而I/O多路復(fù)用則是通過(guò)select()或poll()系統(tǒng)調(diào)用來(lái)負(fù)責(zé)進(jìn)行輪詢,以實(shí)現(xiàn)監(jiān)聽I(yíng)/O讀寫事件的狀態(tài)。如上圖中,select監(jiān)聽到一個(gè)datagram可讀時(shí),就交由recvfrom去發(fā)送系統(tǒng)調(diào)用將內(nèi)核中的數(shù)據(jù)復(fù)制到用戶態(tài)。

這種方式的優(yōu)點(diǎn)很明顯,通過(guò)I/O多路復(fù)用可以監(jiān)聽多個(gè)文件描述符,且在內(nèi)核中完成監(jiān)控的任務(wù)。但缺點(diǎn)是至少需要兩個(gè)系統(tǒng)調(diào)用(select()與recvfrom())。

I/O多路復(fù)用同樣適用于點(diǎn)外賣這個(gè)例子,只不過(guò)你在等外賣的期間完全可以做自己的事情,當(dāng)外賣到的時(shí)候會(huì)通過(guò)外賣APP或者由外賣小哥打電話來(lái)通知你(因?yàn)閮?nèi)核會(huì)幫你輪詢)。

Unix中提供了兩種I/O多路復(fù)用函數(shù),select()和poll()。select()的兼容性更好,但它在單個(gè)進(jìn)程中所能監(jiān)控的文件描述符是有限的,這個(gè)值與FD_SETSIZE相關(guān),32位系統(tǒng)中默認(rèn)為1024,64位系統(tǒng)中為2048。select()還有一個(gè)缺點(diǎn)就是他輪詢的方式,它采取了線性掃描的輪詢方式,每次都要遍歷FD_SETSIZE個(gè)文件描述符,不管它們是否活不活躍的。poll()本質(zhì)上與select()的實(shí)現(xiàn)沒(méi)有區(qū)別,不過(guò)在數(shù)據(jù)結(jié)構(gòu)上區(qū)別很大,用戶必須分配一個(gè)pollfd結(jié)構(gòu)數(shù)組,該數(shù)組維護(hù)在內(nèi)核態(tài)中,正因如此,poll()并不像select()那樣擁有大小上限的限制,但缺點(diǎn)同樣也很明顯,大量的fd數(shù)組會(huì)在用戶態(tài)與內(nèi)核態(tài)之間不斷復(fù)制,不管這樣的復(fù)制是否有意義。

還有一種比select()與poll()更加高效的實(shí)現(xiàn)叫做epoll(),它是由Linux內(nèi)核2.6推出的可伸縮的I/O多路復(fù)用實(shí)現(xiàn),目的是為了替代select()與poll()。epoll()同樣沒(méi)有文件描述符上限的限制,它使用一個(gè)文件描述符來(lái)管理多個(gè)文件描述符,并使用一個(gè)紅黑樹來(lái)作為存儲(chǔ)結(jié)構(gòu)。同時(shí)它還支持邊緣觸發(fā)(edge-triggered)與水平觸發(fā)(level-triggered)兩種模式(poll()只支持水平觸發(fā)),在邊緣觸發(fā)模式下,epoll_wait僅會(huì)在新的事件對(duì)象首次被加入到epoll時(shí)返回,而在水平觸發(fā)模式下,epoll_wait會(huì)在事件狀態(tài)未變更前不斷地觸發(fā)。也就是說(shuō),邊緣觸發(fā)模式只會(huì)在文件描述符變?yōu)榫途w狀態(tài)時(shí)通知一次,水平觸發(fā)模式會(huì)不斷地通知該文件描述符直到被處理。

關(guān)于epoll_wait請(qǐng)參考如下epoll API。

// 創(chuàng)建一個(gè)epoll對(duì)象并返回它的文件描述符。
// 參數(shù)flags允許修改epoll的行為,它只有一個(gè)有效值EPOLL_CLOEXEC。
int epoll_create1(int flags);
// 配置對(duì)象,該對(duì)象負(fù)責(zé)描述監(jiān)控哪些文件描述符和哪些事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待與epoll_ctl注冊(cè)的任何事件,直至事件發(fā)生一次或超時(shí)。
// 返回在events中發(fā)生的事件,最多同時(shí)返回maxevents個(gè)。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll另一亮點(diǎn)是采用了事件驅(qū)動(dòng)的方式而不是輪詢,在epoll_ctl中注冊(cè)的文件描述符在事件觸發(fā)的時(shí)候會(huì)通過(guò)一個(gè)回調(diào)機(jī)制來(lái)激活該文件描述符,epoll_wait便可以收到通知。這樣效率就不會(huì)與文件描述符的數(shù)量成正比

在Java NIO2(從JDK1.7開始引入)中,只要Linux內(nèi)核版本在2.6以上,就會(huì)采用epoll,如下源碼所示(DefaultSelectorProvider.java)。

public static SelectorProvider create() {
String osname = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
// use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
// format not recognized
}
}
}
return new sun.nio.ch.PollSelectorProvider();
}

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

信號(hào)驅(qū)動(dòng)I/O模型使用到了信號(hào),內(nèi)核在數(shù)據(jù)準(zhǔn)備就緒時(shí)會(huì)通過(guò)信號(hào)來(lái)進(jìn)行通知。我們首先開啟了一個(gè)信號(hào)驅(qū)動(dòng)I/O套接字,并使用sigaction系統(tǒng)調(diào)用來(lái)安裝信號(hào)處理程序,內(nèi)核直接返回,不會(huì)阻塞用戶態(tài)。當(dāng)datagram準(zhǔn)備好時(shí),內(nèi)核會(huì)發(fā)送SIGIN信號(hào),recvfrom接收到信號(hào)后會(huì)發(fā)送系統(tǒng)調(diào)用開始進(jìn)行I/O操作。

這種模型的優(yōu)點(diǎn)是主進(jìn)程(線程)不會(huì)被阻塞,當(dāng)數(shù)據(jù)準(zhǔn)備就緒時(shí),通過(guò)信號(hào)處理程序來(lái)通知主進(jìn)程(線程)準(zhǔn)備進(jìn)行I/O操作與對(duì)數(shù)據(jù)的處理。

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

我們之前討論的各種I/O模型無(wú)論是阻塞還是非阻塞,它們所說(shuō)的阻塞都是指的數(shù)據(jù)準(zhǔn)備階段。異步I/O模型同樣依賴信號(hào)處理程序來(lái)進(jìn)行通知,但與以上I/O模型都不相同的是,異步I/O模型通知的是I/O操作已經(jīng)完成,而不是數(shù)據(jù)準(zhǔn)備完成。

可以說(shuō)異步I/O模型才是真正的非阻塞,主進(jìn)程只管做自己的事情,然后在I/O操作完成時(shí)調(diào)用回調(diào)函數(shù)來(lái)完成一些對(duì)數(shù)據(jù)的處理操作即可。

閑扯了這么多,想必大家已經(jīng)對(duì)I/O模型有了一個(gè)深刻的認(rèn)識(shí)。之后,我們將會(huì)結(jié)合部分源碼(Netty4.X)來(lái)探討Netty中的各大核心組件,以及如何使用Netty,你會(huì)發(fā)現(xiàn)實(shí)現(xiàn)一個(gè)Netty程序是多么簡(jiǎn)單(而且還伴隨了高性能與可維護(hù)性)。

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

ByteBuf


網(wǎng)絡(luò)傳輸?shù)幕締挝皇亲止?jié),在Java NIO中提供了ByteBuffer作為字節(jié)緩沖區(qū)容器,但該類的API使用起來(lái)不太方便,所以Netty實(shí)現(xiàn)了ByteBuf作為其替代品,下面是使用ByteBuf的優(yōu)點(diǎn):

  • 相比ByteBuffer使用起來(lái)更加簡(jiǎn)單。

  • 通過(guò)內(nèi)置的復(fù)合緩沖區(qū)類型實(shí)現(xiàn)了透明的zero-copy

  • 容量可以按需增長(zhǎng)。

  • 讀和寫使用了不同的索引指針。

  • 支持鏈?zhǔn)秸{(diào)用。

  • 支持引用計(jì)數(shù)與池化。

  • 可以被用戶自定義的緩沖區(qū)類型擴(kuò)展。

在討論ByteBuf之前,我們先需要了解一下ByteBuffer的實(shí)現(xiàn),這樣才能比較深刻地明白它們之間的區(qū)別。

ByteBuffer繼承于abstract class Buffer(所以還有LongBuffer、IntBuffer等其他類型的實(shí)現(xiàn)),本質(zhì)上它只是一個(gè)有限的線性的元素序列,包含了三個(gè)重要的屬性。

  • Capacity:緩沖區(qū)中元素的容量大小,你只能將capacity個(gè)數(shù)量的元素寫入緩沖區(qū),一旦緩沖區(qū)已滿就需要清理緩沖區(qū)才能繼續(xù)寫數(shù)據(jù)。

  • Position:指向下一個(gè)寫入數(shù)據(jù)位置的索引指針,初始位置為0,最大為capacity-1。當(dāng)寫模式轉(zhuǎn)換為讀模式時(shí),position需要被重置為0。

  • Limit:在寫模式中,limit是可以寫入緩沖區(qū)的最大索引,也就是說(shuō)它在寫模式中等價(jià)于緩沖區(qū)的容量。在讀模式中,limit表示可以讀取數(shù)據(jù)的最大索引。

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

由于Buffer中只維護(hù)了position一個(gè)索引指針,所以它在讀寫模式之間的切換需要調(diào)用一個(gè)flip()方法來(lái)重置指針。使用Buffer的流程一般如下:

  • 寫入數(shù)據(jù)到緩沖區(qū)。

  • 調(diào)用flip()方法。

  • 從緩沖區(qū)中讀取數(shù)據(jù)

  • 調(diào)用buffer.clear()或者buffer.compact()清理緩沖區(qū),以便下次寫入數(shù)據(jù)。

RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
// 分配一個(gè)48字節(jié)大小的緩沖區(qū)
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf); // 讀取數(shù)據(jù)到緩沖區(qū)
while (bytesRead != -1) {
buf.flip(); // 將position重置為0
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // 讀取數(shù)據(jù)并輸出到控制臺(tái)
}
buf.clear(); // 清理緩沖區(qū)
bytesRead = inChannel.read(buf);
}
aFile.close();
Buffer中核心方法的實(shí)現(xiàn)也非常簡(jiǎn)單,主要就是在操作指針position。

Buffer中核心方法的實(shí)現(xiàn)也非常簡(jiǎn)單,主要就是在操作指針position。

/**
* Sets this buffer's mark at its position.
*
* @return This buffer
*/
public final Buffer mark() {
mark = position; // mark屬性是用來(lái)標(biāo)記當(dāng)前索引位置的
return this;
}
// 將當(dāng)前索引位置重置為mark所標(biāo)記的位置
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
// 翻轉(zhuǎn)這個(gè)Buffer,將limit設(shè)置為當(dāng)前索引位置,然后再把position重置為0
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
// 清理緩沖區(qū)
// 說(shuō)是清理,也只是把postion與limit進(jìn)行重置,之后再寫入數(shù)據(jù)就會(huì)覆蓋之前的數(shù)據(jù)了
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
// 返回剩余空間
public final int remaining() {
return limit - position;
}

Java NIO中的Buffer API操作的麻煩之處就在于讀寫轉(zhuǎn)換需要手動(dòng)重置指針。而ByteBuf沒(méi)有這種繁瑣性,它維護(hù)了兩個(gè)不同的索引,一個(gè)用于讀取,一個(gè)用于寫入。當(dāng)你從ByteBuf讀取數(shù)據(jù)時(shí),它的readerIndex將會(huì)被遞增已經(jīng)被讀取的字節(jié)數(shù),同樣的,當(dāng)你寫入數(shù)據(jù)時(shí),writerIndex則會(huì)遞增。readerIndex的最大范圍在writerIndex的所在位置,如果試圖移動(dòng)readerIndex超過(guò)該值則會(huì)觸發(fā)異常。

ByteBuf中名稱以read或write開頭的方法將會(huì)遞增它們其對(duì)應(yīng)的索引,而名稱以get或set開頭的方法則不會(huì)。ByteBuf同樣可以指定一個(gè)最大容量,試圖移動(dòng)writerIndex超過(guò)該值則會(huì)觸發(fā)異常。

public byte readByte() {
 this.checkReadableBytes0(1); // 檢查readerIndex是否已越界
 int i = this.readerIndex;
 byte b = this._getByte(i);
 this.readerIndex = i + 1; // 遞增readerIndex
 return b;
}
private void checkReadableBytes0(int minimumReadableBytes) {
 this.ensureAccessible();
 if(this.readerIndex > this.writerIndex - minimumReadableBytes) {
 throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this}));
 }
}
public ByteBuf writeByte(int value) {
 this.ensureAccessible();
 this.ensureWritable0(1); // 檢查writerIndex是否會(huì)越過(guò)capacity
 this._setByte(this.writerIndex++, value);
 return this;
}
private void ensureWritable0(int minWritableBytes) {
 if(minWritableBytes > this.writableBytes()) {
 if(minWritableBytes > this.maxCapacity - this.writerIndex) {
 throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));
 } else {
 int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);
 this.capacity(newCapacity);
 }
 }
}
// get與set只對(duì)傳入的索引進(jìn)行了檢查,然后對(duì)其位置進(jìn)行g(shù)et或set
public byte getByte(int index) {
 this.checkIndex(index);
 return this._getByte(index);
}
public ByteBuf setByte(int index, int value) {
 this.checkIndex(index);
 this._setByte(index, value);
 return this;
}

ByteBuf同樣支持在堆內(nèi)和堆外進(jìn)行分配。在堆內(nèi)分配也被稱為支撐數(shù)組模式,它能在沒(méi)有使用池化的情況下提供快速的分配和釋放。

ByteBuf heapBuf = Unpooled.copiedBuffer(bytes);
if (heapBuf.hasArray()) { // 判斷是否有一個(gè)支撐數(shù)組
byte[] array = heapBuf.array();
// 計(jì)算第一個(gè)字節(jié)的偏移量
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes(); // 獲得可讀字節(jié)
handleArray(array,offset,length); // 調(diào)用你的處理方法
}

另一種模式為堆外分配,Java NIO ByteBuffer類在JDK1.4時(shí)就已經(jīng)允許JVM實(shí)現(xiàn)通過(guò)JNI調(diào)用來(lái)在堆外分配內(nèi)存(調(diào)用malloc()函數(shù)在JVM堆外分配內(nèi)存),這主要是為了避免額外的緩沖區(qū)復(fù)制操作。

ByteBuf directBuf = Unpooled.directBuffer(capacity);
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
// 將字節(jié)復(fù)制到數(shù)組中
directBuf.getBytes(directBuf.readerIndex(),array);
handleArray(array,0,length);
}

ByteBuf還支持第三種模式,它被稱為復(fù)合緩沖區(qū),為多個(gè)ByteBuf提供了一個(gè)聚合視圖。在這個(gè)視圖中,你可以根據(jù)需要添加或者刪除ByteBuf實(shí)例,ByteBuf的子類CompositeByteBuf實(shí)現(xiàn)了該模式。

一個(gè)適合使用復(fù)合緩沖區(qū)的場(chǎng)景是HTTP協(xié)議,通過(guò)HTTP協(xié)議傳輸?shù)南⒍紩?huì)被分成兩部分——頭部和主體,如果這兩部分由應(yīng)用程序的不同模塊產(chǎn)生,將在消息發(fā)送時(shí)進(jìn)行組裝,并且該應(yīng)用程序還會(huì)為多個(gè)消息復(fù)用相同的消息主體,這樣對(duì)于每個(gè)消息都將會(huì)創(chuàng)建一個(gè)新的頭部,產(chǎn)生了很多不必要的內(nèi)存操作。使用CompositeByteBuf是一個(gè)很好的選擇,它消除了這些額外的復(fù)制,以幫助你復(fù)用這些消息。

CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ....;
ByteBuf bodyBuf = ....;
messageBuf.addComponents(headerBuf,bodyBuf);
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}

CompositeByteBuf透明的實(shí)現(xiàn)了zero-copy,zero-copy其實(shí)就是避免數(shù)據(jù)在兩個(gè)內(nèi)存區(qū)域中來(lái)回的復(fù)制。從操作系統(tǒng)層面上來(lái)講,zero-copy指的是避免在內(nèi)核態(tài)與用戶態(tài)之間的數(shù)據(jù)緩沖區(qū)復(fù)制(通過(guò)mmap避免),而Netty中的zero-copy更偏向于在用戶態(tài)中的數(shù)據(jù)操作的優(yōu)化,就像使用CompositeByteBuf來(lái)復(fù)用多個(gè)ByteBuf以避免額外的復(fù)制,也可以使用wrap()方法來(lái)將一個(gè)字節(jié)數(shù)組包裝成ByteBuf,又或者使用ByteBuf的slice()方法把它分割為多個(gè)共享同一內(nèi)存區(qū)域的ByteBuf,這些都是為了優(yōu)化內(nèi)存的使用率。

那么如何創(chuàng)建ByteBuf呢?在上面的代碼中使用到了Unpooled,它是Netty提供的一個(gè)用于創(chuàng)建與分配ByteBuf的工具類,建議都使用這個(gè)工具類來(lái)創(chuàng)建你的緩沖區(qū),不要自己去調(diào)用構(gòu)造函數(shù)。經(jīng)常使用的是wrappedBuffer()與copiedBuffer(),它們一個(gè)是用于將一個(gè)字節(jié)數(shù)組或ByteBuffer包裝為一個(gè)ByteBuf,一個(gè)是根據(jù)傳入的字節(jié)數(shù)組與ByteBuffer/ByteBuf來(lái)復(fù)制出一個(gè)新的ByteBuf。

// 通過(guò)array.clone()來(lái)復(fù)制一個(gè)數(shù)組進(jìn)行包裝
public static ByteBuf copiedBuffer(byte[] array) {
return array.length == 0?EMPTY_BUFFER:wrappedBuffer((byte[])array.clone());
}
// 默認(rèn)是堆內(nèi)分配
public static ByteBuf wrappedBuffer(byte[] array) {
return (ByteBuf)(array.length == 0?EMPTY_BUFFER:new UnpooledHeapByteBuf(ALLOC, array, array.length));
}
// 也提供了堆外分配的方法
private static final ByteBufAllocator ALLOC;
public static ByteBuf directBuffer(int initialCapacity) {
return ALLOC.directBuffer(initialCapacity);
}
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
ByteBuf buffer = allocator.directBuffer();
do something.......

為了優(yōu)化內(nèi)存使用率,Netty提供了一套手動(dòng)的方式來(lái)追蹤不活躍對(duì)象,像UnpooledHeapByteBuf這種分配在堆內(nèi)的對(duì)象得益于JVM的GC管理,無(wú)需額外操心,而UnpooledDirectByteBuf是在堆外分配的,它的內(nèi)部基于DirectByteBuffer,DirectByteBuffer會(huì)先向Bits類申請(qǐng)一個(gè)額度(Bits還擁有一個(gè)全局變量totalCapacity,記錄了所有DirectByteBuffer總大?。看紊暾?qǐng)前都會(huì)查看是否已經(jīng)超過(guò)-XX:MaxDirectMemorySize所設(shè)置的上限,如果超限就會(huì)嘗試調(diào)用System.gc(),以試圖回收一部分內(nèi)存,然后休眠100毫秒,如果內(nèi)存還是不足,則只能拋出OOM異常。堆外內(nèi)存的回收雖然有了這么一層保障,但為了提高性能與使用率,主動(dòng)回收也是很有必要的。由于Netty還實(shí)現(xiàn)了ByteBuf的池化,像PooledHeapByteBuf和PooledDirectByteBuf就必須依賴于手動(dòng)的方式來(lái)進(jìn)行回收(放回池中)。

Netty使用了引用計(jì)數(shù)器的方式來(lái)追蹤那些不活躍的對(duì)象。引用計(jì)數(shù)的接口為ReferenceCounted,它的思想很簡(jiǎn)單,只要ByteBuf對(duì)象的引用計(jì)數(shù)大于0,就保證該對(duì)象不會(huì)被釋放回收,可以通過(guò)手動(dòng)調(diào)用release()與retain()方法來(lái)操作該對(duì)象的引用計(jì)數(shù)值遞減或遞增。用戶也可以通過(guò)自定義一個(gè)ReferenceCounted的實(shí)現(xiàn)類,以滿足自定義的規(guī)則。

package io.netty.buffer;
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
// 由于ByteBuf的實(shí)例對(duì)象會(huì)非常多,所以這里沒(méi)有將refCnt包裝為AtomicInteger
// 而是使用一個(gè)全局的AtomicIntegerFieldUpdater來(lái)負(fù)責(zé)操作refCnt
private static final AtomicIntegerFieldUpdater
 refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
// 每個(gè)ByteBuf的初始引用值都為1
private volatile int refCnt = 1;
public int refCnt() {
return this.refCnt;
}
protected final void setRefCnt(int refCnt) {
this.refCnt = refCnt;
}
public ByteBuf retain() {
return this.retain0(1);
}
// 引用計(jì)數(shù)值遞增increment,increment必須大于0
public ByteBuf retain(int increment) {
return this.retain0(ObjectUtil.checkPositive(increment, "increment"));
}
public static int checkPositive(int i, String name) {
if(i <= 0) {
throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)");
} else {
return i;
} 
}
// 使用CAS操作不斷嘗試更新值
private ByteBuf retain0(int increment) {
int refCnt;
int nextCnt;
do {
refCnt = this.refCnt;
nextCnt = refCnt + increment;
if(nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt));
return this;
}
public boolean release() {
return this.release0(1);
}
public boolean release(int decrement) {
return this.release0(ObjectUtil.checkPositive(decrement, "decrement"));
}
private boolean release0(int decrement) {
int refCnt;
do {
refCnt = this.refCnt;
if(refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));
if(refCnt == decrement) {
this.deallocate();
return true;
} else {
return false;
}
}
protected abstract void deallocate();
}

Channel


Netty中的Channel與Java NIO的概念一樣,都是對(duì)一個(gè)實(shí)體或連接的抽象,但Netty提供了一套更加通用的API。就以網(wǎng)絡(luò)套接字為例,在Java中OIO與NIO是截然不同的兩套API,假設(shè)你之前使用的是OIO而又想更改為NIO實(shí)現(xiàn),那么幾乎需要重寫所有代碼。而在Netty中,只需要更改短短幾行代碼(更改Channel與EventLoop的實(shí)現(xiàn)類,如把OioServerSocketChannel替換為NioServerSocketChannel),就可以完成OIO與NIO(或其他)之間的轉(zhuǎn)換。

基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件

每個(gè)Channel最終都會(huì)被分配一個(gè)ChannelPipelineChannelConfig,前者持有所有負(fù)責(zé)處理入站與出站數(shù)據(jù)以及事件的ChannelHandler,后者包含了該Channel的所有配置設(shè)置,并且支持熱更新,由于不同的傳輸類型可能具有其特別的配置,所以該類可能會(huì)實(shí)現(xiàn)為ChannelConfig的不同子類。

Channel是線程安全的(與之后要講的線程模型有關(guān)),因此你完全可以在多個(gè)線程中復(fù)用同一個(gè)Channel,就像如下代碼所示。

final Channel channel = ...
final ByteBuf buffer = Unpooled.copiedBuffer("Hello,World!", CharsetUtil.UTF_8).retain();
Runnable writer = new Runnable() {
@Override
public void run() {
channel.writeAndFlush(buffer.duplicate());
}
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(writer);
executor.execute(writer);
.......

Netty除了支持常見的NIO與OIO,還內(nèi)置了其他的傳輸類型。

NmaePackageDescription
NIOio.netty.channel.socket.nio以Java NIO為基礎(chǔ)實(shí)現(xiàn)
OIOio.netty.channel.socket.oio以java.net為基礎(chǔ)實(shí)現(xiàn),使用阻塞I/O模型
Epollio.netty.channel.epoll由JNI驅(qū)動(dòng)epoll()實(shí)現(xiàn)的更高性能的非阻塞I/O,它只能使用在Linux
Localio.netty.channel.local本地傳輸,在JVM內(nèi)部通過(guò)管道進(jìn)行通信
Embeddedio.netty.channel.embedded允許在不需要真實(shí)網(wǎng)絡(luò)傳輸?shù)沫h(huán)境下使用ChannelHandler,主要用于對(duì)ChannelHandler進(jìn)行測(cè)試

NIO、OIO、Epoll我們應(yīng)該已經(jīng)很熟悉了,下面主要說(shuō)說(shuō)Local與Embedded。

Local傳輸用于在同一個(gè)JVM中運(yùn)行的客戶端和服務(wù)器程序之間的異步通信,與服務(wù)器Channel相關(guān)聯(lián)的SocketAddress并沒(méi)有綁定真正的物理網(wǎng)絡(luò)地址,它會(huì)被存儲(chǔ)在注冊(cè)表中,并在Channel關(guān)閉時(shí)注銷。因此Local傳輸不會(huì)接受真正的網(wǎng)絡(luò)流量,也就是說(shuō)它不能與其他傳輸實(shí)現(xiàn)進(jìn)行互操作。

Embedded傳輸主要用于對(duì)ChannelHandler進(jìn)行單元測(cè)試,ChannelHandler是用于處理消息的邏輯組件,Netty通過(guò)將入站消息與出站消息都寫入到EmbeddedChannel中的方式(提供了write/readInbound()與write/readOutbound()來(lái)讀寫入站與出站消息)來(lái)實(shí)現(xiàn)對(duì)ChannelHandler的單元測(cè)試。

ChannelHandler


ChannelHandler充當(dāng)了處理入站出站數(shù)據(jù)的應(yīng)用程序邏輯的容器,該類是基于事件驅(qū)動(dòng)的,它會(huì)響應(yīng)相關(guān)的事件然后去調(diào)用其關(guān)聯(lián)的回調(diào)函數(shù),例如當(dāng)一個(gè)新的連接被建立時(shí),ChannelHandler的channelActive()方法將會(huì)被調(diào)用。

關(guān)于入站消息和出站消息的數(shù)據(jù)流向定義,如果以客戶端為主視角來(lái)說(shuō)的話,那么從客戶端流向服務(wù)器的數(shù)據(jù)被稱為出站,反之為入站。

入站事件是可能被入站數(shù)據(jù)或者相關(guān)的狀態(tài)更改而觸發(fā)的事件,包括:連接已被激活、連接失活、讀取入站數(shù)據(jù)、用戶事件、發(fā)生異常等。

出站事件是未來(lái)將會(huì)觸發(fā)的某個(gè)動(dòng)作的結(jié)果的事件,這些動(dòng)作包括:打開或關(guān)閉遠(yuǎn)程節(jié)點(diǎn)的連接、將數(shù)據(jù)寫(或沖刷)到套接字。

ChannelHandler的主要用途包括:

  • 對(duì)入站與出站數(shù)據(jù)的業(yè)務(wù)邏輯處理

  • 記錄日志

  • 將數(shù)據(jù)從一種格式轉(zhuǎn)換為另一種格式,實(shí)現(xiàn)編解碼器。以一次HTTP協(xié)議(或者其他應(yīng)用層協(xié)議)的流程為例,數(shù)據(jù)在網(wǎng)絡(luò)傳輸時(shí)的單位為字節(jié),當(dāng)客戶端發(fā)送請(qǐng)求到服務(wù)器時(shí),服務(wù)器需要通過(guò)解碼器(處理入站消息)將字節(jié)解碼為協(xié)議的消息內(nèi)容,服務(wù)器在發(fā)送響應(yīng)的時(shí)候(處理出站消息),還需要通過(guò)編碼器將消息內(nèi)容編碼為字節(jié)。

  • 捕獲異常

  • 提供Channel生命周期內(nèi)的通知,如Channel活動(dòng)時(shí)與非活動(dòng)時(shí)

Netty中到處都充滿了異步與事件驅(qū)動(dòng),而回調(diào)函數(shù)正是用于響應(yīng)事件之后的操作。由于異步會(huì)直接返回一個(gè)結(jié)果,所以Netty提供了ChannelFuture(實(shí)現(xiàn)了java.util.concurrent.Future)來(lái)作為異步調(diào)用返回的占位符,真正的結(jié)果會(huì)在未來(lái)的某個(gè)時(shí)刻完成,到時(shí)候就可以通過(guò)ChannelFuture對(duì)其進(jìn)行訪問(wèn),每個(gè)Netty的出站I/O操作都將會(huì)返回一個(gè)ChannelFuture。

Netty還提供了ChannelFutureListener接口來(lái)監(jiān)聽ChannelFuture是否成功,并采取對(duì)應(yīng)的操作。

Channel channel = ...
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1",6666));
// 注冊(cè)一個(gè)監(jiān)聽器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// do something....
} else {
// 輸出錯(cuò)誤信息
Throwable cause = future.cause();
cause.printStackTrace();
// do something....
}
}
});

ChannelFutureListener接口中還提供了幾個(gè)簡(jiǎn)單的默認(rèn)實(shí)現(xiàn),方便我們使用。

package io.netty.channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
public interface ChannelFutureListener extends GenericFutureListener{
// 在Future完成時(shí)關(guān)閉
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
// 如果失敗則關(guān)閉
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().close();
}
}
};
// 將異常信息傳遞給下一個(gè)ChannelHandler
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
};
}ChannelHandler接口定義了對(duì)它生命周期進(jìn)行監(jiān)聽的回調(diào)函數(shù),在ChannelHandler被添加到ChannelPipeline或者被移除時(shí)都會(huì)調(diào)用這些函數(shù)。package io.netty.channel;
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext var1) throws Exception;
void handlerRemoved(ChannelHandlerContext var1) throws Exception;
/** @deprecated */
@Deprecated
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
// 該注解表明這個(gè)ChannelHandler可被其他線程復(fù)用
@Inherited
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharable {
}
}入站消息與出站消息由其對(duì)應(yīng)的接口ChannelInboundHandler與ChannelOutboundHandler負(fù)責(zé),這兩個(gè)接口定義了監(jiān)聽Channel的生命周期的狀態(tài)改變事件的回調(diào)函數(shù)。package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
public interface ChannelInboundHandler extends ChannelHandler {
// 當(dāng)channel被注冊(cè)到EventLoop時(shí)被調(diào)用
void channelRegistered(ChannelHandlerContext var1) throws Exception;
// 當(dāng)channel已經(jīng)被創(chuàng)建,但還未注冊(cè)到EventLoop(或者從EventLoop中注銷)被調(diào)用
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
// 當(dāng)channel處于活動(dòng)狀態(tài)(連接到遠(yuǎn)程節(jié)點(diǎn))被調(diào)用
void channelActive(ChannelHandlerContext var1) throws Exception;
// 當(dāng)channel處于非活動(dòng)狀態(tài)(沒(méi)有連接到遠(yuǎn)程節(jié)點(diǎn))被調(diào)用
void channelInactive(ChannelHandlerContext var1) throws Exception;
// 當(dāng)從channel讀取數(shù)據(jù)時(shí)被調(diào)用
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
// 當(dāng)channel的上一個(gè)讀操作完成時(shí)被調(diào)用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
// 當(dāng)ChannelInboundHandler.fireUserEventTriggered()方法被調(diào)用時(shí)被調(diào)用
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
// 當(dāng)channel的可寫狀態(tài)發(fā)生改變時(shí)被調(diào)用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
// 當(dāng)處理過(guò)程中發(fā)生異常時(shí)被調(diào)用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;
public interface ChannelOutboundHandler extends ChannelHandler {
// 當(dāng)請(qǐng)求將Channel綁定到一個(gè)地址時(shí)被調(diào)用
// ChannelPromise是ChannelFuture的一個(gè)子接口,定義了如setSuccess(),setFailure()等方法
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
// 當(dāng)請(qǐng)求將Channel連接到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
// 當(dāng)請(qǐng)求將Channel從遠(yuǎn)程節(jié)點(diǎn)斷開時(shí)被調(diào)用
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當(dāng)請(qǐng)求關(guān)閉Channel時(shí)被調(diào)用
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當(dāng)請(qǐng)求將Channel從它的EventLoop中注銷時(shí)被調(diào)用
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當(dāng)請(qǐng)求從Channel讀取數(shù)據(jù)時(shí)被調(diào)用
void read(ChannelHandlerContext var1) throws Exception;
// 當(dāng)請(qǐng)求通過(guò)Channel將數(shù)據(jù)寫到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
// 當(dāng)請(qǐng)求通過(guò)Channel將緩沖中的數(shù)據(jù)沖刷到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用
void flush(ChannelHandlerContext var1) throws Exception;
}通過(guò)實(shí)現(xiàn)ChannelInboundHandler或者ChannelOutboundHandler就可以完成用戶自定義的應(yīng)用邏輯處理程序,不過(guò)Netty已經(jīng)幫你實(shí)現(xiàn)了一些基本操作,用戶只需要繼承并擴(kuò)展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter來(lái)作為自定義實(shí)現(xiàn)的起始點(diǎn)。ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter都繼承于ChannelHandlerAdapter,該抽象類簡(jiǎn)單實(shí)現(xiàn)了ChannelHandler接口。public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
public ChannelHandlerAdapter() {
}
// 該方法不允許將此ChannelHandler共享復(fù)用
protected void ensureNotSharable() {
if(this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
}
}
// 使用反射判斷實(shí)現(xiàn)類有沒(méi)有@Sharable注解,以確認(rèn)該類是否為可共享復(fù)用的
public boolean isSharable() {
Class clazz = this.getClass();
Map cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = (Boolean)cache.get(clazz);
if(sharable == null) {
sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class));
cache.put(clazz, sharable);
}
return sharable.booleanValue();
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter默認(rèn)只是簡(jiǎn)單地將請(qǐng)求傳遞給ChannelPipeline中的下一個(gè)ChannelHandler,源碼如下:public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter() {
}
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
public ChannelOutboundHandlerAdapter() {
}
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}對(duì)于處理入站消息,另外一種選擇是繼承SimpleChannelInboundHandler,它是Netty的一個(gè)繼承于ChannelInboundHandlerAdapter的抽象類,并在其之上實(shí)現(xiàn)了自動(dòng)釋放資源的功能。我們?cè)诹私釨yteBuf時(shí)就已經(jīng)知道了Netty使用了一套自己實(shí)現(xiàn)的引用計(jì)數(shù)算法來(lái)主動(dòng)釋放資源,假設(shè)你的ChannelHandler繼承于ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter,那么你就有責(zé)任去管理你所分配的ByteBuf,一般來(lái)說(shuō),一個(gè)消息對(duì)象(ByteBuf)已經(jīng)被消費(fèi)(或丟棄)了,并不會(huì)傳遞給ChannelHandler鏈中的下一個(gè)處理器(如果該消息到達(dá)了實(shí)際的傳輸層,那么當(dāng)它被寫入或Channel關(guān)閉時(shí),都會(huì)被自動(dòng)釋放),所以你就需要去手動(dòng)釋放它。通過(guò)一個(gè)簡(jiǎn)單的工具類ReferenceCountUtil的release方法,就可以做到這一點(diǎn)。// 這個(gè)泛型為消息對(duì)象的類型
public abstract class SimpleChannelInboundHandler extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
protected SimpleChannelInboundHandler() {
    this(true);
}
protected SimpleChannelInboundHandler(boolean autoRelease) {
        this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
        this.autoRelease = autoRelease;
}
protected SimpleChannelInboundHandler(Class inboundMessageType) {
        this(inboundMessageType, true);
}
protected SimpleChannelInboundHandler(Class inboundMessageType, boolean autoRelease) {
            this.matcher = TypeParameterMatcher.get(inboundMessageType);
            this.autoRelease = autoRelease;
}
public boolean acceptInboundMessage(Object msg) throws Exception {
        return this.matcher.match(msg);
}
// SimpleChannelInboundHandler只是替你做了ReferenceCountUtil.release()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
    if(this.acceptInboundMessage(msg)) {
             this.channelRead0(ctx, msg);
    } else {
            release = false;
            ctx.fireChannelRead(msg);
    }
} finally {
        if(this.autoRelease && release) {
            //ByteBuf的釋放
            ReferenceCountUtil.release(msg);
        }
    }
}
// 這個(gè)方法才是我們需要實(shí)現(xiàn)的方法
protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;
}
// ReferenceCountUtil中的源碼,release方法對(duì)消息對(duì)象的類型進(jìn)行判斷然后調(diào)用它的release()方法
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted?((ReferenceCounted)msg).release():false;
}ChannelPipeline為了模塊化與解耦合,不可能由一個(gè)ChannelHandler來(lái)完成所有應(yīng)用邏輯,所以Netty采用了攔截器鏈的設(shè)計(jì)。ChannelPipeline就是用來(lái)管理ChannelHandler實(shí)例鏈的容器,它的職責(zé)就是保證實(shí)例鏈的流動(dòng)。每一個(gè)新創(chuàng)建的Channel都將會(huì)被分配一個(gè)新的ChannelPipeline,這種關(guān)聯(lián)關(guān)系是永久性的,一個(gè)Channel一生只能對(duì)應(yīng)一個(gè)ChannelPipeline。一個(gè)入站事件被觸發(fā)時(shí),它會(huì)先從ChannelPipeline的最左端(頭部)開始一直傳播到ChannelPipeline的最右端(尾部),而出站事件正好與入站事件順序相反(從最右端一直傳播到最左端)。這個(gè)順序是定死的,Netty總是將ChannelPipeline的入站口作為頭部,而將出站口作為尾部。在事件傳播的過(guò)程中,ChannelPipeline會(huì)判斷下一個(gè)ChannelHandler的類型是否和事件的運(yùn)動(dòng)方向相匹配,如果不匹配,就跳過(guò)該ChannelHandler并繼續(xù)檢查下一個(gè)(保證入站事件只會(huì)被ChannelInboundHandler處理),一個(gè)ChannelHandler也可以同時(shí)實(shí)現(xiàn)ChannelInboundHandler與ChannelOutboundHandler,它在入站事件與出站事件中都會(huì)被調(diào)用。在閱讀ChannelHandler的源碼時(shí),發(fā)現(xiàn)很多方法需要一個(gè)ChannelHandlerContext類型的參數(shù),該接口是ChannelPipeline與ChannelHandler之間相關(guān)聯(lián)的關(guān)鍵。ChannelHandlerContext可以通知ChannelPipeline中的當(dāng)前ChannelHandler的下一個(gè)ChannelHandler,還可以動(dòng)態(tài)地改變當(dāng)前ChannelHandler在ChannelPipeline中的位置(通過(guò)調(diào)用ChannelPipeline中的各種方法來(lái)修改)。ChannelHandlerContext負(fù)責(zé)了在同一個(gè)ChannelPipeline中的ChannelHandler與其他ChannelHandler之間的交互,每個(gè)ChannelHandlerContext都對(duì)應(yīng)了一個(gè)ChannelHandler。在DefaultChannelPipeline的源碼中,已經(jīng)表現(xiàn)的很明顯了。public class DefaultChannelPipeline implements ChannelPipeline {
.........
// 頭部節(jié)點(diǎn)和尾部節(jié)點(diǎn)的引用變量
// ChannelHandlerContext在ChannelPipeline中是以鏈表的形式組織的
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
.........
// 添加一個(gè)ChannelHandler到鏈表尾部
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return this.addLast((EventExecutorGroup)null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized(this) {
// 檢查ChannelHandler是否為一個(gè)共享對(duì)象(@Sharable)
// 如果該ChannelHandler沒(méi)有@Sharable注解,并且是已被添加過(guò)的那么就拋出異常
checkMultiplicity(handler);
// 返回一個(gè)DefaultChannelHandlerContext,注意該對(duì)象持有了傳入的ChannelHandler
newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
// 如果當(dāng)前ChannelPipeline沒(méi)有被注冊(cè),那么就先加到未決鏈表中
if(!this.registered) {
newCtx.setAddPending();
this.callHandlerCallbackLater(newCtx, true);
return this;
}
// 否則就調(diào)用ChannelHandler中的handlerAdded()
EventExecutor executor = newCtx.executor();
if(!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
public void run() {
DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
}
});
return this;
}
}
this.callHandlerAdded0(newCtx);
return this;
}
// 將新的ChannelHandlerContext插入到尾部與尾部之前的節(jié)點(diǎn)之間
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx;
}
.....
}ChannelHandlerContext還定義了許多與Channel和ChannelPipeline重合的方法(像read()、write()、connect()這些用于出站的方法或者如fireChannelXXXX()這樣用于入站的方法),不同之處在于調(diào)用Channel或者ChannelPipeline上的這些方法,它們將會(huì)從頭沿著整個(gè)ChannelHandler實(shí)例鏈進(jìn)行傳播,而調(diào)用位于ChannelHandlerContext上的相同方法,則會(huì)從當(dāng)前所關(guān)聯(lián)的ChannelHandler開始,且只會(huì)傳播給實(shí)例鏈中的下一個(gè)ChannelHandler。而且,事件之間的移動(dòng)(從一個(gè)ChannelHandler到下一個(gè)ChannelHandler)也是通過(guò)ChannelHandlerContext中的方法調(diào)用完成的。public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelPipeline fireChannelRead(Object msg) {
// 注意這里將頭節(jié)點(diǎn)傳入了進(jìn)去
AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
return this;
}
}
---------------------------------------------------------------
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if(executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if(this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelRead(this, msg);
} catch (Throwable var3) {
this.notifyHandlerException(var3);
}
} else {
// 尋找下一個(gè)ChannelHandler
this.fireChannelRead(msg);
}
}
public ChannelHandlerContext fireChannelRead(Object msg) {
invokeChannelRead(this.findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while(!ctx.inbound); // 直到找到一個(gè)ChannelInboundHandler
return ctx;
}
}EventLoop為了最大限度地提供高性能和可維護(hù)性,Netty設(shè)計(jì)了一套強(qiáng)大又易用的線程模型。在一個(gè)網(wǎng)絡(luò)框架中,最重要的能力是能夠快速高效地處理在連接的生命周期內(nèi)發(fā)生的各種事件,與之相匹配的程序構(gòu)造被稱為事件循環(huán),Netty定義了接口EventLoop來(lái)負(fù)責(zé)這項(xiàng)工作。如果是經(jīng)常用Java進(jìn)行多線程開發(fā)的童鞋想必經(jīng)常會(huì)使用到線程池,也就是Executor這套API。Netty就是從Executor(java.util.concurrent)之上擴(kuò)展了自己的EventExecutorGroup(io.netty.util.concurrent),同時(shí)為了與Channel的事件進(jìn)行交互,還擴(kuò)展了EventLoopGroup接口(io.netty.channel)。在io.netty.util.concurrent包下的EventExecutorXXX負(fù)責(zé)實(shí)現(xiàn)線程并發(fā)相關(guān)的工作,而在io.netty.channel包下的EventLoopXXX負(fù)責(zé)實(shí)現(xiàn)網(wǎng)絡(luò)編程相關(guān)的工作(處理Channel中的事件)。在Netty的線程模型中,一個(gè)EventLoop將由一個(gè)永遠(yuǎn)不會(huì)改變的Thread驅(qū)動(dòng),而一個(gè)Channel一生只會(huì)使用一個(gè)EventLoop(但是一個(gè)EventLoop可能會(huì)被指派用于服務(wù)多個(gè)Channel),在Channel中的所有I/O操作和事件都由EventLoop中的線程處理,也就是說(shuō)一個(gè)Channel的一生之中都只會(huì)使用到一個(gè)線程。不過(guò)在Netty3,只有入站事件會(huì)被EventLoop處理,所有出站事件都會(huì)由調(diào)用線程處理,這種設(shè)計(jì)導(dǎo)致了ChannelHandler的線程安全問(wèn)題。Netty4簡(jiǎn)化了線程模型,通過(guò)在同一個(gè)線程處理所有事件,既解決了這個(gè)問(wèn)題,還提供了一個(gè)更加簡(jiǎn)單的架構(gòu)。package io.netty.channel;
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", 2147483647));
    //內(nèi)部隊(duì)列
private final QueuetailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
// 返回它所在的EventLoopGroup
public EventLoopGroup parent() {
return (EventLoopGroup)super.parent();
}
public EventLoop next() {
return (EventLoop)super.next();
}
// 注冊(cè)Channel,這里ChannelPromise和Channel關(guān)聯(lián)到了一起
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
// 剩下這些函數(shù)都是用于調(diào)度任務(wù)
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if(this.isShutdown()) {
reject();
}
if(!this.tailTasks.offer(task)) {
this.reject(task);
}
if(this.wakesUpForTask(task)) {
this.wakeup(this.inEventLoop());
}
}
final boolean removeAfterEventLoopIterationTask(Runnable task) {
return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof SingleThreadEventLoop.NonWakeupRunnable);
}
protected void afterRunningAllTasks() {
this.runAllTasksFrom(this.tailTasks);
}
protected boolean hasTasks() {
return super.hasTasks() || !this.tailTasks.isEmpty();
}
public int pendingTasks() {
return super.pendingTasks() + this.tailTasks.size();
}
interface NonWakeupRunnable extends Runnable {
}
}為了確保一個(gè)Channel的整個(gè)生命周期中的I/O事件會(huì)被一個(gè)EventLoop負(fù)責(zé),Netty通過(guò)inEventLoop()方法來(lái)判斷當(dāng)前執(zhí)行的線程的身份,確定它是否是分配給當(dāng)前Channel以及它的EventLoop的那一個(gè)線程。如果當(dāng)前(調(diào)用)線程正是EventLoop中的線程,那么所提交的任務(wù)將會(huì)被(true)直接執(zhí)行,否則,EventLoop將調(diào)度該任務(wù)以便(false)稍后執(zhí)行,并將它放入內(nèi)部的任務(wù)隊(duì)列(每個(gè)EventLoop都有它自己的任務(wù)隊(duì)列,SingleThreadEventLoop的源碼就能發(fā)現(xiàn)很多用于調(diào)度內(nèi)部任務(wù)隊(duì)列的方法),在下次處理它的事件時(shí),將會(huì)執(zhí)行隊(duì)列中的那些任務(wù)。這種設(shè)計(jì)可以讓任何線程與Channel直接交互,而無(wú)需在ChannelHandler中進(jìn)行額外的同步。從性能上來(lái)考慮,千萬(wàn)不要將一個(gè)需要長(zhǎng)時(shí)間來(lái)運(yùn)行的任務(wù)放入到任務(wù)隊(duì)列中,它會(huì)影響到該隊(duì)列中的其他任務(wù)的執(zhí)行。解決方案是使用一個(gè)專門的EventExecutor來(lái)執(zhí)行它(ChannelPipeline提供了帶有EventExecutorGroup參數(shù)的addXXX()方法,該方法可以將傳入的ChannelHandler綁定到你傳入的EventExecutor之中),這樣它就會(huì)在另一條線程中執(zhí)行,與其他任務(wù)隔離。public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
.....
public void execute(Runnable task) {
if(task == null) {
throw new NullPointerException("task");
} else {
boolean inEventLoop = this.inEventLoop();
if(inEventLoop) {
this.addTask(task);
} else {
this.startThread();
this.addTask(task);
if(this.isShutdown() && this.removeTask(task)) {
reject();
}
}
if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
this.wakeup(inEventLoop);
}
}
}
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
.....
}Bootstrap在深入了解地Netty的核心組件之后,發(fā)現(xiàn)它們的設(shè)計(jì)都很模塊化,如果想要實(shí)現(xiàn)你自己的應(yīng)用程序,就需要將這些組件組裝到一起。Netty通過(guò)Bootstrap類,以對(duì)一個(gè)Netty應(yīng)用程序進(jìn)行配置(組裝各個(gè)組件),并最終使它運(yùn)行起來(lái)。對(duì)于客戶端程序和服務(wù)器程序所使用到的Bootstrap類是不同的,后者需要使用ServerBootstrap,這樣設(shè)計(jì)是因?yàn)?,在如TCP這樣有連接的協(xié)議中,服務(wù)器程序往往需要一個(gè)以上的Channel,通過(guò)父Channel來(lái)接受來(lái)自客戶端的連接,然后創(chuàng)建子Channel用于它們之間的通信,而像UDP這樣無(wú)連接的協(xié)議,它不需要每個(gè)連接都創(chuàng)建子Channel,只需要一個(gè)Channel即可。一個(gè)比較明顯的差異就是Bootstrap與ServerBootstrap的group()方法,后者提供了一個(gè)接收2個(gè)EventLoopGroup的版本。// 該方法在Bootstrap的父類AbstractBootstrap中,泛型B為它當(dāng)前子類的類型(為了鏈?zhǔn)秸{(diào)用)
public B group(EventLoopGroup group) {
if(group == null) {
throw new NullPointerException("group");
} else if(this.group != null) {
throw new IllegalStateException("group set already");
} else {
this.group = group;
return this;
}
}
// ServerBootstrap中的實(shí)現(xiàn),它也支持只用一個(gè)EventLoopGroup
public ServerBootstrap group(EventLoopGroup group) {
return this.group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if(childGroup == null) {
throw new NullPointerException("childGroup");
} else if(this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
} else {
this.childGroup = childGroup;
return this;
}
}Bootstrap其實(shí)沒(méi)有什么可以好說(shuō)的,它就只是一個(gè)裝配工,將各個(gè)組件拼裝組合到一起,然后進(jìn)行一些配置,有關(guān)它的詳細(xì)API請(qǐng)參考
Netty JavaDoc。Echo示例下面我們將通過(guò)一個(gè)經(jīng)典的Echo客戶端與服務(wù)器的例子,來(lái)梳理一遍創(chuàng)建Netty應(yīng)用的流程。首先實(shí)現(xiàn)的是服務(wù)器,我們先實(shí)現(xiàn)一個(gè)EchoServerInboundHandler,處理入站消息。public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8));
// 由于讀事件不是一次性就能把完整消息發(fā)送過(guò)來(lái)的,這里并沒(méi)有調(diào)用writeAndFlush
ctx.write(in); // 直接把消息寫回給客戶端(會(huì)被出站消息處理器處理,不過(guò)我們的應(yīng)用沒(méi)有實(shí)現(xiàn)任何出站消息處理器)
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 等讀事件已經(jīng)完成時(shí),沖刷之前寫數(shù)據(jù)的緩沖區(qū)
// 然后添加了一個(gè)監(jiān)聽器,它會(huì)在Future完成時(shí)進(jìn)行關(guān)閉該Channel.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
// 處理異常,輸出異常信息,然后關(guān)閉Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}服務(wù)器的應(yīng)用邏輯只有這么多,剩下就是用ServerBootstrap進(jìn)行配置了。public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler();
EventLoopGroup group = new NioEventLoopGroup(); // 傳輸類型使用NIO
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group) // 配置EventLoopGroup
.channel(NioServerSocketChannel.class) // 配置Channel的類型
.localAddress(new InetSocketAddress(port)) // 配置端口號(hào)
.childHandler(new ChannelInitializer() {
// 實(shí)現(xiàn)一個(gè)ChannelInitializer,它可以方便地添加多個(gè)ChannelHandler
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
// 綁定地址,同步等待它完成
ChannelFuture f = b.bind().sync();
// 關(guān)閉這個(gè)Future
f.channel().closeFuture().sync();
} finally {
// 關(guān)閉應(yīng)用程序,一般來(lái)說(shuō)Netty應(yīng)用只需要調(diào)用這個(gè)方法就夠了
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.printf(
"Usage: %s\n",
EchoServer.class.getSimpleName()
);
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
}接下來(lái)實(shí)現(xiàn)客戶端,同樣需要先實(shí)現(xiàn)一個(gè)入站消息處理器。public class EchoClientInboundHandler extends SimpleChannelInboundHandler{
/**
* 我們?cè)贑hannel連接到遠(yuǎn)程節(jié)點(diǎn)直接發(fā)送一條消息給服務(wù)器
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
// 輸出從服務(wù)器Echo的消息
System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}然后配置客戶端。public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port)) // 服務(wù)器的地址
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientInboundHandler());
}
});
ChannelFuture f = b.connect().sync(); // 連接到服務(wù)器
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s\n", EchoClient.class.getSimpleName());
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}實(shí)現(xiàn)一個(gè)Netty應(yīng)用程序就是如此簡(jiǎn)單,用戶大多數(shù)都是在編寫各種應(yīng)用邏輯的ChannelHandler(或者使用Netty內(nèi)置的各種實(shí)用ChannelHandler),然后只需要將它們?nèi)刻砑拥紺hannelPipeline即可。參考文獻(xiàn)Netty: HomeChapter 6. I/O Multiplexing: The select and poll Functions - Shichao’s Notesepoll(7) - Linux manual pageJava NIONetty: HomeChapter 6. I/O Multiplexing: The select and poll Functions - Shichao’s Notesepoll(7) - Linux manual pageJava NIO

“基于NIO的網(wǎng)絡(luò)編程框架Netty有哪些組件”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI