您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“如何使用分布式定時(shí)任務(wù)”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
dq
封裝底層 beanstalkd
操作,分布式存儲(chǔ),延遲、定時(shí)設(shè)置。重啟服務(wù)可以重新執(zhí)行,但是消息不會(huì)丟失,因?yàn)橄⒌奶幚矶冀挥? beanstalkd
完成。
可以看出使用非常簡單,同時(shí) dq
中使用了 redis setnx
保證了每個(gè)消息只被消費(fèi)一次。但是在生產(chǎn)者端沒有使用 redis
做消息存儲(chǔ),這個(gè)和前面描述的一致。
對(duì) dq
的整體架構(gòu)做了簡單介紹,下面就開始正式的探索 :hammer:
func main() { producer := dq.NewProducer([]dq.Beanstalk{ { Endpoint: "localhost:11300", Tube: "tube", }, { Endpoint: "localhost:11301", Tube: "tube", }, }) for i := 1000; i < 1005; i++ { // Delay:延遲執(zhí)行 _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5) // At:在某一個(gè)時(shí)刻執(zhí)行 //_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5)) if err != nil { fmt.Println(err) } } }
從使用上,簡單分為兩步:
NewProducer(opts)
:將本地隊(duì)列的端口配置和主題配置傳入生產(chǎn)者;
producer.Delay()
:使用剛創(chuàng)建好的 生產(chǎn)者,調(diào)用它的 Delay()
。將需要異步發(fā)送的消息傳入,Delay
還需要傳入延遲執(zhí)行的時(shí)間。
需要說明的是:創(chuàng)建的 producer
是一個(gè)接口,Delay()
只是接口其中的一個(gè)方法。后續(xù)會(huì)其他的方法和內(nèi)部設(shè)計(jì)。那我們就繼續(xù)往下探索吧~~~
下面從 example
的代碼進(jìn)去,看整個(gè)函數(shù)的調(diào)用鏈。
dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // 初始化生產(chǎn)者 |- NewProducerNode(endpoint, tube) // endpoint,tube 來自傳入的配置數(shù)組
緊接著就到 producerNode.go
,這個(gè)部分就會(huì)牽涉到 beanstalk
的初始化:
NewProducerNode(endpoint, tube) |- conn: newConnection(endpoint, tube) |- return &connection{}
這就涉及到 beanstalk
:connection.conn -> *beanstalk.Conn
。
但是在 newConnection()
中并沒有對(duì) beanstalk.Conn
進(jìn)行初始化,這屬于 延遲初始化
首先是生產(chǎn)者端調(diào)用 producer.Delay(data, timesecond)
,就把消息插入到內(nèi)部隊(duì)列,timesecond
就是延遲執(zhí)行的時(shí)間。我們來看看 Delay()
到底做了什么?
p.Delay(data, timesecond) |- p.wrap(data, time) // 將 data 和 time 包裝到一塊 |- p.insert(nodeFn) |- node.Delay() // for rangre p.node 每一個(gè)node都執(zhí)行一遍 `Delay()`
而 p.insert
就是將上一步封裝好的 data 傳遞給 p{cluster}
的每一個(gè)node去執(zhí)行 node.Delay
。
在前面的 初始化 說過,最開始是沒有對(duì) conn
進(jìn)行初始化,那現(xiàn)在要插入數(shù)據(jù),總不能不初始化這個(gè) conn
。
node.Delay() // 配置中的每個(gè)node都執(zhí)行 `Delay()` |- node.conn.get() // 獲取node中的conn【conn==nil,就初始化一個(gè)conn】 |- _, err := conn.Put(data, deplay, opts...) |- node.conn.reset() // 出現(xiàn)err情況下,如OOM/Timeout等情況 -> 關(guān)閉conn,防止泄漏
所以最后 Delay
實(shí)際上是執(zhí)行 tube.Put(data, delay)
:
tube.Put(data, delay) |- tube.Conn.cmd("put", ...) // 生產(chǎn)者發(fā)布job
這里就涉及到 beanstalk
的 Put
操作:首先看看生產(chǎn)者 Put
指令參數(shù)說明:
put <pri> <delay> <ttr> <bytes> <data>
<pri>
:優(yōu)先級(jí),值越小優(yōu)先級(jí)越高,默認(rèn)為1024;
<delay>
:延遲 ready
秒數(shù),在這段時(shí)間 job 為 delayed
狀態(tài);
<ttr>
:time to run
,允許 worker 執(zhí)行的最大秒數(shù),如果 worker 在這段時(shí)間不能 delete,release,bury job,那么當(dāng) job 超時(shí),服務(wù)器將自動(dòng) release 此job;
<bytes>
:job body
的長度,不包含\r\n
;
<data>
: job body data;
OK。那插入 job
成功,響應(yīng)什么呢?
INSERTED <id>\r\n
返回的 id
是插入 job
的任務(wù)標(biāo)識(shí)。到此 Put
分析完畢,跟著代碼走一遍:
tube.Put(data, priority, daley, ttr) |- tube.Conn.cmd("put", ...) |- tube.Conn.readResp("INSERTED id") |- return id, err // 將id返回
這樣我們?cè)?example
中直接可以看到的 生產(chǎn)者 執(zhí)行的操作就介紹完了。上圖,圖更好說話:
那么除了 example
中使用的 Delay()
,還有其余幾個(gè)方法:
Producer interface { At(body []byte, at time.Time) (string, error) Close() error Delay(body []byte, delay time.Duration) (string, error) Revoke(ids string) error }
At
:指定某個(gè)時(shí)間執(zhí)行【實(shí)質(zhì)也是執(zhí)行 Delay()
】
Close
:關(guān)閉全部node的連接
Delay
:延遲執(zhí)行。傳入延遲的時(shí)間。
Revoke
:實(shí)質(zhì)上是當(dāng)出現(xiàn)最小寫入節(jié)點(diǎn)<2時(shí),觸發(fā)添加失敗,將添加成功的job刪除掉。
當(dāng)然,事實(shí)上 dq
使用上,開發(fā)者只需要使用 At/Delay
就行了。也就是你只要知道你的任務(wù)是定時(shí)觸發(fā)還是延遲觸發(fā)即可。剩下的,dq
內(nèi)部的封裝都已經(jīng)幫你做好了。
“如何使用分布式定時(shí)任務(wù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。