溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

AMQP協(xié)議及RabbitMQ原理是什么

發(fā)布時(shí)間:2021-12-03 18:28:45 來(lái)源:億速云 閱讀:202 作者:柒染 欄目:云計(jì)算

AMQP協(xié)議及RabbitMQ原理是什么,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

01 AMQP協(xié)議

1.1 概述

    AMQP:是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息    隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。

AMQP協(xié)議及RabbitMQ原理是什么  

1.2 核心概念

    Publisher

    消息的生產(chǎn)者。也是一個(gè)向交換器Exchange發(fā)送消息的客戶端應(yīng)用程序。

    Consumer

    消息的消費(fèi)者。表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。

    Server/Broker

    又稱Broker,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。

    Virtual host

    虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由。

    表示一批交換器,消息隊(duì)列和相關(guān)對(duì)象。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host里面不能有相同名稱的Exchange和Queue。

    虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域,每個(gè)vhost本質(zhì)上就是一個(gè)mini版本的RabbitMQ服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost是AMQP概念的基礎(chǔ),必須在鏈接時(shí)指定,RabbitMQ默認(rèn)的vhost是“/”。

    Message

    消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)。消息是不具名的,由Properties和Body組成(消息頭和消息體)。Properties可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body這就是消息體內(nèi)容。

    Exchange

    交換機(jī),接收生產(chǎn)者發(fā)送的消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列。

    三種常見(jiàn)的交換機(jī)類型:

    1、direct(發(fā)布與訂閱,完全匹配)

    2、fanout(廣播)

    3、topic(主題,規(guī)則匹配)

    Binding

    綁定。Exchange和Queue之間的虛擬連接,binding中可以包含routing key。

    一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來(lái)的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。

    Routing key

    路由鍵。一個(gè)路由規(guī)則,虛擬機(jī)可用它來(lái)確定如何路由一個(gè)特定消息。

    隊(duì)列通過(guò)路由鍵綁定到交換機(jī)。

    消息發(fā)送到MQ服務(wù)器時(shí),消息將擁有一個(gè)路由鍵,即便是空的。RabbitMQ也會(huì)將其和綁定使用的路由鍵進(jìn)行匹配。

    如果匹配,消息將投遞到該隊(duì)列;如果不匹配,消息將會(huì)進(jìn)入黑洞。

    Connection

    連接,應(yīng)用程序與Broker的TCP網(wǎng)絡(luò)連接。

    Channel

    網(wǎng)絡(luò)信道,是TCP里面的虛擬連接。幾乎所有的操作都在Channel中進(jìn)行,    Channel是進(jìn)行消息讀寫的通道??蛻舳丝梢越⒍鄠€(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)(類似數(shù)據(jù)庫(kù)中Connection中的session)。例如:電纜相當(dāng)于TCP,信道是一個(gè)獨(dú)立光纖束,一條TCP連接上創(chuàng)建多條信道是沒(méi)有問(wèn)題的。

    TCP一旦打開(kāi),就會(huì)創(chuàng)建AMQP信道。

    無(wú)論是發(fā)布消息、接收消息、訂閱隊(duì)列,這些動(dòng)作都是通過(guò)信道完成的。

    RabbitMQ為什么需要信道?為什么不是直接通信?

    1、TCP的創(chuàng)建和銷毀開(kāi)銷特別大。創(chuàng)建需要3次握手,銷毀需要4次分手;

    2、如果不用信道,那應(yīng)用程序就會(huì)以TCP連接RabbitMQ,高峰時(shí)每秒成千上萬(wàn)條連接會(huì)造成資源巨大浪費(fèi),而且操作系統(tǒng)每秒處理TCP連接數(shù)也是有限制的,必定造成性能瓶頸;

    3、信道的原理是一條線程一條通道,多條線程多條通道同用一條TCP連接。一條TCP連接可以容納無(wú)限的信道,即使每秒成千上萬(wàn)的請(qǐng)求也不會(huì)成為性能瓶頸。

    Queue

    也稱為Message Queue(消費(fèi)者創(chuàng)建),消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可以投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列上將其取走。

02 概述

    RabbitMQ(Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議)是一個(gè)開(kāi)源的消息代理和隊(duì)列服務(wù)器,用來(lái)通過(guò)普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用Erlang語(yǔ)言來(lái)編寫的,并且RabbitMQ是基于AMQP協(xié)議的。

    RabbitMQ高性能的原因:

    Erlang語(yǔ)言是一種通用的面向并發(fā)的編程語(yǔ)言,最初在于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得RabbitMQ在Broker之間通過(guò)數(shù)據(jù)交互的性能是非常優(yōu)秀的,Erlang有著和原生Socket一樣的延遲。

03 架構(gòu)

AMQP協(xié)議及RabbitMQ原理是什么  

    發(fā)消息的時(shí)候需要指定發(fā)往哪個(gè)Exchange,然后借助routing key發(fā)送到對(duì)應(yīng)的消息隊(duì)列queue,消費(fèi)者從訂閱的消息隊(duì)列上取消息。

    可以從架構(gòu)圖看出,RabbitMQ是典型的生產(chǎn)者-消費(fèi)者模型。

04 RabbitMQ交換機(jī)

AMQP協(xié)議及RabbitMQ原理是什么  

    Exchange交換機(jī),接收消息,并根據(jù)路由鍵轉(zhuǎn)發(fā)消息所綁定的隊(duì)列。

    交換機(jī)屬性:

    Name:交換機(jī)名稱

    Type:交換機(jī)類型direct、topic、fanout、headers

    Durability:是否需要持久化(true表示需要持久化)

    Auto Delete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列刪除后,自動(dòng)刪除該Exchange

    Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為false(很多場(chǎng)景都不會(huì)用到該設(shè)置)

    Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議自制定化使用

4.1 Direc Exchange 

    所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定Queue。

    注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進(jìn)行任何綁定(binding)操作,消息傳遞時(shí),RouteKey必須完全匹配才會(huì)被隊(duì)列接收,否則該消息會(huì)拋棄。

4.2 Topic Exchange

    所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上。

    Exchange將RouteKey和某個(gè)Topic進(jìn)行模糊匹配,此時(shí)隊(duì)列需要綁定一個(gè)Topic。

    注:可以使用通配符進(jìn)行模糊匹配。

4.3 Fanout Exchange

    Fanout(群發(fā))不處理路由鍵,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。

    發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。

    Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的。

05 消息可靠性

 5.1 可靠性投遞

    什么是生產(chǎn)端的可靠性投遞?

    1、保障消息的成功發(fā)出;

    2、保障MQ節(jié)點(diǎn)的成功接收;

    3、發(fā)送端收到MQ節(jié)點(diǎn)(Broker)確認(rèn)應(yīng)答ACK;

    4、完善的消息進(jìn)行補(bǔ)償機(jī)制。

5.2 解決方案

    互聯(lián)網(wǎng)大廠的解決方案:

    消息落庫(kù),對(duì)消息狀態(tài)進(jìn)行打標(biāo);

    消息的延遲投遞,做二次確認(rèn),回調(diào)檢查。

    1、消息落庫(kù)/持久化

    消息信息落庫(kù)(即消息持久化),對(duì)消息狀態(tài)進(jìn)行打標(biāo):

AMQP協(xié)議及RabbitMQ原理是什么

    注:這種方案需要對(duì)數(shù)據(jù)庫(kù)進(jìn)行兩次持久化操作。 

    2、延遲投遞

    消息落庫(kù)在高并發(fā)場(chǎng)景下,數(shù)據(jù)庫(kù)IO壓力大,不適用?;ヂ?lián)網(wǎng)大廠一般采用的是延遲投遞,做二次檢查,回調(diào)檢查。 

AMQP協(xié)議及RabbitMQ原理是什么  

    注:upstream表示生產(chǎn)端,downstream表示消費(fèi)端。

    1、首先,數(shù)據(jù)庫(kù)持久化,然后發(fā)送first send消息;

    2、同時(shí)發(fā)送一個(gè)延遲的檢查消息(檢查第一次發(fā)送消息消費(fèi)情況);

    3、消費(fèi)端消費(fèi)消息;

    4、消費(fèi)端發(fā)送一個(gè)確認(rèn)消息給Broker;

    5、回調(diào)服務(wù)檢測(cè)到消費(fèi)端的確認(rèn)消息,進(jìn)行數(shù)據(jù)庫(kù)的狀態(tài)持久化(這樣相當(dāng)于數(shù)據(jù)庫(kù)一次操作,異步持久化);

    6、回調(diào)服務(wù)響應(yīng)第二個(gè)延時(shí)消息,確認(rèn)消息成功消費(fèi),如果出現(xiàn)異常,回調(diào)服務(wù)調(diào)用RPC給生產(chǎn)者,再次發(fā)送。

06 消息冪等性

6.1 冪等性

    冪等性即對(duì)數(shù)據(jù)進(jìn)行若干次操作,仍然保證正確。

    消費(fèi)端實(shí)現(xiàn)冪等性,就意味著,我們的消息永遠(yuǎn)不會(huì)消費(fèi)多次,即使收到多條一樣的消息。

6.2 解決方案

    業(yè)界主流的冪等性操作:

    唯一ID+指紋碼機(jī)制

    利用Redis的原子性實(shí)現(xiàn)

   1、唯一ID+指紋碼機(jī)制

    使用唯一ID不能保證唯一性(用戶可能在短時(shí)間內(nèi)執(zhí)行多次消費(fèi)),還需要一個(gè)指紋碼(這個(gè)可能是業(yè)務(wù)規(guī)則,比如時(shí)間戳等生成的,也可以是數(shù)據(jù)庫(kù)主鍵)。

    執(zhí)行SQL:

    SELECTCOUNT(1)FROMT_ORDERWHEREID=唯一ID+指紋碼

    如果返回0可以執(zhí)行ISNERT操作,如果返回1則不執(zhí)行。

    優(yōu)點(diǎn):

    實(shí)現(xiàn)簡(jiǎn)單

    缺點(diǎn):

    高并發(fā)下有數(shù)據(jù)庫(kù)寫入的性能瓶頸

    解決方案:

    跟進(jìn)ID進(jìn)行分庫(kù)分表進(jìn)行算法路由

    2、利用Redis的原子性實(shí)現(xiàn)

    借助Redis本身的原子性操作實(shí)現(xiàn)。使用Redis進(jìn)行冪等,需要考慮的問(wèn)題:

    1、我們是否要進(jìn)行數(shù)據(jù)落庫(kù),如果落庫(kù)的話,關(guān)鍵解決的問(wèn)題是數(shù)據(jù)庫(kù)和緩存如何做到原子性?

    2、如果不進(jìn)行落庫(kù),那么都存儲(chǔ)在緩存中,如何設(shè)置定時(shí)同步的策略?

07 消息投遞

7.1 Confirm確認(rèn)消息

    消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會(huì)給我們生產(chǎn)者一個(gè)應(yīng)答。

    生產(chǎn)者進(jìn)行接收應(yīng)答,用來(lái)確定這個(gè)消息是否正常的發(fā)送到Broker,這種方式也是消息的可靠性投遞的核心保證!

AMQP協(xié)議及RabbitMQ原理是什么  

    如何實(shí)現(xiàn)Confirm確認(rèn)消息?

    1、在channel上開(kāi)啟確認(rèn)模式:channel.confirmSelect()

    2、在channel上添加監(jiān)聽(tīng)addConfirmListener,監(jiān)聽(tīng)成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理。

7.2 Return返回消息

AMQP協(xié)議及RabbitMQ原理是什么  

    Return Listener用于處理一些不可路由的消息!

    我們的消息生產(chǎn)者通過(guò)指定一個(gè)Exchange和RoutingKey,把消息送到到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽(tīng)隊(duì)列,進(jìn)行消費(fèi)處理操作。

    但是在某些情況下,如果我們?cè)诎l(fā)送消息的時(shí)候,當(dāng)前的Exchange不存在或者指定的路由key路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽(tīng)這種不可達(dá)的消息,就要使用Return Listener。

08 消息端限流

    假設(shè)這樣一個(gè)場(chǎng)景,首先,RabbitMQ服務(wù)器有上萬(wàn)條未處理的消息,我們隨便打開(kāi)一個(gè)消費(fèi)者客戶端,會(huì)出現(xiàn)下面情況:巨量的消息瞬間全部推送過(guò)來(lái),但是我們單個(gè)客戶端無(wú)法同時(shí)處理這么多數(shù)據(jù)。

    RabbitMQ提供了一種QOS(服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過(guò)基于consumer或者channel設(shè)置Qos的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。

09 消息端確認(rèn)

    消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償。如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功。

    消息確認(rèn)ACK:如果在處理消息的過(guò)程中,消費(fèi)者的服務(wù)器在處理消息時(shí)出現(xiàn)異常,那可能這條正在處理的消息就沒(méi)有完成消息消費(fèi),數(shù)據(jù)就會(huì)丟失。為了確保數(shù)據(jù)不會(huì)丟失,RabbitMQ支持消息確認(rèn)ACK。

    ACK的消息確認(rèn)機(jī)制是消費(fèi)者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊(duì)列中刪除。

    1、如果一個(gè)消費(fèi)者在處理消息出現(xiàn)了網(wǎng)絡(luò)不穩(wěn)定、服務(wù)器異常等現(xiàn)象,那么就不會(huì)有ACK反饋,RabbitMQ會(huì)認(rèn)為這個(gè)消息沒(méi)有正常消費(fèi),會(huì)將消息重新放入隊(duì)列中。

    2、如果在集群的情況下:RabbitMQ會(huì)立即將這個(gè)消息推送給這個(gè)在線的其他消費(fèi)者。這種機(jī)制保證了在消費(fèi)者服務(wù)器故障的時(shí)候,不丟失任何消息和任務(wù)。

    3、消息永遠(yuǎn)不會(huì)從RabbitMQ中刪除:只有當(dāng)消費(fèi)者正確發(fā)送ACK反饋,RabbitMQ確認(rèn)收到后,消息才會(huì)從RabbitMQ服務(wù)器的數(shù)據(jù)中刪除;

    4、消息的ACK確認(rèn)機(jī)制默認(rèn)是打開(kāi)的。

    ACK機(jī)制的開(kāi)發(fā)注意事項(xiàng):

    如果忘記了ACK,那么后果很嚴(yán)重。當(dāng)Consumer退出時(shí),Message會(huì)一直重新分發(fā),然后RabbitMQ會(huì)占用越來(lái)越多的額內(nèi)存,由于RabbitMQ會(huì)長(zhǎng)時(shí)間運(yùn)行,因此這個(gè)“內(nèi)存泄露”是致命的。

10 重回隊(duì)列/TTL隊(duì)列/死信隊(duì)列

10.1 重回隊(duì)列機(jī)制

    消費(fèi)端重回隊(duì)列是為了對(duì)沒(méi)有處理成功的消息,把消息重新遞給Broker。

    一般在實(shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為false。

10.2 TTL隊(duì)列/消息

    TTL是Time To Live的縮寫,也就是生存時(shí)間。

    RabbitMQ支持消息的過(guò)期時(shí)間,在消息發(fā)送時(shí)可以進(jìn)行指定。

    RabbitMQ支持隊(duì)列的過(guò)期時(shí)間,從消息入隊(duì)列開(kāi)始計(jì)算,只要超過(guò)了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)自動(dòng)的清除。

10.3 死信隊(duì)列

    死信隊(duì)列(DLX,Dead-Letter-Exchange)。利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX。

    消息變成死信的幾種情況:

    1、 消息被拒絕(basic.reject/basic.nack)并且request=false;

    2、 消息TTL過(guò)期;

    3、 隊(duì)列達(dá)到最大長(zhǎng)度。

    DLX也是一個(gè)正常的Exchange,和一般的Exchange沒(méi)有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。

    當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新分布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。

    可以監(jiān)聽(tīng)這個(gè)隊(duì)列中消息做相應(yīng)的處理,這個(gè)特性可以彌補(bǔ)RabbitMQ3.0以前的immediate參數(shù)的功能。

    死信隊(duì)列的設(shè)置:

    1、首先,需要設(shè)置死信隊(duì)列的Exchange和queue,然后進(jìn)行綁定:

    Exchange:dlx.exchangeQueue:dlx.queueRoutingKey:#

    2、然后,我們進(jìn)行正常聲明交換機(jī)、隊(duì)列、綁定,只不過(guò)我們需要在隊(duì)列機(jī)上一個(gè)參數(shù)即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);這樣消息在過(guò)期、requeue、隊(duì)列在達(dá)到最大長(zhǎng)度時(shí),消息就可以直接路由到死信隊(duì)列。

11 RabbitMQ常用命令

    rabbitmqctl stop_app:關(guān)閉應(yīng)用

    rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用

    rabbitmqctl status:節(jié)點(diǎn)狀態(tài)

    rabbitmqctl add_user username password:添加用戶

    rabbitmqctl list_users:列出所有用戶

    rabbitmqctl delete_user username:刪除用戶

    rabbitmqctl clear_permissions -p vhostpath username:清除用戶權(quán)限

    rabbitmqctl list_user_permissions username:列出用戶權(quán)限

    rabbitmqctl change_password username newpassword:修改密碼

    rabbitmqctlset_permission -p vhostpath username “.*”“.*”“.*”:設(shè)置用戶權(quán)限

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI