程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> 多線程中的鎖系統(四)-談談自旋鎖,多線程自旋

多線程中的鎖系統(四)-談談自旋鎖,多線程自旋

編輯:C#入門知識

多線程中的鎖系統(四)-談談自旋鎖,多線程自旋


目錄

一:基礎

二:自旋鎖示例

三:SpinLock

四:繼續SpinLock

五:總結

一:基礎

內核鎖:基於內核對象構造的鎖機制,就是通常說的內核構造模式。用戶模式構造和內核模式構造

           優點:cpu利用最大化。它發現資源被鎖住,請求就排隊等候。線程切換到別處干活,直到接受到可用信號,線程再切回來繼續處理請求。

           缺點:托管代碼->用戶模式代碼->內核代碼損耗、線程上下文切換損耗。

                   在鎖的時間比較短時,系統頻繁忙於休眠、切換,是個很大的性能損耗。

自旋鎖:原子操作+自循環。通常說的用戶構造模式。  線程不休眠,一直循環嘗試對資源訪問,直到可用。

           優點:完美解決內核鎖的缺點。

           缺點:長時間一直循環會導致cpu的白白浪費,高並發競爭下、CPU的消耗特別嚴重。

混合鎖:內核鎖+自旋鎖。 混合鎖是先自旋鎖一段時間或自旋多少次,再轉成內核鎖。

           優點:內核鎖和自旋鎖的折中方案,利用前二者優點,避免出現極端情況(自旋時間過長,內核鎖時間過短)。

           缺點: 自旋多少時間、自旋多少次,這些策略很難把控。 

           ps:操作系統或net框架,這塊算法策略做的已經非常優了,有些API函數也提供了時間及次數可配置項,讓開發者根據需求自行判斷。

 

二:自旋鎖示例

來看下我們自己簡單實現的自旋鎖:

        int signal = 0;
            var li = new List<int>();
            Parallel.For(0, 1000 * 10000, r =>
            {
                while (Interlocked.Exchange(ref signal, 1) != 0)//加自旋鎖
                {
                    //黑魔法
                }
                li.Add(r);
                Interlocked.Exchange(ref signal, 0);  //釋放鎖
            });
            Console.WriteLine(li.Count);
            //輸出:10000000

 

上面就是自旋鎖:Interlocked.Exchange+while

1:定義signal  0可用,1不可用。

2:Parallel模擬並發競爭,原子更改signal狀態。 後續線程自旋訪問signal,是否可用。

3:A線程使用完後,更改signal為0。 剩余線程競爭訪問資源,B線程勝利後,更改signal為1,失敗線程繼續自旋,直到可用。

三:SpinLock

SpinLock是net4.0後系統幫我們實現的自旋鎖,內部做了優化。

 簡單看下實例:

  var li = new List<int>();
            var sl = new SpinLock();
            Parallel.For(0, 1000 * 10000, r =>
            {
                bool gotLock = false;     //釋放成功
                sl.Enter(ref gotLock);    //進入鎖
                li.Add(r);
                if (gotLock) sl.Exit();  //釋放
            });
            Console.WriteLine(li.Count);
            //輸出:10000000

 四:繼續SpinLock

new SpinLock(false)   這個構造函數主要用來幫我們檢查死鎖用,true是開啟。

開啟狀態下,如果發生死鎖會直接拋異常的。

貼了一部分源碼(已折疊),我們來看下:

public void Enter(ref bool lockTaken) { if (lockTaken) { lockTaken = false; throw new System.ArgumentException(Environment.GetResourceString("SpinLock_TryReliableEnter_ArgumentException")); } // Fast path to acquire the lock if the lock is released // If the thread tracking enabled set the new owner to the current thread id // Id not, set the anonymous bit lock int observedOwner = m_owner; int newOwner = 0; bool threadTrackingEnabled = (m_owner & LOCK_ID_DISABLE_MASK) == 0; if (threadTrackingEnabled) { if (observedOwner == LOCK_UNOWNED) newOwner = Thread.CurrentThread.ManagedThreadId; } else if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED) { newOwner = observedOwner | LOCK_ANONYMOUS_OWNED; // set the lock bit } if (newOwner != 0) { #if !FEATURE_CORECLR Thread.BeginCriticalRegion(); #endif #if PFX_LEGACY_3_5 if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner) { lockTaken = true; return; } #else if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner) { // Fast path succeeded return; } #endif #if !FEATURE_CORECLR Thread.EndCriticalRegion(); #endif } //Fast path failed, try slow path ContinueTryEnter(Timeout.Infinite, ref lockTaken); } private void ContinueTryEnter(int millisecondsTimeout, ref bool lockTaken) { long startTicks = 0; if (millisecondsTimeout != Timeout.Infinite && millisecondsTimeout != 0) { startTicks = DateTime.UtcNow.Ticks; } #if !FEATURE_PAL && !FEATURE_CORECLR // PAL doesn't support eventing, and we don't compile CDS providers for Coreclr if (CdsSyncEtwBCLProvider.Log.IsEnabled()) { CdsSyncEtwBCLProvider.Log.SpinLock_FastPathFailed(m_owner); } #endif if (IsThreadOwnerTrackingEnabled) { // Slow path for enabled thread tracking mode ContinueTryEnterWithThreadTracking(millisecondsTimeout, startTicks, ref lockTaken); return; } // then thread tracking is disabled // In this case there are three ways to acquire the lock // 1- the first way the thread either tries to get the lock if it's free or updates the waiters, if the turn >= the processors count then go to 3 else go to 2 // 2- In this step the waiter threads spins and tries to acquire the lock, the number of spin iterations and spin count is dependent on the thread turn // the late the thread arrives the more it spins and less frequent it check the lock avilability // Also the spins count is increaes each iteration // If the spins iterations finished and failed to acquire the lock, go to step 3 // 3- This is the yielding step, there are two ways of yielding Thread.Yield and Sleep(1) // If the timeout is expired in after step 1, we need to decrement the waiters count before returning int observedOwner; //***Step 1, take the lock or update the waiters // try to acquire the lock directly if possoble or update the waiters count SpinWait spinner = new SpinWait(); while (true) { observedOwner = m_owner; if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED) { #if !FEATURE_CORECLR Thread.BeginCriticalRegion(); #endif #if PFX_LEGACY_3_5 if (Interlocked.CompareExchange(ref m_owner, observedOwner | 1, observedOwner) == observedOwner) { lockTaken = true; return; } #else if (Interlocked.CompareExchange(ref m_owner, observedOwner | 1, observedOwner, ref lockTaken) == observedOwner) { return; } #endif #if !FEATURE_CORECLR Thread.EndCriticalRegion(); #endif } else //failed to acquire the lock,then try to update the waiters. If the waiters count reached the maximum, jsut break the loop to avoid overflow if ((observedOwner & WAITERS_MASK) == MAXIMUM_WAITERS || Interlocked.CompareExchange(ref m_owner, observedOwner + 2, observedOwner) == observedOwner) break; spinner.SpinOnce(); } // Check the timeout. if (millisecondsTimeout == 0 || (millisecondsTimeout != Timeout.Infinite && TimeoutExpired(startTicks, millisecondsTimeout))) { DecrementWaiters(); return; } //***Step 2. Spinning //lock acquired failed and waiters updated int turn = ((observedOwner + 2) & WAITERS_MASK) / 2; int processorCount = PlatformHelper.ProcessorCount; if (turn < processorCount) { int processFactor = 1; for (int i = 1; i <= turn * SPINNING_FACTOR; i++) { Thread.SpinWait((turn + i) * SPINNING_FACTOR * processFactor); if (processFactor < processorCount) processFactor++; observedOwner = m_owner; if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED) { #if !FEATURE_CORECLR Thread.BeginCriticalRegion(); #endif int newOwner = (observedOwner & WAITERS_MASK) == 0 ? // Gets the number of waiters, if zero observedOwner | 1 // don't decrement it. just set the lock bit, it is zzero because a previous call of Exit(false) ehich corrupted the waiters : (observedOwner - 2) | 1; // otherwise decrement the waiters and set the lock bit Contract.Assert((newOwner & WAITERS_MASK) >= 0); #if PFX_LEGACY_3_5 if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner) { lockTaken = true; return; } #else if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner) { return; } #endif #if !FEATURE_CORECLR Thread.EndCriticalRegion(); #endif } } } // Check the timeout. if (millisecondsTimeout != Timeout.Infinite && TimeoutExpired(startTicks, millisecondsTimeout)) { DecrementWaiters(); return; } //*** Step 3, Yielding //Sleep(1) every 50 yields int yieldsoFar = 0; while (true) { observedOwner = m_owner; if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED) { #if !FEATURE_CORECLR Thread.BeginCriticalRegion(); #endif int newOwner = (observedOwner & WAITERS_MASK) == 0 ? // Gets the number of waiters, if zero observedOwner | 1 // don't decrement it. just set the lock bit, it is zzero because a previous call of Exit(false) ehich corrupted the waiters : (observedOwner - 2) | 1; // otherwise decrement the waiters and set the lock bit Contract.Assert((newOwner & WAITERS_MASK) >= 0); #if PFX_LEGACY_3_5 if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner) { lockTaken = true; return; } #else if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner) { return; } #endif #if !FEATURE_CORECLR Thread.EndCriticalRegion(); #endif } if (yieldsoFar % SLEEP_ONE_FREQUENCY == 0) { Thread.Sleep(1); } else if (yieldsoFar % SLEEP_ZERO_FREQUENCY == 0) { Thread.Sleep(0); } else { #if PFX_LEGACY_3_5 Platform.Yield(); #else Thread.Yield(); #endif } if (yieldsoFar % TIMEOUT_CHECK_FREQUENCY == 0) { //Check the timeout. if (millisecondsTimeout != Timeout.Infinite && TimeoutExpired(startTicks, millisecondsTimeout)) { DecrementWaiters(); return; } } yieldsoFar++; } } /// <summary> /// decrements the waiters, in case of the timeout is expired /// </summary> private void DecrementWaiters() { SpinWait spinner = new SpinWait(); while (true) { int observedOwner = m_owner; if ((observedOwner & WAITERS_MASK) == 0) return; // don't decrement the waiters if it's corrupted by previous call of Exit(false) if (Interlocked.CompareExchange(ref m_owner, observedOwner - 2, observedOwner) == observedOwner) { Contract.Assert(!IsThreadOwnerTrackingEnabled); // Make sure the waiters never be negative which will cause the thread tracking bit to be flipped break; } spinner.SpinOnce(); } } View Code

從代碼中發現SpinLock並不是我們簡單的實現那樣一直自旋,其內部做了很多優化。  

1:內部使用了Interlocked.CompareExchange保持原子操作, m_owner 0可用,1不可用。

2:第一次獲得鎖失敗後,繼續調用ContinueTryEnter,ContinueTryEnter有三種獲得鎖的情況。 

3:ContinueTryEnter函數第一種獲得鎖的方式。 使用了while+SpinWait,後續再講。

4:第一種方式達到最大等待者數量後,命中走第二種。 繼續自旋 turn * 100次。100這個值是處理器核數(4, 8 ,16)下最好的。

5:第二種如果還不能獲得鎖,走第三種。   這種就有點混合構造的意味了,如下:

    if (yieldsoFar % 40 == 0) 
                    Thread.Sleep(1);
                else if (yieldsoFar % 10 == 0)
                    Thread.Sleep(0);
                else
                    Thread.Yield();

 Thread.Sleep(1) : 終止當前線程,放棄剩下時間片 休眠1毫秒。 退出跟其他線程搶占cpu。當然這個一般會更多,系統無法保證這麼細的時間粒度。

 Thread.Sleep(0):  終止當前線程,放棄剩下時間片。  但立馬還會跟其他線程搶cpu,能不能搶到跟線程優先級有關。

 Thread.Yeild():       結束當前線程。讓出cpu給其他准備好的線程。其他線程ok後或沒有准備好的線程,繼續執行。 跟優先級無關。 

                              Thread.Yeild()還會返回個bool值,是否讓出成功。

 

從源碼中,我們可以學到不少編程技巧。 比如我們也可以使用  自旋+Thread.Yeild()   或 while+Thread.Yeild() 等組合。

 

 五:總結

本章談了自旋鎖的基礎+樓主的經驗。  SpinLock類源碼這塊,只粗淺理解了下,並沒有深究。

測了下SpinLock和自己實現的自旋鎖性能對比(並行添加1000w List<int>()),SpinLock是單純的自旋鎖性能2倍以上。

還測了下lock的性能,是系統SpinLock性能的3倍以上。  可見lock內部自旋的效率更高,可惜看不到monitor.enter CLR實現的代碼。

 

參考資源

http://www.projky.com/dotnet/4.0/System/Threading/SpinLock.cs.html

 

作者:蘑菇先生   出處:http://www.cnblogs.com/mushroom/p/4245529.html

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved