幾個基本的線程函數:
線程操縱函數
創建: int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);
終止自身 void pthread_exit(void *retval);
終止其他: int pthread_cancel(pthread_t tid); 發送終止信號後目標線程不一定終止,要調用join函數等待
阻塞並等待其他線程:int pthread_join(pthread_t tid, void **retval);
屬性
初始化: int pthread_attr_init(pthread_attr_t *attr);
設置分離狀態: pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
銷毀屬性: int pthread_attr_destroy(pthread_attr_t *attr);
同步函數
互斥鎖
初始化: int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
銷毀鎖: int pthread_mutex_destroy(pthread_mutex_t *mutex);
加鎖: pthread_mutex_lock(pthread_mutex_t *mutex);
嘗試加鎖: int pthread_mutex_trylock(pthread_mutex_t *mutex);
解鎖: int pthread_mutex_unlock(pthread_mutex_t *mutex);
條件變量
初始化:int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr);
銷毀: int pthread_cond_destroy(pthread_cond_t *cond);
等待條件: pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
通知: pthread_cond_signal(pthread_cond_t *cond); 喚醒第一個調用pthread_cond_wait()而進入睡眠的線程
工具函數
比較線程ID: int pthread_equal(pthread_t t1, pthread_t t2);
分離線程: pthread_detach(pthread_t tid);
自身ID: pthread_t pthread_self(void);
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <pthread.h> //linux環境中多線程的頭文件,非C語言標准庫,編譯時最後要加 -lpthread 調用動態鏈接庫
4
5 //工作鏈表的結構
6 typedef struct worker {
7 void *(*process) (void *arg); //工作函數
8 void *arg; //函數的參數
9 struct worker *next;
10 }CThread_worker;
11
12 //線程池的結構
13 typedef struct {
14 pthread_mutex_t queue_lock; //互斥鎖
15 pthread_cond_t queue_ready; //條件變量/信號量
16
17 CThread_worker *queue_head; //指向工作鏈表的頭結點,臨界區
18 int cur_queue_size; //記錄鏈表中工作的數量,臨界區
19
20 int max_thread_num; //最大線程數
21 pthread_t *threadid; //線程ID
22
23 int shutdown; //開關
24 }CThread_pool;
25
26 static CThread_pool *pool = NULL; //一個線程池變量
27 int pool_add_worker(void *(*process)(void *arg), void *arg); //負責向工作鏈表中添加工作
28 void *thread_routine(void *arg); //線程例程
29
30 //線程池初始化
31 void
32 pool_init(int max_thread_num)
33 {
34 int i = 0;
35
36 pool = (CThread_pool *) malloc (sizeof (CThread_pool)); //創建線程池
37
38 pthread_mutex_init(&(pool->queue_lock), NULL); //互斥鎖初始化,參數為鎖的地址
39 pthread_cond_init( &(pool->queue_ready), NULL); //條件變量初始化,參數為變量地址
40
41 pool->queue_head = NULL;
42 pool->cur_queue_size = 0;
43
44 pool->max_thread_num = max_thread_num;
45 pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t));
46 for (i = 0; i < max_thread_num; i++) {
47 pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); //創建線程, 參數為線程ID變量地址、屬性、例程、參數
48 }
49
50 pool->shutdown = 0;
51 }
52
53 //例程,調用具體的工作函數
54 void *
55 thread_routine(void *arg)
56 {
57 printf("starting thread 0x%x\n", (int)pthread_self());
58 while(1) {
59 pthread_mutex_lock(&(pool->queue_lock)); //從工作鏈表中取工作,要先加互斥鎖,參數為鎖地址
60
61 while(pool->cur_queue_size == 0 && !pool->shutdown) { //鏈表為空
62 printf("thread 0x%x is waiting\n", (int)pthread_self());
63 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); //等待資源,信號量用於通知。會釋放第二個參數的鎖,以供添加;函數返回時重新加鎖。
64 }
65
66 if(pool->shutdown) {
67 pthread_mutex_unlock(&(pool->queue_lock)); //結束開關開啟,釋放鎖並退出線程
68 printf("thread 0x%x will exit\n", (int)pthread_self());
69 pthread_exit(NULL); //參數為void *
70 }
71
72 printf("thread 0x%x is starting to work\n", (int)pthread_self());
73
74 --pool->cur_queue_size;
75 CThread_worker *worker = pool->queue_head;
76 pool->queue_head = worker->next;
77
78 pthread_mutex_unlock (&(pool->queue_lock)); //獲取一個工作後釋放鎖
79
80
81 (*(worker->process))(worker->arg); //做工作
82 free(worker);
83 worker = NULL;
84 }
85
86 pthread_exit(NULL);
87 }
88
89 //銷毀線程池
90 int
91 pool_destroy()
92 {
93 if(pool->shutdown) //檢測結束開關是否開啟,若開啟,則所有線程會自動退出
94 return -1;
95 pool->shutdown = 1;
96
97 pthread_cond_broadcast( &(pool->queue_ready) ); //廣播,喚醒所有線程,准備退出
98
99 int i;
100 for(i = 0; i < pool->max_thread_num; ++i)
101 pthread_join(pool->threadid[i], NULL); //主線程等待所有線程退出,只有join第一個參數不是指針,第二個參數類型是void **,接收exit的返回值,需要強制轉換
102 free(pool->threadid);
103
104 CThread_worker *head = NULL;
105 while(pool->queue_head != NULL) { //釋放未執行的工作鏈表剩余結點
106 head = pool->queue_head;
107 pool->queue_head = pool->queue_head->next;
108 free(head);
109 }
110
111 pthread_mutex_destroy(&(pool->queue_lock)); //銷毀鎖和條件變量
112 pthread_cond_destroy(&(pool->queue_ready));
113
114 free(pool);
115 pool=NULL;
116 return 0;
117 }
118
119 void *
120 myprocess(void *arg)
121 {
122 printf("threadid is 0x%x, working on task %d\n", (int)pthread_self(), *(int*)arg);
123 sleep (1);
124 return NULL;
125 }
126
127 //添加工作
128 int
129 pool_add_worker(void *(*process)(void *arg), void *arg)
130 {
131 CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));
132 newworker->process = process; //具體的工作函數
133 newworker->arg = arg;
134 newworker->next = NULL;
135
136 pthread_mutex_lock( &(pool->queue_lock) ); //加鎖
137
138 CThread_worker *member = pool->queue_head; //插入鏈表尾部
139 if( member != NULL ) {
140 while( member->next != NULL )
141 member = member->next;
142 member->next = newworker;
143 }
144 else {
145 pool->queue_head = newworker;
146 }
147 ++pool->cur_queue_size;
148
149 pthread_mutex_unlock( &(pool->queue_lock) ); //解鎖
150
151 pthread_cond_signal( &(pool->queue_ready) ); //通知一個等待的線程
152 return 0;
153 }
154
155 int
156 main(int argc, char **argv)
157 {
158 pool_init(3); //主線程創建線程池,3個線程
159
160 int *workingnum = (int *) malloc(sizeof(int) * 10);
161 int i;
162 for(i = 0; i < 10; ++i) {
163 workingnum[i] = i;
164 pool_add_worker(myprocess, &workingnum[i]); //主線程負責添加工作,10個工作
165 }
166
167 sleep (5);
168 pool_destroy(); //銷毀線程池
169 free (workingnum);
170
171 return 0;
172 }
最簡單的可以利用java.util.concurrent.Executors
調用Executors.newCachedThreadPool()獲取緩沖式線程池
Executors.newFixedThreadPool(int nThreads)獲取固定大小的線程池
其中Resin從V3.0後需要購買才能用於商業目的,而其他兩種則是純開源的。可以分別從他們的網站上下載最新的二進制包和源代碼。
作為Web容器,需要承受較高的訪問量,能夠同時響應不同用戶的請求,能夠在惡劣環境下保持較高的穩定性和健壯性。在HTTP服務器領域,ApacheHTTPD的效率是最高的,也是最為穩定的,但它只能處理靜態頁面的請求,如果需要支持動態頁面請求,則必須安裝相應的插件,比如mod_perl可以處理Perl腳本,mod_python可以處理Python腳本。
上面介紹的三中Web容器,都是使用Java編寫的HTTP服務器,當然他們都可以嵌到Apache中使用,也可以獨立使用。分析它們處理客戶請求的方法有助於了解Java多線程和線程池的實現方法,為設計強大的多線程服務器打好基礎。
Tomcat是使用最廣的Java Web容器,功能強大,可擴展性強。最新版本的Tomcat(5.5.17)為了提高響應速度和效率,使用了Apache Portable Runtime(APR)作為最底層,使用了APR中包含Socket、緩沖池等多種技術,性能也提高了。APR也是Apache HTTPD的最底層。可想而知,同屬於ASF(Apache Software Foundation)中的成員,互補互用的情況還是很多的,雖然使用了不同的開發語言。
Tomcat 的線程池位於tomcat-util.jar文件中,包含了兩種線程池方案。方案一:使用APR的Pool技術,使用了JNI;方案二:使用Java實現的ThreadPool。這裡介紹的是第二種。如果想了解APR的Pool技術,可以查看APR的源代碼。
ThreadPool默認創建了5個線程,保存在一個200維的線程數組中,創建時就啟動了這些線程,當然在沒有請求時,它們都處理等待狀態(其實就是一個while循環,不停的等待notify)。如果有請求時,空閒線程會被喚醒執行用戶的請求。
具體的請求過程是:服務啟動時,創建一個一維線程數組(maxThread=200個),並創建空閒線程(minSpareThreads=5個)隨時等待用戶請求。當有用戶請求時,調用 threadpool.runIt(ThreadPoolRunnable)方法,將一個需要執行的實例傳給ThreadPool中。其中用戶需要執行的實例必須實現ThreadPoolRunnable接口。 ThreadPool首先查找空閒的線程,如果有則用它運行要執行ThreadPoolRunnable;如果沒有空閒線程並且沒有超過 maxThreads,就一次性創建minSpareThreads個空閒線程;如果已經超過了maxThreads了,就等待空閒線程了。總之,要找到空閒的線程,以便用它執行實例。找到後,將該線程從線程數組中移走。接著喚醒已經找到的空閒線程,用它運行執行實例(ThreadPoolRunnable)。運行完ThreadPoolRunnable後,就將該線程重新放到線程數組中,作為空閒線程供後續使用。
由此可以看出,Tomcat的線程池實現是比較簡單的,ThreadPool.java也只有840行代碼。用一個一維數組保存空閒的線程,每次以一個較小步伐(5個)創建空閒線程並放到線程池中。使用時從數組中移走空閒的線程,用完後,再歸還給線程池。