溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

分布式Netty源碼EventLoopGroup分析

發(fā)布時間:2022-03-25 09:11:23 來源:億速云 閱讀:160 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“分布式Netty源碼EventLoopGroup分析”的相關(guān)知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“分布式Netty源碼EventLoopGroup分析”文章能幫助大家解決問題。

    EventLoopGroup介紹

    在前面一篇文章中提到了,EventLoopGroup主要負責2個事情,這里再重復下:

    它主要包含2個方面的功能,注冊Channel和執(zhí)行一些Runnable任務。

    分布式Netty源碼EventLoopGroup分析

    功能1:先來看看注冊Channel

    即將Channel注冊到Selector上,由Selector來調(diào)度Channel的相關(guān)事件,如讀、寫、Accept等事件。

    而EventLoopGroup的設(shè)計是,它包含多個EventLoop(每一個EventLoop通常內(nèi)部包含一個線程),在執(zhí)行上述注冊過程中是需要選擇其中的一個EventLoop來執(zhí)行上述注冊行為,這里就出現(xiàn)了一個選擇策略的問題,該選擇策略接口是EventExecutorChooser,你也可以自定義一個實現(xiàn)。

    從上面可以看到,EventLoopGroup做的工作大部分是一些總體性的工作如初始化上述多個EventLoop、EventExecutorChooser等,具體的注冊Channel還是交給它內(nèi)部的EventLoop來實現(xiàn)。

    功能2:執(zhí)行一些Runnable任務

    EventLoopGroup繼承了EventExecutorGroup,EventExecutorGroup也是EventExecutor的集合,EventExecutorGroup也是掌管著EventExecutor的初始化工作,EventExecutorGroup對于Runnable任務的執(zhí)行也是選擇內(nèi)部中的一個EventExecutor來做具體的執(zhí)行工作。

    netty中很多任務都是異步執(zhí)行的,一旦當前線程要對某個EventLoop執(zhí)行相關(guān)操作,如注冊Channel到某個EventLoop,如果當前線程和所要操作的EventLoop內(nèi)部的線程不是同一個,則當前線程就僅僅向EventLoop提交一個注冊任務,對外返回一個ChannelFuture。

    總結(jié):EventLoopGroup含有上述2種功能,它更多的是一個集合,但是具體的功能實現(xiàn)還是選擇內(nèi)部的一個item元素來執(zhí)行相關(guān)任務。 這里的內(nèi)部item元素通常即實現(xiàn)了EventLoop,又實現(xiàn)了EventExecutor,如NioEventLoop等

    繼續(xù)來看看EventLoopGroup的整體類圖

    分布式Netty源碼EventLoopGroup分析

    從圖中可以看到有2路分支:

    • 1 MultithreadEventLoopGroup:用于封裝多線程的初始化邏輯,指定線程數(shù)等,即初始化對應數(shù)量的EventLoop,每個EventLoop分配到一個線程

    分布式Netty源碼EventLoopGroup分析

    上圖中的newChild方法,NioEventLoopGroup就采用NioEventLoop作為實現(xiàn),EpollEventLoopGroup就采用EpollEventLoop作為實現(xiàn)

    如NioEventLoopGroup的實現(xiàn):

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    • 2 EventLoop接口實現(xiàn)了EventLoopGroup接口,主要因為EventLoopGroup中的功能接口還是要靠內(nèi)部的EventLoop來完成具體的操作

    EventLoop介紹

    EventLoop主要工作就是注冊Channel,并負責監(jiān)控管理Channel的讀寫等事件,這就涉及到不同的監(jiān)控方式,linux下有3種方式來進行事件監(jiān)聽

    select、poll、epoll

    目前java的Selector接口的實現(xiàn)如下:

    PollSelectorImpl:實現(xiàn)了poll方式

    EPollSelectorImpl:實現(xiàn)了epoll方式

    而Netty呢則使用如下:

    NioEventLoop:采用的是jdk Selector接口(使用PollSelectorImpl的poll方式)來實現(xiàn)對Channel的事件檢測

    EpollEventLoop:沒有采用jdk Selector的接口實現(xiàn)EPollSelectorImpl,而是Netty自己實現(xiàn)的epoll方式來實現(xiàn)對Channel的事件檢測,所以在EpollEventLoop中就不存在jdk的Selector。

    NioEventLoop介紹

    對于NioEventLoopGroup的功能,NioEventLoop都要做實際的實現(xiàn),NioEventLoop既要實現(xiàn)注冊功能,又要實現(xiàn)運行Runnable任務

    對于注冊Channel:NioEventLoop將Channel注冊到NioEventLoop內(nèi)部的PollSelectorImpl上,來監(jiān)聽該Channel的讀寫事件

    對于運行Runnable任務:NioEventLoop的父類的父類SingleThreadEventExecutor實現(xiàn)了運行Runnable任務,在SingleThreadEventExecutor中,有一個任務隊列還有一個分配的線程

    private final Queue<Runnable> taskQueue;
    private volatile Thread thread;

    NioEventLoop在該線程中不僅要執(zhí)行Selector帶來的IO事件,還要不斷的從上述taskQueue中取出任務來執(zhí)行這些非IO事件。下面我們來詳細看下這個過程

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();
    
                    processSelectedKeys();
    
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
    
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                ...
            }
        }
    }

    來詳細說下這個過程:

    • 1 計算當前是否需要執(zhí)行select過程

    如果當前沒有Runnable任務,則執(zhí)行select(這個select過程稍后詳細來說)。

    如果當前有Runnable任務,則要去執(zhí)行處理流程,此時順便執(zhí)行下selector.selectNow(),萬一有事件發(fā)生那就賺了,沒有白走這次處理流程

    • 2 根據(jù)IO任務的時間占比設(shè)置來執(zhí)行IO任務和非IO任務,即上面提到的Runnable任務

    如果ioRatio=100則每次都是執(zhí)行全部的IO任務,執(zhí)行全部的非IO任務 默認ioRatio=50,即一半時間用于處理IO任務,另一半時間用于處理非IO任務。怎么去控制非IO任務所占用時間呢?

    這里是每執(zhí)行64個非IO任務(這里可能是每個非IO任務比較短暫,減少一些判斷帶來的消耗)就判斷下占用時間是否超過了上述時間限制

    接下來詳細看下上述select過程

    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);
                rebuildSelector();
                selector = this.selector;
                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
    } catch (CancelledKeyException e) {
    	...
    }
    • 1 首先計算此次select過程的截止時間

        protected long delayNanos(long currentTimeNanos) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                return SCHEDULE_PURGE_INTERVAL;
            }
            return scheduledTask.delayNanos(currentTimeNanos);
        }

    這里其實就是從一個定時 任務隊列中取出定時任務,如果有則計算出離當前定時任務的下一次執(zhí)行時間之差,如果沒有則按照固定的1s作為select過程的時間

    • 2 將當前時間差轉(zhuǎn)化成ms

    如果當前時間差不足0.5ms的話,即timeoutMillis<=0,并且是第一次執(zhí)行,則認為時間太短執(zhí)行執(zhí)行一次selectNow

    • 3 如果有任務,則立即執(zhí)行一次selectNow,跳出for循環(huán)

    • 4 然后就是普通的selector.select(timeoutMillis)

    在這段時間內(nèi)如果有事件則跳出for循環(huán),如果沒有事件則已經(jīng)花費對應的時間差了,再次執(zhí)行for循環(huán),計算的timeoutMillis就會小于0,也會跳出for循環(huán)

    在上述邏輯中,基本selectCnt都是1,不會出現(xiàn)很多次,而這里針對selectCnt有很多次的處理是基于一個情況:

     selector.select(timeoutMillis)

    Selector的正常邏輯是一旦有事件就返回,沒有事件則最多等待timeoutMillis時間。 然而底層操作系統(tǒng)實現(xiàn)可能有bug,會出現(xiàn):即使沒有產(chǎn)生事件就直接返回了,并沒有按照要求等待timeoutMillis時間。

    現(xiàn)在的解決辦法就是: 記錄上述出現(xiàn)的次數(shù),一旦超過512這個閾值(可設(shè)置),就重新建立新的Selector,并將之前的Channel也全部遷移到新的Selector上

    至此,NioEventLoop的主邏輯流程就介紹完了,之后就該重點介紹其中對于IO事件的處理了。然后就會引出來ChannelPipeline的處理流程

    EpollEventLoop介紹

    EpollEventLoop和NioEventLoop的主流程邏輯基本上是差不多的,不同之處就在于EpollEventLoop用epoll方式替換NioEventLoop中的PollSelectorImpl的poll方式。

    這里不再詳細說明了,之后會詳細的說明Netty的epoll方式和jdk中的epoll方式的區(qū)別。

    關(guān)于“分布式Netty源碼EventLoopGroup分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識,可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI