您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關(guān)C#中實(shí)現(xiàn)ConcurrentBag的原理分析的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。
ConcurrentBag<T>
對(duì)外提供的方法沒(méi)有List<T>
那么多,但是同樣有Enumerable
實(shí)現(xiàn)的擴(kuò)展方法。類本身提供的方法如下所示。
名稱 | 說(shuō)明 |
---|---|
Add | 將對(duì)象添加到 ConcurrentBag |
CopyTo | 從指定數(shù)組索引開(kāi)始,將 ConcurrentBag |
Equals(Object) | 確定指定的 Object 是否等于當(dāng)前的 Object。 (繼承自 Object。) |
Finalize | 允許對(duì)象在“垃圾回收”回收之前嘗試釋放資源并執(zhí)行其他清理操作。 (繼承自 Object。) |
GetEnumerator | 返回循環(huán)訪問(wèn) ConcurrentBag |
GetHashCode | 用作特定類型的哈希函數(shù)。 (繼承自 Object。) |
GetType | 獲取當(dāng)前實(shí)例的 Type。 (繼承自 Object。) |
MemberwiseClone | 創(chuàng)建當(dāng)前 Object 的淺表副本。 (繼承自 Object。) |
ToArray | 將 ConcurrentBag |
ToString | 返回表示當(dāng)前對(duì)象的字符串。 (繼承自 Object。) |
TryPeek | 嘗試從 ConcurrentBag |
TryTake | 嘗試從 ConcurrentBag |
ConcurrentBag
線程安全實(shí)現(xiàn)主要是通過(guò)它的數(shù)據(jù)存儲(chǔ)的結(jié)構(gòu)和細(xì)顆粒度的鎖。
public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> { // ThreadLocalList對(duì)象包含每個(gè)線程的數(shù)據(jù) ThreadLocal<ThreadLocalList> m_locals; // 這個(gè)頭指針和尾指針指向中的第一個(gè)和最后一個(gè)本地列表,這些本地列表分散在不同線程中 // 允許在線程局部對(duì)象上枚舉 volatile ThreadLocalList m_headList, m_tailList; // 這個(gè)標(biāo)志是告知操作線程必須同步操作 // 在GlobalListsLock 鎖中 設(shè)置 bool m_needSync; }
首選我們來(lái)看它聲明的私有字段,其中需要注意的是集合的數(shù)據(jù)是存放在ThreadLocal
線程本地存儲(chǔ)中的。也就是說(shuō)訪問(wèn)它的每個(gè)線程會(huì)維護(hù)一個(gè)自己的集合數(shù)據(jù)列表,一個(gè)集合中的數(shù)據(jù)可能會(huì)存放在不同線程的本地存儲(chǔ)空間中,所以如果線程訪問(wèn)自己本地存儲(chǔ)的對(duì)象,那么是沒(méi)有問(wèn)題的,這就是實(shí)現(xiàn)線程安全的第一層,使用線程本地存儲(chǔ)數(shù)據(jù)。
然后可以看到ThreadLocalList m_headList, m_tailList;
這個(gè)是存放著本地列表對(duì)象的頭指針和尾指針,通過(guò)這兩個(gè)指針,我們就可以通過(guò)遍歷的方式來(lái)訪問(wèn)所有本地列表。它使用volatile
修飾,不允許線程進(jìn)行本地緩存,每個(gè)線程的讀寫都是直接操作在共享內(nèi)存上,這就保證了變量始終具有一致性。任何線程在任何時(shí)間進(jìn)行讀寫操作均是最新值。
最后又定義了一個(gè)標(biāo)志,這個(gè)標(biāo)志告知操作線程必須進(jìn)行同步操作,這是實(shí)現(xiàn)了一個(gè)細(xì)顆粒度的鎖,因?yàn)橹挥性趲讉€(gè)條件滿足的情況下才需要進(jìn)行線程同步。
接下來(lái)我們來(lái)看一下ThreadLocalList
類的構(gòu)造,該類就是實(shí)際存儲(chǔ)了數(shù)據(jù)的位置。實(shí)際上它是使用雙向鏈表這種結(jié)構(gòu)進(jìn)行數(shù)據(jù)存儲(chǔ)。
[Serializable] // 構(gòu)造了雙向鏈表的節(jié)點(diǎn) internal class Node { public Node(T value) { m_value = value; } public readonly T m_value; public Node m_next; public Node m_prev; } /// <summary> /// 集合操作類型 /// </summary> internal enum ListOperation { None, Add, Take }; /// <summary> /// 線程鎖定的類 /// </summary> internal class ThreadLocalList { // 雙向鏈表的頭結(jié)點(diǎn) 如果為null那么表示鏈表為空 internal volatile Node m_head; // 雙向鏈表的尾節(jié)點(diǎn) private volatile Node m_tail; // 定義當(dāng)前對(duì)List進(jìn)行操作的種類 // 與前面的 ListOperation 相對(duì)應(yīng) internal volatile int m_currentOp; // 這個(gè)列表元素的計(jì)數(shù) private int m_count; // The stealing count // 這個(gè)不是特別理解 好像是在本地列表中 刪除某個(gè)Node 以后的計(jì)數(shù) internal int m_stealCount; // 下一個(gè)列表 可能會(huì)在其它線程中 internal volatile ThreadLocalList m_nextList; // 設(shè)定鎖定是否已進(jìn)行 internal bool m_lockTaken; // The owner thread for this list internal Thread m_ownerThread; // 列表的版本,只有當(dāng)列表從空變?yōu)榉强战y(tǒng)計(jì)是底層 internal volatile int m_version; /// <summary> /// ThreadLocalList 構(gòu)造器 /// </summary> /// <param name="ownerThread">擁有這個(gè)集合的線程</param> internal ThreadLocalList(Thread ownerThread) { m_ownerThread = ownerThread; } /// <summary> /// 添加一個(gè)新的item到鏈表首部 /// </summary> /// <param name="item">The item to add.</param> /// <param name="updateCount">是否更新計(jì)數(shù).</param> internal void Add(T item, bool updateCount) { checked { m_count++; } Node node = new Node(item); if (m_head == null) { Debug.Assert(m_tail == null); m_head = node; m_tail = node; m_version++; // 因?yàn)檫M(jìn)行初始化了,所以將空狀態(tài)改為非空狀態(tài) } else { // 使用頭插法 將新的元素插入鏈表 node.m_next = m_head; m_head.m_prev = node; m_head = node; } if (updateCount) // 更新計(jì)數(shù)以避免此添加同步時(shí)溢出 { m_count = m_count - m_stealCount; m_stealCount = 0; } } /// <summary> /// 從列表的頭部刪除一個(gè)item /// </summary> /// <param name="result">The removed item</param> internal void Remove(out T result) { // 雙向鏈表刪除頭結(jié)點(diǎn)數(shù)據(jù)的流程 Debug.Assert(m_head != null); Node head = m_head; m_head = m_head.m_next; if (m_head != null) { m_head.m_prev = null; } else { m_tail = null; } m_count--; result = head.m_value; } /// <summary> /// 返回列表頭部的元素 /// </summary> /// <param name="result">the peeked item</param> /// <returns>True if succeeded, false otherwise</returns> internal bool Peek(out T result) { Node head = m_head; if (head != null) { result = head.m_value; return true; } result = default(T); return false; } /// <summary> /// 從列表的尾部獲取一個(gè)item /// </summary> /// <param name="result">the removed item</param> /// <param name="remove">remove or peek flag</param> internal void Steal(out T result, bool remove) { Node tail = m_tail; Debug.Assert(tail != null); if (remove) // Take operation { m_tail = m_tail.m_prev; if (m_tail != null) { m_tail.m_next = null; } else { m_head = null; } // Increment the steal count m_stealCount++; } result = tail.m_value; } /// <summary> /// 獲取總計(jì)列表計(jì)數(shù), 它不是線程安全的, 如果同時(shí)調(diào)用它, 則可能提供不正確的計(jì)數(shù) /// </summary> internal int Count { get { return m_count - m_stealCount; } } }
從上面的代碼中我們可以更加驗(yàn)證之前的觀點(diǎn),就是ConcurentBag<T>
在一個(gè)線程中存儲(chǔ)數(shù)據(jù)時(shí),使用的是雙向鏈表,ThreadLocalList
實(shí)現(xiàn)了一組對(duì)鏈表增刪改查的方法。
接下來(lái)我們看一看ConcurentBag<T>
是如何新增元素的。
/// <summary> /// 嘗試獲取無(wú)主列表,無(wú)主列表是指線程已經(jīng)被暫?;蛘呓K止,但是集合中的部分?jǐn)?shù)據(jù)還存儲(chǔ)在那里 /// 這是避免內(nèi)存泄漏的方法 /// </summary> /// <returns></returns> private ThreadLocalList GetUnownedList() { //此時(shí)必須持有全局鎖 Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 從頭線程列表開(kāi)始枚舉 找到那些已經(jīng)被關(guān)閉的線程 // 將它所在的列表對(duì)象 返回 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) { currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe return currentList; } currentList = currentList.m_nextList; } return null; } /// <summary> /// 本地幫助方法,通過(guò)線程對(duì)象檢索線程線程本地列表 /// </summary> /// <param name="forceCreate">如果列表不存在,那么創(chuàng)建新列表</param> /// <returns>The local list object</returns> private ThreadLocalList GetThreadList(bool forceCreate) { ThreadLocalList list = m_locals.Value; if (list != null) { return list; } else if (forceCreate) { // 獲取用于更新操作的 m_tailList 鎖 lock (GlobalListsLock) { // 如果頭列表等于空,那么說(shuō)明集合中還沒(méi)有元素 // 直接創(chuàng)建一個(gè)新的 if (m_headList == null) { list = new ThreadLocalList(Thread.CurrentThread); m_headList = list; m_tailList = list; } else { // ConcurrentBag內(nèi)的數(shù)據(jù)是以雙向鏈表的形式分散存儲(chǔ)在各個(gè)線程的本地區(qū)域中 // 通過(guò)下面這個(gè)方法 可以找到那些存儲(chǔ)有數(shù)據(jù) 但是已經(jīng)被停止的線程 // 然后將已停止線程的數(shù)據(jù) 移交到當(dāng)前線程管理 list = GetUnownedList(); // 如果沒(méi)有 那么就新建一個(gè)列表 然后更新尾指針的位置 if (list == null) { list = new ThreadLocalList(Thread.CurrentThread); m_tailList.m_nextList = list; m_tailList = list; } } m_locals.Value = list; } } else { return null; } Debug.Assert(list != null); return list; } /// <summary> /// Adds an object to the <see cref="ConcurrentBag{T}"/>. /// </summary> /// <param name="item">The object to be added to the /// <see cref="ConcurrentBag{T}"/>. The value can be a null reference /// (Nothing in Visual Basic) for reference types.</param> public void Add(T item) { // 獲取該線程的本地列表, 如果此線程不存在, 則創(chuàng)建一個(gè)新列表 (第一次調(diào)用 add) ThreadLocalList list = GetThreadList(true); // 實(shí)際的數(shù)據(jù)添加操作 在AddInternal中執(zhí)行 AddInternal(list, item); } /// <summary> /// </summary> /// <param name="list"></param> /// <param name="item"></param> private void AddInternal(ThreadLocalList list, T item) { bool lockTaken = false; try { #pragma warning disable 0420 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add); #pragma warning restore 0420 // 同步案例: // 如果列表計(jì)數(shù)小于兩個(gè), 因?yàn)槭请p向鏈表的關(guān)系 為了避免與任何竊取線程發(fā)生沖突 必須獲取鎖 // 如果設(shè)置了 m_needSync, 這意味著有一個(gè)線程需要凍結(jié)包 也必須獲取鎖 if (list.Count < 2 || m_needSync) { // 將其重置為None 以避免與竊取線程的死鎖 list.m_currentOp = (int)ListOperation.None; // 鎖定當(dāng)前對(duì)象 Monitor.Enter(list, ref lockTaken); } // 調(diào)用 ThreadLocalList.Add方法 將數(shù)據(jù)添加到雙向鏈表中 // 如果已經(jīng)鎖定 那么說(shuō)明線程安全 可以更新Count 計(jì)數(shù) list.Add(item, lockTaken); } finally { list.m_currentOp = (int)ListOperation.None; if (lockTaken) { Monitor.Exit(list); } } }
從上面代碼中,我們可以很清楚的知道Add()
方法是如何運(yùn)行的,其中的關(guān)鍵就是GetThreadList()
方法,通過(guò)該方法可以獲取當(dāng)前線程的數(shù)據(jù)存儲(chǔ)列表對(duì)象,假如不存在數(shù)據(jù)存儲(chǔ)列表,它會(huì)自動(dòng)創(chuàng)建或者通過(guò)GetUnownedList()
方法來(lái)尋找那些被停止但是還存儲(chǔ)有數(shù)據(jù)列表的線程,然后將數(shù)據(jù)列表返回給當(dāng)前線程中,防止了內(nèi)存泄漏。
在數(shù)據(jù)添加的過(guò)程中,實(shí)現(xiàn)了細(xì)顆粒度的lock
同步鎖,所以性能會(huì)很高。刪除和其它操作與新增類似,本文不再贅述。
看完上面的代碼后,我很好奇ConcurrentBag<T>
是如何實(shí)現(xiàn)IEnumerator
來(lái)實(shí)現(xiàn)迭代訪問(wèn)的,因?yàn)?code>ConcurrentBag<T>是通過(guò)分散在不同線程中的ThreadLocalList
來(lái)存儲(chǔ)數(shù)據(jù)的,那么在實(shí)現(xiàn)迭代器模式時(shí),過(guò)程會(huì)比較復(fù)雜。
后面再查看了源碼之后,發(fā)現(xiàn)ConcurrentBag<T>
為了實(shí)現(xiàn)迭代器模式,將分在不同線程中的數(shù)據(jù)全都存到一個(gè)List<T>
集合中,然后返回了該副本的迭代器。所以每次訪問(wèn)迭代器,它都會(huì)新建一個(gè)List<T>
的副本,這樣雖然浪費(fèi)了一定的存儲(chǔ)空間,但是邏輯上更加簡(jiǎn)單了。
/// <summary> /// 本地幫助器方法釋放所有本地列表鎖 /// </summary> private void ReleaseAllLocks() { // 該方法用于在執(zhí)行線程同步以后 釋放掉所有本地鎖 // 通過(guò)遍歷每個(gè)線程中存儲(chǔ)的 ThreadLocalList對(duì)象 釋放所占用的鎖 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_lockTaken) { currentList.m_lockTaken = false; Monitor.Exit(currentList); } currentList = currentList.m_nextList; } } /// <summary> /// 從凍結(jié)狀態(tài)解凍包的本地幫助器方法 /// </summary> /// <param name="lockTaken">The lock taken result from the Freeze method</param> private void UnfreezeBag(bool lockTaken) { // 首先釋放掉 每個(gè)線程中 本地變量的鎖 // 然后釋放全局鎖 ReleaseAllLocks(); m_needSync = false; if (lockTaken) { Monitor.Exit(GlobalListsLock); } } /// <summary> /// 本地幫助器函數(shù)等待所有未同步的操作 /// </summary> private void WaitAllOperations() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); ThreadLocalList currentList = m_headList; // 自旋等待 等待其它操作完成 while (currentList != null) { if (currentList.m_currentOp != (int)ListOperation.None) { SpinWait spinner = new SpinWait(); // 有其它線程進(jìn)行操作時(shí),會(huì)將cuurentOp 設(shè)置成 正在操作的枚舉 while (currentList.m_currentOp != (int)ListOperation.None) { spinner.SpinOnce(); } } currentList = currentList.m_nextList; } } /// <summary> /// 本地幫助器方法獲取所有本地列表鎖 /// </summary> private void AcquireAllLocks() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); bool lockTaken = false; ThreadLocalList currentList = m_headList; // 遍歷每個(gè)線程的ThreadLocalList 然后獲取對(duì)應(yīng)ThreadLocalList的鎖 while (currentList != null) { // 嘗試/最后 bllock 以避免在獲取鎖和設(shè)置所采取的標(biāo)志之間的線程港口 try { Monitor.Enter(currentList, ref lockTaken); } finally { if (lockTaken) { currentList.m_lockTaken = true; lockTaken = false; } } currentList = currentList.m_nextList; } } /// <summary> /// Local helper method to freeze all bag operations, it /// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added /// to the dictionary /// 2- Then Acquire all local lists locks to prevent steal and synchronized operations /// 3- Wait for all un-synchronized operations to be done /// </summary> /// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param> private void FreezeBag(ref bool lockTaken) { Contract.Assert(!Monitor.IsEntered(GlobalListsLock)); // 全局鎖定可安全地防止多線程調(diào)用計(jì)數(shù)和損壞 m_needSync Monitor.Enter(GlobalListsLock, ref lockTaken); // 這將強(qiáng)制同步任何將來(lái)的添加/執(zhí)行操作 m_needSync = true; // 獲取所有列表的鎖 AcquireAllLocks(); // 等待所有操作完成 WaitAllOperations(); } /// <summary> /// 本地幫助器函數(shù)返回列表中的包項(xiàng), 這主要由 CopyTo 和 ToArray 使用。 /// 這不是線程安全, 應(yīng)該被稱為凍結(jié)/解凍袋塊 /// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 /// </summary> /// <returns>List the contains the bag items</returns> private List<T> ToList() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 創(chuàng)建一個(gè)新的List List<T> list = new List<T>(); ThreadLocalList currentList = m_headList; // 遍歷每個(gè)線程中的ThreadLocalList 將里面的Node的數(shù)據(jù) 添加到list中 while (currentList != null) { Node currentNode = currentList.m_head; while (currentNode != null) { list.Add(currentNode.m_value); currentNode = currentNode.m_next; } currentList = currentList.m_nextList; } return list; } /// <summary> /// Returns an enumerator that iterates through the <see /// cref="ConcurrentBag{T}"/>. /// </summary> /// <returns>An enumerator for the contents of the <see /// cref="ConcurrentBag{T}"/>.</returns> /// <remarks> /// The enumeration represents a moment-in-time snapshot of the contents /// of the bag. It does not reflect any updates to the collection after /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use /// concurrently with reads from and writes to the bag. /// </remarks> public IEnumerator<T> GetEnumerator() { // Short path if the bag is empty if (m_headList == null) return new List<T>().GetEnumerator(); // empty list bool lockTaken = false; try { // 首先凍結(jié)整個(gè) ConcurrentBag集合 FreezeBag(ref lockTaken); // 然后ToList 再拿到 List的 IEnumerator return ToList().GetEnumerator(); } finally { UnfreezeBag(lockTaken); } }
由上面的代碼可知道,為了獲取迭代器對(duì)象,總共進(jìn)行了三步主要的操作。
1.使用FreezeBag()
方法,凍結(jié)整個(gè)ConcurrentBag<T>
集合。因?yàn)樾枰杉系?code>List<T>副本,生成副本期間不能有其它線程更改損壞數(shù)據(jù)。
2.將ConcurrrentBag<T>
生成List<T>
副本。因?yàn)?code>ConcurrentBag<T>存儲(chǔ)數(shù)據(jù)的方式比較特殊,直接實(shí)現(xiàn)迭代器模式困難,考慮到線程安全和邏輯,最佳的辦法是生成一個(gè)副本。
3.完成以上操作以后,就可以使用UnfreezeBag()
方法解凍整個(gè)集合。
那么FreezeBag()
方法是如何來(lái)凍結(jié)整個(gè)集合的呢?也是分為三步走。
1.首先獲取全局鎖,通過(guò) Monitor.Enter(GlobalListsLock, ref lockTaken);
這樣一條語(yǔ)句,這樣其它線程就不能凍結(jié)集合。
2.然后獲取所有線程中ThreadLocalList
的鎖,通過(guò)`AcquireAllLocks()方法來(lái)遍歷獲取。這樣其它線程就不能對(duì)它進(jìn)行操作損壞數(shù)據(jù)。
3.等待已經(jīng)進(jìn)入了操作流程線程結(jié)束,通過(guò) WaitAllOperations()
方法來(lái)實(shí)現(xiàn),該方法會(huì)遍歷每一個(gè)ThreadLocalList
對(duì)象的m_currentOp
屬性,確保全部處于None
操作。
完成以上流程后,那么就是真正的凍結(jié)了整個(gè)ConcurrentBag<T>
集合,要解凍的話也類似。在此不再贅述。
下面給出一張圖,描述了ConcurrentBag<T>
是如何存儲(chǔ)數(shù)據(jù)的。通過(guò)每個(gè)線程中的ThreadLocal
來(lái)實(shí)現(xiàn)線程本地存儲(chǔ),每個(gè)線程中都有這樣的結(jié)構(gòu),互不干擾。然后每個(gè)線程中的m_headList
總是指向ConcurrentBag<T>
的第一個(gè)列表,m_tailList
指向最后一個(gè)列表。列表與列表之間通過(guò)m_locals
下的 m_nextList
相連,構(gòu)成一個(gè)單鏈表。
數(shù)據(jù)存儲(chǔ)在每個(gè)線程的m_locals
中,通過(guò)Node
類構(gòu)成一個(gè)雙向鏈表。
PS: 要注意m_tailList
和m_headList
并不是存儲(chǔ)在ThreadLocal
中,而是所有的線程共享一份。
感謝各位的閱讀!關(guān)于“C#中實(shí)現(xiàn)ConcurrentBag的原理分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。