溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行的方法

發(fā)布時(shí)間:2022-03-25 16:14:22 來源:億速云 閱讀:103 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行的方法”的相關(guān)知識,小編通過實(shí)際案例向大家展示操作過程,操作方法簡單快捷,實(shí)用性強(qiáng),希望這篇“Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行的方法”文章能幫助大家解決問題。

執(zhí)行任務(wù)隊(duì)列

繼續(xù)回到NioEventLoop的run()方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //輪詢io事件(1)
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //默認(rèn)是50
            final int ioRatio = this.ioRatio; 
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                //記錄下開始時(shí)間
                final long ioStartTime = System.nanoTime();
                try {
                    //處理輪詢到的key(2)
                    processSelectedKeys();
                } finally {
                    //計(jì)算耗時(shí)
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //執(zhí)行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //代碼省略
    }
}

我們看到處理完輪詢到的key之后, 首先記錄下耗時(shí), 然后通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執(zhí)行taskQueue中的任務(wù)

我們知道ioRatio默認(rèn)是50, 所以執(zhí)行完ioTime * (100 - ioRatio) / ioRatio后, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執(zhí)行時(shí)間:

跟進(jìn)runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定時(shí)任務(wù)隊(duì)列中聚合任務(wù)
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個(gè)任務(wù)
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務(wù)執(zhí)行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊(duì)列不為空
    //首先算一個(gè)截止時(shí)間(+50毫秒, 因?yàn)閳?zhí)行任務(wù), 不要超過這個(gè)時(shí)間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執(zhí)行每一個(gè)任務(wù)
    for (;;) {
        safeExecute(task);
        //標(biāo)記當(dāng)前跑完的任務(wù)
        runTasks ++;
        //當(dāng)跑完64個(gè)任務(wù)的時(shí)候, 會計(jì)算一下當(dāng)前時(shí)間
        if ((runTasks & 0x3F) == 0) {
            //定時(shí)任務(wù)初始化到當(dāng)前的時(shí)間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時(shí)間則不執(zhí)行(nanoTime()是耗時(shí)的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個(gè)時(shí)間, 則繼續(xù)從普通任務(wù)隊(duì)列拿任務(wù)
        task = pollTask();
        //直到?jīng)]有任務(wù)執(zhí)行
        if (task == null) {
            //記錄下最后執(zhí)行時(shí)間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先會執(zhí)行fetchFromScheduledTaskQueue()這個(gè)方法, 這個(gè)方法的意思是從定時(shí)任務(wù)隊(duì)列中聚合任務(wù), 也就是將定時(shí)任務(wù)中找到可以執(zhí)行的任務(wù)添加到taskQueue中

我們跟進(jìn)fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時(shí)任務(wù)隊(duì)列中抓取第一個(gè)定時(shí)任務(wù)
    //尋找截止時(shí)間為nanoTime的任務(wù)
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時(shí)任務(wù)隊(duì)列不為空, 則塞到普通任務(wù)隊(duì)列里面
    while (scheduledTask != null) {
        //如果添加到普通任務(wù)隊(duì)列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新添加到定時(shí)任務(wù)隊(duì)列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續(xù)從定時(shí)任務(wù)隊(duì)列中拉取任務(wù)
        //方法執(zhí)行完成之后, 所有符合運(yùn)行條件的定時(shí)任務(wù)隊(duì)列, 都添加到了普通任務(wù)隊(duì)列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

 long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表從定時(shí)任務(wù)初始化到現(xiàn)在過去了多長時(shí)間

 Runnable scheduledTask= pollScheduledTask(nanoTime) 代表從定時(shí)任務(wù)隊(duì)列中拿到小于nanoTime時(shí)間的任務(wù), 因?yàn)樾∮诔跏蓟浆F(xiàn)在的時(shí)間, 說明該任務(wù)需要執(zhí)行了

跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    //拿到定時(shí)任務(wù)隊(duì)列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    //peek()方法拿到第一個(gè)任務(wù)
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        //從隊(duì)列中刪除
        scheduledTaskQueue.remove();
        //返回該任務(wù)
        return scheduledTask;
    }
    return null;
}

我們看到首先獲得當(dāng)前類綁定的定時(shí)任務(wù)隊(duì)列的成員變量

如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個(gè)任務(wù)

如果當(dāng)前任務(wù)小于傳來的時(shí)間, 說明該任務(wù)需要執(zhí)行, 則從定時(shí)任務(wù)隊(duì)列中刪除

我們繼續(xù)回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時(shí)任務(wù)隊(duì)列中抓取第一個(gè)定時(shí)任務(wù)
    //尋找截止時(shí)間為nanoTime的任務(wù)
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時(shí)任務(wù)隊(duì)列不為空, 則塞到普通任務(wù)隊(duì)列里面
    while (scheduledTask != null) {
        //如果添加到普通任務(wù)隊(duì)列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新添加到定時(shí)任務(wù)隊(duì)列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續(xù)從定時(shí)任務(wù)隊(duì)列中拉取任務(wù)
        //方法執(zhí)行完成之后, 所有符合運(yùn)行條件的定時(shí)任務(wù)隊(duì)列, 都添加到了普通任務(wù)隊(duì)列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

彈出需要執(zhí)行的定時(shí)任務(wù)之后, 我們通過taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失敗, 則通過scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定時(shí)任務(wù)隊(duì)列中

如果添加成功, 則通過pollScheduledTask(nanoTime)方法繼續(xù)添加, 直到?jīng)]有需要執(zhí)行的任務(wù)

這樣就將定時(shí)任務(wù)隊(duì)列需要執(zhí)行的任務(wù)添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) {
    //定時(shí)任務(wù)隊(duì)列中聚合任務(wù)
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個(gè)任務(wù)
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務(wù)執(zhí)行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊(duì)列不為空
    //首先算一個(gè)截止時(shí)間(+50毫秒, 因?yàn)閳?zhí)行任務(wù), 不要超過這個(gè)時(shí)間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執(zhí)行每一個(gè)任務(wù)
    for (;;) {
        safeExecute(task);
        //標(biāo)記當(dāng)前跑完的任務(wù)
        runTasks ++;
        //當(dāng)跑完64個(gè)任務(wù)的時(shí)候, 會計(jì)算一下當(dāng)前時(shí)間
        if ((runTasks & 0x3F) == 0) {
            //定時(shí)任務(wù)初始化到當(dāng)前的時(shí)間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時(shí)間則不執(zhí)行(nanoTime()是耗時(shí)的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個(gè)時(shí)間, 則繼續(xù)從普通任務(wù)隊(duì)列拿任務(wù)
        task = pollTask();
        //直到?jīng)]有任務(wù)執(zhí)行
        if (task == null) {
            //記錄下最后執(zhí)行時(shí)間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先通過 Runnable task = pollTask() 從taskQueue中拿一個(gè)任務(wù)

任務(wù)不為空, 則通過 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 計(jì)算一個(gè)截止時(shí)間, 任務(wù)的執(zhí)行時(shí)間不能超過這個(gè)時(shí)間

然后在for循環(huán)中通過safeExecute(task)執(zhí)行task

我們跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) {
    try {
        //直接調(diào)用run()方法執(zhí)行
        task.run();
    } catch (Throwable t) {
        //發(fā)生異常不終止
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

這里直接調(diào)用task的run()方法進(jìn)行執(zhí)行, 其中發(fā)生異常, 只打印一條日志, 代表發(fā)生異常不終止, 繼續(xù)往下執(zhí)行

回到runAllTasks(long timeoutNanos)方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定時(shí)任務(wù)隊(duì)列中聚合任務(wù)
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個(gè)任務(wù)
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務(wù)執(zhí)行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊(duì)列不為空
    //首先算一個(gè)截止時(shí)間(+50毫秒, 因?yàn)閳?zhí)行任務(wù), 不要超過這個(gè)時(shí)間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執(zhí)行每一個(gè)任務(wù)
    for (;;) {
        safeExecute(task);
        //標(biāo)記當(dāng)前跑完的任務(wù)
        runTasks ++;
        //當(dāng)跑完64個(gè)任務(wù)的時(shí)候, 會計(jì)算一下當(dāng)前時(shí)間
        if ((runTasks & 0x3F) == 0) {
            //定時(shí)任務(wù)初始化到當(dāng)前的時(shí)間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時(shí)間則不執(zhí)行(nanoTime()是耗時(shí)的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個(gè)時(shí)間, 則繼續(xù)從普通任務(wù)隊(duì)列拿任務(wù)
        task = pollTask();
        //直到?jīng)]有任務(wù)執(zhí)行
        if (task == null) {
            //記錄下最后執(zhí)行時(shí)間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

每次執(zhí)行完task, runTasks自增

這里 if ((runTasks & 0x3F) == 0) 代表是否執(zhí)行了64個(gè)任務(wù), 如果執(zhí)行了64個(gè)任務(wù), 則會通過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時(shí)任務(wù)初始化到現(xiàn)在的時(shí)間, 如果這個(gè)時(shí)間超過了截止時(shí)間, 則退出循環(huán)

如果沒有超過截止時(shí)間, 則通過 task = pollTask() 繼續(xù)彈出任務(wù)執(zhí)行

這里執(zhí)行64個(gè)任務(wù)統(tǒng)計(jì)一次時(shí)間, 而不是每次執(zhí)行任務(wù)都統(tǒng)計(jì), 主要原因是因?yàn)楂@取系統(tǒng)時(shí)間是個(gè)比較耗時(shí)的操作, 這里是netty的一種優(yōu)化方式

如果沒有task需要執(zhí)行, 則通過afterRunningAllTasks()做收尾工作, 最后記錄下最后的執(zhí)行時(shí)間

關(guān)于“Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行的方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識,可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點(diǎn)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI