程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> .NET相關問題: 異步流處理

.NET相關問題: 異步流處理

編輯:關於.NET

問:我已經閱讀了一些絕妙的技巧,可以使用 C# 迭代器輕松地開發異步實現。(Jeffrey Richter 在前一期《MSDN® 雜志》中曾高度贊揚過此類技術。)但是,我曾嘗試找出如何在不進行此類代碼轉 換的情況下完成相似的工作,那真是困難重重。例如,比方說我想僅通過對 Stream 使用異步方法便從一 個流復制到另一個流。我知道如何發出一次異步請求(完成一次讀和一次寫操作),但如何才能夠繼續循 環請求以便處理整個流呢?這可能嗎?

問:我已經閱讀了一些絕妙的技巧,可以使用 C# 迭代器 輕松地開發異步實現。(Jeffrey Richter 在前一期《MSDN® 雜志》中曾高度贊揚過此類技術。)但 是,我曾嘗試找出如何在不進行此類代碼轉換的情況下完成相似的工作,那真是困難重重。例如,比方說 我想僅通過對 Stream 使用異步方法便從一個流復制到另一個流。我知道如何發出一次異步請求(完成一 次讀和一次寫操作),但如何才能夠繼續循環請求以便處理整個流呢?這可能嗎?

答:是的,可 能。我將使用您的示例介紹如何通過 C# 和 Visual Basic® 完成此項工作(請注意,Visual Basic 不提供對迭代器的編譯器支持)。為了設置此階段,圖 1 顯示了我們希望異步實現的同步實現,該實現 可通過以下代碼調用:

答:是的,可能。我將使用您的示例介紹如何通過 C# 和 Visual Basic® 完成此項工作(請注意,Visual Basic 不提供對迭代器的編譯器支持)。為了設置此階段, 圖 1 顯示了我們希望異步實現的同步實現,該實現可通過以下代碼調用:

Figure 1 Synchronous CopyStreamToStream

public static void CopyStreamToStream(
    Stream source, Stream destination,
    Action<Stream, Stream, Exception> completed)
{
  byte[] buffer = new byte[0x1000];
  int read;
  try
  {
    while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
    {
      destination.Write(buffer, 0, read);
    }
    if (completed != null) completed(source, destination, null);
  }
  catch (Exception exc)
  {
    if (completed != null) completed(source, destination, exc);
  }
}
FileStream input = ...;
FileStream output = ...;
CopyStreamToStream(input, output, (src,dst,exc) => {
  src.Close();
  dst.Close();
});

使用同步實現,CopyStreamToStream 在流復制完成和回調執行完畢之前將一直處於阻塞狀 態。您可以采用多種方法輕松地將其實現為異步調用。其中之一是顯式將調用此方法的工作項加入到 ThreadPool 隊列中:

ThreadPool.QueueUserWorkItem(delegate
{
  CopyStreamToStream(input, output, (src,dst,exc) =>
  {
    src.Close();
    dst.Close();
  });
});

另一種方法是使用異步委托調用,這種方法的效果與前一種方法大致相同:

delegate void CopyStreamToStreamHandler(
  Stream source, Stream destination,
  Action<Stream, Stream, Exception> completed);
...
CopyStreamToStreamHandler handler = CopyStreamToStream;
handler.BeginInvoke(source, destination, (src,dst,exc) =>
{
  src.Close();
  dst.Close();
}, iar => handler.EndInvoke(iar), null);

但這兩種方法都不是您想要的異步方式。的確 ,相對於啟動執行的線程而言它們是異步的(即,調用線程不會等待它們完成);但是,在這兩種情況下 ,CopyStreamToStream 方法僅在池線程上運行,並且它發出的所有讀和寫操作相對於其余方法調用均是 同步的。

如果我用這種方式異步啟動 CopyStreamToStream 100 次,則執行這些復制需要 100 個 線程(可能會重復使用)。這不僅會產生大量開銷,而且由於線程池中的可用線程數和線程計數增加率有 限,因此,大量復制可能在其他線程結束之前無法開始執行,從而導致完成任務所需的總時間增加。

您所想要的是 CopyStreamToStream 的真正異步實現,在此情況下,您需要發出 BeginRead 和 BeginWrite 調用以及與 EndRead 和 EndWrite 適當匹配的調用,而不是對 Stream 發出 Read 和 Write 調用。

如您在同步實現中所見,您需要對源 Stream 發出讀請求、提取檢索的數據並將其寫入到 目標 Stream,然後刪除並重復。通過使用異步實現來代替在同步實現中使用的循環方法,我將使用連續 傳遞方式將異步請求之後的所有操作打包,並將其作為要在異步請求完成時執行的閉包傳遞。首先,我發 出一個異步讀請求:

byte[] buffer = new byte[0x1000];
source.BeginRead(buffer, 0, buffer.Length, readResult =>
{
  int read = source.EndRead(readResult);
  ... // process read results in buffer here
}, null);

BeginRead 方法接受與 Read 方法相同的三個參數:存放數據的目標緩沖區、指定數 據存放位置的緩沖區索引,以及要讀取的數據量。BeginRead 方法還采用另外兩個參數:第一個參數代表 在讀請求完成時將調用的委托,第二個參數是希望在回調中可用的任何狀態。在回調中,我向源發出 EndRead 調用,從而為其提供來自 BeginRead 調用的 IAsyncResult,並允許我檢索讀請求的結果。 (EndRead 還會拋出讀取結果可能發生的任何異常。)然後,我希望將此數據寫出到目標 Stream:

int read = source.EndRead(readResult);
if (read > 0)
{
  destination.BeginWrite(buffer, 0, read, writeResult =>
  {
    destination.EndWrite(writeResult);
    ...
  }, null);
}

再次,我對目標 Stream 調用 BeginWrite 而非 Write,以便為其傳遞與 Write 相同的參數 ,以及在寫請求完成時將要執行的委托(還有狀態參數,我在這裡省略了該參數)。到目前為止一切順利 。我已異步發出讀請求,並且發出請求的線程不必等待請求完成。當讀請求完成時,我異步發出寫請求, 讀請求的回調也不必等待寫請求完成。

因此現在,我陷入困境了。在 BeginWrite 回調中,我是 否應該編寫另一個對 BeginRead 的調用,以便從源流中讀取更多信息?假如是這樣,我應該為讀請求傳 遞什麼回調呢?我不能顯式編碼每個讀/寫對;因為可能會有無數個讀/寫對,那樣將會使代碼文件變得相 當長。

相反,一個不錯的方法是在可以通過 AsyncCallback 委托調用的方法中包含對 EndRead 的調用。此方法將作為 BeginRead 的回調。然後,在 BeginWrite 回調中,在結束前一個寫請求之後, 我可以調用該方法發出下一個 BeginRead 請求。首先,我發出對 BeginRead 的單個調用,傳遞我剛才介 紹的回調委托。實現如下所示:

AsyncCallback rc = readResult =>
{
  int read = source.EndRead(readResult);
  if (read > 0)
  {
    destination.BeginWrite(buffer, 0, read, writeResult =>
    {
      destination.EndWrite(writeResult);
      source.BeginRead(buffer, 0, buffer.Length, rc, null);
    }
  }
}
source.BeginRead(buffer, 0, buffer.Length, rc, null);

目前為止一切順利,但當您試圖編 譯這段代碼時,友好的 C# 編譯器將告訴您:

error CS0165: Use of unassigned local 

variable 'rc'

換句話說,我試圖在回調中使用 'rc' 變量,並且我正在為回調初始化 'rc' 變量 ,但 C# 編譯器關心的是此時 'rc' 尚未定義。(有關詳細信息,請參閱 Eric Lippert 博客上 的帖子,網址為 go.microsoft.com/fwlink/?LinkId=109263)。簡單的解決方法是首先聲明 'rc' 變量,並將其初始化為 null。然後將回調作為單獨的語句賦給 'rc':

AsyncCallback rc = null;
rc = readResult =>
{
  ...
};

完整實現如圖 2 所示。您將注意到,該實現中添加了一些附加功能,而不僅僅是 Read/Write 到 BeginRead/BeginWrite 的轉換。首先,我為該實現添加了異常處理程序。與同步實現一 樣,當捕獲到異常時,將會調用出現異常的那個完成的委托,以通知應用程序發生了錯誤並允許對異常進 行適當的處理。其次,我們將利用在 Microsoft® .NET Framework 2.0 中引入的 AsyncOperation 類。此類使用 AsyncOperationManager 來創建 AsyncOperation 的實例,並捕獲當前的 SynchronizationContext,從而允許我在 SynchronizationContext 的上下文中調用操作。(有關詳細信 息,請參閱 msdn.microsoft.com/msdnmag/issues/06/06/NETMatters/#qa7。)

Figure 2 Async CopyStreamToStream in C#

public static void CopyStreamToStream(
  Stream source, Stream destination,
  Action<Stream,Stream,Exception> completed) {
  byte[] buffer = new byte[0x1000];
  AsyncOperation asyncOp = AsyncOperationManager.CreateOperation(null);
  Action<Exception> done = e => {
    if (completed != null) asyncOp.Post(delegate {
      completed(source, destination, e); }, null);
  };
  AsyncCallback rc = null;
  rc = readResult => {
    try {
      int read = source.EndRead(readResult);
      if (read > 0) {
        destination.BeginWrite(buffer, 0, read, writeResult => {
          try {
            destination.EndWrite(writeResult);
            source.BeginRead(
              buffer, 0, buffer.Length, rc, null);
          }
          catch (Exception exc) { done(exc); }
        }, null);
      }
      else done(null);
    }
    catch (Exception exc) { done(exc); }
  };
  source.BeginRead(buffer, 0, buffer.Length, rc, null);
}

例如,如果從 Windows® Forms 應用程序的 GUI 線程中調用 CopyStreamToStream,則捕 獲的 SynchronizationContext 實際是 WindowsFormsSynchronizationContext,並且對產生的 AsyncOperation 調用 Post 將導致提供的委托在 GUI 線程上執行。這將使 CopyStreamToStream 在任何 環境下都能很好地運行。

現在,我將回應在本文開始處所做的承諾:以 C# 和 Visual Basic 完成異步實現。如您所見,雖然 我沒有利用 C# 中的迭代器,但卻利用了其他 C# 功能,如匿名方法。而 Visual Basic 尚不支持匿名方 法。(Visual Basic 2008 支持 lambda 表達式,但僅限於單個表達式。)在 Visual Basic 中完成相同 的核心任務的確可行,但它需要更多工作,如圖 3 所示。此處大部分額外代碼與我在使用 C# 時免費獲 得的手動創建和傳遞閉包的代碼相關聯。

Figure 3 Async CopyStreamToStream in Visual Basic

Public Shared Sub CopyStreamToStream( _
  ByVal source As Stream, ByVal destination As Stream, _
  ByVal completed As Action(Of Stream, Stream, Exception))
  
  Dim buffer(&H1000) As Byte
  Dim data = New CopyData With { _
    .Source = source, .Destination = destination, _
    .Buffer = buffer, .Completed = completed, _
    .AsyncOperation = AsyncOperationManager.CreateOperation(Nothing)}
  source.BeginRead(buffer, 0, buffer.Length, _
    AddressOf ReadCallbackSub, data)
End Sub
Private Shared Sub ReadCallbackSub(ByVal readResult As IAsyncResult)
  Dim copyData = CType(readResult.AsyncState, CopyData)
  Try
    Dim read = copyData.Source.EndRead(readResult)
    If read > 0 Then
      copyData.Destination.BeginWrite( _
        copyData.Buffer, 0, read, _
        AddressOf WriteCallbackSub, copyData)
    Else
      copyData.AsyncOperation.Post( _
        Function() Done(copyData, Nothing), Nothing)
    End If
  Catch ex As Exception
    copyData.AsyncOperation.Post( _
      Function() Done(copyData, ex), Nothing)
  End Try
End Sub
Private Shared Sub WriteCallbackSub(ByVal writeResult As IAsyncResult)
  Dim copyData = CType(writeResult.AsyncState, CopyData)
  Try
    copyData.Destination.EndWrite(writeResult)
    copyData.Source.BeginRead( _
      copyData.Buffer, 0, copyData.Buffer.Length, _
      AddressOf ReadCallbackSub, copyData)
  Catch ex As Exception
    copyData.AsyncOperation.Post( _
      Function() Done(copyData, ex), Nothing)
  End Try
End Sub
Private Shared Function Done( _
    ByVal copyData As CopyData, ByVal ex As Exception) As Boolean
  If copyData.Completed <> Nothing Then
    copyData.Completed(copyData.Source, copyData.Destination, ex)
  End If
  Return True
End Function
Private Class CopyData
  Public Source, Destination As Stream
  Public Buffer() As Byte
  Public AsyncOperation As AsyncOperation
  Public Completed As Action(Of Stream, Stream, Exception)
End Class

無論使用 C# 還是 Visual Basic 實現,總之現在我已經得到了一個能夠將所有數據 從一個流復制到另一個流的真正異步實現。不過,其優勢取決於流自身如何實現異步 BeginRead 和 BeginWrite 方法。基本 Stream 類中這些方法的實現僅使用異步委托調用執行來自 ThreadPool 的 Read 和 Write 請求。因此,盡管每次讀取和寫入都是異步的,但從性能角度而言,整體上這可能比執行同步 讀寫更慢。

派生流可以選擇提供它們自己的 BeginRead 和 BeginWrite 實現,當這樣做時,由於 預期域提供真正的異步讀寫支持,因此通常都可以實現。例如,BeginRead 和 BeginWrite 的 FileStream 實現提供了對 Windows 文件系統中可用異步 I/O 支持的訪問。如果通過向 useAsync 構造 函數參數傳遞 true 來構造 FileStream,並且當前運行的 Windows 版本支持異步 I/O,則 FileStream 的 BeginRead 和 BeginWrite 方法將真正實現異步。盡管數據正在從磁盤讀取或正寫入磁盤,但它不會 阻塞您應用程序中的任何線程,也不會利用您應用程序中的線程來完成這些操作;僅當讀數據可用或寫入 完成時,才會發出相關的回調並使用來自 ThreadPool 的線程。這使得編寫將資源使用降至最低的可擴展 系統成為可能。

如您所見,雖然手動編寫此類代碼能夠創建高性能的應用程序,但它非常單調, 而且容易出錯。希望今後在 C# 和 Visual Basic 這類的主流語言中,能夠更加容易地創建此類構造。

F# 在異步工作流方面已經前進了一大步(請參閱 blogs.msdn.com/dsyme/archive/2007/10/11/introducing-f-asynchronous-workflows.aspx)。Web 上 還流傳著幾個利用 C# 迭代器來減輕編寫異步代碼困難的庫。其中一個示例是“並發和協調運行庫 ”(CCR),您可在 msdn.microsoft.com/msdnmag/issues/06/09/ConcurrentAffairs 中閱讀到相關 信息。

正如您在問題中提到的一樣,Jeffrey Richter 已經創建了更通用的 AsyncEnumerator 實現,他已開 始在他的專欄 msdn.microsoft.com/msdnmag/issues/07/11/ConcurrentAffairs。

請將您想向 Stephen 詢問的問題和提出的意見發送至 [email protected].

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved