開發過程中經常會碰到這樣的場景:需要從一個地方獲取一些數據,然後處理數據並將其保存在數據庫中。
private void FetchData() {}
private void SaveData() {}
static void Main(string[] args)
{
for (int i = 0; i < 10; i++)
{
FetchData(); // 獲取數據
SaveData(); // 處理並保存
}
}
例如上述代碼例子這樣順序執行,執行會很慢,原因是獲取數據和處理並保存的過程都可能導致阻塞,然而FetchData()每次取數據並不需要等待上一條數據保存完成再進行。
這樣的場景非常適合用生產者消費者隊列:生產者就是FetchData(),用來生產數據;消費者SaveData(),用來消費數據。
舉個實際例子,我們需要通過一個Web Api獲取一些城市的天氣情況,並將其保存到數據庫中。
實現方式:
下邊是實現的完整代碼:
class Program
{
// 任務隊列
static Queue<string> _tasks = new Queue<string>();
// 為保證線程安全,使用一個鎖來保護_task的訪問
readonly static object _locker = new object();
// 通過 _wh 給工作線程發信號
static EventWaitHandle _wh = new AutoResetEvent(false);
static Thread _worker;
static void Main(string[] args)
{
// 需要獲取天氣情況的城市對應代碼
var cityIds = new List<int> {101280601, 101010100, 101020100, 101110101, 101040100};
// 任務開始,啟動工作線程
_worker = new Thread(Work);
_worker.Start();
// 生產者將數據插入隊裡中,並給工作線程發信號
foreach (var cityId in cityIds)
EnqueueTask(FetchData(cityId));
// 任務結束
Dispose();
}
/// <summary>執行工作</summary>
static void Work()
{
while (true)
{
string work = null;
lock (_locker)
{
if (_tasks.Count > 0)
{
work = _tasks.Dequeue(); // 有任務時,出列任務
if (work == null) // 退出機制:當遇見一個null任務時,代表任務結束
return;
}
}
if (work != null)
SaveData(work); // 任務不為null時,處理並保存數據
else
_wh.WaitOne(); // 沒有任務了,等待信號
}
}
/// <summary>插入任務</summary>
static void EnqueueTask(string task)
{
lock (_locker)
_tasks.Enqueue(task); // 向隊列中插入任務
_wh.Set(); // 給工作線程發信號
}
/// <summary>結束釋放</summary>
static void Dispose()
{
EnqueueTask(null); // 插入一個Null任務,通知工作線程退出
_worker.Join(); // 等待工作線程完成
_wh.Close(); // 釋放資源
}
/// <summary>獲取數據</summary>
static string FetchData(int cityId)
{
var wc = new WebClient { Encoding = Encoding.UTF8 };
var url = string.Format("http://www.weather.com.cn/adat/sk/{0}.html", cityId);
return wc.DownloadString(url);
}
/// <summary>處理保存</summary>
static void SaveData(string data)
{
var weatherInfo = (JsonConvert.DeserializeObject(data, typeof(Dictionary<string, Weatherinfo>)) as Dictionary<string, Weatherinfo>)["weatherinfo"];
Console.WriteLine("[{0}]:{1} 氣溫({2}) 風向({3}) 風力({4})", weatherInfo.Time, weatherInfo.City, weatherInfo.Temp, weatherInfo.Wd, weatherInfo.Ws);
Thread.Sleep(200); // 模擬數據保存
}
}
public class Weatherinfo
{
public string City { get; set; }
public string Temp { get; set; }
public string Time { get; set; }
public string Wd { get; set; }
public string Ws { get; set; }
}
}
解釋:
參考:Threading in C# --> 中文翻譯