(代碼有參考網上的一些實現)還有幾個功能需要慢慢的實現和一些bug需要改,比如實現線程池的動態增長:
詳細請看我的github: https://github.com/chengshuguang/thread-pool
thread_pool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
typedef struct task
{
void *(*taskfunc)(void *arg);//聲明一個函數指針
void *arg;//函數的參數
struct task *next;
}task;
typedef struct thread_pool
{
task *task_queue_head;//任務隊列
task *task_queue_end;//指向任務隊列結尾
int task_queue_size;
pthread_t *thread_queue;//線程隊列
int thread_num;
int idle_thread_num;//空閒線程數
int is_pool_destroyed;
pthread_mutex_t queue_mutex;//用來互斥訪問任務隊列
pthread_cond_t queue_cond;
}thread_pool;
#ifdef __cplusplus
extern "C"{
#endif
extern thread_pool *pool;
extern int thread_pool_init(int thread_pool_size);
//extern void * thread_pool_entrance(void *arg);
extern int thread_pool_add_task(void *(*taskfunc)(void *arg), void *arg);
extern int thread_pool_destroy();
#ifdef __cplusplus
}
#endif
#endif //THREAD_POOL_H
thread_pool.c
#include "thread_pool.h"
#include <pthread.h>
thread_pool *pool = NULL;
void * thread_pool_entrance(void *arg)
{
int thread_id = (int)arg;
printf("thread %d is created\n",thread_id);
while(1)
{
pthread_mutex_lock(&(pool->queue_mutex));
while(pool->task_queue_size == 0 && !pool->is_pool_destroyed)//必須用while,防止假喚醒
{
pthread_cond_wait(&(pool->queue_cond),&(pool->queue_mutex));//等待的時候會解鎖,喚醒後加鎖
}
if(pool->is_pool_destroyed)
{
printf("thread %d exit!!!\n",thread_id);
pthread_mutex_unlock(&(pool->queue_mutex));//中途退出最容易出錯,注意要解鎖
pthread_exit(NULL);
}
pool->idle_thread_num--;//線程進入忙碌狀態
//從任務隊列中取出任務
task *work;
work = pool->task_queue_head;
pool->task_queue_head = pool->task_queue_head->next;
if(pool->task_queue_head == NULL)
pool->task_queue_end = NULL;
pool->task_queue_size--;
pthread_mutex_unlock(&(pool->queue_mutex));
//回調函數
(*(work->taskfunc))(work->arg);
pool->idle_thread_num++;//線程空閒
}
return NULL;
}
int thread_pool_init(int thread_pool_size)
{
pool = (thread_pool *)malloc(sizeof(thread_pool));//不要最先給線程池分配空間
pool->is_pool_destroyed = 0;
pool->task_queue_head = NULL;
pool->task_queue_end = NULL;
pool->task_queue_size = 0;
pool->thread_num = thread_pool_size;
pool->thread_queue = (pthread_t *)malloc(thread_pool_size * sizeof(pthread_t));
pool->idle_thread_num = thread_pool_size;
//創建線程
int i, ret;
for(i=0; i<thread_pool_size; i++)
{
ret = pthread_create(&(pool->thread_queue[i]), NULL, thread_pool_entrance, (void *)i);
if(ret < 0)
{
printf("thread create error!!!\n");
thread_pool_destroy();//注意銷毀,避免內存洩漏
return -1;
}
}
pthread_mutex_init(&(pool->queue_mutex), NULL);
pthread_cond_init(&(pool->queue_cond), NULL);
return 0;
}
typedef void *(*taskfunc)(void *arg);
int thread_pool_add_task(taskfunc func, void *arg)
{
task *newtask;
newtask = (task *)malloc(sizeof(task));
newtask->taskfunc = func;
newtask->arg = arg;
newtask->next = NULL;
pthread_mutex_lock(&(pool->queue_mutex));
if(pool->task_queue_head == NULL)
{
pool->task_queue_head = pool->task_queue_end = newtask;
}
else
{
pool->task_queue_end = pool->task_queue_end->next = newtask;
}
pool->task_queue_size++;
pthread_cond_signal(&(pool->queue_cond));
pthread_mutex_unlock(&(pool->queue_mutex));
return 0;
}
int thread_pool_destroy()
{
if(pool->is_pool_destroyed)//防止多次銷毀
return -1;
pool->is_pool_destroyed = 1;
pthread_cond_broadcast(&(pool->queue_cond));//通知所有線程線程池銷毀了
int i;
for(i=0; i<pool->thread_num; i++)//等待線程全部執行完
pthread_join(pool->thread_queue[i], NULL);
//銷毀任務隊列
task *temp = NULL;
while(pool->task_queue_head)
{
temp = pool->task_queue_head;
pool->task_queue_head = pool->task_queue_head->next;
free(temp);
}
//pool->task_queue_head = NULL;
//pool->task_queue_end = NULL;
//銷毀線程隊列
free(pool->thread_queue);
pool->thread_queue = NULL;
pthread_mutex_destroy(&(pool->queue_mutex));
pthread_cond_destroy(&(pool->queue_cond));
free(pool);
pool = NULL;
return 0;
}
#include "thread_pool.h"
#include <stdio.h>
void *taskprocess(void *arg)
{
printf("aaaaaadoing tasksaaaaaaaaa\n");
usleep(1000);
return NULL;
}
int main()
{
thread_pool_init(5);
int i;
for(i=1; i<=10; i++)
{
thread_pool_add_task(taskprocess,(void *)i);
usleep(1000);
}
sleep(1);
thread_pool_destroy();
return 0;
}