程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 使用C++11 實現的線程池,線程池

使用C++11 實現的線程池,線程池

編輯:C++入門知識

使用C++11 實現的線程池,線程池


  最近打算做一個服務器端程序,每來一個客戶端請求新開一個線程進行處理。在網上查了一些資料後,準備使用線程池來做這個東西。使用C++11新的庫處理線程問題比以前簡單了許多,在網上找到一份線程池的實現:http://blog.csdn.net/goldenhawking/article/details/7919547 。這個線程池實現中,每一個線程都維護一個任務隊列,我覺得這麼做不利於任務調度,有可能某個線程的待執行任務列表很長,而其他線程則在休眠。

下面是我自己實現的一個線程池,任務隊列由線程池類來分配,每個線程只有拿到任務的時候才執行,其他時間是阻塞的。另外,如果沒有任務需要執行,那麼分配任務的線程也會阻塞,直到來了新任務。 代碼在VS2013下測試通過。

 本人菜鳥一枚,歡迎指出錯誤。

threadInstance.h

 1 #pragma once
 2 
 3 #include <iostream>
 4 #include <thread>
 5 #include <mutex>
 6 #include <functional>
 7 #include <list>
 8 #include <atomic>
 9 #include <vector>
10 #include <algorithm>
11 #include <memory>
12 #include <condition_variable>
13 
14 
15 class ThreadPool;
16 
17 
18 //thread class 
19 class ThreadInstance
20 {
21 public:
22     ThreadInstance(ThreadPool *theadPool)
23         :m_stop(false), m_pthread(nullptr), m_threadPool(theadPool)
24     {
25 
26     }
27     virtual ~ThreadInstance()
28     {
29         if (m_pthread != nullptr)
30         {
31             m_pthread->join();
32             delete m_pthread;
33         }
34 
35     }
36 
37     void begin()
38     {
39         m_pthread = new std::thread(std::bind(&ThreadInstance::run, this));
40     }
41     void run();
42     void join()
43     {
44         m_stop = true;
45         std::lock_guard<std::mutex> lg(m_mutex_task);
46         m_task = nullptr;
47         m_cond_task_ready.notify_one();
48     }
49 
50     void set_task(std::function<void(void)> &task)
51     {
52         std::lock_guard<std::mutex> lg(m_mutex_task);
53         m_task = task;
54     }
55 
56 public:
57     //condition_variable to wait for task
58     std::condition_variable m_cond_task_ready;
59 protected:
60     //flag used to terminate the thread
61     std::atomic< bool> m_stop;
62     //mutex used by member  m_cond_task_ready
63     std::mutex m_mutex_cond;     
64     //int m_id;
65 
66     //task to be executed
67     std::function<void(void)> m_task;
68     // mutex to protect m_task
69     std::mutex m_mutex_task;  
70     //pointer to thread 
71     std::thread *m_pthread;
72     // pointer to thread pool
73     ThreadPool *m_threadPool;
74 };

threadInstance.cpp

 1 #include "threadInstance.h"
 2 #include "threadPool.h"
 3 
 4 void ThreadInstance::run()
 5 {
 6     
 7     while (true)
 8     {
 9         //auto x = std::this_thread::get_id();
10         std::unique_lock<std::mutex> lck(m_mutex_cond);        
11         m_cond_task_ready.wait(lck);
12 
13         if (m_stop)
14         {
15             break;
16         }
17         m_task();
18         
19         //shared_ptr<ThreadInstance> ptr(this);
20         m_threadPool->append_free_thread(this);
21     }
22 
23 }

threadPool.h

 1 #pragma once
 2 
 3 
 4 #include <thread>
 5 #include <mutex>
 6 #include <functional>
 7 #include <list>
 8 #include <atomic>
 9 #include <vector>
10 #include <algorithm>
11 #include <memory>
12 #include <condition_variable>
13 
14 
/**
 * Counting semaphore built from a mutex and a condition variable.
 *
 * The pool uses one instance to count free worker threads and another to
 * count queued tasks.
 */
class Semaphore {
public:
    /// @param value initial number of permits.
    Semaphore(int value = 1) : count(value)
    {}

    /// Blocks until a permit is available, then consumes it.
    void wait(){
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate is re-evaluated after every wakeup, so a spurious
        // wakeup can no longer let a caller through without a permit.
        condition.wait(lock, [this]{ return count > 0; });
        --count;
    }

    /// Releases one permit and wakes one waiting thread, if any.
    void signal(){
        std::lock_guard<std::mutex> lock(m_mutex);
        ++count;
        condition.notify_one();
    }

private:
    int count;                          // permits currently available
    std::mutex m_mutex;                 // protects count
    std::condition_variable condition;  // signalled whenever count grows
};
38 
39 class ThreadInstance;
40 
41 
42 //the thread pool class
43 class ThreadPool
44 {
45 public:        
46     ThreadPool(int nThreads);
47     ~ThreadPool();
48 public:
49     //total threads;
50     size_t count(){ return m_vec_threads.size(); }
51 
52     //wait until all threads is terminated;
53     void join_all();
54 
55     //append task to the thread pool
56     void append(std::function< void(void) > func);
57     //start service 
58     void start();
59     //append free thread to free thread list
60     void append_free_thread(ThreadInstance* pfThread);
61 
62 protected:
63     //function to be execute in a separate thread 
64     void start_thread();    
65     
66 public:
67     //NO. threads
68     int m_n_threads;
69    //flag used to stop the thread pool service
70     std::atomic<bool> m_stop;
71 
72     Semaphore m_sem_free_threads;
73     Semaphore m_sem_task;
74         
75 
76     //list contains all the free threads
77     std::list<ThreadInstance*> m_list_free_threads;
78     //vector contains all the threads
79     std::vector<ThreadInstance*  > m_vec_threads;
80 
81     //std::mutex m_mutex_list_task;
82     std::list<std::function<void(void)>> m_list_tasks;
83 };

 

threadPool.cpp

#include "threadPool.h"
#include "threadInstance.h"


// File-scope (not per-pool) mutex: being a global, any use of it is common to
// every ThreadPool instance in this translation unit.
std::mutex cond_mutex;
// File-scope condition variable; going by its name it was meant to announce
// incoming tasks -- NOTE(review): confirm whether anything waits on it.
std::condition_variable cond_incoming_task;

/**
 * Builds a pool of nThreads workers.
 *
 * Every worker is recorded in m_vec_threads and also placed on the free list;
 * the free-thread semaphore starts at nThreads and the task semaphore at 0.
 * No thread is launched here.
 */
ThreadPool::ThreadPool(int nThreads)
    : m_n_threads(nThreads),
      m_stop(false),
      m_sem_free_threads(nThreads),
      m_sem_task(0)
{
    for (int remaining = nThreads; remaining > 0; --remaining)
    {
        ThreadInstance *const worker = new ThreadInstance(this);
        m_vec_threads.push_back(worker);
        m_list_free_threads.push_back(worker);
    }
}

/**
 * Shuts the pool down and destroys every worker.
 */
ThreadPool::~ThreadPool()
{
    // Request the stop first: deleting a ThreadInstance joins its thread, and
    // a worker that was never asked to stop would make that join block
    // forever. Calling join_all() a second time is harmless.
    join_all();

    // Walk the vector itself rather than counting up to m_n_threads: that
    // member is public and may be negative or out of step with the vector,
    // which made the old index loop run out of bounds.
    for (ThreadInstance *worker : m_vec_threads)
    {
        delete worker;
    }

    // NOTE(review): the dispatcher launched by start() is detached, so it
    // cannot be joined here and may still be winding down at this point.
}

void ThreadPool::start()
{
    //to avoid blocking the main thread
   std::thread t(std::bind(&ThreadPool::start_thread, this));
   t.detach();
}
void ThreadPool::start_thread()
{
    for (auto free_thread:m_list_free_threads)
    {
        free_thread->begin();
    }

    while (true)
    {
        //whether there's free thread and existing task
        m_sem_free_threads.wait();
        m_sem_task.wait();
       
        if (m_stop)
        {
            break;
        }
       
        // take a free thread
        ThreadInstance* ptr = m_list_free_threads.front();
        m_list_free_threads.pop_front();
        
        ptr->set_task(m_list_tasks.front());
        m_list_tasks.pop_front();
                
        // awaken a suspended thread  
        ptr->m_cond_task_ready.notify_one();            
    }
}
void ThreadPool::append(std::function< void(void) > func)
{
    //std::lock_guard<std::mutex> lg(m_mutex_list_task);
    m_list_tasks.push_back(func);    
    m_sem_task.signal();
}

void ThreadPool::append_free_thread(ThreadInstance* pfThread)
{
    //this function only push back thread in free thread list
    // it does not need to lock the list

    //m_mutex_free_thread.lock();
    m_list_free_threads.push_back(pfThread);    
    //m_mutex_free_thread.unlock();
    m_sem_free_threads.signal();

    
}

void ThreadPool::join_all()
{
    std::for_each(m_vec_threads.begin(), m_vec_threads.end(), [this](ThreadInstance* & item)
    {
        item->join();
    });

    m_stop = true;
    m_sem_free_threads.signal();
    m_sem_task.signal();

}

 

用於測試的main函數

 

 1 #include <iostream>
 2 #include <thread>
 3 #include <mutex>
 4 #include <functional>
 5 #include <list>
 6 #include <atomic>
 7 #include <vector>
 8 #include <algorithm>
 9 #include <memory>
10 #include <condition_variable>
11 
12 
13 #include "threadPool.h"
14 //#include <vld.h>
15 
16 using namespace std;
/// Small demo class whose member function is submitted to the pool as a task.
class A
{
public:
    A() {}
    ~A() {}

public:
    /// Sleeps for a pseudo-random 100..999 ms, then prints @p k.
    void foo(int k)
    {
        const int delay_ms = rand() % 900 + 100;
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        std::cout << "k = " << k << std::endl;
    }
};
32 
/// Free function used as a pool task: pauses for 100 ms, then prints a greeting.
void hello()
{
    const std::chrono::milliseconds pause(100);
    std::this_thread::sleep_for(pause);
    std::cout << "hello  \n";
}
40 
41 //let's test the thread.
42 int main()
43 {
44     srand(0);
45 
46     ThreadPool g_threadPool(3);
47     A a;
48 
49     g_threadPool.append(&hello);
50 
51     //append object method with copy-constructor(value-assignment)    
52     g_threadPool.append(std::bind(&A::foo, a, 1));
53     g_threadPool.append(std::bind(&A::foo, a, 2));
54     g_threadPool.append(std::bind(&A::foo, a, 3));
55     g_threadPool.append(std::bind(&A::foo, a, 4));
56     
57     //auto beg = std::chrono::high_resolution_clock().now();    
58 
59     g_threadPool.start();
60 
61     //std::this_thread::sleep_for(std::chrono::milliseconds(5000));   
62 
63     g_threadPool.append(&hello);
64     //append object method with address assignment, will cause the objects' member increase.
65     g_threadPool.append(std::bind(&A::foo, &a, 5));
66     g_threadPool.append(std::bind(&A::foo, &a, 6));
67     g_threadPool.append(std::bind(&A::foo, &a, 7));
68     g_threadPool.append(std::bind(&A::foo, &a, 8));
69     
70     //std::this_thread::sleep_for(std::chrono::seconds(5));
71     char temp;
72     cin >> temp;
73     if (temp == 'e')
74     {
75         g_threadPool.join_all();
76     }
77     
78     //auto end = std::chrono::high_resolution_clock().now();
79     //auto dd = std::chrono::duration_cast<chrono::seconds>(end - beg);
80     //cout << dd.count() << endl;
81 
82     return 0;
83 }

 


C語言怎使用線程池中的某個線程

問別人問題,還這麼牛叉,真心佩服

不要給線程派任務,讓線程空閒的時候,自己去領任務
 

windows C語言實現線程池

戶提供連接,也就是50個線程。多餘的其它客戶連接會被阻塞直到有空餘的連接出現。其實就是所謂的“線程池”的概念,你可以搜搜這方面的內容,很多很多的。
 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved