溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

.NET操作RabbitMQ組件EasyNetQ使用中文簡版文檔。

發(fā)布時間:2020-07-31 14:44:47 來源:網(wǎng)絡(luò) 閱讀:1273 作者:26度出太陽 欄目:編程語言

EasyNetQ簡介

  EasyNetQ是基于官方.NET組件RabbitMQ.Client 的又一層封裝,使用起來更加方便,開發(fā)者不用關(guān)心具體隊列聲明,路由聲明等細節(jié),幾句簡單代碼即可發(fā)送消息到隊列,接收消息也很簡單,下面將簡單介紹EasyNetQ的使用方法。不知道什么是RabbitMQ?您可以關(guān)閉網(wǎng)頁了。

安裝EasyNetQ

  從NuGet上安裝即可,由于EasyNetQ是依賴RabbitMQ.Client所以,會同時安裝兩個dll。

PM> Install-Package EasyNetQ

連接RabbitMQ

  使用EasyNetQ連接RabbitMQ,是在應(yīng)用程序啟動時創(chuàng)建一個IBus對象,并且,在應(yīng)用程序關(guān)閉時釋放該對象。RabbitMQ連接是基于IBus接口的,當IBus中的方法被調(diào)用,連接才會開啟。創(chuàng)建一個IBus對象的方法如下:

var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=mike;password=topsecret”);

  根據(jù)上述代碼可以看出,連接字符串中是基于Key/Value形式的,每個Key中間用分號(;)斷開。其中host是必須要寫的,其他的值都是可以不用寫,會采用默認的配置。連接中可能用到的Key如下:

  • host,host=localhost 或者host =192.168.1.102或者host=my.rabbitmq.com,如果用到集群配置的話,那么可以用逗號將服務(wù)地址隔開,例如host=a.com,b.com,c.com

  • virtualHost,虛擬主機,默認為'/'

  • username,用戶登錄名

  • password,用戶登錄密碼

  • requestedHeartbeat,心跳設(shè)置,默認是10秒

  • prefetchcount,默認是50

  • pubisherConfirms,默認為false

  • persistentMessages,消息持久化,默認為true

  • product,產(chǎn)品名

  • platform,平臺

  • timeout,默認為10秒

  關(guān)閉連接,可以使用 bus.Dispose();

EasyNetQ日志(Logging)

  EasyNetQ提供了一個日志接口 IEasyNetQLogger

.NET操作RabbitMQ組件EasyNetQ使用中文簡版文檔。

public interface IEasyNetQLogger
{   void DebugWrite(string format, params object[] args);   void InfoWrite(string format, params object[] args);   void ErrorWrite(string format, params object[] args);   void ErrorWrite(Exception exception);
}

.NET操作RabbitMQ組件EasyNetQ使用中文簡版文檔。

  內(nèi)部默認用的是NullLogger,即什么也不做,不記錄日志。在測試的時候也可以用ConsoleLogger來顯示EasyNetQ運行中的各種信息。不過一般在正式使用環(huán)境中,可以自定義日志并實現(xiàn)IEasyNetQLogger接口。然后在RabbitHutch.CreateBus的重載方法中注冊想用的日志類型。(日志中會記錄連接RabbitMQ的過程和隊列創(chuàng)建細節(jié)等信息,對于不懂RabbitMQ的同學(xué),可能這些日志沒有什么意義)。代碼如下:

var logger = new MyLogger() // 繼承自 IEasyNetQLoggervar bus = RabbitHutch.CreateBus(“my connection”, x => x.Register<IEasyNetQLogger>(_ => logger));

消息發(fā)布(Publish)

  EasyNetQ支持最簡單的消息模式是發(fā)布和訂閱。發(fā)布消息后,任意消費者可以訂閱該消息,也可以多個消費者訂閱。并且不需要額外配置。首先,如上文中需要先創(chuàng)建一個IBus對象,然后,在創(chuàng)建一個可序列化的.NET對象。調(diào)用Publish方法即可。

 message =  MyMessage { Text =

  警告,Publish只顧發(fā)送消息到隊列,但是不管有沒有消費端訂閱,所以,發(fā)布之后,如果沒有消費者,該消息將不會被消費甚至丟失。

消息訂閱(Subscribe)

  EasyNetQ提供了消息訂閱,當調(diào)用Subscribe方法時候,EasyNetQ會創(chuàng)建一個用于接收消息的隊列,不過與消息發(fā)布不同的是,消息訂閱增加了一個參數(shù),subscribe_id.代碼如下:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

  第一個參數(shù)是訂閱id,另外一個是delegate參數(shù),用于處理接收到的消息。這里要注意的是,subscribe_id參數(shù)很重要,假如開發(fā)者用同一個subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會以輪訓(xùn)的方式給每個訂閱的隊列發(fā)送消息。接收到之后,其他隊列就接收不到該消息。如果用不同的subscribeid訂閱同一種消息類型,那么生成的每一個隊列都會收到該消息。

  舉個例子:出庫發(fā)貨,我們有五個商品倉庫,每個倉庫的商品都是一樣的,假如來了一堆訂單,那么我們需要五個倉庫共同工作,分別處理訂單。而同樣,總倉庫需要知道總出貨量,正常情況下,可以用每個倉庫的出貨量相加即可。不過如果我們在總倉庫也監(jiān)聽商品訂單消息,那么,每次來訂單,總倉庫也都會收到一份,那么可以作相應(yīng)的統(tǒng)計了。

  需要注意的是,在收到消息處理消息時候,不要占用太多的時間,會影響消息的處理效率,所以,遇到占用長時間的處理方法,最好用異步處理。代碼如下:

.NET操作RabbitMQ組件EasyNetQ使用中文簡版文檔。

bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 
    new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
        .ContinueWith(task => 
            Console.WriteLine("Received: '{0}', Downloaded: '{1}'", 
                message.Text, 
                task.Result)));

.NET操作RabbitMQ組件EasyNetQ使用中文簡版文檔。

  取消訂閱,可以用如下方法:

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);

...

subscriptionResult.Dispose();

  或者直接IBus.Dispose();

消息發(fā)送(Send)和接收(Receive)

  與Publish/Subscribe略有不同的是,Send/Receive 可以自己定義隊列名稱。

bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });
bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));

  并且,也可以在同一個隊列上發(fā)送不同的消息類型,Receive方法可以這么寫:

bus.Receive("my.queue", x => x
    .Add<MyMessage>(message => deliveredMyMessage = message)
    .Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));

  如果消息到達隊列,但是沒有發(fā)現(xiàn)相應(yīng)消息類型的處理時,EasyNetQ會發(fā)送一條消息到error隊列,并且,帶上一個異常信息:No handler found for message type <message type>。與Subscribe類型,如果在同一個隊列,同一個消息類型,多次調(diào)用Receive方法時,消息會通過輪詢的形式發(fā)送給每個Receive端。

消息路由(Topic Based Routing)

  Publish方法,可以加一個topic參數(shù)。

bus.Publish(message, "X.A");

  消息訂閱方可以通過路由來過濾相應(yīng)的消息。

  * 匹配一個字符

  #匹配0個或者多個字符

  所以 X.A.2 會匹配到 "#", "X.#", "*.A.*" 但不會匹配 "X.B.*" 或者 "A". 當消息訂閱需要用到topic時候,需要調(diào)用Subscribe的重載方法

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));

  上述這種方式,會將消息輪詢發(fā)送給兩個訂閱者,如果只需要一個訂閱者的話,可以這么調(diào)用:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));

總結(jié)

  以上就是EasyNetQ的一些基本用法了,是不是很簡單呢,就這么輕松實現(xiàn)了消息隊列的使用。當然,要深入內(nèi)部還是有很多東西的。比如依賴注入,自定義EasyNetQ組件,RPC實現(xiàn)等。而且,他的源碼也是比較有參考價值的,相對于之前自己寫的基于RabbitMQ的封裝,自己的簡直是不能看呀。希望本文能給讀完的你帶來幫助。


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI