程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> 實踐基於Task的異步模式,實踐Task異步模式

實踐基於Task的異步模式,實踐Task異步模式

編輯:C#入門知識

實踐基於Task的異步模式,實踐Task異步模式


Await

返回該系列目錄《基於Task的異步模式--全面介紹》


 

     在API級別,實現沒有阻塞的等待的方法是提供callback(回調函數)。對於Tasks來說,這是通過像ContinueWith的方法實現的。基於語言的異步支持通過允許在正常控制流內部等待異步操作隱藏callbacks,具有和編譯器生成的代碼相同的API級別的支持。

    在.Net 4.5,C#直接異步地支持等待的Task和Task<TResult>,在C#中使用"await"關鍵字。如果等待一個Task,那麼await表達式是void類型。如果等待一個Task<TResult>,那麼await表達式是TResult類型。await表達式必須出現在返回類型是void,Task,Task<TResult>的異步方法體內。

   在幕後,await功能通過在該Task上的一個continuation(後續操作)安裝了一個callback(回調函數)。此回調將恢復暫停的異步方法。當該異步方法恢復時,如果等待的異步操作成功地完成,並且它是一個Task<TResult>,那它就返回一個TResult。如果等待的Task或者Task<TResult>以Canceled狀態結束,那麼會引發OperationCanceledException。如果等待的Task或者Task<TResult>以Faulted狀態結束,那麼會引發造成它失敗的異常。對於一個Task來說,可能由於多個異常造成失敗,在這種情況下,這些異常只有一個會傳遞。然而,該Task的Exception屬性會返回一個包含所有錯誤的AggregateException。

如果一個同步上下文和執行在掛起狀態的異步方法相關聯(如SynchronizationContext.Current是非空的),那麼異步方法的恢復會通過使用該下上文的Post方法發生在相同的同步上下文上。否則,它會依賴當前在掛起點的System.Threading.Tasks.TaskScheduler是什麼(針對.Net線程池,一般這個默認是TaskScheduler.Default)。它取決於該TaskScheduler是否允許恢復在等待異步操作完成的地方或者強制恢復被調度的地方執行。默認的調度一般會允許後續操作在等待的操作完成的線程上執行。

當調用的時候,異步方法同步地執行方法體,直到遇到在還沒有完成的等待的實例上的第一個await表達式,此時控制權返回給調用者。如果該異步方法沒有返回void,那麼就會返回Task或Task<TResult>表示正在進行的計算。在一個非void的異步方法中,如果遇到了return語句或者到達了方法體的末尾,那麼該task會以RanToCompletion的最終狀態完成。如果一個未處理的異常造成控制權離開了異步方法體,該任務會以Faulted狀態結束(如果異常是OperationCanceledException,任務會以Canceled狀態結束)。結果或異常最終以這種方式發布。

Yield 和 ConfigureAwait

一些成員對異步方法的執行提供了更多的控制。Task類提供了一個Yield方法,可以使用它把一個屈服點(yield point)引入異步方法。

public class Task : …

{

public static YieldAwaitable Yield();

}

這個等價於異步地推送(post)或調度回到當前的上下文。

Task.Run(async delegate

{

for(int i=0; i<1000000; i++)

{

await Task.Yield(); // fork the continuation into a separate work item

...

}
});

Task類也提供了ConfigureAwait方法更多的控制在異步方法中如何發生掛起和恢復。正如之前提到的,默認異步方法掛起時,當前上下文被捕獲,用捕獲的上下文在恢復時調用異步方法的後續操作。在許多案例中,這是你想要的確切的行為。然而,在某些情況下你不關心你在哪裡結束,結果你可以通過避免這些返回到原始上下文的posts來實現更好的性能。為了開啟這個,可以使用ConfigureAwait通知await操作不捕獲和恢復上下文,而是更傾向於,無論異步操作在哪兒等待完成,都繼續執行:

await someTask.ConfigureAwait(continueOnCapturedContext:false);

Cancellation(撤銷)

 支持可撤銷的TAP方法都至少公開了接受一個CancellationToken的重載,該類型在是在.Net 4的System.Threading中引入的。

CancellationToken是通過CancellationTokenSource創建的。當CancellationTokenSource的Cancel方法調用時,它的Token屬性會返回接收到信號的CancellationToken。比如,思考一下,下載一個單獨的web頁面,然後想要取消該操作。我們創建一個CancellationTokenSource,再把它的token傳給TAP方法,以後會可能調用它的Cancel方法:

var cts = new CancellationTokenSource();

string result = await DownloadStringAsync(url, cts.Token);

… // at some point later, potentially on another thread

cts.Cancel();

為了取消多個異步的調用,可以將相同的token傳給多有的調用:

var cts = new CancellationTokenSource();

IList<string> results = await Task.WhenAll(

from url in urls select DownloadStringAsync(url, cts.Token));

cts.Cancel();

類似地,相同的token也可以有選擇性地傳給異步操作的子集:

var cts = new CancellationTokenSource();

byte [] data = await DownloadDataAsync(url, cts.Token);

await SaveToDiskAsync(outputPath, data, CancellationToken.None);

cts.Cancel();

來自任何線程的Cancellation請求都可以被初始化。

為了表明cancellation請求永遠不會發出,CancellationToken.None可以傳給任何接受一個CancellationToken的方法。被調用者會發現cancellationToken的CanBeCanceled屬性會返回false,因此它起到了優化。

相同的CancellationToken可以分發給任何數量的異步和同步操作。這是CancellationToken 方法強項之一:cancellation可能使同步方法的調用請求的,並且相同的cancellation請求可能激增到任何數量的監聽器。另一個好處是異步API的開發者可以完全地控制cancellation是否可能被請求以及cancellation何時生效,還有該API的消費者可以選擇性地決定多個異步調用的cancellation請求中哪一個會被傳播。

Progress

 一些異步方法通過把progress(進度)接口傳給該異步方法來公開進度。比如,思考一下異步下載一個文本字符串的函數,然後會引發包含至今已完成下載的百分比的進度的更新。下面的WPF應用的一個例子用到了這樣一個方法:

private async void btnDownload_Click(object sender, RoutedEventArgs e)

{

btnDownload.IsEnabled = false;

try

{

txtResult.Text = await DownloadStringAsync(txtUrl.Text,

new Progress<int>(p => pbDownloadProgress.Value = p));

}

finally { btnDownload.IsEnabled = true; }

}

 

使用內置的基於Task的連接器

 System.Threading.Tasks命名空間包含了幾種處理和組合tasks的主要方法。

Task.Run

Task類公開了幾種Run方法,它們可以輕易地作為線程池的Task或Task<TResult>進行卸載工作,如:

public async void button1_Click(object sender, EventArgs e)

{

textBox1.Text = await Task.Run(() =>

{

// … 這裡處理一些計算受限的任務

return answer;

});

}

其中一些Run方法是自從.Net 4 就存在的TaskFactory.StartNew方法的簡寫。然而,其他的重載(如Run<TResult>(Func<Task<TResult>>))可以在卸載工作中使用await關鍵字,如:

public async void button1_Click(object sender, EventArgs e)

{

pictureBox1.Image = await Task.Run(() =>

{

using(Bitmap bmp1 = await DownloadFirstImageAsync())

using(Bitmap bmp2 = await DownloadSecondImageAsync())

return Mashup(bmp1, bmp2);

});

}

Task.FromResult

對於已經是可使用的或只需要從返回task的方法返回的數據提升到Task<TResult>的場景,可以使用Task.FromResult方法:

public Task<int>GetValueAsync(string key)

{

int cachedValue;

return TryGetCachedValue(out cachedValue) ?

Task.FromResult(cachedValue) :

GetValueAsyncInternal();

}

private async Task<int> GetValueAsyncInternal(string key)

{

}

Task.WhenAll

WhenAll方法用於異步等待多個代表Tasks的異步操作。為了適應一系列的非泛型的tasks或者一系列不均勻的泛型tasks(例如,異步等待多個返回void的操作,或者異步等待多個返回值類型的方法,每個值可以是不同的類型)以及一系列均勻 的泛型tasks(如,異步等待多個返回TResult的方法)。

思考給多個顧客發送郵件的需求。我們可以重疊所有的郵件發送(在發送下一封郵件之前不需要等待前一封郵件已經完成發送),然後我們需要知道發送何時完成和是否有錯誤發生。

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);

await Task.WhenAll(asyncOps);

上面的代碼沒有顯示地處理可能發生的異常,反而選擇讓異常在WhenAll產生的task上的await傳播出來。為了處理這些異常,開發者可以使用像下面這樣的代碼:

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);

try

{

await Task.WhenAll(asyncOps);

}

catch(Exception exc)

{

...

}

思考另一個從web上異步地下載多個文件的例子。在這種情況下,所有的異步操作有同類的結果類型,並且這些結果的訪問很簡單:

string [] pages = await Task.WhenAll(

from url in urls select DownloadStringAsync(url));

作為之前返回void的案例,這裡可以使用相同的異步處理技巧:

Task [] asyncOps =

(from url in urls select DownloadStringAsync(url)).ToArray();

try

{

string [] pages = await Task.WhenAll(asyncOps);

...

}

catch(Exception exc)

{

foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))

{

… // work with faulted and faulted.Exception

}

}

Task.WhenAny

WhenAny API異步地等待表示多個異步的操作,並且只是異步地等待它們中的一個完成。WhenAny主要有四種用法:

冗余

思考一下,我們是否買一只股票的決定的情況。我們有多個我們信賴的股票推薦Web服務,但是基於每日負荷,每一種服務可能終將在不同的時間變得相當緩慢。我們可以發揮WhenAny的優勢來知曉任意一個操作何時完成:

var recommendations = new List<Task<bool>>()

{

GetBuyRecommendation1Async(symbol),

GetBuyRecommendation2Async(symbol),

GetBuyRecommendation3Async(symbol)

};

Task<bool> recommendation = await Task.WhenAny(recommendations);

if (await recommendation) BuyStock(symbol);

不像WhenAll在所有的tasks成功完成的情況下返回它們為包裝的結果集,WhenAny返回完成的Task:如果一個task失敗了,知道哪一個task失敗很重要;如果一個task成功了,知道返回的值和哪一個task相關很重要。因為這個原因,我們需要訪問返回的task的Result屬性,或者進一步等待它直到它完成。

自從有了WhenAll,我們需要能夠適應異常情況。由於已經接收到了返回的已完成的task,為了讓錯誤傳播,我們可以等待返回的task,然後適當地try/catch,例如:

Task<bool> [] recommendations = …;

while(recommendations.Count > 0)

{

Task<bool> recommendation = await Task.WhenAny(recommendations);

try

{

if (await recommendation) BuyStock(symbol);

break;

}

catch(WebException exc)

{

recommendations.Remove(recommendation);

}

}

除此之外,即使第一個task成功完成了,隨後 的task也可能失敗。此時,我們有多個處理這些異常的選擇。一種用例可能要求直到所有啟動的tasks已經完成才做進一步的向前進展,在這種情況我們可以使用WhenAll。另一種用例要求所有的異常必須要記錄。對於這個,當tasks已經異步完成時,我們可以利用後續操作直接接收一個通知:

foreach(Task recommendation in recommendations)

{

var ignored = recommendation.ContinueWith(

t => { if (t.IsFaulted) Log(t.Exception); });

}

或者

foreach(Task recommendation in recommendations)

{

var ignored = recommendation.ContinueWith(

t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);

}

或者

private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)

{

foreach(var task in tasks)

{

try { await task; }

catch(Exception exc) { Log(exc); }

}
}

LogCompletionIfFailed(recommendations);

最後,開發者可能實際想要取消所有的保留的操作。

var cts = new CancellationTokenSource();

var recommendations = new List<Task<bool>>()

{

GetBuyRecommendation1Async(symbol, cts.Token),

GetBuyRecommendation2Async(symbol, cts.Token),

GetBuyRecommendation3Async(symbol, cts.Token)

};

 

Task<bool> recommendation = await Task.WhenAny(recommendations);

cts.Cancel();

if (await recommendation) BuyStock(symbol);

交叉

思考這樣一個情況,從Web下載圖片,並對每張圖片做一些處理,例如把它加到UI控件上去。我們需要按順序處理(在UI控件的例子中,在UI線程上),但我們想要盡可能地並發下載。並且我們不想所有圖片都下載完畢後再加載到UI上,而是當它們完成時就加載它們:

List<Task<Bitmap>> imageTasks =

(from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();

while(imageTasks.Count > 0)

{

try

{

Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

imageTasks.Remove(imageTask);

 

Bitmap image = await imageTask;

panel.AddImage(image);

}

catch{}

}

那相同的交叉可以應用到涉及下載以及在下載的圖片上的線程池的進行計算密集的處理的場景上,例如:

List<Task<Bitmap>> imageTasks =

(from imageUrl in urls select GetBitmapAsync(imageUrl)

.ContinueWith(t => ConvertImage(t.Result)).ToList();

while(imageTasks.Count > 0)

{

try

{

Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

imageTasks.Remove(imageTask);

 

Bitmap image = await imageTask;

panel.AddImage(image);

}

catch{}

}

調節

思考和交叉例子相同的案例,除了用戶正在下載很多圖片,這些下載需要顯示調節,例如,只有15個下載可能同時發生。為實現這個,異步操作的子集可能會被調用。當操作完成的時候,其他的操作會取而代之被調用。

const int CONCURRENCY_LEVEL = 15;

Uri [] urls = …;

int nextIndex = 0;

var imageTasks = new List<Task<Bitmap>>();

while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)

{

imageTasks.Add(GetBitmapAsync(urls[nextIndex]));

nextIndex++;

}

 

while(imageTasks.Count > 0)

{

try

{

Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

imageTasks.Remove(imageTask);

 

Bitmap image = await imageTask;

panel.AddImage(image);

}

catch(Exception exc) { Log(exc); }

 

if (nextIndex < urls.Length)

{

imageTasks.Add(GetBitmapAsync(urls[nextIndex]));

nextIndex++;

}

}

提早應急

思考當異步等待一個操作完成的同時,又要響應一個用戶的撤銷請求(例如,點擊UI上的一個取消按鈕)。

private CancellationTokenSource m_cts;

 

public void btnCancel_Click(object sender, EventArgs e)

{

if (m_cts != null) m_cts.Cancel();

}

 

public async void btnRun_Click(object sender, EventArgs e)

{

m_cts = new CancellationTokenSource();

btnRun.Enabled = false;

try

{

Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);

await UntilCompletionOrCancellation(imageDownload, m_cts.Token);

if (imageDownload.IsCompleted)

{

Bitmap image = await imageDownload;

panel.AddImage(image);

}

else imageDownload.ContinueWith(t => Log(t));

}

finally { btnRun.Enabled = true; }

}

 

private static async Task UntilCompletionOrCancellation(

Task asyncOp, CancellationToken ct)

{

var tcs = new TaskCompletionSource<bool>();

using(ct.Register(() => tcs.TrySetResult(true)))

await Task.WhenAny(asyncOp, tcs.Task);

return asyncOp;

}

我們一決定拯救而不取消隱含的異步操作,這個實現就會再次開啟UI。當我們決定拯救的時候,另一個選擇會取消掛起的操作,然而直到操作最終完成才會重建UI,這可能是因為早期由於取消請求結束造成的:

private CancellationTokenSource m_cts;

 

public async void btnRun_Click(object sender, EventArgs e)

{

m_cts = new CancellationTokenSource();

 

btnRun.Enabled = false;

try

{

Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);

await UntilCompletionOrCancellation(imageDownload, m_cts.Token);

Bitmap image = await imageDownload;

panel.AddImage(image);

}

catch(OperationCanceledException) {}

finally { btnRun.Enabled = true; }

}

使用 WhenAny 為早期提供緊急援助的另一個例子涉及到Task.WhenAny 與 Task.Delay 的一起使用。

Task.Delay

之前演示過,可以通過調用Task.Delay把中斷引入一個異步方法的執行當中。這對於各種各樣的功能是有用的,包括構建輪詢,對於一個預定時間內用戶輸入處理的延遲等諸如此類。在聯合Task.WhenAny對await實現超時也是有用的。

如果一個task是一個更大的異步操作(如,一個ASP.Net Web服務)的一部分,花費太長時間完成了,那麼整體的操作就會變差,尤其是如果該操作曾經沒有完成的話。為此,能夠在一個異步操作上超時等待很重要。同步的Task.Wait, WaitAll, 和WaitAny方法接收超時值,但相應的 ContinueWhenAll/Any和上述的 WhenAll/WhenAny APIs不這樣做。相反,Task.Delay和Task.WhenAny可以聯合使用來實現超時。

思考一個UI應用,該應用想下載一張圖片,並且當圖片在下載的過程中時,UI不可用。然而,如果下載需要花很長時間,那麼該UI應該可用並且下載的操作應該放棄。

public async void btnDownload_Click(object sender, EventArgs e)

{

btnDownload.Enabled = false;

try

{

Task<Bitmap> download = GetBitmapAsync(url);

if (download == await Task.WhenAny(download, Task.Delay(3000)))

{

Bitmap bmp = await download;

pictureBox.Image = bmp;

status.Text = "Downloaded";

}

else

{

pictureBox.Image = null;

status.Text = "Timed out";

var ignored = download.ContinueWith(

t => Trace("Task finally completed"));

}

}

finally { btnDownload.Enabled = true; }

}

既然WhenAll返回一個task,那麼一樣可以用到多個下載上:

public async void btnDownload_Click(object sender, RoutedEventArgs e)

{

btnDownload.Enabled = false;

try

{

Task<Bitmap[]> downloads =

Task.WhenAll(from url in urls select GetBitmapAsync(url));

if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))

{

foreach(var bmp in downloads) panel.AddImage(bmp);

status.Text = "Downloaded";

}

else

{

status.Text = "Timed out";

downloads.ContinueWith(t => Log(t));

}

}

finally { btnDownload.Enabled = true; }

}

 

構建基於Task的連接器

由於一個task完全有能力表示一個異步操作,並提供同步和異步連接,檢索該操作等的能力,因而構建有用的組合創建更大模式的tasks的"連接器"庫成為可能。該文章先前提到過,.Net Framework包括了多個內置的連接器,然而,也可能和期待開發者創建他們自己的。這裡我們提供幾個可能會用到的連接器方法和類型的例子。

RetryOnFault(錯誤重試)

在很多場合,如果一個之前的嘗試操作失敗了,很渴望重試一下該操作。對於同步代碼來說,我們可以構建一個幫助方法來完成這個:

public static T RetryOnFault<T>(

Func<T> function, int maxTries)

{

for(int i=0; i<maxTries; i++)

{

try { return function(); }

catch { if (i == maxTries-1) throw; }

}

return default(T);

}

我們可以構建一個幾乎完全一樣的幫助方法,但是針對使用TAP實現的異步操作的,因而返回tasks:

public static async Task<T> RetryOnFault<T>(

Func<Task<T>> function, int maxTries)

{

for(int i=0; i<maxTries; i++)

{

try { return await function().ConfigureAwait(false); }

catch { if (i == maxTries-1) throw; }

}

return default(T);

}

有了自己的函數,我們現在可以利用此連接器將重試編碼到應用邏輯中,如:

// Download the URL, trying up to three times in case of failure

string pageContents = await RetryOnFault(

() => DownloadStringAsync(url), 3);

RetryOnFault函數可以進一步擴展,例如,為了決定何時重試更好,可以接受在重試操作之間的另一個Func<Task>:

public static async Task<T> RetryOnFault<T>(

Func<Task<T>> function, int maxTries, Func<Task> retryWhen)

{

for(int i=0; i<maxTries; i++)

{

try { return await function(); }

catch { if (i == maxTries-1) throw; }

await retryWhen().ConfigureAwait(false);

}

return default(T);

}

此後可以使用像下面的代碼在重試之前再等待一秒:

// Download the URL, trying up to three times in case of failure,

// and delaying for a second between retries

string pageContents = await RetryOnFault(

() => DownloadStringAsync(url), 3, () => Task.Delay(1000));

 

NeedOnlyOne(只需要一個)

有時發揮冗余的優勢可以提高操作的延遲和成功的機會。思考一下,有多個提供股票報價的Web服務,但在當天的不同時間,每一個服務可能提供不同級別的數量和響應時間。為處理這些情況,我們可以向所有Web服務發送請求,只要獲得了任何響應就取消其他請求。我們可以實現一個函數來簡化這個啟動多個操作,等待任意一個,然後取消其余請求的通用模式:

public static async Task<T> NeedOnlyOne(

params Func<CancellationToken,Task<T>> [] functions)

{

var cts = new CancellationTokenSource();

var tasks = (from function in functions

select function(cts.Token)).ToArray();

var completed = await Task.WhenAny(tasks).ConfigureAwait(false);

cts.Cancel();

foreach(var task in tasks)

{

var ignored = task.ContinueWith(

t => Log(t), TaskContinuationOptions.OnlyOnFaulted);

}

return completed;

}

然後可以使用該函數來實現我們的例子:

double currentPrice = await NeedOnlyOne(

ct => GetCurrentPriceFromServer1Async("msft", ct),

ct => GetCurrentPriceFromServer2Async("msft", ct),

ct => GetCurrentPriceFromServer3Async("msft", ct));

Interleaved(交錯)

當使用非常大的tasks集合時,使用Task.WhenAnyl來支持一個該交錯的場景會有一個潛在的性能問題。WhenAny的每次調用會導致每一個task注冊一個後續操作,對於N個tasks的調用會在交錯的操作的生命周期內產生O(N2)數量級的後續操作。為了解決這個問題,一種方法是使用專注於目標的組合:

static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)

{

var inputTasks = tasks.ToList();

var sources = (from _ in Enumerable.Range(0, inputTasks.Count)

select new TaskCompletionSource<T>()).ToList();

int nextTaskIndex = -1;

foreach (var inputTask in inputTasks)

{

inputTask.ContinueWith(completed =>

{

var source = sources[Interlocked.Increment(ref nextTaskIndex)];

if (completed.IsFaulted)

source.TrySetException(completed.Exception.InnerExceptions);

else if (completed.IsCanceled)

source.TrySetCanceled();

else

source.TrySetResult(completed.Result);

}, CancellationToken.None,

TaskContinuationOptions.ExecuteSynchronously,

TaskScheduler.Default);

}

return from source in sources

select source.Task;

}

當tasks完成之後,可以使用這個可以處理tasks的結果,如:

IEnumerable<Task<int>> tasks = ...;

foreach(var task in tasks)

{

int result = await task;

}

WhenAllOrFirstException

在確定的分散/集中場合,可能會想要等待所有的tasks,除非它們中有錯誤,在這種情況下,只要一出現異常,你就想要停止等待。我們也可以使用連接器方法來完成,例如:

public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{

    var inputs = tasks.ToList();

    var ce = new CountdownEvent(inputs.Count);

    var tcs = new TaskCompletionSource<T[]>();


    Action<Task> onCompleted = (Task completed) =>
    {
        if (completed.IsFaulted) 
            tcs.TrySetException(completed.Exception.InnerExceptions);
        if (ce.Signal() && !tcs.Task.IsCompleted)
            tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
    };

    foreach (var t in inputs) t.ContinueWith(onCompleted);

    return tcs.Task;
}

構建基於Task的數據結構

除了構建自定義的基於task的連接器的能力之外,在Task和代表異步操作結果以及連接必要的同步的Task<TResult>中使用數據結構可以使它成為非常強大的類型,在此類型上構建的自定義的數據結構可以用在異步情景中。

AsyncCache(異步緩存)

Task一個重要的方面是它可以提供給多個消費者,所有的消費者可以等待它,用它注冊後續操作,獲得結果(Task<TResult>的場合)或異常等等。這使得Task和Task<TResult>完美地集成到了異步緩存基礎設施中。這兒是一個小而有力的構建在Task<TResult>之上的異步緩存:

public class AsyncCache<TKey, TValue>

{

private readonly Func<TKey, Task<TValue>> _valueFactory;

private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

 

public AsyncCache(Func<TKey, Task<TValue>> valueFactory)

{

if (valueFactory == null) throw new ArgumentNullException("loader");

_valueFactory = valueFactory;

_map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();

}

 

public Task<TValue> this[TKey key]

{

get

{

if (key == null) throw new ArgumentNullException("key");

return _map.GetOrAdd(key, toAdd =>

new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;

}

}

}

 

AsyncCache<TKey,TValue>類的構造函數接受一個TKey作為參數,返回Task<TValue>的方法委托。之前從這個cache訪問到的任何值都存儲在內部的字典中,同時AsyncCache確保每個key只生成一個task,即使並發訪問cache。

使用這個,我們可以構建一個下載web頁面的cache,如:

private AsyncCache<string,string> m_webPages =

new AsyncCache<string,string>(DownloadStringAsync);

現在,無論何時我們需要web頁面的內容,都可以在異步方法中使用這個,並且AsyncCache會確保我們盡可能同時下載多個頁面,緩存該結果。

private async void btnDownload_Click(object sender, RoutedEventArgs e)

{

btnDownload.IsEnabled = false;

try

{

txtContents.Text = await m_webPages["http://www.microsoft.com"];

}

finally { btnDownload.IsEnabled = true; }

}

AsyncProducerConsumerCollection

Tasks也用來構建數據結構來協調多個異步活動。思考一個典型的平行設計模式:生產者/消費者,生產者生成消費者消費的數據,並且生產者和消費者可能並發運行(例如,消費者處理生產者之前生產的item1時,生產者在生產item2).對於生產者/消費者,我們始終需要一些數據結構存儲生產者創建的任務,為的是可以告知消費者新數據且當它可用時發現它。

這裡有一個簡單的構建於tasks之上的數據結構的例子,它使異步方法用作生產者和消費者:

public class AsyncProducerConsumerCollection<T>

{

private readonly Queue<T> m_collection = new Queue<T>();

private readonly Queue<TaskCompletionSource<T>> m_waiting =

new Queue<TaskCompletionSource<T>>();

 

public void Add(T item)

{

TaskCompletionSource<T> tcs = null;

lock (m_collection)

{

if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();

else m_collection.Enqueue(item);

}

if (tcs != null) tcs.TrySetResult(item);

}

 

public Task<T> Take()

{

lock (m_collection)

{

if (m_collection.Count > 0)

{

return Task.FromResult(m_collection.Dequeue());

}

else

{

var tcs = new TaskCompletionSource<T>();

m_waiting.Enqueue(tcs);

return tcs.Task;

}

}

}

}

現在我們可以像下面一樣寫代碼了:

private static AsyncProducerConsumerCollection<int> m_data = …;

private static async Task ConsumerAsync()

{

while(true)

{

int nextItem = await m_data.Take();

ProcessNextItem(nextItem);

}

}

private static void Produce(int data)

{

m_data.Add(data);
}

 

返回該系列目錄《基於Task的異步模式--全面介紹》


 

這篇隨筆就先告一段落,期待我的下一篇系列隨筆。

 

作者:tkb至簡 出處:http://www.cnblogs.com/farb/

QQ:782762625

歡迎各位多多交流!

本文版權歸作者和博客園共有,歡迎轉載。未經作者同意下,必須在文章頁面明顯標出原文鏈接及作者,否則保留追究法律責任的權利。
如果您認為這篇文章還不錯或者有所收獲,可以點擊右下角的【推薦】按鈕,因為你的支持是我繼續寫作,分享的最大動力!

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved