程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> .NET相關問題:使用WebClient執行異步I/O

.NET相關問題:使用WebClient執行異步I/O

編輯:關於.NET

問:我非常喜歡 System.IO.File 類上用於讀取和寫入數據的靜態幫助程序方法:ReadAllText、 ReadAllBytes、WriteAllText、WriteAllBytes 等等。但是,這些方法都是同步的,我希望能夠異步使用 它們,讓它們也異步使用 I/O。我知道 System.IO.FileStream 類支持異步 I/O;那 System.IO.File 呢 ?

答:System.IO.File 的方法僅支持同步操作。但是用於實施您所述的異步方法的功能肯定是存 在的。在本專欄中,我將詳細介紹用於實現此目標的兩種方法,首先是其中一個更復雜但更有效的方法。

首先,我定義實際想要實現的 API。您引用的原始簽名如下所示:

public static byte 

[] ReadAllBytes(string path);
public static string ReadAllText(string path);
public static void WriteAllBytes(string path, byte [] bytes);
public static void WriteAllText(string path, string contents);

我想要它們的異步版本 ,並將它們定義為:

public static void ReadAllBytesAsync(
  string path, Action<byte[]> success, Action<Exception> failure);
public static void ReadAllTextAsync(
  string path, Action<string> success, Action<Exception> failure);
public static void WriteAllBytesAsync(
  string path, byte[] bytes,
  Action success, Action<Exception> failure);
public static void WriteAllTextAsync(
  string path, string contents,
  Action success, Action<Exception> failure);

這些簽名與原件非常類似。Read* 方法不是同步返回數據,而是接受兩個委托,一個用於成功執行一個用於異常執行,其中前者傳遞讀取數 據,而後者傳遞所有產生的異常。Write* 方法也接受兩個委托,但 success 委托沒有參數,因為並無預 期的輸出(原始 Write* 方法返回 void)。

更為復雜的方法涉及通過 System.IO.Stream 類 (FileStream 即從它派生而來)公開的異步編程模型 (APM) 模式直接實現它們。在《MSDN® 雜志》 2008 年 3 月刊我的專欄 (msdn.microsoft.com/magazine/cc337900) 中,我展示了實現方法 CopyStreamToStream(此方法將使用每個流的 APM 實現流之間的異步復制);圖 1 顯示了該實現以供參 考,並且我將在此重復使用它(此實現與您上次看到的實現相比存在少許簡化和更改。其中包括不再使用 System.ComponentModel.AsyncOperationManager 和 AsyncOperation,在本專欄中實現的更高級別 API 中使用它們。稍後我將介紹原因)。

圖 1 異步流復制

public static void 

CopyStreamToStream(
  Stream source, Stream destination, Action<Exception> completed)
{
  byte[] buffer = new byte[0x1000];
  if (completed == null) completed = delegate {};
  AsyncCallback rc = null;
  rc = readResult =>
  {
    try
    {
      int read = source.EndRead(readResult);
      if (read > 0)
      {
        destination.BeginWrite(buffer, 0, read, writeResult =>
        {
          try
          {
            destination.EndWrite(writeResult);
            source.BeginRead(
              buffer, 0, buffer.Length, rc, null);
          }
          catch (Exception exc) { completed(exc); }
        }, null);
      }
      else completed(null);
    }
    catch (Exception exc) { completed(exc); }
  };
  source.BeginRead(buffer, 0, buffer.Length, rc, null);
}

該實現代表了實現此類異步方法最困難的方面。在 CopyStreamToStream 之上,我將實現兩個 幫助程序方法:一個用於從文件異步讀取字節,一個用於將字節異步寫入文件。它們會完成大部分工作( 請參見圖 2)。

圖 2 異步幫助程序方法

private static void 

ReadAllBytesAsyncInternal(string path,
  Action<byte[]> success, Action<Exception> failure)
{
  var input = new FileStream(path, FileMode.Open,
    FileAccess.Read, FileShare.Read, 0x1000, true);
  var output = new MemoryStream((int)input.Length);
  CopyStreamToStream(input, output, e =>
  {
    byte [] bytes = e == null ? output.GetBuffer() : null;
    output.Close();
    input.Close();
    if (e != null) failure(e);
    else success(bytes);
  });
}
private static void WriteAllBytesAsyncInternal(
  string path, byte[] bytes,
  Action success, Action<Exception> failure)
{
  var input = new MemoryStream(bytes);
  var output = new FileStream(path, FileMode.Create,
    FileAccess.Write, FileShare.None, 0x1000, true);
  CopyStreamToStream(input, output, e =>
  {
    input.Close();
    output.Close();
    if (e != null) failure(e);
    else success();
  });
}

ReadAllBytesAsyncInternal 創建一個輸入 FileStream 來異步處理底層流,並創建 MemoryStream 以存儲從文件讀取的字節。然後使用 CopyStreamToStream 方法將所有數據從 FileStream 異步復制到 MemoryStream。操作完成後關閉流。如果拋出異常,調用 failure 委托;否則會在從文件向 MemoryStream 讀入數據後調用 success 委托。

WriteAllBytesAsyncInternal 非常類似。它創建一個輸入 MemoryStream 來封裝提供的字節數組,並 創建一個輸出 FileStream 來支持異步 I/O。與 ReadAllBytesAsyncInternal 一樣,一旦完成即關閉流 ,如果發生異常則調用 failure 委托。

要實現之前所示的每個公開簽名,現在只需在圖 1 和 2 中創建的方法之上再添加幾行代碼即可。如圖 3 所示。

圖 3 實現 Public 方法

public 

static void ReadAllBytesAsync(
  string path, Action<byte[]> success, Action<Exception>
    failure)
{
  AsyncOperation asyncOp =
    AsyncOperationManager.CreateOperation(null);
  ReadAllBytesAsyncInternal(path,
    bytes => asyncOp.Post(delegate { success(bytes); }, null),
    exception => asyncOp.Post(
      delegate { failure(exception); }, null));
}
public static void ReadAllTextAsync(
  string path, Action<string> success, Action<Exception>
    failure)
{
  AsyncOperation asyncOp =
    AsyncOperationManager.CreateOperation(null);
  ReadAllBytesAsyncInternal(path,
    bytes => {
      string text;
      using (var ms = new MemoryStream(bytes))
        text = new StreamReader(ms).ReadToEnd();
      asyncOp.Post(delegate { success(text); }, null);
    },
    exception => asyncOp.Post(
      delegate { failure(exception); }, null));
}
public static void WriteAllBytesAsync(
  string path, byte[] bytes, Action success, Action<Exception>
    failure)
{
  AsyncOperation asyncOp =
    AsyncOperationManager.CreateOperation(null);
  WriteAllBytesAsyncInternal(path, bytes,
    () => asyncOp.Post(delegate {success(); }, null),
    exception => asyncOp.Post(
      delegate { failure(exception); }, null));
}
public static void WriteAllTextAsync(
  string path, string contents, Action success, Action<Exception>
    failure)
{
  AsyncOperation asyncOp =
    AsyncOperationManager.CreateOperation(null);
  ThreadPool.QueueUserWorkItem(delegate
  {
    var bytes = Encoding.UTF8.GetBytes(contents);
    WriteAllBytesAsyncInternal(path, bytes,
      () => asyncOp.Post(delegate {success(); }, null),
      exception => asyncOp.Post(
        delegate { failure(exception); }, null));
  });
}

在大多數情況下,每個這樣的方法都是對圖 2 中所示內部實現的簡單包裝。此外還有一些有 趣的微妙之處。首先,我之前提到過從 CopyStreamToStream 中剝離了 AsyncOperationManager 支持。 AsyncOperation 本身是對 System.Threading.SynchronizationContext 的包裝,它使用從 AsyncOperationManager.CreateOperation 調用中捕獲的底層 SynchronizationContext 來以一種適合於 創建時同步環境的方式回發委托。例如,在 Windows® 窗體應用程序的 UI 線程上, SynchronizationContext.Current 可能會返回支持將委托調用封送回 UI 線程的 WindowsFormsSynchronizationContext。因此,如果在 UI 線程上調用 AsyncOperationManager.CreateOperation,使用生成的 AsyncOperation 的 Post 方法會將提供的委托 封送到 UI 線程並在其中執行。

但是,我想盡可能減少在 UI 線程上執行的工作量。現在來看看 ReadAllTextAsync 方法。完成從文件加載數據的操作後,我希望將讀取字節轉換成字符串(如果有可寫 StringStream 類並能將它的實例傳遞給 CopyStreamToStream 而非傳遞 MemoryStream,會更有成效)。 此轉換會增加 UI 線程的負荷。因此我希望在回送到 UI 之前執行此轉換。但在最初實現的 CopyStreamToStream 中,完成委托是在捕獲的 SynchronizationContext 下運行的,因此在我想要執行 轉換時我已處於 UI 線程之中。我選擇將 AsyncOperation 作業拖入每個這樣的外部方法中;因此我可以 延遲調用 Post,直至我完成實際的計算工作。

另一有趣的實現細節出現在 WriteAllTextAsync 中。WriteAllTextAsync 在啟動異步操作之前不會返回。但在可以調用 WriteAllBytesAsyncInternal 執 行此操作之前,我需要將提供的文本字符串轉換成字節數組。因此,我將工作項排入 ThreadPool,而不 是阻止調用線程。該工作項執行從字符串到字節的轉換,然後啟動內部復制。

結果是它的代碼數量並不算可怕,但也不簡單。大家可能希望這些用於讀取和寫入文件數據的常見異 步模式已存在於 Microsoft® .NET Framework 的某個位置中,尤其是考慮可能在其上添加其他功能 時(如讀取或寫入數據時的進度通知)。實際上,此類功能確實已經存在,但卻在一個不太可能的位置中 :System.Net。

System.Net.WebClient 類極適合實現多種用途。如果給定在 System.Net 命名空 間中的名稱和位置,大多數人都將其視作僅用於與 Web 相關的活動也不足為奇:從 HTTP 服務器下載文 件,將文件上載到 FTP 站點以及類似操作。但 WebClient 是在 WebRequest 和 WebResponse(它們支持 用於創建這些類型具體實現的可插入工廠模型)的基礎上精細抽象而成。使用 HTTP URL 調用 WebRequest.Create 會返回一個 HttpWebRequest 實例,就像通過 FTP URL 調用它會返回一個 FtpWebRequest 實例一樣。WebClient 在內部使用 WebRequest 和 WebResponse 來實現大量有用的功能 。以下列舉了一些更有關聯的方法:

public void DownloadDataAsync(Uri address);
public event DownloadDataCompletedEventHandler DownloadDataCompleted;
public void DownloadStringAsync(Uri address);
public event
  DownloadStringCompletedEventHandler DownloadStringCompleted;
public void UploadDataAsync(Uri address, byte[] data);
public event UploadDataCompletedEventHandler UploadDataCompleted;
public void UploadStringAsync(Uri address, string data);
public event UploadStringCompletedEventHandler UploadStringCompleted;

因此,如果現在 調用 DownloadDataAsync 並向其提供下載數據的 URL,它會異步執行此操作;完成時,它會觸發 DownloadDataCompleted 事件。與此類似,調用 UploadStringAsync 並向其提供一個文本字符串和此字 符串的上載位置,它會異步執行此操作;完成時,它會觸發 UploadStringCompleted 事件。使用便利, 性能出色。

現在,內置到 .NET Framework 中的一個 WebRequest 提供程序是 FileWebRequest( 以及對應的 FileWebResponse),這是其中的妙筆。因此,我可以編寫類似如下的代碼:

WebRequest wr = Webrequest.Create(@"file://C:\test.txt");

此代碼向我提供了一個 WebRequest,它的處理方式與其他屬性類似(唯一的不同是此 WebRequest 的 目標是磁盤上的一個文件而非某個網站上的文件)。將它與 WebClient 的 WebRequest 用法結合起來, 我確信您會明白我的目的:我可以使用 WebClient 來異步讀取和寫入文件!

要實現之前的讀取和 寫入方法,每個方法僅需要幾行代碼,如圖 4 所示(實際上,此時我懷疑甚至是否有必要使用這些包裝 ,因為大多數代碼只是為了使構成的 API 簽名適應實現相同任務的 WebClient 提供的項目)。 WebClient 內部使用 AsyncOperationManager 來確保回調發生在正確的環境中, FileWebRequest/FileWebResponse 將在使用 WebClient 的異步方法時使用異步 I/O(WebClient 還提供 這些方法的異步版本)。

圖 4 使用 WebClient 讀取和寫入文件

public static void 

ReadAllBytesAsync(
  string path, Action<byte[]> success, Action<Exception> failure)
{
  var wc = new WebClient();
  wc.DownloadDataCompleted += (sender, e) =>
  {
    if (e.Error != null) failure(e.Error);
    else success(e.Result);
  };
  wc.DownloadDataAsync(new Uri("file://" + path));
}
public static void ReadAllTextAsync(
  string path, Action<string> success, Action<Exception> failure)
{
  var wc = new WebClient();
  wc.DownloadStringCompleted += (sender, e) =>
  {
    if (e.Error != null) failure(e.Error);
    else success(e.Result);
  };
  wc.DownloadStringAsync(new Uri("file://" + path));
}
public static void WriteAllBytesAsync(
  string path, byte [] bytes,
  Action success, Action<Exception> failure)
{
  var wc = new WebClient();
  wc.UploadDataCompleted += (sender, e) =>
  {
    if (e.Error != null) failure(e.Error);
    else success();
  };
  wc.UploadDataAsync(new Uri("file://" + path), bytes);
}
public static void WriteAllTextAsync(
  string path, string contents,
  Action success, Action<Exception> failure)
{
  var wc = new WebClient();
  wc.UploadStringCompleted += (sender, e) =>
  {
    if (e.Error != null) failure(e.Error);
    else success();
  };
  wc.UploadStringAsync(new Uri("file://" + path), contents);
}

更酷的是 WebClient 在此基礎上提供了更多有用的功能;尤其是它內置支持進度跟蹤。它提 供兩個相關的事件:

public event
    DownloadProgressChangedEventHandler DownloadProgressChanged;
public event
    UploadProgressChangedEventHandler UploadProgressChanged;

當針對正在進行的下載或上載提供進度更新時會分別觸發它們。提供給這些事件處理程序的事件參數 還為進度報告提供了有用的信息。例如,以下為 DownloadProgressChangedEventArgs 類型:

public class DownloadProgressChangedEventArgs :
    ProgressChangedEventArgs
{
    public long BytesReceived { get; }
    public long TotalBytesToReceive { get; }
    /* from the base ProgressChangedEventArgs type
    public int ProgressPercentage { get; }
    public object UserState { get; }
    */
}

它們可使我將異步 I/O 順利集成到 GUI 應用程序中。假想一個 Windows 窗體應用程序,在使用一個 大型文件之前需先將它從磁盤加載到內存中。我可以異步加載它,並且具有更新進度條的進度通知以及完 成通知:

private void button1_Click(object sender, EventArgs e) {
    WebClient wc = new WebClient();
    wc.DownloadDataCompleted += (s, de) => {
        _fileData = de.Result;
        MessageBox.Show("File loaded");
    };
    wc.DownloadProgressChanged += 
        (s, de) => progressBar1.Value = de.ProgressPercentage;
    wc.DownloadDataAsync(new Uri(@"file://c:\largeFile.dat"));
}
private byte [] _fileData;

很遺憾,API 雖然非常靈活,卻仍存在相當大的問題。盡管 FileWebRequest/FileWebResponse 實際 上確實使用的是異步 I/O,但實現還是要依賴工作 ThreadPool 回調通知;有時它會阻礙線程池中的這些 線程等待異步 I/O 完成。(究竟是代碼缺陷還是性能不好還值得商榷。)結果是如果嘗試使用此 WebClient 技術來異步並行讀取或寫入大量文件,您可能會發現 ThreadPool 中線程的數量對性能的影響 非常之大。
在 ThreadPool 中阻止線程通常並不是個好辦法,因為它會迫使 ThreadPool 插入新的線程,這樣不僅會 消耗其他資源,還會降低應用程序的速度,因為 ThreadPool 的插入會導致延遲。總而言之,用於異步讀 取和寫入文件的 WebClient 方法可能非常適合於一次處理少數文件的情形,但在改進 FileWebRequest/FileWebResponse 以修復此問題之前,最好還是使用本專欄前面展示的手工編碼版本。 另一解決方案是提供可正確實現功能的自定義 WebRequest/WebResponse 實現,然後使用 WebClient 注 冊它們。

請將您想向 Stephen 詢問的問題和提出的意見發送至 [email protected]

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved