溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

58. Netty源代碼分析-ServerBootstrap bind 過(guò)程-1

發(fā)布時(shí)間:2020-10-21 22:31:53 來(lái)源:網(wǎng)絡(luò) 閱讀:6119 作者:rongwei84n 欄目:軟件技術(shù)

一. 開始

接上一篇 ServerBootstrap的初始化
https://blog.51cto.com/483181/2119149

二. bind過(guò)程

2.1 代碼

先看下調(diào)用的源代碼

public void bind(int port) throws Exception {
        ...
        try {
            ...

            ChannelFuture f = b.bind(port).sync(); //bind過(guò)程
            ...
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

2.2 bind

public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
}

從上面代碼可以看出幾點(diǎn):

  1. bind方法邏輯很簡(jiǎn)單,經(jīng)過(guò)一系列的判斷后最后調(diào)用doBind()方法
  2. 發(fā)現(xiàn)Netty代碼里面,從外面調(diào)用進(jìn)去后,內(nèi)部方法一般用doxxx,xxx0這種命名;以前自己看安卓源代碼的時(shí)候,安卓一般喜歡用xxxInner的命名。風(fēng)格而已,也許自己以后寫代碼可以參考(看源代碼除了了解原理外,學(xué)習(xí)別人的代碼架構(gòu)方法也是一種收獲)。

繼續(xù)看doBind

2.3 doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); //1. init和register
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) { 
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

上面這一段代碼包含的東西就比較多了,先來(lái)看 initAndRegister

2.4 initAndRegister

顧名思義,這個(gè)方法包含初始化和注冊(cè)兩個(gè)步驟,代碼如下:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ...
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

從上面代碼,我們可以看到幾點(diǎn):

  1. channel = channelFactory.newChannel();
    channelFactory是什么?它的類型是ReflectiveChannelFactory,如果大家不記得了,可以看看上一篇channel設(shè)置那個(gè)地方。
    https://blog.51cto.com/483181/2119149
public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {

        }
    }   
}       

它的newChannel方法也是非常的簡(jiǎn)單,直接實(shí)例化傳入的channel對(duì)象,也就是NioServerSocketChannel (可以看上一篇初始化的分析)
代碼如下:

ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)

我們先看看NioServerSocketChannel的實(shí)現(xiàn)

2.5 NioServerSocketChannel

先看下NioServerSocketChannel的繼承關(guān)系
58. Netty源代碼分析-ServerBootstrap bind 過(guò)程-1

NioServerSocketChannel提供了一個(gè)無(wú)參構(gòu)造函數(shù),然后分別有SelectorProvider,ServerSocketChannel的構(gòu)造函數(shù),如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
        }
    }

    private final ServerSocketChannelConfig config;

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

無(wú)參構(gòu)造函數(shù)里面調(diào)用newSocket(xx),參數(shù)是SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
先看看SelectorProvider.provider()

private static SelectorProvider provider = null;
public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
                ...
        }
    }

可以看到provider是個(gè)單例,不知道大家是否記得上上一篇文章(NioEventLoopGroup實(shí)例化)分析的時(shí)候也有provider,類型是KQueueSelectorProvider
具體可以看: https://blog.51cto.com/483181/2118817

回到newSocket里面,調(diào)用的是provider.openServerSocketChannel()

代碼是SelectorProviderImpl里面,返回的是 ServerSocketChannel

public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

得到ServerSocketChannel之后,繼續(xù)調(diào)用構(gòu)造函數(shù)

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

這個(gè)構(gòu)造方法里面做了兩件事

  1. 調(diào)用父類的構(gòu)造方法
  2. 利用剛剛生成好的ServerSocketChannel實(shí)例化了一個(gè)NioServerSocketChannelConfig

看它的父類構(gòu)造函數(shù)是怎么實(shí)現(xiàn)的

首先是AbstractNioChannel.java

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
        }
    }
  1. 繼續(xù)調(diào)用父類的構(gòu)造方法
  2. 首先吧傳入的ServerSocketChannel保存起來(lái),變量是ch
  3. 然后把readInterestOp存起來(lái),變量是readInterestOp,值是SelectionKey.OP_ACCEPT
  4. 調(diào)用ch.configureBlocking(false);把channel設(shè)置成非阻塞。
    這里稍微介紹下SelectionKey.OP_ACCEPT
    SelectionKey有4種類型,是java提供的,分別是
public static final int OP_READ = 1 << 0;

public static final int OP_WRITE = 1 << 2;

public static final int OP_CONNECT = 1 << 3;

public static final int OP_ACCEPT = 1 << 4;

然后繼續(xù)看AbstractNioChannel的父類構(gòu)造方法,也就是AbstractChannel

private final ChannelId id;
protected abstract AbstractUnsafe newUnsafe();
private final DefaultChannelPipeline pipeline;

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
}

可以看到這幾點(diǎn):

  1. Channel parent變量,null
  2. 初始化ChannelId id
  3. 初始化unsafe
  4. 初始化pipeline

先看unsafe的初始化

2.6 newUnsafe

在AbstractChannel里面,它是一個(gè)抽象類

protected abstract AbstractUnsafe newUnsafe();

實(shí)現(xiàn)類在子類AbstractNioMessageChannel里面,如下,類型是NioMessageUnsafe

@Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

NioMessageUnsafe代碼后面再看。

繼續(xù)看pipeline的初始化,初始化了一個(gè) DefaultChannelPipeline

protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

在DefaultChannelPipeline里面初始化了一個(gè)head和tail,分別是HeadContext和TailConext類型,而且head和tail組成雙向鏈表。
head和tail的區(qū)別之一就是inbound和outbound值是相反的,如下:

節(jié)點(diǎn) inbound outbound
head false true
tail true false
HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }

TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }               

借一張圖顯示下ChannelInBound和ChannelOutBound,如下。head是發(fā)送出去的入口,tail是接收消息的入口。

58. Netty源代碼分析-ServerBootstrap bind 過(guò)程-1

另外我們來(lái)看一下添加一個(gè)ChannelHandler的流程,比如addLast

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            ...
                    return this;
    }

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }       

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }       
  1. 首先它初始化了一個(gè)DefaultChannelHandlerContext對(duì)象,里面封裝了要add的channelHandler,這個(gè)很重要,在Netty的pipeLine里面,都是通過(guò)ChannelHandlerContext來(lái)描述的,不是直接添加channelHandler。

  2. addLast0()里面就是簡(jiǎn)單的雙向鏈表添加的方法,把封裝了channelHandler的ChannelHandlerContext對(duì)象添加到tail的前一個(gè)節(jié)點(diǎn)。

那,我們來(lái)總結(jié)下NioServerSocketChannel的初始化過(guò)程:

1. NioServerSocketChannel提供了一個(gè)無(wú)參構(gòu)造函數(shù),里面SelectorProvider DEFAULT_SELECTOR_PROVIDER,它是一個(gè)單例,類型是KQueueSelectorProvider。

2. 我們調(diào)用KQueueSelectorProvider.openServerSocketChannel()方法,得到一個(gè)ServerSocketChannel

3. 我們用生成的ServerSocketChannel對(duì)象創(chuàng)建了一個(gè)ServerSocketChannelConfig config,具體是NioServerSocketChannelConfig對(duì)象,存在NioServerSocketChannel里面

4. 我們用生成的ServerSocketChannel調(diào)用它的父類構(gòu)造函數(shù),先來(lái)到了AbstractNioChannel

5. 在AbstractNioChannel會(huì)把ServerSocketChannel存起來(lái),變量是ch,然后把channel設(shè)置成非阻塞。

6. AbstractNioChannel還會(huì)把readInterestOp存起來(lái),類型是SelectionKey.OP_ACCEPT

7. 繼續(xù)調(diào)用父類構(gòu)造函數(shù),來(lái)到AbstractChannel

8. AbstractChannel里面的parent設(shè)置成null

9. AbstractChannel初始化channel id

10. AbstractChannel初始化unsafe,類型是NioMessageUnsafe.

11. AbstractChannel初始化pipeline,類型是DefaultChannelPipeline, 每個(gè)Channel都有一個(gè)自己的Pipeline

看完NioServerSocketChannel的實(shí)例化方法后,我們繼續(xù)往下看init

2.7 init

abstract void init(Channel channel) throws Exception;

AbstractBootstrap里面的init(channel)方法是一個(gè)抽象方法,參數(shù)是Channel類型,其實(shí)就是上一步實(shí)例化好的NioServerSocketChannel對(duì)象。

具體實(shí)現(xiàn)方法在它的子類ServerBootstrap和Bootstrap(給客戶端啟動(dòng)使用的),那我們是分析服務(wù)端的代碼,所以看ServerBootstrap里面的實(shí)現(xiàn)。

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) { //1. 設(shè)置options
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { //設(shè)置attr屬性
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

先來(lái)看設(shè)置options

2.8 setOptions

final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) { //1. 設(shè)置options
            setChannelOptions(channel, options, logger);
        }

static void setChannelOptions(
            Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
        for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
            setChannelOption(channel, e.getKey(), e.getValue(), logger);
        }
    }

private static void setChannelOption(
            Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
        try {
            if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
            }
        } catch (Throwable t) {}
    }       

這段代碼我們這樣看

  1. options是哪來(lái)的?
    options是一個(gè)map,服務(wù)器代碼是這樣設(shè)置的
 b.xxxx.
    .option(ChannelOption.SO_BACKLOG, 100)
  1. 它其實(shí)調(diào)用的是channel.config()對(duì)象去設(shè)置option,那config對(duì)象是什么呢?這個(gè)上面分析Channel初始化的時(shí)候說(shuō)過(guò),它是NioServerSocketChannelConfig對(duì)象,NioServerSocketChannelConfig的類繼承關(guān)系如下:

58. Netty源代碼分析-ServerBootstrap bind 過(guò)程-1`

  1. 所以setOption的實(shí)現(xiàn)在DefaultServerSocketChannelConfig里面
@Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);

        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_BACKLOG) {
            setBacklog((Integer) value);
        } else {
            return super.setOption(option, value);
        }

        return true;
    }

父類 DefaultChannelConfig.java

public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);

        if (option == CONNECT_TIMEOUT_MILLIS) {
            setConnectTimeoutMillis((Integer) value);
        } else if (option == MAX_MESSAGES_PER_READ) {
            setMaxMessagesPerRead((Integer) value);
        } else if (option == WRITE_SPIN_COUNT) {
            setWriteSpinCount((Integer) value);
        } else if (option == ALLOCATOR) {
            setAllocator((ByteBufAllocator) value);
        } else if (option == RCVBUF_ALLOCATOR) {
            setRecvByteBufAllocator((RecvByteBufAllocator) value);
        } else if (option == AUTO_READ) {
            setAutoRead((Boolean) value);
        } else if (option == AUTO_CLOSE) {
            setAutoClose((Boolean) value);
        } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
            setWriteBufferHighWaterMark((Integer) value);
        } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
            setWriteBufferLowWaterMark((Integer) value);
        } else if (option == WRITE_BUFFER_WATER_MARK) {
            setWriteBufferWaterMark((WriteBufferWaterMark) value);
        } else if (option == MESSAGE_SIZE_ESTIMATOR) {
            setMessageSizeEstimator((MessageSizeEstimator) value);
        } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
            setPinEventExecutorPerGroup((Boolean) value);
        } else {
            return false;
        }

        return true;
    }       

根據(jù)傳入的屬性不行,用不同的方法進(jìn)行設(shè)置,這些屬性的值大家可以去單獨(dú)百度,可能不同的環(huán)境配置不同的值對(duì)服務(wù)器性能有好處。

那繼續(xù)往下面看,設(shè)置attr

2.9 setAttr

setAttr是封裝了一個(gè)Attribute的類,然后存儲(chǔ)key,value,大家具體要看的話,可以看DefaultAttributeMap.java

繼續(xù)往下看

2.10 addLast

p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

上面這段代碼,我們一步步看

  1. 首先,config.handler()是哪里來(lái)的?其實(shí)就是我們?cè)O(shè)置的handler,這一點(diǎn)可以從上一篇分析看到
    https://blog.51cto.com/483181/2119149
b..handler(new LoggingHandler(LogLevel.INFO));

所以 pipeline.addLast(handler); 就是把我們?cè)O(shè)置的handler添加到pipeline里面。

  1. 然后又實(shí)例化了一個(gè)ServerBootstrapAcceptor,把childHandler那些參數(shù)都傳了進(jìn)去,具體在ServerBootstrapAcceptor里面怎么使用這些childHandler的.
ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });

ServerBootstrapAcceptor是把客戶端連接的channel從bossGroup轉(zhuǎn)移到workGroup,代碼如下:
ServerBootstrap.java

@Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

上面這段代碼把客戶端的channel讀進(jìn)來(lái)轉(zhuǎn)換成一個(gè)channel類型,然后調(diào)用childGroup,然后把channel注冊(cè)進(jìn)去,這樣workGroup就接手了channel后面的事情。

那init就看完了,總結(jié)一下init做的事情

  1. 設(shè)置options,參數(shù)有很多,不同的服務(wù)器業(yè)務(wù)可以用不用的參數(shù)。
  2. 設(shè)置attr
  3. 把handler添加到pipeLine的尾部
  4. 初始化了一個(gè)ServerBootstrapAcceptor,里面封裝了childHandler的那些參數(shù)。

其實(shí)看到這里,我們會(huì)發(fā)現(xiàn)init還只是初始化參數(shù),把handler添加到pipeLine里面,做好一切準(zhǔn)備,并沒有bind服務(wù)器端口。

那我們繼續(xù)看

ChannelFuture regFuture = config().group().register(channel);

2.12 register

先繼續(xù)貼一下initAndRegister的代碼,因?yàn)樯厦嬷v的東西有點(diǎn)多,大家可能忘記initAndRegister里面的代碼了。

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); //1. NioServerSocketChannel的初始化已經(jīng)講了
            init(channel); //2. init過(guò)程已經(jīng)講了
        } catch (Throwable t) {
        }

        ChannelFuture regFuture = config().group().register(channel); //3. 現(xiàn)在講register
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

如同上面的注釋,我們講register過(guò)程

  1. config.group()是什么呢?參考我們上一篇的ServerBootstrap初始化,config.group()指的bossGroup,類型是NioEventLoopGroup
    ServerBootstrap初始化
EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)

由于NioEventLoopGroup繼承自MultithreadEventLoopGroup,所以調(diào)用的是MultithreadEventLoopGroup的register(channel)方法,如下:

public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

@Override
    public EventExecutor next() {
        return chooser.next();
    }       

那next()又是什么呢?在上篇 NioEventLoopGroup實(shí)例化 里面我們分析了,NioEventLoopGroup里面初始化了跟傳入線程數(shù)目相同的NioEventLoop對(duì)象,而next()方法有兩種算法選出下一個(gè)NioEventLoop對(duì)象是什么。

這兩種算法是PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser,所以我們就可以知道繼續(xù)會(huì)調(diào)用NioEventLoop對(duì)象的register(channel)對(duì)象。

而NioEventLoop類并沒有實(shí)現(xiàn)register(channel)方法,它繼承自SingleThreadEventLoop,它里面有實(shí)現(xiàn)register(channel)方法,如下:

public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

這個(gè)方法里面實(shí)例化了一個(gè)DefaultChannelPromise對(duì)象,它其實(shí)就是保存channel和當(dāng)前的NioEventLoop對(duì)象,做了一層封裝而已,如下:

public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = checkNotNull(channel, "channel");
    }

public DefaultPromise(EventExecutor executor) {
        this.executor = checkNotNull(executor, "executor");
    }       

所以我們可以暫時(shí)不管它,繼續(xù)往下面走.

@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

調(diào)用的是unsafe.register(this, promise)

那unsafe是什么對(duì)象呢?從上面2.6可以看到unsafe()初始化的是NioMessageUnsafe對(duì)象

protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

由于NioMessageUnsafe并沒有重寫register(EventLoop eventLoop, ChannelPromise promise)方法,所以追蹤它的父類,最后在AbstractUnsafe里面看到了register(EventLoop eventLoop, ChannelPromise promise),如下:
先附上NioMessageUnsafe的繼承關(guān)系圖:
58. Netty源代碼分析-ServerBootstrap bind 過(guò)程-1

AbstractUnsafe.java

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }

都會(huì)走到register0(promise)這個(gè)方法里面,繼續(xù)看register0(promise)

private void register0(ChannelPromise promise) {
            try {
                ...

                boolean firstRegistration = neverRegistered;
                doRegister(); //1. 
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                ...
            }
        }

先看doRegister

2.13 doRegister

這個(gè)方法在AbstractChannel里面,是個(gè)空實(shí)現(xiàn)

/**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method
     */
    protected void doRegister() throws Exception {
        // NOOP
    }

在AbstractNioChannel里面有重寫

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }

protected SelectableChannel javaChannel() {
        return ch;
}       
  1. 首先,ch是ServerSocketChannelImpl類型,這個(gè)可以從上面 2.5 NioServerSocketChannel的初始化可以看出來(lái)來(lái)
public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
}

ServerSocketChannelImpl是JDK提供的類,那javaChannel().register(xxx)就是調(diào)用JDK nio的方法實(shí)現(xiàn)register,那就不繼續(xù)深入下去了。

  1. 但是這里有個(gè)疑惑,調(diào)用register的時(shí)候傳入的ops是0,并沒有使用上面4種監(jiān)聽類型的任何一種,這個(gè)先記下來(lái)。
public static final int OP_READ = 1 << 0;

public static final int OP_WRITE = 1 << 2;

public static final int OP_CONNECT = 1 << 3;

public static final int OP_ACCEPT = 1 << 4;
  1. eventLoop().unwrappedSelector()是什么呢?
    從上一篇NioEventGroupLoop初始化 2.2.3分析可以知道,它是一個(gè)KQueueSelectorImpl,繼承自Selector

58. Netty源代碼分析-ServerBootstrap bind 過(guò)程-1

那我們可以這樣理解,上面這段代碼是把一個(gè)Selector對(duì)象注冊(cè)到Java的 Channel里面,這個(gè)Channel和我們上面講的Netty Channel不是一個(gè)東西。

繼續(xù)看register0()

2.14 pipeline.fireChannelRegistered()

private void register0(ChannelPromise promise) {
            try {
                ...
                doRegister(); //1. 把selector注冊(cè)到Java channel, ops = 0
                ...
                pipeline.fireChannelRegistered(); //2. 通知handler channel已經(jīng)注冊(cè)

                                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
                ...
            } catch (Throwable t) {
                ...
            }
        }

pipeline里面維護(hù)channelHandler的列表,通過(guò)鏈表的方法,如DefaultChannelPipeline.java里面

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

然后通知channel registered,如果channelHandler有重寫channelRegitstered(ChannelHandlerContext ctx)的話,就會(huì)被回調(diào)。如LoggingHandler就會(huì)打印

58. Netty源代碼分析-ServerBootstrap bind 過(guò)程-1

然后判斷isActive(),isActive()是一個(gè)多態(tài)方法,對(duì)于服務(wù)器,它是判斷監(jiān)聽是否啟動(dòng);
NioServerSocketChannle.java

@Override
    public boolean isActive() {
        return javaChannel().socket().isBound();
    }

對(duì)于客戶端,它是判斷TCP連接是否完成
NioSocketChannel.java

@Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

我們這里直講服務(wù)器,如果isActive(),那么就會(huì)調(diào)用 pipeline.fireChannelActive(); 通知channelHander已經(jīng)active,這樣就會(huì)回調(diào)他們的channelActive方法。

繼續(xù)看pipeline.fireChannelActive();

DefaultChannelPipeline.java

@Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

AbstractChannelHandlerContext.invokeChannelActive方法就不看了,就是調(diào)用參數(shù)的channelActive。由于參數(shù)是head,那么我們?nèi)タ碿hannelActive方法。

DefaultChannelPipeline.java

@Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }               

調(diào)用的是channel.read(),channel是NioServerSocketChannel,它的實(shí)現(xiàn)是在父類AbstractChannel.java里面

@Override
    public Channel read() {
        pipeline.read();
        return this;
    }

DefaultChannelPipeline.java

@Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }

AbstractChannelHandlerContext.java

@Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            ...
        }

        return this;
    }

private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }       

首先要尋找findContextOutbound,由于head的inbound=false,outbound=true,所以next=head,那么就是調(diào)用head的read方法,如下:
DefaultChannelPipeline.java

@Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

AbstractChannel.java

@Override
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                ...
            }
        }

直接看doBeginRead()

AbstractNioChannel.java

@Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

還記得我們初始化NioServerSocketChannel的時(shí)候,我們傳給父類的readInterestOp嗎?沒錯(cuò),就是SelectionKey.OP_ACCEPT,如下:

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;

                ...
}               

上面doReadBegin就是把我們?cè)O(shè)置的readInterestOp重新設(shè)置到j(luò)ava selector上面,代表我們監(jiān)聽的類型是SelectionKey.OP_ACCEPT,不在是最開始的0了。

到這里,initAndRegister方法就基本講完了,再貼一次它的代碼,加深下印象。

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); //1. 實(shí)例化NioServerSocketChannel
            init(channel); //2. 初始化
        } catch (Throwable t) {
        }

        ChannelFuture regFuture = config().group().register(channel); //3. 注冊(cè)selector到Java channel上面,注冊(cè)類型是0
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

我們?cè)賮?lái)回憶一下initAndRegister方法

1. 實(shí)例化NioServerSocketChannel對(duì)象,channelFactory.newChannel()
a. 傳入父類的ops是SelectionKey.OP_ACCEPT
b. 它的父類AbstractNioChannel把channel設(shè)置成非阻塞,然后把SelectionKey.OP_ACCEPT存起來(lái)
c. 父類AbstractChannel初始化了ChannelId
d. AbstractChannel初始化了unsafe,類型是NioMessageUnsafe。
e. AbstractChannel初始化了pipeline,類型是DefaultChannelPipeline,每個(gè)channel都有自己的pipleline,它維護(hù)了channelHandler列表,如果有事件發(fā)生,那么pipeline就負(fù)責(zé)把事件從頭傳到尾。

2. init方法
a. 它是在子類ServerBootstrap里面實(shí)現(xiàn),子類Bootstrap實(shí)現(xiàn)的是客戶端的。
b. setOptions()設(shè)置屬性,類型有很多,不同的業(yè)務(wù)場(chǎng)景可以設(shè)置不同的屬性。
c. addLast把我們?cè)O(shè)置的channelHandler添加到pipeline
d. 實(shí)例化了一個(gè)ServerBootstrapAcceptor,里面封裝了childChannel,也添加到pipeline里面

3. register
a. register調(diào)用的是bossGroup NioEventLoopGroup的register方法,NioEventLoopGroup regitster方法調(diào)用的next().regitster,next()調(diào)用chooser.next.
b. chooser有兩種PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser,它們負(fù)責(zé)選擇NioEventLoopGroup里面下一個(gè)NioEventLoop(NioEventLoopGroup里面有nThreads個(gè)NioEventLoop,nThreads表示線程數(shù),默認(rèn)是cpu*2)
c. NioEventLoop.register調(diào)用的是它的父類SingleThreadEventLoop.register,所以它調(diào)用的是unsafe.register。從上面的初始化就可以知道,unsafe指的是NioMessageUnsafe,所以調(diào)用的是NioMessageUnsafe.register
d. NioMessageUnsafe并沒有實(shí)現(xiàn)register,所以調(diào)用的是它的父類AbstractUnsafe.regitster,然后調(diào)用register0
e. 在doRegitster里面把selector注冊(cè)到Java的channel,key=0
f. 調(diào)用pipeline.fireChannelRegistered(),通知pipeline維護(hù)的channelHander,channel已經(jīng)注冊(cè)了,回調(diào)了它們的channelRegitstered方法。

那initAndRegister就講完了,bind過(guò)程還沒有結(jié)束,因?yàn)槠悬c(diǎn)多了,下一篇繼續(xù)介紹doBind0:

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); //1. 這一篇的內(nèi)容
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise); //2. 下一篇講doBind0()
            return promise;
        } else {
            ...
            });
            return promise;
        }
    }
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI