溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Netty是如何綁定端口和啟動(dòng)服務(wù)的

發(fā)布時(shí)間:2021-12-18 17:32:25 來(lái)源:億速云 閱讀:146 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“Netty是如何綁定端口和啟動(dòng)服務(wù)的”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

服務(wù)端啟動(dòng)DEMO

      先從一個(gè)簡(jiǎn)單的服務(wù)端啟動(dòng)`DEMO`開(kāi)始,以下是一個(gè)標(biāo)準(zhǔn)的`Netty`服務(wù)端代碼

public final class NettyServer {public static void main(String[] args) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) {                            ChannelPipeline channelPipeline = channel.pipeline();                            channelPipeline.addLast("decoder", new StringDecoder());                            channelPipeline.addLast("encoder", new StringEncoder());                            channelPipeline.addLast("handler", new ServerHandler());                        }                    });            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}br

注:ServerBootstrap.childHandler()用于指定處理新連接數(shù)據(jù)的讀寫(xiě)處理邏輯,同時(shí)ServerBootstrap還提供handler()用于指定在服務(wù)端啟動(dòng)過(guò)程中的一些邏輯,通常情況下我們用不著這個(gè)方法

`ServerHandler`代碼如下:

public class ServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) {        System.out.println("channelActive");    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) {        System.out.println("channelRegistered");    }    @Override    public void handlerAdded(ChannelHandlerContext ctx) {        System.out.println("handlerAdded");    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelReadComplete");    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelInactive");    }    @Override    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("service receive msg:" + msg);    }}br

 當(dāng)有新連接接入時(shí),控制臺(tái)打印出

reStructuredTexthandlerAddedchannelRegisteredchannelActivebr

     但接收到新消息時(shí),控制臺(tái)打印出

reStructuredTextservice receive msg:xxxchannelReadCompletebr

       本文主要分析服務(wù)端的啟動(dòng)過(guò)程,而新連接接入 新消息的讀取會(huì)在后續(xù)章節(jié)中說(shuō)明

服務(wù)端啟動(dòng)源碼分析

`ServerBootstrap`是`Netty`為方便開(kāi)發(fā)者使用而設(shè)計(jì)的一個(gè)啟動(dòng)類(lèi),其核心代碼入口在`bind()`,代碼如下

public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}//通過(guò)端口號(hào)創(chuàng)建一個(gè)`InetSocketAddress`,然后繼續(xù)調(diào)用重載的`bind()`public ChannelFuture bind(SocketAddress localAddress) {// ...return doBind(localAddress);}br

由于博客篇幅有限所以有些健壯性分支會(huì)以`// ...`略過(guò),健壯性分支不會(huì)影響對(duì)`Netty`主流程的理解。

`Netty`服務(wù)端啟動(dòng)最后會(huì)調(diào)用到`doBind()`方法,代碼如下

private ChannelFuture doBind(final SocketAddress localAddress) {//...final ChannelFuture regFuture = initAndRegister();//...final Channel channel = regFuture.channel();//...    doBind0(regFuture, channel, localAddress, promise);//...return promise;br

在`doBind()`中我們關(guān)注兩個(gè)核心方法`initAndRegister()`以及`doBind0()`

服務(wù)端`Channel`創(chuàng)建

final ChannelFuture initAndRegister() {    Channel channel = null;// 新建Channel    channel = channelFactory.newChannel();// 初始化Channelinit(channel);// 將這個(gè)Channel Register到某個(gè)對(duì)象    ChannelFuture regFuture = config().group().register(channel);return regFuture;}br

`Channel`是`Netty`的核心概念之一,它是`Netty`網(wǎng)絡(luò)通信的主體由它負(fù)責(zé)同對(duì)端進(jìn)行網(wǎng)絡(luò)通信、注冊(cè)和數(shù)據(jù)操作等功能。

`Channel`的創(chuàng)建是由`channelFactory.newChannel()`完成的,接下來(lái)跟蹤`channelFactory`是在何時(shí)被初始化,我們層層回溯最終發(fā)現(xiàn)是在這個(gè)函數(shù)中

public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");    }return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}br

在`Demo`程序調(diào)用`.channel()`方法并傳入`NioServerSocketChannel.class`

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");        }this.clazz = clazz;    }@Overridepublic T newChannel() {try {return clazz.newInstance();        } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);        }    }}br

即在`Netty`服務(wù)端啟動(dòng)的時(shí)候通過(guò)反射方式(調(diào)用默認(rèn)構(gòu)造函數(shù))來(lái)創(chuàng)建一個(gè)`NioServerSocketChannel`對(duì)象,

加下來(lái)我們繼續(xù)跟進(jìn)`NioServerSocketChannel`的默認(rèn)構(gòu)造函數(shù)

public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) {try {// 利用SelectorProvider產(chǎn)生一個(gè)ServerSocketChannel對(duì)象return provider.openServerSocketChannel();    } catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);    }}br

通過(guò)`newSocket(DEFAULT_SELECTOR_PROVIDER)`創(chuàng)建一條`server`端`channel`,然后進(jìn)入到以下方法

public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}br

該方法主要完成兩個(gè)功能,首先是調(diào)用父類(lèi)的構(gòu)造方法然后初始化`NioServerSocketChannelConfig`屬性

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;// 設(shè)置SelectionKey.OP_ACCEPT事件this.readInterestOp = readInterestOp;// 設(shè)置ServerSocketChannel為非阻塞的    ch.configureBlocking(false);}br

這里將前面通過(guò)`provider.openServerSocketChannel()`創(chuàng)建出來(lái)的`ServerSocketChannel`保存到成員變量,然后調(diào)用將該`channel`為非阻塞模式,這是個(gè)標(biāo)準(zhǔn)的`JDK NIO`編程的玩法。這里的`readInterestOp`即前面層層傳入的`SelectionKey.OP_ACCEPT`,接下來(lái)繼續(xù)跟進(jìn)`super(parent)`

protected AbstractChannel(Channel parent) {this.parent = parent;    id = newId();unsafe = newUnsafe();    pipeline = newChannelPipeline();}br

在`AbstractChannel`的構(gòu)造方法中主要是初始化了`id`,`unsafe`,`pipeline`屬性

 服務(wù)端`Channel`初始化

在創(chuàng)建完`Channel`后,我們?cè)赻init`方法中對(duì)`Channel`進(jìn)行初始化操作,代碼如下

void init(Channel channel) throws Exception {// 給channel設(shè)置optionfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) {        channel.config().setOptions(options);    }// 給channel設(shè)置attrfinal Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();            channel.attr(key).set(e.getValue());        }    }    ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {// 設(shè)置新接入channel的options        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));    }synchronized (childAttrs) {// 設(shè)置新接入channel的attrs        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));    }    p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 設(shè)置handler到pipeline上// 這里的handler()返回的就是.handler()所設(shè)置的值            ChannelHandler handler = config.handler();if (handler != null) {                pipeline.addLast(handler);            }            ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// p.addLast()向serverChannel的流水線(xiàn)處理器中加入了一個(gè)ServerBootstrapAcceptor// 從名字上就可以看出來(lái)這是一個(gè)接入器,專(zhuān)門(mén)接受新請(qǐng)求,把新的請(qǐng)求扔給某個(gè)事件循環(huán)器                    pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                }            });        }    });}br

以上代碼主要完成如下功能:

  • 為`channel`設(shè)置`option`及`attr`
  • 初始化服務(wù)端`channel`的`pipeline`
  • 添加自定義`handler`
  • 添加`ServerBootstrapAcceptor`(請(qǐng)求接入器)
`NioEventLoop.execute()`方法為`Netty Reactor`線(xiàn)程執(zhí)行的入口,關(guān)于`Netty Reactor`線(xiàn)程我們將在下一篇中介紹。我們總結(jié)一下發(fā)現(xiàn)代碼執(zhí)行到這里`Netty`并未真正啟動(dòng)服務(wù),只是初始化了一些基本的配置和屬性以及在`pipeline`上加入了一個(gè)接入器用來(lái)專(zhuān)門(mén)接受新連接
 將`Channel`注冊(cè)到事件輪詢(xún)器
完成`Channel`注冊(cè)的代碼如下
ChannelFuture regFuture = config().group().register(channel);```此處`config().group()`返回的對(duì)象為`NioEventLoopGroup````java@Overridepublic ChannelFuture register(Channel channel) {// 調(diào)用了NioEventLoop對(duì)象中的register方法// NioEventLoop extends SingleThreadEventLoopreturn next().register(channel);}br
在`next`方法中返回一個(gè)`EventLoop`對(duì)象,每一個(gè)`EventLoop`都與一個(gè)`selector`綁定
在之前的代碼中`EventLoop`中的`Selector`一直沒(méi)有任何`Channel`注冊(cè),所以每次`select`操作都是空,但從這行代碼開(kāi)始這個(gè)`selector`中開(kāi)始有`Channel`注冊(cè)
public ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}public ChannelFuture register(final ChannelPromise promise) {    ObjectUtil.checkNotNull(promise, "promise");    promise.channel().unsafe().register(this, promise);    return promise;}br
這里可以看到`register`操作是委托給`Channel`中的`Unsafe`對(duì)象來(lái)執(zhí)行的,這里的`Unsafe`對(duì)象對(duì)上文稍有印象的同學(xué)應(yīng)該能知道這個(gè)就是創(chuàng)建`NioServerSocketChannel`的時(shí)候創(chuàng)建的`Unsafe`對(duì)象
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// ...    AbstractChannel.this.eventLoop = eventLoop;// ...    register0(promise);}br
先將`EventLoop`事件循環(huán)器綁定到該`NioServerSocketChannel`上,然后調(diào)用`register0()`代碼如下
 
private void register0(ChannelPromise promise) {    try {        boolean firstRegistration = neverRegistered;        doRegister();        neverRegistered = false;        registered = true;        pipeline.invokeHandlerAddedIfNeeded();        safeSetSuccess(promise);        pipeline.fireChannelRegistered();        if (isActive()) {            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                beginRead();            }        }    } catch (Throwable t) {        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}//這一段其實(shí)也很清晰,先調(diào)用`doRegister()`進(jìn)行注冊(cè)protected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            // ...        }    }}br
在這里我們終于看到`JDK`底層的`Channel`注冊(cè)到`Selector`的過(guò)程,但是這里的`OPS`為0即不關(guān)心任何事件,而我們期望`OPS`的值為`SelectionKey.OP_ACCEPT`,所以到了這里代碼還沒(méi)有結(jié)束。在執(zhí)行完`Channel`注冊(cè)后接著執(zhí)行了幾個(gè)`pipeline`相關(guān)的方法,我們后面詳細(xì)剖析`pipeline`的時(shí)候再講
服務(wù)端`Channel`端口綁定
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {    channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}//在`dobind0()`方法中通過(guò)`EventLoop`執(zhí)行一個(gè)任務(wù),接下來(lái)我們進(jìn)入到`channel.bind()`方法public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}br
       關(guān)于`Pipeline`相關(guān)的內(nèi)容將在后續(xù)博客中介紹,當(dāng)前一個(gè)比較好的方式就是`Debug`單步進(jìn)入。
      跟蹤調(diào)用鏈最后我們來(lái)到了`DefaultChannelPipeline.HeadContext`的`bind()`,代碼如下
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {unsafe.bind(localAddress, promise);}//這里的`unsafe`就是前面提到的`AbstractUnsafe`, 準(zhǔn)確點(diǎn)應(yīng)該是`NioMessageUnsafe`@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ...    boolean wasActive = isActive();// ...    doBind(localAddress);if (!wasActive && isActive()) {        invokeLater(new Runnable() {            @Overridepublic void run() {                pipeline.fireChannelActive();            }        });    }    safeSetSuccess(promise);}//在`doBind`方法中完成綁定操作,代碼如下protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {        javaChannel().bind(localAddress, config.getBacklog());    } else {        javaChannel().socket().bind(localAddress, config.getBacklog());    }}br

       最終調(diào)用到了`JDK`里面的`bind`方法真正進(jìn)行了端口的綁定。按照正常流程我們前面已經(jīng)分析到`isActive()`方法返回`false`,進(jìn)入到`doBind()`之后如果`channel`被激活了,就發(fā)起`pipeline.fireChannelActive()`調(diào)用

public void channelActive(ChannelHandlerContext ctx) throws Exception {    ctx.fireChannelActive();    readIfIsAutoRead();}//`pipeline.channelActive`會(huì)逐一調(diào)用`pipeline`中每一個(gè)節(jié)點(diǎn)的`channelActive`方法,所以`HeadContext`的`channelActive`將會(huì)被調(diào)用,即`readIfIsAutoRead`方法將會(huì)被調(diào)用public void channelActive(ChannelHandlerContext ctx) throws Exception {    ctx.fireChannelActive();    readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {        channel.read();    }}//最終這個(gè)方法會(huì)調(diào)用到`AbstractNioChannel`的`doBeginRead`方法protected void doBeginRead() throws Exception {final SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;    }    readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {        selectionKey.interestOps(interestOps | readInterestOp);    }}br
     在最后一行中的`readInterestOp`
    即在上文中提到的`SelectionKey.OP_ACCEPT`,即讓事件輪詢(xún)器關(guān)注`Accept`事件,至此完成了`Channel`對(duì)`ACCEPT`事件的注冊(cè)過(guò)程
總結(jié)
      到目前為止我們看到的代碼相當(dāng)于傳統(tǒng)NIO編程中的如下代碼
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 建NioServerSocketChannelserverSocketChannel.configureBlocking(false);//AbstractNioChannel中ch.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));// NioServerSocketChannel.doBind()Selector selector = Selector.open();// NioEventLoop.openSelector()serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) //  AbstractNioChannel.doBeginRead()br
      服務(wù)端啟動(dòng)完成的主要功能為創(chuàng)建一個(gè)`Channel`,并且將`Channel`注冊(cè)到`NioEventLoop`的`Selector`上.

“Netty是如何綁定端口和啟動(dòng)服務(wù)的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI