最近公司搞了一個寫程序的比賽,要求從2G的文件裡統計出出現頻率最高的10個單詞。
最開始的想法是使用字典樹,後來發現字典樹更適合用在找前綴上,在查找沒有hash表效率高。
之後使用Hash表+DataFlow完成了功能,2G的文件處理在20秒以內(其實我有信心優化到10秒以內,但是太折騰了)。
這是我的設計圖:

為什麼要形成那麼多結果?因為我不想寫鎖,寫鎖會降低很多效率,而且也失去了線程的意義,每個線程做自己的工作,
最後在把每個線程處理的結果匯總起來,這樣也符合fork join 的設計。
而且我也試過,如果寫鎖的話,效率會降低10秒以上,我也嘗試過微軟提供的ConcurrentDictionary 原子哈希表,但是效果都不是
很理想,而且,在並行的年代,在寫鎖這個東西,感覺很惡心,好像在代碼裡加了一坨屎一樣,我以前就很討厭鎖,也出現過代碼死鎖的情況。
最後我選擇了使用微軟的TPL 庫來解決並行的問題。
使用DataFlow解決了我處理時多線程管理的問題,還有線程等待消息隊列的問題,
使用BufferBlock 進行主控與工作線程之間消息傳遞,這是我的設計圖:

讀取文件之後使用BufferBlock.Post發送給工作線程,工作線程使用TryReceive接收消息,並且處理。
在MSDNhttps://msdn.microsoft.com/zh-cn/library/hh228601(v=vs.110).aspx 裡有詳細的介紹。
這是典型的單生產者,多使用者的列子。
代碼方面首先是讀取文件:
public class FileBufferBlock
{
private string _fileName;
BufferBlock<WordStream> _buffer = null;
public FileBufferBlock(BufferBlock<WordStream> buffer,string fileName)
{
this._fileName = fileName;
this._buffer = buffer;
}
/// <summary>
/// 按32M讀取文件,循環發送給WordBufferBlock
/// </summary>
public void ReadFile()
{
using (FileStream fs = new FileStream(_fileName, FileMode.Open, FileAccess.Read))
{
using (StreamReader sr = new StreamReader(fs))
{
while (!sr.EndOfStream)
{
char[] charBuffer = new char[32 * 1024 * 1024];
sr.ReadBlock(charBuffer, 0, charBuffer.Length);
_buffer.Post(new WordStream(charBuffer));
}
}
}
_buffer.Complete();
}
在這裡使用BufferBlock.Post 發送消息給工作線程,如果不用它,你得去找個能阻塞的消息隊列。
下面是我的接收方的代碼,使用BufferBlock.TryReceive 接收消息,然後處理,在這裡可以開多個個線程去處理。
而且線程是它幫你管理的:
// --------------------------------------------------------------------------------------------------------------------
// <copyright file="WordProcessBufferBlock.cs" company="yada">
// Copyright (c) yada Corporation. All rights reserved.
// </copyright>
// change by qugang 2015.4.18
// 描述:用於截取單詞的工作線程
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace WordStatistics
{
public class WordProcessBufferBlock
{
private int _taskCount = 1;
BufferBlock<WordStream> _buffer = null;
private List<Task<Dictionary<string, int>>> _list = new List<Task<Dictionary<string, int>>>();
/// <summary>
/// 單詞處理類
/// </summary>
/// <param name="taskCount">工作線程數</param>
/// <param name="buffer">DataFlow的BufferBlock</param>
public WordProcessBufferBlock(int taskCount, BufferBlock<WordStream> buffer)
{
_taskCount = taskCount;
this._buffer = buffer;
}
public void StartWord()
{
for (int i = 0; i < _taskCount; i++)
{
_list.Add(Process());
}
}
/// <summary>
/// 等待所有工作完成
/// </summary>
/// <param name="f">完成後的工作函數</param>
public void WaitAll(Action<Dictionary<string,int>> f)
{
Task.WaitAll(_list.ToArray());
foreach (var row in _list)
{
f(row.Result);
}
}
/// <summary>
/// 使用BufferBlock.TryReceive循環從消息裡取從FileBufferBlock發送的buffer
/// </summary>
/// <returns>工作結果</returns>
private async Task<Dictionary<string, int>> Process()
{
Dictionary<string, int> dic = new Dictionary<string, int>();
while (await _buffer.OutputAvailableAsync())
{
WordStream ws;
while (_buffer.TryReceive(out ws))
{
foreach (string value in ws)
{
if (dic.ContainsKey(value))
{
dic[value]++;
}
else
{
dic.Add(value, 1);
}
}
}
}
return dic;
}
}
}
WordStrem是我自己寫的一個單詞枚舉流,繼承了IEnumerable接口,將找單詞的算法寫到枚舉器裡面,實現流化。
// --------------------------------------------------------------------------------------------------------------------
// <copyright file="WordStatistics.cs" company="yada">
// Copyright (c) yada Corporation. All rights reserved.
// </copyright>
// change by qugang 2015.4.18
// 單詞枚舉器:算法從開始找字母,如果不是字母,則返回從pos 到end 的組成單詞
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WordStatistics
{
/// <summary>
/// 單詞枚舉器
/// </summary>
public class WordStream : IEnumerable
{
private char[] buffer;
public WordStream(char[] buffer)
{
this.buffer = buffer;
}
IEnumerator IEnumerable.GetEnumerator()
{
return (IEnumerator)GetEnumerator();
}
public WordStreamEnum GetEnumerator()
{
return new WordStreamEnum(this.buffer);
}
}
public class WordStreamEnum : IEnumerator
{
private char[] buffer;
int pos = 0;
int endCount = 0;
int index = -1;
public WordStreamEnum(char[] buffer)
{
this.buffer = buffer;
}
public bool MoveNext()
{
while (index < buffer.Length - 1)
{
index++;
char buff = buffer[index];
if ((buff >= 'a' && buff <= 'z') || (buff >= 'A' && buff <= 'Z'))
{
if (endCount == 0)
{
pos = index;
endCount++;
}
else
{
endCount++;
}
}
else
{
if (endCount != 0)
return true;
}
if (buff == '\0')
{
return false;
}
}
return false;
}
public object Current
{
get
{
int tempInt = endCount;
endCount = 0;
return new string(buffer, pos, tempInt);
}
}
public void Reset()
{
index = -1;
}
}
}
到這裡就完成了,然後再Main函數裡添加調用
static void Main(string[] args)
{
DateTime dt = DateTime.Now;
var buffer = new BufferBlock<WordStream>();
//創建工作BufferBlock
WordProcessBufferBlock wb = new WordProcessBufferBlock(8, buffer);
wb.StartWord();
//創建讀取文件,發送的BufferBlock
FileBufferBlock fb = new FileBufferBlock(buffer, @"D:\content.txt");
fb.ReadFile();
Dictionary<string,int> dic = new Dictionary<string,int>();
//等待工作完成匯總結果
wb.WaitAll(p =>
{
foreach (var row in p)
{
if (!dic.ContainsKey(row.Key))
dic.Add(row.Key, row.Value);
else
{
dic[row.Key] += row.Value;
}
}
}
);
var myList = dic.ToList();
myList.Sort((p, v) => v.Value.CompareTo(p.Value));
foreach (var row in myList.Take(10))
{
Console.WriteLine(row);
}
Console.WriteLine(DateTime.Now - dt);
}
最後2G的文件,我的機器跑出來是19秒多。
如果代碼沒有包,請從NuGet上下載Dataflow包。
代碼下載:http://files.cnblogs.com/files/qugangf/WordStatistics.rar