溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ如何實現(xiàn)Publish和Subscribe

發(fā)布時間:2021-12-24 09:16:34 來源:億速云 閱讀:103 作者:小新 欄目:大數(shù)據(jù)

小編給大家分享一下RabbitMQ如何實現(xiàn)Publish和Subscribe,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!



1、消息交換機【Exchange】

   在教程的前面部分,我們從隊列中發(fā)送和接收消息。在RabbitMQ中,現(xiàn)在是時候引入全消息模型。

   讓我們快速看看我們以前的教程講了什么:

   【生產者】:就是一個用于發(fā)送消息的用戶程序
   
   【消費者】:就是一個用于接收和使用消息的用戶程序

   【隊列】:是一個暫存消息的緩存區(qū)

   RabbitMQ消息傳遞模型的核心思想是,【生產者】不直接發(fā)送任何信息到隊列。事實上,【生產者】根本就不知道消息是否會被傳送到任何隊列。

   相反,【生產者】只能發(fā)送消息到【消息交換機】。交換是件很簡單的事。一方面它接收來自【生產者】的消息,另一方面是將接收到消息推送到隊列中?!鞠⒔粨Q機】必須知道它如何處理接收消息的確切方法。是否應該發(fā)送到特定隊列?它應該被發(fā)送到多個隊列呢?或者它應該被丟棄。該規(guī)則由【消息交換機】的類型來定義。

RabbitMQ如何實現(xiàn)Publish和Subscribe

 這里有一些可用的【消息交換機】的類型:【Direct】直接,【Topic】主題,【Headers】標題和【Fanout】扇出。我們將集中關注最后一個-【Fanout】扇出。讓我們創(chuàng)建一個這種類型的【消息交換機】,并給它命名為Logs:

channel.ExchangeDeclare("logs", "fanout");


   【Fanout】類型的【消息交換機】非常簡單。正如你從名字可能猜出的,它只是傳播它收到的所有消息去它知道所有的隊列中。這正是我們需要我們的日志記錄器。

    顯示【消息交換機】的列表:

    使用Rabbitmqctl列出在服務器上可以運行的最有用的【消息交換機】

sudo rabbitmqctl list_exchanges

    在這個列表中會有一些amq.*【消息交換機】和默認(未命名)消息交換機。這些都是默認創(chuàng)建的,但現(xiàn)在不太可能需要使用它們。

    默認的消息交換機

    在教程前面的部分我們隊【消息交換機】是一無所知,但是我依然可以發(fā)送消息去想去的隊列,那是因為我們使用了默認的【消息交換機】,這些默認的消息 交換機我用使用兩個雙引號“”來標識。

   我們回憶一下以前是如何發(fā)送消息的:

var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

   第一個參數(shù)是【消息交換機】的名稱??兆址硎灸J或未命名的消息交換機:消息會被路由到指定的routingkey名稱的隊列,如果它存在的話。

   現(xiàn)在,我們可以發(fā)布到我們命名的【消息交換機】:

var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "",  basicProperties: null, body: body);


2、臨時隊列

   也許你還記得以前我們使用的隊列所指定的名稱(記得Hello和task_queue嗎?)對我們來說,能夠給一個隊列指定名稱是至關重要的--因為我們需要把【Worker】指向同一個隊列。如果要在【生產者】和【消費者】之間共享隊列,給隊列命名是很重要的。

    但這不是我們的日志記錄器的情況。我們想聽到所有的日志消息,而不僅僅是其中的一個子集。我們也只對當前剛剛收到的消息感興趣,而不是對舊的。為了解決上述問題,我們需要做兩件事。

   首先,無論何時當我們連接到Rabbit的時候,我們都需要一個新的并且是空的隊列。要做到這一點,我們可以創(chuàng)建一個具有隨機名稱的隊列,或者,甚至更好一點-讓服務器為我們選擇一個隨機隊列名稱。

   其次,一旦我們斷開與【消費者】的隊列就應該自動刪除該隊列。

   在.NET客戶端中,當我們沒有為queueDeclare()提供參數(shù)時,我們創(chuàng)建了一個具有生成名稱的非持久,排他,自動刪除隊列:

var queueName = channel.QueueDeclare().QueueName;

   在這點上,QueueName包含隨機隊列名稱。例如,它可能看起來像amq.gen-jzty20brgko-hjmujj0wlg。

3、綁定【Binding】 

 我們已經創(chuàng)建了一個【Fanout】類型的【消息交換機】和隊列?,F(xiàn)在我們需要告訴【消息交換機】向我們的隊列發(fā)送消息。【消息交換機】和【隊列】之間的關系稱為綁定。RabbitMQ如何實現(xiàn)Publish和Subscribe

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");


   從現(xiàn)在開始,日志的【消息交換機】就可以將消息推送到我們定義的隊列中去了。

   我們可以通過以下語句查看【binding】列表數(shù)據(jù):
  

rabbitmqctl list_bindings

4、把所有的代碼整合到一起

   【生產者】的程序,它發(fā)出的日志消息,看起來并沒有和以前的教程有很大的不同。最重要的變化是,我們現(xiàn)在想發(fā)送的消息是到達我們指定名稱的日志【消息交換機】,而不是無名的。我們在發(fā)送消息的時候需要提供一個routingkey表示的名稱,但【Fanout】類型的【消息交換機】會容忽視該routingKey的值的。

以上是“RabbitMQ如何實現(xiàn)Publish和Subscribe”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI