http線程池的主要用途是異步處理使用無狀態短連接的http請求,在傳輸層通信基於tcp協議和應用層基於http協議的基礎上,達到c++服務器與web服務器通信的目的。
設計上:
(1)服務器啟動時,初始化配置數量的線程(形成被動連接線程池)。每個線程會生成epoll描述符。
(2)主線程生成監聽socket,綁定端口。生成epoll描述符,注冊監聽socket,非阻塞接收(限定最大時間,如2s)新連接到連接隊列。
(2)投放主線程連接隊列中的新連接到被動連接線程池。根據硬哈希選擇需求的線程來投放。加入後需要注冊連接socket(注冊時連接對象作為epoll事件的攜帶數據)到線程的epoll描述符。
(3)在每個線程的例程裡會非阻塞監聽epoll描述符上發生的讀事件,並解析和處理獲取的http請求。
這樣每個業務線程可以相對獨立的處理無狀態的http請求。跟單業務線程的場景不同的是,http線程池的線程之間盡量減少數據共享(實在需要緩存在內存則加鎖),每個線程又可以作為客戶端短時間阻塞向其他服務器請求數據。
http線程池代碼如下:(大致上http線程池的思路可以看得出來。主線程接收連接對象和連接對象接收數據並沒有在這裡展現實現過程。注意接收時需要忽略EINTR和SIGPIPE信號,如果接收返回-1且錯誤號為EAGAIN或EWOULDBLOCK,說明接收緩沖區滿了,需要繼續嘗試接收,直到超時。接收返回0則表示對方斷開連接,則接收失敗。接收成功、失敗、超時都需要移除連接對象(epoll描述符注銷連接socket、關閉socket、移出和銷毀連接對象),因為是短連接)
線程池頭文件
/**
* rief 定義實現輕量級(lightweight)的http服務框架類
*/
class zHttpTaskPool : private zNoncopyable
{
public:
/**
* rief 構造函數
*/
zHttpTaskPool()
{
}
/**
* rief 析構函數,銷毀一個線程池對象
*
*/
~zHttpTaskPool()
{
final();
}
bool addHttp(zHttpTask *task);
bool init();
void final();
private:
static const int maxHttpThreads = 16; /**< 最大驗證線程數量 */
zThreadGroup httpThreads; /**< http服務處理線程組 */
};
/**
* rief 輕量級http服務的主處理線程
*/
class zHttpThread : public zThread
{
private:
/**
* rief http連接任務鏈表類型
*/
typedef std::list zHttpTaskContainer;
/**
* rief epoll事件結構向量類型
*/www.Bkjia.com
typedef std::vector epollfdContainer;
zHttpTaskPool *pool; /**< 所屬的池 */
zRTime currentTime; /**< 當前時間 */
zMutex mutex; /**< 互斥變量 */
zHttpTaskContainer tasks; /**< 任務列表 */
int kdpfd;
epollfdContainer epfds;
epollfdContainer::size_type fds_count;
public:
/**
* rief 構造函數
* param pool 所屬的連接池
* param name 線程名稱
*/
zHttpThread(
zHttpTaskPool *pool,
const std::string &name = std::string(zHttpThread))
: zThread(name), pool(pool), currentTime()
{
kdpfd = epoll_create(256);
assert(-1 != kdpfd);
epfds.resize(256);
fds_count = 0;
}
/**
* rief 析構函數
*/
~zHttpThread()
{
TEMP_FAILURE_RETRY(::close(kdpfd));
}
void run();
/**
* rief 添加一個連接任務
* param task 連接任務
*/
void add(zHttpTask *task)
{
mutex.lock();
task->addEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI, (void *)task);
tasks.push_back(task);
++fds_count;
if (fds_count > epfds.size())
{
epfds.resize(fds_count + 16);
}
mutex.unlock();
}
typedef zHttpTask* zHttpTaskP;
void remove(zHttpTaskP &task)
{
task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI);
tasks.remove(task);
SAFE_DELETE(task);
fds_count--;
}
void remove(zHttpTaskContainer::iterator &it)
{
zHttpTask *task = *it;
task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI);
tasks.erase(it);
SAFE_DELETE(task);
fds_count--;
}
};
/**
* rief http線程例程
*/
void zHttpThread::run()
{
zHttpTaskContainer::iterator it, next;
while(!isFinal())
{
mutex.lock();
if (!tasks.empty())
{
int retcode = epoll_wait(kdpfd, &epfds[0], fds_count, 0);
if (retcode > 0)
{
for(int i = 0; i < retcode; ++i)
{
zHttpTask *task = (zHttpTask *)epfds[i].data.ptr;//獲取epoll事件的數據,即連接對象
if (epfds[i].events & (EPOLLERR | EPOLLPRI))//檢查epoll事件是否有錯
{
//套接口出現錯誤
remove(task);
}
else if (epfds[i].events & EPOLLIN)//檢查epoll事件是否是讀事件
{
switch(task->httpCore())//阻塞recv連接對象緩沖區數據
{
case 1: //接收成功
case -1: //接收失敗
remove(task);
break;
case 0: //接收超時,
break;
}
}
}
}
currentTime.now();
for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next)//檢查短連接的連接對象,移除超時的連接對象
{
zHttpTask *task = *it;
if (task->checkHttpTimeout(currentTime))
{
//超過指定時間驗證還沒有通過,需要回收連接
remove(it);
}
}
}
mutex.unlock();
zThread::msleep(50);
}
//把所有等待驗證隊列中的連接加入到回收隊列中,回收這些連接
for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next)
{
remove(it);
}
}
/**
* rief 把一個TCP連接添加到驗證隊列中,因為存在多個驗證隊列,需要按照一定的算法添加到不同的驗證處理隊列中
* param task 一個連接任務
*/
bool zHttpTaskPool::addHttp(zHttpTask *task)
{
//因為存在多個驗證隊列,需要按照一定的算法添加到不同的驗證處理隊列中
static unsigned int hashcode = 0;
zHttpThread *pHttpThread = (zHttpThread *)httpThreads.getByIndex(hashcode++ % maxHttpThreads);
if (pHttpThread)
pHttpThread->add(task);
return true;
}
/**
* rief 初始化線程池,預先創建各種線程
*
eturn 初始化是否成功
*/
bool zHttpTaskPool::init()
{
//創建初始化驗證線程
for(int i = 0; i < maxHttpThreads; ++i)
{
std::ostringstream name;
name << zHttpThread[ << i << ];
zHttpThread *pHttpThread = new zHttpThread(this, name.str());
if (NULL == pHttpThread)
return false;
if (!pHttpThread->start())
return false;
httpThreads.add(pHttpThread);
}
return true;
}
/**
* rief 釋放線程池,釋放各種資源,等待各種線程退出
*/
void zHttpTaskPool::final()
{
httpThreads.joinAll();
}