溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使C#框架的進(jìn)程間同步通訊

發(fā)布時(shí)間:2021-09-18 17:39:27 來源:億速云 閱讀:102 作者:柒染 欄目:編程語言

如何使C#框架的進(jìn)程間同步通訊,針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

threadmsg_demo.zip ~ 41KB    下載

threadmsg_src.zip ~ 65KB    下載

如何使C#框架的進(jìn)程間同步通訊

0.背景簡介

微軟在 .NET 框架中提供了多種實(shí)用的線程同步手段,其中包括  monitor 類及  reader-writer鎖。但跨進(jìn)程的同步方法還是非常欠缺。另外,目前也沒有方便的線程間及進(jìn)程間傳遞消息的方法。例如C/S和SOA,又或者生產(chǎn)者/消費(fèi)者模式中就常常需要傳遞消息。為此我編寫了一個(gè)獨(dú)立完整的框架,實(shí)現(xiàn)了跨線程和跨進(jìn)程的同步和通訊。這框架內(nèi)包含了信號量,信箱,內(nèi)存映射文件,阻塞通道,及簡單消息流控制器等組件。文章里提到的類同屬于一個(gè)開源的庫項(xiàng)目(BSD許可),你可以從這里下載到 www.cdrnet.net/projects/threadmsg/.

這個(gè)框架的目的是:

1.封裝性:通過MSMQ消息隊(duì)列發(fā)送消息的線程無需關(guān)心消息是發(fā)送到另一個(gè)線程還是另一臺機(jī)器。

2.簡單性:向其他進(jìn)程發(fā)送消息只需調(diào)用一個(gè)方法。

注意:我刪除了本文中全部代碼的XML注釋以節(jié)省空間。如果你想知道這些方法和參數(shù)的詳細(xì)信息,請參考附件中的代碼。

1.先看一個(gè)簡單例子

使用了這個(gè)庫后,跨進(jìn)程的消息傳遞將變得非常簡單。我將用一個(gè)小例子來作示范:一個(gè)控制臺程序,根據(jù)參數(shù)可以作為發(fā)送方也可以作為接收方運(yùn)行。在發(fā)送程序里,你可以輸入一定的文本并發(fā)送到信箱內(nèi)(返回key),接收程序?qū)@示所有從信箱內(nèi)收到的消息。你可以運(yùn)行無數(shù)個(gè)發(fā)送程序和接收程序,但是每個(gè)消息只會被具體的某一個(gè)接收程序所收到。

[Serializable] struct Message {   public string Text; }  class Test {   IMailBox mail;    public Test()   {     mail = new ProcessMailBox("TMProcessTest",1024);   }    public void RunWriter()   {     Console.WriteLine("Writer started");     Message msg;     while(true)     {       msg.Text = Console.ReadLine();       if(msg.Text.Equals("exit"))         break;       mail.Content = msg;     }   }    public void RunReader()   {     Console.WriteLine("Reader started");     while(true)     {       Message msg = (Message)mail.Content;       Console.WriteLine(msg.Text);     }   }    [STAThread]   static void Main(string[] args)   {     Test test = new Test();     if(args.Length > 0)       test.RunWriter();     else       test.RunReader();   } }

信箱一旦創(chuàng)建之后(這上面代碼里是  ProcessMailBox  ),接收消息只需要讀取 Content  屬性,發(fā)送消息只需要給這個(gè)屬性賦值。當(dāng)沒有數(shù)據(jù)時(shí),獲取消息將會阻塞當(dāng)前線程;發(fā)送消息時(shí)如果信箱里已經(jīng)有數(shù)據(jù),則會阻塞當(dāng)前線程。正是有了這個(gè)阻塞,整個(gè)程序是完全基于中斷的,并且不會過度占用CPU(不需要進(jìn)行輪詢)。發(fā)送和接收的消息可以是任意支持序列化(Serializable)的類型。

然而,實(shí)際上暗地里發(fā)生的事情有點(diǎn)復(fù)雜:消息通過內(nèi)存映射文件來傳遞,這是目前唯一的跨進(jìn)程共享內(nèi)存的方法,這個(gè)例子里我們只會在  pagefile 里面產(chǎn)生虛擬文件。對這個(gè)虛擬文件的訪問是通過  win32  信號量來確保同步的。消息首先序列化成二進(jìn)制,然后再寫進(jìn)該文件,這就是為什么需要聲明Serializable屬性。內(nèi)存映射文件和  win32 信號量都需要調(diào)用  NT內(nèi)核的方法。多得了 .NET  框架中的 Marshal  類,我們可以避免編寫不安全的代碼。我們將在下面討論更多的細(xì)節(jié)。

2. .NET里面的跨線程/進(jìn)程同步

線程/進(jìn)程間的通訊需要共享內(nèi)存或者其他內(nèi)建機(jī)制來發(fā)送/接收數(shù)據(jù)。即使是采用共享內(nèi)存的方式,也還需要一組同步方法來允許并發(fā)訪問。

同一個(gè)進(jìn)程內(nèi)的所有線程都共享公共的邏輯地址空間(堆)。對于不同進(jìn)程,從 win2000  開始就已經(jīng)無法共享內(nèi)存。然而,不同的進(jìn)程可以讀寫同一個(gè)文件。WinAPI提供了多種系統(tǒng)調(diào)用方法來映射文件到進(jìn)程的邏輯空間,及訪問系統(tǒng)內(nèi)核對象(會話)指向的  pagefile  里面的虛擬文件。無論是共享堆,還是共享文件,并發(fā)訪問都有可能導(dǎo)致數(shù)據(jù)不一致。我們就這個(gè)問題簡單討論一下,該怎樣確保線程/進(jìn)程調(diào)用的有序性及數(shù)據(jù)的一致性。

2.1 線程同步

.NET 框架和 C#  提供了方便直觀的線程同步方法,即 monitor  類和 lock 語句(本文將不會討論  .NET 框架的互斥量)。對于線程同步,雖然本文提供了其他方法,我們還是推薦使用  lock 語句。

void Work1() {   NonCriticalSection1();   Monitor.Enter(this);   try   {     CriticalSection();   }   finally   {     Monitor.Exit(this);   }   NonCriticalSection2(); }

Work1 和 Work2 是等價(jià)的。在C#里面,很多人喜歡第二個(gè)方法,因?yàn)樗?,且不容易出錯(cuò)。

2.2 跨線程信號量

信號量是經(jīng)典的同步基本概念之一(由 Edsger  Dijkstra  引入)。信號量是指一個(gè)有計(jì)數(shù)器及兩個(gè)操作的對象。它的兩個(gè)操作是:獲取(也叫P或者等待),釋放(也叫V或者收到信號)。信號量在獲取操作時(shí)如果計(jì)數(shù)器為0則阻塞,否則將計(jì)數(shù)器減一;在釋放時(shí)將計(jì)數(shù)器加一,且不會阻塞。雖然信號量的原理很簡單,但是實(shí)現(xiàn)起來有點(diǎn)麻煩。好在,內(nèi)建的  monitor 類有阻塞特性,可以用來實(shí)現(xiàn)信號量。

public sealed class ThreadSemaphore : ISemaphore {   private int counter;   private readonly int max;    public ThreadSemaphore() : this(0, int.Max) {}   public ThreadSemaphore(int initial) : this(initial, int.Max) {}   public ThreadSemaphore(int initial, int max)   {     this.counter = Math.Min(initial,max);     this.max = max;   }    public void Acquire()   {     lock(this)     {       counter--;       if(counter < 0 && !Monitor.Wait(this))         throw new SemaphoreFailedException();     }   }    public void Acquire(TimeSpan timeout)   {     lock(this)     {       counter--;       if(counter < 0 && !Monitor.Wait(this,timeout))         throw new SemaphoreFailedException();     }   }    public void Release()   {     lock(this)     {       if(counter >= max)         throw new SemaphoreFailedException();       if(counter < 0)         Monitor.Pulse(this);       counter++;     }   } }

信號量在復(fù)雜的阻塞情景下更加有用,例如我們后面將要討論的通道(channel)。你也可以使用信號量來實(shí)現(xiàn)臨界區(qū)的排他性(如下面的  Work3),但是我還是推薦使用內(nèi)建的  lock 語句,像上面的 Work2  那樣。

請注意:如果使用不當(dāng),信號量也是有潛在危險(xiǎn)的。正確的做法是:當(dāng)獲取信號量失敗時(shí),千萬不要再調(diào)用釋放操作;當(dāng)獲取成功時(shí),無論發(fā)生了什么錯(cuò)誤,都要記得釋放信號量。遵循這樣的原則,你的同步才是正確的。Work3  中的 finally  語句就是為了保證正確釋放信號量。注意:獲取信號量( s.Acquire() )的操作必須放到  try 語句的外面,只有這樣,當(dāng)獲取失敗時(shí)才不會調(diào)用釋放操作。

ThreadSemaphore s = new ThreadSemaphore(1); void Work3() {   NonCriticalSection1();   s.Acquire();   try   {     CriticalSection();   }   finally   {     s.Release();   }   NonCriticalSection2(); }

2.3 跨進(jìn)程信號量

為了協(xié)調(diào)不同進(jìn)程訪問同一資源,我們需要用到上面討論過的概念。很不幸,.NET  中的 monitor  類不可以跨進(jìn)程使用。但是,win32  API提供的內(nèi)核信號量對象可以用來實(shí)現(xiàn)跨進(jìn)程同步。  Robin Galloway-Lunn  介紹了怎樣將  win32  的信號量映射到  .NET  中(見 Using Win32 Semaphores in  C#  )。我們的實(shí)現(xiàn)也類似:

[DllImport("kernel32",EntryPoint="CreateSemaphore",      SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint CreateSemaphore(   SecurityAttributes auth, int initialCount,     int maximumCount, string name);  [DllImport("kernel32",EntryPoint="WaitForSingleObject",  SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint WaitForSingleObject(  uint hHandle, uint dwMilliseconds);  [DllImport("kernel32",EntryPoint="ReleaseSemaphore",  SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool ReleaseSemaphore(   uint hHandle, int lReleaseCount, out int lpPreviousCount);      [DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true,   CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool CloseHandle(uint hHandle);
public class ProcessSemaphore : ISemaphore, IDisposable {   private uint handle;   private readonly uint interruptReactionTime;    public ProcessSemaphore(string name) : this(    name,0,int.MaxValue,500) {}   public ProcessSemaphore(string name, int initial) : this(    name,initial,int.MaxValue,500) {}   public ProcessSemaphore(string name, int initial,    int max, int interruptReactionTime)   {            this.interruptReactionTime = (uint)interruptReactionTime;     this.handle = NTKernel.CreateSemaphore(null, initial, max, name);     if(handle == 0)       throw new SemaphoreFailedException();   }    public void Acquire()   {     while(true)     { //looped 0.5s timeout to make NT-blocked threads interruptable.       uint res = NTKernel.WaitForSingleObject(handle,         interruptReactionTime);       try {System.Threading.Thread.Sleep(0);}        catch(System.Threading.ThreadInterruptedException e)       {         if(res == 0)         { //Rollback            int previousCount;           NTKernel.ReleaseSemaphore(handle,1,out previousCount);         }         throw e;       }       if(res == 0)         return;       if(res != 258)         throw new SemaphoreFailedException();     }   }    public void Acquire(TimeSpan timeout)   {     uint milliseconds = (uint)timeout.TotalMilliseconds;     if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0)       throw new SemaphoreFailedException();     }    public void Release()   {     int previousCount;     if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))       throw new SemaphoreFailedException();     }    #region IDisposable Member   public void Dispose()   {     if(handle != 0)     {       if(NTKernel.CloseHandle(handle))         handle = 0;     }   }   #endregion }

有一點(diǎn)很重要:win32中的信號量是可以命名的。這允許其他進(jìn)程通過名字來創(chuàng)建相應(yīng)信號量的句柄。為了讓阻塞線程可以中斷,我們使用了一個(gè)(不好)的替代方法:使用超時(shí)和  Sleep(0)。我們需要中斷來安全關(guān)閉線程。更好的做法是:確定沒有線程阻塞之后才釋放信號量,這樣程序才可以完全釋放資源并正確退出。

你可能也注意到了:跨線程和跨進(jìn)程的信號量都使用了相同的接口。所有相關(guān)的類都使用了這種模式,以實(shí)現(xiàn)上面背景介紹中提到的封閉性。需要注意:出于性能考慮,你不應(yīng)該將跨進(jìn)程的信號量用到跨線程的場景,也不應(yīng)該將跨線程的實(shí)現(xiàn)用到單線程的場景。

3. 跨進(jìn)程共享內(nèi)存:內(nèi)存映射文件

我們已經(jīng)實(shí)現(xiàn)了跨線程和跨進(jìn)程的共享資源訪問同步。但是傳遞/接收消息還需要共享資源。對于線程來說,只需要聲明一個(gè)類成員變量就可以了。但是對于跨進(jìn)程來說,我們需要使用到  win32 API  提供的內(nèi)存映射文件(Memory Mapped  Files,簡稱MMF)。使用  MMF和使用 win32  信號量差不多。我們需要先調(diào)用 CreateFileMapping  方法來創(chuàng)建一個(gè)內(nèi)存映射文件的句柄:

[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping",      SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr CreateFileMapping(uint hFile,   SecurityAttributes lpAttributes, uint flProtect,   uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);      [DllImport("Kernel32.dll",EntryPoint="MapViewOfFile",  SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject,    uint dwDesiredAccess, uint dwFileOffsetHigh,   uint dwFileOffsetLow, uint dwNumberOfBytesToMap);      [DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile",  SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
public static MemoryMappedFile CreateFile(string name,       FileAccess access, int size) {   if(size < 0)     throw new ArgumentException("Size must not be negative","size");    IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null,    (uint)access,0,(uint)size,name);   if(fileMapping == IntPtr.Zero)     throw new MemoryMappingFailedException();    return new MemoryMappedFile(fileMapping,size,access); }

我們希望直接使用 pagefile 中的虛擬文件,所以我們用  -1(0xFFFFFFFF)  來作為文件句柄來創(chuàng)建我們的內(nèi)存映射文件句柄。我們也指定了必填的文件大小,以及相應(yīng)的名稱。這樣其他進(jìn)程就可以通過這個(gè)名稱來同時(shí)訪問該映射文件。創(chuàng)建了內(nèi)存映射文件后,我們就可以映射這個(gè)文件不同的部分(通過偏移量和字節(jié)大小來指定)到我們的進(jìn)程地址空間。我們通過  MapViewOfFile 系統(tǒng)方法來指定:

public MemoryMappedFileView CreateView(int offset, int size,       MemoryMappedFileView.ViewAccess access) {   if(this.access == FileAccess.ReadOnly && access ==      MemoryMappedFileView.ViewAccess.ReadWrite)     throw new ArgumentException(      "Only read access to views allowed on files without write access",      "access");   if(offset < 0)     throw new ArgumentException("Offset must not be negative","size");   if(size < 0)     throw new ArgumentException("Size must not be negative","size");   IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,    (uint)access,0,(uint)offset,(uint)size);   return new MemoryMappedFileView(mappedView,size,access); }

在不安全的代碼中,我們可以將返回的指針強(qiáng)制轉(zhuǎn)換成我們指定的類型。盡管如此,我們不希望有不安全的代碼存在,所以我們使用  Marshal  類來從中讀寫我們的數(shù)據(jù)。偏移量參數(shù)是用來從哪里開始讀寫數(shù)據(jù),相對于指定的映射視圖的地址。

public byte ReadByte(int offset) {   return Marshal.ReadByte(mappedView,offset); } public void WriteByte(byte data, int offset) {   Marshal.WriteByte(mappedView,offset,data); }  public int ReadInt32(int offset) {   return Marshal.ReadInt32(mappedView,offset); } public void WriteInt32(int data, int offset) {   Marshal.WriteInt32(mappedView,offset,data); }  public void ReadBytes(byte[] data, int offset) {   for(int i=0;i<data.Length;i++)     data[i] = Marshal.ReadByte(mappedView,offset+i); } public void WriteBytes(byte[] data, int offset) {   for(int i=0;i<data.Length;i++)     Marshal.WriteByte(mappedView,offset+i,data[i]); }

但是,我們希望讀寫整個(gè)對象樹到文件中,所以我們需要支持自動進(jìn)行序列化和反序列化的方法。

public object ReadDeserialize(int offset, int length) {   byte[] binaryData = new byte[length];   ReadBytes(binaryData,offset);   System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter     = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();   System.IO.MemoryStream ms = new System.IO.MemoryStream(    binaryData,0,length,true,true);   object data = formatter.Deserialize(ms);   ms.Close();   return data; } public void WriteSerialize(object data, int offset, int length) { System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter     = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();   byte[] binaryData = new byte[length];   System.IO.MemoryStream ms = new System.IO.MemoryStream(    binaryData,0,length,true,true);   formatter.Serialize(ms,data);   ms.Flush();   ms.Close();   WriteBytes(binaryData,offset); }

請注意:對象序列化之后的大小不應(yīng)該超過映射視圖的大小。序列化之后的大小總是比對象本身占用的內(nèi)存要大的。我沒有試過直接將對象內(nèi)存流綁定到映射視圖,那樣做應(yīng)該也可以,甚至可能帶來少量的性能提升。

4. 信箱:在線程/進(jìn)程間傳遞消息

這里的信箱與 Email 及  NT  中的郵件槽(Mailslots)無關(guān)。它是一個(gè)只能保留一個(gè)對象的安全共享內(nèi)存結(jié)構(gòu)。信箱的內(nèi)容通過一個(gè)屬性來讀寫。如果信箱內(nèi)容為空,試圖讀取該信箱的線程將會阻塞,直到另一個(gè)線程往其中寫內(nèi)容。如果信箱已經(jīng)有了內(nèi)容,當(dāng)一個(gè)線程試圖往其中寫內(nèi)容時(shí)將被阻塞,直到另一個(gè)線程將信箱內(nèi)容讀取出去。信箱的內(nèi)容只能被讀取一次,它的引用在讀取后自動被刪除?;谏厦娴拇a,我們已經(jīng)可以實(shí)現(xiàn)信箱了。

4.1 跨線程的信箱

我們可以使用兩個(gè)信號量來實(shí)現(xiàn)一個(gè)信箱:一個(gè)信號量在信箱內(nèi)容為空時(shí)觸發(fā),另一個(gè)在信箱有內(nèi)容時(shí)觸發(fā)。在讀取內(nèi)容之前,線程先等待信箱已經(jīng)填充了內(nèi)容,讀取之后觸發(fā)空信號量。在寫入內(nèi)容之前,線程先等待信箱內(nèi)容清空,寫入之后觸發(fā)滿信號量。注意:空信號量在一開始時(shí)就被觸發(fā)了。

public sealed class ThreadMailBox : IMailBox {   private object content;   private ThreadSemaphore empty, full;    public ThreadMailBox()   {     empty = new ThreadSemaphore(1,1);     full = new ThreadSemaphore(0,1);   }    public object Content   {     get     {       full.Acquire();       object item = content;       empty.Release();       return item;     }     set      {       empty.Acquire();       content = value;       full.Release();     }   } }

4.2  跨進(jìn)程信箱

跨進(jìn)程信箱與跨線程信箱的實(shí)現(xiàn)基本上一樣簡單。不同的是我們使用兩個(gè)跨進(jìn)程的信號量,并且我們使用內(nèi)存映射文件來代替類成員變量。由于序列化可能會失敗,我們使用了一小段異常處理來回滾信箱的狀態(tài)。失敗的原因有很多(無效句柄,拒絕訪問,文件大小問題,Serializable屬性缺失等等)。

public sealed class ProcessMailBox : IMailBox, IDisposable {   private MemoryMappedFile file;   private MemoryMappedFileView view;   private ProcessSemaphore empty, full;    public ProcessMailBox(string name,int size)   {     empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1);     full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1);     file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox",       MemoryMappedFile.FileAccess.ReadWrite,size);     view = file.CreateView(0,size,      MemoryMappedFileView.ViewAccess.ReadWrite);   }    public object Content   {     get     {       full.Acquire();       object item;       try {item = view.ReadDeserialize();}       catch(Exception e)       {  //Rollback         full.Release();         throw e;       }       empty.Release();       return item;     }      set      {       empty.Acquire();       try {view.WriteSerialize(value);}       catch(Exception e)       {  //Rollback         empty.Release();         throw e;       }       full.Release();     }   }    #region IDisposable Member   public void Dispose()   {     view.Dispose();     file.Dispose();     empty.Dispose();     full.Dispose();   }   #endregion }

到這里我們已經(jīng)實(shí)現(xiàn)了跨進(jìn)程消息傳遞(IPC)所需要的組件。你可能需要再回頭本文開頭的那個(gè)例子,看看  ProcessMailBox  應(yīng)該如何使用

5.通道:基于隊(duì)列的消息傳遞

信箱最大的限制是它們每次只能保存一個(gè)對象。如果一系列線程(使用同一個(gè)信箱)中的一個(gè)線程需要比較長的時(shí)間來處理特定的命令,那么整個(gè)系列都會阻塞。通常我們會使用緩沖的消息通道來處理,這樣你可以在方便的時(shí)候從中讀取消息,而不會阻塞消息發(fā)送者。這種緩沖通過通道來實(shí)現(xiàn),這里的通道比信箱要復(fù)雜一些。同樣,我們將分別從線程和進(jìn)程級別來討論通道的實(shí)現(xiàn)。

5.1 可靠性

信箱和通道的另一個(gè)重要的不同是:通道擁有可靠性。例如:自動將發(fā)送失?。赡苡捎诰€程等待鎖的過程中被中斷)的消息轉(zhuǎn)存到一個(gè)內(nèi)置的容器中。這意味著處理通道的線程可以安全地停止,同時(shí)不會丟失隊(duì)列中的消息。這通過兩個(gè)抽象類來實(shí)現(xiàn),  ThreadReliability 和  ProcessReliability。每個(gè)通道的實(shí)現(xiàn)類都繼承其中的一個(gè)類。

5.2 跨線程的通道

跨線程的通道基于信箱來實(shí)現(xiàn),但是使用一個(gè)同步的隊(duì)列來作為消息緩沖而不是一個(gè)變量。得益于信號量,通道在空隊(duì)列時(shí)阻塞接收線程,在隊(duì)列滿時(shí)阻塞發(fā)送線程。這樣你就不會碰到由入隊(duì)/出隊(duì)引發(fā)的錯(cuò)誤。為了實(shí)現(xiàn)這個(gè)效果,我們用隊(duì)列大小來初始化空信號量,用0來初始化滿信號量。如果某個(gè)發(fā)送線程在等待入隊(duì)的時(shí)候被中斷,我們將消息復(fù)制到內(nèi)置容器中,并將異常往外面拋。在接收操作中,我們不需要做異常處理,因?yàn)榧词咕€程被中斷你也不會丟失任何消息。注意:線程只有在阻塞狀態(tài)才能被中斷,就像調(diào)用信號量的獲取操作(Aquire)方法時(shí)。

public sealed class ThreadChannel : ThreadReliability, IChannel {   private Queue queue;   private ThreadSemaphore empty, full;    public ThreadChannel(int size)   {     queue = Queue.Synchronized(new Queue(size));     empty = new ThreadSemaphore(size,size);     full = new ThreadSemaphore(0,size);   }    public void Send(object item)   {     try {empty.Acquire();}     catch(System.Threading.ThreadInterruptedException e)     {       DumpItem(item);       throw e;     }     queue.Enqueue(item);     full.Release();   }    public void Send(object item, TimeSpan timeout)   {     try {empty.Acquire(timeout);}     ...   }    public object Receive()   {     full.Acquire();     object item = queue.Dequeue();     empty.Release();     return item;   }    public object Receive(TimeSpan timeout)   {     full.Acquire(timeout);     ...   }      protected override void DumpStructure()   {     lock(queue.SyncRoot)     {       foreach(object item in queue)         DumpItem(item);       queue.Clear();     }   } }

5.3 跨進(jìn)程通道

實(shí)現(xiàn)跨進(jìn)程通道有點(diǎn)麻煩,因?yàn)槟阈枰紫忍峁┮粋€(gè)跨進(jìn)程的緩沖區(qū)。一個(gè)可能的解決方法是使用跨進(jìn)程信箱并根據(jù)需要將接收/發(fā)送方法加入隊(duì)列。為了避免這種方案的幾個(gè)缺點(diǎn),我們將直接使用內(nèi)存映射文件來實(shí)現(xiàn)一個(gè)隊(duì)列。MemoryMappedArray  類將內(nèi)存映射文件分成幾部分,可以直接使用數(shù)組索引來訪問。  MemoryMappedQueue  類,為這個(gè)數(shù)組提供了一個(gè)經(jīng)典的環(huán)(更多細(xì)節(jié)請查看附件中的代碼)。為了支持直接以  byte/integer  類型訪問數(shù)據(jù)并同時(shí)支持二進(jìn)制序列化,調(diào)用方需要先調(diào)用入隊(duì)(Enqueue)/出隊(duì)(Dequeue)操作,然后根據(jù)需要使用讀寫方法(隊(duì)列會自動將數(shù)據(jù)放到正確的位置)。這兩個(gè)類都不是線程和進(jìn)程安全的,所以我們需要使用跨進(jìn)程的信號量來模擬互斥量(也可以使用  win32  互斥量),以此實(shí)現(xiàn)相互間的互斥訪問。除了這兩個(gè)類,跨進(jìn)程的通道基本上和跨線程信箱一樣。同樣,我們也需要在  Send() 中處理線程中斷及序列化可能失敗的問題。

public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable {   private MemoryMappedFile file;   private MemoryMappedFileView view;   private MemoryMappedQueue queue;   private ProcessSemaphore empty, full, mutex;    public ProcessChannel( int size, string name, int maxBytesPerEntry)   {     int fileSize = 64+size*maxBytesPerEntry;      empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size);     full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size);     mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1);     file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel",       MemoryMappedFile.FileAccess.ReadWrite,fileSize);     view = file.CreateView(0,fileSize,      MemoryMappedFileView.ViewAccess.ReadWrite);     queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0);     if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)       throw new MemoryMappedArrayFailedException();   }    public void Send(object item)   {     try {empty.Acquire();}     catch(System.Threading.ThreadInterruptedException e)     {       DumpItemSynchronized(item);       throw e;     }     try {mutex.Acquire();}     catch(System.Threading.ThreadInterruptedException e)     {       DumpItemSynchronized(item);       empty.Release();       throw e;     }     queue.Enqueue();     try {queue.WriteSerialize(item,0);}     catch(Exception e)     {       queue.RollbackEnqueue();       mutex.Release();       empty.Release();       throw e;     }     mutex.Release();     full.Release();   }    public void Send(object item, TimeSpan timeout)   {     try {empty.Acquire(timeout);}     ...   }    public object Receive()   {     full.Acquire();     mutex.Acquire();     object item;     queue.Dequeue();     try {item = queue.ReadDeserialize(0);}     catch(Exception e)     {       queue.RollbackDequeue();       mutex.Release();       full.Release();       throw e;     }     mutex.Release();     empty.Release();     return item;   }    public object Receive(TimeSpan timeout)   {     full.Acquire(timeout);     ...   }      protected override void DumpStructure()   {     mutex.Acquire();     byte[][] dmp = queue.DumpClearAll();     for(int i=0;i<dmp.Length;i++)       DumpItemSynchronized(dmp[i]);     mutex.Release();   }    #region IDisposable Member   public void Dispose()   {     view.Dispose();     file.Dispose();     empty.Dispose();     full.Dispose();     mutex.Dispose();   }   #endregion }

6. 消息路由

我們目前已經(jīng)實(shí)現(xiàn)了線程和進(jìn)程同步及消息傳遞機(jī)制(使用信箱和通道)。當(dāng)你使用阻塞隊(duì)列的時(shí)候,有可能會遇到這樣的問題:你需要在一個(gè)線程中同時(shí)監(jiān)聽多個(gè)隊(duì)列。為了解決這樣的問題,我們提供了一些小型的類:通道轉(zhuǎn)發(fā)器,多用復(fù)用器,多路復(fù)用解碼器和通道事件網(wǎng)關(guān)。你也可以通過簡單的  IRunnable  模式來實(shí)現(xiàn)類似的通道處理器。IRunnable模式由兩個(gè)抽象類SingleRunnable和  MultiRunnable 來提供(具體細(xì)節(jié)請參考附件中的代碼)。

6.1 通道轉(zhuǎn)發(fā)器

通道轉(zhuǎn)發(fā)器僅僅監(jiān)聽一個(gè)通道,然后將收到的消息轉(zhuǎn)發(fā)到另一個(gè)通道。如果有必要,轉(zhuǎn)發(fā)器可以將每個(gè)收到的消息放到一個(gè)信封中,并加上一個(gè)數(shù)字標(biāo)記,然后再轉(zhuǎn)發(fā)出去(下面的多路利用器使用了這個(gè)特性)。

public class ChannelForwarder : SingleRunnable {   private IChannel source, target;   private readonly int envelope;    public ChannelForwarder(IChannel source,     IChannel target, bool autoStart, bool waitOnStop)     : base(true,autoStart,waitOnStop)   {     this.source = source;     this.target = target;     this.envelope = -1;   }   public ChannelForwarder(IChannel source, IChannel target,     int envelope, bool autoStart, bool waitOnStop)     : base(true,autoStart,waitOnStop)   {     this.source = source;     this.target = target;     this.envelope = envelope;   }     protected override void Run()   {  //NOTE: IChannel.Send is interrupt save and       //automatically dumps the argument.      if(envelope == -1)       while(running)         target.Send(source.Receive());     else     {       MessageEnvelope env;       env.ID = envelope;       while(running)       {         env.Message = source.Receive();         target.Send(env);       }     }   } }

6.2 通道多路復(fù)用器和通道復(fù)用解碼器

通道多路復(fù)用器監(jiān)聽多個(gè)來源的通道并將接收到的消息(消息使用信封來標(biāo)記來源消息)轉(zhuǎn)發(fā)到一個(gè)公共的輸出通道。這樣就可以一次性地監(jiān)聽多個(gè)通道。復(fù)用解碼器則是監(jiān)聽一個(gè)公共的輸出通道,然后根據(jù)信封將消息轉(zhuǎn)發(fā)到某個(gè)指定的輸出通道。

public class ChannelMultiplexer : MultiRunnable {   private ChannelForwarder[] forwarders;    public ChannelMultiplexer(IChannel[] channels, int[] ids,      IChannel output, bool autoStart, bool waitOnStop)   {     int count = channels.Length;     if(count != ids.Length)       throw new ArgumentException("Channel and ID count mismatch.","ids");      forwarders = new ChannelForwarder[count];     for(int i=0;i<count;i++)       forwarders[i] = new ChannelForwarder(channels[i],        output,ids[i],autoStart,waitOnStop);      SetRunnables((SingleRunnable[])forwarders);   } }  public class ChannelDemultiplexer : SingleRunnable {   private HybridDictionary dictionary;   private IChannel input;    public ChannelDemultiplexer(IChannel[] channels, int[] ids,      IChannel input, bool autoStart, bool waitOnStop)     : base(true,autoStart,waitOnStop)   {     this.input = input;      int count = channels.Length;     if(count != ids.Length)       throw new ArgumentException("Channel and ID count mismatch.","ids");      dictionary = new HybridDictionary(count,true);     for(int i=0;i<count;i++)       dictionary.add(ids[i],channels[i]);   }    protected override void Run()   {  //NOTE: IChannel.Send is interrupt save and       //automatically dumps the argument.     while(running)     {       MessageEnvelope env = (MessageEnvelope)input.Receive();       IChannel channel = (IChannel)dictionary[env.ID];       channel.send(env.Message);     }   } }

6.3 通道事件網(wǎng)關(guān)

通道事件網(wǎng)關(guān)監(jiān)聽指定的通道,在接收到消息時(shí)觸發(fā)一個(gè)事件。這個(gè)類對于基于事件的程序(例如GUI程序)很有用,或者在使用系統(tǒng)線程池(ThreadPool)來初始化輕量的線程。需要注意的是:使用  WinForms  的程序中你不能在事件處理方法中直接訪問UI控件,只能調(diào)用Invoke  方法。因?yàn)槭录幚矸椒ㄊ怯墒录W(wǎng)關(guān)線程調(diào)用的,而不是UI線程。

public class ChannelEventGateway : SingleRunnable {   private IChannel source;   public event MessageReceivedEventHandler MessageReceived;    public ChannelEventGateway(IChannel source, bool autoStart,    bool waitOnStop) : base(true,autoStart,waitOnStop)   {     this.source = source;   }      protected override void Run()   {     while(running)     {       object c = source.Receive();       MessageReceivedEventHandler handler = MessageReceived;       if(handler != null)         handler(this,new MessageReceivedEventArgs(c));     }   } }

7. 比薩外賣店的例子 

萬事俱備,只欠東風(fēng)。我們已經(jīng)討論了這個(gè)同步及消息傳遞框架中的大部分重要的結(jié)構(gòu)和技術(shù)(本文沒有討論框架中的其他類如Rendezvous及Barrier)。就像開頭一樣,我們用一個(gè)例子來結(jié)束這篇文章。這次我們用一個(gè)小型比薩外賣店來做演示。下圖展示了這個(gè)例子:四個(gè)并行進(jìn)程相互之間進(jìn)行通訊。圖中展示了消息(數(shù)據(jù))是如何使用跨進(jìn)程通道在四個(gè)進(jìn)程中流動的,且在每個(gè)進(jìn)程中使用了性能更佳的跨線程通道和信箱。

如何使C#框架的進(jìn)程間同步通訊

一開始,一個(gè)顧客點(diǎn)了一個(gè)比薩和一些飲料。他調(diào)用了顧客(customer)接口的方法,向顧客訂單(CustomerOrders)通道發(fā)送了一個(gè)下單(Order)消息。接單員,在顧客下單后,發(fā)送了兩條配餐指令(分別對應(yīng)比薩和飲料)到廚師指令(CookInstruction)通道。同時(shí)他通過收銀(CashierOrder)通道將訂單轉(zhuǎn)發(fā)給收銀臺。收銀臺從價(jià)格中心獲取總價(jià)并將票據(jù)發(fā)給顧客,希望能提高收銀的速度  。與此同時(shí),廚師將根據(jù)配餐指令將餐配好之后交給打包員工。打包員工處理好之后,等待顧客付款,然后將外賣遞給顧客。

為了運(yùn)行這個(gè)例子,打開4個(gè)終端(cmd.exe),用  "PizzaDemo.exe cook" 啟動多個(gè)廚師進(jìn)程(多少個(gè)都可以),用  "PizzaDemo.exe backend" 啟動后端進(jìn)程,用  "PizzaDemo.exe facade"  啟動顧客接口門面(用你的程序名稱來代替 PizzaDemo  )。注意:為了模擬真實(shí)情景,某些線程(例如廚師線程)會隨機(jī)休眠幾秒。按下回車鍵就會停止和退出進(jìn)程。如果你在進(jìn)程正在處理數(shù)據(jù)的時(shí)候退出,你將可以在內(nèi)存轉(zhuǎn)存報(bào)告的結(jié)尾看到幾個(gè)未處理的消息。在真實(shí)世界的程序里面,消息一般都會被轉(zhuǎn)存到磁盤中,以便下次可以使用。

這個(gè)例子使用了上文中討論過的幾個(gè)機(jī)制。比如說,收銀臺使用一個(gè)通道復(fù)用器(ChannelMultiplexer)來監(jiān)聽顧客的訂單和支付通道,用了兩個(gè)信箱來實(shí)現(xiàn)價(jià)格服務(wù)。分發(fā)時(shí)使用了一個(gè)通道事件網(wǎng)關(guān)(ChannelEventGateway),顧客在食物打包完成之后馬上會收到通知。你也可以將這些程序注冊成  Windows NT  服務(wù)運(yùn)行,也可以遠(yuǎn)程登錄后運(yùn)行。

關(guān)于如何使C#框架的進(jìn)程間同步通訊問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI