溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ的核心概念是什么

發(fā)布時間:2021-12-14 16:09:09 來源:億速云 閱讀:132 作者:iii 欄目:大數(shù)據(jù)

本篇內容介紹了“RabbitMQ的核心概念是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

RabbitMQ 特點

RabbitMQ 相較于其他消息隊列,有一系列防止消息丟失的措施,擁有強悍的高可用性能,它的吞吐量可能沒有其他消息隊列大,但是其消息的保障性出類拔萃,被廣泛用于金融類業(yè)務。

AMQP 協(xié)議

AMQP: Advanced Message Queuing Protocol 高級消息隊列協(xié)議

AMQP定義:是具有現(xiàn)代特征的二進制協(xié)議。是一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。

Erlang語言最初在于交換機領域的架構模式,這樣使得RabbitMQ在Broker之間進行數(shù)據(jù)交互的性能是非常優(yōu)秀的,Erlang的優(yōu)點: Erlang有著和原生Socket一樣的延遲。

RabbitMQ是一個開源的消息代理和隊列服務器,用來通過普通協(xié)議在完全不同的應用之間共享數(shù)據(jù), RabbitMQ是使用Erlang語言來編寫的,并且RabbitMQ是基于AMQP協(xié)議的。關注公眾號Java技術棧獲取系列RabbitMQ教程。

RabbitMQ 消息傳遞機制

生產者發(fā)送消息到指定的 Exchange,Exchange 依據(jù)自身的類型(direct、topic等),根據(jù) routing key 將消息發(fā)送給 0 - n 個 隊列,隊列再將消息轉發(fā)給了消費者。

RabbitMQ的核心概念是什么

Server: 又稱Broker, 接受客戶端的連接,實現(xiàn)AMQP實體服務,這里指RabbitMQ 服務器

Connection: 連接,應用程序與Broker的網(wǎng)絡連接。

Channel: 網(wǎng)絡信道,幾乎所有的操作都在 Channel 中進行,Channel是進行消息讀寫的通道??蛻舳丝山⒍鄠€Channel:,每個Channel代表一個會話任務。

Virtual host: 虛似地址,用于迸行邏輯隔離,是最上層的消息路由。一個 Virtual Host 里面可以有若干個 Exchange和 Queue ,同一個 VirtualHost 里面不能有相同名稱的 Exchange 或 Queue。權限控制的最小粒度是Virtual Host。

Binding: Exchange 和 Queue 之間的虛擬連接,binding 中可以包含 routing key。

Routing key: 一 個路由規(guī)則,虛擬機可用它來確定如何路由一個特定消息,即交換機綁定到 Queue 的鍵。

Queue: 也稱為Message Queue,消息隊列,保存消息并將它們轉發(fā)給消費者。

Message

消息,服務器和應用程序之間傳送的數(shù)據(jù),由 Properties 和 Body 組成。Properties 可以對消息進行修飾,比如消息的優(yōu)先級、延遲等高級特性;,Body 則就 是消息體內容。

properties 中我們可以設置消息過期時間以及是否持久化等,也可以傳入自定義的map屬性,這些在消費端也都可以獲取到。

生產者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

public class MessageProducer {
    public static void main(String[] args) throws Exception {
        //1. 創(chuàng)建一個 ConnectionFactory 并進行設置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2. 通過連接工廠來創(chuàng)建連接
        Connection connection = factory.newConnection();

        //3. 通過 Connection 來創(chuàng)建 Channel
        Channel channel = connection.createChannel();

        //4. 聲明 使用默認交換機 以隊列名作為 routing key
        String queueName = "msg_queue";

        /**
         * deliverMode 設置為 2 的時候代表持久化消息
         * expiration 意思是設置消息的有效期,超過10秒沒有被消費者接收后會被自動刪除
         * headers 自定義的一些屬性
         * */
        //5. 發(fā)送
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("myhead1", "111");
        headers.put("myhead2", "222");

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("100000")
                .headers(headers)
                .build();
        String msg = "test message";
        channel.basicPublish("", queueName, properties, msg.getBytes());
        System.out.println("Send message : " + msg);

        //6. 關閉連接
        channel.close();
        connection.close();

    }
}

消費者

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;

public class MessageConsumer {
    public static void main(String[] args) throws Exception{
        //1. 創(chuàng)建一個 ConnectionFactory 并進行設置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. 通過連接工廠來創(chuàng)建連接
        Connection connection = factory.newConnection();

        //3. 通過 Connection 來創(chuàng)建 Channel
        Channel channel = connection.createChannel();

        //4. 聲明
        String queueName = "msg_queue";
        channel.queueDeclare(queueName, false, false, false, null);

        //5. 創(chuàng)建消費者并接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                Map<String, Object> headers = properties.getHeaders();
                System.out.println("head: " + headers.get("myhead1"));
                System.out.println(" [x] Received '" + message + "'");
                System.out.println("expiration : "+ properties.getExpiration());
            }
        };

        //6. 設置 Channel 消費者綁定隊列
        channel.basicConsume(queueName, true, consumer);
    }
}
Send message : test message

head: 111
 [x] Received 'test message'
100000

Exchange

1. 簡介

Exchange 就是交換機,接收消息,根據(jù)路由鍵轉發(fā)消息到綁定的隊列。有很多的 Message 進入到 Exchange 中,Exchange 根據(jù) Routing key 將 Message 分發(fā)到不同的 Queue 中。

2. 類型

RabbitMQ中的 Exchange 有多種類型,類型不同,Message 的分發(fā)機制不同,如下:

  • fanout:廣播模式。這種類型的 Exchange 會將 Message 分發(fā)到綁定到該 Exchange 的所有 Queue。

  • direct:這種類型的 Exchange 會根據(jù) Routing key(精確匹配,將Message分發(fā)到指定的Queue。

  • Topic:這種類型的 Exchange 會根據(jù) Routing key(模糊匹配,將Message分發(fā)到指定的Queue。

  • headers: 主題交換機有點相似,但是不同于主題交換機的路由是基于路由鍵,頭交換機的路由值基于消息的header數(shù)據(jù)。主題交換機路由鍵只有是字符串,而頭交換機可以是整型和哈希值 .

3. 屬性

/**
* Declare an exchange, via an interface that allows the complete set of
* arguments.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param internal true if the exchange is internal, i.e. can't be directly
* published to by a client.
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange,
                                 String type,boolean durable,
                                 boolean autoDelete,boolean internal,
                                 Map<String, Object> arguments) throws IOException;
  • Name: 交換機名稱

  • Type: 交換機類型direct、topic、 fanout、 headers

  • Durability: 是否需要持久化,true為持久化

  • Auto Delete: 當最后一個綁定到Exchange. 上的隊列刪除后,自動刪除該Exchange

  • Internal: 當前Exchange是否用于RabbitMQ內部使用,默認為False

  • Arguments: 擴展參數(shù),用于擴展AMQP協(xié)議自制定化使用

“RabbitMQ的核心概念是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網(wǎng)站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI