This article walks through how Netty's server-side helper class, ServerBootstrap, creates and starts a server, following the source code step by step.
ServerBootstrap is the helper class Netty provides for setting up a server. Taking NIO as an example, a server is created like this:
public static void main(String[] args) throws Exception {
    ServerBootstrap bs = new ServerBootstrap();
    // First group: accepts connections. Second group: handles the accepted channels.
    bs.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
      .channel(NioServerSocketChannel.class)
      .childHandler(new ChannelInitializer<Channel>() {
          @Override
          protected void initChannel(Channel ch) throws Exception {
              ch.pipeline()
                .addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(65535))
                .addLast(new Controller()); // Controller is the application's own handler
          }
      })
      .bind(8080).sync()
      .channel().closeFuture().sync();
}
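The key fields of ServerBootstrap are:

// Exposes the bootstrap's configuration, e.g. options such as SO_KEEPALIVE
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
// The event loop group that accepted child channels are bound to
private volatile EventLoopGroup childGroup;
// The handler added to the pipeline of every accepted child channel
private volatile ChannelHandler childHandler;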
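The overall flow is: register the server channel with an EventLoop -> bind the local port -> enable interest in accept events (and read events for accepted channels) -> the EventLoop's main loop keeps calling select on its registered channels and handles whatever events are ready.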
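The same steps can be tried out with plain JDK NIO, without Netty. The sketch below is only an illustration of the idea, not Netty's code: the class name `SelectorLoopSketch` and the method `acceptOneConnection` are made up for this example, and the loop stops after a single accepted connection so that it terminates.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class SelectorLoopSketch {

    /** Starts a server on an ephemeral port, accepts one connection, returns the accept count. */
    static int acceptOneConnection() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Register the server channel with the selector, interested in accept events.
            server.register(selector, SelectionKey.OP_ACCEPT);
            // Bind to a loopback port chosen by the OS (port 0).
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            int accepted = 0;
            // A blocking client connection, present only to trigger one accept event.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Main loop: select the ready keys and dispatch on the event type.
                while (accepted == 0 && selector.select(5000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            try (SocketChannel child = server.accept()) {
                                if (child != null) {
                                    accepted++;
                                }
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted connections: " + acceptOneConnection());
    }
}
```

A real event loop would of course keep looping and would hand each accepted channel on to a handler instead of closing it right away.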
The core logic lives in io.netty.bootstrap.AbstractBootstrap.doBind(SocketAddress) and io.netty.bootstrap.AbstractBootstrap.initAndRegister():
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    ..........
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // Binding logic
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
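The shape of doBind (bind right away if registration has already finished, otherwise bind from a listener once it completes) is a general pattern for chaining asynchronous steps. Here is a rough sketch of it using the JDK's CompletableFuture in place of Netty's ChannelFuture. The names `BindAfterRegisterSketch` and `bindAfter`, and the log strings, are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class BindAfterRegisterSketch {

    /**
     * Runs the "bind" step after the "registration" step, either immediately
     * or from a completion callback, and records which path was taken.
     */
    static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration,
                                               List<String> log) {
        CompletableFuture<String> bindResult = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Registration already succeeded: bind on the spot.
            log.add("bind immediately");
            bindResult.complete("bound");
        } else {
            // Registration still pending (or failed): decide in the callback.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    log.add("registration failed");
                    bindResult.completeExceptionally(cause);
                } else {
                    log.add("bind from listener");
                    bindResult.complete("bound");
                }
            });
        }
        return bindResult;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> result = bindAfter(pending, log);
        pending.complete(null); // registration finishes later
        System.out.println(result.join() + " via " + log);
    }
}
```

Either way the caller gets a future back immediately, which is why `bind(8080)` in the example at the top needs `.sync()` to wait for the result.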
Start with initAndRegister. It uses channelFactory to create a NioServerSocketChannel instance, applies the parameters from config to it, and then registers it with an EventLoop. The registration itself is delegated to the channel's Unsafe; the core of it is in AbstractUnsafe.register0.
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // In this example this calls the NioServerSocketChannel constructor,
        // which sets the channel's interest op to OP_ACCEPT
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
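As far as I can tell, passing a class to `channel(...)` makes channelFactory a reflective factory that instantiates that class through its no-argument constructor each time newChannel() is called. The sketch below shows that idea in isolation with JDK reflection only. `ReflectiveFactorySketch` is a hypothetical name, and StringBuilder merely stands in for a channel class.

```java
import java.lang.reflect.Constructor;
import java.util.function.Supplier;

final class ReflectiveFactorySketch<T> implements Supplier<T> {

    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> type) {
        try {
            // Look the public no-arg constructor up once, so a missing one fails fast.
            this.constructor = type.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    type.getName() + " has no public no-arg constructor", e);
        }
    }

    /** Creates a fresh instance on every call. */
    @Override
    public T get() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "cannot instantiate " + constructor.getDeclaringClass().getName(), e);
        }
    }

    public static void main(String[] args) {
        Supplier<CharSequence> factory = new ReflectiveFactorySketch<>(StringBuilder.class);
        System.out.println(factory.get().getClass().getSimpleName());
    }
}
```

Resolving the constructor in the factory's own constructor means a bad class is rejected when the bootstrap is configured, not later when the first channel is created.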
void init(Channel channel) throws Exception {
    // Set properties
    ..........
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // Install a default ChannelHandler on the NioServerSocketChannel: ServerBootstrapAcceptor.
                    // When an accept event occurs, it registers the accepted channel with the child EventLoopGroup.
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Register the channel with the EventLoop's selector
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // Fire the ChannelInboundHandler.channelRegistered event
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                // Fire channelActive, which also sets up read interest for the channel
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
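Registration and "starting to read" are two separate steps here: the channel is first attached to the selector, and interest in events is switched on afterwards. A small JDK NIO sketch of that two-step idea follows. It assumes (my reading of Netty, not something shown above) that the first step registers with an empty interest set. `DeferredInterestSketch` and its method are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class DeferredInterestSketch {

    /** Returns the key's interest ops right after registration and after enabling accept. */
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // Step 1: attach the channel to the selector without asking for any events yet.
            SelectionKey key = server.register(selector, 0, "server-channel");
            int afterRegister = key.interestOps();

            // Step 2: bind, which makes the channel active.
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            // Step 3: only now switch on interest in accept events.
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            int afterEnable = key.interestOps();

            return new int[] {afterRegister, afterEnable};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println("after register: " + ops[0] + ", after enable: " + ops[1]);
    }
}
```

Splitting the steps this way means no event can be delivered before the pipeline has been fully set up.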
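Once initAndRegister has registered the channel successfully, the actual port binding runs. The snippet below is NioSocketChannel.doBind0(SocketAddress); a server channel goes through NioServerSocketChannel.doBind(SocketAddress), which binds the underlying JDK channel in the same way and additionally passes the configured backlog.

private void doBind0(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}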
At this point the bind has succeeded. When an ACCEPT event fires, the call chain is NioServerSocketChannel.doReadMessages -> ServerBootstrapAcceptor.channelRead, which registers the accepted child channel with an EventLoop of the child group:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // Register the child channel
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
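The hand-off from the accepting side to the child side can be imitated with two JDK selectors: one that only watches the server channel, and one that accepted connections are registered with. This is just a sketch of the idea under that simplification. `AcceptHandOffSketch`, the "boss" and "worker" selector names, and the choice of SO_KEEPALIVE as the child option are all assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class AcceptHandOffSketch {

    /** Accepts one connection on the boss selector and registers it with the worker selector. */
    static boolean acceptAndHandOff() {
        try (Selector boss = Selector.open();
             Selector worker = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.register(boss, SelectionKey.OP_ACCEPT);

            // A blocking client connection, present only to trigger the accept event.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (boss.select(5000) == 0) {
                    return false; // nothing arrived within the timeout
                }
                boss.selectedKeys().clear();

                try (SocketChannel child = server.accept()) {
                    if (child == null) {
                        return false;
                    }
                    // Apply the child options, then register the child with the worker selector.
                    child.configureBlocking(false);
                    child.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                    SelectionKey childKey = child.register(worker, SelectionKey.OP_READ);

                    // The child belongs to the worker; the server channel never does.
                    return childKey.isValid()
                            && childKey.selector() == worker
                            && server.keyFor(worker) == null;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("child handed off to worker: " + acceptAndHandOff());
    }
}
```

Keeping accept and read on different selectors is what lets a single accepting thread feed many worker threads, which matches the `new NioEventLoopGroup(1), new NioEventLoopGroup()` split in the example at the top.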