溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪

發(fā)布時(shí)間:2021-07-06 15:55:49 來(lái)源:億速云 閱讀:203 作者:Leah 欄目:云計(jì)算

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。


簡(jiǎn)單時(shí)間輪

在時(shí)間輪中存儲(chǔ)任務(wù)的是一個(gè)環(huán)形隊(duì)列,底層采用數(shù)組實(shí)現(xiàn),數(shù)組中的每個(gè)元素可以存放一個(gè)定時(shí)任務(wù)列表。定時(shí)任務(wù)列表是一個(gè)環(huán)形的雙向鏈表,鏈表中的每一項(xiàng)表示的都是定時(shí)任務(wù)項(xiàng),其中封裝了真正的定時(shí)任務(wù)。

時(shí)間輪由多個(gè)時(shí)間格組成,每個(gè)時(shí)間格代表當(dāng)前時(shí)間輪的基本時(shí)間跨度(tickMs)。時(shí)間輪的時(shí)間格個(gè)數(shù)是固定的,可用 wheelSize 來(lái)表示,那么整個(gè)時(shí)間輪的總體時(shí)間跨度(interval)可以通過(guò)公式 tickMs×wheelSize 計(jì)算得出。

時(shí)間輪還有一個(gè)表盤指針(currentTime),用來(lái)表示時(shí)間輪當(dāng)前所處的時(shí)間,currentTime 是 tickMs 的整數(shù)倍。currentTime指向的地方是表示到期的時(shí)間格,表示需要處理的時(shí)間格所對(duì)應(yīng)的鏈表中的所有任務(wù)。

如下圖是一個(gè)tickMs為1s,wheelSize等于10的時(shí)間輪,每一格里面放的是一個(gè)定時(shí)任務(wù)鏈表,鏈表里面存有真正的任務(wù)項(xiàng):

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪

初始情況下表盤指針 currentTime 指向時(shí)間格0,若時(shí)間輪的 tickMs 為 1ms 且 wheelSize 等于10,那么interval則等于10s。如下圖此時(shí)有一個(gè)定時(shí)為2s的任務(wù)插進(jìn)來(lái)會(huì)存放到時(shí)間格為2的任務(wù)鏈表中,用紅色標(biāo)記。隨著時(shí)間的不斷推移,指針 currentTime 不斷向前推進(jìn),如果過(guò)了2s,那么 currentTime 會(huì)指向時(shí)間格2的位置,會(huì)將此時(shí)間格的任務(wù)鏈表獲取出來(lái)處理。

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪

如果當(dāng)前的指針 currentTime 指向的是2,此時(shí)如果插入一個(gè)9s的任務(wù)進(jìn)來(lái),那么新來(lái)的任務(wù)會(huì)服用原來(lái)的時(shí)間格鏈表,會(huì)存放到時(shí)間格1中

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪

這里所講的時(shí)間輪都是簡(jiǎn)單時(shí)間輪,只有一層,總體時(shí)間范圍在 currentTime 和 currentTime+interval 之間。如果現(xiàn)在有一個(gè)15s的定時(shí)任務(wù)是需要重新開(kāi)啟一個(gè)時(shí)間輪,設(shè)置一個(gè)時(shí)間跨度至少為15s的時(shí)間輪才夠用。但是這樣擴(kuò)充是沒(méi)有底線的,如果需要一個(gè)1萬(wàn)秒的時(shí)間輪,那么就需要一個(gè)這么大的數(shù)組去存放,不僅占用很大的內(nèi)存空間,而且也會(huì)因?yàn)樾枰闅v這么大的數(shù)組從而拉低效率。

因此引入了層級(jí)時(shí)間輪的概念。

層級(jí)時(shí)間輪

如圖是一個(gè)兩層的時(shí)間輪,第二層時(shí)間輪也是由10個(gè)時(shí)間格組成,每個(gè)時(shí)間格的跨度是10s。第二層的時(shí)間輪的 tickMs 為第一層時(shí)間輪的 interval,即10s。每一層時(shí)間輪的 wheelSize 是固定的,都是10,那么第二層的時(shí)間輪的總體時(shí)間跨度 interval 為100s。

圖中展示了每個(gè)時(shí)間格對(duì)應(yīng)的過(guò)期時(shí)間范圍, 我們可以清晰地看到, 第二層時(shí)間輪的第0個(gè)時(shí)間格的過(guò)期時(shí)間范圍是 [0,9]。也就是說(shuō), 第二層時(shí)間輪的一個(gè)時(shí)間格就可以表示第一層時(shí)間輪的所有(10個(gè))時(shí)間格;

如果向該時(shí)間輪中添加一個(gè)15s的任務(wù),那么當(dāng)?shù)谝粚訒r(shí)間輪容納不下時(shí),進(jìn)入第二層時(shí)間輪,并插入到過(guò)期時(shí)間為[10,19]的時(shí)間格中。

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪

隨著時(shí)間的流逝,當(dāng)原本15s的任務(wù)還剩下5s的時(shí)候,這里就有一個(gè)時(shí)間輪降級(jí)的操作,此時(shí)第一層時(shí)間輪的總體時(shí)間跨度已足夠,此任務(wù)被添加到第一層時(shí)間輪到期時(shí)間為5的時(shí)間格中,之后再經(jīng)歷5s后,此任務(wù)真正到期,最終執(zhí)行相應(yīng)的到期操作。

代碼實(shí)現(xiàn)

因?yàn)槲覀冞@個(gè)Go語(yǔ)言版本的時(shí)間輪代碼是仿照Kafka寫的,所以在具體實(shí)現(xiàn)時(shí)間輪 TimingWheel 時(shí)還有一些小細(xì)節(jié):

  • 時(shí)間輪的時(shí)間格中每個(gè)鏈表會(huì)有一個(gè)root節(jié)點(diǎn)用于簡(jiǎn)化邊界條件。它是一個(gè)附加的鏈表節(jié)點(diǎn),該節(jié)點(diǎn)作為第一個(gè)節(jié)點(diǎn),它的值域中并不存儲(chǔ)任何東西,只是為了操作的方便而引入的;

  • 除了第一層時(shí)間輪,其余高層時(shí)間輪的起始時(shí)間(startMs)都設(shè)置為創(chuàng)建此層時(shí)間輪時(shí)前面第一輪的 currentTime。每一層的 currentTime 都必須是 tickMs 的整數(shù)倍,如果不滿足則會(huì)將 currentTime 修剪為 tickMs 的整數(shù)倍。修剪方法為:currentTime = startMs - (startMs % tickMs);

  • Kafka 中的定時(shí)器只需持有 TimingWheel 的第一層時(shí)間輪的引用,并不會(huì)直接持有其他高層的時(shí)間輪,但每一層時(shí)間輪都會(huì)有一個(gè)引用(overflowWheel)指向更高一層的應(yīng)用;

  • Kafka 中的定時(shí)器使用了 DelayQueue 來(lái)協(xié)助推進(jìn)時(shí)間輪。在操作中會(huì)將每個(gè)使用到的時(shí)間格中每個(gè)鏈表都加入 DelayQueue,DelayQueue 會(huì)根據(jù)時(shí)間輪對(duì)應(yīng)的過(guò)期時(shí)間 expiration 來(lái)排序,最短 expiration 的任務(wù)會(huì)被排在 DelayQueue 的隊(duì)頭,通過(guò)單獨(dú)線程來(lái)獲取 DelayQueue 中到期的任務(wù);

結(jié)構(gòu)體

type TimingWheel struct {
	// 時(shí)間跨度,單位是毫秒
	tick      int64 // in milliseconds
	// 時(shí)間輪個(gè)數(shù)
	wheelSize int64
	// 總跨度
	interval    int64 // in milliseconds
	// 當(dāng)前指針指向時(shí)間
	currentTime int64 // in milliseconds
	// 時(shí)間格列表
	buckets     []*bucket
	// 延遲隊(duì)列
	queue       *delayqueue.DelayQueue 
	// 上級(jí)的時(shí)間輪引用
	overflowWheel unsafe.Pointer // type: *TimingWheel

	exitC     chan struct{}
	waitGroup waitGroupWrapper
}

tick、wheelSize、interval、currentTime都比較好理解,buckets字段代表的是時(shí)間格列表,queue是一個(gè)延遲隊(duì)列,所有的任務(wù)都是通過(guò)延遲隊(duì)列來(lái)進(jìn)行觸發(fā),overflowWheel是上層時(shí)間輪的引用。

type bucket struct {
	// 任務(wù)的過(guò)期時(shí)間
	expiration int64

	mu     sync.Mutex
	// 相同過(guò)期時(shí)間的任務(wù)隊(duì)列
	timers *list.List
}

bucket里面實(shí)際上封裝的是時(shí)間格里面的任務(wù)隊(duì)列,里面放入的是相同過(guò)期時(shí)間的任務(wù),到期后會(huì)將隊(duì)列timers拿出來(lái)進(jìn)行處理。這里有個(gè)有意思的地方是由于會(huì)有多個(gè)線程并發(fā)的訪問(wèn)bucket,所以需要用到原子類來(lái)獲取int64位的值,為了保證32位系統(tǒng)上面讀取64位數(shù)據(jù)的一致性,需要進(jìn)行64位對(duì)齊。具體的可以看這篇:https://www.luozhiyun.com/archives/429,講的是對(duì)內(nèi)存對(duì)齊的思考。

type Timer struct {
  // 到期時(shí)間
	expiration int64 // in milliseconds
  // 要被執(zhí)行的具體任務(wù)
	task       func()
	// Timer所在bucket的指針
	b unsafe.Pointer // type: *bucket
	// bucket列表中對(duì)應(yīng)的元素
	element *list.Element
}

Timer是時(shí)間輪的最小執(zhí)行單元,是定時(shí)任務(wù)的封裝,到期后會(huì)調(diào)用task來(lái)執(zhí)行任務(wù)。

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪

初始化時(shí)間輪

例如現(xiàn)在初始化一個(gè)tick是1s,wheelSize是10的時(shí)間輪:

func main() {
	tw := timingwheel.NewTimingWheel(time.Second, 10)
	tw.Start() 
}

func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
  // 將傳入的tick轉(zhuǎn)化成毫秒
	tickMs := int64(tick / time.Millisecond)
  // 如果小于零,那么panic
	if tickMs <= 0 {
		panic(errors.New("tick must be greater than or equal to 1ms"))
	}
	// 設(shè)置開(kāi)始時(shí)間
	startMs := timeToMs(time.Now().UTC())
	// 初始化TimingWheel
	return newTimingWheel(
		tickMs,
		wheelSize,
		startMs,
		delayqueue.New(int(wheelSize)),
	)
}

func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
  // 初始化buckets的大小
	buckets := make([]*bucket, wheelSize)
	for i := range buckets {
		buckets[i] = newBucket()
	}
  // 實(shí)例化TimingWheel
	return &TimingWheel{
		tick:        tickMs,
		wheelSize:   wheelSize,
    // currentTime必須是tickMs的倍數(shù),所以這里使用truncate進(jìn)行修剪
		currentTime: truncate(startMs, tickMs),
		interval:    tickMs * wheelSize,
		buckets:     buckets,
		queue:       queue,
		exitC:       make(chan struct{}),
	}
}

初始化十分簡(jiǎn)單,大家可以看看上面的代碼注釋即可。

啟動(dòng)時(shí)間輪

下面我們看看start方法:

func (tw *TimingWheel) Start() {
	// Poll會(huì)執(zhí)行一個(gè)無(wú)限循環(huán),將到期的元素放入到queue的C管道中
	tw.waitGroup.Wrap(func() {
		tw.queue.Poll(tw.exitC, func() int64 {
			return timeToMs(time.Now().UTC())
		})
	})
	// 開(kāi)啟無(wú)限循環(huán)獲取queue中C的數(shù)據(jù)
	tw.waitGroup.Wrap(func() {
		for {
			select {
			// 從隊(duì)列里面出來(lái)的數(shù)據(jù)都是到期的bucket
			case elem := <-tw.queue.C:
				b := elem.(*bucket)
				// 時(shí)間輪會(huì)將當(dāng)前時(shí)間 currentTime 往前移動(dòng)到 bucket的到期時(shí)間
				tw.advanceClock(b.Expiration())
				// 取出bucket隊(duì)列的數(shù)據(jù),并調(diào)用addOrRun方法執(zhí)行
				b.Flush(tw.addOrRun)
			case <-tw.exitC:
				return
			}
		}
	})
}

這里使用了util封裝的一個(gè)Wrap方法,這個(gè)方法會(huì)起一個(gè)goroutines異步執(zhí)行傳入的函數(shù),具體的可以到我上面給出的鏈接去看源碼。

Start方法會(huì)啟動(dòng)兩個(gè)goroutines。第一個(gè)goroutines用來(lái)調(diào)用延遲隊(duì)列的queue的Poll方法,這個(gè)方法會(huì)一直循環(huán)獲取隊(duì)列里面的數(shù)據(jù),然后將到期的數(shù)據(jù)放入到queue的C管道中;第二個(gè)goroutines會(huì)無(wú)限循環(huán)獲取queue中C的數(shù)據(jù),如果C中有數(shù)據(jù)表示已經(jīng)到期,那么會(huì)先調(diào)用advanceClock方法將當(dāng)前時(shí)間 currentTime 往前移動(dòng)到 bucket的到期時(shí)間,然后再調(diào)用Flush方法取出bucket中的隊(duì)列,并調(diào)用addOrRun方法執(zhí)行。

func (tw *TimingWheel) advanceClock(expiration int64) {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	// 過(guò)期時(shí)間大于等于(當(dāng)前時(shí)間+tick)
	if expiration >= currentTime+tw.tick {
		// 將currentTime設(shè)置為expiration,從而推進(jìn)currentTime
		currentTime = truncate(expiration, tw.tick)
		atomic.StoreInt64(&tw.currentTime, currentTime)

		// Try to advance the clock of the overflow wheel if present
		// 如果有上層時(shí)間輪,那么遞歸調(diào)用上層時(shí)間輪的引用
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel != nil {
			(*TimingWheel)(overflowWheel).advanceClock(currentTime)
		}
	}
}

advanceClock方法會(huì)根據(jù)到期時(shí)間來(lái)從新設(shè)置currentTime,從而推進(jìn)時(shí)間輪前進(jìn)。

func (b *bucket) Flush(reinsert func(*Timer)) {
	var ts []*Timer

	b.mu.Lock()
	// 循環(huán)獲取bucket隊(duì)列節(jié)點(diǎn)
	for e := b.timers.Front(); e != nil; {
		next := e.Next()

		t := e.Value.(*Timer)
		// 將頭節(jié)點(diǎn)移除bucket隊(duì)列
		b.remove(t)
		ts = append(ts, t)

		e = next
	}
	b.mu.Unlock()

	b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()

	for _, t := range ts {
		reinsert(t)
	}
}

Flush方法會(huì)根據(jù)bucket里面timers列表進(jìn)行遍歷插入到ts數(shù)組中,然后調(diào)用reinsert方法,這里是調(diào)用的addOrRun方法。

func (tw *TimingWheel) addOrRun(t *Timer) {
	// 如果已經(jīng)過(guò)期,那么直接執(zhí)行
	if !tw.add(t) { 
		// 異步執(zhí)行定時(shí)任務(wù)
		go t.task()
	}
}

addOrRun會(huì)調(diào)用add方法檢查傳入的定時(shí)任務(wù)Timer是否已經(jīng)到期,如果到期那么異步調(diào)用task方法直接執(zhí)行。add方法我們下面會(huì)接著分析。

整個(gè)start執(zhí)行流程如圖:

Go語(yǔ)言中怎么實(shí)現(xiàn)一個(gè)時(shí)間輪

  1. start方法回啟動(dòng)一個(gè)goroutines調(diào)用poll來(lái)處理DelayQueue中到期的數(shù)據(jù),并將數(shù)據(jù)放入到管道C中;

  2. start方法啟動(dòng)第二個(gè)goroutines方法會(huì)循環(huán)獲取DelayQueue中管道C的數(shù)據(jù),管道C中實(shí)際上存放的是一個(gè)bucket,然后遍歷bucket的timers列表,如果任務(wù)已經(jīng)到期,那么異步執(zhí)行,沒(méi)有到期則重新放入到DelayQueue中。

add task

func main() {
	tw := timingwheel.NewTimingWheel(time.Second, 10)
	tw.Start() 
	// 添加任務(wù)
	tw.AfterFunc(time.Second*15, func() {
		fmt.Println("The timer fires")
		exitC <- time.Now().UTC()
	})
}

我們通過(guò)AfterFunc方法添加一個(gè)15s的定時(shí)任務(wù),如果到期了,那么執(zhí)行傳入的函數(shù)。

func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
	t := &Timer{
		expiration: timeToMs(time.Now().UTC().Add(d)),
		task:       f,
	}
	tw.addOrRun(t)
	return t
}

AfterFunc方法回根據(jù)傳入的任務(wù)到期時(shí)間,以及到期需要執(zhí)行的函數(shù)封裝成Timer,調(diào)用addOrRun方法。addOrRun方法我們上面已經(jīng)看過(guò)了,會(huì)根據(jù)到期時(shí)間來(lái)決定是否需要執(zhí)行定時(shí)任務(wù)。

下面我們來(lái)看一下add方法:

func (tw *TimingWheel) add(t *Timer) bool {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	// 已經(jīng)過(guò)期
	if t.expiration < currentTime+tw.tick {
		// Already expired
		return false
	// 	到期時(shí)間在第一層環(huán)內(nèi)
	} else if t.expiration < currentTime+tw.interval {
		// Put it into its own bucket
		// 獲取時(shí)間輪的位置
		virtualID := t.expiration / tw.tick
		b := tw.buckets[virtualID%tw.wheelSize]
		// 將任務(wù)放入到bucket隊(duì)列中
		b.Add(t) 
		// 如果是相同的時(shí)間,那么返回false,防止被多次插入到隊(duì)列中
		if b.SetExpiration(virtualID * tw.tick) { 
			// 將該bucket加入到延遲隊(duì)列中
			tw.queue.Offer(b, b.Expiration())
		}

		return true
	} else {
		// Out of the interval. Put it into the overflow wheel
		// 如果放入的到期時(shí)間超過(guò)第一層時(shí)間輪,那么放到上一層中去
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel == nil {
			atomic.CompareAndSwapPointer(
				&tw.overflowWheel,
				nil,
				// 需要注意的是,這里tick變成了interval
				unsafe.Pointer(newTimingWheel(
					tw.interval,
					tw.wheelSize,
					currentTime,
					tw.queue,
				)),
			)
			overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
		}
		// 往上遞歸
		return (*TimingWheel)(overflowWheel).add(t)
	}
}

add方法根據(jù)到期時(shí)間來(lái)分成了三部分,第一部分是小于當(dāng)前時(shí)間+tick,表示已經(jīng)到期,那么返回false執(zhí)行任務(wù)即可;

第二部分的判斷會(huì)根據(jù)expiration是否小于時(shí)間輪的跨度,如果小于的話表示該定時(shí)任務(wù)可以放入到當(dāng)前時(shí)間輪中,通過(guò)取模找到buckets對(duì)應(yīng)的時(shí)間格并放入到bucket隊(duì)列中,SetExpiration方法會(huì)根據(jù)傳入的參數(shù)來(lái)判斷是否已經(jīng)執(zhí)行過(guò)延遲隊(duì)列的Offer方法,防止重復(fù)插入;

第三部分表示該定時(shí)任務(wù)的時(shí)間跨度超過(guò)了當(dāng)前時(shí)間輪,需要升級(jí)到上一層的時(shí)間輪中。需要注意的是,上一層的時(shí)間輪的tick是當(dāng)前時(shí)間輪的interval,延遲隊(duì)列還是同一個(gè),然后設(shè)置為指針overflowWheel,并調(diào)用add方法往上層遞歸。

到這里時(shí)間輪已經(jīng)講完了,不過(guò)還有需要注意的地方,我們?cè)谟蒙厦娴臅r(shí)間輪實(shí)現(xiàn)中,使用了DelayQueue加環(huán)形隊(duì)列的方式實(shí)現(xiàn)了時(shí)間輪。對(duì)定時(shí)任務(wù)項(xiàng)的插入和刪除操作而言,TimingWheel時(shí)間復(fù)雜度為 O(1),在DelayQueue中的隊(duì)列使用的是優(yōu)先隊(duì)列,時(shí)間復(fù)雜度是O(log n),但是由于buckets列表實(shí)際上是非常小的,所以并不會(huì)影響性能。

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI