您好,登錄后才能下訂單哦!
AMQP協(xié)議及RabbitMQ原理是什么,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。
AMQP:是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息 隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
Publisher
消息的生產(chǎn)者。也是一個(gè)向交換器Exchange發(fā)送消息的客戶端應(yīng)用程序。
Consumer
消息的消費(fèi)者。表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
Server/Broker
又稱Broker,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。
Virtual host
虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由。
表示一批交換器,消息隊(duì)列和相關(guān)對(duì)象。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host里面不能有相同名稱的Exchange和Queue。
虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域,每個(gè)vhost本質(zhì)上就是一個(gè)mini版本的RabbitMQ服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost是AMQP概念的基礎(chǔ),必須在鏈接時(shí)指定,RabbitMQ默認(rèn)的vhost是“/”。
Message
消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)。消息是不具名的,由Properties和Body組成(消息頭和消息體)。Properties可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body這就是消息體內(nèi)容。
Exchange
交換機(jī),接收生產(chǎn)者發(fā)送的消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列。
三種常見(jiàn)的交換機(jī)類型:
1、direct(發(fā)布與訂閱,完全匹配)
2、fanout(廣播)
3、topic(主題,規(guī)則匹配)
Binding
綁定。Exchange和Queue之間的虛擬連接,binding中可以包含routing key。
一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來(lái)的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
Routing key
路由鍵。一個(gè)路由規(guī)則,虛擬機(jī)可用它來(lái)確定如何路由一個(gè)特定消息。
隊(duì)列通過(guò)路由鍵綁定到交換機(jī)。
消息發(fā)送到MQ服務(wù)器時(shí),消息將擁有一個(gè)路由鍵,即便是空的。RabbitMQ也會(huì)將其和綁定使用的路由鍵進(jìn)行匹配。
如果匹配,消息將投遞到該隊(duì)列;如果不匹配,消息將會(huì)進(jìn)入黑洞。
Connection
連接,應(yīng)用程序與Broker的TCP網(wǎng)絡(luò)連接。
Channel
網(wǎng)絡(luò)信道,是TCP里面的虛擬連接。幾乎所有的操作都在Channel中進(jìn)行, Channel是進(jìn)行消息讀寫的通道??蛻舳丝梢越⒍鄠€(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)(類似數(shù)據(jù)庫(kù)中Connection中的session)。例如:電纜相當(dāng)于TCP,信道是一個(gè)獨(dú)立光纖束,一條TCP連接上創(chuàng)建多條信道是沒(méi)有問(wèn)題的。
TCP一旦打開(kāi),就會(huì)創(chuàng)建AMQP信道。
無(wú)論是發(fā)布消息、接收消息、訂閱隊(duì)列,這些動(dòng)作都是通過(guò)信道完成的。
RabbitMQ為什么需要信道?為什么不是直接通信?
1、TCP的創(chuàng)建和銷毀開(kāi)銷特別大。創(chuàng)建需要3次握手,銷毀需要4次分手;
2、如果不用信道,那應(yīng)用程序就會(huì)以TCP連接RabbitMQ,高峰時(shí)每秒成千上萬(wàn)條連接會(huì)造成資源巨大浪費(fèi),而且操作系統(tǒng)每秒處理TCP連接數(shù)也是有限制的,必定造成性能瓶頸;
3、信道的原理是一條線程一條通道,多條線程多條通道同用一條TCP連接。一條TCP連接可以容納無(wú)限的信道,即使每秒成千上萬(wàn)的請(qǐng)求也不會(huì)成為性能瓶頸。
Queue
也稱為Message Queue(消費(fèi)者創(chuàng)建),消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可以投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列上將其取走。
RabbitMQ(Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議)是一個(gè)開(kāi)源的消息代理和隊(duì)列服務(wù)器,用來(lái)通過(guò)普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用Erlang語(yǔ)言來(lái)編寫的,并且RabbitMQ是基于AMQP協(xié)議的。
RabbitMQ高性能的原因:
Erlang語(yǔ)言是一種通用的面向并發(fā)的編程語(yǔ)言,最初在于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得RabbitMQ在Broker之間通過(guò)數(shù)據(jù)交互的性能是非常優(yōu)秀的,Erlang有著和原生Socket一樣的延遲。
發(fā)消息的時(shí)候需要指定發(fā)往哪個(gè)Exchange,然后借助routing key發(fā)送到對(duì)應(yīng)的消息隊(duì)列queue,消費(fèi)者從訂閱的消息隊(duì)列上取消息。
可以從架構(gòu)圖看出,RabbitMQ是典型的生產(chǎn)者-消費(fèi)者模型。
Exchange交換機(jī),接收消息,并根據(jù)路由鍵轉(zhuǎn)發(fā)消息所綁定的隊(duì)列。
交換機(jī)屬性:
Name:交換機(jī)名稱
Type:交換機(jī)類型direct、topic、fanout、headers
Durability:是否需要持久化(true表示需要持久化)
Auto Delete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列刪除后,自動(dòng)刪除該Exchange
Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為false(很多場(chǎng)景都不會(huì)用到該設(shè)置)
Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制定化使用
所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定Queue。
注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進(jìn)行任何綁定(binding)操作,消息傳遞時(shí),RouteKey必須完全匹配才會(huì)被隊(duì)列接收,否則該消息會(huì)拋棄。
所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上。
Exchange將RouteKey和某個(gè)Topic進(jìn)行模糊匹配,此時(shí)隊(duì)列需要綁定一個(gè)Topic。
注:可以使用通配符進(jìn)行模糊匹配。
Fanout(群發(fā))不處理路由鍵,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。
發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。
Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的。
什么是生產(chǎn)端的可靠性投遞?
1、保障消息的成功發(fā)出;
2、保障MQ節(jié)點(diǎn)的成功接收;
3、發(fā)送端收到MQ節(jié)點(diǎn)(Broker)確認(rèn)應(yīng)答ACK;
4、完善的消息進(jìn)行補(bǔ)償機(jī)制。
互聯(lián)網(wǎng)大廠的解決方案:
消息落庫(kù),對(duì)消息狀態(tài)進(jìn)行打標(biāo);
消息的延遲投遞,做二次確認(rèn),回調(diào)檢查。
1、消息落庫(kù)/持久化
消息信息落庫(kù)(即消息持久化),對(duì)消息狀態(tài)進(jìn)行打標(biāo):
注:這種方案需要對(duì)數(shù)據(jù)庫(kù)進(jìn)行兩次持久化操作。
2、延遲投遞
消息落庫(kù)在高并發(fā)場(chǎng)景下,數(shù)據(jù)庫(kù)IO壓力大,不適用?;ヂ?lián)網(wǎng)大廠一般采用的是延遲投遞,做二次檢查,回調(diào)檢查。
注:upstream表示生產(chǎn)端,downstream表示消費(fèi)端。
1、首先,數(shù)據(jù)庫(kù)持久化,然后發(fā)送first send消息;
2、同時(shí)發(fā)送一個(gè)延遲的檢查消息(檢查第一次發(fā)送消息消費(fèi)情況);
3、消費(fèi)端消費(fèi)消息;
4、消費(fèi)端發(fā)送一個(gè)確認(rèn)消息給Broker;
5、回調(diào)服務(wù)檢測(cè)到消費(fèi)端的確認(rèn)消息,進(jìn)行數(shù)據(jù)庫(kù)的狀態(tài)持久化(這樣相當(dāng)于數(shù)據(jù)庫(kù)一次操作,異步持久化);
6、回調(diào)服務(wù)響應(yīng)第二個(gè)延時(shí)消息,確認(rèn)消息成功消費(fèi),如果出現(xiàn)異常,回調(diào)服務(wù)調(diào)用RPC給生產(chǎn)者,再次發(fā)送。
冪等性即對(duì)數(shù)據(jù)進(jìn)行若干次操作,仍然保證正確。
消費(fèi)端實(shí)現(xiàn)冪等性,就意味著,我們的消息永遠(yuǎn)不會(huì)消費(fèi)多次,即使收到多條一樣的消息。
業(yè)界主流的冪等性操作:
唯一ID+指紋碼機(jī)制
利用Redis的原子性實(shí)現(xiàn)
1、唯一ID+指紋碼機(jī)制
使用唯一ID不能保證唯一性(用戶可能在短時(shí)間內(nèi)執(zhí)行多次消費(fèi)),還需要一個(gè)指紋碼(這個(gè)可能是業(yè)務(wù)規(guī)則,比如時(shí)間戳等生成的,也可以是數(shù)據(jù)庫(kù)主鍵)。
執(zhí)行SQL:
SELECTCOUNT(1)FROMT_ORDERWHEREID=唯一ID+指紋碼
如果返回0可以執(zhí)行ISNERT操作,如果返回1則不執(zhí)行。
優(yōu)點(diǎn):
實(shí)現(xiàn)簡(jiǎn)單
缺點(diǎn):
高并發(fā)下有數(shù)據(jù)庫(kù)寫入的性能瓶頸
解決方案:
跟進(jìn)ID進(jìn)行分庫(kù)分表進(jìn)行算法路由
2、利用Redis的原子性實(shí)現(xiàn)
借助Redis本身的原子性操作實(shí)現(xiàn)。使用Redis進(jìn)行冪等,需要考慮的問(wèn)題:
1、我們是否要進(jìn)行數(shù)據(jù)落庫(kù),如果落庫(kù)的話,關(guān)鍵解決的問(wèn)題是數(shù)據(jù)庫(kù)和緩存如何做到原子性?
2、如果不進(jìn)行落庫(kù),那么都存儲(chǔ)在緩存中,如何設(shè)置定時(shí)同步的策略?
消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。
生產(chǎn)者進(jìn)行接收應(yīng)答,用來(lái)確定這個(gè)消息是否正常的發(fā)送到Broker,這種方式也是消息的可靠性投遞的核心保證!
如何實(shí)現(xiàn)Confirm確認(rèn)消息?
1、在channel上開(kāi)啟確認(rèn)模式:channel.confirmSelect()
2、在channel上添加監(jiān)聽(tīng)addConfirmListener,監(jiān)聽(tīng)成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理。
Return Listener用于處理一些不可路由的消息!
我們的消息生產(chǎn)者通過(guò)指定一個(gè)Exchange和RoutingKey,把消息送到到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽(tīng)隊(duì)列,進(jìn)行消費(fèi)處理操作。
但是在某些情況下,如果我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的Exchange不存在或者指定的路由key路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽(tīng)這種不可達(dá)的消息,就要使用Return Listener。
假設(shè)這樣一個(gè)場(chǎng)景,首先,RabbitMQ服務(wù)器有上萬(wàn)條未處理的消息,我們隨便打開(kāi)一個(gè)消費(fèi)者客戶端,會(huì)出現(xiàn)下面情況:巨量的消息瞬間全部推送過(guò)來(lái),但是我們單個(gè)客戶端無(wú)法同時(shí)處理這么多數(shù)據(jù)。
RabbitMQ提供了一種QOS(服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過(guò)基于consumer或者channel設(shè)置Qos的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償。如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功。
消息確認(rèn)ACK:如果在處理消息的過(guò)程中,消費(fèi)者的服務(wù)器在處理消息時(shí)出現(xiàn)異常,那可能這條正在處理的消息就沒(méi)有完成消息消費(fèi),數(shù)據(jù)就會(huì)丟失。為了確保數(shù)據(jù)不會(huì)丟失,RabbitMQ支持消息確認(rèn)ACK。
ACK的消息確認(rèn)機(jī)制是消費(fèi)者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊(duì)列中刪除。
1、如果一個(gè)消費(fèi)者在處理消息出現(xiàn)了網(wǎng)絡(luò)不穩(wěn)定、服務(wù)器異常等現(xiàn)象,那么就不會(huì)有ACK反饋,RabbitMQ會(huì)認(rèn)為這個(gè)消息沒(méi)有正常消費(fèi),會(huì)將消息重新放入隊(duì)列中。
2、如果在集群的情況下:RabbitMQ會(huì)立即將這個(gè)消息推送給這個(gè)在線的其他消費(fèi)者。這種機(jī)制保證了在消費(fèi)者服務(wù)器故障的時(shí)候,不丟失任何消息和任務(wù)。
3、消息永遠(yuǎn)不會(huì)從RabbitMQ中刪除:只有當(dāng)消費(fèi)者正確發(fā)送ACK反饋,RabbitMQ確認(rèn)收到后,消息才會(huì)從RabbitMQ服務(wù)器的數(shù)據(jù)中刪除;
4、消息的ACK確認(rèn)機(jī)制默認(rèn)是打開(kāi)的。
ACK機(jī)制的開(kāi)發(fā)注意事項(xiàng):
如果忘記了ACK,那么后果很嚴(yán)重。當(dāng)Consumer退出時(shí),Message會(huì)一直重新分發(fā),然后RabbitMQ會(huì)占用越來(lái)越多的額內(nèi)存,由于RabbitMQ會(huì)長(zhǎng)時(shí)間運(yùn)行,因此這個(gè)“內(nèi)存泄露”是致命的。
消費(fèi)端重回隊(duì)列是為了對(duì)沒(méi)有處理成功的消息,把消息重新遞給Broker。
一般在實(shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為false。
TTL是Time To Live的縮寫,也就是生存時(shí)間。
RabbitMQ支持消息的過(guò)期時(shí)間,在消息發(fā)送時(shí)可以進(jìn)行指定。
RabbitMQ支持隊(duì)列的過(guò)期時(shí)間,從消息入隊(duì)列開(kāi)始計(jì)算,只要超過(guò)了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)自動(dòng)的清除。
死信隊(duì)列(DLX,Dead-Letter-Exchange)。利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX。
消息變成死信的幾種情況:
1、 消息被拒絕(basic.reject/basic.nack)并且request=false;
2、 消息TTL過(guò)期;
3、 隊(duì)列達(dá)到最大長(zhǎng)度。
DLX也是一個(gè)正常的Exchange,和一般的Exchange沒(méi)有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。
當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新分布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。
可以監(jiān)聽(tīng)這個(gè)隊(duì)列中消息做相應(yīng)的處理,這個(gè)特性可以彌補(bǔ)RabbitMQ3.0以前的immediate參數(shù)的功能。
死信隊(duì)列的設(shè)置:
1、首先,需要設(shè)置死信隊(duì)列的Exchange和queue,然后進(jìn)行綁定:
Exchange:dlx.exchangeQueue:dlx.queueRoutingKey:#
2、然后,我們進(jìn)行正常聲明交換機(jī)、隊(duì)列、綁定,只不過(guò)我們需要在隊(duì)列機(jī)上一個(gè)參數(shù)即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);這樣消息在過(guò)期、requeue、隊(duì)列在達(dá)到最大長(zhǎng)度時(shí),消息就可以直接路由到死信隊(duì)列。
rabbitmqctl stop_app:關(guān)閉應(yīng)用
rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用
rabbitmqctl status:節(jié)點(diǎn)狀態(tài)
rabbitmqctl add_user username password:添加用戶
rabbitmqctl list_users:列出所有用戶
rabbitmqctl delete_user username:刪除用戶
rabbitmqctl clear_permissions -p vhostpath username:清除用戶權(quán)限
rabbitmqctl list_user_permissions username:列出用戶權(quán)限
rabbitmqctl change_password username newpassword:修改密碼
rabbitmqctlset_permission -p vhostpath username “.*”“.*”“.*”:設(shè)置用戶權(quán)限
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。