溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法

發(fā)布時(shí)間:2021-06-28 17:54:27 來源:億速云 閱讀:165 作者:chen 欄目:編程語言

這篇文章主要介紹“最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法”,在日常操作中,相信很多人在最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

使用

結(jié)合其他 mq 的使用經(jīng)歷,基本的使用流程:

  1. 創(chuàng)建 producerconsumer

  2. 啟動(dòng) mq

  3. 生產(chǎn)消息/消費(fèi)消息

對(duì)應(yīng)到 queue 中,大致也是這個(gè):

創(chuàng)建 queue

// 生產(chǎn)者創(chuàng)建工廠
producer := newMockedProducer()
// 消費(fèi)者創(chuàng)建工廠
consumer := newMockedConsumer()
// 將生產(chǎn)者以及消費(fèi)者的創(chuàng)建工廠函數(shù)傳遞給 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

我們看看 NewQueue 需要什么構(gòu)建條件:

  1. producer constructor

  2. consumer constructor

將雙方的工廠函數(shù)傳遞給 queue ,由它去執(zhí)行以及重試。

這兩個(gè)需要的目的是將生產(chǎn)者/消費(fèi)者的構(gòu)建和消息生產(chǎn)/消費(fèi)都封裝在 mq 中,而且將生產(chǎn)者/消費(fèi)者的整套邏輯交給開發(fā)者處理:

type (
	// 開發(fā)者需要實(shí)現(xiàn)此接口
	Producer interface {
		AddListener(listener ProduceListener)
		Produce() (string, bool)
	}
	...
	// ProducerFactory定義了生成Producer的方法
	ProducerFactory func() (Producer, error)
)
  1. 其實(shí)也就是將生產(chǎn)者的邏輯交個(gè)開發(fā)者自己完成,mq 只負(fù)責(zé)生產(chǎn)者/消費(fèi)者的消息傳遞和之間的調(diào)度。

  2. 工廠方法的設(shè)計(jì),是將生產(chǎn)者本身和生產(chǎn)消息,這兩個(gè)任務(wù)都交給 queue 自己來做調(diào)度或者重試。

生產(chǎn)msg

生產(chǎn)消息當(dāng)然要回到生產(chǎn)者本身:

type mockedProducer struct {
	total int32
	count int32
  // 使用waitgroup來模擬任務(wù)的完成
	wait  sync.WaitGroup
}
// 實(shí)現(xiàn) Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
	if atomic.AddInt32(&p.count, 1) <= p.total {
		p.wait.Done()
		return "item", true
	}
	time.Sleep(time.Second)
	return "", false
}

queue 中的生產(chǎn)者編寫都必須實(shí)現(xiàn):

  • Produce():由開發(fā)者編寫生產(chǎn)消息的邏輯

  • AddListener():生產(chǎn)者

消費(fèi)msg

和生產(chǎn)者類似:

type mockedConsumer struct {
	count  int32
}

func (c *mockedConsumer) Consume(string) error {
	atomic.AddInt32(&c.count, 1)
	return nil
}

啟動(dòng) queue

啟動(dòng),然后驗(yàn)證我們上述的生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)是否傳輸成功:

func TestQueue(t *testing.T) {
	producer := newMockedProducer(rounds)
	consumer := newMockedConsumer()
	// 創(chuàng)建 queue
	q := NewQueue(func() (Producer, error) {
		return producer, nil
	}, func() (Consumer, error) {
		return consumer, nil
	})
	// 當(dāng)生產(chǎn)者生產(chǎn)完畢,執(zhí)行 Stop() 關(guān)閉生產(chǎn)端生產(chǎn)
	go func() {
		producer.wait.Wait()
    // mq生產(chǎn)端停止生產(chǎn),不是mq本身 Stop 運(yùn)行
		q.Stop()
	}()
	// 啟動(dòng)
	q.Start()
	// 驗(yàn)證生產(chǎn)消費(fèi)端是否消息消費(fèi)完成
	assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}

以上就是 queue 最簡(jiǎn)易的入門使用代碼。開發(fā)者可以根據(jù)自己的業(yè)務(wù)實(shí)際情況:自由定義生產(chǎn)者/消費(fèi)者已經(jīng)生產(chǎn)/消費(fèi)邏輯。

整體設(shè)計(jì)

![image-20210506224102836](/Users/dyhxl/Library/Application Support/typora-user-images/image-20210506224102836.png)

整體流程如上圖:

  1. 全體的通信都由 channel 進(jìn)行

  2. 通過加入監(jiān)聽器 listener ,以及事件觸發(fā) event ,相當(dāng)于將觸發(fā)器邏輯分離出來

  3. 生產(chǎn)者有 produceone ,這個(gè)是生產(chǎn)消息的邏輯,但是其中的 Produce() 是由開發(fā)者編寫【上面的 interface 中正是這個(gè)函數(shù)】

  4. 同理消費(fèi)者,Consume()

基本的消息流動(dòng)就入上圖以及上述描寫的,具體的代碼分析我們就留到下一篇,我們????分析里面,尤其是如何控制 channel 是整個(gè)設(shè)計(jì)的核心。

到此,關(guān)于“最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI