自己以前寫TCP服務器,並不需要考慮到並發與資源的問題,使用的都是單獨線程處理單個TCP連接的方式(說謂的PPC/TPC模型)。如今自己做高並發服務器,必須處理好這些問題。因為用的是linux2.6,因此選用epoll作為I/O多路復用技術接口再好不過了(呵呵呵)。
通俗地講,epoll就是:告訴你有哪些socket准備要做哪些事。在select模型中,select用來檢測socket狀態,兩者的用法大相徑庭,但是機制不同。select的檢測方法是每次遍歷所有需要檢測的socket,並返回有動作socket。而epoll的並不會檢測所有的句柄狀態,通過內核的支持,能避免無意義的檢測。
當socket句柄的數目特別大的情況下,首先PPC/TPC模型肯定就掛掉了。而select因為每次要遍歷所有句柄,因此在句柄遍歷的過程中占用了很多的時間,如果並發的數量接近句柄總數,select並沒有浪費太多時間,但對於並發數遠低於鏈接數的情況,比如回合制的網絡游戲,select就有浪費時間的嫌疑。因此epoll是相當高效的。
在將epoll封裝成c++類之前,對epoll的數據結構以及接口做一下簡單介紹:
epoll 事件結構體:
struct epoll_event {
__uint32_t events; // Epoll events
epoll_data_t data; // User datavariable
};
這裡的events是事件的類型,常用的有:
EPOLLIN 該句柄為可讀
EPOLLOUT 該句柄為可寫
EPOLLERR 該句柄發生錯誤
EPOLLET epoll為邊緣觸發模式
epoll 事件date
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
注意epoll_data是個union。我們想要掛上句柄或是數據指針都很方便。
epoll創建:
int epoll_create(int size);
調用該函數會創建一個epoll句柄,參數size為監聽的最大數量
epoll控制:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
這個接口用於對該epdf上的句柄進行注冊、修改和刪除。
op是要進行的操作,有:
EPOLL_CTL_ADD 添加需要監測的文件句柄fd
EPOLL_CTL_MOD 更改該fd句柄的模式
EPOLL_CTL_DEL 移除掉該句柄
event是所要設置的該fd的事件。
epoll收集信息:
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
調用該函數後,如果該有epoll所管理的句柄發生對應類型的事件,這些發生事件的句柄的epoll_event將會被寫入events數組中,我們便能根據這些句柄執行接下來的I/O以及其他操作。這裡的maxevents是每次wait獲取的事件最大數。如果使用的是ET邊緣觸發模式,epoll_wait返回一個事件後,再這個時間的狀態沒有改變的情況下,epoll_wait不會再對改事件進行通知。
epoll基本的介紹完,就可以先對epoll進行一定的封裝以增強代碼的復用。
在封裝epoll之前,我先給出我封裝好的用於tcp的socket:
//總共所需要用到的頭文件,有部分是多余的 #include<iostream> #include<cstdio> #include<cstring> #include<string> #include<cstdlib> #ifdef WIN32 #include<winsock2.h> #else #include<fcntl.h> #include<sys/ioctl.h> #include<sys/socket.h> #include<sys/epoll.h> #include<unistd.h> #include<netdb.h> #include<errno.h> #include<arpa/inet.h> #include<netinet/in.h> #include<sys/types.h> #define SOCKET int #define SOCKET_ERROR -1 #define INVALID_SOCKET -1 #endif
這裡是我自己對普通tcp socket的封裝:
class msock
{
public:
SOCKET sock;
sockaddr_in addr;
msock()
{
addr.sin_family=AF_INET;
}
void setsock(SOCKET fd)
{
sock=fd;
}
SOCKET getsock()
{
return sock;
}
void createsock()
{
sock=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(sock==INVALID_SOCKET)
{
puts("socket build error");
exit(-1);
}
}
void setioctl(bool x)
{
fcntl(sock, F_SETFL, O_NONBLOCK);
}
bool setip(string ip)
{
hostent *hname=gethostbyname(ip.c_str());
if(!hname)
{
puts("can't find address");
return false;
}//puts(inet_ntoa(addr.sin_addr));
addr.sin_addr.s_addr=*(u_long *)hname->h_addr_list[0];
return true;
}
void setport(int port)
{
addr.sin_port=htons(port);
}
int msend(const char *data,const int len)
{
return send(sock,data,len,0);
}
int msend(const string data)
{
return msend(data.c_str(),data.length());
}
int msend(mdata *data)
{
return msend(data->buf,data->len);
}
int mrecv(char *data,int len)
{
return recv(sock,data,len,0);
}
int mrecv(char *data)
{
return recv(sock,data,2047,0);
}
int mclose()
{
return close(sock);
}
int operator == (msock jb)
{
return sock==jb.sock;
}
};
listen用的sock繼承於msock:
class mssock:public msock
{
public:
sockaddr_in newaddr;
socklen_t newaddrlen;
mssock():msock()
{
createsock();
addr.sin_addr.s_addr=htonl(INADDR_ANY);
newaddrlen=sizeof(newaddr);//hehe
}
int mbind()
{
return bind(sock,(sockaddr *)&addr,sizeof(addr));
}
int mlisten(int num=20)
{
return listen(sock,num);
}
msock maccept()
{
SOCKET newsock=accept(sock,(sockaddr *)&newaddr,&newaddrlen);
msock newmsock;
newmsock.setsock(newsock);
return newmsock;
}
};
以上的msock和mssock類裡面含有socket句柄,可以直接將類強制轉換為socket句柄
在對epoll封裝之前還有一步就是:定義一個數據結構用於存放不定長度的數據,以便掛入epoll的事件中。
struct mdata
{
int fd;
unsigned int len;
char buf[2048];
mdata(){}
mdata(char *s,const int length)
{
for(int i=0;i<length;i++)
{
buf[i]=s[i];
}
}
};
epoll的封裝可以開始了,使用的是邊緣觸發的方式,我的思路是:將epoll的句柄以及參數都記錄在類中,並自己維護一個events數據用於對應的事件。外部只需要根據返回事件的臨時編號通過類的方法獲取返回值即可。
class mepoll
{
public:
int epfd; //epoll自身的句柄
epoll_event ev,*events; //臨時事件和每次wait用於儲存的事件數組
int maxevents; //最大事件數
int timeout; //wait超時
//構造函數默認最大事件數為20
mepoll(unsigned short eventsnum=20)
{
epfd=epoll_create(0xfff);
maxevents=eventsnum;
events=new epoll_event[maxevents];
timeout=-1;
}
//添加新的socket句柄到epoll中
int add(SOCKET fd)
{
fcntl(fd, F_SETFL, O_NONBLOCK);//設置fd為非阻塞
ev.events=EPOLLIN|EPOLLET;
ev.data.fd=fd;
return epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);
}
//設置對應編號的句柄事件為可讀
void ctl_in(int index)
{
ev.data.fd=*(int *)events[index].data.ptr;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,*(int *)events[index].data.ptr,&ev);
}
//改可寫,並將要寫的數據data綁定到該句柄對應的事件中
void ctl_out(int index,mdata *data)
{
data->fd=events[index].data.fd;
ev.data.ptr=data;
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,events[index].data.fd,&ev);
}
int wait()
{
return epoll_wait(epfd,events,maxevents,timeout);
}
unsigned int geteventtype(int index)
{
return events[index].events;
}
//獲取對應編號中的msock
msock getsock(int index)
{
msock sk;
sk.setsock(events[index].data.fd);
return sk;
}
//從mdata裡獲取出msock
msock getsock(mdata *data)
{
msock sk;
sk.setsock(data->fd);
return sk;
}
//獲取對應編號的事件
mdata *getdata(int index)
{
return (mdata *)events[index].data.ptr;
}
};
現在有一個比較好用的epoll類了。於是可以開始實現一個簡單的完整服務器程序了。
在實現過程中,有幾點需要注意區分用於listen用的句柄和收發數據使用的句柄。因為采用的是邊緣觸發的方式,很可能會出現同事listen到多個連接的情況,但是這裡epoll_wait只會通知一次。如果我們發現有accept事件,我們卻沒有把所有accept處理完,很多的鏈接就不能連入。對於這種問題,可以這樣處理:在listen發生時,一直accept直到accept失敗吧所有鏈接都處理完再繼續。
下面我使用我的游戲邏輯的接口和epoll類實現一個基本的服務器程序:
游戲邏輯的接口很簡單,只需要調用gamemain創建出該游戲類的實例。並使用收到的數據調用mdata *gamemain::dealdata(mdata *data) 函數即可得到游戲邏輯處理後的mdata,將處理好的mdata發回去,這裡處理後的mdata*是游戲實例自動分配的,發完之後調用gamemain::freedatainpool(mdata *data)釋放(那邊也會自動釋放的)。(哈哈,沒想到自己第一次寫游戲服務器邏輯能做得如此低耦合)
#include "ssock.h"
#include "game.h"
int main()
{
gamemain game;//創建游戲實例
mepoll ep;//epoll類
mssock ssock;//服務器listen用的sock
msock csock;//臨時sock
mdata rdata;//臨時rdata
ssock.setport(5000);//使用5000端口
if(SOCKET_ERROR==ssock.mbind())
{
puts("bind error");
return -1;
}
if(SOCKET_ERROR==ssock.mlisten())
{
puts("listen error");
return -1;
}
//開始listen
//將listen句柄加入到epoll中
ep.add(ssock.getsock());
puts("server start");
int ionum;
while(1)
{
ionum=ep.wait();//獲取事件
//遍歷並處理所有事件
for(int i=0; i<ionum; i++)
{
printf("some data come: ");
csock=ep.getsock(i);
if(ep.geteventtype(i)&EPOLLERR)
{
printf("sock %u error\n",csock.sock);
csock.mclose();
}
else if(ssock==csock)//處理listen事件
{
while(1)//accept直到沒有新連接
{
csock=ssock.maccept();
if(csock.getsock()==SOCKET_ERROR)
{
break;
}
//將新連接加入到epoll中
ep.add(csock.getsock());
puts("a newsock comed:");
}
}
else if(ep.geteventtype(i)&EPOLLIN)//處理接收事件
{
//根據臨時編號獲取到對應sock並接收數據
csock=ep.getsock(i);
printf("sock %u in\n",csock.sock);
int rlen;
bool isrecv=false;
rdata.len=0;
while(1)
{
rlen=csock.mrecv(rdata.buf+rdata.len);
if(rlen<0)
{
if (errno == EAGAIN)
{
isrecv = true;
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
break;
}
}
}
if(isrecv)
{
//調用游戲邏輯處理數據並修改sock事件為發送
ep.ctl_out(i,game.dealdata(&rdata));
}
}
else if(ep.geteventtype(i)&EPOLLOUT)//處理發送事件
{
mdata *data=ep.getdata(i);
csock=ep.getsock(data);
printf("sock %u out type:%u\n",csock.sock,data->buf[4]);
int slen,cnt=0;
bool issend=false;
while(1)
{
slen=csock.msend(data);
if(slen<0)
{
if (errno == EAGAIN)
{
// 對於nonblocking 的socket而言,這裡說明了已經全部發送成功了
issend = true;
break;
}
else if (errno == EINTR)
{
// 被信號中斷
continue;
}
else
{
// 其他錯誤
break;
}
}
if(slen=0)
{
break;
}
/*cnt+=slen;
if(cnt>=data->len)*/
{
issend=true;
break;
}
}
game.freedatainpool(data);
//無論發送情況都要改為可寫,以容錯
ep.ctl_in(i);
}
}
}
puts("server ended");
return 0;
}
這個程序每一次讀操作完成後,都是在單線程處理完游戲邏輯在進行下一步。如果游戲邏輯效率高且不會涉及到數據庫等待的問題,這種方式可取,否則可以另起線程處理游戲邏輯,實現真正的高並發。
本文的整個內容已經講完了,epoll的學問可不止這些,需要在以後的實踐中要慢慢積累。