溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

.Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

發(fā)布時(shí)間:2022-11-01 09:21:53 來源:億速云 閱讀:91 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了.Net Core和RabbitMQ怎么限制循環(huán)消費(fèi)的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇.Net Core和RabbitMQ怎么限制循環(huán)消費(fèi)文章都會(huì)有所收獲,下面我們一起來看看吧。

    前言

    當(dāng)消費(fèi)者端接收消息處理業(yè)務(wù)時(shí),如果出現(xiàn)異常或是拒收消息將消息又變更為等待投遞再次推送給消費(fèi)者,這樣一來,則形成循環(huán)的條件。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    循環(huán)場景

    生產(chǎn)者發(fā)送100條消息到RabbitMQ中,消費(fèi)者設(shè)定讀取到第50條消息時(shí),設(shè)置拒收,同時(shí)設(shè)定是否還留存在當(dāng)前隊(duì)列中(當(dāng)requeue為false時(shí),設(shè)置了死信隊(duì)列則進(jìn)入死信隊(duì)列,否則移除消息)。

    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));
    
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine("拒收");
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };

    當(dāng)?shù)?0條消息拒收,則仍在隊(duì)列中且處在隊(duì)列頭部,重新推送給消費(fèi)者,再次拒收,再次推送,反反復(fù)復(fù)。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    最終其他消息全部消費(fèi)完畢,僅剩第50條消息往復(fù)間不斷消費(fèi),拒收,消費(fèi),這將可能導(dǎo)致RabbitMQ出現(xiàn)內(nèi)存泄漏問題。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    解決方案

    RabbitMQ及AMQP協(xié)議本身沒有提供這類重試功能,但可以利用一些已有的功能來間接實(shí)現(xiàn)重試限定(以下只考慮基于手動(dòng)確認(rèn)模式情況)。此處只想到或是只查到了如下幾種方案解決消息循環(huán)消費(fèi)問題。

    • 一次消費(fèi)

      • 無論成功與否,消費(fèi)者都對(duì)外返回ack,將拒收原因或是異常信息catch存入本地或是新隊(duì)列中另作重試。

      • 消費(fèi)者拒絕消息或是出現(xiàn)異常,返回Nack或Reject,消息進(jìn)入死信隊(duì)列或丟棄(requeue設(shè)定為false)。

    • 限定重試次數(shù)

      • 在消息的頭中添加重試次數(shù),并將消息重新發(fā)送出去,再每次重新消費(fèi)時(shí)從頭中判斷重試次數(shù),遞增或遞減該值,直到達(dá)到限制,requeue改為false,最終進(jìn)入死信隊(duì)列或丟棄。

      • 可以在Redis、Memcache或其他存儲(chǔ)中存儲(chǔ)消息唯一鍵(例如Guid、雪花Id等,但必須在發(fā)布消息時(shí)手動(dòng)設(shè)置它),甚至在mysql中連同重試次數(shù)一起存儲(chǔ),然后在每次重新消費(fèi)時(shí)遞增/遞減該值,直到達(dá)到限制,requeue改為false,最終進(jìn)入死信隊(duì)列或丟棄。

      • 隊(duì)列使用Quorum類型,限制投遞次數(shù),超過次數(shù)消息被刪除。

    • 隊(duì)列消息過期

      • 設(shè)置過期時(shí)間,給隊(duì)列或是消息設(shè)置TTL,重試一定次數(shù)消息達(dá)到過期時(shí)間后進(jìn)入死信隊(duì)列或丟棄(requeue設(shè)定為true)。

    • 也許還有更多好的方案...

    一次消費(fèi)

    對(duì)外總是Ack

    消息到達(dá)了消費(fèi)端,可因某些原因消費(fèi)失敗了,對(duì)外可以發(fā)送Ack,而在內(nèi)部走額外的方式去執(zhí)行補(bǔ)償操作,比如將消息轉(zhuǎn)發(fā)到內(nèi)部的RabbitMQ或是其他處理方式,終歸是只消費(fèi)一次。

    var queueName = "alwaysack_queue";
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(0, 5, false);
    
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        try
        {
            var message = ea.Body;
            Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));
    
            if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
            {
                throw new Exception("模擬異常");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
        }
    };
    
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    當(dāng)消費(fèi)端收到消息,處理時(shí)出現(xiàn)異常,可以另想辦法去處理,而對(duì)外保持著ack的返回,以避免消息的循環(huán)消費(fèi)。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    消息不重入隊(duì)列

    在消費(fèi)者端,因異?;蚴蔷苁障r(shí),對(duì)requeue設(shè)置為false時(shí),如果設(shè)置了死信隊(duì)列,則符合“消息被拒絕且不重入隊(duì)列”這一進(jìn)入死信隊(duì)列的情況,從而避免消息反復(fù)重試。如未設(shè)置死信隊(duì)列,則消息被丟失。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    此處假定接收100條消息,在接收到第50條消息時(shí)設(shè)置拒收,并且設(shè)置了requeue為false。

    var dlxExchangeName = "dlx_exchange";
    channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
    var dlxQueueName = "dlx_queue";
    channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
    
    var queueName = "nackorreject_queue";
    var arguments = new Dictionary<string, object>
    {
        { "x-dead-letter-exchange", dlxExchangeName }
    };
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
    channel.BasicQos(0, 5, false);
    
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));
    
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine("拒收");
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//關(guān)鍵在于requeue=false
            return;
        }
    
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    如此一來,拒收消息不會(huì)重入隊(duì)列,并且現(xiàn)有隊(duì)列綁定了死信交換機(jī),因此,消息進(jìn)入到死信隊(duì)列中,如不綁定,則消息丟失。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    限定重試次數(shù)

    設(shè)置重試次數(shù),限定循環(huán)消費(fèi)的次數(shù),允許短暫的循環(huán),但最終打破循環(huán)。

    消息頭設(shè)定次數(shù)

    在消息頭中設(shè)置次數(shù)記錄作為標(biāo)記,但是,消費(fèi)端無法對(duì)接收到的消息修改消息頭然后將原消息送回MQ,因此,需要將原消息內(nèi)容重新發(fā)送消息到MQ,具體步驟如下

    • 原消息設(shè)置不重入隊(duì)列。

    • 再發(fā)送新的消息其內(nèi)容與原消息一致,可設(shè)置新消息的消息頭來攜帶重試次數(shù)。

    • 消費(fèi)端再次消費(fèi)時(shí),便可從消息頭中查看消息被消費(fèi)的次數(shù)。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    此處假定接收10條消息,在接收到第5條消息時(shí)設(shè)置拒收, 當(dāng)消息頭中重試次數(shù)未超過設(shè)定的3次時(shí),消息可以重入隊(duì)列,再次被消費(fèi)。

    var queueName = "messageheaderretrycount_queue";
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(0, 5, false);
    
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));
    
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))
        {
            var maxRetryCount = 3;
    
            Console.WriteLine($"拒收 {DateTime.Now}");
    
            //初次消費(fèi)
            if (ea.BasicProperties.Headers == null)
            {
                //原消息設(shè)置為不重入隊(duì)列
                ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
    
                //發(fā)送新消息到隊(duì)列中
                RetryPublishMessage(channel, queueName, message.ToArray(), 1);
                return;
            }
    
            //獲取重試次數(shù)
            var retryCount = ParseRetryCount(ea);
            if (retryCount < maxRetryCount)
            {
                //原消息設(shè)置為不重入隊(duì)列
                ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
    
                //發(fā)送新消息到隊(duì)列中
                RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);
                return;
            }
    
            //到達(dá)最大次數(shù),不再重試消息
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
            return;
        }
    
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    
    static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount)
    {
        var basicProperties = channel.CreateBasicProperties();
        basicProperties.Headers = new Dictionary<string, object>();
        basicProperties.Headers.Add("retryCount", retryCount);
        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
    }
    
    static int ParseRetryCount(BasicDeliverEventArgs ea)
    {
        var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);
        if (!existRetryRecord)
        {
            throw new Exception("沒有設(shè)置重試次數(shù)");
        }
    
        return (int)retryCount;
    }

    消息被拒收后,再重新發(fā)送消息到原有交換機(jī)或是隊(duì)列下中,以使得消息像是消費(fèi)失敗回到了隊(duì)列中,如此來控制消費(fèi)次數(shù),但是這種場景下,新消息排在了隊(duì)列的尾部,而不是原消息排在隊(duì)列頭部。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    存儲(chǔ)重試次數(shù)

    在存儲(chǔ)服務(wù)中存儲(chǔ)消息的唯一標(biāo)識(shí)與對(duì)應(yīng)重試次數(shù),消費(fèi)消息前對(duì)消息進(jìn)行判斷是否存在。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    與消息頭判斷一致,只是消息重試次數(shù)的存儲(chǔ)從消息本身挪入存儲(chǔ)服務(wù)中了。需要注意的是,消息發(fā)送端需要設(shè)置消息的唯一標(biāo)識(shí)(MessageId屬性)

    //模擬外部存儲(chǔ)服務(wù)
    var MessageRetryCounts = new Dictionary<ulong, int>();
    
    var queueName = "storageretrycount_queue";
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(0, 5, false);
    
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));
    
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            var maxRetryCount = 3;
            Console.WriteLine("拒收");
        
            //重試次數(shù)判斷
            var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);
            if (!existRetryRecord)
            {
                //重入隊(duì)列,繼續(xù)重試
                MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);
                ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
                return;
            }
        
            if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)
            {
                //重入隊(duì)列,繼續(xù)重試
                MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;
                ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
                return;
            }
        
            //到達(dá)最大次數(shù),不再重試消息
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
            return;
        }
    
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    除第一次拒收外,允許三次重試機(jī)會(huì),三次重試完畢后,設(shè)置requeue為false,消息丟失或進(jìn)入死信隊(duì)列(如有設(shè)置的話)。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    隊(duì)列使用Quorum類型

    第一種和第二種分別是消息自身、外部存儲(chǔ)服務(wù)來管理消息重試次數(shù),使用Quorum,由MQ來限定消息的投遞次數(shù),也就控制了重試次數(shù)。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    設(shè)置隊(duì)列類型為quorum,設(shè)置投遞最大次數(shù),當(dāng)超過投遞次數(shù)后,消息被丟棄。

    var queueName = "quorumtype_queue";
    var arguments = new Dictionary<string, object>()
    {
        { "x-queue-type", "quorum"},
        { "x-delivery-limit", 3 }
    };
    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
    channel.BasicQos(0, 5, false);
    
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));
    
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine($"拒收 {DateTime.Now}");
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    第一次消費(fèi)被拒收重入隊(duì)列后,經(jīng)最大三次投遞后,消費(fèi)端不再收到消息,如此一來也限制了消息的循環(huán)消費(fèi)。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    隊(duì)列消息過期

    當(dāng)為消息設(shè)置了過期時(shí)間時(shí),當(dāng)消息沒有受到Ack,且還在隊(duì)列中,受到過期時(shí)間的限制,反復(fù)消費(fèi)但未能成功時(shí),消息將走向過期,進(jìn)入死信隊(duì)列或是被丟棄。

    聚焦于過期時(shí)間的限制,因此在消費(fèi)者端,因異常或是拒收消息時(shí),需要對(duì)requeue設(shè)置為true,將消息再次重入到原隊(duì)列中。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    設(shè)定消費(fèi)者端第五十條消息會(huì)被拒收,且隊(duì)列的TTL設(shè)置為5秒。

    //死信交換機(jī)和死信隊(duì)列
    var dlxExchangeName = "dlx_exchange";
    channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
    var dlxQueueName = "dlx_queue";
    channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
    
    //常規(guī)隊(duì)列
    var queueName = "normalmessage_queue";
    var arguments = new Dictionary<string, object>
    {
        { "x-message-ttl", 5000},
        { "x-dead-letter-exchange", dlxExchangeName }
    };
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
    channel.BasicQos(0, 5, false);
    
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));
    
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine($"拒收 {DateTime.Now}");
    
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    當(dāng)消費(fèi)者端拒收消息后消息重入隊(duì)列,再次消費(fèi),反復(fù)進(jìn)行超過5秒后,消息在隊(duì)列中達(dá)到了過期時(shí)間,則被挪入到死信隊(duì)列中。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    從Web管理中死信隊(duì)列中可查看該條過期的消息。

    .Net?Core和RabbitMQ怎么限制循環(huán)消費(fèi)

    關(guān)于“.Net Core和RabbitMQ怎么限制循環(huán)消費(fèi)”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“.Net Core和RabbitMQ怎么限制循環(huán)消費(fèi)”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI