您好,登錄后才能下訂單哦!
這篇文章主要介紹“Netty服務(wù)端啟動(dòng)源碼是什么”,在日常操作中,相信很多人在Netty服務(wù)端啟動(dòng)源碼是什么問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Netty服務(wù)端啟動(dòng)源碼是什么”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
示例從哪里來(lái)?任何開(kāi)源框架都會(huì)有自己的示例代碼,Netty源碼也不例外,如模塊netty-example
中就包括了最常見(jiàn)的EchoServer
示例,下面通過(guò)這個(gè)示例進(jìn)入服務(wù)端啟動(dòng)流程篇章。
public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // 1. 聲明Main-Sub Reactor模式線程池:EventLoopGroup // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 創(chuàng)建 EchoServerHandler 對(duì)象 final EchoServerHandler serverHandler = new EchoServerHandler(); try { // 2. 聲明服務(wù)端啟動(dòng)引導(dǎo)器,并設(shè)置相關(guān)屬性 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // 3. 綁定端口即啟動(dòng)服務(wù)端,并同步等待 // Start the server. ChannelFuture f = b.bind(PORT).sync(); // 4. 監(jiān)聽(tīng)服務(wù)端關(guān)閉,并阻塞等待 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // 5. 優(yōu)雅地關(guān)閉兩個(gè)EventLoopGroup線程池 // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
[代碼行18、19]聲明Main-Sub Reactor
模式線程池:EventLoopGroup
創(chuàng)建兩個(gè) EventLoopGroup
對(duì)象。其中,bossGroup
用于服務(wù)端接受客戶端的連接,workerGroup
用于進(jìn)行客戶端的 SocketChannel
的數(shù)據(jù)讀寫(xiě)。
(<u>關(guān)于EventLoopGroup
不是本文重點(diǎn)所以在后續(xù)文章中進(jìn)行分析</u>)
[代碼行23-39]聲明服務(wù)端啟動(dòng)引導(dǎo)器,并設(shè)置相關(guān)屬性
AbstractBootstrap
是一個(gè)幫助類(lèi),通過(guò)方法鏈(method chaining
)的方式,提供了一個(gè)簡(jiǎn)單易用的方式來(lái)配置啟動(dòng)一個(gè)Channel
。io.netty.bootstrap.ServerBootstrap
,實(shí)現(xiàn) AbstractBootstrap
抽象類(lèi),用于 Server
的啟動(dòng)器實(shí)現(xiàn)類(lèi)。io.netty.bootstrap.Bootstrap
,實(shí)現(xiàn) AbstractBootstrap
抽象類(lèi),用于 Client
的啟動(dòng)器實(shí)現(xiàn)類(lèi)。如下類(lèi)圖所示:
![AbstractBootstrap類(lèi)繼承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d877706356d4427a9afd4b13d7177142~tplv-k3u1fbpfcp-zoom-1.image)
(<u>在EchoServer
示例代碼中,我們看到 ServerBootstrap
的 group
、channel
、option
、childHandler
等屬性鏈?zhǔn)皆O(shè)置都放到關(guān)于AbstractBootstrap
體系代碼中詳細(xì)介紹。</u>)
[代碼行43]綁定端口即啟動(dòng)服務(wù)端,并同步等待
先調(diào)用 #bind(int port)
方法,綁定端口,后調(diào)用 ChannelFuture#sync()
方法,阻塞等待成功。對(duì)于bind
操作就是本文要詳細(xì)介紹的"服務(wù)端啟動(dòng)流程"。
[代碼行47]監(jiān)聽(tīng)服務(wù)端關(guān)閉,并阻塞等待
先調(diào)用 #closeFuture()
方法,監(jiān)聽(tīng)服務(wù)器關(guān)閉,后調(diào)用 ChannelFuture#sync()
方法,阻塞等待成功。 注意,此處不是關(guān)閉服務(wù)器,而是channel
的監(jiān)聽(tīng)關(guān)閉。
[代碼行51、52]優(yōu)雅地關(guān)閉兩個(gè)EventLoopGroup
線程池
finally
代碼塊中執(zhí)行說(shuō)明服務(wù)端將最終關(guān)閉,所以調(diào)用 EventLoopGroup#shutdownGracefully()
方法,分別關(guān)閉兩個(gè)EventLoopGroup
對(duì)象,終止所有線程。
在服務(wù)啟動(dòng)過(guò)程的源碼分析之前,這里回顧一下我們?cè)谕ㄟ^(guò)JDK NIO
編程在服務(wù)端啟動(dòng)初始的代碼:
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
這5行代碼標(biāo)示一個(gè)最為熟悉的過(guò)程:
打開(kāi)serverSocketChannel
配置非阻塞模式
為channel
的socket
綁定監(jiān)聽(tīng)端口
創(chuàng)建Selector
將serverSocketChannel
注冊(cè)到 selector
后面等分析完Netty
的啟動(dòng)過(guò)程后,會(huì)對(duì)這些步驟有一個(gè)新的認(rèn)識(shí)。在EchoServer
示例中,進(jìn)入 #bind(int port)
方法,AbstractBootstrap#bind()
其實(shí)有多個(gè)方法,方便不同地址參數(shù)的傳遞,實(shí)際調(diào)用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress)
方法,代碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
[代碼行2] :調(diào)用 #initAndRegister()
方法,初始化并注冊(cè)一個(gè) Channel
對(duì)象。因?yàn)樽?cè)是異步的過(guò)程,所以返回一個(gè) ChannelFuture
對(duì)象。詳細(xì)解析,見(jiàn) 「initAndRegister()
」。
[代碼行4-6]]:若發(fā)生異常,直接進(jìn)行返回。
[代碼行9-34]:因?yàn)樽?cè)是異步的過(guò)程,有可能已完成,有可能未完成。所以實(shí)現(xiàn)代碼分成了【第 10 至 14 行】和【第 15 至 36 行】分別處理已完成和未完成的情況。
核心在[第 11 、29行],調(diào)用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)
方法,綁定 Channel 的端口,并注冊(cè) Channel 到 SelectionKey
中。
如果異步注冊(cè)對(duì)應(yīng)的 ChanelFuture
未完成,則調(diào)用 ChannelFuture#addListener(ChannelFutureListener)
方法,添加監(jiān)聽(tīng)器,在注冊(cè)完成后,進(jìn)行回調(diào)執(zhí)行 #doBind0(...)
方法的邏輯。
通過(guò)doBind
方法可以知道服務(wù)端啟動(dòng)流程大致如下幾個(gè)步驟:
從#doBind(final SocketAddress localAddress)
進(jìn)入到initAndRegister()
:
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
[代碼行4]調(diào)用 ChannelFactory#newChannel()
方法,創(chuàng)建Channel
對(duì)象。 ChannelFactory
類(lèi)繼承如下:
可以在ChannelFactory
注釋看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.
,這里只是包名的調(diào)整,對(duì)于繼承結(jié)構(gòu)不變。netty
默認(rèn)使用ReflectiveChannelFactory
,我們可以看到重載方法:
@Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
很明顯,正如其名是通過(guò)反射機(jī)制構(gòu)造Channel
對(duì)象實(shí)例的。constructor
是在其構(gòu)造方法初始化的:this.constructor = clazz.getConstructor();
這個(gè)clazz
按理說(shuō)應(yīng)該是我們要?jiǎng)?chuàng)建的Channel
的Class對(duì)象。那Class
對(duì)象是什么呢?我們接著看channelFactory
是怎么初始化的。
首先在AbstractBootstrap
找到如下代碼:
@Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
調(diào)用這個(gè)方法的遞推向上看到:
public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
這個(gè)方法正是在EchoServer
中ServerBootstrap
鏈?zhǔn)皆O(shè)置時(shí)調(diào)用.channel(NioServerSocketChannel.class)
的方法。我們看到,channelClass
就是NioServerSocketChannel.class
,channelFactory
也是以ReflectiveChannelFactory
作為具體實(shí)例,并且將NioServerSocketChannel.class
作為構(gòu)造參數(shù)傳遞初始化的,所以這回答了反射機(jī)制構(gòu)造的是io.netty.channel.socket.nio.NioServerSocketChannel
對(duì)象。
繼續(xù)看NioServerSocketChannel
構(gòu)造方法邏輯做了什么事情,看之前先給出NioServerSocketChannel
類(lèi)繼承關(guān)系:
NioServerSocketChannel
與NioSocketChannel
分別對(duì)應(yīng)服務(wù)端和客戶端,公共父類(lèi)都是AbstractNioChannel
和AbstractChannel
,下面介紹創(chuàng)建過(guò)程可以參照這個(gè)Channel
類(lèi)繼承圖。進(jìn)入NioServerSocketChannel
構(gòu)造方法:
/** * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
點(diǎn)擊newSocket
進(jìn)去:
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
以上傳進(jìn)來(lái)的provider
是DEFAULT_SELECTOR_PROVIDER
即默認(rèn)的java.nio.channels.spi.SelectorProvider
,[代碼行9]就是熟悉的jdk nio
創(chuàng)建ServerSocketChannel
。這樣newSocket(DEFAULT_SELECTOR_PROVIDER)
就返回了結(jié)果ServerSocketChannel
,回到NioServerSocketChannel()#this()
點(diǎn)進(jìn)去:
/** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
以上super
代表父類(lèi)AbstractNioMessageChannel
構(gòu)造方法,點(diǎn)進(jìn)去看到:
/** * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int) */ protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }
以上super
代表父類(lèi)AbstractNioChannel
構(gòu)造方法,點(diǎn)進(jìn)去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
以上[代碼行3]將ServerSocketChannel
保存到了AbstractNioChannel#ch
成員變量,在上面提到的NioServerSocketChannel
構(gòu)造方法的[代碼行6]javaChannel()
拿到的就是ch
保存的ServerSocketChannel
變量。
以上[代碼行6]就是熟悉的jdk nio
編程設(shè)置ServerSocketChannel
非阻塞方式。這里還有super
父類(lèi)構(gòu)造方法,點(diǎn)擊進(jìn)去看到:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
以上構(gòu)造方法中:
parent
屬性,代表父 Channel
對(duì)象。對(duì)于NioServerSocketChannel
的 parent
為null
。
id
屬性,Channel
編號(hào)對(duì)象。在構(gòu)造方法中,通過(guò)調(diào)用 #newId()
方法進(jìn)行創(chuàng)建。(<u>這里不細(xì)展開(kāi)Problem-1</u>)
unsafe
屬性,Unsafe
對(duì)象。因?yàn)?code>Channel 真正的具體操作,是通過(guò)調(diào)用對(duì)應(yīng)的 Unsafe
對(duì)象實(shí)施。所以需要在構(gòu)造方法中,通過(guò)調(diào)用 #newUnsafe()
方法進(jìn)行創(chuàng)建。這里的 Unsafe
并不是我們常說(shuō)的 jdk
自帶的sun.misc.Unsafe
,而是 io.netty.channel.Channel#Unsafe
。(<u>這里不細(xì)展開(kāi)Problem-2</u>)
pipeline
屬性默認(rèn)是DefaultChannelPipeline
對(duì)象,賦值后在后面為channel綁定端口的時(shí)候會(huì)用到
通過(guò)以上創(chuàng)建channel
源碼過(guò)程分析,總結(jié)的流程時(shí)序圖如下:
回到一開(kāi)始創(chuàng)建Channel
的initAndRegister()
入口方法,在創(chuàng)建Channel
后緊接著init(channel)
進(jìn)入初始化流程,因?yàn)槭欠?wù)端初始化,所以是ServerBootstrap#init(Channel channel)
,代碼如下:
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
[代碼 3 - 6 行]: options0()
方法返回的options
保存了用戶在EchoServer
中設(shè)置自定義的可選項(xiàng)集合,這樣ServerBootstrap
將配置的選項(xiàng)集合,設(shè)置到了 Channel
的可選項(xiàng)集合中。
[代碼 8 - 15 行]: attrs0()
方法返回的attrs
保存了用戶在EchoServer
中設(shè)置自定義的屬性集合,這樣ServerBootstrap
將配置的屬性集合,設(shè)置到了 Channel
的屬性集合中。
[代碼21-28行]:通過(guò)局部變量currentChildOptions
和currentChildAttrs
保存了用戶自定義的childOptions
和childAttrs
,用于[代碼43行] ServerBootstrapAcceptor
構(gòu)造方法。
[代碼30-47]]:創(chuàng)建ChannelInitializer
對(duì)象,添加到 pipeline
中,用于后續(xù)初始化 ChannelHandler
到 pipeline
中,包括用戶在EchoServer
配置的LoggingHandler
和創(chuàng)建的創(chuàng)建 ServerBootstrapAcceptor
對(duì)象。
[代碼行34-37]:添加啟動(dòng)器配置的 LoggingHandler
到pipeline
中。
[代碼行39-45]:創(chuàng)建 ServerBootstrapAcceptor
對(duì)象,添加到 pipeline
中。從名字上就可以看出來(lái),ServerBootstrapAcceptor
也是一個(gè) ChannelHandler
實(shí)現(xiàn)類(lèi),專(zhuān)門(mén)用于接受客戶端的新連接請(qǐng)求,把新的請(qǐng)求扔給某個(gè)事件循環(huán)器,我們先不做過(guò)多分析。我們發(fā)現(xiàn)是使用EventLoop.execute
執(zhí)行添加的過(guò)程,這是為什么呢?同樣記錄問(wèn)題(<u>Problem-</u>3)
需要說(shuō)明的是pipeline
在之前介紹Netty核心組件的時(shí)候提到是一個(gè)包含ChannelHandlerContext
的雙向鏈表,每一個(gè)context
對(duì)于唯一一個(gè)ChannelHandler
,這里初始化后,ChannelPipeline
里就是如下一個(gè)結(jié)構(gòu):
初始化Channel
一些基本配置和屬性完畢后,回到一開(kāi)始創(chuàng)建Channel
的initAndRegister()
入口方法,在初始化Channel
后緊接著[代碼行17] ChannelFuture regFuture = config().group().register(channel);
明顯這里是通過(guò)EventLoopGroup
進(jìn)入注冊(cè)流程(EventLoopGroup
體系將在后續(xù)文章講解)
在EchoServer
中啟動(dòng)器同樣通過(guò)ServerBootstrap#group()
設(shè)置了NioEventLoopGroup
,它繼承自MultithreadEventLoopGroup
,所以注冊(cè)流程會(huì)進(jìn)入MultithreadEventLoopGroup
重載的register(Channel channel)
方法,代碼如下:
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }
這里會(huì)調(diào)用 next()
方法選擇出來(lái)一個(gè) EventLoop
來(lái)注冊(cè) Channel
,里面實(shí)際上使用的是一個(gè)叫做 EventExecutorChooser
的東西來(lái)選擇,它實(shí)際上又有兩種實(shí)現(xiàn)方式 ——PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
,本質(zhì)上就是從 EventExecutor
數(shù)組中選擇一個(gè) EventExecutor
,我們這里就是 NioEventLoop
,那么,它們有什么區(qū)別呢?(<u>Problem-4:在介紹EventLoopGroup
體系的后續(xù)文章中將會(huì)詳細(xì)講解,這里簡(jiǎn)單地提一下,本質(zhì)都是按數(shù)組長(zhǎng)度取余數(shù) ,不過(guò),2 的 N 次方的形式更高效。</u>)
接著,來(lái)到 NioEventLoop
的 register(channel)
方法,你會(huì)不會(huì)問(wèn)找不到該方法?提示NioEventLoop
繼承SingleThreadEventLoop
,所以父類(lèi)方法:
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
可以看到,先創(chuàng)建了一個(gè)叫做 ChannelPromise
的東西,它是 ChannelFuture
的子類(lèi)。[代碼行9]又調(diào)回了 Channel
的 Unsafe
的 register ()
方法,這里第一個(gè)參數(shù)是 this
,也就是 NioEventLoop
,第二個(gè)參數(shù)是剛創(chuàng)建的 ChannelPromise
。
點(diǎn)擊 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)
方法進(jìn)去,代碼如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
[代碼行15]這行代碼是設(shè)置 Channel
的 eventLoop
屬性。這行前面的代碼主要是在校驗(yàn)傳入的 eventLoop
參數(shù)非空,校驗(yàn)是否有注冊(cè)過(guò)以及校驗(yàn) Channel
和 eventLoop
類(lèi)型是否匹配。
[代碼18、24]接著,跟蹤到 AbstractUnsafe#register0(ChannelPromise promise)
方法中:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
[代碼行9]進(jìn)入 AbstractNioChannel#doRegister()
方法:
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
[代碼行5]關(guān)鍵一行代碼,將 Java 原生NIO Selector
與 Java 原生 NIO
的 Channel
對(duì)象(ServerSocketChannel
) 綁定在一起,并將當(dāng)前 Netty 的Channel
通過(guò) attachment
的形式綁定到 SelectionKey
上:
調(diào)用 #unwrappedSelector()
方法,返回 Java 原生 NIO Selector
對(duì)象,而且每個(gè)NioEventLoop
與Selector
唯一一對(duì)應(yīng)。
調(diào)用 SelectableChannel#register(Selector sel, int ops, Object att)
方法,注冊(cè) Java 原生NIO
的 Channel
對(duì)象到 NIO Selector
對(duì)象上。
通過(guò)以上注冊(cè)channel源碼分析,總結(jié)流程的時(shí)序圖如下:
注冊(cè)完Channel
最后回到AbstractBootstrap#doBind()
方法,分析 Channel
的端口綁定邏輯。進(jìn)入doBind0
代碼如下:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
[代碼行7]:在前面Channel
注冊(cè)成功的條件下,調(diào)用 EventLoop
執(zhí)行 Channel
的端口綁定邏輯。但是,實(shí)際上當(dāng)前線程已經(jīng)是 EventLoop
所在的線程了,為何還要這樣操作呢?答案在【第 5 至 6 行】的英語(yǔ)注釋?zhuān)@里作為一個(gè)問(wèn)題記著(<u>Problem-5</u>)。
[代碼行11]:進(jìn)入AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
,同樣立即異步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE
監(jiān)聽(tīng)事件。
[代碼行13]:如果綁定端口之前的操作并沒(méi)有成功,自然也就不能進(jìn)行端口綁定操作了,通過(guò)promise記錄異常原因。
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
方法如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
pipeline
是之前創(chuàng)建channel
的時(shí)候創(chuàng)建的DefaultChannelPipeline
,進(jìn)入該方法:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
[在分析初始化流程的時(shí)候最后畫(huà)一個(gè)DefaultChannelPipeline
內(nèi)部的結(jié)構(gòu),能夠便于分析后面進(jìn)入DefaultChannelPipeline
一系列bind
方法。]
首先,tail
代表TailContext
,進(jìn)入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { //省略部分代碼 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
[代碼行3]:findContextOutbound
方法里主要是執(zhí)行ctx = ctx.prev;
那么得到的next
就是綁定LoggingHandler
的context
[代碼行6]:進(jìn)入invokeBind(localAddress, promise)
方法并直接執(zhí)行LoggingHandler#bind(this, localAddress, promise)
,進(jìn)入后的方法如下:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND", localAddress)); } ctx.bind(localAddress, promise); }
設(shè)置了LoggingHandler
的日志基本級(jí)別為默認(rèn)的INFO后,進(jìn)行綁定操作的信息打印。接著,繼續(xù)循環(huán)到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
方法執(zhí)行ctx = ctx.prev
取出HeadContext
進(jìn)入到bind方法:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); }
兜兜轉(zhuǎn)轉(zhuǎn),最終跳出了pipeline
輪回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
方法,Channel 的端口綁定邏輯。代碼如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //此處有省略... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //此處有省略... }
做實(shí)事方法doBind
進(jìn)入后如下:
@Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
到了此處,服務(wù)端的 Java 原生 NIO ServerSocketChannel
終于綁定上了端口。
Problem-1: 創(chuàng)建Channel
流程中AbstractChannel
構(gòu)造函數(shù)中為channel
分配ID的算法如何實(shí)現(xiàn)?
Problem-2: AbstractChannel
內(nèi)部類(lèi)AbstractUnsafe
的作用?
Problem-3: 初始化channel
流程中pipeline
添加ServerBootstrapAcceptor
是通過(guò)EventLoop.execute
執(zhí)行添加的過(guò)程,這是為什么呢?
Problem-4:注冊(cè)channel
流程中PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
的區(qū)別和優(yōu)化原理?
Problem-5:綁定端口流程中調(diào)用 EventLoop
執(zhí)行 Channel
的端口綁定邏輯。但是,實(shí)際上當(dāng)前線程已經(jīng)是 EventLoop
所在的線程了,為何還要這樣操作呢?
到此,關(guān)于“Netty服務(wù)端啟動(dòng)源碼是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。