本篇系C++ socket網絡爬蟲(1)的姊妹篇,寫網絡爬蟲怎麼能少得了線程呢
源代碼地址:http://files.cnblogs.com/magicsoar/ThreadPoolProject.rar
*需要C++11的支持,在vs2013下編譯通過
運行效果
背景
在傳統的收到任務即創建線程的情況下,我們每收到一個任務,就創建一個線程,執行任務,銷毀線程,
我們把這三個過程所用的時間分別記做T1,T2,T3
任務本身所用的時間僅占T2/(T1+T2+T3),這在任務本身所用時間很短的情況下, 效率是很低的
此外,通常操作系統所能創建的線程數量都是有限的,並不能無限制的創建線程。
而在線程池中,我們通常會預先創建m個線程,放到空閒容器中,當有任務來臨時,線程池會從空閒的線程中挑選一個線程來執行該任務,
在執行完畢後再將其放回空閒容器中
C++11
在C++11中,C++對線程提供了一個很高的抽象,並沒有很好的提供優先級控制等功能,需要調用std::thread::native_handle(),獲取原生線程對象
運行平台特定的操作,但這就喪失了std::thread在不同平台上代碼層面的一致性。
所以在項目中實現了對std::thread二次封裝,並提供了基本的優先級控制
項目概述
項目中有一個主線程,即運行程序時創建的線程可以從用戶那裡獲取任務,還有一個管理線程,用於進行線程池中線程的調度,還有初始化線程池時創建的若干空閒線程,用於執行任務
項目中主要有以下幾個類:
Task:任務類,內有任務的優先級,和一個純虛Run方法,我們需要派生Task,將要完成的任務寫到Run方法中
MyThread:線程類,封裝了C++11的thread,每一個線程可以關聯一個Task對象,執行其Run方法
BusyThreadContainer:工作容器類,采用std::list<MyThread*>實現,儲存工作狀態的線程
IdleThreadContainer:空閒容器類,采用std::vector<MyThread*>實現,儲存處於空閒狀態的線程
TaskContainer:任務容器類,采用priority_queue<Task*>實現,儲存所有用戶添加未執行的任務
MyThreadPool:線程池類,用於從用戶獲取任務,管理任務,實現對線程池中線程的調度
類圖如下
Task類
namespace
{
enum PRIORITY
{
MIN = 1, NORMAL = 25, MAX = 50
};
}
class Task
{
public:
Task()
{
}
void SetPriority(int priority)
{
if (priority>(PRIORITY::MAX))
{
priority = (PRIORITY::MAX);
}
else if (priority>(PRIORITY::MAX))
{
priority = (PRIORITY::MIN);
}
}
virtual void Run() = 0;
protected:
int priority_;
};
void SetPriority(int priority) :設置線程的優先級,數值在1-50之間,值越大,優先級越高
virtual void run() = 0:線程執行的方法,用戶需要重寫為自己的方法
MyThread類
class MyThread
{
friend bool operator==(MyThread my1, MyThread my2);
friend bool operator!=(MyThread my1, MyThread my2);
public:
MyThread(MyThreadPool *pool);
void Assign(Task *Task);
void Run();
void StartThread();
int getthreadid();
void setisdetach(bool isdetach);
private:
MyThreadPool *mythreadpool_;
static int s_threadnumber;
bool isdetach_;
Task *task_;
int threadid_;
std::thread thread_;
};
方法:
MyThread(MyThreadPool *pool):構造一個MyThread對象,將自己與指定的線程池相關聯起來
void Assign(Task *Task):將一個任務與該線程相關聯起來
void Run():調用了Task的Run方法,同時在Task的Run方法結束後將自己從工作容器移回空閒容器
void StartThread():執行線程的Run方法,即執行了Task的Run方法
int getthreadid():獲取線程的id號
void setisdetach(bool isdetach):設置線程在運行的時候是join還是detach的
BusyThreadContainer類
class BusyThreadContainer
{
public:
BusyThreadContainer();
~BusyThreadContainer();
void push(MyThread *m);
std::list<MyThread*>::size_type size();
void erase(MyThread *m);
private:
std::list<MyThread*> busy_thread_container_;
typedef std::list<MyThread*> Container;
typedef Container::iterator Iterator;
};
void push(MyThread *m):將一個線程放入工作容器中
void erase(MyThread *m):刪除一個指定的線程
std::list<MyThread*>::size_type size():返回工作容器的大小
IdleThreadContainer類
class IdleThreadContainer
{
public:
IdleThreadContainer();
~IdleThreadContainer();
std::vector<MyThread*>::size_type size();
void push(MyThread *m);
void assign(int n,MyThreadPool* m);
MyThread* top();
void pop();
void erase(MyThread *m);
private:
std::vector<MyThread*> idle_thread_container_;
typedef std::vector<MyThread*> Container;
typedef Container::iterator Iterator;
};
~IdleThreadContainer(); :負責析構空閒容器中的線程
void push(MyThread *m):將一個線程放回空閒容器中
void assign(int n,MyThreadPool* m):創建n個線程與線程池m相關聯的線程放入空閒容器中
MyThread* top():返回位於空閒容器頂端的線程
void pop():彈出空閒容器頂端的線程
void erase(MyThread *m):刪除一個指定的線程
TaskContainer類
class TaskContainer
{
public:
TaskContainer();
~TaskContainer();
void push(Task *);
Task* top();
void pop();
std::priority_queue<Task*>::size_type size();
private:
std::priority_queue<Task*> task_container_;
};
void push(Task *):將一個任務放入任務容器中
Task* top():返回任務容器頂端的任務
void pop():將任務容器頂端的線程彈出
std::priority_queue<Task*>::size_type size():返回任務容器的大小
MyThreadPool類
class MyThreadPool
{
public:
MyThreadPool(){}
MyThreadPool(int number);
~MyThreadPool();
void AddTask(Task *Task,int priority);
void AddIdleThread(int n);
void RemoveThreadFromBusy(MyThread *myThread);
void Start();
void EndMyThreadPool();private:
BusyThreadContainer busy_thread_container_;
IdleThreadContainer idle_thread_container_;
bool issurvive_;
TaskContainer task_container_;
std::thread thread_this_;
std::mutex busy_mutex_;
std::mutex idle_mutex_;
std::mutex task_mutex_;
int number_of_thread_;
};
MyThreadPool(int number):構造MyThreadPool,創建包含number個線程的空閒容器
void AddTask(Task *Task,int priority):添加一個優先級為priority的任務到任務容器中
void AddIdleThread(int n):在創建n個空閒線程到空閒容器中
void RemoveThreadFromBusy(MyThread *myThread):將一個線程從工作容器中刪除,並移回空閒容器中
void Start():判斷是否有空閒線程,如有將任務從從任務容器中提出,放入空閒容器中,等待執行
void EndMyThreadPool():結束線程池的運行
派生自Task的MyTask類
class MyTask :public Task
{
friend bool operator<(MyTask &lv,MyTask &rv)
{
return lv.priority_ < rv.priority_;
}
public:
MyTask();
~MyTask();
virtual void Run();
void setdata(int d);
private:
int data_;
};
MyTask::MyTask()
{
}
MyTask::~MyTask()
{
}
void MyTask::setdata(int d)
{
data_ = d;
}
void MyTask::Run()
{
std::cout << "Hello I am "<<data_ << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
friend bool operator<(MyTask &lv,MyTask &rv) :用於確定任務在任務容器中的位置
Run:自定義的Run方法
void setdata(int d):設置數據
關鍵代碼分析:
void MyThread::Run()
void MyThread::Run()
{
cout <<"Thread:"<< threadid_ << " run ";
task_->Run();
mythreadpool_->RemoveThreadFromBusy(this);
}
調用了Task的Run方法,同時在Task的Run方法結束後,通知線程池將自己從工作容器中移回空閒容器
void MyThread::StartThread()
void MyThread::StartThread()
{
thread_ = thread(&MyThread::Run, this);
if (isdetach_ == true)
thread_.detach();
else
thread_.join();
}
將MyThread的Run方法與thread_相綁定,this表示類的Run方法的第一個隱含的參數
然後根據isdetach的值,判斷是否detach() or join()
void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)
void MyThreadPool::RemoveThreadFromBusy(MyThread *myThread)
{
busy_mutex_.lock();
cout << "Thread:" << myThread->getthreadid()<< " remove from busylist" << endl;
busy_thread_container_.erase(myThread);
busy_mutex_.unlock();
idle_mutex_.lock();
idle_thread_container_.push(myThread);
idle_mutex_.unlock();
}
將一個線程從任務容器中移除,並將其放回空閒容器中,
使用busy_mutex_和idle_mutex_進行加鎖和解鎖,確保數據的一致性
MyThreadPool::MyThreadPool(int number)
MyThreadPool::MyThreadPool(int number)
{
issurvive_ = true;
number_of_thread_ = number;
idle_thread_container_.assign(number, this);
thread_this_ =thread(&MyThreadPool::Start, this);
thread_this_.detach();
}
MyThreadPool的構造函數,創建number個空閒線程與空閒容器中,同時創建管理線程thread_this,用於進行線程池中線程的調度
void MyThreadPool::Start()
void MyThreadPool::Start()
{
while (true)
{
if (issurvive_==false)
{
busy_mutex_.lock();
if (busy_thread_container_.size()!=0)
{
busy_mutex_.unlock();
continue;
}
busy_mutex_.unlock();
break;
}
idle_mutex_.lock();
if (idle_thread_container_.size() == 0)
{
idle_mutex_.unlock();
continue;
}
idle_mutex_.unlock();
task_mutex_.lock();
if (task_container_.size() == 0)
{
task_mutex_.unlock();
continue;
}
Task *b = task_container_.top();;
task_container_.pop();
task_mutex_.unlock();
idle_mutex_.lock();
MyThread *mythread = idle_thread_container_.top();;
idle_thread_container_.pop();
mythread->Assign(b);
idle_mutex_.unlock();
busy_mutex_.lock();
busy_thread_container_.push(mythread);
busy_mutex_.unlock();
mythread->StartThread();
}
}
管理線程對應的Start方法,內有一個死循環,不停的判斷任務容器中是否有任務,和是否有空閒線程來執行任務,若有,則將任務從
任務容器中提出,從空閒線程中提取出一個空閒線程與其綁定,執行該任務,同時將該線程從空閒容器移動到工作容器中。
當線程池想要結束運行時,即survive為false時,首先要判斷工作容器是否為空,若不為空,則代表還有任務正在被線程執行,線程池不能結束運行
否則可以結束線程池的運行,跳出死循環
int main()
int main()
{
MyThreadPool mythreadPool(10);
MyTask j[50];
for (int i = 0; i < 50;i++)
{
j[i].setdata(i);
}
for (int i = 0; i < 50; i++)
{
mythreadPool.AddTask(&j[i],i);
}
int i;
//按100添加一個任務
//按-1結束線程池
while (true)
{
cin >> i;
if (i == 100)
{
MyTask j;
j.setdata(i);
mythreadPool.AddTask(&j, i);
}
if (i == -1)
{
mythreadPool.EndMyThreadPool();
break;
}
}
system("pause");
}
創建了一個含有10個空閒線程的線程池,和50個MyTask任務,並將其放入線程池中等待運行
在循環中,用戶輸入100可以再添加一個任務到線程池中等待運行,輸入-1結束線程池的運行。
運行結果如下
線程池使用後記
線程池並不是萬能的,線程池減少了創建與銷毀線程本身對任務照成的影響,但如果任務本身的運行時間很長,那麼這些開銷相當於任務本身執行開銷而言是可以忽略的。那麼我們也可以
選擇“即時創建,即時銷毀”的策略
線程池通常適合下面的幾個場合:
(1) 單位時間內處理的任務數較多,且每個任務的執行時間較短
(2) 對實時性要求較高的任務,如果接受到任務後在創建線程,再執行任務,可能滿足不了實時要求,因此必須采用線程池進行預創建。