程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> 分析.Net裡線程同步機制

分析.Net裡線程同步機制

編輯:C#入門知識

           我們知道並行編程模型兩種:一種是基於消息式的,第二種是基於共享內存式的。 前段時間項目中遇到了第二種 使用多線程開發並行程序共享資源的問題 ,今天以實際案例出發對.net裡的共享內存式的線程同步機制做個總結,由於某些類庫的應用屬於基礎,所以本次不對基本使用做出講解,基本使用 MSDN是最好的教程。

   一、volatile關鍵字

    基本介紹: 封裝了 Thread.VolatileWrite() 和  Thread.VolatileRead()的實現 ,主要作用是強制刷新高速緩存。

    使用場景: 適用於在多核多CPU的機器上 解決變量在內存和高速緩存同步不及時的問題。

    案例:參考下文   二、原子操作的 案例 或者 System.Collections.Concurrent命名空間下的 ConcurrentQueue ,ConcurrentDictionary  等並發集合的實現方式。       

  二、原子操作(Interlock)

   基本介紹: 原子操作是 實現Spinlock,Monitor,ReadWriterLock鎖的基礎,其實現原理是在計算機總線上標志一個信號來表示資源已經被占用 如果其他指令進行修改則等待本次操作完成後才能進行,因為原子操作是在硬件上實現的 所以速度非常快,大約在50個時鐘周期。其實原子操作也可以看做一種鎖。

   使用場景:性能要求較高的場合,需要對字段進行快速的同步或者對變量進行原子形式的跟新操作(例如:int b=0;  b=b+1  實際分解為多條匯編指令,在多線程情況下 多條匯編指令並行的執行可能導致錯誤的結果,所以要保證執行 b=b+1 生成的匯編指令是一個原子形式執行 ),例如實現一個並行隊列,異步隊列等。

   案例:一個基於事件觸發機制隊列的實現

/// <summary>
    /// 表示一個實時處理隊列
    /// </summary>
    public class ProcessQueue<T>
    {
         #region [成員]

        private ConcurrentQueue<IEnumerable<T>> queue;

        private Action<IEnumerable<T>> PublishHandler;

        //指定處理的線程數
        private int core = Environment.ProcessorCount;

        //正在運行的線程數
        private int runingCore = 0;

        public event Action<Exception> OnException;

        //隊列是否正在處理數據
        private int isProcessing=0; 

        //隊列是否可用
        private bool enabled = true;

        #endregion

        #region 構造函數

        public ProcessQueue(Action<IEnumerable<T>> handler)
        {

            queue = new ConcurrentQueue<IEnumerable<T>>();
        
            PublishHandler = handler;
            this.OnException += ProcessException.OnProcessException;
        }

        #endregion

        #region [方法]

        /// <summary>
        /// 入隊
        /// </summary>
        /// <param name="items">數據集合</param>
        public void Enqueue(IEnumerable<T> items)
        {
            if (items != null)
            {
                queue.Enqueue(items);
            }

            //判斷是否隊列有線程正在處理 
            if (enabled && Interlocked.CompareExchange(ref isProcessing, 1, 0) == 0)
            {
                if (!queue.IsEmpty)
                {
                    ThreadPool.QueueUserWorkItem(ProcessItemLoop);
                }
                else
                {
                    Interlocked.Exchange(ref isProcessing, 0);
                }
            }
        }

        /// <summary>
        /// 開啟隊列數據處理
        /// </summary>
        public void Start()
        {
            Thread process_Thread = new Thread(PorcessItem);
            process_Thread.IsBackground = true;
            process_Thread.Start();
        }

        /// <summary>
        /// 循環處理數據項
        /// </summary>
        /// <param name="state"></param>
        private void ProcessItemLoop(object state)
        {
            //表示一個線程遞歸 當處理完當前數據時 則開起線程處理隊列中下一條數據 遞歸終止條件是隊列為空時
            //但是可能會出現 隊列有數據但是沒有線程去處理的情況 所有一個監視線程監視隊列中的數據是否為空,如果為空
            //並且沒有線程去處理則開啟遞歸線程

            if (!enabled && queue.IsEmpty)
            {
                Interlocked.Exchange(ref isProcessing, 0);
                return;
            }

            //處理的線程數 是否小於當前CPU核數
            if (Thread.VolatileRead(ref runingCore) <= core * 2*)
            {
                IEnumerable<T> publishFrame;
                //出隊以後交給線程池處理
                if (queue.TryDequeue(out publishFrame))
                {
                    Interlocked.Increment(ref runingCore);
                    try
                    {
                        PublishHandler(publishFrame);
                        
                        if (enabled && !queue.IsEmpty)
                        {    
                            ThreadPool.QueueUserWorkItem(ProcessItemLoop);
                        }
                        else
                        {
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex);
                    }

                    finally
                    {
                        Interlocked.Decrement(ref runingCore);
                    }
                }
            }

        }

        /// <summary>
        ///定時處理幀 線程調用函數  
        ///主要是監視入隊的時候線程 沒有來的及處理的情況
        /// </summary>
        private void PorcessItem(object state)
        {
            int sleepCount=0;
            int sleepTime = 1000;
            while (enabled)
            {
                //如果隊列為空則根據循環的次數確定睡眠的時間
                if (queue.IsEmpty)
                {
                    if (sleepCount == 0)
                    {
                        sleepTime = 1000;
                    }
                    else if (sleepCount == 3)
                    {
                        sleepTime = 1000 * 3;
                    }
                    else if (sleepCount == 5)
                    {
                        sleepTime = 1000 * 5;
                    }
                    else if (sleepCount == 8)
                    {
                        sleepTime = 1000 * 8;
                    }
                    else if (sleepCount == 10)
                    {
                        sleepTime = 1000 * 10;
                    }
                    else
                    {
                        sleepTime = 1000 * 50;
                    }
                    sleepCount++;
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    //判斷是否隊列有線程正在處理 
                    if (enabled && Interlocked.CompareExchange(ref isProcessing, 1, 0) == 0)
                    {
                        if (!queue.IsEmpty)
                        {
                            ThreadPool.QueueUserWorkItem(ProcessItemLoop);
                        }
                        else
                        {
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        sleepCount = 0;
                        sleepTime = 1000;
                    }
                }
            }
        }

        /// <summary>
        /// 停止隊列
        /// </summary>
        public void Stop()
        {
            this.enabled = false;

        }

        /// <summary>
        /// 觸發異常處理事件
        /// </summary>
        /// <param name="ex">異常</param>
        private void OnProcessException(Exception ex)
        {
            var tempException = OnException;
            Interlocked.CompareExchange(ref tempException, null, null);

            if (tempException != null)
            {
                OnException(ex);
            }
        }

        #endregion

    }

 

  三、自旋鎖(Spinlock)

    基本介紹:  在原子操作基礎上實現的鎖,用戶態的鎖,缺點是線程一直不釋放CPU時間片。操作系統進行一次線程用戶態到內核態的切換大約需要500個時鐘周期,可以根據這個進行參考我們的線程是進行用戶等待還是轉到內核的等待.。

    使用場景:線程等待資源時間較短的情況下使用。

    案例: 和最常用的Monitor 使用方法一樣  這裡就不舉例了,在實際場景中應該優先選擇使用Monitor,除非是線程等待資源的時間特別的短

 

  四、監視器(Monitor)

         基本介紹:  原子操作基礎上實現的鎖,開始處於用戶態,自旋一段時間進入內核態的等待釋放CPU時間片,缺點使用不當容易造成死鎖    c#實現的關鍵字是Lock。         

         使用場景:  所有需要加鎖的場景都可以使用。

         案例: 案例太多了,這裡就不列出了。

 五、讀寫鎖(ReadWriterLock)

  原理分析:   原子操作基礎上實現的鎖,

  使用場景:適用於寫的次數少,讀的頻率高的情況。

  案例:一個線程安全的緩存實現(.net 4.0 可以使用基礎類庫中的  ConcurrentDictionary<K,V>)  注意:老版本ReaderWriterLock已經被淘汰,新版的是ReaderWriterLockSlim

class CacheManager<K, V>
    {
        #region [成員]

        private ReaderWriterLockSlim readerWriterLockSlim;

        private Dictionary<K, V> containter;

        #endregion

        #region [構造函數]

        public CacheManager()
        {
            this.readerWriterLockSlim = new ReaderWriterLockSlim();
            this.containter = new Dictionary<K, V>();
        }

        #endregion

        #region [方法]

        public void Add(K key, V value)
        {
            readerWriterLockSlim.EnterWriteLock();

            try
            {
                containter.Add(key, value);
            }

            finally
            {
                readerWriterLockSlim.ExitWriteLock();
            }
        }

        public V Get(K key)
        {

            bool result = false;
            V value;

            do
            {
                readerWriterLockSlim.EnterReadLock();

                try
                {
                    result = containter.TryGetValue(key, out value);
                }

                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }

            } while (!result);

            return value;
        }

        #endregion
    }

  .net中還有其他的線程同步機制:ManualResetEventSlim ,AutoResetEvent ,SemaphoreSlim 這裡就逐個進行不介紹 具體在《CLR Via C# 》中解釋的非常詳細,但在具體的實際開發中我還沒有使用到。

最好的線程同步機制是沒有同步,這取決於良好的設計,當然有些情況下無法避免使用鎖。 在性能要求不高的場合基本的lock就能滿足要求,但性能要求比較苛刻的情就需求更具實際場景進行選擇哪種線程同步機制。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved