程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> C 實現有追求的線程池 探究,線程池

C 實現有追求的線程池 探究,線程池

編輯:關於C語言

C 實現有追求的線程池 探究,線程池


引言

  線程池很普通的老話題,討論的很多.深入的不多,也就那些基礎庫中才能見到這種精妙完備的技巧.而本文隨大流

想深入簡述一種高效控制性強的一種線程池實現.

  先引入一個概念, 驚群. 簡單舉個例子. 春天來了, 公園出現了很多麻雀. 而你恰巧有一個玉米粒. 扔出去,

立馬無數麻雀過來爭搶.而最終只有一只麻雀得到了.而那些沒有搶到的麻雀很累.......

  編程中驚群,是個很古老的編程話題了.在服務器開發有機會遇到.有興趣的可以自行搜索, 多數介紹的質量非常高.

而我們今天只討論線程池中驚群現象.采用的POSIX跨平台的線程庫 pthread. 

PTW32_DLLPORT int PTW32_CDECL pthread_cond_signal (pthread_cond_t * cond);

上面函數就是線程池中出現驚群來源. 它會激活 pthread_cond_wait 等待態一個或多個線程.

  扯一點 要開發多線程強烈看看 <<POSIX 程序設計>>  (csdn網速卡沒上傳上去, 大家自己搜搜,放在手機裡看看)

     還有一本講多線程部分也特別好 <<C接口與實現>> 的最後一章.

本文參照

  線程池中驚群問題    http://zsxxsz.iteye.com/blog/2028452

(注,上面博主思路很清晰,寫了偽代碼. 並且講了避免線程池驚群的想法. 是個真高手,半步宗師)

還參照了很多,寫的很好,但都有些錯誤,就沒貼了. 希望本文是上面那篇文章探討的下篇. 思路一脈相承.

相比上面文章,本文是個大更新. 真代碼真實現.並且加了銷毀函數.(線程池釋放), 簡易函數重載(宏).

 

 

前言

先說設計 

對於C的設計那就是頭文件中接口的定義. 首先來介紹一下本文中用到的 schead.h 中一些結構和宏

typedef void (*vdel_f)(void* node);

上面函數指針定義類型就是線程池中注冊函數所采用的類型.

還使用了一個結構體判斷宏, C中不支持 struct a == struct b的語法判斷. 實現了一個

/*
 * 比較兩個結構體棧上內容是否相等,相等返回true,不等返回false
 * a    : 第一個結構體值
 * b    : 第二個結構體值
 *        : 相等返回true, 否則false
 */
#define STRUCTCMP(a, b) \
    (!memcmp(&a, &b, sizeof(a)))

本質是比較結構體中棧中內存.這個技巧是不是很巧妙. 內存是編程中一個大問題,很多事都在內存上出了問題.容我再扯一點, 黑一下C++

個人慚愧的覺得C++真的需要一個垃圾回收器了. 這麼繁瑣的語言還需要自己洞悉內存變化. 太恐怖了.特別是STL源碼擊垮了多少人的心房.

黑的不好意思, 一個基礎過硬獨當一面的C++程序員.也好難遇到了.哎C, C++確實有點'過時'了. 入行的太少, 轉行的太多了.工資還不高.

好這裡就直接到了重頭戲 scpthread.h 線程池的接口設計

#ifndef _H_SCPTHREAD
#define _H_SCPTHREAD

#include <schead.h>

/*
 *   這是個基於 pthread.h 的線程池. 簡單方便高效.
 * 這裡使用了頭文件 schead.h 也可以省掉,這裡只使用了cdel_f 的類型.
 * typedef void (*vdel_f)(void* arg);
 * 也自定義了一個類型 threadpool_t 線程池指針類型,也叫作不完整(全)類型.
 * 只聲明不寫實現.也是個常用技巧
 */
typedef struct threadpool* threadpool_t;

/*
 * 通過這個接口創建線程池對象.後面就可以使用了.
 * size        : 當前線程池中最大的線程個數
 *            : 返回創建好的線程池值
 */
extern threadpool_t sp_new(int size);

/*
 * 在當前線程池中添加待處理的線程對象.
 * pool        : 線程池對象, sp_new 創建的那個
 * run        : 運行的函數體, 返回值void, 參數void*
 * arg        : 傳入運行的參數
 *            : 沒有返回值
 */
extern void sp_add(threadpool_t pool, vdel_f run, void* arg);

/*
 * 優化擴展宏,簡化操作.唯一惡心的是宏調試難
 * _INT_THREADPOOL 是一個簡單的大小設置,控制線程池中線程多少
 *
 * sp_CREATE 同樣在上面宏幫助下, 少些一個參數. 認為是函數重載
 * 
 * sp_ADD 是一個開發技巧,幫助我們 把 void (*)(type* pi) => void (*)(void* pi), 
 * 這樣我們寫函數定義的時候更方便隨意.
 */
#define _INT_THREADPOOL    (128)

#define sp_NEW() \
    sp_new(_INT_THREADPOOL)

#define sp_ADD(pool, run, arg) \
    sp_add(pool, (vdel_f)run, arg)

/*
 * 銷毀當前線程池,釋放內存,並嘗試停止線程池中線程.
 * ppopl        : 指向 sp_new創建的對象的指針
 *                : 沒有返回值
 */
extern void sp_del(threadpool_t* ppool);

#endif // !_H_SCPTHREAD

定義了不完全類型.線程池類型.有創建銷毀,添加內容等. 還寫了幾個充當'函數重載'的宏. 很多人討厭宏,覺得宏不可取.

但是在你們用函數重載的時候,你想過麻煩嗎. 脫離你的IDE,用vi試試函數重載是不是每次都需要看一遍源碼,才知道這個函數到底怎麼用的.

但確實函數宏,不好理解. 模板宏更不好理解. 而且調試難度僅次於多線程調試了.

到這裡接口設計部分已經完工了.沒有好的設計, 什麼都不是......

 

正文

先說容易實現的

這裡先看看用到的結構部分,首先是任務鏈表結構和創建

// 線程任務鏈表
struct threadjob {
    vdel_f run;                    //當前任務中要執行函數體,注冊的事件
    void* arg;                    //任務中待執行事件的參數    
    struct threadjob* next;        //指向下一個線程任務鏈表
};

// struct threadjob 結構對象創建
static inline struct threadjob* _new_threadjob(vdel_f run, void* arg)
{
    struct threadjob* job = malloc(sizeof(struct threadjob));
    if(!job)
        CERR_EXIT("malloc struct threadjob is NULL!");
    job->run = run;
    job->arg = arg;
    job->next = NULL;
    return job;
}

這裡對於內存處理方式,采用了C++中new做法, new出錯了程序直接崩.上面if(!job)也可以省略.主要看個人理解了.

下面核心結構設計

// 線程結構體,每個線程一個信號量
struct thread {
    pthread_t tid;                //運行的線程id, 在釋放的時候用
    pthread_cond_t cond;        //當前線程的條件變量
    struct thread* next;        //下一個線程
};

// 線程池類型定義
struct threadpool {
    int size;                    //線程池大小,最大線程數限制
    int curr;                    //當前線程池中總的線程數
    int idle;                    //當前線程池中空閒的線程數
    pthread_mutex_t mutex;        //線程互斥鎖
    struct thread* threads;        //線程條件變量,依賴mutex線程互斥鎖
    struct threadjob* head;        //線程任務鏈表的表頭, head + tail就是一個隊列結構
    struct threadjob* tail;        //線程任務鏈表的表尾,這個量是為了後插入的後執行
};

// 添加一個等待的 struct thread 對象到 線程池pool中
static void _thread_add(struct threadpool* pool, pthread_t tid)
{
    struct thread* thread = malloc(sizeof(struct thread));
    if(!thread)
        CERR_EXIT("malloc sizeof(struct thread) is error!");
    thread->tid = tid;
    pthread_cond_init(&thread->cond, NULL);
    thread->next = pool->threads;
    pool->threads = thread;
}

// 依據cnd內存地址屬性, 刪除pool->threads 中指定數據
static void _thread_del(struct threadpool* pool, pthread_cond_t* cnd)
{
    struct thread* head = pool->threads;
    if(cnd == &head->cond){
        pool->threads = head->next;
        pthread_cond_destroy(&head->cond);
        free(head);
        return;
    }
    // 下面是處理非頭結點刪除
    while(head->next){
        struct thread* tmp = head->next;
        if(cnd == &tmp->cond){ //找見了,刪掉退出
            head->next = tmp->next;
            pthread_cond_destroy(&tmp->cond);
            free(tmp);
            break;
        }
        head = tmp;
    }
}

// 使用了棧內存比較函數,返回對應線程的cond
static pthread_cond_t* _thread_get(struct threadpool* pool, pthread_t tid)
{
    struct thread* head = pool->threads;

    while (head) {
        if (STRUCTCMP(tid, head->tid))
            break;
        head = head->next;
    }
    return &head->cond;
}

對於上面的結構是本文核心,就是每個開啟的線程都會有一個獨有的線程等待變量. 這樣pthread_cond_signal 發送信息都是指定發送的.

性能損耗在 _thread_get 上, 這裡設計是采用單鏈表導致每次都要輪序找到指定線程的線程條件變量. 有好想法的同學可以優化.

對於struct threadpool 結構中 struct threadjob *head, *tail; 是個線程任務隊列.

struct thread* threads; 是個線程鏈表. 當前文件中共用struct threadpool 中 mutex一個互斥量.這裡說一下,鏈表是C結構中基礎的基礎,

所有代碼都是圍繞它這個結構. 一定要磨練中熟悉提高.對於剛學習的人.

上面代碼都是業務代碼, 做的好的就是 pthread_cond_destroy 釋放條件變量信息. 也許這個函數中沒有釋放內存, 但推薦和init成對出現.有始有終.

前戲講完了, 現在講解其它簡單的代碼接口實現

/*
 * 通過這個接口創建線程池對象.後面就可以使用了.
 * max        : 當前線程池中最大的線程個數
 *            : 返回創建好的線程池值.創建失敗返回NULL
 */
threadpool_t 
sp_new(int size)
{
    struct threadpool* pool;
    // 錯誤判斷,有點丑陋, 申請內存並初始化
    if((size <= 0) || !(pool = calloc(1, sizeof(struct threadpool)))){
        CERR("struct threadpool calloc is error!");
        return NULL;
    }
    pool->size = size;
    pthread_mutex_init(&pool->mutex, NULL);
    
    return pool;
}

上面就是創建接口的實現代碼,calloc相比malloc多調用了memset(&src, 0, sizeof(src))清空置零了.

還有一個釋放資源函數.這裡是允許創建多個線程池.自然要有提供釋放函數.

/*
 * 銷毀當前線程池,釋放內存,並嘗試停止線程池中線程.
 * ppopl        : 指向 sp_new創建的對象的指針
 *                : 沒有返回值
 */
void 
sp_del(threadpool_t* ppool)
{
    struct threadpool* pool;
    struct thread* thread;
    struct threadjob* head;
    if((!ppool) || !(pool = *ppool)) return;
    
    //加鎖,等待完全占有鎖的時候再去釋放資源
    pthread_mutex_lock(&pool->mutex);
    
    //先釋放線程
    thread = pool->threads;
    while(thread){
        struct thread* next = thread->next;
        pthread_cancel(thread->tid);
        pthread_cond_destroy(&thread->cond);
        free(thread);
        thread = next;
    }
    //再來釋放任務列表
    head = pool->head;
    while(head) {
        struct threadjob* next = head->next;
        free(head);
        head = next;
    }
    pthread_mutex_unlock(&pool->mutex);
    
    //最後要銷毀這個使用的線程鎖對象
    pthread_mutex_destroy(&pool->mutex);    
    *ppool = NULL;
}

也許就是多了這個需求原先的代碼量多了一半. 需要圍繞它讓開啟的線程能夠支持可需求.安全取消等.本文中用到的很多pthread api.

不熟悉的多搜索,多做筆記.不懂多了, 需要的是自己學習.

對於上面釋放函數先競爭唯一互斥量,競爭到了那麼就開始釋放了.先關閉線程後面釋放任務列表和線程條件變量資源.

再說核心實現

     這部分和上面參照的博文有很多相似之處,大家看了上面代碼,看這個應該很好理解. 核心部分就兩個函數,一個是線程輪詢處理任務的函數.

一個是構建線程池函數. 線程輪序函數如下

// 線程運行的時候執行函數
static void* _consumer(struct threadpool* pool)
{
    struct threadjob* job;
    int status;
    pthread_t tid = pthread_self();
    pthread_mutex_t* mtx = &pool->mutex;
    pthread_cond_t* cnd;

    //設置線程屬性, 默認線程屬性 允許退出線程 
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); //設置立即取消 
    pthread_detach(tid); //設置線程分離,自銷毀
    
    // 改消費者線程加鎖, 並且得到當前線程的條件變量,vdel_f為了linux上消除警告
    pthread_cleanup_push((vdel_f)pthread_mutex_unlock, mtx);
    pthread_mutex_lock(mtx);

    cnd = _thread_get(pool, tid);
__loop:
    if(pool->head != NULL) {
        // 有多線程任務,取出數據從下面處理
        job = pool->head;
        pool->head = job->next;
        if(pool->tail == job)
            pool->tail = NULL;
        
        // 解鎖, 允許其它消費者線程加鎖或生產線程添加新任務
        pthread_mutex_unlock(mtx);
        // 回調函數,後面再去刪除這個任務
        job->run(job->arg);
        free(job);
        
        // 新的一輪開始需要重新加鎖
        pthread_mutex_lock(mtx);
        goto __loop;
    }
    // 這裡相當於 if 的 else, pool->first == NULL
    ++pool->idle;
    // 調用pthread_cond_wait 等待線程條件變量被通知且自動解鎖
    status = pthread_cond_wait(cnd, mtx);
    --pool->idle;
    if(status == 0) //等待成功了,那就開始輪序處理任務
        goto __loop;
    
    //到這裡是程序出現異常, 進程退出中, 先減少當前線程
    --pool->curr;
    //去掉這個線程鏈表pool->threads中對應數據
    _thread_del(pool, cnd);

    pthread_mutex_unlock(mtx);
    pthread_cleanup_pop(0);

    return NULL;
}

對於pthread_cleanup_push 和 pthread_cleanup_pop 也是posix線程的標准用法, 為了在函數取消的時候釋放 lock鎖.

其中采用強制轉換 (vdel_f) 是為了消除linux上gcc編譯的警告.因為gcc上 函數只有 void* (*)(void*) 類型.而window上對於上面宏調用的時候強加了轉換

        /*
         * C implementation of PThreads cancel cleanup
         */

#define pthread_cleanup_push( _rout, _arg ) \
        { \
            ptw32_cleanup_t     _cleanup; \
            \
            ptw32_push_cleanup( &_cleanup, (ptw32_cleanup_callback_t) (_rout), (_arg) ); \

#define pthread_cleanup_pop( _execute ) \
            (void) ptw32_pop_cleanup( _execute ); \
        }

linux上代碼是這樣的

/* Install a cleanup handler: ROUTINE will be called with arguments ARG
   when the thread is canceled or calls pthread_exit.  ROUTINE will also
   be called with arguments ARG when the matching pthread_cleanup_pop
   is executed with non-zero EXECUTE argument.

   pthread_cleanup_push and pthread_cleanup_pop are macros and must always
   be used in matching pairs at the same nesting level of braces.  */
#  define pthread_cleanup_push(routine, arg) \
  do {                                                                        \
    struct __pthread_cleanup_frame __clframe                                  \
      __attribute__ ((__cleanup__ (__pthread_cleanup_routine)))               \
      = { .__cancel_routine = (routine), .__cancel_arg = (arg),               \
          .__do_it = 1 };

/* Remove a cleanup handler installed by the matching pthread_cleanup_push.
   If EXECUTE is non-zero, the handler function is called. */
#  define pthread_cleanup_pop(execute) \
    __clframe.__do_it = (execute);                                            \
  } while (0)

其中 struct __pthread_cleanup_frame 結構如下

/* Structure to hold the cleanup handler information.  */
struct __pthread_cleanup_frame
{
  void (*__cancel_routine) (void *);
  void *__cancel_arg;
  int __do_it;
  int __cancel_type;
};

提升技術最好的辦法

1.多看書

2.多寫代碼,多搜搜,多問問

3.多看別人的好代碼, 多臨摹源碼

4.多創造,多改進,多實戰

等這該明白的都明白了,一切都是那樣容易,那樣的美的時候. 就可以回家種田了. 哈哈.

再補充上面說明一下.為什麼用goto, 不喜歡無腦的for(;;) {}, 並且黑屏幕小,vi上太長了換行不好看.

 

最後測試

  那到了測試環節,測試代碼 test_spthread.c 

#include <scpthread.h>

//全局計時器,存在鎖問題
static int _old;

//簡單的線程打印函數
static void _ppt(const char* str)
{
    printf("%d => %s\n", ++_old, str);
}

//另一個線程測試函數
static  void _doc(void* arg)
{
    printf("p = %d, 技術不決定項目的成敗!我老大哭了\n", ++_old);
}

// 測試開啟線程量集
#define _INT_THS (10000)

int main(void)
{    
    int i;
    //創建線程池
    threadpool_t pool = sp_NEW();
    
    //添加任務到線程池中
    for(i=0; i<_INT_THS; ++i){
        sp_ADD(pool, _ppt, "你為你負責的項目拼命過嗎.流過淚嗎");
        sp_ADD(pool, _doc, NULL);
    }
    
    //等待5s 再結束吧
    SLEEPMS(5000);
    //清除當前線程池資源, 實戰上線程池是常駐內存,不要清除.
    sp_del(&pool);
    return 0;
}

window 上測試截圖

linux上 編譯代碼部分如下

[pirate@wangzhi_test linux_sc_template]$ make
cc -g -Wall -D_DEBUG -c -o test_scpthread.o main/test_scpthread.c -I./module/schead/include -I./module/struct/include -I./module/service/include
cc -g -Wall -D_DEBUG -o test_scpthread.out test_scpthread.o scpthread.o -lpthread -lm -I./module/schead/include -I./module/struct/include -I./module/service/include

測試結果截圖如下

上面最後沒有到 20000主要原因是 ++_old 不是線程安全的. 好的到這裡我們關於 避免驚群的線程池就已經設計完畢了.

 

後記

  錯誤難免的,就和煙盒上的吸煙有害健康一樣. 哈哈 吸煙有害, 但健康. 有問題交流立馬解決.

用到的所有代碼.

schead.h

#ifndef _H_SCHEAD #define _H_SCHEAD #include <stdio.h> #include <stdlib.h> #include <stdbool.h> #include <errno.h> #include <string.h> #include <time.h> #include <stdint.h> #include <stddef.h> /* * 1.0 錯誤定義宏 用於判斷返回值狀態的狀態碼 _RF表示返回標志 * 使用舉例 : int flag = scconf_get("pursue"); if(flag != _RT_OK){ sclog_error("get config %s error! flag = %d.", "pursue", flag); exit(EXIT_FAILURE); } * 這裡是內部 使用的通用返回值 標志 */ #define _RT_OK (0) //結果正確的返回宏 #define _RT_EB (-1) //錯誤基類型,所有錯誤都可用它,在不清楚的情況下 #define _RT_EP (-2) //參數錯誤 #define _RT_EM (-3) //內存分配錯誤 #define _RT_EC (-4) //文件已經讀取完畢或表示鏈接關閉 #define _RT_EF (-5) //文件打開失敗 /* * 1.1 定義一些 通用的函數指針幫助,主要用於基庫的封裝中 * 有構造函數, 釋放函數, 比較函數等 */ typedef void* (*pnew_f)(); typedef void (*vdel_f)(void* node); // icmp_f 最好 是 int cmp(const void* ln,const void* rn); 標准結構 typedef int (*icmp_f)(); /* * c 如果是空白字符返回 true, 否則返回false * c : 必須是 int 值,最好是 char 范圍 */ #define sh_isspace(c) \ ((c==' ')||(c>='\t'&&c<='\r')) /* * 2.0 如果定義了 __GNUC__ 就假定是 使用gcc 編譯器,為Linux平台 * 否則 認為是 Window 平台,不可否認宏是丑陋的 */ #if defined(__GNUC__) //下面是依賴 Linux 實現,等待毫秒數 #include <unistd.h> #include <sys/time.h> #define SLEEPMS(m) \ usleep(m * 1000) #else // 這裡創建等待函數 以毫秒為單位 , 需要依賴操作系統實現 #include <Windows.h> #include <direct.h> // 加載多余的頭文件在 編譯階段會去掉 #define rmdir _rmdir /** * Linux sys/time.h 中獲取時間函數在Windows上一種移植實現 **tv : 返回結果包含秒數和微秒數 **tz : 包含的時區,在window上這個變量沒有用不返回 ** : 默認返回0 **/ extern int gettimeofday(struct timeval* tv, void* tz); //為了解決 不通用功能 #define localtime_r(t, tm) localtime_s(tm, t) #define SLEEPMS(m) \ Sleep(m) #endif /*__GNUC__ 跨平台的代碼都很丑陋 */ //3.0 浮點數據判斷宏幫助, __開頭表示不希望你使用的宏 #define __DIFF(x, y) ((x)-(y)) //兩個表達式做差宏 #define __IF_X(x, z) ((x)<z&&(x)>-z) //判斷宏,z必須是宏常量 #define EQ(x, y, c) EQ_ZERO(__DIFF(x,y), c) //判斷x和y是否在誤差范圍內相等 //3.1 float判斷定義的宏 #define _FLOAT_ZERO (0.000001f) //float 0的誤差判斷值 #define EQ_FLOAT_ZERO(x) __IF_X(x,_FLOAT_ZERO) //float 判斷x是否為零是返回true #define EQ_FLOAT(x, y) EQ(x, y, _FLOAT_ZERO) //判斷表達式x與y是否相等 //3.2 double判斷定義的宏 #define _DOUBLE_ZERO (0.000000000001) //double 0誤差判斷值 #define EQ_DOUBLE_ZERO(x) __IF_X(x,_DOUBLE_ZERO) //double 判斷x是否為零是返回true #define EQ_DOUBLE(x,y) EQ(x, y, _DOUBLE_ZERO) //判斷表達式x與y是否相等 //4.0 控制台打印錯誤信息, fmt必須是雙引號括起來的宏 #ifndef CERR #define CERR(fmt, ...) \ fprintf(stderr,"[%s:%s:%d][error %d:%s]" fmt "\r\n",\ __FILE__, __func__, __LINE__, errno, strerror(errno),##__VA_ARGS__) #endif/* !CERR */ //4.1 控制台打印錯誤信息並退出, t同樣fmt必須是 ""括起來的字符串常量 #ifndef CERR_EXIT #define CERR_EXIT(fmt,...) \ CERR(fmt,##__VA_ARGS__),exit(EXIT_FAILURE) #endif/* !ERR */ #ifndef IF_CERR /* *4.2 if 的 代碼檢測 * * 舉例: * IF_CERR(fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP), "socket create error!"); * 遇到問題打印日志直接退出,可以認為是一種簡單模板 * code : 要檢測的代碼 * fmt : 必須是""括起來的字符串宏 * ... : 後面的參數,參照printf */ #define IF_CERR(code, fmt, ...) \ if((code) < 0) \ CERR_EXIT(fmt, ##__VA_ARGS__) #endif //!IF_CERR #ifndef IF_CHECK /* * 是上面IF_CERR 的簡化版很好用 */ #define IF_CHECK(code) \ if((code) < 0) \ CERR_EXIT(#code) #endif // !IF_CHECK //5.0 獲取數組長度,只能是數組類型或""字符串常量,後者包含'\0' #ifndef LEN #define LEN(arr) \ (sizeof(arr)/sizeof(*(arr))) #endif/* !ARRLEN */ //6.0 程序清空屏幕函數 #ifndef CONSOLE_CLEAR #ifndef _WIN32 #define CONSOLE_CLEAR() \ system("printf '\ec'") #else #define CONSOLE_CLEAR() \ system("cls") #endif/* _WIN32 */ #endif /*!CONSOLE_CLEAR*/ //7.0 置空操作 #ifndef BZERO //v必須是個變量 #define BZERO(v) \ memset(&v,0,sizeof(v)) #endif/* !BZERO */ //9.0 scanf 健壯的 #ifndef SAFETY_SCANF #define SAFETY_SCANF(scanf_code,...) \ while(printf(__VA_ARGS__),scanf_code){\ while(getchar()!='\n');\ puts("輸入出錯,請按照提示重新操作!");\ }\ while(getchar()!='\n') #endif /*!SAFETY_SCANF*/ //10.0 簡單的time幫助宏 #ifndef TIME_PRINT #define TIME_PRINT(code) {\ clock_t __st,__et;\ __st=clock();\ code\ __et=clock();\ printf("當前代碼塊運行時間是:%lf秒\n",(0.0+__et-__st)/CLOCKS_PER_SEC);\ } #endif /*!TIME_PRINT*/ /* * 10.1 這裡是一個 在 DEBUG 模式下的測試宏 * * 用法 : * DEBUG_CODE({ * puts("debug start..."); * }); */ #ifndef DEBUG_CODE # ifdef _DEBUG # define DEBUG_CODE(code) code # else # define DEBUG_CODE(code) # endif // ! _DEBUG #endif // !DEBUG_CODE //11.0 等待的宏 是個單線程沒有加鎖 #define _STR_PAUSEMSG "請按任意鍵繼續. . ." extern void sh_pause(void); #ifndef INIT_PAUSE # ifdef _DEBUG # define INIT_PAUSE() atexit(sh_pause) # else # define INIT_PAUSE() (void)316 /* 別說了,都重新開始吧 */ # endif #endif/* !INIT_PAUSE */ //12.0 判斷是大端序還是小端序,大端序返回true extern bool sh_isbig(void); /** * sh_free - 簡單的釋放內存函數,對free再封裝了一下 **可以避免野指針 **pobj:指向待釋放內存的指針(void*) **/ extern void sh_free(void** pobj); /** * 獲取 當前時間串,並塞入tstr中長度並返回 ** 使用舉例 char tstr[64]; sh_times(tstr, LEN(tstr)); puts(tstr); **tstr : 保存最後生成的最後串 **len : tstr數組的長度 ** : 返回tstr首地址 **/ extern int sh_times(char tstr[], int len); /* * 比較兩個結構體棧上內容是否相等,相等返回true,不等返回false * a : 第一個結構體值 * b : 第二個結構體值 * : 相等返回true, 否則false */ #define STRUCTCMP(a, b) \ (!memcmp(&a, &b, sizeof(a))) #endif/* ! _H_SCHEAD */ View Code

 

scpthread.h

#ifndef _H_SCPTHREAD #define _H_SCPTHREAD #include <schead.h> /* * 這是個基於 pthread.h 的線程池. 簡單方便高效. * 這裡使用了頭文件 schead.h 也可以省掉,這裡只使用了cdel_f 的類型. * typedef void (*vdel_f)(void* arg); * 也自定義了一個類型 threadpool_t 線程池指針類型,也叫作不完整(全)類型. * 只聲明不寫實現.也是個常用技巧 */ typedef struct threadpool* threadpool_t; /* * 通過這個接口創建線程池對象.後面就可以使用了. * size : 當前線程池中最大的線程個數 * : 返回創建好的線程池值 */ extern threadpool_t sp_new(int size); /* * 在當前線程池中添加待處理的線程對象. * pool : 線程池對象, sp_new 創建的那個 * run : 運行的函數體, 返回值void, 參數void* * arg : 傳入運行的參數 * : 沒有返回值 */ extern void sp_add(threadpool_t pool, vdel_f run, void* arg); /* * 優化擴展宏,簡化操作.唯一惡心的是宏調試難 * _INT_THREADPOOL 是一個簡單的大小設置,控制線程池中線程多少 * * sp_CREATE 同樣在上面宏幫助下, 少些一個參數. 認為是函數重載 * * sp_ADD 是一個開發技巧,幫助我們 把 void (*)(type* pi) => void (*)(void* pi), * 這樣我們寫函數定義的時候更方便隨意. */ #define _INT_THREADPOOL (128) #define sp_NEW() \ sp_new(_INT_THREADPOOL) #define sp_ADD(pool, run, arg) \ sp_add(pool, (vdel_f)run, arg) /* * 銷毀當前線程池,釋放內存,並嘗試停止線程池中線程. * ppopl : 指向 sp_new創建的對象的指針 * : 沒有返回值 */ extern void sp_del(threadpool_t* ppool); #endif // !_H_SCPTHREAD View Code

 

scpthread.c

#include <scpthread.h> #include <pthread.h> // 線程任務鏈表 struct threadjob { vdel_f run; //當前任務中要執行函數體,注冊的事件 void* arg; //任務中待執行事件的參數 struct threadjob* next; //指向下一個線程任務鏈表 }; // struct threadjob 結構對象創建 static inline struct threadjob* _new_threadjob(vdel_f run, void* arg) { struct threadjob* job = malloc(sizeof(struct threadjob)); if(!job) CERR_EXIT("malloc struct threadjob is NULL!"); job->run = run; job->arg = arg; job->next = NULL; return job; } // 線程結構體,每個線程一個信號量 struct thread { pthread_t tid; //運行的線程id, 在釋放的時候用 pthread_cond_t cond; //當前線程的條件變量 struct thread* next; //下一個線程 }; // 線程池類型定義 struct threadpool { int size; //線程池大小,最大線程數限制 int curr; //當前線程池中總的線程數 int idle; //當前線程池中空閒的線程數 pthread_mutex_t mutex; //線程互斥鎖 struct thread* threads; //線程條件變量,依賴mutex線程互斥鎖 struct threadjob* head; //線程任務鏈表的表頭, head + tail就是一個隊列結構 struct threadjob* tail; //線程任務鏈表的表尾,這個量是為了後插入的後執行 }; // 添加一個等待的 struct thread 對象到 線程池pool中 static void _thread_add(struct threadpool* pool, pthread_t tid) { struct thread* thread = malloc(sizeof(struct thread)); if(!thread) CERR_EXIT("malloc sizeof(struct thread) is error!"); thread->tid = tid; pthread_cond_init(&thread->cond, NULL); thread->next = pool->threads; pool->threads = thread; } // 依據cnd內存地址屬性, 刪除pool->threads 中指定數據 static void _thread_del(struct threadpool* pool, pthread_cond_t* cnd) { struct thread* head = pool->threads; if(cnd == &head->cond){ pool->threads = head->next; pthread_cond_destroy(&head->cond); free(head); return; } // 下面是處理非頭結點刪除 while(head->next){ struct thread* tmp = head->next; if(cnd == &tmp->cond){ //找見了,刪掉退出 head->next = tmp->next; pthread_cond_destroy(&tmp->cond); free(tmp); break; } head = tmp; } } // 使用了棧內存比較函數,返回對應線程的cond static pthread_cond_t* _thread_get(struct threadpool* pool, pthread_t tid) { struct thread* head = pool->threads; while (head) { if (STRUCTCMP(tid, head->tid)) break; head = head->next; } return &head->cond; } /* * 通過這個接口創建線程池對象.後面就可以使用了. * max : 當前線程池中最大的線程個數 * : 返回創建好的線程池值.創建失敗返回NULL */ threadpool_t sp_new(int size) { struct threadpool* pool; // 錯誤判斷,有點丑陋, 申請內存並初始化 if((size <= 0) || !(pool = calloc(1, sizeof(struct threadpool)))){ CERR("struct threadpool calloc is error!"); return NULL; } pool->size = size; pthread_mutex_init(&pool->mutex, NULL); return pool; } // 線程運行的時候執行函數 static void* _consumer(struct threadpool* pool) { struct threadjob* job; int status; pthread_t tid = pthread_self(); pthread_mutex_t* mtx = &pool->mutex; pthread_cond_t* cnd; //設置線程屬性, 默認線程屬性 允許退出線程 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); //設置立即取消 pthread_detach(tid); //設置線程分離,自銷毀 // 改消費者線程加鎖, 並且得到當前線程的條件變量,vdel_f為了linux上消除警告 pthread_cleanup_push((vdel_f)pthread_mutex_unlock, mtx); pthread_mutex_lock(mtx); cnd = _thread_get(pool, tid); __loop: if(pool->head != NULL) { // 有多線程任務,取出數據從下面處理 job = pool->head; pool->head = job->next; if(pool->tail == job) pool->tail = NULL; // 解鎖, 允許其它消費者線程加鎖或生產線程添加新任務 pthread_mutex_unlock(mtx); // 回調函數,後面再去刪除這個任務 job->run(job->arg); free(job); // 新的一輪開始需要重新加鎖 pthread_mutex_lock(mtx); goto __loop; } // 這裡相當於 if 的 else, pool->first == NULL ++pool->idle; // 調用pthread_cond_wait 等待線程條件變量被通知且自動解鎖 status = pthread_cond_wait(cnd, mtx); --pool->idle; if(status == 0) //等待成功了,那就開始輪序處理任務 goto __loop; //到這裡是程序出現異常, 進程退出中, 先減少當前線程 --pool->curr; //去掉這個線程鏈表pool->threads中對應數據 _thread_del(pool, cnd); pthread_mutex_unlock(mtx); pthread_cleanup_pop(0); return NULL; } /* * 在當前線程池中添加待處理的線程對象. * pool : 線程池對象, sp_new 創建的那個 * run : 運行的函數體, 返回值void, 參數void* * arg : 傳入運行的參數 * : 不需要返回值 */ void sp_add(threadpool_t pool, vdel_f run, void* arg) { struct threadjob* job = _new_threadjob(run, arg); pthread_mutex_t* mtx = &pool->mutex; pthread_mutex_lock(mtx); if(!pool->head) //線程池中沒有線程頭,那就設置線程頭 pool->head = job; else pool->tail->next = job; pool->tail = job; // 有空閒線程,添加到處理任務隊列中,直接返回 if(pool->idle > 0){ pthread_mutex_unlock(mtx); // 這是一種算法, 先釋放鎖後發送信號激活線程,速度快,缺點喪失線程執行優先級 pthread_cond_signal(&pool->threads->cond); } else if(pool->curr < pool->size){ // 沒有那就新建線程, 條件不滿足那就等待 pthread_t tid; if(pthread_create(&tid, NULL, (void* (*)(void*))_consumer, pool) == 0) ++pool->curr; //添加開啟線程的信息 _thread_add(pool, tid); pthread_mutex_unlock(mtx); } } /* * 銷毀當前線程池,釋放內存,並嘗試停止線程池中線程. * ppopl : 指向 sp_new創建的對象的指針 * : 沒有返回值 */ void sp_del(threadpool_t* ppool) { struct threadpool* pool; struct thread* thread; struct threadjob* head; if((!ppool) || !(pool = *ppool)) return; //加鎖,等待完全占有鎖的時候再去釋放資源 pthread_mutex_lock(&pool->mutex); //先釋放線程 thread = pool->threads; while(thread){ struct thread* next = thread->next; pthread_cancel(thread->tid); pthread_cond_destroy(&thread->cond); free(thread); thread = next; } //再來釋放任務列表 head = pool->head; while(head) { struct threadjob* next = head->next; free(head); head = next; } pthread_mutex_unlock(&pool->mutex); //最後要銷毀這個使用的線程鎖對象 pthread_mutex_destroy(&pool->mutex); *ppool = NULL; } View Code

 

test_scpthread.c

#include <scpthread.h> //全局計時器,存在鎖問題 static int _old; //簡單的線程打印函數 static void _ppt(const char* str) { printf("%d => %s\n", ++_old, str); } //另一個線程測試函數 static void _doc(void* arg) { printf("p = %d, 技術不決定項目的成敗!我老大哭了\n", ++_old); } // 測試開啟線程量集 #define _INT_THS (10000) int main(void) { int i; //創建線程池 threadpool_t pool = sp_NEW(); //添加任務到線程池中 for(i=0; i<_INT_THS; ++i){ sp_ADD(pool, _ppt, "你為你負責的項目拼命過嗎.流過淚嗎"); sp_ADD(pool, _doc, NULL); } //等待5s 再結束吧 SLEEPMS(5000); //清除當前線程池資源, 實戰上線程池是常駐內存,不要清除. sp_del(&pool); return 0; } View Code

 

Makefile

C = gcc DEBUG = -g -Wall -D_DEBUG #指定pthread線程庫 LIB = -lpthread -lm #指定一些目錄 DIR = -I./module/schead/include -I./module/struct/include -I./module/service/include #具體運行函數 RUN = $(CC) $(DEBUG) -o $@ $^ $(LIB) $(DIR) RUNO = $(CC) $(DEBUG) -c -o $@ $^ $(DIR) # 主要生成的產品 all:test_cjson_write.out test_csjon.out test_csv.out test_json_read.out test_log.out\ test_scconf.out test_tstring.out test_sctimer.out test_scpthread.out #挨個生產的產品 test_cjson_write.out:test_cjson_write.o schead.o sclog.o tstring.o cjson.o $(RUN) test_csjon.out:test_csjon.o schead.o sclog.o tstring.o cjson.o $(RUN) test_csv.out:test_csv.o schead.o sclog.o sccsv.o tstring.o $(RUN) test_json_read.out:test_json_read.o schead.o sclog.o sccsv.o tstring.o cjson.o $(RUN) test_log.out:test_log.o schead.o sclog.o $(RUN) test_scconf.out:test_scconf.o schead.o scconf.o tree.o tstring.o sclog.o $(RUN) test_tstring.out:test_tstring.o tstring.o sclog.o schead.o $(RUN) test_sctimer.out:test_sctimer.o schead.o sctimer.o $(RUN) test_scpthread.out:test_scpthread.o scpthread.o $(RUN) #產品主要的待鏈接文件 test_cjson_write.o:./main/test_cjson_write.c $(RUNO) test_csjon.o:./main/test_csjon.c $(RUNO) test_csv.o:./main/test_csv.c $(RUNO) test_json_read.o:./main/test_json_read.c $(RUNO) test_log.o:./main/test_log.c $(RUNO) -std=gnu99 test_scconf.o:./main/test_scconf.c $(RUNO) test_tstring.o:./main/test_tstring.c $(RUNO) test_sctimer.o:./main/test_sctimer.c $(RUNO) test_scpthread.o:./main/test_scpthread.c $(RUNO) #工具集機械碼,待別人鏈接 schead.o:./module/schead/schead.c $(RUNO) sclog.o:./module/schead/sclog.c $(RUNO) sccsv.o:./module/schead/sccsv.c $(RUNO) tstring.o:./module/struct/tstring.c $(RUNO) cjson.o:./module/schead/cjson.c $(RUNO) scconf.o:./module/schead/scconf.c $(RUNO) tree.o:./module/struct/tree.c $(RUNO) sctimer.o:./module/service/sctimer.c $(RUNO) scpthread.o:./module/service/scpthread.c $(RUNO) #刪除命令 clean: rm -rf *.i *.s *.o *.out __* log ; ls -hl .PHONY:clean View Code

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved