I am planning to write a server program in which every incoming client request is handled on its own thread. After reading up on the subject, I decided to build it on a thread pool. The new C++11 standard library makes thread handling much simpler than it used to be. I found a thread pool implementation online, http://blog.csdn.net/goldenhawking/article/details/7919547 , in which every thread maintains its own task queue. I think that is bad for scheduling: one thread's pending-task list can grow long while other threads sit idle.
Below is a thread pool I implemented myself. The task queue is owned by the pool class, which hands tasks out; a worker thread runs only when it has been given a task and blocks the rest of the time. Likewise, when there are no tasks to execute, the dispatching thread blocks until a new task arrives. The code was tested under VS2013.
I am still a beginner, so corrections are welcome.
threadInstance.h
#pragma once

#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable>


class ThreadPool;


//worker thread class
class ThreadInstance
{
public:
    ThreadInstance(ThreadPool *threadPool)
        :m_stop(false), m_pthread(nullptr), m_threadPool(threadPool)
    {
    }
    virtual ~ThreadInstance()
    {
        if (m_pthread != nullptr)
        {
            m_pthread->join();
            delete m_pthread;
        }
    }

    void begin()
    {
        m_pthread = new std::thread(&ThreadInstance::run, this);
    }
    void run();
    //ask the thread to terminate; the stop flag is set under the same
    //mutex the condition variable uses, so the wakeup cannot be lost
    void join()
    {
        {
            std::lock_guard<std::mutex> lg(m_mutex_task);
            m_stop = true;
        }
        m_cond_task_ready.notify_one();
    }

    void set_task(const std::function<void(void)> &task)
    {
        std::lock_guard<std::mutex> lg(m_mutex_task);
        m_task = task;
    }

public:
    //condition_variable to wait for a task; paired with m_mutex_task
    std::condition_variable m_cond_task_ready;
protected:
    //flag used to terminate the thread
    std::atomic<bool> m_stop;

    //task to be executed
    std::function<void(void)> m_task;
    //mutex protecting m_task, m_stop and the condition variable
    std::mutex m_mutex_task;
    //pointer to the underlying thread
    std::thread *m_pthread;
    //pointer to the owning thread pool
    ThreadPool *m_threadPool;
};
threadInstance.cpp
1 #include "threadInstance.h"
2 #include "threadPool.h"
3
4 void ThreadInstance::run()
5 {
6
7 while (true)
8 {
9 //auto x = std::this_thread::get_id();
10 std::unique_lock<std::mutex> lck(m_mutex_cond);
11 m_cond_task_ready.wait(lck);
12
13 if (m_stop)
14 {
15 break;
16 }
17 m_task();
18
19 //shared_ptr<ThreadInstance> ptr(this);
20 m_threadPool->append_free_thread(this);
21 }
22
23 }
threadPool.h
#pragma once


#include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable>


//counting semaphore used to track free threads and pending tasks
class Semaphore {
public:
    Semaphore(int value = 1) : count(value)
    {}

    void wait(){
        std::unique_lock<std::mutex> lock(m_mutex);
        //wait in a loop (via the predicate) rather than a single if,
        //otherwise a spurious wakeup lets a caller through too early
        condition.wait(lock, [this]{ return count > 0; });
        --count;
    }
    void signal(){
        std::lock_guard<std::mutex> lock(m_mutex);
        ++count;
        condition.notify_one(); // notify one waiter
    }

private:
    int count;
    std::mutex m_mutex;
    std::condition_variable condition;
};

class ThreadInstance;


//the thread pool class
class ThreadPool
{
public:
    ThreadPool(int nThreads);
    ~ThreadPool();
public:
    //total number of threads
    size_t count(){ return m_vec_threads.size(); }

    //wait until all threads are terminated
    void join_all();

    //append a task to the thread pool
    void append(std::function< void(void) > func);
    //start the service
    void start();
    //put a thread back on the free-thread list
    void append_free_thread(ThreadInstance* pfThread);

protected:
    //dispatcher loop, executed in a separate thread
    void start_thread();

public:
    //number of threads
    int m_n_threads;
    //flag used to stop the thread pool service
    std::atomic<bool> m_stop;

    Semaphore m_sem_free_threads;
    Semaphore m_sem_task;


    //list of all the free threads; shared between the dispatcher
    //and the workers, hence the mutex
    std::mutex m_mutex_free_threads;
    std::list<ThreadInstance*> m_list_free_threads;
    //vector of all the threads
    std::vector<ThreadInstance*> m_vec_threads;

    //pending tasks; shared between append() and the dispatcher
    std::mutex m_mutex_tasks;
    std::list<std::function<void(void)>> m_list_tasks;
};
threadPool.cpp
#include "threadPool.h"
#include "threadInstance.h"
std::mutex cond_mutex;
std::condition_variable cond_incoming_task;
ThreadPool::ThreadPool(int nThreads)
:m_n_threads(nThreads), m_sem_free_threads(nThreads), m_sem_task(0), m_stop(false)
{
for (int i = 0; i < nThreads; i++)
{
ThreadInstance* ptr=new ThreadInstance(this);
m_vec_threads.push_back(ptr);
m_list_free_threads.push_back(ptr);
}
}
ThreadPool::~ThreadPool()
{
for (int i = 0; i != m_n_threads; ++i)
{
//m_vec_threads[i]->join();
delete m_vec_threads[i];
}
}
void ThreadPool::start()
{
//to avoid blocking the main thread
std::thread t(std::bind(&ThreadPool::start_thread, this));
t.detach();
}
void ThreadPool::start_thread()
{
for (auto free_thread:m_list_free_threads)
{
free_thread->begin();
}
while (true)
{
//whether there's free thread and existing task
m_sem_free_threads.wait();
m_sem_task.wait();
if (m_stop)
{
break;
}
// take a free thread
ThreadInstance* ptr = m_list_free_threads.front();
m_list_free_threads.pop_front();
ptr->set_task(m_list_tasks.front());
m_list_tasks.pop_front();
// awaken a suspended thread
ptr->m_cond_task_ready.notify_one();
}
}
void ThreadPool::append(std::function< void(void) > func)
{
//std::lock_guard<std::mutex> lg(m_mutex_list_task);
m_list_tasks.push_back(func);
m_sem_task.signal();
}
void ThreadPool::append_free_thread(ThreadInstance* pfThread)
{
//this function only push back thread in free thread list
// it does not need to lock the list
//m_mutex_free_thread.lock();
m_list_free_threads.push_back(pfThread);
//m_mutex_free_thread.unlock();
m_sem_free_threads.signal();
}
void ThreadPool::join_all()
{
std::for_each(m_vec_threads.begin(), m_vec_threads.end(), [this](ThreadInstance* & item)
{
item->join();
});
m_stop = true;
m_sem_free_threads.signal();
m_sem_task.signal();
}
The main function used for testing
#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable>


#include "threadPool.h"
//#include <vld.h>

using namespace std;

class A
{
public:
    A()
    {}
    ~A(){}
public:
    void foo(int k)
    {
        //sleep for a while
        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 900 + 100));
        std::cout << "k = " << k << std::endl;
    }
};

//a function which will be executed in a worker thread
void hello()
{
    //sleep for a while
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    cout << "hello \n";
}

//let's test the thread pool
int main()
{
    srand(0);

    ThreadPool g_threadPool(3);
    A a;

    g_threadPool.append(&hello);

    //append a member function bound to a copy of the object
    //(a is copied into the bind expression)
    g_threadPool.append(std::bind(&A::foo, a, 1));
    g_threadPool.append(std::bind(&A::foo, a, 2));
    g_threadPool.append(std::bind(&A::foo, a, 3));
    g_threadPool.append(std::bind(&A::foo, a, 4));

    //auto beg = std::chrono::high_resolution_clock().now();

    g_threadPool.start();

    //std::this_thread::sleep_for(std::chrono::milliseconds(5000));

    g_threadPool.append(&hello);
    //append a member function bound through the object's address,
    //so foo operates on the original object a rather than a copy
    g_threadPool.append(std::bind(&A::foo, &a, 5));
    g_threadPool.append(std::bind(&A::foo, &a, 6));
    g_threadPool.append(std::bind(&A::foo, &a, 7));
    g_threadPool.append(std::bind(&A::foo, &a, 8));

    //std::this_thread::sleep_for(std::chrono::seconds(5));
    char temp;
    cin >> temp;
    if (temp == 'e')
    {
        g_threadPool.join_all();
    }

    //auto end = std::chrono::high_resolution_clock().now();
    //auto dd = std::chrono::duration_cast<chrono::seconds>(end - beg);
    //cout << dd.count() << endl;

    return 0;
}
You come here asking a question and the work is still this impressive; I genuinely admire it.
Don't push tasks out to the threads; let a thread fetch a task for itself whenever it is idle. (A sketch of this pull model follows.)
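If I read that suggestion correctly, it removes the dispatcher thread entirely: all workers block on one shared queue and each idle worker pulls the next task itself, so nothing ever needs to be assigned. A minimal sketch of that pull model (PullPool and its members are illustrative names, not code from the post above):

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class PullPool {
public:
    explicit PullPool(int n) {
        for (int i = 0; i < n; ++i)
            m_workers.emplace_back([this]{ worker_loop(); });
    }
    ~PullPool() {
        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_stop = true;
        }
        m_cond.notify_all();
        for (auto &t : m_workers) t.join();
    }
    void append(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_cond.notify_one();          //wake one idle worker
    }
private:
    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lck(m_mutex);
                //idle workers sleep here until a task arrives
                m_cond.wait(lck, [this]{ return m_stop || !m_tasks.empty(); });
                if (m_stop && m_tasks.empty()) return;
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();                   //run outside the lock
        }
    }
    std::vector<std::thread> m_workers;
    std::queue<std::function<void()>> m_tasks;
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_stop = false;
};

With this design the free-thread list, the two semaphores and the dispatcher thread all disappear; the queue's mutex and condition variable do all of the scheduling.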
…provide connections for the clients, in other words 50 threads. Any extra client connections are blocked until a free connection becomes available. This is exactly the "thread pool" concept; search for it and you will find a great deal of material.
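The blocking behaviour described here can be sketched with the counting Semaphore from threadPool.h above: initialise it to the connection limit, wait() before handing a client to the pool, and signal() when the request finishes. In the fragment below, accept_client and handle_client are hypothetical placeholders for whatever produces and serves connections:

//at most 50 requests are inside the pool at once; further accepts
//block in gate.wait() until a running request calls gate.signal()
Semaphore gate(50);
ThreadPool pool(50);
pool.start();

while (true)
{
    int client = accept_client();   //placeholder: blocks for the next connection
    gate.wait();                    //blocks while all 50 slots are busy
    pool.append([client, &gate]{
        handle_client(client);      //placeholder: real request handling
        gate.signal();              //free a slot for the next waiting connection
    });
}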