您好,登錄后才能下訂單哦!
這篇文章主要介紹“Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行的方法”的相關(guān)知識,小編通過實(shí)際案例向大家展示操作過程,操作方法簡單快捷,實(shí)用性強(qiáng),希望這篇“Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行的方法”文章能幫助大家解決問題。
繼續(xù)回到NioEventLoop的run()方法:
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //輪詢io事件(1) select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默認(rèn)是50 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //記錄下開始時(shí)間 final long ioStartTime = System.nanoTime(); try { //處理輪詢到的key(2) processSelectedKeys(); } finally { //計(jì)算耗時(shí) final long ioTime = System.nanoTime() - ioStartTime; //執(zhí)行task(3) runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代碼省略 } }
我們看到處理完輪詢到的key之后, 首先記錄下耗時(shí), 然后通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執(zhí)行taskQueue中的任務(wù)
我們知道ioRatio默認(rèn)是50, 所以執(zhí)行完ioTime * (100 - ioRatio) / ioRatio后, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執(zhí)行時(shí)間:
protected boolean runAllTasks(long timeoutNanos) { //定時(shí)任務(wù)隊(duì)列中聚合任務(wù) fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個(gè)任務(wù) Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務(wù)執(zhí)行收尾的操作 afterRunningAllTasks(); return false; } //如果隊(duì)列不為空 //首先算一個(gè)截止時(shí)間(+50毫秒, 因?yàn)閳?zhí)行任務(wù), 不要超過這個(gè)時(shí)間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執(zhí)行每一個(gè)任務(wù) for (;;) { safeExecute(task); //標(biāo)記當(dāng)前跑完的任務(wù) runTasks ++; //當(dāng)跑完64個(gè)任務(wù)的時(shí)候, 會計(jì)算一下當(dāng)前時(shí)間 if ((runTasks & 0x3F) == 0) { //定時(shí)任務(wù)初始化到當(dāng)前的時(shí)間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時(shí)間則不執(zhí)行(nanoTime()是耗時(shí)的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個(gè)時(shí)間, 則繼續(xù)從普通任務(wù)隊(duì)列拿任務(wù) task = pollTask(); //直到?jīng)]有任務(wù)執(zhí)行 if (task == null) { //記錄下最后執(zhí)行時(shí)間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先會執(zhí)行fetchFromScheduledTaskQueue()這個(gè)方法, 這個(gè)方法的意思是從定時(shí)任務(wù)隊(duì)列中聚合任務(wù), 也就是將定時(shí)任務(wù)中找到可以執(zhí)行的任務(wù)添加到taskQueue中
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時(shí)任務(wù)隊(duì)列中抓取第一個(gè)定時(shí)任務(wù) //尋找截止時(shí)間為nanoTime的任務(wù) Runnable scheduledTask = pollScheduledTask(nanoTime); //如果該定時(shí)任務(wù)隊(duì)列不為空, 則塞到普通任務(wù)隊(duì)列里面 while (scheduledTask != null) { //如果添加到普通任務(wù)隊(duì)列過程中失敗 if (!taskQueue.offer(scheduledTask)) { //則重新添加到定時(shí)任務(wù)隊(duì)列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續(xù)從定時(shí)任務(wù)隊(duì)列中拉取任務(wù) //方法執(zhí)行完成之后, 所有符合運(yùn)行條件的定時(shí)任務(wù)隊(duì)列, 都添加到了普通任務(wù)隊(duì)列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表從定時(shí)任務(wù)初始化到現(xiàn)在過去了多長時(shí)間
Runnable scheduledTask= pollScheduledTask(nanoTime) 代表從定時(shí)任務(wù)隊(duì)列中拿到小于nanoTime時(shí)間的任務(wù), 因?yàn)樾∮诔跏蓟浆F(xiàn)在的時(shí)間, 說明該任務(wù)需要執(zhí)行了
跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //拿到定時(shí)任務(wù)隊(duì)列 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; //peek()方法拿到第一個(gè)任務(wù) ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { //從隊(duì)列中刪除 scheduledTaskQueue.remove(); //返回該任務(wù) return scheduledTask; } return null; }
我們看到首先獲得當(dāng)前類綁定的定時(shí)任務(wù)隊(duì)列的成員變量
如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個(gè)任務(wù)
如果當(dāng)前任務(wù)小于傳來的時(shí)間, 說明該任務(wù)需要執(zhí)行, 則從定時(shí)任務(wù)隊(duì)列中刪除
我們繼續(xù)回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時(shí)任務(wù)隊(duì)列中抓取第一個(gè)定時(shí)任務(wù) //尋找截止時(shí)間為nanoTime的任務(wù) Runnable scheduledTask = pollScheduledTask(nanoTime); //如果該定時(shí)任務(wù)隊(duì)列不為空, 則塞到普通任務(wù)隊(duì)列里面 while (scheduledTask != null) { //如果添加到普通任務(wù)隊(duì)列過程中失敗 if (!taskQueue.offer(scheduledTask)) { //則重新添加到定時(shí)任務(wù)隊(duì)列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續(xù)從定時(shí)任務(wù)隊(duì)列中拉取任務(wù) //方法執(zhí)行完成之后, 所有符合運(yùn)行條件的定時(shí)任務(wù)隊(duì)列, 都添加到了普通任務(wù)隊(duì)列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
彈出需要執(zhí)行的定時(shí)任務(wù)之后, 我們通過taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失敗, 則通過scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定時(shí)任務(wù)隊(duì)列中
如果添加成功, 則通過pollScheduledTask(nanoTime)方法繼續(xù)添加, 直到?jīng)]有需要執(zhí)行的任務(wù)
這樣就將定時(shí)任務(wù)隊(duì)列需要執(zhí)行的任務(wù)添加到了taskQueue中
protected boolean runAllTasks(long timeoutNanos) { //定時(shí)任務(wù)隊(duì)列中聚合任務(wù) fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個(gè)任務(wù) Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務(wù)執(zhí)行收尾的操作 afterRunningAllTasks(); return false; } //如果隊(duì)列不為空 //首先算一個(gè)截止時(shí)間(+50毫秒, 因?yàn)閳?zhí)行任務(wù), 不要超過這個(gè)時(shí)間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執(zhí)行每一個(gè)任務(wù) for (;;) { safeExecute(task); //標(biāo)記當(dāng)前跑完的任務(wù) runTasks ++; //當(dāng)跑完64個(gè)任務(wù)的時(shí)候, 會計(jì)算一下當(dāng)前時(shí)間 if ((runTasks & 0x3F) == 0) { //定時(shí)任務(wù)初始化到當(dāng)前的時(shí)間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時(shí)間則不執(zhí)行(nanoTime()是耗時(shí)的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個(gè)時(shí)間, 則繼續(xù)從普通任務(wù)隊(duì)列拿任務(wù) task = pollTask(); //直到?jīng)]有任務(wù)執(zhí)行 if (task == null) { //記錄下最后執(zhí)行時(shí)間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先通過 Runnable task = pollTask() 從taskQueue中拿一個(gè)任務(wù)
任務(wù)不為空, 則通過 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 計(jì)算一個(gè)截止時(shí)間, 任務(wù)的執(zhí)行時(shí)間不能超過這個(gè)時(shí)間
然后在for循環(huán)中通過safeExecute(task)執(zhí)行task
我們跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) { try { //直接調(diào)用run()方法執(zhí)行 task.run(); } catch (Throwable t) { //發(fā)生異常不終止 logger.warn("A task raised an exception. Task: {}", task, t); } }
這里直接調(diào)用task的run()方法進(jìn)行執(zhí)行, 其中發(fā)生異常, 只打印一條日志, 代表發(fā)生異常不終止, 繼續(xù)往下執(zhí)行
回到runAllTasks(long timeoutNanos)方法:
protected boolean runAllTasks(long timeoutNanos) { //定時(shí)任務(wù)隊(duì)列中聚合任務(wù) fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個(gè)任務(wù) Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務(wù)執(zhí)行收尾的操作 afterRunningAllTasks(); return false; } //如果隊(duì)列不為空 //首先算一個(gè)截止時(shí)間(+50毫秒, 因?yàn)閳?zhí)行任務(wù), 不要超過這個(gè)時(shí)間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執(zhí)行每一個(gè)任務(wù) for (;;) { safeExecute(task); //標(biāo)記當(dāng)前跑完的任務(wù) runTasks ++; //當(dāng)跑完64個(gè)任務(wù)的時(shí)候, 會計(jì)算一下當(dāng)前時(shí)間 if ((runTasks & 0x3F) == 0) { //定時(shí)任務(wù)初始化到當(dāng)前的時(shí)間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時(shí)間則不執(zhí)行(nanoTime()是耗時(shí)的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個(gè)時(shí)間, 則繼續(xù)從普通任務(wù)隊(duì)列拿任務(wù) task = pollTask(); //直到?jīng)]有任務(wù)執(zhí)行 if (task == null) { //記錄下最后執(zhí)行時(shí)間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
每次執(zhí)行完task, runTasks自增
這里 if ((runTasks & 0x3F) == 0) 代表是否執(zhí)行了64個(gè)任務(wù), 如果執(zhí)行了64個(gè)任務(wù), 則會通過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時(shí)任務(wù)初始化到現(xiàn)在的時(shí)間, 如果這個(gè)時(shí)間超過了截止時(shí)間, 則退出循環(huán)
如果沒有超過截止時(shí)間, 則通過 task = pollTask() 繼續(xù)彈出任務(wù)執(zhí)行
這里執(zhí)行64個(gè)任務(wù)統(tǒng)計(jì)一次時(shí)間, 而不是每次執(zhí)行任務(wù)都統(tǒng)計(jì), 主要原因是因?yàn)楂@取系統(tǒng)時(shí)間是個(gè)比較耗時(shí)的操作, 這里是netty的一種優(yōu)化方式
如果沒有task需要執(zhí)行, 則通過afterRunningAllTasks()做收尾工作, 最后記錄下最后的執(zhí)行時(shí)間
關(guān)于“Netty分布式NioEventLoop任務(wù)隊(duì)列執(zhí)行的方法”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識,可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點(diǎn)。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。