溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

machinery的功能有哪些

發(fā)布時間:2021-10-25 16:32:47 來源:億速云 閱讀:140 作者:iii 欄目:編程語言

本篇內(nèi)容主要講解“machinery的功能有哪些”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“machinery的功能有哪些”吧!

特性

上面只是簡單舉了個例子,任務(wù)隊列有著廣泛的應(yīng)用場景,比如大批量的計算任務(wù),當有大量數(shù)據(jù)插入,通過拆分并分批插入任務(wù)隊列,從而實現(xiàn)串行鏈式任務(wù)處理或者實現(xiàn)分組并行任務(wù)處理,提高系統(tǒng)魯棒性,提高系統(tǒng)并發(fā)度;或者對數(shù)據(jù)進行預(yù)處理,定期的從后端存儲將數(shù)據(jù)同步到到緩存系統(tǒng),從而在查詢請求發(fā)生時,直接去緩存系統(tǒng)中查詢,提高查詢請求的響應(yīng)速度。適用任務(wù)隊列的場景有很多,這里就不一一列舉了?;貧w本文主題,既然我們要學習machinery,就要先了解一下他都有哪些特性呢?

  • 任務(wù)重試機制
  • 延遲任務(wù)支持
  • 任務(wù)回調(diào)機制
  • 任務(wù)結(jié)果記錄
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB
 

架構(gòu)

任務(wù)隊列,簡而言之就是一個放大的生產(chǎn)者消費者模型,用戶請求會生成任務(wù),任務(wù)生產(chǎn)者不斷的向隊列中插入任務(wù),同時,隊列的處理器程序充當消費者不斷的消費任務(wù)?;谶@種框架設(shè)計思想,我們來看下machinery的簡單設(shè)計結(jié)構(gòu)圖例:

machinery的功能有哪些


 
  • Sender:業(yè)務(wù)推送模塊,生成具體任務(wù),可根據(jù)業(yè)務(wù)邏輯中,按交互進行拆分;
  • Broker:存儲具體序列化后的任務(wù),machinery中目前支持到Redis, AMQP,和SQS;
  • Worker:工作進程,負責消費者功能,處理具體的任務(wù);
  • Backend:后端存儲,用于存儲任務(wù)執(zhí)行狀態(tài)的數(shù)據(jù);
 

e.g

學習一門新東西,我都習慣先寫一個demo,先學會了走,再學會跑。所以先來看一個例子,功能很簡單,異步計算1到10的和。

先看一下配置文件代碼:

broker: redis://localhost:6379

default_queue: "asong"

result_backend: redis://localhost:6379

redis:
  max_idle: 3
  max_active: 3
  max_idle_timeout: 240
  wait: true
  read_timeout: 15
  write_timeout: 15
  connect_timeout: 15
  normal_tasks_poll_period: 1000
  delayed_tasks_poll_period: 500
  delayed_tasks_key: "asong"
 

這里brokerresult_backend來實現(xiàn)。

主代碼,完整版github獲?。?/p>


func main()  {

 cnf,err := config.NewFromYaml("./config.yml",false)
 if err != nil{
  log.Println("config failed",err)
  return
 }

 server,err := machinery.NewServer(cnf)
 if err != nil{
  log.Println("start server failed",err)
  return
 }

 // 注冊任務(wù)
 err = server.RegisterTask("sum",Sum)
 if err != nil{
  log.Println("reg task failed",err)
  return
 }

 worker := server.NewWorker("asong", 1)
 go func() {
  err = worker.Launch()
  if err != nil {
   log.Println("start worker error",err)
   return
  }
 }()

 //task signature
 signature := &tasks.Signature{
  Name: "sum",
  Args: []tasks.Arg{
   {
    Type:  "[]int64",
    Value: []int64{1,2,3,4,5,6,7,8,9,10},
   },
  },
 }

 asyncResult, err := server.SendTask(signature)
 if err != nil {
  log.Fatal(err)
 }
 res, err := asyncResult.Get(1)
 if err != nil {
  log.Fatal(err)
 }
 log.Printf("get res is %v\n", tasks.HumanReadableResults(res))

}
 

運行結(jié)果:

INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55
 

好啦,現(xiàn)在我們開始講一講上面的代碼流程,

  • 讀取配置文件,這一步是為了配置     broker和     result_backend,這里我都選擇的是     redis,因為電腦正好有這個環(huán)境,就直接用了。
  • Machinery 庫必須在使用前實例化。實現(xiàn)方法是創(chuàng)建一個     Server實例。     Server是     Machinery配置和注冊任務(wù)的基本對象。
  • 在你的     workders能消費一個任務(wù)前,你需要將它注冊到服務(wù)器。這是通過給任務(wù)分配一個唯一的名稱來實現(xiàn)的。
  • 為了消費任務(wù),你需有有一個或多個worker正在運行。運行worker所需要的只是一個具有已注冊任務(wù)的     Server實例。每個worker將只使用已注冊的任務(wù)。對于隊列中的每個任務(wù),Worker.Process()方法將在一個goroutine中運行。可以使用     server.NewWorker的第二參數(shù)來限制并發(fā)運行的worker.Process()調(diào)用的數(shù)量(每個worker)。
  • 可以通過將     Signature實例傳遞給     Server實例來調(diào)用任務(wù)。
  • 調(diào)用     HumanReadableResults這個方法可以處理反射值,獲取到最終的結(jié)果。
 

多功能

 

1. 延時任務(wù)

上面的代碼只是一個簡單machinery使用示例,其實machiney也支持延時任務(wù)的,可以通過在任務(wù)signature上設(shè)置ETA時間戳字段來延遲任務(wù)。

eta := time.Now().UTC().Add(time.Second * 20)
 signature.ETA = &eta
   

2. 重試任務(wù)

在將任務(wù)聲明為失敗之前,可以設(shè)置多次重試嘗試。斐波那契序列將用于在一段時間內(nèi)分隔重試請求。這里可以使用兩種方法,第一種直接對tsak signature中的retryTimeoutRetryCount字段進行設(shè)置,就可以,重試時間將按照斐波那契數(shù)列進行疊加。

//task signature
 signature := &tasks.Signature{
  Name: "sum",
  Args: []tasks.Arg{
   {
    Type:  "[]int64",
    Value: []int64{1,2,3,4,5,6,7,8,9,10},
   },
  },
  RetryTimeout: 100,
  RetryCount: 3,
 }
 

或者,你可以使用return.tasks.ErrRetryTaskLater 返回任務(wù)并指定重試的持續(xù)時間。

func Sum(args []int64) (int64, error) {
 sum := int64(0)
 for _, arg := range args {
  sum += arg
 }

 return sum, tasks.NewErrRetryTaskLater("我說他錯了", 4 * time.Second)

}
   

3. 工作流

上面我們講的都是運行一個異步任務(wù),但是我們往往做項目時,一個需求是需要多個異步任務(wù)以編排好的方式執(zhí)行的,所以我們就可以使用machinery的工作流來完成。

 
3.1 Groups

Group 是一組任務(wù),它們將相互獨立地并行執(zhí)行。還是畫個圖吧,這樣看起來更明了:

machinery的功能有哪些

一起來看一個簡單的例子:

 // group
 group,err :=tasks.NewGroup(signature1,signature2,signature3)
 if err != nil{
  log.Println("add group failed",err)
 }

 asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
 if err != nil {
  log.Println(err)
 }
 for _, asyncResult := range asyncResults{
  results,err := asyncResult.Get(1)
  if err != nil{
   log.Println(err)
   continue
  }
  log.Printf(
   "%v  %v  %v\n",
   asyncResult.Signature.Args[0].Value,
   tasks.HumanReadableResults(results),
  )
 }
 

group中的任務(wù)是并行執(zhí)行的。

 
3.2 chords

我們在做項目時,往往會有一些回調(diào)場景,machiney也為我們考慮到了這一點,Chord允許你定一個回調(diào)任務(wù)在groups中的所有任務(wù)執(zhí)行結(jié)束后被執(zhí)行。

machinery的功能有哪些

來看一段代碼:

callback := &tasks.Signature{
  Name: "call",
 }



 group, err := tasks.NewGroup(signature1, signature2, signature3)
 if err != nil {

  log.Printf("Error creating group: %s", err.Error())
  return
 }

 chord, err := tasks.NewChord(group, callback)
 if err != nil {
  log.Printf("Error creating chord: %s", err)
  return
 }

 chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
 if err != nil {
  log.Printf("Could not send chord: %s", err.Error())
  return
 }

 results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
 if err != nil {
  log.Printf("Getting chord result failed with error: %s", err.Error())
  return
 }
 log.Printf("%v\n", tasks.HumanReadableResults(results))
 

上面的例子并行執(zhí)行task1、task2、task3,聚合它們的結(jié)果并將它們傳遞給callback任務(wù)。

 
3.3 chains

chain就是一個接一個執(zhí)行的任務(wù)集,每個成功的任務(wù)都會觸發(fā)chain中的下一個任務(wù)。

machinery的功能有哪些

看這樣一段代碼:

//chain
 chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
 if err != nil {

  log.Printf("Error creating group: %s", err.Error())
  return
 }
 chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
 if err != nil {
  log.Printf("Could not send chain: %s", err.Error())
  return
 }

 results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
 if err != nil {
  log.Printf("Getting chain result failed with error: %s", err.Error())
 }
 log.Printf(" %v\n", tasks.HumanReadableResults(results))

上面的例子執(zhí)行task1,然后是task2,然后是task3。當一個任務(wù)成功完成時,結(jié)果被附加到chain中下一個任務(wù)的參數(shù)列表的末尾,最終執(zhí)行callback任務(wù)。

到此,相信大家對“machinery的功能有哪些”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI