程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> kqueue例子,kqueue

kqueue例子,kqueue

編輯:C++入門知識

kqueue例子,kqueue


網絡服務器通常都使用epoll進行異步IO處理,而開發者通常使用mac,為了方便開發,我把自己的handy庫移植到了mac平台上。移植過程中,網上居然沒有搜到kqueue的使用例子,讓我驚訝不已。為了讓大家不用像我一樣再次花費大力氣搞定kqueue,我整理了一個簡單清晰可運行的kqueue例子,供大家參考。 
kqueue一共有幾個函數:

 1 //類似epoll_create
 2 int kqueue(void); 
 3 //兼具epoll_ctl及epoll_wait功能
 4 int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout); 
 5 //設定kevent參數的宏
 6 EV_SET(&kev, ident, filter, flags, fflags, data, udata);
 7 struct kevent {
 8 uintptr_t ident; /* identifier for this event */
 9 int16_t filter; /* filter for event */
10 uint16_t flags; /* general flags */
11 uint32_t fflags; /* filter-specific flags */
12 intptr_t data; /* filter-specific data */
13 void *udata; /* opaque user data identifier */
14 };

 

函數調用示例:

 1 //創建kqueue
 2 int epollfd = kqueue();
 3 //添加或者修改fd
 4 struct kevent ev[2];
 5 int n = 0;
 6 if (events & kReadEvent) {
 7     EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
 8 } else if (modify){
 9     EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
10 }
11 if (events & kWriteEvent) {
12     EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
13 } else if (modify){
14     EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
15 }
16 printf("%s fd %d events read %d write %d\n",
17        modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
18 int r = kevent(efd, ev, n, NULL, 0, NULL);
19 //獲取ready的fd
20 struct timespec timeout;
21 timeout.tv_sec = waitms / 1000;
22 timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
23 const int kMaxEvents = 20;
24 struct kevent activeEvs[kMaxEvents];
25 int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);
26 //處理IO事件
27 for (int i = 0; i < n; i ++) {
28     int fd = (int)(intptr_t)activeEvs[i].udata;
29     int events = activeEvs[i].filter;
30     if (events == EVFILT_READ) {
31         handleRead(efd, fd);
32     } else if (events == EVFILT_WRITE) {
33         handleWrite(efd, fd);
34     }
35 }

注意kevent與epoll最大的不同在於READ/WRITE事件是分開注冊並且分開返回的,而Epoll則是一個fd一次返回讀和寫事件,用標志位來判斷。 
可以運行的代碼如下:kqueue-example(handy對kqueue提供了封裝版本)

 

  1 #include <sys/socket.h>
  2 #include <sys/event.h>
  3 #include <netinet/in.h>
  4 #include <arpa/inet.h>
  5 #include <fcntl.h>
  6 #include <unistd.h>
  7 #include <stdio.h>
  8 #include <errno.h>
  9 #include <string.h>
 10 #include <stdlib.h>
 11 
 12 #define exit_if(r, ...) if(r) {printf(__VA_ARGS__); printf("error no: %d error msg %s\n", errno, strerror(errno)); exit(1);}
 13 
 14 const int kReadEvent = 1;
 15 const int kWriteEvent = 2;
 16 
 17 void setNonBlock(int fd) {
 18     int flags = fcntl(fd, F_GETFL, 0);
 19     exit_if(flags<0, "fcntl failed");
 20     int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
 21     exit_if(r<0, "fcntl failed");
 22 }
 23 
 24 void updateEvents(int efd, int fd, int events, bool modify) {
 25     struct kevent ev[2];
 26     int n = 0;
 27     if (events & kReadEvent) {
 28         EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
 29     } else if (modify){
 30         EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
 31     }
 32     if (events & kWriteEvent) {
 33         EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
 34     } else if (modify){
 35         EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
 36     }
 37     printf("%s fd %d events read %d write %d\n",
 38            modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
 39     int r = kevent(efd, ev, n, NULL, 0, NULL);
 40     exit_if(r, "kevent failed ");
 41 }
 42 
 43 void handleAccept(int efd, int fd) {
 44     struct sockaddr_in raddr;
 45     socklen_t rsz = sizeof(raddr);
 46     int cfd = accept(fd,(struct sockaddr *)&raddr,&rsz);
 47     exit_if(cfd<0, "accept failed");
 48     sockaddr_in peer, local;
 49     socklen_t alen = sizeof(peer);
 50     int r = getpeername(cfd, (sockaddr*)&peer, &alen);
 51     exit_if(r<0, "getpeername failed");
 52     printf("accept a connection from %s\n", inet_ntoa(raddr.sin_addr));
 53     setNonBlock(cfd);
 54     updateEvents(efd, cfd, kReadEvent|kWriteEvent, false);
 55 }
 56 
 57 void handleRead(int efd, int fd) {
 58     char buf[4096];
 59     int n = 0;
 60     while ((n=::read(fd, buf, sizeof buf)) > 0) {
 61         printf("read %d bytes\n", n);
 62         int r = ::write(fd, buf, n); //寫出讀取的數據
 63         //實際應用中,寫出數據可能會返回EAGAIN,此時應當監聽可寫事件,當可寫時再把數據寫出
 64         exit_if(r<=0, "write error");
 65     }
 66     if (n<0 && (errno == EAGAIN || errno == EWOULDBLOCK))
 67         return;
 68     exit_if(n<0, "read error"); //實際應用中,n<0應當檢查各類錯誤,如EINTR
 69     printf("fd %d closed\n", fd);
 70     close(fd);
 71 }
 72 
 73 void handleWrite(int efd, int fd) {
 74     //實際應用應當實現可寫時寫出數據,無數據可寫才關閉可寫事件
 75     updateEvents(efd, fd, kReadEvent, true);
 76 }
 77 
 78 void loop_once(int efd, int lfd, int waitms) {
 79     struct timespec timeout;
 80     timeout.tv_sec = waitms / 1000;
 81     timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
 82     const int kMaxEvents = 20;
 83     struct kevent activeEvs[kMaxEvents];
 84     int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);
 85     printf("epoll_wait return %d\n", n);
 86     for (int i = 0; i < n; i ++) {
 87         int fd = (int)(intptr_t)activeEvs[i].udata;
 88         int events = activeEvs[i].filter;
 89         if (events == EVFILT_READ) {
 90             if (fd == lfd) {
 91                 handleAccept(efd, fd);
 92             } else {
 93                 handleRead(efd, fd);
 94             }
 95         } else if (events == EVFILT_WRITE) {
 96             handleWrite(efd, fd);
 97         } else {
 98             exit_if(1, "unknown event");
 99         }
100     }
101 }
102 
103 int main() {
104     short port = 99;
105     int epollfd = kqueue();
106     exit_if(epollfd < 0, "epoll_create failed");
107     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
108     exit_if(listenfd < 0, "socket failed");
109     struct sockaddr_in addr;
110     memset(&addr, 0, sizeof addr);
111     addr.sin_family = AF_INET;
112     addr.sin_port = htons(port);
113     addr.sin_addr.s_addr = INADDR_ANY;
114     int r = ::bind(listenfd,(struct sockaddr *)&addr, sizeof(struct sockaddr));
115     exit_if(r, "bind to 0.0.0.0:%d failed %d %s", port, errno, strerror(errno));
116     r = listen(listenfd, 20);
117     exit_if(r, "listen failed %d %s", errno, strerror(errno));
118     printf("fd %d listening at %d\n", listenfd, port);
119     setNonBlock(listenfd);
120     updateEvents(epollfd, listenfd, kReadEvent, false);
121     for (;;) { //實際應用應當注冊信號處理函數,退出時清理資源
122         loop_once(epollfd, listenfd, 10000);
123     }
124     return 0;
125 }

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved