溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

HashedWheelTimer的作用是什么

發(fā)布時(shí)間:2021-06-24 09:58:14 來源:億速云 閱讀:186 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要講解了“HashedWheelTimer的作用是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“HashedWheelTimer的作用是什么”吧!

和同事討論一個(gè)定時(shí)審核的需求,運(yùn)營設(shè)定審核通過的時(shí)間,到了這個(gè)時(shí)間之后,相關(guān)內(nèi)容自動(dòng)審核通過,本是個(gè)小的需求,但是考慮到如果需要定時(shí)審核的東西很多,這樣大量的定時(shí)任務(wù)帶來的一系列問題,然后逐步的討論到了netty的HashedWheelTimer這個(gè)的實(shí)現(xiàn)。

方案一 單定時(shí)器方案

描述:

把所有需要定時(shí)審核的資源放到redis中,例如sorted set中,需要審核通過的時(shí)間作為score值。后臺(tái)啟動(dòng)一個(gè)定時(shí)器,定時(shí)輪詢sortedSet,當(dāng)score值小于當(dāng)前時(shí)間,則運(yùn)行任務(wù)審核通過。

問題

這個(gè)方案在小批量數(shù)據(jù)的情況下沒有問題,但是在大批量任務(wù)的情況下就會(huì)出現(xiàn)問題了,因?yàn)槊看味家喸內(nèi)康臄?shù)據(jù),逐個(gè)判斷是否需要執(zhí)行,一旦輪詢?nèi)蝿?wù)執(zhí)行比較長,就會(huì)出現(xiàn)任務(wù)無法按照定時(shí)的時(shí)間執(zhí)行的問題。

方案二 多定時(shí)器方案

描述

每個(gè)需要定時(shí)完成的任務(wù)都啟動(dòng)一個(gè)定時(shí)任務(wù),然后等待完成之后銷毀

問題

這個(gè)方案帶來的問題很明顯,定時(shí)任務(wù)比較多的情況下,會(huì)啟動(dòng)很多的線程,這樣服務(wù)器會(huì)承受不了之后崩潰?;旧喜粫?huì)采取這個(gè)方案。

方案三 借用redis的過期通知功能

描述

和方案一類似,針對(duì)每一個(gè)需要定時(shí)審核的任務(wù),設(shè)定過期時(shí)間,過期時(shí)間也就是審核通過的時(shí)間,訂閱redis的過期事件,當(dāng)這個(gè)事件發(fā)生時(shí),執(zhí)行相應(yīng)的審核通過任務(wù)。

問題

這個(gè)方案來說是借用了redis這種中間件來實(shí)現(xiàn)我們的功能,這中實(shí)際上屬于redis的發(fā)布訂閱功能中的一部分,針對(duì)redis發(fā)布訂閱功能是不推薦我們?cè)谏a(chǎn)環(huán)境中做業(yè)務(wù)操作的,通常redis內(nèi)部(例如redis集群節(jié)點(diǎn)上下線,選舉等等來使用),我們業(yè)務(wù)系統(tǒng)使用它的這個(gè)事件會(huì)產(chǎn)生如下兩個(gè)問題 1、redis發(fā)布訂閱的不穩(wěn)定問題 2、redid發(fā)布訂閱的可靠性問題 具體可以參考 https://my.oschina.net/u/2457218/blog/3065021 (redis的發(fā)布訂閱缺陷)

方案四 Hash分層記時(shí)輪算法

也許你和我一樣都是第一次聽說這個(gè)東西,這個(gè)東西就是專為大批量定時(shí)任務(wù)管理而生。具體論文詳見參考文獻(xiàn)[2]

算法概要

簡(jiǎn)要的說這個(gè)是一個(gè)輪,里面有指針,指針會(huì)根據(jù)設(shè)定的時(shí)間單位旋轉(zhuǎn),任務(wù)根據(jù)一些算法會(huì)落在相應(yīng)的槽位上。如下圖 HashedWheelTimer的作用是什么

首先會(huì)有一個(gè)輪,這個(gè)輪在這里分成了8個(gè)槽位,任務(wù)任務(wù)添加的時(shí)候會(huì)根據(jù)相應(yīng)的算法對(duì)槽位個(gè)數(shù)取模,得到任務(wù)會(huì)存儲(chǔ)在具體哪個(gè)槽位,每個(gè)槽位是一個(gè)鏈表結(jié)構(gòu),任務(wù)存儲(chǔ)了任務(wù)的過期時(shí)間(任務(wù)執(zhí)行時(shí)間),任務(wù)需要執(zhí)行需要指針轉(zhuǎn)的輪數(shù),指針(tick) 每間隔一個(gè)單位的時(shí)間會(huì)往下走一個(gè)槽位,然后會(huì)查詢這個(gè)槽位上的存儲(chǔ)的任務(wù),并且任務(wù)的存儲(chǔ)的剩余輪數(shù)會(huì)減一,當(dāng)剩余輪數(shù)小于等于零時(shí),就會(huì)開始執(zhí)行這個(gè)任務(wù),執(zhí)行之后會(huì)把任務(wù)從這個(gè)槽位上給刪除掉。

例如上圖: 槽位為8個(gè)槽位 Bucket 指針每個(gè)時(shí)間間隔(100ms)會(huì)往下走一個(gè)槽位,這個(gè)時(shí)間間隔叫做tickDuration 那相當(dāng)于每隔8*100ms=800ms,會(huì)輪詢一圈。

HashedWheelTimer

算法理解起來比較簡(jiǎn)單,并且也有成熟的實(shí)現(xiàn),那就是在netty中有一個(gè)HashedWheelTimer這個(gè)類,把這個(gè)算法實(shí)現(xiàn)了出來。接下來分析分析一下它的這個(gè)代碼。

初始化

在這個(gè)類上定義的有幾個(gè)比較重要的屬性

  /**
     *這個(gè)work是一個(gè)內(nèi)部類,實(shí)現(xiàn)了Runable接口,是比較核心的一個(gè)類,包裝了具體任務(wù)的運(yùn)行,把任務(wù)放到具體如何放到某個(gè)槽位上,指針往下走的具體方法,任務(wù)取消等。 
     */
    private final Worker worker = new Worker();
    /**
     *工作線程,這個(gè)就是整個(gè)HashedWheelTimer啟動(dòng)的起點(diǎn) 
     */
    private final Thread workerThread;

    /**
     *當(dāng)前任務(wù)的狀態(tài),1代表任務(wù)已經(jīng)開始執(zhí)行,0任務(wù)初始化,2任務(wù)已關(guān)閉 
     */
    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;

    /**
     *這個(gè)很核心的一個(gè)概念,就是指針往下走的單位,在HashedWheelTimer這個(gè)類中,默認(rèn)是100ms指針往下走一個(gè)單位 
     */
    private final long tickDuration;
    /**
     * 這個(gè)就是指的時(shí)間輪,有多少個(gè)槽位,wheel的大小就是多大,HashedWheelTimer中默認(rèn)槽位有512個(gè)
     */
    private final HashedWheelBucket[] wheel;
    /**
     * 主要輔助計(jì)算任務(wù)會(huì)存儲(chǔ)在哪個(gè)槽位上,mask =wheel.length-1 
     */
    private final int mask;

    /**
     *所有要執(zhí)行的任務(wù)的任務(wù)隊(duì)列 
     */
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    /**
     *所有要取消的任務(wù)的任務(wù)隊(duì)列 
     */
    private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
    /**
     *HashedWheelTimer實(shí)例開始運(yùn)行的時(shí)間,是納秒數(shù),開始時(shí)間是System.nanotime() 
     */
    private volatile long startTime;

這些屬性的定義和概念映射到上面時(shí)間輪算法上就是下圖的樣子了。 HashedWheelTimer的作用是什么

HashedWheelTimer初始化主要是在它的構(gòu)造函數(shù)中,提供了多種重載方式,只需要看最全的構(gòu)造函數(shù)即可。

/**
     * Creates a new timer.
     * @param threadFactory        執(zhí)行任務(wù)的工廠
     * @param tickDuration         指針往下走一步的時(shí)間間隔
     * @param unit                 指針往下走一步的時(shí)間單位,秒,毫秒。納秒等
     * @param ticksPerWheel        時(shí)間輪的大小,也就是槽位的個(gè)數(shù)
     */
    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        /**
         * 先校驗(yàn)參數(shù)的合法性,對(duì)threadFactory,時(shí)間單位,時(shí)間間隔,時(shí)間輪大小做了限制
         */
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // 創(chuàng)建槽位,實(shí)際上就是初始化HashedWheelBucket數(shù)組,直接new出來的
        wheel = createWheel(ticksPerWheel);
        //用來計(jì)算槽位的輔助變量,一會(huì)兒會(huì)在Worker中尋找槽位時(shí)使用到
        mask = wheel.length - 1;
        ...
        //初始化線程,是用threadFactory創(chuàng)建出來的一個(gè)worker線程
        workerThread = threadFactory.newThread(worker);

      ...

    }

任務(wù)添加和運(yùn)行

當(dāng)需要添加一個(gè)定時(shí)任務(wù)的時(shí)候,是通過newTimeout方法添加的,添加的任務(wù)必須實(shí)現(xiàn)TimerTask接口的run方法。任務(wù)添加之后,無需顯式的開啟任務(wù),添加之后任務(wù)會(huì)自動(dòng)開啟,等到了執(zhí)行的時(shí)間會(huì)被自動(dòng)執(zhí)行??蛻舳耸褂玫姆绞饺缦拢?/p>

@Test
    public void testRun() throws Exception{
        final CountDownLatch latch = new CountDownLatch(1);

        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("hello world");
            latch.countDown();
        }, 5, TimeUnit.SECONDS);

        latch.await();
        System.out.println("執(zhí)行結(jié)束");

    }

5秒鐘之后會(huì)被輸出"hello world",然后任務(wù)執(zhí)行完畢。既然任務(wù)的添加和執(zhí)行入口都是通過newTimeout這個(gè)方法搞定的,那就看一下這個(gè)方法里面有哪些秘密吧。

 @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ...
        start();
        ...
        /**
         * 可以看到任務(wù)存活的時(shí)間計(jì)算,當(dāng)前時(shí)間的毫秒數(shù)加上我們?cè)O(shè)定的時(shí)間,然后減去程序開始執(zhí)行的時(shí)間。這是一個(gè)時(shí)間段
         */
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

進(jìn)去看了之后,這個(gè)方法很簡(jiǎn)單,有兩個(gè)關(guān)鍵的方法調(diào)用 1、start(),這個(gè)方法主要是看當(dāng)前HashedWheelTimer的狀態(tài)是否已經(jīng)啟動(dòng),如果沒有啟動(dòng)則會(huì)調(diào)用workThread線程的啟動(dòng)方法。2、計(jì)算超時(shí)時(shí)間和任務(wù)添加。我們傳進(jìn)來的任務(wù)會(huì)被包裝成一個(gè)HashWheelTimeout這個(gè)類,包裝之后會(huì)把這個(gè)包裝類放到timeouts這個(gè)阻塞隊(duì)列中去,實(shí)際上這時(shí)候任務(wù)并沒有放到某個(gè)具體槽位中,只是先放到阻塞隊(duì)列中,等待work從這個(gè)隊(duì)列中取值然后放到具體的槽位上,HashedWheelTimer是一個(gè)雙向鏈表,上面圖中已經(jīng)有這個(gè)類的類圖結(jié)構(gòu),再貼一次: HashedWheelTimer的作用是什么

我們傳進(jìn)來的任務(wù)就是它的task屬性,然后會(huì)根據(jù)當(dāng)前時(shí)間、過期時(shí)間和任務(wù)開始時(shí)間計(jì)算出它的deadline,同事計(jì)算出它剩余的輪數(shù)(remainingRounds)。
任務(wù)執(zhí)行實(shí)際上是調(diào)用的它的expire方法。當(dāng)expire的時(shí)候會(huì)調(diào)用具體的業(yè)務(wù)任務(wù)的run方法。

HashedWheelTimer的expire方法是什么時(shí)候被執(zhí)行的呢。上面也也說到在HashedWheelTimer中有一個(gè)workThread,這里面會(huì)運(yùn)行work。能讀到這個(gè)地方來的人應(yīng)該很少了吧,不過能到這個(gè)地方你是幸運(yùn)的,因?yàn)閣ork這個(gè)類也就是實(shí)現(xiàn)這個(gè)算法中最核心的一個(gè)類了,先來概覽一下這個(gè)類 HashedWheelTimer的作用是什么

這個(gè)類實(shí)現(xiàn)了Runable接口,也就說是一個(gè)線程類,然后它會(huì)被workTread調(diào)用執(zhí)行啟動(dòng)。

  • transferTimeoutsToBuckets 把新加入的定時(shí)任務(wù)從阻塞隊(duì)列中取出然后放到相應(yīng)的bucket中

  • processCancelledTasks 把取消的定時(shí)任務(wù)從阻塞隊(duì)列中取出,然后從相應(yīng)的bucket中remove掉

  • waitForNextTick 指針往下走的方法,經(jīng)過一個(gè)時(shí)間單位,指針會(huì)往下走,指向下一個(gè)bucket

run方法會(huì)一直循環(huán)從阻塞隊(duì)列中取值,然后放到bucket中,指針循環(huán)往下走,對(duì)remainderRounds對(duì)于0的任務(wù)進(jìn)行執(zhí)行,不是0的減一

do {
                /**
                 * 里面是一個(gè)Thread.sleep操作,模擬指針一步一步往下走的操作。
                 */
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    /**
                     * 計(jì)算任務(wù)將要落到槽位,這本應(yīng)該是個(gè)取模運(yùn)算,不過這里用了一個(gè)小技巧,就是把取模運(yùn)算換為了“按位與”,因?yàn)椤鞍次慌c”要比取模運(yùn)算快的多,
                     * 這個(gè)技巧就是當(dāng)mast的值為2的n次方-1時(shí),能達(dá)到取模的效果。這里要感謝一下王洪濤的分享
                     */
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    //取到具體bucket,然后把任務(wù)放從阻塞隊(duì)列中拿到,放到bucket中
                    HashedWheelBucket bucket =
                            wheel[idx];

                    transferTimeoutsToBuckets();
                    //這里面會(huì)調(diào)用所有HashedWheelTimeout的方法,就是看他的剩余的輪數(shù)是不是大于0,如果是的話則會(huì)被執(zhí)行,不是的話剩余輪數(shù)減1
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

后繼

當(dāng)然HashedWheelTimer這個(gè)類屬于全內(nèi)存任務(wù)計(jì)算,通常在我們真正的業(yè)務(wù)中,是不會(huì)把這些任務(wù)直接放到j(luò)vm內(nèi)存中的,要不然重啟之后任務(wù)不都會(huì)消失了么,這樣我們需要重寫HashedWheelTimer,只需要對(duì)它任務(wù)的添加和獲取進(jìn)行重寫到相應(yīng)的持久化中間件中即可(例如數(shù)據(jù)庫或者es等等)

參考和引用

[1][redis的發(fā)布訂閱缺陷]

[[2]][Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil] [Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil]: http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf "Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil"

[[3]][Hashed and Hierarchical Timing Wheels] [Hashed and Hierarchical Timing Wheels]: http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt "Hashed and Hierarchical Timing Wheels"

感謝各位的閱讀,以上就是“HashedWheelTimer的作用是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)HashedWheelTimer的作用是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI