溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Netty NioEventLoop啟動過程是怎樣的

發(fā)布時間:2021-12-23 09:54:50 來源:億速云 閱讀:123 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Netty NioEventLoop啟動過程是怎樣的”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

啟動

分析NioEventLoop的execute()接口,主要邏輯如下:

  • 添加任務(wù)隊列

  • 綁定當(dāng)前線程到EventLoop上

  • 調(diào)用EventLoop的run()方法

private static void doBind0(        final ChannelFuture regFuture, final Channel channel,        final SocketAddress localAddress, final ChannelPromise promise) {    // 通過eventLoop來執(zhí)行channel綁定的Task
    channel.eventLoop().execute(new Runnable() {        @Override
        public void run() {            if (regFuture.isSuccess()) {                // channel綁定
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

往下追蹤到 SingleThreadEventExecutor 中 execute 接口,如下:

@Overridepublic void execute(Runnable task) {    if (task == null) {        throw new NullPointerException("task");
    }    // 判斷當(dāng)前運行時線程是否與EventLoop中綁定的線程一致
    // 這里還未綁定Thread,所以先返回false
    boolean inEventLoop = inEventLoop();    // 將任務(wù)添加任務(wù)隊列,也就是我們前面講EventLoop創(chuàng)建時候提到的 MpscQueue.
    addTask(task);    if (!inEventLoop) {        // 啟動線程
        startThread();        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

啟動線程接口:

private void startThread() {    // 狀態(tài)比較,最開始時state = 1 ,為true
    if (state == ST_NOT_STARTED) {        // cs操作后,state狀態(tài)設(shè)置為 2
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {            try {                // 啟動接口
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}// 執(zhí)行線程啟動方法private void doStartThread() {    // 斷言判斷 SingleThreadEventExecutor 還未綁定 Thread
    assert thread == null;    // executor 執(zhí)行任務(wù)
    executor.execute(new Runnable() {        @Override
        public void run() {            // 將 SingleThreadEventExecutor(在我們的案例中就是NioEventLoop) 與 當(dāng)前線程進(jìn)行綁定
            thread = Thread.currentThread();            if (interrupted) {
                thread.interrupt();
            }            // 設(shè)置狀態(tài)為 false
            boolean success = false;            // 更新最近一次任務(wù)的執(zhí)行時間
            updateLastExecutionTime();            try {                // 往下調(diào)用 NioEventLoop 的 run 方法,執(zhí)行
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
               
               ...
               
            }
        }
    });
}

執(zhí)行

往下調(diào)用到 NioEventLoop 中的 run 方法,通過無限for循環(huán),主要做以下三件事情:

  • 輪循I/O事件:select(wakenUp.getAndSet(false))

  • 處理I/O事件:processSelectedKeys

  • 運行Task任務(wù):runAllTasks

@Overrideprotected void run() {    for (;;) {        try {            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                case SelectStrategy.CONTINUE:                    continue;                case SelectStrategy.SELECT:                    // 輪訓(xùn)檢測I/O事件
                    // wakenUp為了標(biāo)記selector是否是喚醒狀態(tài),每次select操作,都設(shè)置為false,也就是未喚醒狀態(tài)。
                    select(wakenUp.getAndSet(false));                    // 'wakenUp.compareAndSet(false, true)' 總是在調(diào)用 'selector.wakeup()' 之前進(jìn)行評估,以減少喚醒的開銷
                    // (Selector.wakeup() 是非常耗性能的操作.)
                    
                    // 但是,這種方法存在競爭條件。當(dāng)「wakeup」太早設(shè)置為true時觸發(fā)競爭條件
                    
                    // 在下面兩種情況下,「wakenUp」會過早設(shè)置為true:
                    // 1)Selector 在 'wakenUp.set(false)' 與 'selector.select(...)' 之間被喚醒。(BAD)
                    // 2)Selector 在 'selector.select(...)' 與 'if (wakenUp.get()) { ... }' 之間被喚醒。(OK)
                    
                    // 在第一種情況下,'wakenUp'設(shè)置為true,后面的'selector.select(...)'將立即喚醒。 直到'wakenUp'在下一輪中再次設(shè)置為false,'wakenUp.compareAndSet(false,true)'將失敗,因此任何喚醒選擇器的嘗試也將失敗,從而導(dǎo)致以下'selector.select(。 ..)'呼吁阻止不必要的。
                    
                    // 要解決這個問題,如果在selector.select(...)操作之后wakenUp立即為true,我們會再次喚醒selector。 它是低效率的,因為它喚醒了第一種情況(BAD - 需要喚醒)和第二種情況(OK - 不需要喚醒)的選擇器。
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }                    // fall through
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;            // ioRatio 表示處理I/O事件與執(zhí)行具體任務(wù)事件之間所耗時間的比值。
            // ioRatio 默認(rèn)為50
            final int ioRatio = this.ioRatio;            if (ioRatio == 100) {                try {                    // 處理I/O事件
                    processSelectedKeys();
                } finally {                    // 處理任務(wù)隊列
                    runAllTasks();
                }
            } else {                // 處理IO事件的開始時間
                final long ioStartTime = System.nanoTime();                try {                    // 處理I/O事件
                    processSelectedKeys();
                } finally {                    // 記錄io所耗時間
                    final long ioTime = System.nanoTime() - ioStartTime;                    // 處理任務(wù)隊列,設(shè)置最大的超時時間
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }        
        // Always handle shutdown even if the loop processing threw an exception.
        try {            if (isShuttingDown()) {
                closeAll();                if (confirmShutdown()) {                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

輪循檢測I/O事件

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;    try {        // select操作計數(shù)
        int selectCnt = 0;        // 記錄當(dāng)前系統(tǒng)時間
        long currentTimeNanos = System.nanoTime();        // delayNanos方法用于計算定時任務(wù)隊列,最近一個任務(wù)的截止時間
        // selectDeadLineNanos 表示當(dāng)前select操作所不能超過的最大截止時間
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);        for (;;) {            // 計算超時時間,判斷是否超時
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;            // 如果 timeoutMillis <= 0, 表示超時,進(jìn)行一個非阻塞的 select 操作。設(shè)置 selectCnt 為 1. 并終止本次循環(huán)。
            if (timeoutMillis <= 0) {                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }                break;
            }            // 當(dāng)wakenUp為ture時,恰好有task被提交,這個task將無法獲得調(diào)用的機會
            // Selector#wakeup. 因此,在執(zhí)行select操作之前,需要再次檢查任務(wù)隊列
            // 如果不這么做,這個Task將一直掛起,直到select操作超時
            // 如果 pipeline 中存在 IdleStateHandler ,那么Task將一直掛起直到 空閑超時。
            
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {                // 調(diào)用非阻塞方法
                selector.selectNow();
                selectCnt = 1;                break;
            }            // 如果當(dāng)前任務(wù)隊列為空,并且超時時間未到,則進(jìn)行一個阻塞式的selector操作。timeoutMillis 為最大的select時間
            int selectedKeys = selector.select(timeoutMillis);            // 操作計數(shù) +1
            selectCnt ++;			
            // 存在以下情況,本次selector則終止
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                // - 輪訓(xùn)到了事件(Selected something,)
                // - 被用戶喚醒(waken up by user,)
                // - 已有任務(wù)隊列(the task queue has a pending task.)
                // - 已有定時任務(wù)(a scheduled task is ready for processing)
                break;
            }            if (Thread.interrupted()) {                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +                            "Thread.currentThread().interrupt() was called. Use " +                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;                break;
            }            // 記錄當(dāng)前時間
            long time = System.nanoTime();            // 如果time > currentTimeNanos + timeoutMillis(超時時間),則表明已經(jīng)執(zhí)行過一次select操作
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } 
            // 如果 time <= currentTimeNanos + timeoutMillis,表示觸發(fā)了空輪訓(xùn)
            // 如果空輪訓(xùn)的次數(shù)超過 SELECTOR_AUTO_REBUILD_THRESHOLD (512),則重建一個新的selctor,避免空輪訓(xùn)
            else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);                // 重建創(chuàng)建一個新的selector
                rebuildSelector();
                selector = this.selector;                // Select again to populate selectedKeys.
                // 對重建后的selector進(jìn)行一次非阻塞調(diào)用,用于獲取最新的selectedKeys
                selector.selectNow();                // 設(shè)置select計數(shù)
                selectCnt = 1;                break;
            }
            currentTimeNanos = time;
        }        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }        // Harmless exception - log anyway
    }
}

重新創(chuàng)建一個新的Selector

該方法的主要邏輯就是:

  • 創(chuàng)建一個新的selector

  • 將老的selector上的 selectKey注冊到新的 selector 上

public void rebuildSelector() {    if (!inEventLoop()) {
        execute(new Runnable() {            @Override
            public void run() {
                rebuildSelector0();
            }
        });        return;
    }
    rebuildSelector0();
}// 重新創(chuàng)建selectorprivate void rebuildSelector0() {    // 暫存老的selector
    final Selector oldSelector = selector;    final SelectorTuple newSelectorTuple;    if (oldSelector == null) {        return;
    }    try {        // 創(chuàng)建一個新的 SelectorTuple
        // openSelector()在之前分析過了
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);        return;
    }    // Register all channels to the new Selector.
    // 記錄select上注冊的channel數(shù)量
    int nChannels = 0;    // 遍歷老的 selector 上的 SelectionKey 
    for (SelectionKey key: oldSelector.keys()) {        // 獲取 attachment,這里的attachment就是我們前面在講 Netty Channel注冊時,select會將channel賦值到 attachment 變量上。
        // 獲取老的selector上注冊的channel 
        Object a = key.attachment();        try {            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {                continue;
            }			// 獲取興趣集
            int interestOps = key.interestOps();            // 取消 SelectionKey
            key.cancel();            // 將老的興趣集重新注冊到前面新創(chuàng)建的selector上
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);            
            if (a instanceof AbstractNioChannel) {                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }            // nChannels計數(shù) + 1
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }	
    // 設(shè)置新的 selector
    selector = newSelectorTuple.selector;    // 設(shè)置新的 unwrappedSelector
    unwrappedSelector = newSelectorTuple.unwrappedSelector;    try {        // time to close the old selector as everything else is registered to the new one
        // 關(guān)閉老的seleclor
        oldSelector.close();
    } catch (Throwable t) {        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

處理I/O事件

private void processSelectedKeysOptimized() {    for (int i = 0; i < selectedKeys.size; ++i) {        final SelectionKey k = selectedKeys.keys[i];        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        // 設(shè)置為null,有利于GC回收
        selectedKeys.keys[i] = null;		// 獲取 SelectionKey 中的 attachment, 我們這里就是 NioChannel
        final Object a = k.attachment();        if (a instanceof AbstractNioChannel) {            // 處理 SelectedKey
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }        if (needsToSelectAgain) {            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}// 處理 SelectedKeyprivate void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {    // 獲取Netty Channel中的 NioUnsafe 對象,用于后面的IO操作
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();    // 判斷 SelectedKey 的有效性,如果無效,則直接返回并關(guān)閉channel
    if (!k.isValid()) {        final EventLoop eventLoop;        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {            return;
        }        // close the channel if the key is not valid anymore
        // 關(guān)閉channel
        unsafe.close(unsafe.voidPromise());        return;
    }    try {        // 獲取 SelectionKey 中所有準(zhǔn)備就緒的操作集
        int readyOps = k.readyOps();        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        // 在調(diào)用處理READ與WRITE事件之間,先調(diào)用finishConnect()接口,避免異常 NotYetConnectedException 發(fā)生。
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        // 處理 WRITE 事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 處理 ACCEPT 與 READ 事件
        // 如果當(dāng)前的EventLoop是WorkGroup,則表示有 READ 事件
        // 如果當(dāng)前的EventLoop是BossGroup,則表示有 ACCEPT 事件,有新連接進(jìn)來了
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {            // 讀取數(shù)據(jù)
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

關(guān)于 unsafe.read() 的分析,請看  后文

執(zhí)行所有任務(wù)

接下來,我們了解一下執(zhí)行具體Task任務(wù)的接口:runAllTasks。在EventLoop中,待執(zhí)行的任務(wù)隊列分為兩種:一種是普通任務(wù)隊列,一種是定時任務(wù)隊列。

前面  我們講 EventLoop 創(chuàng)建時提到過NioEventLoop中 taskQueue 的創(chuàng)建,是一個MpscQueue,關(guān)于高效率的MpscQueue 后面單獨寫文章進(jìn)行介紹:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    
    ...	
    // 存放普通任務(wù)的隊列
    private final Queue<Runnable> taskQueue;
    ...    
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {        super(parent);        this.addTaskWakesUp = addTaskWakesUp;        this.maxPendingTasks = Math.max(16, maxPendingTasks);        this.executor = ObjectUtil.checkNotNull(executor, "executor");        // 創(chuàng)建TaskQueue
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }  
    
    ...
    
}public final class NioEventLoop extends SingleThreadEventLoop {
	... 
    // NioEventLoop 創(chuàng)建TaskQueue隊列
	@Override
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {        // This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }
    
    
    ...
        
}

存放定時任務(wù)的隊列在 AbstractScheduledEventExecutor 中,成員變量為 scheduledTaskQueue,代碼如下:

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {	
    // 優(yōu)先級隊列的比較器
    private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =            new Comparator<ScheduledFutureTask<?>>() {                @Override
                public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {                    return o1.compareTo(o2);
                }
            };	
    // 存放定時任務(wù)的優(yōu)先級隊列
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;	// 創(chuàng)建定時任務(wù)隊列    
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,                    // Use same initial capacity as java.util.PriorityQueue
                    11);
        }        return scheduledTaskQueue;
    }    
    // 保存定時任務(wù)
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(unit, "unit");        if (delay < 0) {
            delay = 0;
        }
        validateScheduled0(delay, unit);        return schedule(new ScheduledFutureTask<Void>(                this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }    // 保存定時任務(wù)
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(callable, "callable");
        ObjectUtil.checkNotNull(unit, "unit");        if (delay < 0) {
            delay = 0;
        }
        validateScheduled0(delay, unit);        return schedule(new ScheduledFutureTask<V>(                this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }	// 保存定時任務(wù)
 	<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {        // 判斷是否為當(dāng)前線程
        if (inEventLoop()) {            // 添加定時任務(wù)隊列
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {                @Override
                public void run() {                    // 添加定時任務(wù)隊列
                    scheduledTaskQueue().add(task);
                }
            });
        }        return task;
  	}
}

Netty存放定時任務(wù)隊列為  DefaultPriorityQueue  ,定時任務(wù)的封裝對象為 ScheduledFutureTask ,在隊列中的優(yōu)先按照它們的截止時間進(jìn)行排序,其次在按照id進(jìn)行排序。

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
    ...	
    // 比較 ScheduledFutureTask 之間的排序
    @Override
    public int compareTo(Delayed o) {        if (this == o) {            return 0;
        }
		
        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;        long d = deadlineNanos() - that.deadlineNanos();        if (d < 0) {            return -1;
        } else if (d > 0) {            return 1;
        } else if (id < that.id) {            return -1;
        } else if (id == that.id) {            throw new Error();
        } else {            return 1;
        }
    }    
    
    ...
    
}

再來看看任務(wù)的執(zhí)行邏輯,首先將定時任務(wù)取出,聚合到普通任務(wù)隊列中,再去for循環(huán)運行每個Task。

protected boolean runAllTasks(long timeoutNanos) {    // 將定時任務(wù)從定時隊列中取出,放入普通隊列中
    fetchFromScheduledTaskQueue();    // 從隊列中取出任務(wù)
    Runnable task = pollTask();    if (task == null) {
        afterRunningAllTasks();        return false;
    }	// 計算任務(wù)執(zhí)行的最大超時時間
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;    // 任務(wù)計數(shù)
    long runTasks = 0;    // 最近一次任務(wù)執(zhí)行的時間
    long lastExecutionTime;    for (;;) {        // 執(zhí)行任務(wù)
        safeExecute(task);		// 任務(wù)計數(shù) +1
        runTasks ++;        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        // 由于nanoTime() 是非常好性能的操作,因此每64次就對比一下 定時任務(wù)的執(zhí)行時間與 deadline,
        // 如果 lastExecutionTime >= deadline,則表示任務(wù)超時了,需要中斷退出
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();            if (lastExecutionTime >= deadline) {                break;
            }
        }		
        // 獲取任務(wù)
        task = pollTask();        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();            break;
        }
    }
	
    afterRunningAllTasks();    // 記錄最后一次的執(zhí)行時間
    this.lastExecutionTime = lastExecutionTime;    return true;
}// 取出任務(wù)protected Runnable pollTask() {    assert inEventLoop();    return pollTaskFrom(taskQueue);
}// 從隊列中取出任務(wù)protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {    for (;;) {
        Runnable task = taskQueue.poll();        if (task == WAKEUP_TASK) {            continue;
        }        return task;
    }
}// 將定時任務(wù)從定時隊列中取出,聚合到普通隊列中:private boolean fetchFromScheduledTaskQueue() {    // 得到nanoTime = 當(dāng)前時間 - ScheduledFutureTask的START_TIME(開始時間)
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();    // 獲得截止時間小于nanoTime的定時任務(wù)
    Runnable scheduledTask  = pollScheduledTask(nanoTime);    while (scheduledTask != null) {        // 將定時任務(wù)放入普通隊列中,以備運行
        if (!taskQueue.offer(scheduledTask)) {            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            // 如果 taskQueue 沒有足夠的空間,導(dǎo)致添加失敗,則將其返回定時任務(wù)隊列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }    return true;
}// 獲得截止時間小于nanoTime的定時任務(wù)protected final Runnable pollScheduledTask(long nanoTime) {    assert inEventLoop();	// 獲取定時任務(wù)隊列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;    // 獲取第一個定時任務(wù)
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();    if (scheduledTask == null) {        return null;
    }	// 如果該定時任務(wù)的截止時間 <= nanoTime ,則返回
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        scheduledTaskQueue.remove();        return scheduledTask;
    }    return null;
}

“Netty NioEventLoop啟動過程是怎樣的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI