程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> 統計大文件裡,頻率最高的10個單詞,(C# TPL DataFlow版),

統計大文件裡,頻率最高的10個單詞,(C# TPL DataFlow版),

編輯:C#入門知識

統計大文件裡,頻率最高的10個單詞,(C# TPL DataFlow版),


最近公司搞了一個寫程序的比賽,要求從2G的文件裡統計出出現頻率最高的10個單詞。

最開始的想法是使用字典樹,後來發現字典樹更適合用在找前綴上,在查找沒有hash表效率高。

之後使用Hash表+DataFlow完成了功能,2G的文件處理在20秒以內(其實我有信心優化到10秒以內,但是太折騰了)。

這是我的設計圖:

為什麼要形成那麼多結果?因為我不想寫鎖,寫鎖會降低很多效率,而且也失去了線程的意義,每個線程做自己的工作,

最後在把每個線程處理的結果匯總起來,這樣也符合fork join 的設計。

而且我也試過,如果寫鎖的話,效率會降低10秒以上,我也嘗試過微軟提供的ConcurrentDictionary 原子哈希表,但是效果都不是

很理想,而且,在並行的年代,在寫鎖這個東西,感覺很惡心,好像在代碼裡加了一坨屎一樣,我以前就很討厭鎖,也出現過代碼死鎖的情況。

最後我選擇了使用微軟的TPL 庫來解決並行的問題。

使用DataFlow解決了我處理時多線程管理的問題,還有線程等待消息隊列的問題,

使用BufferBlock 進行主控與工作線程之間消息傳遞,這是我的設計圖:

 

讀取文件之後使用BufferBlock.Post發送給工作線程,工作線程使用TryReceive接收消息,並且處理。

在MSDNhttps://msdn.microsoft.com/zh-cn/library/hh228601(v=vs.110).aspx 裡有詳細的介紹。

這是典型的單生產者,多使用者的列子。

代碼方面首先是讀取文件:

  public class FileBufferBlock
    {
       
        private string _fileName;
        BufferBlock<WordStream> _buffer = null;
        public FileBufferBlock(BufferBlock<WordStream> buffer,string fileName)
        {
            this._fileName = fileName;
            this._buffer = buffer;
        }

        /// <summary>
        /// 按32M讀取文件,循環發送給WordBufferBlock
        /// </summary>
        public void ReadFile()
        {
            using (FileStream fs = new FileStream(_fileName, FileMode.Open, FileAccess.Read))
            {
                using (StreamReader sr = new StreamReader(fs))
                {
                    while (!sr.EndOfStream)
                    {

                        char[] charBuffer = new char[32 * 1024 * 1024];
                        sr.ReadBlock(charBuffer, 0, charBuffer.Length);
                        _buffer.Post(new WordStream(charBuffer));
                    }
                }
            }
            _buffer.Complete();
        }

在這裡使用BufferBlock.Post 發送消息給工作線程,如果不用它,你得去找個能阻塞的消息隊列。

下面是我的接收方的代碼,使用BufferBlock.TryReceive 接收消息,然後處理,在這裡可以開多個個線程去處理。

而且線程是它幫你管理的:

// --------------------------------------------------------------------------------------------------------------------
// <copyright file="WordProcessBufferBlock.cs" company="yada">
//   Copyright (c) yada Corporation. All rights reserved.
// </copyright>
// change by qugang 2015.4.18
// 描述:用於截取單詞的工作線程
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace WordStatistics
{
    public class WordProcessBufferBlock
    {
        private int _taskCount = 1;
        BufferBlock<WordStream> _buffer = null;
        private List<Task<Dictionary<string, int>>> _list = new List<Task<Dictionary<string, int>>>();

        /// <summary>
        /// 單詞處理類
        /// </summary>
        /// <param name="taskCount">工作線程數</param>
        /// <param name="buffer">DataFlow的BufferBlock</param>
        public WordProcessBufferBlock(int taskCount, BufferBlock<WordStream> buffer)
        {
            _taskCount = taskCount;
            this._buffer = buffer;
        }

        public void StartWord()
        {
            for (int i = 0; i < _taskCount; i++)
            {
                _list.Add(Process());
            }
        }
        /// <summary>
        /// 等待所有工作完成
        /// </summary>
        /// <param name="f">完成後的工作函數</param>
        public void WaitAll(Action<Dictionary<string,int>> f)
        {
            Task.WaitAll(_list.ToArray());
            foreach (var row in _list)
            {
                f(row.Result);
            }
        }

        /// <summary>
        /// 使用BufferBlock.TryReceive循環從消息裡取從FileBufferBlock發送的buffer
        /// </summary>
        /// <returns>工作結果</returns>
        private async Task<Dictionary<string, int>> Process()
        {
            Dictionary<string, int> dic = new Dictionary<string, int>();
            while (await _buffer.OutputAvailableAsync())
            {
                WordStream ws;
                while (_buffer.TryReceive(out ws))
                {
                    foreach (string value in ws)
                    {
                        if (dic.ContainsKey(value))
                        {
                            dic[value]++;
                        }
                        else
                        {
                            dic.Add(value, 1);
                        }
                    }
                }
            }
            return dic;
        }
    }
}

WordStrem是我自己寫的一個單詞枚舉流,繼承了IEnumerable接口,將找單詞的算法寫到枚舉器裡面,實現流化。

// --------------------------------------------------------------------------------------------------------------------
// <copyright file="WordStatistics.cs" company="yada">
//   Copyright (c) yada Corporation. All rights reserved.
// </copyright>
// change by qugang 2015.4.18
// 單詞枚舉器:算法從開始找字母,如果不是字母,則返回從pos 到end 的組成單詞
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace WordStatistics
{
    /// <summary>
    /// 單詞枚舉器
    /// </summary>
    public class WordStream : IEnumerable
    {
        private char[] buffer;
        public WordStream(char[] buffer)
        {
            this.buffer = buffer;
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return (IEnumerator)GetEnumerator();
        }

        public WordStreamEnum GetEnumerator()
        {
            return new WordStreamEnum(this.buffer);
        }


    }

    public class WordStreamEnum : IEnumerator
    {
        private char[] buffer;
        int pos = 0;
        int endCount = 0;
        int index = -1;

        public WordStreamEnum(char[] buffer)
        {
            this.buffer = buffer;
        }

        public bool MoveNext()
        {
            while (index < buffer.Length - 1)
            {
                index++;
                char buff = buffer[index];
                if ((buff >= 'a' && buff <= 'z') || (buff >= 'A' && buff <= 'Z'))
                {
                    if (endCount == 0)
                    {
                        pos = index;
                        endCount++;
                    }
                    else
                    {
                        endCount++;
                    }
                }
                else
                {
                    if (endCount != 0)
                        return true;
                }
                if (buff == '\0')
                {
                    return false;
                }
            }
            return false;
        }

        public object Current
        {
            get
            {
                int tempInt = endCount;
                endCount = 0;
                return new string(buffer, pos, tempInt);
            }
        }

        public void Reset()
        {
            index = -1;
        }
    }

}

到這裡就完成了,然後再Main函數裡添加調用

  static void Main(string[] args)
        {
            DateTime dt = DateTime.Now;

            var buffer = new BufferBlock<WordStream>();

            //創建工作BufferBlock
            WordProcessBufferBlock wb = new WordProcessBufferBlock(8, buffer);
            wb.StartWord();

            //創建讀取文件,發送的BufferBlock
            FileBufferBlock fb = new FileBufferBlock(buffer, @"D:\content.txt");
            fb.ReadFile();

            Dictionary<string,int> dic = new Dictionary<string,int>();

            //等待工作完成匯總結果
            wb.WaitAll(p =>
                {
                    foreach (var row in p)
                    {
                        if (!dic.ContainsKey(row.Key))
                            dic.Add(row.Key, row.Value);
                        else
                        {
                            dic[row.Key] += row.Value;
                        }
                    }
                }
                );

            var myList = dic.ToList();
            myList.Sort((p, v) => v.Value.CompareTo(p.Value));
            foreach (var row in myList.Take(10))
            {
                Console.WriteLine(row);
            }

            
            Console.WriteLine(DateTime.Now - dt);

        }

最後2G的文件,我的機器跑出來是19秒多。

如果代碼沒有包,請從NuGet上下載Dataflow包。

代碼下載:http://files.cnblogs.com/files/qugangf/WordStatistics.rar

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved