您好,登錄后才能下訂單哦!
本篇內容介紹了“RabbitMQ的核心概念是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
RabbitMQ 相較于其他消息隊列,有一系列防止消息丟失的措施,擁有強悍的高可用性能,它的吞吐量可能沒有其他消息隊列大,但是其消息的保障性出類拔萃,被廣泛用于金融類業(yè)務。
AMQP: Advanced Message Queuing Protocol 高級消息隊列協(xié)議
AMQP定義:是具有現(xiàn)代特征的二進制協(xié)議。是一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。
Erlang語言最初在于交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數(shù)據(jù)交互的性能是非常優(yōu)秀的,Erlang的優(yōu)點: Erlang有著和原生Socket一樣的延遲。
RabbitMQ是一個開源的消息代理和隊列服務器,用來通過普通協(xié)議在完全不同的應用之間共享數(shù)據(jù), RabbitMQ是使用Erlang語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的。關注公眾號Java技術棧獲取系列RabbitMQ教程。
生產者發(fā)送消息到指定的 Exchange,Exchange 依據(jù)自身的類型(direct、topic等),根據(jù) routing key 將消息發(fā)送給 0 - n 個 隊列,隊列再將消息轉發(fā)給了消費者。
Server: 又稱Broker, 接受客戶端的連接,實現(xiàn)AMQP實體服務,這里指RabbitMQ 服務器
Connection: 連接,應用程序與Broker的網(wǎng)絡連接。
Channel: 網(wǎng)絡信道,幾乎所有的操作都在 Channel 中進行,Channel是進行消息讀寫的通道??蛻舳丝山⒍鄠€Channel:,每個Channel代表一個會話任務。
Virtual host: 虛似地址,用于迸行邏輯隔離,是最上層的消息路由。一個 Virtual Host 里面可以有若干個 Exchange和 Queue ,同一個 VirtualHost 里面不能有相同名稱的 Exchange 或 Queue。權限控制的最小粒度是Virtual Host。
Binding: Exchange 和 Queue 之間的虛擬連接,binding 中可以包含 routing key。
Routing key: 一 個路由規(guī)則,虛擬機可用它來確定如何路由一個特定消息,即交換機綁定到 Queue 的鍵。
Queue: 也稱為Message Queue,消息隊列,保存消息并將它們轉發(fā)給消費者。
消息,服務器和應用程序之間傳送的數(shù)據(jù),由 Properties 和 Body 組成。Properties 可以對消息進行修飾,比如消息的優(yōu)先級、延遲等高級特性;,Body 則就 是消息體內容。
properties 中我們可以設置消息過期時間以及是否持久化等,也可以傳入自定義的map屬性,這些在消費端也都可以獲取到。
生產者
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; public class MessageProducer { public static void main(String[] args) throws Exception { //1. 創(chuàng)建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連接工廠來創(chuàng)建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創(chuàng)建 Channel Channel channel = connection.createChannel(); //4. 聲明 使用默認交換機 以隊列名作為 routing key String queueName = "msg_queue"; /** * deliverMode 設置為 2 的時候代表持久化消息 * expiration 意思是設置消息的有效期,超過10秒沒有被消費者接收后會被自動刪除 * headers 自定義的一些屬性 * */ //5. 發(fā)送 Map<String, Object> headers = new HashMap<String, Object>(); headers.put("myhead1", "111"); headers.put("myhead2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("100000") .headers(headers) .build(); String msg = "test message"; channel.basicPublish("", queueName, properties, msg.getBytes()); System.out.println("Send message : " + msg); //6. 關閉連接 channel.close(); connection.close(); } }
消費者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.Map; public class MessageConsumer { public static void main(String[] args) throws Exception{ //1. 創(chuàng)建一個 ConnectionFactory 并進行設置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(3000); //2. 通過連接工廠來創(chuàng)建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創(chuàng)建 Channel Channel channel = connection.createChannel(); //4. 聲明 String queueName = "msg_queue"; channel.queueDeclare(queueName, false, false, false, null); //5. 創(chuàng)建消費者并接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); Map<String, Object> headers = properties.getHeaders(); System.out.println("head: " + headers.get("myhead1")); System.out.println(" [x] Received '" + message + "'"); System.out.println("expiration : "+ properties.getExpiration()); } }; //6. 設置 Channel 消費者綁定隊列 channel.basicConsume(queueName, true, consumer); } }
Send message : test message head: 111 [x] Received 'test message' 100000
Exchange 就是交換機,接收消息,根據(jù)路由鍵轉發(fā)消息到綁定的隊列。有很多的 Message 進入到 Exchange 中,Exchange 根據(jù) Routing key 將 Message 分發(fā)到不同的 Queue 中。
RabbitMQ中的 Exchange 有多種類型,類型不同,Message 的分發(fā)機制不同,如下:
fanout:廣播模式。這種類型的 Exchange 會將 Message 分發(fā)到綁定到該 Exchange 的所有 Queue。
direct:這種類型的 Exchange 會根據(jù) Routing key(精確匹配,將Message分發(fā)到指定的Queue。
Topic:這種類型的 Exchange 會根據(jù) Routing key(模糊匹配,將Message分發(fā)到指定的Queue。
headers: 主題交換機有點相似,但是不同于主題交換機的路由是基于路由鍵,頭交換機的路由值基于消息的header數(shù)據(jù)。主題交換機路由鍵只有是字符串,而頭交換機可以是整型和哈希值 .
/** * Declare an exchange, via an interface that allows the complete set of * arguments. * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @param autoDelete true if the server should delete the exchange when it is no longer in use * @param internal true if the exchange is internal, i.e. can't be directly * published to by a client. * @param arguments other properties (construction arguments) for the exchange * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java.io.IOException if an error is encountered */ Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable, boolean autoDelete,boolean internal, Map<String, Object> arguments) throws IOException;
Name: 交換機名稱
Type: 交換機類型direct、topic、 fanout、 headers
Durability: 是否需要持久化,true為持久化
Auto Delete: 當最后一個綁定到Exchange. 上的隊列刪除后,自動刪除該Exchange
Internal: 當前Exchange是否用于RabbitMQ內部使用,默認為False
Arguments: 擴展參數(shù),用于擴展AMQP協(xié)議自制定化使用
“RabbitMQ的核心概念是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網(wǎng)站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。