溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么在C#中實(shí)現(xiàn)請求唯一性校驗(yàn)支持高并發(fā)

發(fā)布時間:2021-06-04 17:22:07 來源:億速云 閱讀:152 作者:Leah 欄目:編程語言

這篇文章將為大家詳細(xì)講解有關(guān)怎么在C#中實(shí)現(xiàn)請求唯一性校驗(yàn)支持高并發(fā),文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

使用場景描述:

  網(wǎng)絡(luò)請求中經(jīng)常會遇到發(fā)送的請求,服務(wù)端響應(yīng)是成功的,但是返回的時候出現(xiàn)網(wǎng)絡(luò)故障,導(dǎo)致客戶端無法接收到請求結(jié)果,那么客戶端程序可能判斷為網(wǎng)絡(luò)故障,而重復(fù)發(fā)送同一個請求。當(dāng)然如果接口中定義了請求結(jié)果查詢接口,那么這種重復(fù)會相對少一些。特別是交易類的數(shù)據(jù),這種操作更是需要避免重復(fù)發(fā)送請求。另外一種情況是用戶過于快速的點(diǎn)擊界面按鈕,產(chǎn)生連續(xù)的相同內(nèi)容請求,那么后端也需要進(jìn)行過濾,這種一般出現(xiàn)在系統(tǒng)對接上,無法去控制第三方系統(tǒng)的業(yè)務(wù)邏輯,需要從自身業(yè)務(wù)邏輯里面去限定。

其他需求描述:

  這類請求一般存在時間范圍和高并發(fā)的特點(diǎn),就是短時間內(nèi)會出現(xiàn)重復(fù)的請求,因此對模塊需要支持高并發(fā)性。

技術(shù)實(shí)現(xiàn):

  對請求的業(yè)務(wù)內(nèi)容進(jìn)行MD5摘要,并且將MD5摘要存儲到緩存中,每個請求數(shù)據(jù)都通過這個一個公共的調(diào)用的方法進(jìn)行判斷。

代碼實(shí)現(xiàn):

  公共調(diào)用代碼 UniqueCheck 采用單例模式創(chuàng)建唯一對象,便于在多線程調(diào)用的時候,只訪問一個統(tǒng)一的緩存庫

/*
   * volatile就像大家更熟悉的const一樣,volatile是一個類型修飾符(type specifier)。
   * 它是被設(shè)計用來修飾被不同線程訪問和修改的變量。
   * 如果沒有volatile,基本上會導(dǎo)致這樣的結(jié)果:要么無法編寫多線程程序,要么編譯器失去大量優(yōu)化的機(jī)會。
   */
  private static readonly object lockHelper = new object();
 
  private volatile static UniqueCheck _instance;  
 
  /// <summary>
  /// 獲取單一實(shí)例
  /// </summary>
  /// <returns></returns>
  public static UniqueCheck GetInstance()
  {
   if (_instance == null)
   {
    lock (lockHelper)
    {
     if (_instance == null)
      _instance = new UniqueCheck();
    }
   }
   return _instance;
  }

  這里需要注意volatile的修飾符,在實(shí)際測試過程中,如果沒有此修飾符,在高并發(fā)的情況下會出現(xiàn)報錯。

  自定義一個可以進(jìn)行并發(fā)處理隊列,代碼如下:ConcurrentLinkedQueue

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace PackgeUniqueCheck
{
 /// <summary>
 /// 非加鎖并發(fā)隊列,處理100個并發(fā)數(shù)以內(nèi)
 /// </summary>
 /// <typeparam name="T"></typeparam>
 public class ConcurrentLinkedQueue<T>
 {
  private class Node<K>
  {
   internal K Item;
   internal Node<K> Next;

   public Node(K item, Node<K> next)
   {
    this.Item = item;
    this.Next = next;
   }
  }

  private Node<T> _head;
  private Node<T> _tail;

  public ConcurrentLinkedQueue()
  {
   _head = new Node<T>(default(T), null);
   _tail = _head;
  }

  public bool IsEmpty
  {
   get { return (_head.Next == null); }
  }
  /// <summary>
  /// 進(jìn)入隊列
  /// </summary>
  /// <param name="item"></param>
  public void Enqueue(T item)
  {
   Node<T> newNode = new Node<T>(item, null);
   while (true)
   {
    Node<T> curTail = _tail;
    Node<T> residue = curTail.Next;

    //判斷_tail是否被其他process改變
    if (curTail == _tail)
    {
     //A 有其他process執(zhí)行C成功,_tail應(yīng)該指向新的節(jié)點(diǎn)
     if (residue == null)
     {
      //C 其他process改變了tail節(jié)點(diǎn),需要重新取tail節(jié)點(diǎn)
      if (Interlocked.CompareExchange<Node<T>>(
       ref curTail.Next, newNode, residue) == residue)
      {
       //D 嘗試修改tail
       Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
       return;
      }
     }
     else
     {
      //B 幫助其他線程完成D操作
      Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
     }
    }
   }
  }
  /// <summary>
  /// 隊列取數(shù)據(jù)
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool TryDequeue(out T result)
  {
   Node<T> curHead;
   Node<T> curTail;
   Node<T> next;
   while (true)
   {
    curHead = _head;
    curTail = _tail;
    next = curHead.Next;
    if (curHead == _head)
    {
     if (next == null) //Queue為空
     {
      result = default(T);
      return false;
     }
     if (curHead == curTail) //Queue處于Enqueue第一個node的過程中
     {
      //嘗試幫助其他Process完成操作
      Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
     }
     else
     {
      //取next.Item必須放到CAS之前
      result = next.Item;
      //如果_head沒有發(fā)生改變,則將_head指向next并退出
      if (Interlocked.CompareExchange<Node<T>>(ref _head,
       next, curHead) == curHead)
       break;
     }
    }
   }
   return true;
  }
  /// <summary>
  /// 嘗試獲取最后一個對象
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool TryGetTail(out T result)
  {
   result = default(T);
   if (_tail == null)
   {
    return false;
   }
   result = _tail.Item;
   return true;
  }
 }
}

雖然是一個非常簡單的唯一性校驗(yàn)邏輯,但是要做到高效率,高并發(fā)支持,高可靠性,以及低內(nèi)存占用,需要實(shí)現(xiàn)這樣的需求,需要做細(xì)致的模擬測試。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Collections;

namespace PackgeUniqueCheck
{
 public class UniqueCheck
 {
  /*
   * volatile就像大家更熟悉的const一樣,volatile是一個類型修飾符(type specifier)。
   * 它是被設(shè)計用來修飾被不同線程訪問和修改的變量。
   * 如果沒有volatile,基本上會導(dǎo)致這樣的結(jié)果:要么無法編寫多線程程序,要么編譯器失去大量優(yōu)化的機(jī)會。
   */
  private static readonly object lockHelper = new object();

  private volatile static UniqueCheck _instance;  

  /// <summary>
  /// 獲取單一實(shí)例
  /// </summary>
  /// <returns></returns>
  public static UniqueCheck GetInstance()
  {
   if (_instance == null)
   {
    lock (lockHelper)
    {
     if (_instance == null)
      _instance = new UniqueCheck();
    }
   }
   return _instance;
  }

  private UniqueCheck()
  {
   //創(chuàng)建一個線程安全的哈希表,作為字典緩存
   _DataKey = Hashtable.Synchronized(new Hashtable());
   Queue myqueue = new Queue();
   _DataQueue = Queue.Synchronized(myqueue);
   _Myqueue = new ConcurrentLinkedQueue<string>();
   _Timer = new Thread(DoTicket);
   _Timer.Start();
  }

  #region 公共屬性設(shè)置
  /// <summary>
  /// 設(shè)定定時線程的休眠時間長度:默認(rèn)為1分鐘
  /// 時間范圍:1-7200000,值為1毫秒到2小時
  /// </summary>
  /// <param name="value"></param>
  public void SetTimeSpan(int value)
  {
   if (value > 0&& value <=7200000)
   {
    _TimeSpan = value;
   }
  }
  /// <summary>
  /// 設(shè)定緩存Cache中的最大記錄條數(shù)
  /// 值范圍:1-5000000,1到500萬
  /// </summary>
  /// <param name="value"></param>
  public void SetCacheMaxNum(int value)
  {
   if (value > 0 && value <= 5000000)
   {
    _CacheMaxNum = value;
   }
  }
  /// <summary>
  /// 設(shè)置是否在控制臺中顯示日志
  /// </summary>
  /// <param name="value"></param>
  public void SetIsShowMsg(bool value)
  {
   Helper.IsShowMsg = value;
  }
  /// <summary>
  /// 線程請求阻塞增量
  /// 值范圍:1-CacheMaxNum,建議設(shè)置為緩存最大值的10%-20%
  /// </summary>
  /// <param name="value"></param>
  public void SetBlockNumExt(int value)
  {
   if (value > 0 && value <= _CacheMaxNum)
   {
    _BlockNumExt = value;
   }
  }
  /// <summary>
  /// 請求阻塞時間
  /// 值范圍:1-max,根據(jù)阻塞增量設(shè)置請求阻塞時間
  /// 阻塞時間越長,阻塞增量可以設(shè)置越大,但是請求實(shí)時響應(yīng)就越差
  /// </summary>
  /// <param name="value"></param>
  public void SetBlockSpanTime(int value)
  {
   if (value > 0)
   {
    _BlockSpanTime = value;
   }
  }
  #endregion

  #region 私有變量
  /// <summary>
  /// 內(nèi)部運(yùn)行線程
  /// </summary>
  private Thread _runner = null;
  /// <summary>
  /// 可處理高并發(fā)的隊列
  /// </summary>
  private ConcurrentLinkedQueue<string> _Myqueue = null;
  /// <summary>
  /// 唯一內(nèi)容的時間健值對
  /// </summary>
  private Hashtable _DataKey = null;
  /// <summary>
  /// 內(nèi)容時間隊列
  /// </summary>
  private Queue _DataQueue = null;
  /// <summary>
  /// 定時線程的休眠時間長度:默認(rèn)為1分鐘
  /// </summary>
  private int _TimeSpan = 3000;
  /// <summary>
  /// 定時計時器線程
  /// </summary>
  private Thread _Timer = null;
  /// <summary>
  /// 緩存Cache中的最大記錄條數(shù)
  /// </summary>
  private int _CacheMaxNum = 500000;
  /// <summary>
  /// 線程請求阻塞增量
  /// </summary>
  private int _BlockNumExt = 10000;
  /// <summary>
  /// 請求阻塞時間
  /// </summary>
  private int _BlockSpanTime = 100;
  #endregion

  #region 私有方法
  private void StartRun()
  {
   _runner = new Thread(DoAction);
   _runner.Start();
   Helper.ShowMsg("內(nèi)部線程啟動成功!");
  }

  private string GetItem()
  {
   string tp = string.Empty;
   bool result = _Myqueue.TryDequeue(out tp);
   return tp;
  }
  /// <summary>
  /// 執(zhí)行循環(huán)操作
  /// </summary>
  private void DoAction()
  {
   while (true)
   {
    while (!_Myqueue.IsEmpty)
    {
     string item = GetItem();
     _DataQueue.Enqueue(item);
     if (!_DataKey.ContainsKey(item))
     {
      _DataKey.Add(item, DateTime.Now);
     }
    }
    //Helper.ShowMsg("當(dāng)前數(shù)組已經(jīng)為空,處理線程進(jìn)入休眠狀態(tài)...");
    Thread.Sleep(2);
   }
  }
  /// <summary>
  /// 執(zhí)行定時器的動作
  /// </summary>
  private void DoTicket()
  {
   while (true)
   {
    Helper.ShowMsg("當(dāng)前數(shù)據(jù)隊列個數(shù):" + _DataQueue.Count.ToString());
    if (_DataQueue.Count > _CacheMaxNum)
    {
     while (true)
     {
      Helper.ShowMsg(string.Format("當(dāng)前隊列數(shù):{0},已經(jīng)超出最大長度:{1},開始進(jìn)行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
      string item = _DataQueue.Dequeue().ToString();
      if (!string.IsNullOrEmpty(item))
      {
       if (_DataKey.ContainsKey(item))
       {
        _DataKey.Remove(item);
       }
       if (_DataQueue.Count <= _CacheMaxNum)
       {
        Helper.ShowMsg("清理完成,開始休眠清理線程...");
        break;
       }
      }
     }
    }
    Thread.Sleep(_TimeSpan);
   }
  }

  /// <summary>
  /// 線程進(jìn)行睡眠等待
  /// 如果當(dāng)前負(fù)載壓力大大超出了線程的處理能力
  /// 那么需要進(jìn)行延時調(diào)用
  /// </summary>
  private void BlockThread()
  {
   if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
   {
    Thread.Sleep(_BlockSpanTime);
   }
  }
  #endregion

  #region 公共方法
  /// <summary>
  /// 開啟服務(wù)線程
  /// </summary>
  public void Start()
  {
   if (_runner == null)
   {
    StartRun();
   }
   else
   {
    if (_runner.IsAlive == false)
    {
     StartRun();
    }
   }

  }
  /// <summary>
  /// 關(guān)閉服務(wù)線程
  /// </summary>
  public void Stop()
  {
   if (_runner != null)
   {
    _runner.Abort();
    _runner = null;
   }
  }

  /// <summary>
  /// 添加內(nèi)容信息
  /// </summary>
  /// <param name="item">內(nèi)容信息</param>
  /// <returns>true:緩存中不包含此值,隊列添加成功,false:緩存中包含此值,隊列添加失敗</returns>
  public bool AddItem(string item)
  {
   BlockThread();
   item = Helper.MakeMd5(item);
   if (_DataKey.ContainsKey(item))
   {
    return false;
   }
   else
   {
    _Myqueue.Enqueue(item);
    return true;
   }
  }
  /// <summary>
  /// 判斷內(nèi)容信息是否已經(jīng)存在
  /// </summary>
  /// <param name="item">內(nèi)容信息</param>
  /// <returns>true:信息已經(jīng)存在于緩存中,false:信息不存在于緩存中</returns>
  public bool CheckItem(string item)
  {
   item = Helper.MakeMd5(item);
   return _DataKey.ContainsKey(item);
  }
  #endregion 

 }
}

模擬測試代碼:

private static string _example = Guid.NewGuid().ToString();

  private static UniqueCheck _uck = null;

  static void Main(string[] args)
  {
   _uck = UniqueCheck.GetInstance();
   _uck.Start();
   _uck.SetIsShowMsg(false);
   _uck.SetCacheMaxNum(20000000);
   _uck.SetBlockNumExt(1000000);
   _uck.SetTimeSpan(6000);

   _uck.AddItem(_example);
   Thread[] threads = new Thread[20];

   for (int i = 0; i < 20; i++)
   {
    threads[i] = new Thread(AddInfo);
    threads[i].Start();
   }

   Thread checkthread = new Thread(CheckInfo);
   checkthread.Start();

   string value = Console.ReadLine();

   checkthread.Abort();
   for (int i = 0; i < 50; i++)
   {
    threads[i].Abort();
   }
   _uck.Stop();
  }

  static void AddInfo()
  {
   while (true)
   {
    _uck.AddItem(Guid.NewGuid().ToString());
   }
  }

  static void CheckInfo()
  {
   while (true)
   {
    Console.WriteLine("開始時間:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
    Console.WriteLine("插入結(jié)果:{0}", _uck.AddItem(_example));
    Console.WriteLine("結(jié)束時間:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
          //調(diào)整進(jìn)程休眠時間,可以測試高并發(fā)的情況
    //Thread.Sleep(1000);
   }
   
  }

測試截圖:

怎么在C#中實(shí)現(xiàn)請求唯一性校驗(yàn)支持高并發(fā)

關(guān)于怎么在C#中實(shí)現(xiàn)請求唯一性校驗(yàn)支持高并發(fā)就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI