多線程並發程序與協同程序其實是不同的概念。多線程並發是多個執行序同時運行,而協同程序是多個執行序列相互協作,同一時刻只有一個執行序列。今天想到的是將兩者結合起來,拿現實生活中的例子來說,假設一個班級有100個學生,一個老師要批改100個學生的作業,有時老師太忙或者趕時間會叫幾個同學幫忙批改,等所有同學都批改完後都交到老師手中,老師在下次上課的時候將作業本一起發給班上的學生。。。。其實在並發編程的時候也可以借鑒這一個思想和模式,特別是網絡服務器開發的過程中,並發與協同經常出現,於是今天寫了一個簡單的程序模擬了這種情形,當然這個程序本身並沒有任何意義,只是記錄下這種思想,個人一直都覺得,程序開發中,思想是最為重要的,用什麼語言來實現只是表現上不同,今天記錄下來,日後的開發過程中,在適當地方以此思想為基礎,根據項目需要進行拓展!
1 //--------------------------------------------------------------
2 開發工具:Visual Studio 2012
3 //---------------------------------------------------------------
4 //C++
5 #include <iostream>
6 #include <memory>
7 #include <thread>
8 #include <mutex>
9 #include <condition_variable>
10 #include <queue>
11 #include <vector>
12
13 using namespace std;
14
15 //windows
16 #include <windows.h>
17
18
19 /************************************************
20 [示例]實現一個多線程方式下的協同工作程序
21
22 當一個線程(相對的主線程)在完成一個任務的時
23 候,有時候為了提高效率,可以充分利用多核CPU的
24 優勢可以將手中的任務分成多個部分,分發給比較
25 空閒的輔助線程來幫助處理,並且主線程要等待所
26 有的輔助線程都處理完成後,對所有任務進行一次
27 匯總,才能進行下一步操作,此時就需要一個同步的
28 多線程協同工作類。
29 *************************************************/
30
31
32 //定義一個求累積和的任務類
33 class CSumTask
34 {
35 public:
36 CSumTask(double dStart,double dEnd);
37 ~CSumTask();
38 double DoTask();
39 double GetResult();
40 private:
41 double m_dMin;
42 double m_dMax;
43 double m_dResult;
44 };
45
46 CSumTask::CSumTask(double dStart,double dEnd):m_dMin(dStart),m_dMax(dEnd),m_dResult(0.0)
47 {
48
49 }
50 CSumTask::~CSumTask()
51 {
52
53 }
54 double CSumTask::DoTask()
55 {
56
57 for(double dNum = m_dMin;dNum <= m_dMax;++dNum)
58 {
59 m_dResult += dNum;
60 }
61 return m_dResult;
62 }
63
64 double CSumTask::GetResult()
65 {
66 return m_dResult;
67 }
68
69
70 //定義一個任務管理者
71 class CTaskManager
72 {
73 public:
74 CTaskManager();
75 ~CTaskManager();
76 size_t Size();
77 void AddTask(const std::shared_ptr<CSumTask> TaskPtr);
78 std::shared_ptr<CSumTask> PopTask();
79 protected:
80 std::queue<std::shared_ptr<CSumTask>> m_queTask;
81 };
82
83 CTaskManager::CTaskManager()
84 {
85
86 }
87
88 CTaskManager::~CTaskManager()
89 {
90
91 }
92
93 size_t CTaskManager::Size()
94 {
95 return m_queTask.size();
96 }
97
98 void CTaskManager::AddTask(const std::shared_ptr<CSumTask> TaskPtr)
99 {
100 m_queTask.push(std::move(TaskPtr));
101 }
102
103 std::shared_ptr<CSumTask> CTaskManager::PopTask()
104 {
105 std::shared_ptr<CSumTask> tmPtr = m_queTask.front();
106 m_queTask.pop();
107 return tmPtr;
108 }
109
110
111 //協同工作線程管理類,負責創建協同工作線程並接受來自主線程委托的任務進行處理
112 class CWorkThreadManager
113 {
114 public:
115 CWorkThreadManager(unsigned int uiThreadSum );
116 ~CWorkThreadManager();
117 bool AcceptTask(std::shared_ptr<CSumTask> TaskPtr);
118 bool StopAll(bool bStop);
119 unsigned int ThreadNum();
120 protected:
121 std::queue<std::shared_ptr<CSumTask>> m_queTask;
122 std::mutex m_muTask;
123 int m_iWorkingThread;
124 int m_iWorkThreadSum;
125 std::vector<std::shared_ptr<std::thread>> m_vecWorkers;
126
127 void WorkThread(int iWorkerID);
128 bool m_bStop;
129 std::condition_variable_any m_condPop;
130 std::condition_variable_any m_stopVar;
131 };
132
133 CWorkThreadManager::~CWorkThreadManager()
134 {
135
136 }
137 unsigned int CWorkThreadManager::ThreadNum()
138 {
139 return m_iWorkThreadSum;
140 }
141
142 CWorkThreadManager::CWorkThreadManager(unsigned int uiThreadSum ):m_bStop(false),m_iWorkingThread(0),m_iWorkThreadSum(uiThreadSum)
143 {
144 //創建工作線程
145 for(int i = 0; i < m_iWorkThreadSum;++i)
146 {
147 std::shared_ptr<std::thread> WorkPtr(new std::thread(&CWorkThreadManager::WorkThread,this,i+1));
148 m_vecWorkers.push_back(WorkPtr);
149 }
150
151 }
152
153 bool CWorkThreadManager::AcceptTask(std::shared_ptr<CSumTask> TaskPtr)
154 {
155 std::unique_lock<std::mutex> muLock(m_muTask);
156 if(m_iWorkingThread >= m_iWorkThreadSum)
157 {
158 return false; //當前已沒有多余的空閒的線程處理任務
159 }
160 m_queTask.push(TaskPtr);
161 m_condPop.notify_all();
162 return true;
163 }
164
165 void CWorkThreadManager::WorkThread(int iWorkerID)
166 {
167 while(!m_bStop)
168 {
169 std::shared_ptr<CSumTask> TaskPtr;
170 bool bDoTask = false;
171 {
172 std::unique_lock<std::mutex> muLock(m_muTask);
173 while(m_queTask.empty() && !m_bStop)
174 {
175 m_condPop.wait(m_muTask);
176 }
177 if(!m_queTask.empty())
178 {
179 TaskPtr = m_queTask.front();
180 m_queTask.pop();
181 m_iWorkingThread++;
182 bDoTask = true;
183 }
184
185 }
186 //處理任務
187 if(bDoTask)
188 {
189 TaskPtr->DoTask();
190 {
191 std::unique_lock<std::mutex> muLock(m_muTask);
192 m_iWorkingThread--;
193 cout<<">>>DoTask in thread ["<<iWorkerID<<"]\n";
194 }
195 }
196 m_stopVar.notify_all();
197 }
198 }
199
200 bool CWorkThreadManager::StopAll(bool bStop)
201 {
202 {
203 std::unique_lock<std::mutex> muLock(m_muTask);
204 while(m_queTask.size()>0 || m_iWorkingThread>0)
205 {
206 m_stopVar.wait(m_muTask);
207 cout<<">>>Waiting finish....\n";
208 }
209 cout<<">>>All task finished!\n";
210
211 }
212
213 m_bStop = true;
214 m_condPop.notify_all();
215 //等待所有線程關閉
216 for(std::vector<std::shared_ptr<std::thread>>::iterator itTask = m_vecWorkers.begin();itTask != m_vecWorkers.end();++itTask)
217 {
218 (*itTask)->join();
219 }
220 return true;
221 }
222
223
224 /**************************************
225 [示例程序說明]
226
227 每個任務對象表示求1+2+....+1000的累
228 積和,現在有2000個這樣的任務,需要將每個
229 任務進行計算,然後將所有的結果匯總求和。
230 利用多線程協同工作類對象輔助完成每
231 個任務結果計算,主線程等待所有結果完成
232 後將所有結果匯總求和。
233 ****************************************/
234
235
236 int main(int arg,char *arv[])
237 {
238
239 std::cout.sync_with_stdio(true);
240 CTaskManager TaskMgr;
241 CWorkThreadManager WorkerMgr(5);
242 std::vector<std::shared_ptr<CSumTask>> vecResultTask;
243
244 for(int i = 0; i < 2000; ++i)
245 {
246 std::shared_ptr<CSumTask> TaskPtr(new CSumTask(1.0,1000.0));
247 TaskMgr.AddTask(TaskPtr);
248 vecResultTask.push_back(TaskPtr);
249 }
250
251 //
252 DWORD dStartTime = ::GetTickCount();
253 while(TaskMgr.Size()>0)
254 {
255 std::shared_ptr<CSumTask> WorkPtr = TaskMgr.PopTask();
256 if(!WorkerMgr.AcceptTask(WorkPtr))
257 {
258 //輔助線程此刻處於忙碌狀態(沒有空閒幫忙),自己處理該任務
259 WorkPtr->DoTask();
260 cout<<">>>DoTask in thread [0]\n";
261 }
262 }
263 WorkerMgr.StopAll(true); //等待所有的任務完成
264
265 //對所有結果求和
266 double dSumResult = 0.0;
267 for(std::vector<std::shared_ptr<CSumTask>>::iterator itTask = vecResultTask.begin();itTask != vecResultTask.end();++itTask)
268 {
269 dSumResult += (*itTask)->GetResult();
270 }
271
272 DWORD dEndTime = ::GetTickCount();
273 cout<<"\n[Status]"<<endl;
274 cout<<"\tEvery task result:"<<vecResultTask[0]->GetResult()<<endl;
275 cout<<"\tTask num:"<<vecResultTask.size()<<endl;
276 cout<<"\tAll result sum:"<<dSumResult;
277 cout<<"\tCast to int,result:"<<static_cast<long long>(dSumResult)<<endl;
278 cout<<"\tWorkthread num:"<<WorkerMgr.ThreadNum()<<endl;
279 cout<<"\tTime of used:"<<dEndTime-dStartTime<<" ms"<<endl;
280 getchar();
281 return 0;
282 }
