引言
線程池很普通的老話題,討論的很多.深入的不多,也就那些基礎庫中才能見到這種精妙完備的技巧.而本文隨大流
想深入簡述一種高效控制性強的一種線程池實現.
先引入一個概念, 驚群. 簡單舉個例子. 春天來了, 公園出現了很多麻雀. 而你恰巧有一個玉米粒. 扔出去,
立馬無數麻雀過來爭搶.而最終只有一只麻雀得到了.而那些沒有搶到的麻雀很累.......
編程中驚群,是個很古老的編程話題了.在服務器開發有機會遇到.有興趣的可以自行搜索, 多數介紹的質量非常高.
而我們今天只討論線程池中驚群現象.采用的POSIX跨平台的線程庫 pthread.
PTW32_DLLPORT int PTW32_CDECL pthread_cond_signal (pthread_cond_t * cond);
上面函數就是線程池中出現驚群來源. 它會激活 pthread_cond_wait 等待態一個或多個線程.
扯一點 要開發多線程強烈看看 <<POSIX 程序設計>> (csdn網速卡沒上傳上去, 大家自己搜搜,放在手機裡看看)
還有一本講多線程部分也特別好 <<C接口與實現>> 的最後一章.
本文參照
線程池中驚群問題 http://zsxxsz.iteye.com/blog/2028452
(注,上面博主思路很清晰,寫了偽代碼. 並且講了避免線程池驚群的想法. 是個真高手,半步宗師)
還參照了很多,寫的很好,但都有些錯誤,就沒貼了. 希望本文是上面那篇文章探討的下篇. 思路一脈相承.
相比上面文章,本文是個大更新. 真代碼真實現.並且加了銷毀函數.(線程池釋放), 簡易函數重載(宏).
前言
先說設計
對於C的設計那就是頭文件中接口的定義. 首先來介紹一下本文中用到的 schead.h 中一些結構和宏
typedef void (*vdel_f)(void* node);
上面函數指針定義類型就是線程池中注冊函數所采用的類型.
還使用了一個結構體判斷宏, C中不支持 struct a == struct b的語法判斷. 實現了一個
/*
* 比較兩個結構體棧上內容是否相等,相等返回true,不等返回false
* a : 第一個結構體值
* b : 第二個結構體值
* : 相等返回true, 否則false
*/
#define STRUCTCMP(a, b) \
(!memcmp(&a, &b, sizeof(a)))
本質是比較結構體中棧中內存.這個技巧是不是很巧妙. 內存是編程中一個大問題,很多事都在內存上出了問題.容我再扯一點, 黑一下C++
個人慚愧的覺得C++真的需要一個垃圾回收器了. 這麼繁瑣的語言還需要自己洞悉內存變化. 太恐怖了.特別是STL源碼擊垮了多少人的心房.
黑的不好意思, 一個基礎過硬獨當一面的C++程序員.也好難遇到了.哎C, C++確實有點'過時'了. 入行的太少, 轉行的太多了.工資還不高.
好這裡就直接到了重頭戲 scpthread.h 線程池的接口設計
#ifndef _H_SCPTHREAD
#define _H_SCPTHREAD
#include <schead.h>
/*
* 這是個基於 pthread.h 的線程池. 簡單方便高效.
* 這裡使用了頭文件 schead.h 也可以省掉,這裡只使用了cdel_f 的類型.
* typedef void (*vdel_f)(void* arg);
* 也自定義了一個類型 threadpool_t 線程池指針類型,也叫作不完整(全)類型.
* 只聲明不寫實現.也是個常用技巧
*/
typedef struct threadpool* threadpool_t;
/*
* 通過這個接口創建線程池對象.後面就可以使用了.
* size : 當前線程池中最大的線程個數
* : 返回創建好的線程池值
*/
extern threadpool_t sp_new(int size);
/*
* 在當前線程池中添加待處理的線程對象.
* pool : 線程池對象, sp_new 創建的那個
* run : 運行的函數體, 返回值void, 參數void*
* arg : 傳入運行的參數
* : 沒有返回值
*/
extern void sp_add(threadpool_t pool, vdel_f run, void* arg);
/*
* 優化擴展宏,簡化操作.唯一惡心的是宏調試難
* _INT_THREADPOOL 是一個簡單的大小設置,控制線程池中線程多少
*
* sp_CREATE 同樣在上面宏幫助下, 少些一個參數. 認為是函數重載
*
* sp_ADD 是一個開發技巧,幫助我們 把 void (*)(type* pi) => void (*)(void* pi),
* 這樣我們寫函數定義的時候更方便隨意.
*/
#define _INT_THREADPOOL (128)
#define sp_NEW() \
sp_new(_INT_THREADPOOL)
#define sp_ADD(pool, run, arg) \
sp_add(pool, (vdel_f)run, arg)
/*
* 銷毀當前線程池,釋放內存,並嘗試停止線程池中線程.
* ppopl : 指向 sp_new創建的對象的指針
* : 沒有返回值
*/
extern void sp_del(threadpool_t* ppool);
#endif // !_H_SCPTHREAD
定義了不完全類型.線程池類型.有創建銷毀,添加內容等. 還寫了幾個充當'函數重載'的宏. 很多人討厭宏,覺得宏不可取.
但是在你們用函數重載的時候,你想過麻煩嗎. 脫離你的IDE,用vi試試函數重載是不是每次都需要看一遍源碼,才知道這個函數到底怎麼用的.
但確實函數宏,不好理解. 模板宏更不好理解. 而且調試難度僅次於多線程調試了.
到這裡接口設計部分已經完工了.沒有好的設計, 什麼都不是......
正文
先說容易實現的
這裡先看看用到的結構部分,首先是任務鏈表結構和創建
// 線程任務鏈表
struct threadjob {
vdel_f run; //當前任務中要執行函數體,注冊的事件
void* arg; //任務中待執行事件的參數
struct threadjob* next; //指向下一個線程任務鏈表
};
// struct threadjob 結構對象創建
static inline struct threadjob* _new_threadjob(vdel_f run, void* arg)
{
struct threadjob* job = malloc(sizeof(struct threadjob));
if(!job)
CERR_EXIT("malloc struct threadjob is NULL!");
job->run = run;
job->arg = arg;
job->next = NULL;
return job;
}
這裡對於內存處理方式,采用了C++中new做法, new出錯了程序直接崩.上面if(!job)也可以省略.主要看個人理解了.
下面核心結構設計
// 線程結構體,每個線程一個信號量
struct thread {
pthread_t tid; //運行的線程id, 在釋放的時候用
pthread_cond_t cond; //當前線程的條件變量
struct thread* next; //下一個線程
};
// 線程池類型定義
struct threadpool {
int size; //線程池大小,最大線程數限制
int curr; //當前線程池中總的線程數
int idle; //當前線程池中空閒的線程數
pthread_mutex_t mutex; //線程互斥鎖
struct thread* threads; //線程條件變量,依賴mutex線程互斥鎖
struct threadjob* head; //線程任務鏈表的表頭, head + tail就是一個隊列結構
struct threadjob* tail; //線程任務鏈表的表尾,這個量是為了後插入的後執行
};
// 添加一個等待的 struct thread 對象到 線程池pool中
static void _thread_add(struct threadpool* pool, pthread_t tid)
{
struct thread* thread = malloc(sizeof(struct thread));
if(!thread)
CERR_EXIT("malloc sizeof(struct thread) is error!");
thread->tid = tid;
pthread_cond_init(&thread->cond, NULL);
thread->next = pool->threads;
pool->threads = thread;
}
// 依據cnd內存地址屬性, 刪除pool->threads 中指定數據
static void _thread_del(struct threadpool* pool, pthread_cond_t* cnd)
{
struct thread* head = pool->threads;
if(cnd == &head->cond){
pool->threads = head->next;
pthread_cond_destroy(&head->cond);
free(head);
return;
}
// 下面是處理非頭結點刪除
while(head->next){
struct thread* tmp = head->next;
if(cnd == &tmp->cond){ //找見了,刪掉退出
head->next = tmp->next;
pthread_cond_destroy(&tmp->cond);
free(tmp);
break;
}
head = tmp;
}
}
// 使用了棧內存比較函數,返回對應線程的cond
static pthread_cond_t* _thread_get(struct threadpool* pool, pthread_t tid)
{
struct thread* head = pool->threads;
while (head) {
if (STRUCTCMP(tid, head->tid))
break;
head = head->next;
}
return &head->cond;
}
對於上面的結構是本文核心,就是每個開啟的線程都會有一個獨有的線程等待變量. 這樣pthread_cond_signal 發送信息都是指定發送的.
性能損耗在 _thread_get 上, 這裡設計是采用單鏈表導致每次都要輪序找到指定線程的線程條件變量. 有好想法的同學可以優化.
對於struct threadpool 結構中 struct threadjob *head, *tail; 是個線程任務隊列.
struct thread* threads; 是個線程鏈表. 當前文件中共用struct threadpool 中 mutex一個互斥量.這裡說一下,鏈表是C結構中基礎的基礎,
所有代碼都是圍繞它這個結構. 一定要磨練中熟悉提高.對於剛學習的人.
上面代碼都是業務代碼, 做的好的就是 pthread_cond_destroy 釋放條件變量信息. 也許這個函數中沒有釋放內存, 但推薦和init成對出現.有始有終.
前戲講完了, 現在講解其它簡單的代碼接口實現
/*
* 通過這個接口創建線程池對象.後面就可以使用了.
* max : 當前線程池中最大的線程個數
* : 返回創建好的線程池值.創建失敗返回NULL
*/
threadpool_t
sp_new(int size)
{
struct threadpool* pool;
// 錯誤判斷,有點丑陋, 申請內存並初始化
if((size <= 0) || !(pool = calloc(1, sizeof(struct threadpool)))){
CERR("struct threadpool calloc is error!");
return NULL;
}
pool->size = size;
pthread_mutex_init(&pool->mutex, NULL);
return pool;
}
上面就是創建接口的實現代碼,calloc相比malloc多調用了memset(&src, 0, sizeof(src))清空置零了.
還有一個釋放資源函數.這裡是允許創建多個線程池.自然要有提供釋放函數.
/*
* 銷毀當前線程池,釋放內存,並嘗試停止線程池中線程.
* ppopl : 指向 sp_new創建的對象的指針
* : 沒有返回值
*/
void
sp_del(threadpool_t* ppool)
{
struct threadpool* pool;
struct thread* thread;
struct threadjob* head;
if((!ppool) || !(pool = *ppool)) return;
//加鎖,等待完全占有鎖的時候再去釋放資源
pthread_mutex_lock(&pool->mutex);
//先釋放線程
thread = pool->threads;
while(thread){
struct thread* next = thread->next;
pthread_cancel(thread->tid);
pthread_cond_destroy(&thread->cond);
free(thread);
thread = next;
}
//再來釋放任務列表
head = pool->head;
while(head) {
struct threadjob* next = head->next;
free(head);
head = next;
}
pthread_mutex_unlock(&pool->mutex);
//最後要銷毀這個使用的線程鎖對象
pthread_mutex_destroy(&pool->mutex);
*ppool = NULL;
}
也許就是多了這個需求原先的代碼量多了一半. 需要圍繞它讓開啟的線程能夠支持可需求.安全取消等.本文中用到的很多pthread api.
不熟悉的多搜索,多做筆記.不懂多了, 需要的是自己學習.
對於上面釋放函數先競爭唯一互斥量,競爭到了那麼就開始釋放了.先關閉線程後面釋放任務列表和線程條件變量資源.
再說核心實現
這部分和上面參照的博文有很多相似之處,大家看了上面代碼,看這個應該很好理解. 核心部分就兩個函數,一個是線程輪詢處理任務的函數.
一個是構建線程池函數. 線程輪序函數如下
// 線程運行的時候執行函數
static void* _consumer(struct threadpool* pool)
{
struct threadjob* job;
int status;
pthread_t tid = pthread_self();
pthread_mutex_t* mtx = &pool->mutex;
pthread_cond_t* cnd;
//設置線程屬性, 默認線程屬性 允許退出線程
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); //設置立即取消
pthread_detach(tid); //設置線程分離,自銷毀
// 改消費者線程加鎖, 並且得到當前線程的條件變量,vdel_f為了linux上消除警告
pthread_cleanup_push((vdel_f)pthread_mutex_unlock, mtx);
pthread_mutex_lock(mtx);
cnd = _thread_get(pool, tid);
__loop:
if(pool->head != NULL) {
// 有多線程任務,取出數據從下面處理
job = pool->head;
pool->head = job->next;
if(pool->tail == job)
pool->tail = NULL;
// 解鎖, 允許其它消費者線程加鎖或生產線程添加新任務
pthread_mutex_unlock(mtx);
// 回調函數,後面再去刪除這個任務
job->run(job->arg);
free(job);
// 新的一輪開始需要重新加鎖
pthread_mutex_lock(mtx);
goto __loop;
}
// 這裡相當於 if 的 else, pool->first == NULL
++pool->idle;
// 調用pthread_cond_wait 等待線程條件變量被通知且自動解鎖
status = pthread_cond_wait(cnd, mtx);
--pool->idle;
if(status == 0) //等待成功了,那就開始輪序處理任務
goto __loop;
//到這裡是程序出現異常, 進程退出中, 先減少當前線程
--pool->curr;
//去掉這個線程鏈表pool->threads中對應數據
_thread_del(pool, cnd);
pthread_mutex_unlock(mtx);
pthread_cleanup_pop(0);
return NULL;
}
對於pthread_cleanup_push 和 pthread_cleanup_pop 也是posix線程的標准用法, 為了在函數取消的時候釋放 lock鎖.
其中采用強制轉換 (vdel_f) 是為了消除linux上gcc編譯的警告.因為gcc上 函數只有 void* (*)(void*) 類型.而window上對於上面宏調用的時候強加了轉換
/*
* C implementation of PThreads cancel cleanup
*/
#define pthread_cleanup_push( _rout, _arg ) \
{ \
ptw32_cleanup_t _cleanup; \
\
ptw32_push_cleanup( &_cleanup, (ptw32_cleanup_callback_t) (_rout), (_arg) ); \
#define pthread_cleanup_pop( _execute ) \
(void) ptw32_pop_cleanup( _execute ); \
}
linux上代碼是這樣的
/* Install a cleanup handler: ROUTINE will be called with arguments ARG
when the thread is canceled or calls pthread_exit. ROUTINE will also
be called with arguments ARG when the matching pthread_cleanup_pop
is executed with non-zero EXECUTE argument.
pthread_cleanup_push and pthread_cleanup_pop are macros and must always
be used in matching pairs at the same nesting level of braces. */
# define pthread_cleanup_push(routine, arg) \
do { \
struct __pthread_cleanup_frame __clframe \
__attribute__ ((__cleanup__ (__pthread_cleanup_routine))) \
= { .__cancel_routine = (routine), .__cancel_arg = (arg), \
.__do_it = 1 };
/* Remove a cleanup handler installed by the matching pthread_cleanup_push.
If EXECUTE is non-zero, the handler function is called. */
# define pthread_cleanup_pop(execute) \
__clframe.__do_it = (execute); \
} while (0)
其中 struct __pthread_cleanup_frame 結構如下
/* Structure to hold the cleanup handler information. */
struct __pthread_cleanup_frame
{
void (*__cancel_routine) (void *);
void *__cancel_arg;
int __do_it;
int __cancel_type;
};
提升技術最好的辦法
1.多看書
2.多寫代碼,多搜搜,多問問
3.多看別人的好代碼, 多臨摹源碼
4.多創造,多改進,多實戰
等這該明白的都明白了,一切都是那樣容易,那樣的美的時候. 就可以回家種田了. 哈哈.
再補充上面說明一下.為什麼用goto, 不喜歡無腦的for(;;) {}, 並且黑屏幕小,vi上太長了換行不好看.
最後測試
那到了測試環節,測試代碼 test_spthread.c
#include <scpthread.h>
//全局計時器,存在鎖問題
static int _old;
//簡單的線程打印函數
static void _ppt(const char* str)
{
printf("%d => %s\n", ++_old, str);
}
//另一個線程測試函數
static void _doc(void* arg)
{
printf("p = %d, 技術不決定項目的成敗!我老大哭了\n", ++_old);
}
// 測試開啟線程量集
#define _INT_THS (10000)
int main(void)
{
int i;
//創建線程池
threadpool_t pool = sp_NEW();
//添加任務到線程池中
for(i=0; i<_INT_THS; ++i){
sp_ADD(pool, _ppt, "你為你負責的項目拼命過嗎.流過淚嗎");
sp_ADD(pool, _doc, NULL);
}
//等待5s 再結束吧
SLEEPMS(5000);
//清除當前線程池資源, 實戰上線程池是常駐內存,不要清除.
sp_del(&pool);
return 0;
}
window 上測試截圖

linux上 編譯代碼部分如下
[pirate@wangzhi_test linux_sc_template]$ make cc -g -Wall -D_DEBUG -c -o test_scpthread.o main/test_scpthread.c -I./module/schead/include -I./module/struct/include -I./module/service/include cc -g -Wall -D_DEBUG -o test_scpthread.out test_scpthread.o scpthread.o -lpthread -lm -I./module/schead/include -I./module/struct/include -I./module/service/include
測試結果截圖如下

上面最後沒有到 20000主要原因是 ++_old 不是線程安全的. 好的到這裡我們關於 避免驚群的線程池就已經設計完畢了.
後記
錯誤難免的,就和煙盒上的吸煙有害健康一樣. 哈哈 吸煙有害, 但健康. 有問題交流立馬解決.
用到的所有代碼.

schead.h

scpthread.h

scpthread.c

test_scpthread.c

Makefile
