溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

56. Netty源代碼分析-服務(wù)器初始化 NioEventLoopGroup實(shí)例化

發(fā)布時(shí)間:2020-07-14 13:39:05 來(lái)源:網(wǎng)絡(luò) 閱讀:1126 作者:rongwei84n 欄目:軟件技術(shù)

一. 代碼下載

Netty代碼下載和編譯參考前一篇Netty文章
https://blog.51cto.com/483181/2112163

二. 服務(wù)器代碼分析

2.1 服務(wù)器代碼編寫

一般Netty服務(wù)器端這樣編寫

    EventLoopGroup bossGroup = new NioEventLoopGroup();   //1. 實(shí)例化NioEventLoopGroup對(duì)象
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); //2. 
            b.group(bossGroup, workerGroup) //3. 
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                    }
                });

            ChannelFuture f = b.bind(port).sync(); //4.
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

2.2 NioEventLoopGroup

2.2.1 NioEventLoopGroup繼承關(guān)系

一步步來(lái)看,首先看第一個(gè)注釋,初始化NioEventLoopGroup對(duì)象

EventLoopGroup bossGroup = new NioEventLoopGroup();   //1. 實(shí)例化NioEventLoopGroup對(duì)象

下圖是NioEventLoopGroup的類繼承圖,包含類成員和方法,比較詳細(xì)。 這個(gè)功能是IntelliJ 自帶的。
右擊NioEventLoopGroup類名,選擇Diagrams->Show Diagram->上面有f,m的按鈕,分別對(duì)應(yīng)field和method。

如下:

56. Netty源代碼分析-服務(wù)器初始化 NioEventLoopGroup實(shí)例化

2.2.2 NioEventLoopGroup構(gòu)造函數(shù)

public NioEventLoopGroup() {
        this(0);
    }

        public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }       

        public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

我們可以看到幾點(diǎn)

  1. NioEventLoopGroup采用的是構(gòu)造函數(shù)重載的方式,以適應(yīng)不同的初始化場(chǎng)景
  2. Executor傳的是null
  3. SelectorProvider用的是SelectorProvider.provider()
  4. 然后把構(gòu)造好的參數(shù)都傳給父類MultithreadEventLoopGroup (繼承關(guān)系可以看上圖)

2.2.3 SelectorProvider.provider()

private static SelectorProvider provider = null;

public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

public class KQueueSelectorProvider extends SelectorProviderImpl {
    public KQueueSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new KQueueSelectorImpl(this);
    }
}

這段代碼我們也可以看到幾點(diǎn):

  1. SelectorProvider provider是一個(gè)單例,static類型
  2. SelectorProvider.provider的實(shí)現(xiàn),產(chǎn)生了一個(gè)KQueueSelectorProvider
  3. KQueueSelectorProvider的openSelector會(huì)生成一個(gè)KQueueSelectorImpl

這個(gè)先記下來(lái),也許后面分析會(huì)有用,繼續(xù)分析MultithreadEventLoopGroup的構(gòu)造函數(shù)。

2.2.4 MultithreadEventLoopGroup

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    }       

上面這段代碼我們可以看到這幾點(diǎn):

  1. 如果我們實(shí)例化NioEventLoopGroup沒(méi)有傳入?yún)?shù),也就是沒(méi)有nThreads,那么就會(huì)采用默認(rèn)的DEFAULT_EVENT_LOOP_THREADS
    DEFAULT_EVENT_LOOP_THREADS如果沒(méi)有配置io.netty.eventLoopThreads的話,一般是cpu核數(shù)*2
  2. MultithreadEventLoopGroup的實(shí)例化方法是繼續(xù)調(diào)用父類的初始化方法。

繼續(xù)父類MultithreadEventExecutorGroup

2.2.5 MultithreadEventExecutorGroup

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        ...

        children = new EventExecutor[nThreads]; //1. 實(shí)例化children數(shù)組

        for (int i = 0; i < nThreads; i ++) { //2. 循環(huán)初始化children
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                ...
            }
        }

        chooser = chooserFactory.newChooser(children); //3. 實(shí)例化chooser

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

上面這段代碼可以從下面幾個(gè)點(diǎn)分析:

private final EventExecutor[] children;
  1. children - EventExecutor數(shù)組,大小是nThreads,線程數(shù)目。
  2. newChild初始化
    實(shí)例類是NioEventLoopGroup.java,返回NioEventLoop對(duì)象
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

    NioEventLoop的繼承關(guān)系是這樣的,繼承于SingleThreadEventLoop,別忘了上面我們看到NioEventLoopGroup繼承自MultithreadEventLoopGroup.(看名字是單線程和多線程的區(qū)別?)

56. Netty源代碼分析-服務(wù)器初始化 NioEventLoopGroup實(shí)例化

繼續(xù)看NioEventLoop的構(gòu)造函數(shù)

2.2.6 NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
                if (DISABLE_KEYSET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }
                ...
}

從上面這段代碼我們可以看出這幾點(diǎn)

  1. NioEventLoop里面保存了SelectorProvider selectorProvider, Selector selector, unwrappedSelector(類型是KQueueSelectorImpl)
  2. selector, unwrappedSelector是通過(guò)provider.openSelector()打開的.
    根據(jù)2.3段的介紹,provider之前介紹的類型是KQueueSelectorProvider,然后它的openSelector會(huì)生成一個(gè)KQueueSelectorImpl
    所以provider.openSelector()得到是KQueueSelectorImpl,KQueueSelectorImpl的繼承關(guān)系如下:

    56. Netty源代碼分析-服務(wù)器初始化 NioEventLoopGroup實(shí)例化

繼續(xù)往回看,看MultithreadEventExecutorGroup的構(gòu)造函數(shù)。

2.2.7 newChooser

EventExecutorChooserFactory.EventExecutorChooser chooser;

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

chooser = chooserFactory.newChooser(children);

上面代碼我們可以看到:

  1. chooserFactory的類型是DefaultEventExecutorChooserFactory,所以newChooser調(diào)用的是DefaultEventExecutorChooserFactory.newChooser方法。
    如下:
public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
  1. 傳入的參數(shù)是children,也就是NioEventLoop數(shù)組
  2. DefaultEventExecutorChooserFactory INSTANCE是一個(gè)static final類型對(duì)象,也就是一種餓漢式的單例模式,如下:
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

繼續(xù)看newChooser的實(shí)現(xiàn)

2.2.8 newChooser

newChooser的代碼就不貼了,上面就有,從上面代碼可以看到:

  1. isPowerOfTwo是用來(lái)判斷一個(gè)整數(shù)是否是2的冪,比如(2,4, 8,16,32等等),它的實(shí)現(xiàn)方式如下:
private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }       

這種實(shí)現(xiàn)方法感覺比較優(yōu)雅和高效,首先拿到-val,也就是val的二進(jìn)制倒轉(zhuǎn),然后+1。再做&運(yùn)算。
大家自己可以拿到數(shù)字舉個(gè)例子,比較巧妙。后續(xù)自己寫代碼可以借鑒,這是讀源碼的一個(gè)好處,可以學(xué)習(xí)到別人很多優(yōu)秀的寫法。

  1. PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser的不同之處在于next方法的算法不一樣,作用都是從NioEventLoop數(shù)組里面選出一個(gè)NioEventLoop對(duì)象來(lái)。

但是說(shuō)實(shí)話,我沒(méi)有想到這兩種算法有什么區(qū)別,如果誰(shuí)知道,請(qǐng)告訴我,謝謝。

return executors[idx.getAndIncrement() & executors.length - 1];

return executors[Math.abs(idx.getAndIncrement() % executors.length)];

繼續(xù)往回走,MultithreadEventExecutorGroup的構(gòu)造函數(shù)就基本看完了。

三. 總結(jié)

我們來(lái)總結(jié)下NioEventLoopGroup的實(shí)例化過(guò)程,可以得到以下幾點(diǎn)。

1. NioEventLoopGroup的父類MultithreadEventExecutorGroup包含一個(gè)NioEventLoop數(shù)組children,數(shù)組的大小等于nThreads線程數(shù)目。如果沒(méi)有指定,默認(rèn)一般是cpu核數(shù) x 2

2. NioEventLoopGroup和NioEventLoop一樣都是繼承自Executor,但是NioEventLoopGroup又包含多個(gè)NioEventLoop(children數(shù)組),這種關(guān)系有點(diǎn)像android里面ViewGroup和View的關(guān)系?;蛘哐b飾者模式?

3. NioEventLoopGroup繼承自MultithreadEventLoopGroup,而NioEventLoop繼承自SingleThreadEventLoop,從名字看,不知道和多線程,單線程有沒(méi)有關(guān)系。

4. MultithreadEventLoopGroup有個(gè)chooser,執(zhí)行next方法的時(shí)候,會(huì)選擇下一個(gè)NioEventLoop對(duì)象,雖然并不知道兩個(gè)chooser算法有何區(qū)別。

5. NioEventLoopGroup里面重寫了newChild方法,里面實(shí)例化NioEventLoop。

6. NioEventLoop里面包含了Selector,類型是KQueueSelectorImpl

SelectorProvider provider
SelectStrategy selectStrategy

SelectStrategy這個(gè)我們上面我們沒(méi)有關(guān)注,其實(shí)它是NioEventLoopGroup構(gòu)造函數(shù)傳進(jìn)去的,如下:

public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
    public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();

    private DefaultSelectStrategyFactory() { }

    @Override
    public SelectStrategy newSelectStrategy() {
        return DefaultSelectStrategy.INSTANCE;
    }
}

final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() { }

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}

所以SelectStrategy的實(shí)現(xiàn)類是DefaultSelectStrategy.

在理清楚NioEventLoopGroup實(shí)例化的過(guò)程之后,我們下一篇繼續(xù)按照源代碼分析Netty服務(wù)器端的源代碼。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI