溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

go語言任務(wù)隊(duì)列machinery的用法

發(fā)布時(shí)間:2021-07-01 10:44:57 來源:億速云 閱讀:1399 作者:chen 欄目:編程語言

這篇文章主要介紹“go語言任務(wù)隊(duì)列machinery的用法”,在日常操作中,相信很多人在go語言任務(wù)隊(duì)列machinery的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”go語言任務(wù)隊(duì)列machinery的用法”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

描述

go實(shí)現(xiàn)的基于消息中間件的異步任務(wù)隊(duì)列, 下面是學(xué)習(xí)筆記

使用概述

步驟1: 創(chuàng)建server,配置參數(shù)、注冊task。(此處server只是個(gè)配置作用, 并不是單獨(dú)的server進(jìn)程)

步驟2: 啟動worker

步驟3: 發(fā)送task

與celery的用法是完全一致的

創(chuàng)建server

func startServer() (*machinery.Server, error) {
	cnf := &config.Config{
		Broker:          "amqp://guest:guest@localhost:5672/",
		DefaultQueue:    "machinery_tasks",
		ResultBackend:   "amqp://guest:guest@localhost:5672/",
		ResultsExpireIn: 3600,  //任務(wù)有效期
		AMQP: &config.AMQPConfig{
			Exchange:      "machinery_exchange",
			ExchangeType:  "direct",
			BindingKey:    "machinery_task",
			PrefetchCount: 3,   //限定消費(fèi)能力
		},
	}

	// Create server instance
	broker := amqpbroker.New(cnf)
	backend := amqpbackend.New(cnf)
	lock := eagerlock.New()     //任務(wù)鎖
	server := machinery.NewServer(cnf, broker, backend, lock)

	// Register tasks
	tasks := map[string]interface{}{
		"add":               exampletasks.Add,
		"multiply":          exampletasks.Multiply,
		"sum_ints":          exampletasks.SumInts,
		"sum_floats":        exampletasks.SumFloats,
		"concat":            exampletasks.Concat,
		"split":             exampletasks.Split,
		"panic_task":        exampletasks.PanicTask,
		"long_running_task": exampletasks.LongRunningTask,
	}

	return server, server.RegisterTasks(tasks)
}

創(chuàng)建worker

創(chuàng)建worker, 之后就可以啟動了

func worker() error {
    //消費(fèi)者的標(biāo)記
	consumerTag := "machinery_worker"

	server, err := startServer()
	if err != nil {
		return err
	}

    //第二個(gè)參數(shù)并發(fā)數(shù), 0表示不限制
	worker := server.NewWorker(consumerTag, 0)

    //鉤子函數(shù)
	errorhandler := func(err error) {}
	pretaskhandler := func(signature *tasks.Signature) {}
	posttaskhandler := func(signature *tasks.Signature) {}

	worker.SetPostTaskHandler(posttaskhandler)
	worker.SetErrorHandler(errorhandler)
	worker.SetPreTaskHandler(pretaskhandler)
	return worker.Launch()
}

啟動結(jié)果

INFO: 2021/05/01 08:28:27 worker.go:58 Launching a worker with the following settings:
INFO: 2021/05/01 08:28:27 worker.go:59 - Broker: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:61 - DefaultQueue: machinery_tasks
INFO: 2021/05/01 08:28:27 worker.go:65 - ResultBackend: amqp://192.168.120.101:5672
INFO: 2021/05/01 08:28:27 worker.go:67 - AMQP: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:68   - Exchange: machinery_exchange
INFO: 2021/05/01 08:28:27 worker.go:69   - ExchangeType: direct
INFO: 2021/05/01 08:28:27 worker.go:70   - BindingKey: machinery_task
INFO: 2021/05/01 08:28:27 worker.go:71   - PrefetchCount: 0
INFO: 2021/05/01 08:28:27 amqp.go:96 [*] Waiting for messages. To exit press CTRL+C

發(fā)送任務(wù)

server, _ := startServer()

signature := &tasks.Signature{
    Name: "add",
    Args: []tasks.Arg{
        {
            Type:  "int64",
            Value: 1,
        },
        {
            Type:  "int64",
            Value: 1,
        },
    },
}
asyncResult, _ := server.SendTask(signature)
fmt.Println(asyncResult.Get(time.Millisecond * 5))  //等待間隔,理論上是越小越好

//asyncResult.GetWithTimeout(time.Second*120, time.Millisecond * 5)   //第一個(gè)參數(shù)才是timeout

總結(jié)

以上就是machinery的基本用法,與celery基本一樣, 更詳細(xì)內(nèi)容參考官方文檔

到此,關(guān)于“go語言任務(wù)隊(duì)列machinery的用法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

go
AI