溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

消息隊列原理之如何掌握rabbitmq

發(fā)布時間:2021-10-21 13:52:42 來源:億速云 閱讀:122 作者:iii 欄目:編程語言

這篇文章主要講解了“消息隊列原理之如何掌握rabbitmq”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“消息隊列原理之如何掌握rabbitmq”吧!

介紹

RabbitMQ 是一個由 Erlang 開發(fā)的 AMQP(Advanced Message Queuing Protocol,高級消息隊列協(xié)議)的開源實現(xiàn),用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。支持多種客戶端語言。

架構(gòu)

整體架構(gòu)對照下面的圖說明

消息隊列原理之如何掌握rabbitmq

先看看圖片上各個名次的解釋:

  • Broker:它提供一種傳輸服務(wù),它的角色就是維護一條從生產(chǎn)者到消費者的路線,保證數(shù)據(jù)能按照指定的方式進行傳輸,簡單來說就是消息隊列服務(wù)器實體。

  • Connection: 客戶端與 Rabbitmq Broker 直接的 TCP 連接,通常一個客戶端與 Broker 之間只需要一個連接即可。

  • Channel: 消息通道,在客戶端的每個連接里,可建立多個channel,最好每個線程都用獨立的Channel,后續(xù)的對 QueueExchange 的操作都是在 Channel 中完成的。

  • Producer: 消息生產(chǎn)者,通過和 Broker 建立 Connection 和 Channel ,向 Exchange 發(fā)送消息。

  • Consumer: 消息消費者,通過和 Broker 建立 Connection 和 Channel,從 Queue 中消費消息。

  • Exchange: 消息交換機,按照一定的策略把 Producer 生產(chǎn)的消息投遞到 Queue 中,等待消費者消費。

  • Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。

  • Vhost: 虛擬主機,一個broker里可以開設(shè)多個vhost,用作權(quán)限分離,把不同的系統(tǒng)使用的rabbitmq區(qū)分開,共用一個消息隊列服務(wù)器,但看上去就像各自在用不用的rabbitmq服務(wù)器一樣。

  • Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。

  • RoutingKey:路由關(guān)鍵字,生產(chǎn)者在將消息發(fā)送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規(guī)則,而這個routing key需要與Exchange Type及binding key聯(lián)合使用才能最終生效。

這里面比較難理解的概念是 RoutingKey,Exchange,Binding ,消費發(fā)送時不會直接發(fā)送給 Queue ,而是先發(fā)送給 Exchange,由 Exchange 按照一定的規(guī)則投遞到與它綁定的 Queue 中,那這個規(guī)則是什么呢? 規(guī)則就與 Exchange 的 Type、Binding、RoutingKey 相關(guān),Exchange 支持的類型有 4 種,direct,fanout,topic,headers,含義如下:

  • direct: QueueExchange 在綁定時需要指定一個 key, 我們稱為 Bindkey。ProducerExchange 發(fā)送消息時,也需要指定一個 key ,這個 key 就是 Routekey。這種模式下 Exchange 會把消息投遞給 RoutekeyBindkey 相同的隊列

  • fanout: 類似于廣播的方式,會把消息投遞給和 Exchange 綁定的所有隊列,不需要檢查 RoutekeyBindkey 。

  • topic: 類似于組播的方式,這種模式下 Bingkey 支持模糊匹配,* 代表匹配一個任意詞組,#代表匹配0個或多個詞組。如 Producer 產(chǎn)生一條 RouteKey 為 benz.car 的消息, 同時這個 Exchange 綁定了3組隊列(請注意是3組不是3個,意思是Exchange可以和同一個Queue進行多次綁定,通過Bindkey 的不同,它們之間是多對多的關(guān)系),Bindkey 分別為: car ,*.car ,benz.car ,那么會把這個消息投遞到 *.car、benz.car 對應(yīng)的 Queue 中。

  • headers: 這個類型 RoutekeyBindkey 的匹配規(guī)則來路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的 headers 屬性進行匹配。

對照上面圖和名次解釋應(yīng)該比較清晰明了了,下面我們通過幾個例子說明如何使用。

用法(golang)

direct

消息隊列原理之如何掌握rabbitmq

先看看 Rabbitmq 默認(rèn)的 exchange ,其中第一個(AMQP default) 是默認(rèn)的,默認(rèn)綁定了所有的 Queue ,會把消息投遞到 Routekey 對應(yīng)的隊列中,即: Routekey==QueueName

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func handlerError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

var url = "amqp://username:password@ip:port"

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	queueNameCar := "car"
	if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decare Queue")
	}

	if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
  • 這里是一個完整的 Demo, 后面只會提供main() 函數(shù)的示例代碼,其他的和這里這里類似。

  • 申明了一個名稱為 car 的消息隊列,并沒有做任何的綁定,往 defalut exchange 發(fā)送一條消息,routekey 為 car ,可以看到和隊列名相同。

  • 為了方便演示,結(jié)果以圖片的方式展現(xiàn),可以看到這里有 car 的隊列,并且有一條消息。 消息隊列原理之如何掌握rabbitmq

在創(chuàng)建隊列有幾個參數(shù)可以關(guān)注一下 消息隊列原理之如何掌握rabbitmq

  • Durability: 持久化,是否將隊列持久化到磁盤,當(dāng)選擇持久化時當(dāng) rabbitmq 重啟了,這個隊列還在,否則當(dāng)重啟了之后這個隊列就沒有了,需要重新創(chuàng)建,這個需要設(shè)計程序時考慮到。

  • Auto delete: 當(dāng)其中一個消費者已經(jīng)完成之后,會刪除這個隊列并斷開與其他的消費者的連接。

  • Arguments:

    • x-message-ttl: 消息的過期時間,發(fā)布到隊列中的消息在被丟棄之前可以存活多久。

    • x-expires: 隊列的過期時間,一個隊列在多長時間內(nèi)未使用會被自動刪除。

    • x-max-length: 隊列的長度,最多剋容納多少條消息。

    • x-max-length-bytes: 隊列最大可以包含多大的消息。

    • x-dead-letter-exchange: 當(dāng)消息過期或者被客戶端reject 之后應(yīng)該重新投遞到那個exchange ,類似與一個producer發(fā)送消息時選擇exchange

    • x-dead-letter-routing-key: 當(dāng)消息過期或者被客戶端reject 之后重新投遞時的 Routekey,類似與一個producer發(fā)送消息時設(shè)置routekey,默認(rèn)是原消息的 routekey

    • x-max-priority: 消息的優(yōu)先級設(shè)置,設(shè)置可以支持的最大優(yōu)先級,如設(shè)置為10,則可以在發(fā)送消息設(shè)置優(yōu)先級,可以根據(jù)優(yōu)先級處理消息,默認(rèn)為空,當(dāng)為空時則不支持優(yōu)先級

    • x-queue-mode: 將隊列設(shè)置為懶惰模式,盡可能多地將消息保留在磁盤上,以減少RAM的使用量;如果不設(shè)置,隊列將保留內(nèi)存中的緩存,以盡可能快地傳遞消息。

我們自己創(chuàng)建一個 direct 類型的 exchange 并綁定一些隊列看看是什么效果。

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	directExchangeNameCar := "direct.car"
	if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
  • 代碼中申明了 1 一個 Exchange ,4個 Queue,7個 Binding ,其中一個 Binding 詳情如下: 消息隊列原理之如何掌握rabbitmq

  • 可以看到向這個 Exchange 中發(fā)消息,Routekey 為 car ,匹配的隊列有個,那么這4個隊列中都應(yīng)該有消息才對 消息隊列原理之如何掌握rabbitmq 和我們的設(shè)想是一直

Queue 的創(chuàng)建上面已經(jīng)講過了,這里有 Exchange 的創(chuàng)建,那么再看看創(chuàng)建 Exchange 有哪些參數(shù) 消息隊列原理之如何掌握rabbitmq

  • Type: 類型,上面已經(jīng)涉及到了

  • Durability: 持久化

  • Auto delete: 是否自動刪除,如果為yes 則當(dāng)其中隊列完成 unbind 操作,則其他的 queue 或者 exchange 也會 unbind 并且刪除這個 exchange 。

  • Internal: 如果為yes ,則客戶端不能直接往這個 exchange 上發(fā)送消息,只能用作和 exchange 綁定。

fanout

fanout 工作方式類似于廣播,看看下面的代碼

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	fanoutExchangeNameCar := "fanout.car"
	if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
  • 這個申明了一個 fanout 類型的 exchange ,和上面的代碼類似,只有 exchange 不同。

  • 可以先在腦海中想想每個 queue 中有幾條消息。

  • fanout.car 這個 exchange 發(fā)消息指定 Routekey 為 middle.car ,但是由于是廣播模式,所以和 routekey 是沒有關(guān)系的,每個消息隊列中各有一條消息。

  • 請注意有些 binding 指向的是同一個 queue ,那么會產(chǎn)生多條消息到相同的 queue 中,答案是否定的。producer 產(chǎn)生一條消息,根據(jù)一定的規(guī)則,每個隊列只會收到一條(如何符合投遞規(guī)則的話)。 消息隊列原理之如何掌握rabbitmq

topic

topic 比較有意思了,和之前的簡單粗暴的用法不一樣了,先看看下面的代碼,聲明了一個 topic 類型的 exchange, 4個 queue

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	topicExchangeNameCar := "topic.car"
	if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
}

現(xiàn)在思考每個 producer 產(chǎn)生消息之后,會有哪些 queue 會收到消息。

	if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • 每個 queue 都會收到消息

	if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • small-car 這一個隊列會收到消息。

    • 符合 Routekey 為 small.car*.small.car#.small.car 的binding

	if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • small-car 這一個隊列會收到消息。

    • 符合 Routekey 為 *.small.car、#.small.car 的binding

	if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • small-car 這一個隊列會收到消息。

    • 符合 Routekey 為 #.small.car 的binding

	if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • 都不會收到消息,沒有符合的 routekey 。

headers

這種類型很少有實際的應(yīng)用場景。

感謝各位的閱讀,以上就是“消息隊列原理之如何掌握rabbitmq”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對消息隊列原理之如何掌握rabbitmq這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI