程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> C++編程 –安全並發訪問容器元素

C++編程 –安全並發訪問容器元素

編輯:C++入門知識

C++編程 –安全並發訪問容器元素


C++ 安全並發訪問容器元素

2014-9-24 flyfish

標准庫STL的vector, deque, list等等不是線程安全的
例如 線程1正在使用迭代器(iterator)讀vector
線程2正在對該vector進行插入操作,使vector重新分配內存,這樣就造成線程1中的迭代器失效

STL的容器
多個線程讀是安全的,在讀的過程中,不能對容器有任何寫入操作
多個線程可以同時對不同的容器做寫入操作。
不能指望任何STL實現來解決線程難題,必須手動做同步控制.

方案1 對vector進行加鎖處理

effective STL給出的Lock框架

template //一個為容器獲取和釋放互斥體的模板  
class Lock
{ //框架;其中的很多細節被省略了  
public:
	Lock(const Container& container) :c(container) 
	{  
		getMutexFor(c); 
		//在構造函數中獲取互斥體 
	} 
	~Lock() 
	{   
		releaseMutexFor(c); 
		//在析構函數中釋放它
	} 
private: const Container& c; 
};  


如果需要實現工業強度,需要做更多的工作。


方案2 微軟的Parallel Patterns Library (PPL)


看MSDN
PPL 提供的功能

1 Task Parallelism: a mechanism to execute several work items (tasks) in parallel
任務並行:一種並行執行若干工作項(任務)的機制

2 Parallel algorithms: generic algorithms that act on collections of data in parallel
並行算法:並行作用於數據集合的泛型算法

3 Parallel containers and objects: generic container types that provide safe concurrent access to their elements
並行容器和對象:提供對其元素的安全並發訪問的泛型容器類型


示例是對斐波那契數列(Fibonacci)的順序計算和並行計算的比較

順序計算是
使用 STL std::for_each 算法
結果存儲在 std::vector 對象中。

並行計算是
使用 PPL Concurrency::parallel_for_each 算法
結果存儲在 Concurrency::concurrent_vector 對象中。
// parallel-fibonacci.cpp
// compile with: /EHsc
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 


using namespace Concurrency;
using namespace std;


// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template 
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}


// Computes the nth Fibonacci number.
int fibonacci(int n)
{
   if(n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}


int wmain()
{
   __int64 elapsed;


   // An array of Fibonacci numbers to compute.
   array a = { 24, 26, 41, 42 };


   // The results of the serial computation.
   vector> results1;


   // The results of the parallel computation.
   concurrent_vector> results2;


   // Use the for_each algorithm to compute the results serially.
   elapsed = time_call([&] 
   {
      for_each (a.begin(), a.end(), [&](int n) {
         results1.push_back(make_tuple(n, fibonacci(n)));
      });
   });   
   wcout << L"serial time: " << elapsed << L" ms" << endl;


   // Use the parallel_for_each algorithm to perform the same task.
   elapsed = time_call([&] 
   {
      parallel_for_each (a.begin(), a.end(), [&](int n) {
         results2.push_back(make_tuple(n, fibonacci(n)));
      });


      // Because parallel_for_each acts concurrently, the results do not 
      // have a pre-determined order. Sort the concurrent_vector object
      // so that the results match the serial version.
      sort(results2.begin(), results2.end());
   });   
   wcout << L"parallel time: " << elapsed << L" ms" << endl << endl;


   // Print the results.
   for_each (results2.begin(), results2.end(), [](tuple& pair) {
      wcout << L"fib(" << get<0>(pair) << L"): " << get<1>(pair) << endl;
   });
}

命名空間Concurrency首字母大寫,一般命名空間全是小寫。

貼一個簡單的示例代碼
使用parallel_for_each 算法計算std::array 對象中每個元素的平方
參數分別是lambda 函數、函數對象和函數指針。
#include "stdafx.h"
#include 
#include 
#include 
using namespace Concurrency;
using namespace std;
using namespace std::tr1;


// Function object (functor) class that computes the square of its input.
template
class SquareFunctor
{
public:
	void operator()(Ty& n) const
	{
		n *= n;
	}
};


// Function that computes the square of its input.
template
void square_function(Ty& n)
{
	n *= n;
}
int _tmain(int argc, _TCHAR* argv[])
{
	// Create an array object that contains 5 values.
	array values = { 1, 2, 3, 4, 5 };


	// Use a lambda function, a function object, and a function pointer to 
	// compute the square of each element of the array in parallel.


	// Use a lambda function to square each element.
	parallel_for_each(values.begin(), values.end(), [](int& n){n *= n;});


	// Use a function object (functor) to square each element.
	parallel_for_each(values.begin(), values.end(), SquareFunctor());


	// Use a function pointer to square each element.
	parallel_for_each(values.begin(), values.end(), &square_function);


	// Print each element of the array to the console.
	for_each(values.begin(), values.end(), [](int& n) { 
		wcout << n << endl;
	});
	return 0;
}

在微軟的concurrent_vector.h文件中有這樣一句
Microsoft would like to acknowledge that this concurrency data structure implementation
is based on Intel implementation in its Threading Building Blocks ("Intel Material").
也就是微軟的concurrent_vector是在Intel 的Threading Building Blocks基礎上實現的。

方案3 Intel TBB(Threading Building Blocks)
Intel TBB 提供的功能
1 直接使用的線程安全容器,比如 concurrent_vector 和 concurrent_queue。
2 通用的並行算法,如 parallel_for 和 parallel_reduce。
3 模板類 atomic 中提供了無鎖(Lock-free或者mutex-free)並發編程支持。

方案4 無鎖數據結構支持庫Concurrent Data Structures (libcds).
地址 http://sourceforge.net/projects/libcds/
下載以後裡面直接有從VC2008到VC2013的編譯環境,依賴於boost庫

方案5 Boost 使用boost.lockfree

boost.lockfree實現了三種無鎖數據結構:


1 boost::lockfree::queue
2 boost::lockfree::stack
3 boost::lockfree::spsc_queue

生產者-消費者
下面的代碼實現的是
實現了一個多寫生成,多消費 隊列。
產生整數,並被4個線程消費

#include 
#include 
#include 


#include 


boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);


boost::lockfree::queue queue(128);


const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;


void producer(void)
{
    for (int i = 0; i != iterations; ++i) {
        int value = ++producer_count;
        while (!queue.push(value))
            ;
    }
}


boost::atomic done (false);
void consumer(void)
{
    int value;
    while (!done) {
        while (queue.pop(value))
            ++consumer_count;
    }


    while (queue.pop(value))
        ++consumer_count;
}


int main(int argc, char* argv[])
{
    using namespace std;
    cout << "boost::lockfree::queue is ";
    if (!queue.is_lock_free())
        cout << "not ";
    cout << "lockfree" << endl;


    boost::thread_group producer_threads, consumer_threads;


    for (int i = 0; i != producer_thread_count; ++i)
        producer_threads.create_thread(producer);


    for (int i = 0; i != consumer_thread_count; ++i)
        consumer_threads.create_thread(consumer);


    producer_threads.join_all();
    done = true;


    consumer_threads.join_all();


    cout << "produced " << producer_count << " objects." << endl;
    cout << "consumed " << consumer_count << " objects." << endl;
}





  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved