程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> C# 實現生產者消費者隊列,

C# 實現生產者消費者隊列,

編輯:C#入門知識

C# 實現生產者消費者隊列,


開發過程中經常會碰到這樣的場景:需要從一個地方獲取一些數據,然後處理數據並將其保存在數據庫中。

private void FetchData() {}
private void SaveData() {}
static void Main(string[] args)
{
    for (int i = 0; i < 10; i++)
    {
        FetchData();  // 獲取數據
        SaveData();  // 處理並保存
    }
}

例如上述代碼例子這樣順序執行,執行會很慢,原因是獲取數據和處理並保存的過程都可能導致阻塞,然而FetchData()每次取數據並不需要等待上一條數據保存完成再進行。

這樣的場景非常適合用生產者消費者隊列:生產者就是FetchData(),用來生產數據;消費者SaveData(),用來消費數據。

舉個實際例子,我們需要通過一個Web Api獲取一些城市的天氣情況,並將其保存到數據庫中。

實現方式:

下邊是實現的完整代碼:

class Program
    {
        // 任務隊列
        static Queue<string> _tasks = new Queue<string>();
        
        // 為保證線程安全,使用一個鎖來保護_task的訪問
        readonly static object _locker = new object();
        
        // 通過 _wh 給工作線程發信號
        static EventWaitHandle _wh = new AutoResetEvent(false);

        static Thread _worker;

        static void Main(string[] args)
        {
            // 需要獲取天氣情況的城市對應代碼
            var cityIds = new List<int> {101280601, 101010100, 101020100, 101110101, 101040100};
            
            // 任務開始,啟動工作線程
            _worker = new Thread(Work);
            _worker.Start();

            // 生產者將數據插入隊裡中,並給工作線程發信號
            foreach (var cityId in cityIds)
                EnqueueTask(FetchData(cityId));

            // 任務結束
            Dispose();  
        }

        /// <summary>執行工作</summary>
        static void Work()
        {
            while (true)
            {
                string work = null;
                lock (_locker)
                {
                    if (_tasks.Count > 0)
                    {
                        work = _tasks.Dequeue(); // 有任務時,出列任務
                        
                        if (work == null)  // 退出機制:當遇見一個null任務時,代表任務結束
                            return;
                    }
                }

                if (work != null)
                    SaveData(work);  // 任務不為null時,處理並保存數據
                else
                    _wh.WaitOne();   // 沒有任務了,等待信號
            }
        }

        /// <summary>插入任務</summary>
        static void EnqueueTask(string task)
        {
            lock (_locker) 
                _tasks.Enqueue(task);  // 向隊列中插入任務 
            
            _wh.Set();  // 給工作線程發信號
        }
        
        /// <summary>結束釋放</summary>
        static void Dispose()
        {
            EnqueueTask(null);      // 插入一個Null任務,通知工作線程退出
            _worker.Join();         // 等待工作線程完成
            _wh.Close();            // 釋放資源
        }

        /// <summary>獲取數據</summary>
        static string FetchData(int cityId)
        {
            var wc = new WebClient { Encoding = Encoding.UTF8 };
            var url = string.Format("http://www.weather.com.cn/adat/sk/{0}.html", cityId);

            return wc.DownloadString(url);
        }

        /// <summary>處理保存</summary>
        static void SaveData(string data)
        {
            var weatherInfo = (JsonConvert.DeserializeObject(data, typeof(Dictionary<string, Weatherinfo>)) as Dictionary<string, Weatherinfo>)["weatherinfo"];
            
            Console.WriteLine("[{0}]:{1} 氣溫({2}) 風向({3}) 風力({4})", weatherInfo.Time, weatherInfo.City, weatherInfo.Temp, weatherInfo.Wd, weatherInfo.Ws);
            
            Thread.Sleep(200);  // 模擬數據保存
        }
    }

    public class Weatherinfo
    {
        public string City { get; set; }
        public string Temp { get; set; }
        public string Time { get; set; }
        public string Wd { get; set; }
        public string Ws { get; set; }
    }
}

 解釋:

 

參考:Threading in C# --> 中文翻譯

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved