溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

c# 利用Task如何實現(xiàn)一個非阻塞式I/O

發(fā)布時間:2020-11-11 15:00:54 來源:億速云 閱讀:189 作者:Leah 欄目:開發(fā)技術

今天就跟大家聊聊有關c# 利用Task如何實現(xiàn)一個非阻塞式I/O,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

1、異步讀取文件數(shù)據(jù)

public static void TaskFromIOStreamAsync(string fileName)
{
  int chunkSize = 4096;
  byte[] buffer = new byte[chunkSize];

  FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);

  Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
  task.ContinueWith((readTask) =>
  {
    int amountRead = readTask.Result;
    //必須在ContinueWith中釋放文件流 
    fileStream.Dispose();
    Console.WriteLine($"Async(Simple) Read {amountRead} bytes");
  });
}

  上述代碼中,異步讀取數(shù)據(jù)只讀取了一次,完成讀取后就將執(zhí)行權交還主線程了。但在真實場景中,需要從流中讀取多次才能獲得全部的數(shù)據(jù)(如文件數(shù)據(jù)大于給定緩沖區(qū)大小,或處理來自網(wǎng)絡流的數(shù)據(jù)(數(shù)據(jù)還沒全部到達機器))。因此,為了完成異步讀取操作,需要連續(xù)從流中讀取數(shù)據(jù),直到獲取所需全部數(shù)據(jù)。

  上述問題導致需要兩級Task來處理。外層的Task用于全部的讀取工作,供調(diào)用程序使用。內(nèi)層的Task用于每次的讀取操作。

  第一次異步讀取會返回一個Task。如果直接返回調(diào)用Wait或者ContinueWith的地方,會在第一次讀取結束后繼續(xù)向下執(zhí)行。實際上是希望調(diào)用者在完成全部讀取操作后才執(zhí)行。因此,不能把第一個Task發(fā)布會給調(diào)用者,需要一個“偽Task”在完成全部讀取操作后再返回。

  上述問題需要使用到TaskCompletionSource<T>類解決,該類可以生成一個用于返回的“偽Task”。當異步讀取操作全部完成后,調(diào)用其對象的TrySetResult,讓Wait或ContinueWith的調(diào)用者繼續(xù)執(zhí)行。

public static Task<long> AsynchronousRead(string fileName)
{
  int chunkSize = 4096;
  byte[] buffer = new byte[chunkSize];
  //創(chuàng)建一個返回的偽Task對象
  TaskCompletionSource<long> tcs = new TaskCompletionSource<long>();

  MemoryStream fileContents = new MemoryStream();//用于保存讀取的內(nèi)容
  FileStream fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, true);
  fileContents.Capacity += chunkSize;//指定緩沖區(qū)大小。好像Capacity會自動增長,設置與否沒關系,后續(xù)寫入多少數(shù)據(jù),就增長多少

  Task<int> task = fileStream.ReadAsync(buffer, 0, buffer.Length);
  task.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
  //在ContinueWith中循環(huán)讀取,讀取完成后,再返回tcs的Task
  return tcs.Task;
}

/// <summary>
/// 繼續(xù)讀取數(shù)據(jù)
/// </summary>
/// <param name="task">讀取數(shù)據(jù)的線程</param>
/// <param name="fileStream">文件流</param>
/// <param name="fileContents">文件存放位置</param>
/// <param name="buffer">讀取數(shù)據(jù)緩存</param>
/// <param name="tcs">偽Task對象</param>
private static void ContinueRead(Task<int> task, FileStream fileStream, MemoryStream fileContents, byte[] buffer, TaskCompletionSource<long> tcs)
{
  if (task.IsCompleted)
  {
    int bytesRead = task.Result;
    fileContents.Write(buffer, 0, bytesRead);//寫入內(nèi)存區(qū)域。似乎Capacity會自動增長
    if (bytesRead > 0)
    {
      //雖然看似是一個新的任務,但是使用了ContinueWith,所以使用的是同一個線程。
      //沒有讀取完,開啟另一個異步繼續(xù)讀取
      Task<int> newTask = fileStream.ReadAsync(buffer, 0, buffer.Length);
      //此處做了一個循環(huán)
      newTask.ContinueWith(readTask => ContinueRead(readTask, fileStream, fileContents, buffer, tcs));
    }
    else
    {
      //已經(jīng)全部讀取完,所以需要返回數(shù)據(jù)
      tcs.TrySetResult(fileContents.Length);
      fileStream.Dispose();
      fileContents.Dispose();//應該是在使用了數(shù)據(jù)之后才釋放數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)
    }
  }
}

2、適應Task的異步編程模式

  .NET Framework中的舊版異步方法都帶有“Begin-”和“End-”前綴。這些方法仍然有效,為了接口的一致性,它們可以被封裝到Task中。

  FromAsyn方法把流的BeginRead和EndRead方法作為參數(shù),再加上存放數(shù)據(jù)的緩沖區(qū)。BeginRead和EndRead方法會執(zhí)行,并在EndRead完成后調(diào)用Continuation Task,把控制權交回主代碼。上述例子會關閉流并返回轉(zhuǎn)換的數(shù)據(jù)

const int ReadSize = 256;

/// <summary>
/// 從文件中獲取字符串
/// </summary>
/// <param name="path">文件路徑</param>
/// <returns>字符串</returns>
public static Task<string> GetStringFromFile(string path)
{
  FileInfo file = new FileInfo(path);
  byte[] buffer = new byte[1024];//存放數(shù)據(jù)的緩沖區(qū)

  FileStream fileStream = new FileStream(
    path, FileMode.Open, FileAccess.Read, FileShare.None, buffer.Length,
    FileOptions.DeleteOnClose | FileOptions.Asynchronous);

  Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
    buffer, 0, ReadSize, null);//此參數(shù)為BeginRead需要的參數(shù)

  TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();

  task.ContinueWith(taskRead => OnReadBuffer(taskRead, fileStream, buffer, 0, tcs));

  return tcs.Task;
}

/// <summary>
/// 讀取數(shù)據(jù)
/// </summary>
/// <param name="taskRead">讀取任務</param>
/// <param name="fileStream">文件流</param>
/// <param name="buffer">讀取數(shù)據(jù)存放位置</param>
/// <param name="offset">讀取偏移量</param>
/// <param name="tcs">偽Task</param>
private static void OnReadBuffer(Task<int> taskRead, FileStream fileStream, byte[] buffer, int offset, TaskCompletionSource<string> tcs)
{
  int readLength = taskRead.Result;
  if (readLength > 0)
  {
    int newOffset = offset + readLength;
    Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead,
      buffer, newOffset, Math.Min(buffer.Length - newOffset, ReadSize), null);

    task.ContinueWith(callBackTask => OnReadBuffer(callBackTask, fileStream, buffer, newOffset, tcs));
  }
  else
  {
    tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer, 0, buffer.Length));
    fileStream.Dispose();
  }
}

3、使用async 和 await方式讀取數(shù)據(jù)

  下面的示例中,使用了async和await關鍵字實現(xiàn)異步讀取一個文件的同時進行壓縮并寫入另一個文件。所有位于await關鍵字之前的操作都運行于調(diào)用者線程,從await開始的操作都是在Continuation Task中運行。但有無法使用這兩個關鍵字的場合:①Task的結束時機不明確時;②必須用到多級Task和TaskCompletionSource時

/// <summary>
/// 同步方法的壓縮
/// </summary>
/// <param name="lstFiles">文件清單</param>
public static void SyncCompress(IEnumerable<string> lstFiles)
{
  byte[] buffer = new byte[16384];
  foreach(string file in lstFiles)
  {
    using (FileStream inputStream = File.OpenRead(file))
    {
      using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
      {
        using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
        {
          int read = 0;
          while((read=inputStream.Read(buffer,0,buffer.Length))>0)
          {
            compressStream.Write(buffer, 0,read);
          }
        }
      }
    }
  }
}

/// <summary>
/// 異步方法的文件壓縮
/// </summary>
/// <param name="lstFiles">需要壓縮的文件</param>
/// <returns></returns>
public static async Task AsyncCompress(IEnumerable<string> lstFiles)
{
  byte[] buffer = new byte[16384];
  foreach(string file in lstFiles)
  {
    using (FileStream inputStream = File.OpenRead(file))
    {
      using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
      {
        using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream, System.IO.Compression.CompressionMode.Compress))
        {
          int read = 0;
          while ((read = await inputStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
          {
            await compressStream.WriteAsync(buffer, 0, read);
          }
        }
      }
    }
  }
}

看完上述內(nèi)容,你們對c# 利用Task如何實現(xiàn)一個非阻塞式I/O有進一步的了解嗎?如果還想了解更多知識或者相關內(nèi)容,請關注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI