程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> 並發數據結構:Stack

並發數據結構:Stack

編輯:關於.NET

在敘述並發Stack前,我們先來了解下非線程安全的Stack。

Stack是一種線性數據結構,只能訪問它的一端來存儲或讀取數據。Stack很像餐廳中的一疊盤子:將 新盤子堆在最上面,並從最上面取走盤子。最後一個堆在上面的盤子第一個被取走。因此Stack也被稱為 後進先出結構(LIFO)。

Stack有兩種實現方式:數組和列表。下面我們分別用這兩種方式來實現一個簡單的Stack。采用數組 實現代碼如下:

using System;
using System.Collections.Generic;

namespace Lucifer.DataStructure
{
   public class ArrayStack<T>
   {
     private T[] array;
     private int topOfStack;
     private const int defaultCapacity = 16;

     public ArrayStack() : this(defaultCapacity)
     {
     }

     public ArrayStack(int initialCapacity)
     {

   if (initialCapacity < 0)
         throw new ArgumentOutOfRangeException();
       array = new T[initialCapacity];
       topOfStack = -1;
     }

     /// <summary>
     /// 查看Stack是否為空
     /// </summary>
     public bool IsEmpty()
     {
       return topOfStack == -1;
     }

     /// <summary>
     /// 進棧
     /// </summary>
     public void Push(T item)
     {
       if (this.Count == array.Length)
       {
         T[] newArray = new T[this.Count == 0 ? defaultCapacity : 2 *  array.Length];
         Array.Copy(array, 0, newArray, 0, this.Count);
         array = newArray;
       }
       this.array[++topOfStack] = item;
     }
     /// <summary>
     /// 出棧
     /// </summary>
     public T Pop()
     {
       if (this.IsEmpty())
         throw new InvalidOperationException("The stack is empty.");
       T popped = array[topOfStack];
       array[topOfStack] = default(T);
       topOfStack--;
       return popped;
     }
     /// <summary>
     /// 返回棧頂,但不刪除棧頂的值。
     /// </summary>
     public T Peek()
     {
       if (this.IsEmpty())
         throw new InvalidOperationException("The stack is empty.");
       return array[topOfStack];
     }
     /// <summary>
     /// Stack內元素的數量
     /// </summary>
     public int Count
     {
       get
       {
         return this.topOfStack + 1;
       }
     }
   }
}

如上面的代碼所示,ArrayStack<T>定義了兩個數據成員: array用來存儲棧的數據項並在需要 時擴展;topOfStack則定位當前棧頂的索引。如果是空棧,該索引值為-1。唯一值得一提的是Push操作。 棧的每個操作時間復雜度都是O(1)。但是Push操作在數組滿載的時候會引起一個數組加倍的操作,這將花 費O(N)的時間。如果這個操作經常發生的話,我們需要考慮改進。然而,實際上這個操作很少發生,因為 包含N個元素的數組加倍只有在至少N/2次不包含數組加倍的Push後才會發生一次。因此,我們可以把加倍 的O(N)代價均談到N/2次簡單的Push操作上,這樣平均每個Push操作的代價只增加了一小點。此外,我們 沒有讓它繼承IEnumerable<T>和ICollection<T>接口,這樣做的目的是避免其他的細節實現 淹沒了我們的主題。.NET的Stack<T>采用的就是我們上述的方法,但繼承的是 IEnumerable<T>,ICollection,IEnumerable。而且它的默認容量是4,而我們這裡的 ArrayStack<T>的默認容量是16。我認為在大內存的現在,16應該是比較合理的數字。D語言的實現 請看http://code.google.com/p/d-phoenix/source/browse/trunk/source/system/collections/Stack.d 。

除了使用數組實現以外,我們還可以使用鏈表實現。鏈表的優勢在於額外的空間僅僅是一個項的引用 。而數組實現所用的額外空間則等於空余的數組項的個數。

為了使鏈表實現可以與數組實現有競爭力,我們必須能夠以常量時間執行鏈表的基本操作。要做到這 點很容易,因為對鏈表的改變僅僅在於鏈表兩端的數據項。具體實現代碼如下:

public class ListStack<T>
   {
     private ListNode<T> topOfStack;

     public bool IsEmpty()
     {
       return topOfStack == null;
     }

     public void Push(T item)
     {
       topOfStack = new ListNode<T>(item, topOfStack);
     }

     public T Pop()
     {
       if (this.IsEmpty())
         throw new InvalidOperationException("The stack is empty.");
       T popped = topOfStack.item;
       topOfStack = topOfStack.next;
       return popped;
     }

     public T Peek()
     {
       if (IsEmpty())
         throw new InvalidOperationException("The stack is empty.");
       return topOfStack.item;
     }

     class ListNode<T>
     {
       internal T item;
       internal ListNode<T> next;

       public ListNode(T initItem, ListNode<T> initNext)
       {
         this.item = initItem;
         this.next = initNext;
       }

       public ListNode(T initItem)
         : this(initItem, null)
       {
       }
     }
   }

這兩種實現的時間復雜度都是O(1)。因此,它們都相當快速,不會成為任何算法的瓶頸。從這點上來 看,使用哪種方式實現都無所謂。

使用數組實現可能比使用鏈表實現稍快一些,尤其是在能夠准確估評估所需要的容量時。如果估計正 確,就不會有數組加倍操作。此外,數組提供的順序訪問通常比由動態內存分配的非順序訪問要快。但是 數組實現也存在著浪費額外內存空間的缺點。這是一個時間-空間取捨的問題。

接下來我們將使用我們在《並發數據結構:迷人的原子》中學習到的CAS原語來構造一個Lock-Free堆棧 。因為CAS原語最多只能交換64Bit,如果采取數組實現方式,幾乎很難實現。因此我們采取鏈表的實現方 式。這只要在需要進行同步的地方采用CAS原語交換就可以了。具體實現代碼如下:

public class LockFreeStack<T>
   {
     private ListNode<T> topOfStack;

     public bool IsEmpty()
     {
       return topOfStack == null;
     }

     public void Push(T item)
     {
       ListNode<T> newTopOfStack = new ListNode<T>(item);
       ListNode<T> oldTopOfStack;
       do
       {
         oldTopOfStack = topOfStack;
         newTopOfStack.next = oldTopOfStack;
       }
       while (Interlocked.CompareExchange<ListNode<T>>(ref topOfStack,  newTopOfStack, oldTopOfStack) != oldTopOfStack);
     }
     /// <summary>
     /// 考慮到在多線程環境中,這裡不拋出異常。我們需要人為判斷其是否為空,即 ! TryPop() or result != null
     /// </summary>
     /// <returns></returns>
     public bool TryPop(out T result)
     {
       ListNode<T> oldTopOfStack;
       ListNode<T> newTopOfStack;
       do
       {
         oldTopOfStack = topOfStack;
         if (oldTopOfStack == null)
         {
           result = default(T);
           return false;
         }
         newTopOfStack = topOfStack.next;
       }
       while(Interlocked.CompareExchange<ListNode<T>>(ref topOfStack,  newTopOfStack, oldTopOfStack) != oldTopOfStack);

       result = oldTopOfStack.item;
       return true;
     }

     public bool TryPeek(out T result)
     {
       ListNode<T> head = topOfStack;
       if (head == null)
       {
         result = default(T);
         return false;
       }
       result = head.item;
       return true;
     }

     /* *****************************************
      * 簡單的單向鏈表實現
      * *****************************************/
     class ListNode<T>
     {
       internal T item;
       internal ListNode<T> next;

       public ListNode(T initItem, ListNode<T> initNext)
       {
         this.item = initItem;
         this.next = initNext;
       }

       public ListNode(T initItem)
         : this(initItem, null)
       {
       }
     }
   }

我們現在已經知道CAS原語有個ABA問題(具體請參考並發數據結構:迷人的原子)。那麼我們上面的 Lock-free代碼有沒有這個問題?這需要我們了解它的本質。CAS比較的其實是一個內存地址,這跟內存回 收機制有著莫大的關聯。C/C++的內存回收策略使得某些時候內存會被重復使用。比方,我們剛剛刪除了 某個類型的實例,如果在此時又有該類型的實例被創建。那麼很有可能這個實例的內存地址就是我們剛剛 被刪除的類型實例的地址。這樣ABA問題就出現了。凡是牽涉到顯式內存管理的地方,我們都要考慮會不 會導致ABA問題。所以用C/C++編寫Lock-free代碼相當的麻煩,我們可能會用CAS2原語或者Hazard Pointers來解決此類問題。但是.NET是有GC的。GC在這裡很好的幫助了我們。關於GC的詳細描述,請參考 《CLR via C#》第20章。這裡簡單描寫下。.NET的托管堆上維護著一個指針,我們稱之為NextObjPtr,它 表示下一個新建對象分配時在托管堆中所處的位置。在.NET中,我們只要new一個對象,NextObjPtr就會 返回對象的內存地址,並且會再次指示下一個新建對象分配時在托管堆中所處的位置。內存回收時,GC有 Mark/Clear以及壓縮階段。此外,.NET的GC還有分代機制。

通過上面的描述,我們就可以知道TryPop()不會有ABA問題。因為它壓根就沒有內存分配和回收。而只 是已經在內存中的對象位置變換而已(這些對象的內存地址肯定不同)。唯一需要考慮的是Push()操作。它 有ABA問題嗎?答案是否定的。因為我們在每次Push操作中只分配新對象,而不刪除老對象。所有等待回 收的對象全部交給GC處理。那麼假如TryPop和Push操作前後交替進行,會發生ABA問題嗎?我的看法是有 可能,但極難發生,發生了也極難重現。但我在http://msdn2.microsoft.com/zh- cn/magazine/cc163427.aspx裡看到的說法是不會發生該問題,而在 http://www.research.ibm.com/people/m/michael/ieeetpds-2004.pdf裡講的是有可能發生。我比較傾向 於後者,因為MSDN的那篇Paper沒有講出個所以然來,但後者也語焉不詳。不過.NET的並行庫中的 ConcurrentStack<T>和Java中還是這樣實現了。因為Lock-Free編碼很難證明其正確性,我們權且 相信它是安全的。如果哪位達人了解的話,在下虛心請教。還望不吝賜教。

在輕度到中度的爭用情況下,上面的代碼比基於鎖的代碼性能高出很多,大概會有3~4倍。因為 CAS 的多數時間都在第一次嘗試時就成功,而發生爭用時的開銷也不涉及線程掛起和上下文切換,只多了幾個 循環迭代。沒有爭用的 CAS 要比沒有爭用的鎖便宜得多,而爭用的 CAS 比爭用的鎖獲取涉及更短的延遲 。

在高度爭用的情況下(即有多個線程不斷爭用一個內存位置的時候),基於鎖的算法開始提供比非阻 塞算法更好的吞吐率,因為當線程阻塞時,它就會停止爭用,耐心地等候輪到自己,從而避免了進一步爭 用。但是,這麼高的爭用程度並不常見,因為多數時候,線程會把線程本地的計算與爭用共享數據的操作 分開,從而給其他線程使用共享數據的機會。(這麼高的爭用程度也表明需要重新檢查算法,朝著更少共 享數據的方向努力。)此外,CAS涉及的內存分配和回收也是阻礙性能的一大因素。

請注意,上述代碼是理想的並發形式:無需阻止其他線程存取數據,只需抱著會在爭用中“勝出”的 信念即可。如果事實證明無法“勝出”,我們將會遇到一些變化不定的問題,例如活鎖。

所以我們將在下一集引入一個新的並發數據結構:SpinLock來構造更好的並發堆棧。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved