溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

C# 中使用迭代器等待任務(wù)的操作是怎樣的

發(fā)布時間:2021-09-18 17:31:57 來源:億速云 閱讀:119 作者:柒染 欄目:編程語言

這篇文章將為大家詳細(xì)講解有關(guān)C# 中使用迭代器等待任務(wù)的操作方法,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

可能你已經(jīng)閱讀 C#5  關(guān)于 async 和 await 關(guān)鍵字以及它們?nèi)绾螏椭喕惒骄幊痰?可惜的是在升級VS2010后短短兩年時間,任然沒有準(zhǔn)備好升級到VS2012,在VS2010和C#4中不能使用異步關(guān)鍵字,你可能會想 “如果我能在VS  2010中寫看起來同步的方法,但異步執(zhí)行.我的代碼會更清晰.”

我們必須承認(rèn),async 和 await 是非常好的語法糖,我們的方法需要編寫更多的"AsyncResultcallback"方法適應(yīng)這種變化.而當(dāng)你終于升級到VS2012(或以后),這將是一件微不足道的小事,用C#關(guān)鍵字替換這個方法,只要簡單的語法變化,而不是一個艱苦的結(jié)構(gòu)重寫。

概要

async/await 是基于異步任務(wù)模式的關(guān)鍵字。鑒于 此處已經(jīng)有了非常完備的文檔描述,這里我就不再加以說明。但必須指出的是,TAP簡直帥到極點了!通過它你可以創(chuàng)建大量的將在未來某時間完成的小型單元工作(任務(wù));任務(wù)可以啟動其他的(嵌套)任務(wù)  并且/或者  建立一些僅當(dāng)前置任務(wù)完成后才會啟動的后續(xù)任務(wù)。前置與后續(xù)任務(wù)則可以鏈接為一對多或是多對一的關(guān)系。當(dāng)內(nèi)嵌任務(wù)完成時,父級任務(wù)無需與線程(重量級資源!)相綁定。執(zhí)行任務(wù)時也不必再擔(dān)心線程的時序安排,只需作出一些小小提示,框架將會自動為你處理這些事情。當(dāng)程序開始運行,所有的任務(wù)將如溪流匯入大海般各自走向終點,又像柏青哥的小鐵球一樣相互反彈相互作用。

然而在C#4里面我們卻沒有async和await,不過缺少的也只是這一點點.Net5的新特性而已,這些新特性我們要么可以稍作回避,要么可以自己構(gòu)建,關(guān)鍵的Task類型還是可用的。

在一個C#5的異步(async)方法里,你要等待一個Task。這不會導(dǎo)致線程等待;而是這個方法返回一個Task給它的調(diào)用者,這個Task能夠等待(如果它自己是異步的)或者附上后續(xù)部分。(它同樣能在任務(wù)中或它的結(jié)果中調(diào)用Wait(),但這會和線程耦合,所以避免那樣做。)當(dāng)?shù)却娜蝿?wù)成功完成,你的異步方法會在它中斷的地方繼續(xù)運行。

也許你會知道,C#5的編譯器會重寫它的異步方法為一個生成的實現(xiàn)了狀態(tài)機的嵌套類。C#正好還有一個特征(從2.0開始):迭代器(yield return  的方式)。這里的方法是使用一個迭代器方法在C#4中建造狀態(tài)機,返回一系列在全部處理過程中的等待步驟的Task。我們可以編寫一個方法接收一個從迭代器返回的任務(wù)的枚舉,返回一個重載過的Task來代表全部序列的完成以及提供它的最終結(jié)果(如果有)。

最終目標(biāo)

Stephen  Covey 建議我們目標(biāo)有先后。這就是我們現(xiàn)在做的。已經(jīng)有大量例子來告訴我們?nèi)绾问褂胊sync/await來實現(xiàn)SLAMs(synchronous-looking  asynchronous methods)。那么我們不使用這些關(guān)鍵字如何實現(xiàn)這個功能。我們來做一個C#5  async的例子,看看如何在C#4里實現(xiàn)它。然后我們討論一下轉(zhuǎn)換這些代碼的一般方法。

下面的例子展示了我們在C#5里實現(xiàn)異步讀寫方法Stream.CopyToAsync()的一種寫法。假設(shè)這個方法并沒有在.NET5里實現(xiàn)。

public static async Task CopyToAsync(     this Stream input, Stream output,     CancellationToken cancellationToken = default(CancellationToken)) {     byte[] buffer = new byte[0x1000];   // 4 KiB     while (true) {         cancellationToken.ThrowIfCancellationRequested();         int bytesRead = await input.ReadAsync(buffer, 0, buffer.Length);         if (bytesRead == 0) break;           cancellationToken.ThrowIfCancellationRequested();         await output.WriteAsync(buffer, 0, bytesRead);     } }

對C#4,我們將分成兩塊:一個是相同訪問能力的方法,另一個是私有方法,參數(shù)一樣但返回類型不同。私有方法用迭代實現(xiàn)同樣的處理,結(jié)果是一連串等待的任務(wù)(IEnumerable<Task>)。序列中的實際任務(wù)可以是非泛型或者不同類型泛型的任意組合。(幸運的是,泛型Task<T>類型是非泛型Task類型的子類型)

相同訪問能力(公用)方法返回與相應(yīng)async方法一致的類型:void,Task,或者泛型Task<T>。它將使用擴展方法調(diào)用私有迭代器并轉(zhuǎn)化為Task或者Task<T>。

public static /*async*/ Task CopyToAsync(     this Stream input, Stream output,     CancellationToken cancellationToken = default(CancellationToken)) {     return CopyToAsyncTasks(input, output, cancellationToken).ToTask(); } private static IEnumerable<Task> CopyToAsyncTasks(     Stream input, Stream output,     CancellationToken cancellationToken) {     byte[] buffer = new byte[0x1000];   // 4 KiB     while (true) {         cancellationToken.ThrowIfCancellationRequested();         var bytesReadTask = input.ReadAsync(buffer, 0, buffer.Length);         yield return bytesReadTask;         if (bytesReadTask.Result == 0) break;           cancellationToken.ThrowIfCancellationRequested();         yield return output.WriteAsync(buffer, 0, bytesReadTask.Result);     } }

異步方法通常以"Async"結(jié)尾命名(除非它是事件處理器如startButton_Click)。給迭代器以同樣的名字后跟“Tasks”(如startButton_ClickTasks)。如果異步方法返回void值,它仍然會調(diào)用ToTask()但不會返回Task。如果異步方法返回Task<X>,那么它就會調(diào)用通用的ToTask<X>()擴展方法。對應(yīng)三種返回類型,異步可替代的方法像下面這樣:

public /*async*/ void DoSomethingAsync() {     DoSomethingAsyncTasks().ToTask(); } public /*async*/ Task DoSomethingAsync() {     return DoSomethingAsyncTasks().ToTask(); } public /*async*/ Task<String> DoSomethingAsync() {     return DoSomethingAsyncTasks().ToTask<String>(); }

成對的迭代器方法不會更復(fù)雜。當(dāng)異步方法等待非通用的Task時,迭代器簡單的將控制權(quán)轉(zhuǎn)給它。當(dāng)異步方法等待task結(jié)果時,迭代器將task保存在一個變量中,轉(zhuǎn)到該方法,之后再使用它的返回值。兩種情況在上面的CopyToAsyncTasks()例子里都有顯示。

對包含通用resultTask<X>的SLAM,迭代器必須將控制轉(zhuǎn)交給確切的類型。ToTask<X>()將最終的task轉(zhuǎn)換為那種類型以便提取其結(jié)果。經(jīng)常的你的迭代器將計算來自中間task的結(jié)果數(shù)值,而且僅需要將其打包在Task<T>中。.NET  5為此提供了一個方便的靜態(tài)方法。而.NET 4沒有,所以我們用TaskEx.FromResult<T>(value)來實現(xiàn)它。

最后一件你需要知道的事情是如何處理中間返回的值。一個異步的方法可以從多重嵌套的塊中返回;我們的迭代器簡單的通過跳轉(zhuǎn)到結(jié)尾來模仿它。

// C#5 public async Task<String> DoSomethingAsync() {     while (&hellip;) {         foreach (&hellip;) {             return "Result";         }     } }   // C#4;  DoSomethingAsync() is necessary but omitted here. private IEnumerable<Task> DoSomethingAsyncTasks() {     while (&hellip;) {         foreach (&hellip;) {             yield return TaskEx.FromResult("Result");             goto END;         }     } END: ; }

現(xiàn)在我們知道如何在C#4中寫SLAM了,但是只有實現(xiàn)了FromResult<T>()和兩個 ToTask()擴展方法才能真正的做到。下面我們開始做吧。

簡單的開端

我們將在類System.Threading.Tasks.TaskEx下實現(xiàn)3個方法,  先從簡單的那2個方法開始。FromResult()方法先創(chuàng)建了一個TaskCompletionSource(), 然后給它的result賦值,最后返回Task。

public static Task<TResult> FromResult<TResult>(TResult resultValue) {     var completionSource = new TaskCompletionSource<TResult>();     completionSource.SetResult(resultValue);     return completionSource.Task; }

很顯然, 這2個ToTask()方法基本相同, 唯一的區(qū)別就是是否給返回對象Task的Result屬性賦值. 通常我們不會去寫2段相同的代碼,  所以我們會用其中的一個方法來實現(xiàn)另一個。  我們經(jīng)常使用泛型來作為返回結(jié)果集,那樣我們不用在意返回值同時也可以避免在最后進(jìn)行類型轉(zhuǎn)換。 接下來我們先實現(xiàn)那個沒有用泛型的方法。

private abstract class VoidResult { }   public static Task ToTask(this IEnumerable<Task> tasks) {     return ToTask<VoidResult>(tasks); }

目前為止我們就剩下一個  ToTask<T>()方法還沒有實現(xiàn)。

第一次天真的嘗試

對于我們第一次嘗試實現(xiàn)的方法,我們將枚舉每個任務(wù)的Wait()來完成,然后將最終的任務(wù)做為結(jié)果(如果合適的話)。當(dāng)然,我們不想占用當(dāng)前線程,我們將另一個線程來執(zhí)行循環(huán)該任務(wù)。

// BAD CODE ! public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) {     var tcs = new TaskCompletionSource<TResult>();     Task.Factory.StartNew(() => {         Task last = null;         try {             foreach (var task in tasks) {                 last = task;                 task.Wait();             }               // Set the result from the last task returned, unless no result is requested.             tcs.SetResult(                 last == null || typeof(TResult) == typeof(VoidResult)                     ? default(TResult) : ((Task<TResult>) last).Result);           } catch (AggregateException aggrEx) {             // If task.Wait() threw an exception it will be wrapped in an Aggregate; unwrap it.             if (aggrEx.InnerExceptions.Count != 1) tcs.SetException(aggrEx);             else if (aggrEx.InnerException is OperationCanceledException) tcs.SetCanceled();             else tcs.SetException(aggrEx.InnerException);         } catch (OperationCanceledException cancEx) {             tcs.SetCanceled();         } catch (Exception ex) {             tcs.SetException(ex);         }     });     return tcs.Task; }

這里有一些好東西,事實上它真的有用,只要不觸及用戶界面:

它準(zhǔn)確的返回了一個TaskCompletionSource的Task,并且通過源代碼設(shè)置了完成狀態(tài)。

1.它顯示了我們怎么通過迭代器的最后一個任務(wù)設(shè)置task的最終Result,同時避免可能沒有結(jié)果的情況。

2.它從迭代器中捕獲異常并設(shè)置Canceled或Faulted狀態(tài). 它也傳播枚舉的task狀態(tài)  (這里是通過Wait(),該方法可能拋出一個包裝了cancellation或fault的異常的集合).

但這里有些主要的問題。最嚴(yán)重的是:

1.由于迭代器需要實現(xiàn)“異步態(tài)的”的諾言,當(dāng)它從一個UI線程初始化以后,迭代器的方法將能訪問UI控件。你能發(fā)現(xiàn)這里的foreach循環(huán)都是運行在后臺;從那個時刻開始不要觸摸UI!這種方法沒有顧及SynchronizationContext。

2.在UI之外我們也有麻煩。我們可能想制造大量大量的由SLAM實現(xiàn)的并行運行的Tasks。但是看看循環(huán)中的Wait()!當(dāng)?shù)却粋€嵌套task時,可能遠(yuǎn)程需要一個很長的時間完成,我們會掛起一個線程。我們面臨線程池的線程資源枯竭的情況。

3.這種解包Aggregate異常的方法是不太自然的。我們需要捕獲并傳播它的完成狀態(tài)而不拋出異常。

4.有時SLAM可以立刻決定它的完成狀態(tài)。那種情形下,C#5的async可以異步并且有效的操作。這里我們總是計劃了一個后臺task,因此失去了那種可能。

是需要想點辦法的時候了!

連續(xù)循環(huán)

最大的想法是直接從迭代器中獲取其所產(chǎn)生的第一個任務(wù)。 我們創(chuàng)建了一個延續(xù),使其在完成時能夠檢查任務(wù)的狀態(tài)并且(如果成功的話)能接收下一個任務(wù)和創(chuàng)建另一個延續(xù)直至其結(jié)束。(如果沒有,即迭代器沒有需要完成的需求。)

// 很牛逼,但是我們還沒有。 public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) {     var taskScheduler =         SynchronizationContext.Current == null             ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();     var tcs = new TaskCompletionSource<TResult>();     var taskEnumerator = tasks.GetEnumerator();     if (!taskEnumerator.MoveNext()) {         tcs.SetResult(default(TResult));         return tcs.Task;     }       taskEnumerator.Current.ContinueWith(         t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),         taskScheduler);     return tcs.Task; } private static void ToTaskDoOneStep<TResult>(     IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,     TaskCompletionSource<TResult> tcs, Task completedTask) {     var status = completedTask.Status;     if (status == TaskStatus.Canceled) {         tcs.SetCanceled();       } else if (status == TaskStatus.Faulted) {         tcs.SetException(completedTask.Exception);       } else if (!taskEnumerator.MoveNext()) {         // 設(shè)置最后任務(wù)返回的結(jié)果,直至無需結(jié)果為止。         tcs.SetResult(             typeof(TResult) == typeof(VoidResult)                 ? default(TResult) : ((Task<TResult>) completedTask).Result);       } else {         taskEnumerator.Current.ContinueWith(             t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),             taskScheduler);     } }

這里有許多值得分享的:

1.我們的后續(xù)部分(continuations)使用涉及SynchronizationContext的TaskScheduler,如果有的話。這使得我們的迭代器在UI線程初始化以后,立刻或者在一個繼續(xù)點被調(diào)用,去訪問UI控件。

2.進(jìn)程不中斷的運行,因此沒有線程掛起等待!順便說一下,在ToTaskDoOneStep()中對自身的調(diào)用不是遞歸調(diào)用;它是在taskEnumerator.Currenttask結(jié)束后調(diào)用的匿名函數(shù),當(dāng)前活動在調(diào)用ContinueWith()幾乎立刻退出,它完全獨立于后續(xù)部分。

3.此外,我們在繼續(xù)點中驗證每個嵌套task的狀態(tài),不是檢查一個預(yù)測值。

然而,這兒至少有一個大問題和一些小一點的問題。

1.如果迭代器拋出一個未處理異常,或者拋出OperationCanceledException而取消,我們沒有處理它或設(shè)置主task的狀態(tài)。這是我們以前曾經(jīng)做過的但在此版本丟失了。

2.為了修復(fù)問題1,我們不得不在兩個方法中調(diào)用MoveNext()的地方引入同樣的異常處理機制。即使是現(xiàn)在,兩個方法中都有一樣的后續(xù)部分建立。我們違背了“不要重復(fù)你自己”的信條

3.如果異步方法被期望給出一個結(jié)果,但是迭代器沒有提供就退出了會怎么樣呢?或者它最后的task是錯誤的類型呢?第一種情形下,我們默默返回默認(rèn)的結(jié)果類型;第二種情形,我們拋出一個未處理的InvalidCastException,主task永遠(yuǎn)不會到達(dá)結(jié)束狀態(tài)!我們的程序?qū)⒂谰玫膾炱?/p>

4.最后,如果一個嵌套的task取消或者發(fā)生錯誤呢?我們設(shè)置主task狀態(tài),再也不會調(diào)用迭代器??赡苁窃谝粋€using塊,或帶有finally的try塊的內(nèi)部,并且有一些清理要做。我們應(yīng)當(dāng)遵守過程在中斷的時候使它結(jié)束,而不要等垃圾收集器去做這些。我們怎么做到呢?當(dāng)然通過一個后續(xù)部分!

為了解決這些問題,我們從ToTask()中移走M(jìn)oveNext()調(diào)用,取而代之一個對ToTaskDoOneStep()的初始化的同步調(diào)用。然后我們將在一個提防增加合適的異常處理。

最終版本

這里是ToTask<T>()的最終實現(xiàn).  它用一個TaskCompletionSource返回主task,永遠(yuǎn)不會引起線程等待,如果有的話還會涉及SynchronizationContext,由迭代器處理異常,直接傳播嵌套task的結(jié)束(而不是AggregateException),合適的時候向主task返回一個值,當(dāng)期望一個結(jié)果而SLAM迭代器沒有以正確的genericTask<T>類型結(jié)束時,用一個友好的異常報錯。

public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) {     var taskScheduler =         SynchronizationContext.Current == null             ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();     var taskEnumerator = tasks.GetEnumerator();     var completionSource = new TaskCompletionSource<TResult>();       // Clean up the enumerator when the task completes.     completionSource.Task.ContinueWith(t => taskEnumerator.Dispose(), taskScheduler);       ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, null);     return completionSource.Task; }   private static void ToTaskDoOneStep<TResult>(     IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,     TaskCompletionSource<TResult> completionSource, Task completedTask) {     // Check status of previous nested task (if any), and stop if Canceled or Faulted.     TaskStatus status;     if (completedTask == null) {         // This is the first task from the iterator; skip status check.     } else if ((status = completedTask.Status) == TaskStatus.Canceled) {         completionSource.SetCanceled();         return;     } else if (status == TaskStatus.Faulted) {         completionSource.SetException(completedTask.Exception);         return;     }       // Find the next Task in the iterator; handle cancellation and other exceptions.     Boolean haveMore;     try {         haveMore = taskEnumerator.MoveNext();       } catch (OperationCanceledException cancExc) {         completionSource.SetCanceled();         return;     } catch (Exception exc) {         completionSource.SetException(exc);         return;     }       if (!haveMore) {         // No more tasks; set the result (if any) from the last completed task (if any).         // We know it's not Canceled or Faulted because we checked at the start of this method.         if (typeof(TResult) == typeof(VoidResult)) {        // No result             completionSource.SetResult(default(TResult));           } else if (!(completedTask is Task<TResult>)) {     // Wrong result             completionSource.SetException(new InvalidOperationException(                 "Asynchronous iterator " + taskEnumerator +                     " requires a final result task of type " + typeof(Task<TResult>).FullName +                     (completedTask == null ? ", but none was provided." :                         "; the actual task type was " + completedTask.GetType().FullName)));           } else {             completionSource.SetResult(((Task<TResult>) completedTask).Result);         }       } else {         // When the nested task completes, continue by performing this function again.         taskEnumerator.Current.ContinueWith(             nextTask => ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, nextTask),             taskScheduler);     } }

瞧! 現(xiàn)在你會在Visual Studio  2010中用沒有asyncawait的 C#4 (或  VB10)寫SLAMs(看起來同步的方法,但異步執(zhí)行)。

關(guān)于C# 中使用迭代器等待任務(wù)的操作方法就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI