程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> poll調用深入解析

poll調用深入解析

編輯:C++入門知識

poll調用和select調用實現的功能一樣,都是網絡IO利用的一種機制。先看一下poll的調用形式

一,poll調用

[cpp] 
#include <poll.h> 
int poll(struct pollfd fds[], nfds_t nfds, int timeout); 
struct pollfd結構如下:【在源碼文件poll.h文件中】
[cpp] 
struct pollfd { 
    int fd; 
    short events; 
    short revents; 
}; 
這個結構中fd表示文件描述符,events表示請求檢測的事件,revents表示檢測之後返回的事件,如果當某個文件描述符有狀態變化時,revents的值就不為空。

二,參數說明
fds:存放需要被檢測狀態的Socket描述符;與select不同(select函數在調用之後,會清空檢測socket描述符的數組),每當調用這個函數之後,系統不會清空這個數組,而是將有狀態變化的描述符結構的revents變量狀態變化,操作起來比較方便;
nfds:用於標記數組fds中的struct pollfd結構元素的總數量;
timeout:poll函數調用阻塞的時間,單位是MS(毫秒)
三,返回值
大於0:表示數組fds中有socket描述符的狀態發生變化,或可以讀取、或可以寫入、或出錯。並且返回的值表示這些狀態有變化的socket描述符的總數量;此時可以對fds數組進行遍歷,以尋找那些revents不空的socket描述符,然後判斷這個裡面有哪些事件以讀取數據。
等於0:表示沒有socket描述符有狀態變化,並且調用超時。
小於0:此時表示有錯誤發生,此時全局變量errno保存錯誤碼。
四,內核實現
      poll系統調用的內核實現是sys_poll,其代碼如下:
[cpp] 
asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds, 
            long timeout_msecs) 

    s64 timeout_jiffies; 
    int ret; 
 
    if (timeout_msecs > 0) { 
#if HZ > 1000 
        /* We can only overflow if HZ > 1000 */ 
        if (timeout_msecs / 1000 > (s64)0x7fffffffffffffffULL / (s64)HZ) 
            timeout_jiffies = -1; 
        else 
#endif 
            timeout_jiffies = msecs_to_jiffies(timeout_msecs); 
    } else { 
        /* Infinite (< 0) or no (0) timeout */ 
        timeout_jiffies = timeout_msecs; 
    } 
 
    ret = do_sys_poll(ufds, nfds, &timeout_jiffies); 
    if (ret == -EINTR) { 
        struct restart_block *restart_block; 
        restart_block = &current_thread_info()->restart_block; 
        restart_block->fn = do_restart_poll; 
        restart_block->arg0 = (unsigned long)ufds; 
        restart_block->arg1 = nfds; 
        restart_block->arg2 = timeout_jiffies & 0xFFFFFFFF; 
        restart_block->arg3 = (u64)timeout_jiffies >> 32; 
        ret = -ERESTART_RESTARTBLOCK; 
    } 
    return ret; 

這個函數還是比較容易理解,包括三個部分的工作:
函數調用超時阻塞時間轉換,根據內核的軟時鐘設置頻率將超時時間設置為jiffies標准時間。
調用do_sys_poll,這裡完成主要的工作。
如果當前進程有待處理的信號,則先處理信號,這是根據do_sys_poll返回來決定的,事實上在這個調用中會檢查當前的進程是否有未處理信號,如果有,就會返回EINTR以處理信號,然後返回-ERESTART_RESTARTBLOCK,這會導致重新調用。
進入到do_sys_poll函數中
[cpp] 
int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, s64 *timeout) 

    struct poll_wqueues table; 
    int err = -EFAULT, fdcount, len, size; 
    /* Allocate small arguments on the stack to save memory and be
       faster - use long to make sure the buffer is aligned properly
       on 64 bit archs to avoid unaligned access */ 
    long stack_pps[POLL_STACK_ALLOC/sizeof(long)]; 
    struct poll_list *const head = (struct poll_list *)stack_pps; 
    struct poll_list *walk = head; 
    unsigned long todo = nfds; 
 
    if (nfds > current->signal->rlim[RLIMIT_NOFILE].rlim_cur) 
        return -EINVAL; 
 
    len = min_t(unsigned int, nfds, N_STACK_PPS); 
    for (;;) { 
        walk->next = NULL; 
        walk->len = len; 
        if (!len) 
            break; 
 
        if (copy_from_user(walk->entries, ufds + nfds-todo, 
                    sizeof(struct pollfd) * walk->len)) 
            goto out_fds; 
 
        todo -= walk->len; 
        if (!todo) 
            break; 
 
        len = min(todo, POLLFD_PER_PAGE); 
        size = sizeof(struct poll_list) + sizeof(struct pollfd) * len; 
        walk = walk->next = kmalloc(size, GFP_KERNEL); 
        if (!walk) { 
            err = -ENOMEM; 
            goto out_fds; 
        } 
    } 
pollfd 
    poll_initwait(&table); 
    fdcount = do_poll(nfds, head, &table, timeout); 
    poll_freewait(&table); 
 
    for (walk = head; walk; walk = walk->next) { 
        struct pollfd *fds = walk->entries; 
        int j; 
 
        for (j = 0; j < walk->len; j++, ufds++) 
            if (__put_user(fds[j].revents, &ufds->revents)) 
                goto out_fds; 
    } 
 
    err = fdcount; 
out_fds: 
    walk = head->next; 
    while (walk) { 
        struct poll_list *pos = walk; 
        walk = walk->next; 
        kfree(pos); 
    } 
 
    return err; 

為了加快處理速度和提高系統性能,這裡優先使用已經定好的一個棧空間,其大小為POLL_STACK_ALLOC,在我系統上,其值為256,大小為256個字節的棧空間轉換為struct poll_list結構,以存儲需要被檢測的socket描述符,struct poll_list的結構如下:
[cpp] 
struct poll_list { 
    struct poll_list *next; 
    int len; 
    struct pollfd entries[0]; 
}; 
上面可以看到該結構的entries為一個數組,結構為struct pollfd,這個有點眼熟,沒錯,它就是存儲poll調用中需要被檢測的socket描述符。那麼前面分配的棧空間能存儲多少個struct pollfd呢?這計算如下:
[cpp] 
len = min_t(unsigned int, nfds, N_STACK_PPS); 
式中的N_STACK_PPS就是計算前面默認的固定棧大小能夠存儲多少個struct pollfd的
[cpp] view plaincopy
#define N_STACK_PPS ((sizeof(stack_pps) - sizeof(struct poll_list))  / \ 
            sizeof(struct pollfd)) 
然後就復制len個struct pollfd至內核空間,這裡有細心的用戶就會發現:如果nfds比N_STACK_PPS大的話,怎麼辦呢?注意上面的函數,是一個循環,如果nfds比N_STACK_PPS大(事實上,一般都會比這裡大),那麼會再請求內存,然後接著復制,就是這個代碼片段:
[cpp]
              len = min(todo, POLLFD_PER_PAGE); 
size = sizeof(struct poll_list) + sizeof(struct pollfd) * len; 
walk = walk->next = kmalloc(size, GFP_KERNEL); 
if (!walk) { 
    err = -ENOMEM; 
    goto out_fds; 

POLLFD_PER_PAGE表示一頁的內存能夠存儲多少個struct pollfd,可以計算一下,一頁是4K,而struct pollfd的內存占用8個字節,就是一頁的內存可以將近存儲512個socket描述符。如果在分配一頁的內存之後,還不夠nfds來用,沒關系,循環不會退出的,會再分配一個頁,並且所有分配的塊都被struct poll_list鏈接起來,上面可以看到,這個結構有一個next域,就是專門做這個的。
在這之後,就會形成一個以stack_pps存儲空間為頭,然後一頁一頁分配的內存為接點的鏈表,這個鏈表上就存儲了poll調用時傳入的所有的socket描述符。

接下來調用一個很重要的部分
[cpp] 
poll_initwait(&table); 
fdcount = do_poll(nfds, head, &table, timeout); 
poll_freewait(&table); 
這是最重要的部分,因為接下來的部分比較容易理解,在這之後,做兩件事:
將鏈表上的所有struct pollfd中的revents的狀態寫入到用戶空間(記得之前也從用戶空間寫入過內核空間,這是因為內核態地址,用戶空間應用不能訪問),所以需要寫入到用戶空間中去。
之前調用kmalloc分配了很多內存,現在要釋放了,所以要從stack_pps地址處的head開始,順著next不斷的釋放內存。
再回到最重要的部分,先看poll_initwait調用,下面是主要相關的數據結構
[cpp] 
struct poll_wqueues { 
    poll_table pt; 
    struct poll_table_page * table; 
    int error; 
    int inline_index; 
    struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; 
}; 
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *); 
typedef struct poll_table_struct { 
    poll_queue_proc qproc; 
} poll_table; 
poll_initwait函數如下:
[cpp] 
void poll_initwait(struct poll_wqueues *pwq) 

    init_poll_funcptr(&pwq->pt, __pollwait);//設置poll_table結構中的qproc函數指針為__pollwait函數,就是pwq->pt->qproc=__pollwait。這個函數是一個回調函數,基本上這種機制的實現,就是依靠回調函數了。 
    pwq->error = 0; 
    pwq->table = NULL; 
    pwq->inline_index = 0; 

所以poll_initwait就是初始化了poll_wqueues table,主要是將其結構中的函數指針設置為__pollwait函數。那麼這個函數是做什麼的呢?我們先看poll_initwait之後調用的函數,就是do_poll函數,其實現如下:
注意下面函數在調用時的參數,參數有這麼幾個nfds, head, &table, timeout,參數就容易理解了:nfds表示poll調用時傳入的數組中struct pollfd的個數,head其實是表示將poll調用時傳入的數組,因為全部都表示為struct poll_list鏈表了(前面分析的,還記得吧),table是剛剛初始化的一個,裡面暫時就只是包含一個回調函數的指針,就是__pollwait函數。timeout表示超時時間。
[cpp] 
static int do_poll(unsigned int nfds,  struct poll_list *list, 
           struct poll_wqueues *wait, s64 *timeout) 

    int count = 0; 
    poll_table* pt = &wait->pt; 
 
    /* Optimise the no-wait case */ 
    if (!(*timeout)) 
        pt = NULL; 
 
    for (;;) { 
        struct poll_list *walk; 
        long __timeout; 
 
        set_current_state(TASK_INTERRUPTIBLE); 
        for (walk = list; walk != NULL; walk = walk->next) { 
            struct pollfd * pfd, * pfd_end; 
 
            pfd = walk->entries; 
            pfd_end = pfd + walk->len; 
            for (; pfd != pfd_end; pfd++) { 
                /*
                 * Fish for events. If we found one, record it
                 * and kill the poll_table, so we don't
                 * needlessly register any other waiters after
                 * this. They'll get immediately deregistered
                 * when we break out and return.
                 */ 
                if (do_pollfd(pfd, pt)) { 
                    count++; 
                    pt = NULL; 
                } 
            } 
        } 
        /*
         * All waiters have already been registered, so don't provide
         * a poll_table to them on the next loop iteration.
         */ 
        pt = NULL; 
        if (!count) { 
            count = wait->error; 
            if (signal_pending(current)) 
                count = -EINTR; 
        } 
        if (count || !*timeout) 
            break; 
 
        if (*timeout < 0) { 
            /* Wait indefinitely */ 
            __timeout = MAX_SCHEDULE_TIMEOUT; 
        } else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT-1)) { 
            /*
             * Wait for longer than MAX_SCHEDULE_TIMEOUT. Do it in
             * a loop
             */ 
            __timeout = MAX_SCHEDULE_TIMEOUT - 1; 
            *timeout -= __timeout; 
        } else { 
            __timeout = *timeout; 
            *timeout = 0; 
        } 
 
        __timeout = schedule_timeout(__timeout); 
        if (*timeout >= 0) 
            *timeout += __timeout; 
    } 
    __set_current_state(TASK_RUNNING); 
    return count; 

這個函數有以下幾個要注意的點:
信號處理保障。在這個函數中先將當前進程設置為可以被信號中斷,就是set_current_state(TASK_INTERRUPTIBLE)這一行,後面還會檢查是否有需要處理的信號signal_pending(current)。這裡的意思是就算是poll調用進入到sys_poll系統調用之後,也可以接收外部信號,從而退出當前系統調用(因為我們知道一般的系統調用都不會被中斷的,所以系統調用一般都盡量很快的返回)。
外部大循環退出的條件,外部大循環退出的條件只有if (count || !*timeout) break;後面的條件容易理解,就是超時,前面的count是什麼意思?它在每次調用do_pollfd函數之後,都有可能會加1,其實調用do_pollfd就是檢查socket描述符狀態的變化,如果有變化,就會使count加1,所以在結束內部遍歷之後,count保存了所有的有狀態變化的socket描述符數量。
這個函數會對之前以head為頭結點的鏈表進行遍歷,然後鏈表上每個結點中都包含很多很多的struct pollfd進行遍歷(這些struct pollfd都被存儲在struct poll_list結構的數組字段struct pollfd entries裡面。
然後對每個struct pollfd調用do_pollfd(這會調用很多次,根據你傳入多少個socket描述符而定),這個函數需要兩個參數,一個是struct pollfd,這沒得說的,另一個是剛剛初始化的table,就是那個暫時只是包含__pollwait回調指針的結構,還記得吧。
我們再進入do_pollfd,了解這個函數是做什麼的?
[cpp] 
static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait) 

    unsigned int mask; 
    int fd; 
 
    mask = 0; 
    fd = pollfd->fd; 
    if (fd >= 0) { 
        int fput_needed; 
        struct file * file; 
 
        file = fget_light(fd, &fput_needed); 
        mask = POLLNVAL; 
        if (file != NULL) { 
            mask = DEFAULT_POLLMASK; 
            if (file->f_op && file->f_op->poll) 
                mask = file->f_op->poll(file, pwait); 
            /* Mask out unneeded events. */ 
            mask &= pollfd->events | POLLERR | POLLHUP; 
            fput_light(file, fput_needed); 
        } 
    } 
    pollfd->revents = mask; 
 
    return mask; 

這個函數很簡單,先根據socket描述符或者是文件句柄找到進程對應的struct file *file結構,然後調用file->f_op->poll(file,pwait),這是這個函數的核心調用,這其實也是linux的VFS的一部分,這會根據當前的文件是什麼類型的文件來選擇調用的入口,如file是socket網絡文件,此時調用的就是由網絡驅動設備來實現的poll,如果file是ext3等文件系統上打開的一個文件,那就會調用由該文件系統來實現的poll函數,我們以tcp_poll為例來了解一般poll完成什麼工作;
注意下面的參數,file和wait是由file->f_op->poll調用傳入的參數,而struct socket為socket連接的進程方面表示。
[cpp]
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait) 

    unsigned int mask; 
    struct sock *sk = sock->sk; 
    struct tcp_sock *tp = tcp_sk(sk); 
 
    poll_wait(file, sk->sk_sleep, wait); 
    if (sk->sk_state == TCP_LISTEN) 
        return inet_csk_listen_poll(sk); 
 
    /* Socket is not locked. We are protected from async events
       by poll logic and correct handling of state changes
       made by another threads is impossible in any case.
     */ 
 
    mask = 0; 
    if (sk->sk_err) 
        mask = POLLERR; 
 
    /*
     * POLLHUP is certainly not done right. But poll() doesn't
     * have a notion of HUP in just one direction, and for a
     * socket the read side is more interesting.
     *
     * Some poll() documentation says that POLLHUP is incompatible
     * with the POLLOUT/POLLWR flags, so somebody should check this
     * all. But careful, it tends to be safer to return too many
     * bits than too few, and you can easily break real applications
     * if you don't tell them that something has hung up!
     *
     * Check-me.
     *
     * Check number 1. POLLHUP is _UNMASKABLE_ event (see UNIX98 and
     * our fs/select.c). It means that after we received EOF,
     * poll always returns immediately, making impossible poll() on write()
     * in state CLOSE_WAIT. One solution is evident --- to set POLLHUP
     * if and only if shutdown has been made in both directions.
     * Actually, it is interesting to look how Solaris and DUX
     * solve this dilemma. I would prefer, if PULLHUP were maskable,
     * then we could set it on SND_SHUTDOWN. BTW examples given
     * in Stevens' books assume exactly this behaviour, it explains
     * why PULLHUP is incompatible with POLLOUT.    --ANK
     *
     * NOTE. Check for TCP_CLOSE is added. The goal is to prevent
     * blocking on fresh not-connected or disconnected socket. --ANK
     */ 
    if (sk->sk_shutdown == SHUTDOWN_MASK || sk->sk_state == TCP_CLOSE) 
        mask |= POLLHUP; 
    if (sk->sk_shutdown & RCV_SHUTDOWN) 
        mask |= POLLIN | POLLRDNORM | POLLRDHUP; 
 
    /* Connected? */ 
    if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) { 
        /* Potential race condition. If read of tp below will
         * escape above sk->sk_state, we can be illegally awaken
         * in SYN_* states. */ 
        if ((tp->rcv_nxt != tp->copied_seq) && 
            (tp->urg_seq != tp->copied_seq || 
             tp->rcv_nxt != tp->copied_seq + 1 || 
             sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data)) 
            mask |= POLLIN | POLLRDNORM; 
 
        if (!(sk->sk_shutdown & SEND_SHUTDOWN)) { 
            if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 
                mask |= POLLOUT | POLLWRNORM; 
            } else {  /* send SIGIO later */ 
                set_bit(SOCK_ASYNC_NOSPACE, 
                    &sk->sk_socket->flags); 
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 
 
                /* Race breaker. If space is freed after
                 * wspace test but before the flags are set,
                 * IO signal will be lost.
                 */ 
                if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) 
                    mask |= POLLOUT | POLLWRNORM; 
            } 
        } 
 
        if (tp->urg_data & TCP_URG_VALID) 
            mask |= POLLPRI; 
    } 
    return mask; 

上面的tcp_poll看上去很長,但核心的的調用是:
[cpp] 
poll_wait(file, sk->sk_sleep, wait); 
這個函數的file和wait是我們在poll調用過程中傳入的參數,sk->sk_sleep是什麼呢?這裡解釋一下
sk的值是
[cpp]
struct sock *sk = sock->sk; 
struct sock是socket連接的內核表示,sk->sk_sleep是struct wait_queue_head_t結構類型,這表示的是socket的等待隊列,每一個socket都有自己的一個等待隊列,由內核結構struct sock來維護。
其實大多數驅動實現的時候,此時都調用這個函數,這個函數也很簡單,實現如下:
[cpp] 
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) 

    if (p && wait_address) 
        p->qproc(filp, wait_address, p); 

現在一個轉折點出現了,前面我們說過初始化table的函數指針為__pollwait,那麼此時調用的就是__pollwait(filp,wait_address,p),這裡的參數分別表示為進程表示文件結構struct file,socket或設備的等待隊列wait_queue_head_t,和poll_table。
再回顧一下,到此為止,從我們調用poll函數開始,然後復制數據至內核、將struct pollfd表示為內核的struct poll_list鏈表、初始化poll_table變量、然後調用do_pollfd函數等過程,其實都是為了檢查poll傳遞的每個struct pollfd是否有狀態變化,也就是調用VFS的file->f_op->poll函數,這就到了__pollwait函數這裡來了,這個函數會往等待隊列上添加一個新的結點。
__pollwait的實現
[cpp]
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, 
                poll_table *p) 

    struct poll_table_entry *entry = poll_get_entry(p); 
    if (!entry) 
        return; 
    get_file(filp); 
    entry->filp = filp; 
    entry->wait_address = wait_address; 
    init_waitqueue_entry(&entry->wait, current); 
    add_wait_queue(wait_address, &entry->wait); 

我們現在來分析一下,__pollwait調用完成之後,內核做了什麼?先看一下poll_get_entry(p);
[cpp] 
static struct poll_table_entry *poll_get_entry(poll_table *_p) 

    struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt); 
    struct poll_table_page *table = p->table; 
 
    if (p->inline_index < N_INLINE_POLL_ENTRIES) 
        return p->inline_entries + p->inline_index++; 
 
    if (!table || POLL_TABLE_FULL(table)) { 
        struct poll_table_page *new_table; 
 
        new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL); 
        if (!new_table) { 
            p->error = -ENOMEM; 
            __set_current_state(TASK_RUNNING); 
            return NULL; 
        } 
        new_table->entry = new_table->entries; 
        new_table->next = table; 
        p->table = new_table; 
        table = new_table; 
    } 
 
    return table->entry++; 

這個函數會根據情況創建struct poll_table_page結構,因為__pollwait在系統中是會被多次調用的,所以可能會有多個struct poll_table_page結構,這個結構是對struct poll_table_entry的一個封裝,其結構如下所示:
[cpp] 
struct poll_table_page { 
    struct poll_table_page * next; 
    struct poll_table_entry * entry; 
    struct poll_table_entry entries[0]; 
}; 
struct poll_table_entry { 
    struct file * filp; 
    wait_queue_t wait; 
    wait_queue_head_t * wait_address; 
}; 
所以在調用poll_get_entry之後,會返回一個新的poll_table_entry,這也是每次調用__pollwait都會產生的。接下來調用init_waitqueue_entry函數將這個新建的struct poll_table_entry和當前的進程綁定起來,再將struct poll_table_entry加入到socket的等待隊列。這樣就將當前進程和socket的等待隊列聯系,說白了,就是把current掛到等待隊列上。
因為一旦有數據就緒,就會叫醒等待隊列上的進程。可以看代碼
[cpp] 
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p) 

    q->flags = 0; 
    q->private = p; 
    q->func = default_wake_function; 

這裡同時,注冊了一個數據就緒時的叫醒函數
[cpp] 
int default_wake_function(wait_queue_t *curr, unsigned mode, int sync, 
              void *key) 

    return try_to_wake_up(curr->private, mode, sync); 

這就完成了調用。再來所有回顧一下
調用poll函數。
進入sys_poll等系列內核調用。
准備數據:,注冊__pollwait(這是通過初始化poll_wqueues來完成的),復制數據至內核,重新組織成struct poll_list等等。
對所有的struct pollfd循環,以調用do_pollfd函數。
do_pollfd調用file->f_op->poll函數。
然後調用__pollwait創建一個struct poll_table_entry,並將其與當前進程綁定。
將當前進程掛在socket的等待隊列上。
有數據就緒時喚醒進程。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved