程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> C++並發實戰17:線程安全的stack和queue

C++並發實戰17:線程安全的stack和queue

編輯:C++入門知識

1 線程安全的數據結構有幾個可以注意的地方:當一個線程看見invariants時其他線程不會破壞該invariants,比如一個線程在遍歷訪問vector另一個線程卻在修改vector這就破壞了variants;注意數據結構接口引起的競態,必要的時候將多個操作合並;注意異常的處理;避免局部操作的鎖超出其作用范圍,否則可能引起死鎖;盡可能的縮小臨界區。

線程安全的棧關鍵代碼:

#include 
struct empty_stack: std::exception
{
	const char* what() const throw();
};
template
class threadsafe_stack
{
	private:
		std::stack data;
		mutable std::mutex m;
	public:
		threadsafe_stack(){}
		threadsafe_stack(const threadsafe_stack& other)
		{
			std::lock_guard lock(other.m);
			data=other.data;
		}
		threadsafe_stack& operator=(const threadsafe_stack&) = delete;
		void push(T new_value)
		{
			std::lock_guard lock(m);
			data.push(std::move(new_value)); 
		}
		std::shared_ptr pop()//top和pop合並,采用shared_ptr返回棧頂元素防止元素構造時發生異常
		{
			std::lock_guard lock(m);
			if(data.empty()) throw empty_stack(); 
				std::shared_ptr const res(std::make_shared(std::move(data.top())));//make_shared比shared_ptr直接構造效率高 
			data.pop(); 
			return res;
		}
		void pop(T& value)//采用參數引用返回棧頂元素
		{
			std::lock_guard lock(m);
			if(data.empty()) throw empty_stack();
			value=std::move(data.top()); 
			data.pop(); 
		}
		bool empty() const
		{
			std::lock_guard lock(m);
			return data.empty();
		}
};//這裡還是有死鎖的可能:棧內元素是用戶代碼,若該元素在構造或析構時修改棧則可能發生死鎖,當然這種設計本身就有一定問題,應該從設計本身下手

一個線程安全的queue關鍵代碼:

template
class threadsafe_queue
{
	private:
		mutable std::mutex mut;
		std::queue > data_queue;//隊裡存儲的是shared_ptr這樣可以保證push和pop操作時不會引起構造或析構異常,隊列更加高效
		std::condition_variable data_cond;//采用條件變量同步入隊和出隊操作
	public:
		threadsafe_queue(){}
		void wait_and_pop(T& value)//直至容器中有元素可以刪除
		{
			std::unique_lock lk(mut);
			data_cond.wait(lk,[this]{return !data_queue.empty();});
			value=std::move(*data_queue.front()); 
			data_queue.pop();
		}
		bool try_pop(T& value)//若隊中無元素可以刪除則直接返回false
		{
			std::lock_guard lk(mut);
			if(data_queue.empty())
				return false;
			value=std::move(*data_queue.front()); 
			data_queue.pop();
			return true;
		}
		std::shared_ptr wait_and_pop()
		{
			std::unique_lock lk(mut);
			data_cond.wait(lk,[this]{return !data_queue.empty();});
			std::shared_ptr res=data_queue.front(); 
			data_queue.pop();
			return res;
		}
		std::shared_ptr try_pop()
		{
			std::lock_guard lk(mut);
			if(data_queue.empty())
				return std::shared_ptr();
			std::shared_ptr res=data_queue.front(); 
			data_queue.pop();
			return res;
		}
		void push(T new_value)
		{
			std::shared_ptr data(std::make_shared(std::move(new_value)));//數據的構造在臨界區外從而縮小臨界區,並且不會在臨界區拋出異常
			std::lock_guard lk(mut);
			data_queue.push(data);
			data_cond.notify_one();
		}
		bool empty() const
		{
			std::lock_guard lk(mut);
			return data_queue.empty();
		}
};

上面的采用一個mutex保護整個queue使得線程迫使線程順序化,為了減小鎖的粒度,使用鏈表作為queue的底層數據結構,並采用一個虛擬節點使head和tail分開:

template
class threadsafe_queue
{
  private:
     struct node
     {
        std::shared_ptr data;//通過shared_ptr管理資源T,那麼資源的初始化和析構都在臨界區外進行
        std::unique_ptr next;
     };
     std::mutex head_mutex;
     std::unique_ptr head;
     std::mutex tail_mutex;
     node* tail;
     node* get_tail(//返回tail用於判斷head==tail
     {
        std::lock_guard tail_lock(tail_mutex);
        return tail;
     }
     std::unique_ptr pop_head()/刪除隊首元素並返回該元素
     {
        std::lock_guard head_lock(head_mutex);
        if(head.get()==get_tail())//判斷隊是否為空,get_tail()必選在head_mutex保護下,試想多個線程都在pop那麼會出現什麼情形?
        {
           return nullptr;
        }
        std::unique_ptr old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
     }
  public:
     threadsafe_queue():
     head(new node),tail(head.get())//構造函數創建一個虛擬節點,初始時head和tail都指向這個虛擬節點,這也是判斷queue為空的條件head==tail都指向同一個虛擬節點
     {}
     threadsafe_queue(const threadsafe_queue& other)=delete;
     threadsafe_queue& operator=(const threadsafe_queue& other)=delete;//為了簡化代碼禁止拷貝和賦值
     std::shared_ptr try_pop()//
     {
        std::unique_ptr old_head=pop_head();
        return old_head?old_head->data:std::shared_ptr();
     }
     void push(T new_value)//向隊列添加一個元素,T的實例在臨界區外創建即使拋出異常queue也沒有被修改,而且加速多個線程的添加操作
     {
        std::shared_ptr new_data(std::make_shared(std::move(new_value)));//注意make_shared可以提高效率,make_shared()函數要比直接創建shared_ptr對象的方式快且高效,因為它內部僅分配一次內存,消除了shared_ptr 構造時的開銷
        std::unique_ptr p(new node);//創建一個虛擬節點,tail始終指向一個虛擬節點從而和head分開(隊列中有元素時),防止隊列中只有元素時pop和top都操作的tail和head(若沒有虛擬節點此時tail和head都是同一個節點)
        node* const new_tail=p.get();
        std::lock_guard tail_lock(tail_mutex);
        tail->data=new_data;
        tail->next=std::move(p);
        tail=new_tail;
     }
};
關於上述隊列的幾點說明:

1) tail->next==nullptr;tail->data==nullptr//tail始終指向一個虛擬節點
2) head==tail表示隊列為空
3) head->next==tail表示隊中只有一個元素
4) 隊列中真實的元素x滿足x!=tail, x->data指向T的實例,x->next指向下一個節點, x->next==tail表示x的隊列中最後一個真實的節點

5) 采用兩個mutex使得鎖的粒度減小提高了並發性能


一個相對完整的線程安全queue版本:

template
class threadsafe_queue
{
  public:
    threadsafe_queue():head(new node),tail(head.get()){}
    threadsafe_queue(const threadsafe_queue& other)=delete;
    threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
    std::shared_ptr try_pop()
    {
        std::unique_ptr const old_head=try_pop_head();
        return old_head?old_head->data:std::shared_ptr();
    }
    bool try_pop(T& value)
    {
        std::unique_ptr const old_head=try_pop_head(value);
        return old_head;
    }
    std::shared_ptr wait_and_pop()
    {
        std::unique_ptr const old_head=wait_pop_head();
        return old_head->data;
    }
    void wait_and_pop(T& value)
    {
        std::unique_ptr const old_head=wait_pop_head(value);
    }
    void empty()
    {
        std::lock_guard head_lock(head_mutex);
        return (head==get_tail());
    }
    void push(T new_value)
    {
        std::shared_ptr new_data(std::make_shared(std::move(new_value)));
        std::unique_ptr p(new node);
        {
            std::lock_guard tail_lock(tail_mutex);
            tail->data=new_data;
            node* const new_tail=p.get();
            tail->next=std::move(p);
            tail=new_tail;
        }
        data_cond.notify_one();
   }
   
  private:
    node* get_tail()
    {
        std::lock_guard tail_lock(tail_mutex);
        return tail;
    }
    std::unique_ptr pop_head()
    {
        std::unique_ptr const old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }
    std::unique_lock wait_for_data()
    {
        std::unique_lock head_lock(head_mutex);
        data_cond.wait(head_lock,[&]{return head!=get_tail();});
        return std::move(head_lock);
    }
    std::unique_ptr wait_pop_head()
    {
        std::unique_lock head_lock(wait_for_data());
        return pop_head();
    }
    std::unique_ptr wait_pop_head(T& value)
    {
        std::unique_lock head_lock(wait_for_data());
        value=std::move(*head->data);
        return pop_head();
    }
    std::unique_ptr try_pop_head()
    {
        std::lock_guard head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr();
        }
        return pop_head();
    }
    std::unique_ptr try_pop_head(T& value)
    {
        std::lock_guard head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr();
        }
        value=std::move(*head->data);
        return pop_head();
    }   

  private:
    struct node
    {
        std::shared_ptr data;
        std::unique_ptr next;
    };
    
    std::mutex head_mutex;
    std::unique_ptr head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;
};






  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved