程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> Parallel Extensions:使用.NET構建多核應用程序

Parallel Extensions:使用.NET構建多核應用程序

編輯:關於.NET

現代計算機在處理器和可供系統使用的內核數量方面取得了舉世矚目的突破。系統開發人員可以在他們的軟件中以各種形式利用這些強大特性,特別是在處理復雜算法或較大的數據集時。

微軟的並行計算平台 (Parallel Computing Platform, PCP) 所提供的工具支持開發人員以有效、可維護和可伸縮的方式利用這種強大特性。並行擴展在 .NET Framework 工具集中引入了一些重要的概念:任務並行庫 (Task Parallel Library, TPL) 和並行 LINQ (Parallel LINQ, PLINQ) 提供了命令和任務並行機制,允許開發人員采用聲明的方式處理數據並行機制。

目標

在本次動手實驗中,您將學習如何:

通過使用 Parallel 幫助程序類以及自動處理並發表達式,並行化已有算法。

創建並運行能夠在運行過程中取消運行的任務。

使用 Task<T> 類創建和運行可返回值的任務。

使用並行 LINQ (PLINQ) 優化 LINQ 查詢,以便在並行環境中執行。

系統要求

您必須擁有以下內容才能完成本實驗:

Microsoft Visual Studio 2010 Beta 2

.Net Framework 4

安裝

使用 Configuration Wizard 驗證本實驗的所有先決條件。要確保正確配置所有內容,請按照以下步驟進行。

注意:要執行安裝步驟,您需要使用管理員權限在命令行窗口中運行腳本。

1.如果之前沒有執行,運行 Training Kit 的 Configuration Wizard。為此,運行位於 %TrainingKitInstallationFolder%\Labs\IntroToWF\Setup 文件夾下的 CheckDependencies.cmd 腳本。安裝先決條件中沒有安裝的軟件(如有必要請重新掃描),並完成向導。

注意:為了方便,本實驗中管理的許多代碼都可用於 Visual Studio 代碼片段。CheckDependencies.cmd 文件啟動 Visual Studio 安裝程序文件安裝該代碼片段。

練習

本次動手實驗由以下練習組成:

使用靜態 Parallel 類來並行化已有算法。

創建和運行並行化任務。

使用 Task<T> 類創建和運行可返回值的任務。

使用 PLINQ 並行化 LINQ 查詢。

完成本實驗的估計時間:60 分鐘。

下一步:

練習 1:使用靜態 Parallel 幫助程序類並行化已有算法

練習 1:使用靜態 Parallel 幫助程序類並行化已有算法

在本練習中,您將學習如何使用靜態 Parallel 幫助程序類並行化已有算法。這允許我們將 for() 替換為 Parallel.For()。

注意:要驗證每個步驟是否正確執行,建議在每次任務結束時構建解決方案。

任務 1 –並行化持續運行的服務

在此任務中,您將編寫一些簡單的示例代碼,模擬持續運行的服務調用。

您將使用本練習的初始解決方案提供的 PayrollServices.GetPayrollDeduction() 方法。這種持續運行的代碼非常符合您對並行的實際需求。

1.從 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打開 Microsoft Visual Studio 2010。

2.打開 %TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex01-UsingStaticParallelHelper\begin 下的解決方案文件 ParallelExtLab.sln。

注意:此解決方案為您的工作提供了一個起始點,其中的幫助程序類 EmployeeList 包含您需要處理的數據。

3.在 Visual Studio 中,打開 Program 類並導航到它的 Main() 方法。首先,我們需要創建一個員工列表,因此添加一個類變量並在 Main() 中對它執行初始化:

(代碼片段 – 並行擴展庫簡介實驗 - 練習 1 創建員工列表)

C#

class Program
{
private static EmployeeList employeeData;

static void Main(string[] args)
    {
employeeData = new EmployeeList();

Console.WriteLine("Payroll process started at {0}", DateTime.Now);
var sw = Stopwatch.StartNew();

// Methods to call

Console.WriteLine("Payroll finished at {0} and took {1}",
DateTime.Now, sw.Elapsed.TotalSeconds);
Console.WriteLine();
Console.ReadLine();
    }
}

4.現在,將以下方法添加到 Program 類中。該方法將使用一個標准的 for 循環來迭代 Employees 列表(由預建代碼提供)並調用持續運行的 PayrollServices.GetPayrollDeduction() 方法。代碼應如下所示:

(代碼片段 – 並行擴展庫簡介實驗 - 練習 1 Ex1Task1_ParallelizeLongRunningService)

C#

private static void Ex1Task1_ParallelizeLongRunningService()
{
Console.WriteLine("Non-parallelized for loop");

for (int i = 0; i < employeeData.Count; i++)
    {
Console.WriteLine("Starting process for employee id {0}",
employeeData[i].EmployeeID);
decimal span =
PayrollServices.GetPayrollDeduction(employeeData[i]);
Console.WriteLine("Completed process for employee id {0}" +         
"process took {1} seconds",
employeeData[i].EmployeeID, span);
Console.WriteLine();
    }
}

5.從 Main() 中調用 Ex1Task1_ParallelizeLongRunningService 方法。

C#

static void Main(string[] args)
{
    ...
// Methods to call 
Ex1Task1_ParallelizeLongRunningService();
    ...
}

6.編譯並運行應用程序。

7.您應該可以看到全部員工按 ID 順序進行處理,類似於下圖(完成處理的准確時間可能不同):

圖 1

非並行調用持續運行服務的輸出

8.要使用並行特性,將以下方法添加到 Program 類中。該代碼使用靜態 Parallel 對象中的 For() 方法:

(代碼片段 – 並行擴展庫簡介實驗 - 練習 1 Ex1Task1_UseParallelForMethod)

C#

private static void Ex1Task1_UseParallelForMethod()
{
Parallel.For(0, employeeData.Count, i =>
    {
Console.WriteLine("Starting process for employee id {0}",
employeeData[i].EmployeeID);
decimal span =
PayrollServices.GetPayrollDeduction(employeeData[i]);
Console.WriteLine("Completed process for employee id {0}",
employeeData[i].EmployeeID);
Console.WriteLine();
    });
}

9.將 Main() 中的當前方法調用替換為對 Ex1Task1_UseParallelForMethod() 方法的調

用。

C#

static void Main(string[] args)
{
    ...
// Methods to call 
Ex1Task1_UseParallelForMethod();
    ...
}

注意:在以上代碼片段中,您可能看到了一些不熟悉的內容:

Parallel.ForEach(employeeData, ed => { /* …interesting code here… */ });

這些代碼是一種 C# 3.0 語言特性,即 lambda 語句,它起源於 lambda 運算。簡單來講,lambdas 是匿名代理和/或閉包的簡潔表示。

10.編譯並運行應用程序。

11.您應該可以看到不一定要按員工的 ID 順序來進行處理。您還可以注意到,在第一個調用返回之前調用了多次 GetPayrollDeduction() 方法。最後,您應該可以看到,與串行方式相比,並行運行調用將整個任務的完成速度提高了很多。

圖 2

並行調用持續運行服務的輸出

注意:由於循環采用並行方式執行,因此每次迭代都將按計劃在可用的內核上運行。這意味著並不一定要按順序來處理列表,從而避免對代碼造成顯著影響。您在設計代碼的時候應該確保循環中的各個迭代在彼此之間都是完全獨立的。任何迭代的正確運行都不能依賴於其他迭代。

12.並行擴展庫還提供了並行版本的 foreach 結構。以下代碼演示了實現此結構的並行方式。將以下代碼添加到 Program 類中。

(代碼片段 – 並行擴展庫簡介實驗 - 練習 1 Ex1Task1_StandardForEach)

C#

private static void Ex1Task1_StandardForEach()
{
foreach (Employee employee in employeeData)
    {
Console.WriteLine("Starting process for employee id {0}",
employee.EmployeeID);
decimal span =
PayrollServices.GetPayrollDeduction(employee);
Console.WriteLine("Completed process for employee id {0}",
employee.EmployeeID);
Console.WriteLine();
    }
}

13.在 Main() 方法中,將 Parallel.For(…) 循環替換為以下代碼:

C#

static void Main(string[] args)
{
    ...
// Methods to call 
Ex1Task1_StandardForEach();
    ...
}

14.編譯並運行應用程序。

注意:您應該可以注意到,程序還是按 ID 順序處理員工。還需注意完成任務所花費的總時間(准確時間會有所不同)

 

圖 3

非並行 for…each 實現的輸出

15.要利用 for…each 結構的並行擴展實現,您需要讓代碼使用靜態 Parallel 類中的 ForEach() 方法:

(代碼片段 – 並行擴展庫簡介實驗 - 練習 1 Ex1Task1_ParallelForEach)

C#

private static void Ex1Task1_ParallelForEach()
{
Parallel.ForEach(employeeData, ed =>
    {
Console.WriteLine("Starting process for employee id {0}",
ed.EmployeeID);
decimal span = PayrollServices.GetPayrollDeduction(ed);
Console.WriteLine("Completed process for employee id {0}",
ed.EmployeeID);
Console.WriteLine();
    });
}

16.將 Main() 中的當前代碼替換為一個 Ex1Task1_ParallelForEach 方法調用。

C#

static void Main(string[] args)
{
    ...
// Methods to call 
Ex1Task1_ParallelForEach();
    ...
}

17.編譯並運行應用程序。

18.您會再次發現,並不一定要按 ID 順序來處理員工,因為各循環是並行運行的,循環中的每次迭代都是單獨在可用的內核上運行的。另外,由於應用程序利用了所有可用的內核,因此完成任務的速度要比串行方式更快。

圖 4

並行 for…each 實現的輸出

注意:並行擴展庫還提供了一個實用的 Invoke() 方法,開發人員可以通過靜態 Parallel 類訪問它,從而能夠並行執行匿名方法或 lambda 表達式。為了演示如何使用 Invoke 方法,我們將分析一個常用的樹遍歷算法,然後展示如何通過並行方式實現它,從而縮短遍歷整個樹所需的總時間。

在本例中,我們將遍歷一個員工層次結構,並對我們遇到的各個員工調用 GetPayrollDeduction() 方法。

19.將 Main() 中的當前方法調用替換為一個 Ex1Task1_WalkTree() 方法調用。此代碼將實例化員工層次結構並調用樹遍歷方法。

C#

static void Main(string[] args)
{
    ...
// Methods to call 
Ex1Task1_WalkTree();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 1 Ex1Task1_WalkTree)

C#

private static void Ex1Task1_WalkTree()
{
EmployeeHierarchy employeeHierarchy = new EmployeeHierarchy();
WalkTree(employeeHierarchy);
}

20.將以下方法添加到 Program 類中:

(代碼片段 – 並行擴展庫簡介實驗 - 練習 1 WalkTree)

C#

private static void WalkTree(Tree<Employee> node)
{
if (node == null)
return;

if (node.Data != null)
    {
Employee emp = node.Data;
Console.WriteLine("Starting process for employee id {0}",
emp.EmployeeID);
decimal span = PayrollServices.GetPayrollDeduction(emp);
Console.WriteLine("Completed process for employee id {0}",
emp.EmployeeID);
Console.WriteLine();
    }

WalkTree(node.Left);
WalkTree(node.Right);
}

21.編譯並運行應用程序。

22.您會注意到員工按 ID 順序進行處理。還需注意完成遍歷樹花費的總時間(准確時間會有所不同)

圖 5

非並行樹遍歷的輸出

注意:使用上述非並行算法遍歷樹時,樹的結構必須能讓數據按 ID 順序寫出。

23.要采用並行方式遍歷樹,需要將 WalkTree() 方法結束部分的 WalkTree() 調用替換為靜態 Parallel 類的 Invoke() 方法調用。

C#

private static void WalkTree(Tree<Employee> node)
{
if (node == null)
return;

if (node.Data != null)
    {
Employee emp = node.Data;
Console.WriteLine("Starting process for employee id {0}",
emp.EmployeeID);
decimal span = PayrollServices.GetPayrollDeduction(emp);
Console.WriteLine("Completed process for employee id {0}",
emp.EmployeeID);
Console.WriteLine();
    }

Parallel.Invoke(delegate { WalkTree(node.Left); }, delegate { WalkTree(node.Right); });
}

24.編譯並運行應用程序。

25.您會注意到,樹中的員工不再采用相同處理方式,一些節點在其他節點還沒有完成時就開始處理了。還需要注意它遍歷整個樹所花的時間更少。

圖 6

並行樹遍歷的輸出

注意:Invoke() 方法根據內核可用性單獨計劃各個 WalkTree() 調用。這意味著不一定要以可預測的方式來遍歷樹。在設計代碼時需要記住這一點。

下一步:

練習 2:創建和運行並行化任務

練習 2:創建和運行並行化任務

並行擴展庫提供了一個 Task 類,可用於跨多個內核並行執行工作項。基本上,您可以將 Task 對象看作一個輕量級的工作單元,TaskManager 可以決定是否並行計劃它的運行。

當 Task 對象創建之後,您需要為它們提供一個包含要執行的邏輯的代理或 lambda 語句。然後,並行擴展庫的實際核心 TaskManager 將調度 Task,在不同內核的不同線程上運行。

注意:要驗證每個步驟是否正確執行,建議在每次任務結束時構建解決方案。

任務 1 –本機運行並行化任務

1.從 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打開 Microsoft Visual Studio 2010。

2.打開 %TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex02-CreateAndRunParallelizedTasks\begin 下的解決方案文件 ParallelExtLab.sln。您也可以繼續使用上一個練習完成時獲得的解決方案。

3.將 Main() 中的當前方法調用替換為 Ex2Task1_NativeParallelTasks() 方法調用。

C#

static void Main(string[] args)
{
    ...
// Methods to call 
Ex2Task1_NativeParallelTasks();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 2 Ex2Task1_NativeParallelTasks)

C#

private static void Ex2Task1_NativeParallelTasks()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });
Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });
}

4.編譯並運行應用程序。

5.您應該會注意到,在並行運行時,一些任務可能必須在 Ex2Task1_NativeParallelTasks 方法退出並且控制權返回到 Main 之後才會完成。鑒於此,輸出時間也無法反映總處理時間,因為任務在控制權返回到 Main 之前並未完成。

圖 7

並行運行多個任務的輸出

任務 2 –使用 Wait() 和 WaitAll() 方法

並行執行任務的優勢是速度更快,並且可以利用多核處理器。但是,您還應該注意到在當前實現中,主應用程序可能在處理任務的線程結束之前退出。

我們可以通過對各個   Task  對象調用 Wait() 方法來解決此可能情形。這會造成主線程等待指定任務完成之後才繼續下一條指令。

1.將 Main() 中的當前方法替換為 Ex2Task2_WaitHandling() 調用。此代碼將為我們示例添加等待處理。

C#

static void Main(string[] args)
{
    ...
// Methods to call 
Ex2Task2_WaitHandling();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 2 Ex2Task2_WaitHandling)

C#

private static void Ex2Task2_WaitHandling()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });
Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });

task1.Wait();
task2.Wait();
task3.Wait();
}

2.編譯並運行應用程序。

3.您應該注意到,這次所有三個任務都在報告最終時間之前完成了,這意味著主線程已結束。

圖 8

使用單獨的 Wait() 條件並行運行任務的輸出

注意:主線程等待 Task 對象完成之後才會繼續操作。這種方法比使用 ThreadPool.QueueUserWorkItem 更加簡單直觀,後者涉及創建和管理手動重置事件,並且可能還需要添加互鎖操作。

4.除了各個Task對象上的 Wait() 方法之外,靜態 Task 類還提供了一個 WaitAll() 方法,可允許您通過一次調用來等待指定的任務列表。要實際運行此方法,將任務 1、任務 2 和任務 3 的 Wait() 調用替換為以下內容:

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex2Task2_WaitHandlingWaitAll();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 2 Ex2Task2_WaitHandlingWaitAll)

C#

private static void Ex2Task2_WaitHandlingWaitAll()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });
Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });

Task.WaitAll(task1, task2, task3);
}

5.編譯並運行應用程序。

6.您應該注意到主應用程序等待所有單獨的任務完成之後才會繼續執行。

圖 9

使用 WaitAll() 方法並行運行任務的輸出

任務 3 –使用 IsCompleted 屬性

您有時可能需要檢查某個 Task 的完成狀態,然後再執行其他任務(比如說,您可能需要運行另一個獨立於第一個任務的任務),但是您並不希望使用 Wait() 方法,因為 Wait() 會阻塞 Task 所依托線程的執行。對於這些情形,Task 類公開了一個 IsCompleted 屬性。這使我們能夠在繼續其他處理之前檢查 Task 對象是否完成了自己的任務。

1.將 Main() 中的當前方法替換為 Ex2Task3_TaskIsCompleted() 調用。

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex2Task3_TaskIsCompleted();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 2 Ex2Task3_TaskIsCompleted)

C#

private static void Ex2Task3_TaskIsCompleted()
{
Task task1 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); });

while (!task1.IsCompleted)
    {
Thread.Sleep(1000);
Console.WriteLine("Waiting on task 1");
    }

Task task2 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); });
while (!task2.IsCompleted)
    {
Thread.Sleep(1000);
Console.WriteLine("Waiting on task 2");
    }

Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });
while (!task3.IsCompleted)
    {
Thread.Sleep(1000);
Console.WriteLine("Waiting on task 3");
    }
}

2.編譯並運行應用程序。

3.您應該注意到,任務 2 和任務 3 直到之前任務的 IsCompleted 屬性變為 true 時才開始啟

動。

圖 10

並行運行任務並使用 IsCompleted 屬性的輸出

任務 4 –使用 ContinueWith() 方法

雖然 IsCompleted 屬性非常適用於輪詢 Task 以檢查它是否完成,以便能夠觸發更多工作,但 Task 類提供了一種更加便捷的方法。使用 ContinueWith() 方法可以更加輕松地將任務以特定的順序串接起來。

作為參數傳遞給 ContinueWith() 方法的功能將在 Task 對象的邏輯繼續執行時執行。

1.將 Main() 中的當前方法替換為 Ex2Task4_ContinueWith() 調用。

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex2Task4_ContinueWith();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 2 Ex2Task4_ContinueWith)

C#

private static void Ex2Task4_ContinueWith()
{
Task task3 = Task.Factory.StartNew(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[0]); })
.ContinueWith(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[1]); })
.ContinueWith(delegate
{ PayrollServices.GetPayrollDeduction(employeeData[2]); });

task3.Wait();
}

注意:此處,我們像往常一樣創建了第一個 Task,但我們使用 ContinueWith() 方法讓運行時按順序執行後續調用。

2.編譯並運行應用程序。

3.您應該注意到任務是按順序執行的——員工 1、員工 2、員工 3 依次執行。

圖 11

並行運行任務並使用 ContinueWith 確保其順序和等待條件時的輸出。

下一步:

練習 3:使用 Task<T> 類創建和運行可返回值的任務

練習 3:使用 Task<T> 類創建和運行可返回值的任務

如您所見,Tasks 可用於在並行環境中啟動功能單元;它們還提供了一種在執行功能單元後返回值的機制。

為了演示此機制,我們將創建一個新的 Task<decimal> 實例,然後使用靜態 Task.Factory.StartNew() 方法,以便以允許獲取返回值的方式來執行 GetPayrollDeduction() 方法。

任務 1 –捕獲任務的返回值

1.從 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打開 Microsoft Visual Studio 2010。

2.打開 %TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex03-UseTaskResult\begin 下的解決方案文件 ParallelExtLab.sln。您也可以繼續使用上一個練習完成時獲得的解決方案。

3.將 Main() 中的當前方法替換為 Ex3Task1_TaskReturnValue() 調用。

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex3Task1_TaskReturnValue();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習3 Ex3Task1_TaskReturnValue��

C#

private static void Ex3Task1_TaskReturnValue()
{
Console.WriteLine("Calling parallel task with return value");
var data = Task.Factory.StartNew(() =>
PayrollServices.GetPayrollDeduction(employeeData[0]));
Console.WriteLine("Parallel task returned with value of {0}",
data.Result);
}

注意:我們將通過檢查 data.Result 屬性來獲取值。如果任務在 Result 屬性被調用時已完成,那麼它會立即返回所捕獲的值,否則它會阻塞代碼執行,直到任務完成並且可以檢索值。在上面的示例中,我們剛才調用了 Result 屬性,但它並不是理想的情形。Task<T> 最適合的情形是:在觸發工作單元時,需要在稍後的某個時刻檢索返回值。

4.編譯並運行應用程序。

5.您會注意到任務已完成並且提供了一個返回值。

圖 12

運行任務以捕獲返回值的輸出

下一步:

練習 4:使用 PLINQ 並行化 LINQ 查詢

練習 4:使用 PLINQ 並行化 LINQ 查詢

開發人員可以使用並行 LINQ (PLINQ) 優化他們的 LINQ 查詢,使它們能夠並行執行。

並行擴展庫提供了許多不同的方式來為 LINQ 查詢實現並行機制。PLINQ 為我們提供了一個 System.Linq.ParallelEnumerable 類,它提供了與 System.Linq.Enumerable 類相似的功能。

任務 1 –使用 ParallelEnumerable 類的靜態方法並行化 LINQ

在此任務中,您將繼續使用之前練習中的解決方案。

1.從 Start | All Programs | Microsoft Visual Studio 2010 | Microsoft Visual Studio 2010 打開 Microsoft Visual Studio 2010。

2.打開%TrainingKitInstallationFolder%\Labs\IntroToParallelExtensions\Source\Ex04-PLINQ\begin 下的解決方案文件 ParallelExtLab.sln。您也可以繼續使用上一個練習完成時獲得的解決方案。

3.將 Main() 中的當前方法替換為 Ex4Task1_PLINQ() 調用。

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex4Task1_PLINQ();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 4 Ex4Task1_PLINQ)

C#

static void Ex4Task1_PLINQ()
{
var q = Enumerable.Select(
Enumerable.OrderBy(
Enumerable.Where(employeeData,
x => x.EmployeeID % 2 == 0),
x => x.EmployeeID),
x => PayrollServices.GetEmployeeInfo(x))
.ToList();

foreach (var e in q)
    {
Console.WriteLine(e);
    }
}

注意:Select()、OrderBy() 和 Where() 方法是對 IEnumerable<T> 類的擴展方法,但我們在此處將以靜態方式訪問它們。我們稍後將演示一種更加簡單的用法。

ToList() 調用供演示之用,而在生產代碼中通常是不需要的。此處使用它的原因是,我們希望立即觸發 LINQ 查詢來收集所有 Employee Info 字符串,並在稍後將它們輸出到屏幕。

如果將 ToList() 刪除,則查詢仍然可以按 Employee ID 的順序觸發,但各個 GetEmployeeInfo() 調用必須在 foreach 循環迭代了 IEnumerable<T> 之後才會觸發。這稱作延遲執行。

有關更多信息,請參閱 Scott Wisniewski 的文章:http://msdn.microsoft.com/en-us/magazine/cc163378.aspx。

4.編譯並運行應用程序。

5.您應該會注意到,LINQ 查詢是按員工 ID 的順序執行的。還需注意完成任務花費的總時間(准確時間會有所不同):

圖 13

非並行 LINQ 查詢的輸出

6.可以使用靜態 ParallelEnumerable 類中的相同 LINQ 方法來並行化此查詢。另外,您需要向查詢的數據源添加一個 AsParallel() 調用。修改 Main() 方法,讓它只調用 PLINQAsParallel 方法。

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex4Task1_PLINQAsParallel();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 4 Ex4Task1_PLINQAsParallel)

C#

static void Ex4Task1_PLINQAsParallel()
{
var q = ParallelEnumerable.Select(
ParallelEnumerable.OrderBy(
ParallelEnumerable.Where(employeeData.AsParallel(),
x => x.EmployeeID % 2 == 0),
x => x.EmployeeID),
x => PayrollServices.GetEmployeeInfo(x))
.ToList();

foreach (var e in q)
    {
Console.WriteLine(e);
    }
}

注意:Select()、OrderBy() 和 Where() 方法(之前由 Enumerable 靜態類調用)現在將由 ParallelEnumerable 類調用。還需注意,數據源中添加了一個 AsParallel() 調用。

7.編譯並運行應用程序。

8.您應該注意到 LINQ 查詢不再以特定順序執行操作。還需注意,在本例中,並行版本的程序要比非並行版本的程序運行更快(在雙核機器上,運行時間大概減少了一半;您的結果因所使用的硬件而異)。

圖 14

並行 LINQ 查詢的輸出

注意:操作將以物理內核允許的最大並行操作數量並行執行。

任務 2 –使用 ParallelEnumerable 類的擴展方法並行化 LINQ

如前所述,使用 Enumerable 和 ParallelEnumerable 類的靜態 LINQ 的一種更加簡潔的方法是將它們作為擴展方法使用。

1.將使用擴展方法實現的非並行 LINQ 查詢轉換成 PLINQ 查詢非常簡單。將 Main() 方法中的 PLINQ 查詢替換為以下 LINQ 查詢:

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex4Task2_Extensions();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 4 Ex4Task2_Extensions)

C#

private static void Ex4Task2_Extensions()
{
var q = employeeData.
Where(x => x.EmployeeID % 2 == 0).OrderBy(x => x.EmployeeID)
.Select(x => PayrollServices.GetEmployeeInfo(x))
.ToList();

foreach (var e in q)
    {
Console.WriteLine(e);
    }
}

注意: ToList() 仍然用於立即執行 LINQ 查詢,而不是等待 foreach 在稍後迭代執行 Select() 返回的 IEnumerable<T> 時才執行。我們將避免延遲執行。

2.編譯並運行應用程序。

3.您應該會注意到,LINQ 查詢是按員工 ID 的順序執行的。還需注意完成任務花費的總時間(准確時間會有所不同):

圖 15

4.要並行化此 LINQ 查詢,只需要將 Main() 中的當前方法替換為 AsParallel() 方法。

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex4Task2_ConvertToParallelExtensions();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 4 Ex4Task2_ConvertToParallelExtensions)

C#

private static void Ex4Task2_ConvertToParallelExtensions()
{
var q = employeeData.AsParallel()
.Where(x => x.EmployeeID % 2 == 0).OrderBy(x => x.EmployeeID)
.Select(x => PayrollServices.GetEmployeeInfo(x))
.ToList();

foreach (var e in q)
    {
Console.WriteLine(e);
    }
}

5.編譯並運行應用程序。

6.您應該會注意到,與基於 ParallelEnumerable 靜態類的 LINQ 查詢類似,作為擴展方法實現的 PLINQ 查詢不會再按 EmployeeID 的順序執行操作。操作將以物理內核允許的最大並行操作數量並行執行。還需注意,與之前的並行化 LINQ 示例相同,並行程序完成所需的時間大約只有非並行程序的一半。

圖 16

使用擴展方法的並行 LINQ 查詢的輸出

任務 3 –使用支持查詢理解語法 (Query Comprehension Syntax) 的 AsParallel()

在此任務中,您將使用並行擴展庫和 AsParallel() 方法創建使用查詢理解語法的並行 LINQ 查詢。

1.將 Main() 方法中的 PLINQ 查詢替換為以下查詢理解語法:

C#

static void Main(string[] args)
{
    ...
// Methods to call
Ex4Task3_PLINQComprehensionSyntax();
    ...
}

(代碼片段 – 並行擴展庫簡介實驗 - 練習 4 Ex4Task3_PLINQComprehensionSyntax)

C#

private static void Ex4Task3_PLINQComprehensionSyntax()
{
var q = from e in employeeData.AsParallel()
where e.EmployeeID % 2 == 0
orderby e.EmployeeID
select PayrollServices.GetEmployeeInfo(e);

foreach (var e in q)
    {
Console.WriteLine(e);
    }
}

2.編譯並運行應用程序。

3.您應該會注意到,雖然 LINQ 語法發生了變化,但數據仍然采用與 ParallelEnumerable 擴展方法中相同的並行方式執行。

圖 17

使用查詢理解語法的並行 LINQ 查詢的輸出

總結

在本實驗中,您通過操作並行擴展庫理解了它的各種特性,從而能以簡單、可控的方式執行並行任務。您已經了解了如何使用 Parallel 和 Task 等並行擴展類來管理工作單元。您使用了

Wait()、WaitAll()、IsComplete() 和 ContinueWith() 等並行擴展特性來控制執行流程。您還了解了如何使用 PLINQ 來處理並行化查詢。

本實驗幫助您很好地掌握了並行擴展庫以及它的優勢。有關更多信息,我們建議您訪問以下站點:

MSDN 上的並行擴展博客:http://blogs.msdn.com/pfxteam/

MSDN 上的並行計算論壇:http://forums.microsoft.com/MSDN/default.aspx?ForumGroupID=551&SiteID=1

並行計算開發人員中心:http://msdn.microsoft.com/en-us/library/ms792872.aspx

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved