程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> C# 並行編程 之 PLINQ 規約操作和聚合函數

C# 並行編程 之 PLINQ 規約操作和聚合函數

編輯:C#入門知識

C# 並行編程 之 PLINQ 規約操作和聚合函數


概要

PLINQ可以簡化對一個序列或一個組中所有成員應用同一個函數的過程,這個過程稱之為規約操作。類似Sum()函數就是一個規約操作。PLINQ提供一個可重載Aggregate的接口,這裡用戶可以定義自己的規約函數。

規約操作是對每一個成員進行的操作,當操作完成後有可能需要將操作結果進行匯總得到一個最終的結果,這個就是聚合的概念。

規約操作

示例中要求計算 1 到 50000000中能被5整除的數除以PI以後得到的平均數。它可以用LINQ完成,也可以用PLINQ完成。

代碼示例:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;

namespace Sample6_2_plinq_calculate
{
    class Program
    {
        static int NUM_INTS = 50000000;

        static IEnumerable GenerateInputeData()
        {
            return Enumerable.Range(1, NUM_INTS);
        }

        static ParallelQuery GenerateInputeData4Parallel()
        {
            return ParallelEnumerable.Range(1, NUM_INTS);
        }

        static void Main(string[] args)
        {
            var seqTarget = GenerateInputeData();

            Console.WriteLine("============================================================");
            Console.WriteLine("TEST NORMAL LINQ");
            Console.WriteLine("============================================================");
            var swatchpn = Stopwatch.StartNew();

            var seqQuery = (from intNum in seqTarget
                            where ((intNum % 5) == 0)
                            select (intNum / Math.PI)).Average();
            swatchpn.Stop();

            Console.WriteLine("LINQ Result: " + seqQuery + "    LINQ Use Time: {0}", swatchpn.Elapsed);



            var palTarget = GenerateInputeData4Parallel();
            Console.WriteLine("\n\n");
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST PARALLEL LINQ");
            Console.WriteLine("============================================================");
            var swatchp = Stopwatch.StartNew();

            var palQuery = (from intNum in palTarget.AsParallel()
                            where ((intNum % 5) == 0)
                            select (intNum / Math.PI)).Average();
            swatchp.Stop();

            Console.WriteLine("PLINQ Result: " + palQuery + "    LINQ Use Time: {0}", swatchp.Elapsed);

            Console.ReadLine();

        }
    }
}

測試結果:

這裡寫圖片描述

聚合操作

代碼示例會計算一個數組的標准偏差,偏度,和峰度來說明聚合的使用。
順便補補數學吧:

標准偏差:一種量度數據分布的分散程度之標准,用以衡量數據值偏離算術平均值的程度。標准偏差越小,這些值偏離平均值就越少,反之亦然。標准偏差的大小可通過標准偏差與平均值的倍率關系來衡量。
圖片公式來自百度百科。
這裡寫圖片描述
偏度:偏度系數是描述分布偏離對稱性程度的一個特征數。當分布左右對稱時,偏度系數為0。當偏度系數大於0時,即重尾在右側時,該分布為右偏。當偏度系數小於0時,即重尾在左側時,該分布左偏。

這裡寫圖片描述

這裡寫圖片描述


峰度:表示分布相對於正太分布而言是更加高聳還是更加平坦。正值表示相對高聳的分布,負值表示相對平坦的峰度。簡單的說,峰度是描述分布形態的陡緩程度。也可以這樣理解,在相同的標准差下,峰度系數越大,分布就有更多的極端值,那麼其余值必然要更加集中在眾數周圍,其分布必然就更加陡峭。

這裡寫圖片描述


關於Aggregate 函數的參數說明參考
https://msdn.microsoft.com/en-us/zh-en/library/dd383667(v=vs.110).aspx

關於參數的簡單說明:

seed:是累加器初始化的值。 update accumulator function:對數組中每一個值進行運算,PLINQ中由於它是對數據源進行了分區然後並行運算的,這一步產生的結果其實是保存的每一個分區的計算結果。 combine accumulator function:將每一分區的計算結果進行累加,得到一個總的數組的累加結果。 result selector:對累加結果進行運算,得到最終的結果,也就是返回值。

示例的重點並不是各種數字運算,而是說明Aggregate() 可以對數據源每一個元素運算後將結果進行匯總再次運算,它可以在一個步驟中完成,省去了分別編寫的麻煩。而且它對數據運算時是數據分區,任務並行的。<喎?http://www.Bkjia.com/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxwPs/Cw+bKx7zGy+O1xLT6wuvKvsD9o7o8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;"> using System; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Diagnostics; using System.Linq; using System.IO; using System.Collections.Generic; namespace Sample6_2_plink_aggregate { class Program { static void Main(string[] args) { int[] inputInts = {0,3,4,8,15,22,34,57,68,32,30}; var mean = inputInts.AsParallel().Average(); var standarddeviation = inputInts.AsParallel().Aggregate( 0d, // seed // update accumulator function // An accumulator function to be invoked on each element in a partition (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2), // combine accumulator function // An accumulator function to be invoked on the yielded accumulator result from each partition. (total, thisTask) => total + thisTask, // result selector // A function to transform the final accumulator value into the result value. (finalSum) => Math.Sqrt((finalSum / (inputInts.Count()-1))) ); var skewness = inputInts.AsParallel().Aggregate( 0d, // seed // update accumulator function // An accumulator function to be invoked on each element in a partition (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3), // combine accumulator function // An accumulator function to be invoked on the yielded accumulator result from each partition. (total, thisTask) => total + thisTask, // result selector // A function to transform the final accumulator value into the result value. (finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count()-1)*(inputInts.Count()-2)) ); var kurtosis = inputInts.AsParallel().Aggregate( 0d, // seed // update accumulator function // An accumulator function to be invoked on each element in a partition (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4), // combine accumulator function // An accumulator function to be invoked on the yielded accumulator result from each partition. (total, thisTask) => total + thisTask, // result selector // A function to transform the final accumulator value into the result value. (finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) / ((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) - (3 * Math.Pow((inputInts.Count() - 2), 2)) / ((inputInts.Count() - 2) * (inputInts.Count() - 3)) ); Console.WriteLine("============================================================"); Console.WriteLine("TEST Parallel LINQ Calculate Result"); Console.WriteLine("============================================================"); Console.WriteLine("Mean : {0}", mean); Console.WriteLine("Standard Deviaton : {0}", standarddeviation); Console.WriteLine("Skewness : {0}", skewness); Console.WriteLine("Kurtosis : {0}", kurtosis); Console.ReadLine(); } } }

並發的PLINQ任務和任務的取消

PLINQ同樣也可以和其他形式的並發任務一起使用。例如在計算 標准偏差,偏度和峰度的過程中。
實際的執行順序是 平均值 => 標准偏差 => 偏度 => 峰度

但根據運算的公式,完全可以把偏度和峰度進行並行化處理的。標准差是他們公共的輸入。
平均值 => 標准偏差 => 偏度
=> 峰度

它們完全可以使用ContinueWith操作,如果有超時控制或取消需要的話,可以使用WithCancellation() 接口。

代碼示例:
代碼中用函數將 PLINQ 的操作又進行了封裝,然後用Task的方式進行並行化的調用。deferredCancelTask 是一個搗亂任務,如果把注釋打開,在2秒時它會發出一個Cancel信號,取消任務的執行,並且在異常處理時打印任務的狀態。

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;
using System.IO;
using System.Collections.Generic;


namespace Sample6_4_parallel_task_with_plinq
{
    class Program
    {
        private static ParallelQuery inputInts =
            ParallelEnumerable.Range(1, 100000000);

        private static double CalculateMean(System.Threading.CancellationToken ct)
        {
            return inputInts.AsParallel().WithCancellation(ct).Average();
        }

        private static double CalculateStandardDeviation(System.Threading.CancellationToken ct, double mean)
        {
            return inputInts.AsParallel().WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow((thisNumber - mean), 2),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => Math.Sqrt((finalSum / (inputInts.Count() - 1)))
                );
        }

        private static double CalculateSkewness(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            return inputInts.AsParallel().WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 3),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => (finalSum * inputInts.Count()) / ((inputInts.Count() - 1) * (inputInts.Count() - 2))
                );
        }

        private static double CalculateKurtosis(System.Threading.CancellationToken ct, double mean, double standarddeviation)
        {
            return inputInts.AsParallel().WithCancellation(ct).Aggregate(

                0d, // seed

                // update accumulator function
                // An accumulator function to be invoked on each element in a partition
                (subTotal, thisNumber) => subTotal + Math.Pow(((thisNumber - mean) / standarddeviation), 4),

                // combine accumulator function
                // An accumulator function to be invoked on the yielded accumulator result from each partition.
                (total, thisTask) => total + thisTask,

                // result selector
                // A function to transform the final accumulator value into the result value.
                (finalSum) => ((finalSum * inputInts.Count() * (inputInts.Count() + 1)) /
                    ((inputInts.Count() - 1) * (inputInts.Count() - 2) * (inputInts.Count() - 3))) -
                    (3 * Math.Pow((inputInts.Count() - 2), 2)) /
                    ((inputInts.Count() - 2) * (inputInts.Count() - 3))
                );
        }

        static void Main(string[] args)
        {
            Console.WriteLine("============================================================");
            Console.WriteLine("TEST Parallel TASK work with PLINQ");
            Console.WriteLine("============================================================");

            var cts = new System.Threading.CancellationTokenSource();
            var ct = cts.Token;


            var TaskMean = new Task(()=> CalculateMean(ct), ct);
            var TaskSTDev = TaskMean.ContinueWith((t) => { return CalculateStandardDeviation(ct, t.Result); },
                                                          TaskContinuationOptions.OnlyOnRanToCompletion);
            var TaskSkewness = TaskSTDev.ContinueWith((t) => { return CalculateSkewness(ct, TaskMean.Result, t.Result); },
                                                          TaskContinuationOptions.OnlyOnRanToCompletion);
            var TaskKurtosis = TaskSTDev.ContinueWith((t) => { return CalculateKurtosis(ct, TaskMean.Result, t.Result); },
                                                          TaskContinuationOptions.OnlyOnRanToCompletion);

            //var deferredCancelTask = Task.Factory.StartNew(() => { System.Threading.Thread.Sleep(2000); cts.Cancel();});

            try
            {
                TaskMean.Start();

                Task.WaitAll(TaskSkewness, TaskKurtosis);
                Console.WriteLine("Mean : {0}", TaskMean.Result);
                Console.WriteLine("Standard Deviaton : {0}", TaskSTDev.Result);
                Console.WriteLine("Skewness : {0}", TaskSkewness.Result);
                Console.WriteLine("Kurtosis : {0}", TaskKurtosis.Result);

            }
            catch(AggregateException aex)
            {
                foreach (var ex in aex.InnerExceptions)
                {
                    //Console.WriteLine(ex.ToString());

                    if (ex is TaskCanceledException)
                    {
                        Console.WriteLine("Mean Task: {0}", TaskMean.Status);
                        Console.WriteLine("Standard Deviation Task: {0}", TaskSTDev.Status);
                        Console.WriteLine("Skewness Task: {0}", TaskSkewness.Status);
                        Console.WriteLine("Kurtosis Task: {0}", TaskKurtosis.Status);
                    }
                }
            }

            Console.ReadLine();
        }
    }
}

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved