溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Parallel類如何c# 中使用

發(fā)布時(shí)間:2020-11-16 14:55:33 來源:億速云 閱讀:227 作者:Leah 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)Parallel類如何c# 中使用,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

  Parallel類是對(duì)線程的抽象,提供數(shù)據(jù)與任務(wù)的并行性。類定義了靜態(tài)方法For和ForEach,使用多個(gè)任務(wù)來完成多個(gè)作業(yè)。Parallel.For和Parallel.ForEach方法在每次迭代的時(shí)候調(diào)用相同的代碼,而Parallel.Invoke()方法允許同時(shí)調(diào)用不同的方法。Parallel.ForEach()方法用于數(shù)據(jù)的并行性,Parallel.Invoke()方法用于任務(wù)的并行性。

1、For()方法

  For()方法用于多次執(zhí)行一個(gè)任務(wù),可以并行運(yùn)行迭代,但迭代的順序并沒指定。For()方法前兩個(gè)參數(shù)為定義循環(huán)的開始和結(jié)束,第三個(gè)參數(shù)為Action<int>委托。方法的返回值是ParallelLoopResult結(jié)構(gòu),它提供了是否結(jié)束的信息。如以下循環(huán)方法,不能保證輸出順序: 

static void ParallelFor()
{
  ParallelLoopResult result =
    Parallel.For(0, 10, async i =>
      {
        Console.WriteLine("{0}, task: {1}, thread: {2}", i,
          Task.CurrentId, Thread.CurrentThread.ManagedThreadId);

        await Task.Delay(10);//異步方法,用于釋放線程供其他任務(wù)使用。完成后,可能看不到方法的輸出,因?yàn)橹?前臺(tái)線)程結(jié)束,所有的后臺(tái)線程也將結(jié)束
        Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
      });
  Console.WriteLine("Is completed: {0}", result.IsCompleted);
}

  異步功能雖然方便,但是知道后臺(tái)發(fā)生了什么仍然重要,必須留意。

提前停止For()方法

  可以根據(jù)條件提前停止For()方法,而不必完成全部的迭代。,傳入?yún)?shù)ParallelLoopState的對(duì)象,調(diào)用Break()方法或者Stop()方法。如調(diào)用Break()方法,當(dāng)?shù)荡笥?5的時(shí)候中斷(當(dāng)前線程結(jié)束,類似于普通for的Continue),但其他任務(wù)可以同時(shí)運(yùn)行,有其他值的任務(wù)也可以運(yùn)行(如果當(dāng)前線程是主線程,那么就等同于Stop(),結(jié)束所有線程)。Stop()方法結(jié)束的是所有操作(類似于普通for的Break)。利用LowestBreakIteration屬性可以忽略其他任務(wù)的結(jié)果:

static void ParallelFor()
{
  ParallelLoopResult result = Parallel.For(10, 40, (int i, ParallelLoopState pls) =>
     {
       Console.WriteLine("i: {0} task {1}", i, Task.CurrentId);
       Thread.Sleep(10);
       if (i > 15)
         pls.Break();
     });
  Console.WriteLine("Is completed: {0}", result.IsCompleted);
  if (!result.IsCompleted)
    Console.WriteLine("lowest break iteration: {0}", result.LowestBreakIteration);
}

  For()方法可以使用幾個(gè)線程執(zhí)行循環(huán)。如果要對(duì)每個(gè)線程進(jìn)行初始化,就需要使用到For<TLocal>(int, int, Func<TLocal>, Func<int, ParallelLoopState, TLocal, TLocal> , Action<TLocal>)方法。

  • 前兩個(gè)參數(shù)是對(duì)應(yīng)的循環(huán)起始和終止條件;
  • 第二個(gè)參數(shù)類型是Func<TLocal>,返回一個(gè)值,傳遞給第三個(gè)參數(shù)。
  • 第三個(gè)參數(shù)類型是Func<int, ParallelLoopState, TLocal, TLocal>,是循環(huán)體的委托,其內(nèi)部的第一個(gè)參數(shù)是循環(huán)迭代,內(nèi)部第二個(gè)參數(shù)允許停止迭代,內(nèi)部第三個(gè)參數(shù)用于接收For()方法的前一個(gè)參數(shù)的返回值。循環(huán)體應(yīng)當(dāng)返回與For()循環(huán)泛型類型一致的值。
  • 第四個(gè)參數(shù)是指定的一個(gè)委托,用于執(zhí)行相關(guān)后續(xù)操作。
static void ParallelFor()
{
  Parallel.For<string>(0, 20, () =>
   {
     // invoked once for each thread
     Console.WriteLine("init thread {0}, task {1}", Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
     return String.Format("t{0}", Thread.CurrentThread.ManagedThreadId);
   },
   (i, pls, str1) =>
   {
     // invoked for each member
     Console.WriteLine("body i {0} str1 {1} thread {2} task {3}", i, str1, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
     Thread.Sleep(10);
     return String.Format("i {0}", i);
   },
   (str1) =>
   {
     // final action on each thread
     Console.WriteLine("finally {0}", str1);
   });
}

2、使用ForEach()方法循環(huán)

  ForEach()方法遍歷實(shí)現(xiàn)了IEnumerable的集合,其方式類似于foreach語句,但是以異步方式遍歷,沒有確定的順序。如果要中斷循環(huán),同樣可以采用ParallelLoopState參數(shù)。ForEach<TSource>有許多泛型的重載方法。

static void ParallelForeach()
{
  string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve" };

  ParallelLoopResult result = Parallel.ForEach<string>(data, s =>
       {
         Console.WriteLine(s);
       });
  Parallel.ForEach<string>(data, (s, pls, l) =>
  {
    Console.WriteLine("{0} {1}", s, l);
  });
}

3、調(diào)用多個(gè)方法
  如果有多個(gè)任務(wù)并行,可以使用Parallel.Invoke()方法,它提供任務(wù)的并行性模式:

static void ParallelInvoke()
{
  Parallel.Invoke(Foo, Bar);
}

static void Foo()
{
  Console.WriteLine("foo");
}

static void Bar()
{
  Console.WriteLine("bar");
}

4、For()方法的取消

  在For()方法的重載方法中,可以傳遞一個(gè)ParallelOptions類型的參數(shù),利用此參數(shù)可以傳遞一個(gè)CancellationToken參數(shù)。使用CancellationTokenSource對(duì)象用于注冊(cè)CancellationToken,并允許調(diào)用Cancel方法用于取消操作。

  一旦取消操作,F(xiàn)or()方法就拋出一個(gè)OperationCanceledException類型的異常,使用CancellationToken可以注冊(cè)取消操作時(shí)的信息。調(diào)用Register方法,傳遞一個(gè)在取消操作時(shí)調(diào)用的委托。通過取消操作,可以將其他的迭代操作在啟動(dòng)之前取消,但已經(jīng)啟動(dòng)的迭代操作允許完成。取消操作是以協(xié)作方式進(jìn)行的,以避免在取消迭代操作的中間泄露資源。

static void CancelParallelLoop()
{
  var cts = new CancellationTokenSource();
  cts.Token.ThrowIfCancellationRequested();
  cts.Token.Register(() => Console.WriteLine("** token cancelled"));
  // 在500ms后取消標(biāo)記
  cts.CancelAfter(500);
  try
  {
    ParallelLoopResult result = Parallel.For(0, 100,
      new ParallelOptions()
      {
        CancellationToken = cts.Token
      },
        x =>
        {
          Console.WriteLine("loop {0} started", x);
          int sum = 0;
          for (int i = 0; i < 100; i++)
          {
            Thread.Sleep(2);
            sum += i;
          }
          Console.WriteLine("loop {0} finished", x);
        });
  }
  catch (OperationCanceledException ex)
  {
    Console.WriteLine(ex.Message);
  }
}

5、發(fā)現(xiàn)存在的問題

  使用并行循環(huán)時(shí),若出現(xiàn)以下兩個(gè)問題,需要使用Partitioner(命名空間 System.Collections.Concurrent中)解決。

  1. 使用并行循環(huán)時(shí),應(yīng)確保每次迭代的工作量要明顯大于同步共享狀態(tài)的開銷。 如果循環(huán)把時(shí)間都耗在了阻塞式的訪問共享的循環(huán)變量上,那么并行執(zhí)行的好處就很容易完全喪失。盡可能讓每次循環(huán)迭代都只是在局部進(jìn)行,避免阻塞式訪問造成的損耗。見示例1
  2. 并行循環(huán)的每一次迭代都會(huì)生成一個(gè)委托,如果每次生成委托或方法的開銷比迭代完成的工作量大,使用并行方案就不適合了(委托會(huì)設(shè)計(jì)兩類開銷:構(gòu)造開銷和調(diào)用開銷。大多數(shù)調(diào)用開銷和普通方法的調(diào)用差不多。 但委托是一種對(duì)象,構(gòu)造開銷可能相當(dāng)大,最好是只做一次構(gòu)造,然后把對(duì)象緩存起來)。見示例2

  示例1中,求1000000000以內(nèi)所有自然數(shù)開方的和。第一部分采用直接計(jì)算的方式,第二部分采用分區(qū)計(jì)算。第二部分的Partitioner 會(huì)把需要迭代的區(qū)間分拆為多個(gè)不同的空間,并存入Tuple對(duì)象中。

/*   示例1  */public static void PartitionerTest()
{
  //使用計(jì)時(shí)器
  System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch();
  const int maxValue = 1000000000;
  long sum = 0;
  stopwatch.Restart();//開始計(jì)時(shí)
  Parallel.For(0, maxValue, (i) => {
    Interlocked.Add(ref sum, (long )Math.Sqrt(i));//Interlocked是原子操作,多線程訪問時(shí)的線程互斥操作
  });
  stopwatch.Stop();
  Console.WriteLine($"Parallel.For:{stopwatch.Elapsed}");//我的機(jī)器運(yùn)行出的時(shí)間是:00:01:37.0391204


  var partitioner = System.Collections.Concurrent.Partitioner.Create(0, maxValue);//拆分區(qū)間
  sum = 0;
  stopwatch.Restart();
  Parallel.ForEach(partitioner, (rang) => {
    long partialSum = 0;
    //迭代區(qū)間的數(shù)據(jù)
    for(int i=rang.Item1;i<rang.Item2;i++)
    {
      partialSum += (long)Math.Sqrt(i);
    }
    Interlocked.Add(ref sum, partialSum);//原子操作
  });
  stopwatch.Stop();
  Console.WriteLine($"Parallel.ForEach:{stopwatch.Elapsed}"); //我的機(jī)器運(yùn)行出的時(shí)間是:00:00:02.7111666
}

  Partitioner的分區(qū)是靜態(tài)的,只要迭代分區(qū)劃分完成,每個(gè)分區(qū)上都會(huì)運(yùn)行一個(gè)委托。如果某一段區(qū)間的迭代次數(shù)提前完成,也不會(huì)嘗試重新分區(qū)并讓處理器分擔(dān)工作。 對(duì)于任意IEnumerable<T>類型都可以創(chuàng)建不指定區(qū)間的分區(qū),但這樣就會(huì)讓每個(gè)迭代項(xiàng)目都創(chuàng)建一個(gè)委托,而不是對(duì)每個(gè)區(qū)間創(chuàng)建委托。創(chuàng)建自定義的Partitioner可以解決這個(gè)問題,代碼比較復(fù)雜。請(qǐng)自行參閱:http://www.writinghighperf.net/go/20

  示例2中,采用一個(gè)委托方法來計(jì)算兩個(gè)數(shù)之間的關(guān)系值。前一種是每次運(yùn)行都重新構(gòu)造委托,后一種是先構(gòu)造出委托的方法而后每一次調(diào)用。

//聲明一個(gè)委托
 private delegate int MathOp(int x, int y);
 private int Add(int x,int y)
 {
   return x + y;
 }

 private int DoOperation(MathOp op,int x,int y)
 {
   return op(x, y);
 }

 /*
 * 委托會(huì)設(shè)計(jì)兩類開銷:構(gòu)造開銷和調(diào)用開銷。大多數(shù)調(diào)用開銷和普通方法的調(diào)用差不多。 但委托是一種對(duì)象,構(gòu)造開銷可能相當(dāng)大,最好是只做一次構(gòu)造,然后把對(duì)象緩存起來。
 */
 public void Test()
 {
   System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch();
   stopwatch.Restart();
   for(int i=0;i<10;i++)
   {
     //每一次遍歷循環(huán),都會(huì)產(chǎn)生一次構(gòu)造和調(diào)用開銷
     DoOperation(Add, 1, 2);
   }
   stopwatch.Stop();
   Console.WriteLine("Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0003812

   stopwatch.Restart();
   MathOp op = Add;//只產(chǎn)生一次構(gòu)造開銷
   for(int i=0;i<10;i++)
   {
     DoOperation(op, 1, 2);//每一次遍歷都只產(chǎn)生遍歷開銷
   }
   stopwatch.Stop();
   Console.WriteLine("Once Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0000011
 }

關(guān)于Parallel類如何c# 中使用就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI