程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> 線程池的簡單實現,線程池簡單實現

線程池的簡單實現,線程池簡單實現

編輯:關於C語言

線程池的簡單實現,線程池簡單實現


幾個基本的線程函數:

線程操縱函數
創建:         int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);
終止自身        void pthread_exit(void *retval);
終止其他:       int pthread_cancel(pthread_t tid);    發送終止信號後目標線程不一定終止,要調用join函數等待
阻塞並等待其他線程:int pthread_join(pthread_t tid, void **retval);

屬性
初始化:   int pthread_attr_init(pthread_attr_t *attr);
設置分離狀態:   pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
銷毀屬性:    int pthread_attr_destroy(pthread_attr_t *attr);

同步函數
互斥鎖
初始化:    int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
銷毀鎖:    int pthread_mutex_destroy(pthread_mutex_t *mutex);
加鎖:        pthread_mutex_lock(pthread_mutex_t *mutex);
嘗試加鎖: int pthread_mutex_trylock(pthread_mutex_t *mutex);
解鎖:      int pthread_mutex_unlock(pthread_mutex_t *mutex);

條件變量
初始化:int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr);
銷毀:   int pthread_cond_destroy(pthread_cond_t *cond);
等待條件:  pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
通知:       pthread_cond_signal(pthread_cond_t *cond);  喚醒第一個調用pthread_cond_wait()而進入睡眠的線程

工具函數
比較線程ID:     int pthread_equal(pthread_t t1, pthread_t t2);
分離線程:          pthread_detach(pthread_t tid);
自身ID: pthread_t pthread_self(void);

 

  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <pthread.h>    //linux環境中多線程的頭文件,非C語言標准庫,編譯時最後要加 -lpthread 調用動態鏈接庫
  4 
  5 //工作鏈表的結構
  6 typedef struct worker {
  7     void *(*process) (void *arg);   //工作函數
  8     void *arg;                      //函數的參數
  9     struct worker *next;
 10 }CThread_worker;
 11 
 12 //線程池的結構
 13 typedef struct {
 14     pthread_mutex_t queue_lock;     //互斥鎖
 15     pthread_cond_t  queue_ready;    //條件變量/信號量
 16 
 17     CThread_worker *queue_head;     //指向工作鏈表的頭結點,臨界區
 18     int cur_queue_size;             //記錄鏈表中工作的數量,臨界區
 19 
 20     int max_thread_num;             //最大線程數
 21     pthread_t *threadid;            //線程ID
 22 
 23     int shutdown;                   //開關
 24 }CThread_pool;
 25 
 26 static CThread_pool *pool = NULL;   //一個線程池變量
 27 int pool_add_worker(void *(*process)(void *arg), void *arg);    //負責向工作鏈表中添加工作
 28 void *thread_routine(void *arg);    //線程例程
 29 
 30 //線程池初始化
 31 void
 32 pool_init(int max_thread_num)
 33 {
 34     int i = 0;
 35 
 36     pool = (CThread_pool *) malloc (sizeof (CThread_pool));    //創建線程池
 37 
 38     pthread_mutex_init(&(pool->queue_lock),  NULL);     //互斥鎖初始化,參數為鎖的地址
 39     pthread_cond_init( &(pool->queue_ready), NULL);     //條件變量初始化,參數為變量地址
 40 
 41     pool->queue_head = NULL;
 42     pool->cur_queue_size = 0;
 43 
 44     pool->max_thread_num = max_thread_num;
 45     pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t));
 46     for (i = 0; i < max_thread_num; i++) {
 47         pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);  //創建線程, 參數為線程ID變量地址、屬性、例程、參數
 48     }
 49 
 50     pool->shutdown = 0;
 51 }
 52 
 53 //例程,調用具體的工作函數
 54 void *
 55 thread_routine(void *arg)
 56 {
 57     printf("starting thread 0x%x\n", (int)pthread_self());
 58     while(1) {
 59         pthread_mutex_lock(&(pool->queue_lock));    //從工作鏈表中取工作,要先加互斥鎖,參數為鎖地址
 60 
 61         while(pool->cur_queue_size == 0 && !pool->shutdown) {       //鏈表為空
 62             printf("thread 0x%x is waiting\n", (int)pthread_self());
 63             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));   //等待資源,信號量用於通知。會釋放第二個參數的鎖,以供添加;函數返回時重新加鎖。
 64         }
 65 
 66         if(pool->shutdown) {
 67             pthread_mutex_unlock(&(pool->queue_lock));          //結束開關開啟,釋放鎖並退出線程
 68             printf("thread 0x%x will exit\n", (int)pthread_self());
 69             pthread_exit(NULL);     //參數為void *
 70         }
 71 
 72         printf("thread 0x%x is starting to work\n", (int)pthread_self());
 73 
 74         --pool->cur_queue_size;
 75         CThread_worker *worker = pool->queue_head;
 76         pool->queue_head = worker->next;
 77 
 78         pthread_mutex_unlock (&(pool->queue_lock));     //獲取一個工作後釋放鎖
 79 
 80 
 81         (*(worker->process))(worker->arg);      //做工作
 82         free(worker);
 83         worker = NULL;
 84     }
 85 
 86     pthread_exit(NULL);
 87 }
 88 
 89 //銷毀線程池
 90 int
 91 pool_destroy()
 92 {
 93     if(pool->shutdown)      //檢測結束開關是否開啟,若開啟,則所有線程會自動退出
 94         return -1;
 95     pool->shutdown = 1;
 96 
 97     pthread_cond_broadcast( &(pool->queue_ready) );     //廣播,喚醒所有線程,准備退出
 98 
 99     int i;
100     for(i = 0; i < pool->max_thread_num; ++i)
101         pthread_join(pool->threadid[i], NULL);      //主線程等待所有線程退出,只有join第一個參數不是指針,第二個參數類型是void **,接收exit的返回值,需要強制轉換
102     free(pool->threadid);
103 
104     CThread_worker *head = NULL;
105     while(pool->queue_head != NULL) {           //釋放未執行的工作鏈表剩余結點
106         head = pool->queue_head;
107         pool->queue_head = pool->queue_head->next;
108         free(head);
109     }
110 
111     pthread_mutex_destroy(&(pool->queue_lock));     //銷毀鎖和條件變量
112     pthread_cond_destroy(&(pool->queue_ready));
113 
114     free(pool);
115     pool=NULL;
116     return 0;
117 }
118 
119 void *
120 myprocess(void *arg)
121 {
122     printf("threadid is 0x%x, working on task %d\n", (int)pthread_self(), *(int*)arg);
123     sleep (1);
124     return NULL;
125 }
126 
127 //添加工作
128 int
129 pool_add_worker(void *(*process)(void *arg), void *arg)
130 {
131     CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));
132     newworker->process = process;   //具體的工作函數
133     newworker->arg = arg;
134     newworker->next = NULL;
135 
136     pthread_mutex_lock( &(pool->queue_lock) );      //加鎖
137 
138     CThread_worker *member = pool->queue_head;      //插入鏈表尾部
139     if( member != NULL ) {
140         while( member->next != NULL )
141             member = member->next;
142         member->next = newworker;
143     }
144     else {
145         pool->queue_head = newworker;
146     }
147     ++pool->cur_queue_size;
148 
149     pthread_mutex_unlock( &(pool->queue_lock) );   //解鎖
150 
151     pthread_cond_signal( &(pool->queue_ready) );   //通知一個等待的線程
152     return 0;
153 }
154 
155 int
156 main(int argc, char **argv)
157 {
158     pool_init(3);   //主線程創建線程池,3個線程
159 
160     int *workingnum = (int *) malloc(sizeof(int) * 10);
161     int i;
162     for(i = 0; i < 10; ++i) {
163         workingnum[i] = i;
164         pool_add_worker(myprocess, &workingnum[i]);     //主線程負責添加工作,10個工作
165     }
166 
167     sleep (5);
168     pool_destroy();     //銷毀線程池
169     free (workingnum);
170 
171     return 0;
172 }

 


java 怎實現線程池

最簡單的可以利用java.util.concurrent.Executors
調用Executors.newCachedThreadPool()獲取緩沖式線程池
Executors.newFixedThreadPool(int nThreads)獲取固定大小的線程池
 

[轉]幾種開源Java Web容器線程池的實現方法簡介—Tomcat(一)

其中Resin從V3.0後需要購買才能用於商業目的,而其他兩種則是純開源的。可以分別從他們的網站上下載最新的二進制包和源代碼。
作為Web容器,需要承受較高的訪問量,能夠同時響應不同用戶的請求,能夠在惡劣環境下保持較高的穩定性和健壯性。在HTTP服務器領域,ApacheHTTPD的效率是最高的,也是最為穩定的,但它只能處理靜態頁面的請求,如果需要支持動態頁面請求,則必須安裝相應的插件,比如mod_perl可以處理Perl腳本,mod_python可以處理Python腳本。
上面介紹的三中Web容器,都是使用Java編寫的HTTP服務器,當然他們都可以嵌到Apache中使用,也可以獨立使用。分析它們處理客戶請求的方法有助於了解Java多線程和線程池的實現方法,為設計強大的多線程服務器打好基礎。
Tomcat是使用最廣的Java Web容器,功能強大,可擴展性強。最新版本的Tomcat(5.5.17)為了提高響應速度和效率,使用了Apache Portable Runtime(APR)作為最底層,使用了APR中包含Socket、緩沖池等多種技術,性能也提高了。APR也是Apache HTTPD的最底層。可想而知,同屬於ASF(Apache Software Foundation)中的成員,互補互用的情況還是很多的,雖然使用了不同的開發語言。
Tomcat 的線程池位於tomcat-util.jar文件中,包含了兩種線程池方案。方案一:使用APR的Pool技術,使用了JNI;方案二:使用Java實現的ThreadPool。這裡介紹的是第二種。如果想了解APR的Pool技術,可以查看APR的源代碼。
ThreadPool默認創建了5個線程,保存在一個200維的線程數組中,創建時就啟動了這些線程,當然在沒有請求時,它們都處理等待狀態(其實就是一個while循環,不停的等待notify)。如果有請求時,空閒線程會被喚醒執行用戶的請求。
具體的請求過程是:服務啟動時,創建一個一維線程數組(maxThread=200個),並創建空閒線程(minSpareThreads=5個)隨時等待用戶請求。當有用戶請求時,調用 threadpool.runIt(ThreadPoolRunnable)方法,將一個需要執行的實例傳給ThreadPool中。其中用戶需要執行的實例必須實現ThreadPoolRunnable接口。 ThreadPool首先查找空閒的線程,如果有則用它運行要執行ThreadPoolRunnable;如果沒有空閒線程並且沒有超過 maxThreads,就一次性創建minSpareThreads個空閒線程;如果已經超過了maxThreads了,就等待空閒線程了。總之,要找到空閒的線程,以便用它執行實例。找到後,將該線程從線程數組中移走。接著喚醒已經找到的空閒線程,用它運行執行實例(ThreadPoolRunnable)。運行完ThreadPoolRunnable後,就將該線程重新放到線程數組中,作為空閒線程供後續使用。
由此可以看出,Tomcat的線程池實現是比較簡單的,ThreadPool.java也只有840行代碼。用一個一維數組保存空閒的線程,每次以一個較小步伐(5個)創建空閒線程並放到線程池中。使用時從數組中移走空閒的線程,用完後,再歸還給線程池。
 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved