溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何在.NetCore中使用BlockingCollection實現(xiàn)一個消息隊列

發(fā)布時間:2021-01-21 15:53:44 來源:億速云 閱讀:495 作者:Leah 欄目:開發(fā)技術(shù)

本篇文章為大家展示了如何在.NetCore中使用BlockingCollection實現(xiàn)一個消息隊列,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

消息隊列現(xiàn)今的應(yīng)用場景越來越大,常用的有RabbmitMQ和KafKa。

我們用BlockingCollection來實現(xiàn)簡單的消息隊列。

BlockingCollection實現(xiàn)了生產(chǎn)者/消費者模式,是對IProducerConsumerCollection<T>接口的實現(xiàn)。與其他Concurrent集合一樣,每次Add或Take元素,都會導(dǎo)致對集合的lock。只有當(dāng)確定需要在內(nèi)存中創(chuàng)建一個生產(chǎn)者,消費者模式時,再考慮這個類。

MSDN中的示例用法:

using (BlockingCollection<int> bc = new BlockingCollection<int>())
  {
    Task.Factory.StartNew(() =>
    {
      for (int i = 0; i < 1000; i++)
      {
        bc.Add(i);
        Thread.Sleep(50); 
      }
 
 
      // Need to do this to keep foreach below from hanging
      bc.CompleteAdding();
    });
 
 
    // Now consume the blocking collection with foreach.
    // Use bc.GetConsumingEnumerable() instead of just bc because the
    // former will block waiting for completion and the latter will
    // simply take a snapshot of the current state of the underlying collection.
    foreach (var item in bc.GetConsumingEnumerable())
    {
      Console.WriteLine(item);
    }
  }

實現(xiàn)消息隊列

用Vs2017創(chuàng)建一個控制臺應(yīng)用程序。創(chuàng)建DemoQueueBlock類,封裝一些常用判斷。

  • HasEle,判斷是否有元素

  • Add向隊列中添加元素

  • Take從隊列中取出元素

為了不把BlockingCollection直接暴漏給使用者,我們封裝一個DemoQueueBlock類

  /// <summary>
  /// BlockingCollection演示消息隊列
  /// </summary>
  /// <typeparam name="T"></typeparam>
  public class DemoQueueBlock<T> where T : class
  {
    private static BlockingCollection<T> Colls;
    public DemoQueueBlock()
    {

    }
    public static bool IsComleted() {
      if (Colls != null && Colls.IsCompleted) {
        return true;
      }
      return false;
    }
    public static bool HasEle()
    {
      if (Colls != null && Colls.Count>0)
      {
        return true;
      }
      return false;
    }
    
    public static bool Add(T msg)
    {
      if (Colls == null)
      {
        Colls = new BlockingCollection<T>();
      }
      Colls.Add(msg);
      return true;
    }
    public static T Take()
    {
      if (Colls == null)
      {
        Colls = new BlockingCollection<T>();
      }
      return Colls.Take();
    }
  }

  /// <summary>
  /// 消息體
  /// </summary>
  public class DemoMessage
  {
    public string BusinessType { get; set; }
    public string BusinessId { get; set; }
    public string Body { get; set; }
  }

添加元素進隊列

通過控制臺,添加元素

  //添加元素
      while (true)
      {
        Console.WriteLine("請輸入隊列");
        var read = Console.ReadLine();
        if (read == "exit")
        {
          return;
        }

        DemoQueueBlock<DemoMessage>.Add(new DemoMessage() { BusinessId = read });
      }

消費隊列

通過判斷IsComleted,來確定是否獲取隊列

 Task.Factory.StartNew(() =>
      {
        //從隊列中取元素。
        while (!DemoQueueBlock<DemoMessage>.IsComleted())
        {
          try
          {
            var m = DemoQueueBlock<DemoMessage>.Take();
           Console.WriteLine("已消費:" + m.BusinessId);
          }
          catch (Exception ex)
          {
            Console.WriteLine(ex.Message);
          }
        }
      });

查看運行結(jié)果

如何在.NetCore中使用BlockingCollection實現(xiàn)一個消息隊列

上述內(nèi)容就是如何在.NetCore中使用BlockingCollection實現(xiàn)一個消息隊列,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI