您好,登錄后才能下訂單哦!
這篇文章主要講解了“消息隊列原理之如何掌握rabbitmq”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“消息隊列原理之如何掌握rabbitmq”吧!
RabbitMQ 是一個由 Erlang 開發(fā)的 AMQP(Advanced Message Queuing Protocol,高級消息隊列協(xié)議)的開源實現(xiàn),用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。支持多種客戶端語言。
整體架構(gòu)對照下面的圖說明
先看看圖片上各個名次的解釋:
Broker:它提供一種傳輸服務(wù),它的角色就是維護一條從生產(chǎn)者到消費者的路線,保證數(shù)據(jù)能按照指定的方式進行傳輸,簡單來說就是消息隊列服務(wù)器實體。
Connection: 客戶端與 Rabbitmq Broker
直接的 TCP
連接,通常一個客戶端與 Broker 之間只需要一個連接即可。
Channel: 消息通道,在客戶端的每個連接里,可建立多個channel,最好每個線程都用獨立的Channel,后續(xù)的對 Queue
和 Exchange
的操作都是在 Channel 中完成的。
Producer: 消息生產(chǎn)者,通過和 Broker
建立 Connection 和 Channel ,向 Exchange 發(fā)送消息。
Consumer: 消息消費者,通過和 Broker
建立 Connection 和 Channel,從 Queue 中消費消息。
Exchange: 消息交換機,按照一定的策略把 Producer 生產(chǎn)的消息投遞到 Queue 中,等待消費者消費。
Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。
Vhost: 虛擬主機,一個broker里可以開設(shè)多個vhost,用作權(quán)限分離,把不同的系統(tǒng)使用的rabbitmq區(qū)分開,共用一個消息隊列服務(wù)器,但看上去就像各自在用不用的rabbitmq服務(wù)器一樣。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
RoutingKey:路由關(guān)鍵字,生產(chǎn)者在將消息發(fā)送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規(guī)則,而這個routing key需要與Exchange Type及binding key聯(lián)合使用才能最終生效。
這里面比較難理解的概念是
RoutingKey
,Exchange
,Binding
,消費發(fā)送時不會直接發(fā)送給Queue
,而是先發(fā)送給Exchange
,由Exchange
按照一定的規(guī)則投遞到與它綁定的Queue
中,那這個規(guī)則是什么呢? 規(guī)則就與Exchange
的 Type、Binding
、RoutingKey
相關(guān),Exchange
支持的類型有 4 種,direct,fanout,topic,headers
,含義如下:
direct:
Queue
和Exchange
在綁定時需要指定一個key
, 我們稱為Bindkey
。Producer
往Exchange
發(fā)送消息時,也需要指定一個key
,這個key
就是Routekey
。這種模式下 Exchange 會把消息投遞給Routekey
和Bindkey
相同的隊列fanout: 類似于廣播的方式,會把消息投遞給和 Exchange 綁定的所有隊列,不需要檢查
Routekey
和Bindkey
。topic: 類似于組播的方式,這種模式下
Bingkey
支持模糊匹配,*
代表匹配一個任意詞組,#
代表匹配0個或多個詞組。如 Producer 產(chǎn)生一條 RouteKey 為benz.car
的消息, 同時這個 Exchange 綁定了3組隊列(請注意是3組不是3個,意思是Exchange可以和同一個Queue進行多次綁定,通過Bindkey 的不同,它們之間是多對多的關(guān)系),Bindkey 分別為:car
,*.car
,benz.car
,那么會把這個消息投遞到*.car
、benz.car
對應(yīng)的Queue
中。headers: 這個類型
Routekey
和Bindkey
的匹配規(guī)則來路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers
屬性進行匹配。
對照上面圖和名次解釋應(yīng)該比較清晰明了了,下面我們通過幾個例子說明如何使用。
先看看 Rabbitmq 默認(rèn)的 exchange ,其中第一個(AMQP default) 是默認(rèn)的,默認(rèn)綁定了所有的 Queue
,會把消息投遞到 Routekey 對應(yīng)的隊列中,即: Routekey==QueueName
。
package main import ( "fmt" "github.com/streadway/amqp" "log" ) func handlerError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } var url = "amqp://username:password@ip:port" func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() queueNameCar := "car" if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil { handlerError(err, "Failed to decare Queue") } if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
這里是一個完整的 Demo, 后面只會提供main() 函數(shù)
的示例代碼,其他的和這里這里類似。
申明了一個名稱為 car
的消息隊列,并沒有做任何的綁定,往 defalut exchange
發(fā)送一條消息,routekey 為 car
,可以看到和隊列名相同。
為了方便演示,結(jié)果以圖片的方式展現(xiàn),可以看到這里有 car
的隊列,并且有一條消息。
在創(chuàng)建隊列有幾個參數(shù)可以關(guān)注一下
Durability: 持久化,是否將隊列持久化到磁盤,當(dāng)選擇持久化時當(dāng) rabbitmq 重啟了,這個隊列還在,否則當(dāng)重啟了之后這個隊列就沒有了,需要重新創(chuàng)建,這個需要設(shè)計程序時考慮到。
Auto delete: 當(dāng)其中一個消費者已經(jīng)完成之后,會刪除這個隊列并斷開與其他的消費者的連接。
Arguments:
x-message-ttl: 消息的過期時間,發(fā)布到隊列中的消息在被丟棄之前可以存活多久。
x-expires: 隊列的過期時間,一個隊列在多長時間內(nèi)未使用會被自動刪除。
x-max-length: 隊列的長度,最多剋容納多少條消息。
x-max-length-bytes: 隊列最大可以包含多大的消息。
x-dead-letter-exchange: 當(dāng)消息過期或者被客戶端
reject
之后應(yīng)該重新投遞到那個exchange
,類似與一個producer
發(fā)送消息時選擇exchange
x-dead-letter-routing-key: 當(dāng)消息過期或者被客戶端
reject
之后重新投遞時的Routekey
,類似與一個producer
發(fā)送消息時設(shè)置routekey
,默認(rèn)是原消息的routekey
x-max-priority: 消息的優(yōu)先級設(shè)置,設(shè)置可以支持的最大優(yōu)先級,如設(shè)置為
10
,則可以在發(fā)送消息設(shè)置優(yōu)先級,可以根據(jù)優(yōu)先級處理消息,默認(rèn)為空,當(dāng)為空時則不支持優(yōu)先級x-queue-mode: 將隊列設(shè)置為懶惰模式,盡可能多地將消息保留在磁盤上,以減少RAM的使用量;如果不設(shè)置,隊列將保留內(nèi)存中的緩存,以盡可能快地傳遞消息。
我們自己創(chuàng)建一個 direct 類型的 exchange 并綁定一些隊列看看是什么效果。
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() directExchangeNameCar := "direct.car" if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
代碼中申明了 1 一個 Exchange
,4個 Queue
,7個 Binding
,其中一個 Binding
詳情如下:
可以看到向這個 Exchange 中發(fā)消息,Routekey 為 car
,匹配的隊列有個,那么這4個隊列中都應(yīng)該有消息才對 和我們的設(shè)想是一直
Queue
的創(chuàng)建上面已經(jīng)講過了,這里有Exchange
的創(chuàng)建,那么再看看創(chuàng)建Exchange
有哪些參數(shù)
Type: 類型,上面已經(jīng)涉及到了
Durability: 持久化
Auto delete: 是否自動刪除,如果為yes 則當(dāng)其中隊列完成
unbind
操作,則其他的queue
或者exchange
也會 unbind 并且刪除這個exchange
。Internal: 如果為yes ,則客戶端不能直接往這個 exchange 上發(fā)送消息,只能用作和
exchange
綁定。
fanout 工作方式類似于廣播,看看下面的代碼
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() fanoutExchangeNameCar := "fanout.car" if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
這個申明了一個 fanout
類型的 exchange ,和上面的代碼類似,只有 exchange 不同。
可以先在腦海中想想每個 queue 中有幾條消息。
向 fanout.car
這個 exchange 發(fā)消息指定 Routekey 為 middle.car
,但是由于是廣播模式,所以和 routekey 是沒有關(guān)系的,每個消息隊列中各有一條消息。
請注意有些 binding 指向的是同一個 queue ,那么會產(chǎn)生多條消息到相同的 queue 中,答案是否定的。producer 產(chǎn)生一條消息,根據(jù)一定的規(guī)則,每個隊列只會收到一條(如何符合投遞規(guī)則的話)。
topic 比較有意思了,和之前的簡單粗暴的用法不一樣了,先看看下面的代碼,聲明了一個 topic
類型的 exchange
, 4個 queue
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() topicExchangeNameCar := "topic.car" if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } }
現(xiàn)在思考每個 producer
產(chǎn)生消息之后,會有哪些 queue
會收到消息。
if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
每個 queue 都會收到消息
if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
small-car
這一個隊列會收到消息。
符合 Routekey 為 small.car
、*.small.car
、#.small.car
的binding
if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
small-car
這一個隊列會收到消息。
符合 Routekey 為 *.small.car
、#.small.car
的binding
if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
small-car
這一個隊列會收到消息。
符合 Routekey 為 #.small.car
的binding
if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
都不會收到消息,沒有符合的 routekey 。
這種類型很少有實際的應(yīng)用場景。
感謝各位的閱讀,以上就是“消息隊列原理之如何掌握rabbitmq”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對消息隊列原理之如何掌握rabbitmq這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。