溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ的工作原理是什么

發(fā)布時間:2021-06-21 18:39:07 來源:億速云 閱讀:262 作者:Leah 欄目:大數(shù)據(jù)

這篇文章給大家介紹RabbitMQ的工作原理是什么,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

RabbitMQ簡介

在介紹RabbitMQ之前實現(xiàn)要介紹一下MQ,MQ是什么?

MQ全稱是Message Queue,可以理解為消息隊列的意思,簡單來說就是消息以管道的方式進行傳遞。

RabbitMQ是一個實現(xiàn)了AMQP(Advanced Message Queuing Protocol)高級消息隊列協(xié)議的消息隊列服務(wù),用Erlang語言的。

使用場景

在我們秒殺搶購商品的時候,系統(tǒng)會提醒我們稍等排隊中,而不是像幾年前一樣頁面卡死或報錯給用戶。

像這種排隊結(jié)算就用到了消息隊列機制,放入通道里面一個一個結(jié)算處理,而不是某個時間斷突然涌入大批量的查詢新增把數(shù)據(jù)庫給搞宕機,所以RabbitMQ本質(zhì)上起到的作用就是削峰填谷,為業(yè)務(wù)保駕護航。

為什么選擇RabbitMQ

現(xiàn)在的市面上有很多MQ可以選擇,比如ActiveMQ、ZeroMQ、Appche Qpid,那問題來了為什么要選擇RabbitMQ?

  1. 除了Qpid,RabbitMQ是唯一一個實現(xiàn)了AMQP標準的消息服務(wù)器;

  2. 可靠性,RabbitMQ的持久化支持,保證了消息的穩(wěn)定性;

  3. 高并發(fā),RabbitMQ使用了Erlang開發(fā)語言,Erlang是為電話交換機開發(fā)的語言,天生自帶高并發(fā)光環(huán),和高可用特性;

  4. 集群部署簡單,正是應(yīng)為Erlang使得RabbitMQ集群部署變的超級簡單;

  5. 社區(qū)活躍度高,根據(jù)網(wǎng)上資料來看,RabbitMQ也是首選;

工作機制

生產(chǎn)者、消費者和代理

在了解消息通訊之前首先要了解3個概念:生產(chǎn)者、消費者和代理。

生產(chǎn)者:消息的創(chuàng)建者,負責(zé)創(chuàng)建和推送數(shù)據(jù)到消息服務(wù)器;

消費者:消息的接收方,用于處理數(shù)據(jù)和確認消息;

代理:就是RabbitMQ本身,用于扮演“快遞”的角色,本身不生產(chǎn)消息,只是扮演“快遞”的角色。

消息發(fā)送原理

首先你必須連接到Rabbit才能發(fā)布和消費消息,那怎么連接和發(fā)送消息的呢?

你的應(yīng)用程序和Rabbit Server之間會創(chuàng)建一個TCP連接,一旦TCP打開,并通過了認證,認證就是你試圖連接Rabbit之前發(fā)送的Rabbit服務(wù)器連接信息和用戶名和密碼,有點像程序連接數(shù)據(jù)庫,使用Java有兩種連接認證的方式,后面代碼會詳細介紹,一旦認證通過你的應(yīng)用程序和Rabbit就創(chuàng)建了一條AMQP信道(Channel)。

信道是創(chuàng)建在“真實”TCP上的虛擬連接,AMQP命令都是通過信道發(fā)送出去的,每個信道都會有一個唯一的ID,不論是發(fā)布消息,訂閱隊列或者介紹消息都是通過信道完成的。

為什么不通過TCP直接發(fā)送命令?

對于操作系統(tǒng)來說創(chuàng)建和銷毀TCP會話是非常昂貴的開銷,假設(shè)高峰期每秒有成千上萬條連接,每個連接都要創(chuàng)建一條TCP會話,這就造成了TCP連接的巨大浪費,而且操作系統(tǒng)每秒能創(chuàng)建的TCP也是有限的,因此很快就會遇到系統(tǒng)瓶頸。

如果我們每個請求都使用一條TCP連接,既滿足了性能的需要,又能確保每個連接的私密性,這就是引入信道概念的原因。

RabbitMQ的工作原理是什么

你必須知道的Rabbit

想要真正的了解Rabbit有些名詞是你必須知道的。

包括:ConnectionFactory(連接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。

**ConnectionFactory(連接管理器):**應(yīng)用程序與Rabbit之間建立連接的管理器,程序代碼中使用;

**Channel(信道):**消息推送使用的通道;

**Exchange(交換器):**用于接受、分配消息;

Queue(隊列):用于存儲生產(chǎn)者的消息;

RoutingKey(路由鍵):用于把生成者的數(shù)據(jù)分配到交換器上;

BindingKey(綁定鍵):用于把交換器的消息綁定到隊列上;

看到上面的解釋,最難理解的路由鍵和綁定鍵了,那么他們具體怎么發(fā)揮作用的,請看下圖:

RabbitMQ的工作原理是什么  

關(guān)于更多交換器的信息,我們在后面再講。

消息持久化

Rabbit隊列和交換器有一個不可告人的秘密,就是默認情況下重啟服務(wù)器會導(dǎo)致消息丟失,那么怎么保證Rabbit在重啟的時候不丟失呢?答案就是消息持久化。

當(dāng)你把消息發(fā)送到Rabbit服務(wù)器的時候,你需要選擇你是否要進行持久化,但這并不能保證Rabbit能從崩潰中恢復(fù),想要Rabbit消息能恢復(fù)必須滿足3個條件:

  1. 投遞消息的時候durable設(shè)置為true,消息持久化,代碼:channel.queueDeclare(x, true, false, false, null),參數(shù)2設(shè)置為true持久化;

  2. 設(shè)置投遞模式deliveryMode設(shè)置為2(持久),代碼:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),參數(shù)3設(shè)置為存儲純文本到磁盤;

  3. 消息已經(jīng)到達持久化交換器上;

  4. 消息已經(jīng)到達持久化的隊列;

持久化工作原理

Rabbit會將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費之后,Rabbit會把這條消息標識為等待垃圾回收。

持久化的缺點

消息持久化的優(yōu)點顯而易見,但缺點也很明顯,那就是性能,因為要寫入硬盤要比寫入內(nèi)存性能較低很多,從而降低了服務(wù)器的吞吐量,盡管使用SSD硬盤可以使事情得到緩解,但他仍然吸干了Rabbit的性能,當(dāng)消息成千上萬條要寫入磁盤的時候,性能是很低的。

所以使用者要根據(jù)自己的情況,選擇適合自己的方式。

虛擬主機

每個Rabbit都能創(chuàng)建很多vhost,我們稱之為虛擬主機,每個虛擬主機其實都是mini版的RabbitMQ,擁有自己的隊列,交換器和綁定,擁有自己的權(quán)限機制。

vhost特性

  1. RabbitMQ默認的vhost是“/”開箱即用;

  2. 多個vhost是隔離的,多個vhost無法通訊,并且不用擔(dān)心命名沖突(隊列和交換器和綁定),實現(xiàn)了多層分離;

  3. 創(chuàng)建用戶的時候必須指定vhost;

vhost操作

可以通過rabbitmqctl工具命令創(chuàng)建:

rabbitmqctl add_vhost[vhost_name]

刪除vhost:

rabbitmqctl delete_vhost[vhost_name]

查看所有的vhost:

rabbitmqctl list_vhosts

環(huán)境搭建

前文我們已經(jīng)介紹了Ubuntu搭建RabbitMQ的步驟:RabbitMQ在Ubuntu上的環(huán)境搭建

如果你是在Windows10上去安裝那就更簡單了,先放下載地址:

Erlang/Rabbit Server百度網(wǎng)盤鏈接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密碼:wct9

當(dāng)然也可去Erlang和Rabbit官網(wǎng)去下,就是速度比較慢。我的百度云Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下載最新的Erlang,在Windows10上打開擴展插件有問題,打不開。

  1. 安裝Erlang;

  2. 安裝Rabbit Server;

  3. 進入安裝目錄\sbin下,使用命令“rabbitmq-plugins enable rabbitmq_management”啟動網(wǎng)頁管理插件;

  4. 重啟Rabbit服務(wù);

使用:http://localhost:15672進行測試,默認的登陸賬號為:guest,密碼為:guest

重復(fù)安裝Rabbit Server的坑

如果不是第一次在Windows上安裝Rabbit Server一定要把Rabbit和Erlang卸載干凈之后,找到注冊表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 刪除其下的所有項。

不然會出現(xiàn)Rabbit安裝之后啟動不了的情況,理論上卸載的順序也是先Rabbit在Erlang。

代碼實現(xiàn)

java版實現(xiàn),使用maven項目,創(chuàng)建可以查看:MyEclipse2017破解設(shè)置與maven項目搭建

項目創(chuàng)建成功之后,添加Rabbit Client jar包,只需要在pom.xml里面配置,如下信息:

 <dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.2.0</version>
</dependency>復(fù)制代碼

java實現(xiàn)代碼分為兩個類,第一個是創(chuàng)建Rabbit連接,第二是應(yīng)用類使用最簡單的方式發(fā)布和消費消息。

Rabbit的連接,兩種方式:

方式一:

public static Connection GetRabbitConnection() {
	ConnectionFactory factory = new ConnectionFactory();
	factory.setUsername(Config.UserName);
	factory.setPassword(Config.Password);
	factory.setVirtualHost(Config.VHost);
	factory.setHost(Config.Host);
	factory.setPort(Config.Port);
	Connection conn = null;
	try {
		conn = factory.newConnection();
	} catch (Exception e) {
		e.printStackTrace();
	}	return conn;
}復(fù)制代碼

方式二:

public static Connection GetRabbitConnection2() {
	ConnectionFactory factory = new ConnectionFactory();
	// 連接格式:amqp://userName:password@hostName:portNumber/virtualHost
	String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,
			Config.VHost);
	Connection conn = null;
	try {
		factory.setUri(uri);
		factory.setVirtualHost(Config.VHost);
		conn = factory.newConnection();
	} catch (Exception e) {
		e.printStackTrace();
	}	return conn;
}復(fù)制代碼

第二部分:應(yīng)用類,使用最簡單的方式發(fā)布和消費消息

public static void main(String[] args) {
	Publisher(); // 推送消息

	Consumer(); // 消費消息
}

/**
 * 推送消息
 */
public static void Publisher() {
	// 創(chuàng)建一個連接
	Connection conn = ConnectionFactoryUtil.GetRabbitConnection();	if (conn != null) {
		try {
			// 創(chuàng)建通道
			Channel channel = conn.createChannel();
			// 聲明隊列【參數(shù)說明:參數(shù)一:隊列名稱,參數(shù)二:是否持久化;參數(shù)三:是否獨占模式;參數(shù)四:消費者斷開連接時是否刪除隊列;參數(shù)五:消息其他參數(shù)】
			channel.queueDeclare(Config.QueueName, false, false, false, null);
			String content = String.format("當(dāng)前時間:%s", new Date().getTime());
			// 發(fā)送內(nèi)容【參數(shù)說明:參數(shù)一:交換機名稱;參數(shù)二:隊列名稱,參數(shù)三:消息的其他屬性-routing headers,此屬性為MessageProperties.PERSISTENT_TEXT_PLAIN用于設(shè)置純文本消息存儲到硬盤;參數(shù)四:消息主體】
			channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
			System.out.println("已發(fā)送消息:" + content);
			// 關(guān)閉連接
			channel.close();
			conn.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

/**
 * 消費消息
 */
public static void Consumer() {
	// 創(chuàng)建一個連接
	Connection conn = ConnectionFactoryUtil.GetRabbitConnection();	if (conn != null) {
		try {
			// 創(chuàng)建通道
			Channel channel = conn.createChannel();
			// 聲明隊列【參數(shù)說明:參數(shù)一:隊列名稱,參數(shù)二:是否持久化;參數(shù)三:是否獨占模式;參數(shù)四:消費者斷開連接時是否刪除隊列;參數(shù)五:消息其他參數(shù)】
			channel.queueDeclare(Config.QueueName, false, false, false, null);

			// 創(chuàng)建訂閱器,并接受消息
			channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					String routingKey = envelope.getRoutingKey(); // 隊列名稱
					String contentType = properties.getContentType(); // 內(nèi)容類型
					String content = new String(body, "utf-8"); // 消息正文
					System.out.println("消息正文:" + content);
					channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認消息【參數(shù)說明:參數(shù)一:該消息的index;參數(shù)二:是否批量應(yīng)答,true批量確認小于index的消息】
				}
			});

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}復(fù)制代碼

本文參與“OSC源創(chuàng)計劃”,歡迎正在閱讀的你也加入,一起分享。

關(guān)于RabbitMQ的工作原理是什么就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI