程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> Parallel Programming-多消費者,多生產者同時運行並行,parallelprogramming

Parallel Programming-多消費者,多生產者同時運行並行,parallelprogramming

編輯:C#入門知識

Parallel Programming-多消費者,多生產者同時運行並行,parallelprogramming


在上一篇文章演示了並行的流水線操作(生產者和消費者並行同時執行),C#是通過BlockingCollection這個線程安全的對象作為Buffer,並且結合Task來實現的。但是上一篇文章有個缺陷,在整個流水線上,生產者和消費者是唯一的。本文將演示多個消費者多個生產者同時並行執行。

一、多消費者、多生產者示意圖

 與前一篇文章演示的流水線思想類似,不同之處就是本文的topic:消費者和生產者有多個,以buffer1為例,起生產者有兩個,消費者有兩個,現在有三個緯度的並行:

二、實現

2.1 代碼

 class PiplelineDemo
    {
        private int seed;
        public PiplelineDemo()
        {
            seed = 10;
        }

        public void Action11(BlockingCollection<string> output)
        {
            for (var i = 0; i < seed; i++)
            {
                output.Add(i.ToString());//initialize data to buffer1
            }
        }

        public void Action12(BlockingCollection<string> output)
        {
            for (var i = 0; i < seed; i++)
            {
                output.Add(i.ToString());//initialize data to buffer1
            }
        }

        public void Action21(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                var itemToInt = int.Parse(item);
                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
            }
        }

        public void Action22(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                var itemToInt = int.Parse(item);
                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
            }
        }

        public void Action31(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                output.Add((item));// add new data to buffer3
            }
        }

        public void Action32(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                output.Add((item));// add new data to buffer3
            }
        }
        public void Pipeline()
        {
            var buffer1 = new BlockingCollection<string>(seed * 2);
            var buffer2 = new BlockingCollection<string>(seed * 2);
            var buffer3 = new BlockingCollection<string>(seed * 2);
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var stage11 = taskFactory.StartNew(() => Action11(buffer1));
            var stage12 = taskFactory.StartNew(() => Action12(buffer1));
            Task.Factory.ContinueWhenAll(new Task[] { stage11, stage12 }, (tasks) =>
            {
                buffer1.CompleteAdding();
            });
            var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2));
            var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2));
            Task.Factory.ContinueWhenAll(new Task[] { stage21, stage22 }, (tasks) =>
            {
                buffer2.CompleteAdding();
            });
            var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3));
            var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3));
            Task.Factory.ContinueWhenAll(new Task[] { stage31, stage32 }, (tasks) =>
            {
                buffer3.CompleteAdding();
            });
            Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32);
            foreach (var item in buffer3.GetConsumingEnumerable())//print data in buffer3
            {
                Console.WriteLine(item);
            }
        }
    }

2.2 運行結果

2.3 代碼解釋

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved