打印

poll 机制深入理解解

[复制链接]
169|0
手机看帖
扫描二维码
随时随地手机跟帖
跳转到指定楼层
楼主
追光少年|  楼主 | 2018-10-5 11:34 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
poll调用和select调用实现的功能一样,都是网络IO利用的一种机制。先看一下poll的调用形式

一,poll调用

[cpp]

#include  

int poll(struct pollfd fds[], nfds_t nfds, inttimeout);

struct pollfd结构如下:【在源码文件poll.h文件中】

[cpp]

struct pollfd {

    intfd;

    shortevents;

    shortrevents;

};

这个结构中fd表示文件描述符,events表示请求检测的事件,revents表示检测之后返回的事件,如果当某个文件描述符有状态变化时,revents的值就不为空。

二,参数说明

fds:存放需要被检测状态的Socket描述符;与select不同(select函数在调用之后,会清空检测socket描述符的数组),每当调用这个函数之后,系统不会清空这个数组,而是将有状态变化的描述符结构的revents变量状态变化,操作起来比较方便;

nfds:用于标记数组fds中的struct pollfd结构元素的总数量;

timeout:poll函数调用阻塞的时间,单位是MS(毫秒)

三,返回值

大于0:表示数组fds中有socket描述符的状态发生变化,或可以读取、或可以写入、或出错。并且返回的值表示这些状态有变化的socket描述符的总数量;此时可以对fds数组进行遍历,以寻找那些revents不空的socket描述符,然后判断这个里面有哪些事件以读取数据。

等于0:表示没有socket描述符有状态变化,并且调用超时。

小于0:此时表示有错误发生,此时全局变量errno保存错误码。

四,内核实现

     poll系统调用的内核实现是sys_poll,其代码如下:

[cpp]

asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned intnfds,

           long timeout_msecs)

{

    s64timeout_jiffies;

    intret;



    if(timeout_msecs > 0) {

#if HZ > 1000

        

       if (timeout_msecs / 1000 > (s64)0x7fffffffffffffffULL /(s64)HZ)

           timeout_jiffies = -1;

       else

#endif

           timeout_jiffies =msecs_to_jiffies(timeout_msecs);

    } else{

        

       timeout_jiffies = timeout_msecs;

   }



    ret =do_sys_poll(ufds, nfds,&timeout_jiffies);

    if (ret ==-EINTR) {

       struct restart_block *restart_block;

       restart_block =&current_thread_info()->restart_block;

       restart_block->fn = do_restart_poll;

       restart_block->arg0 = (unsignedlong)ufds;

       restart_block->arg1 = nfds;

       restart_block->arg2 = timeout_jiffies &0xFFFFFFFF;

       restart_block->arg3 = (u64)timeout_jiffies >>32;

       ret = -ERESTART_RESTARTBLOCK;

   }

    returnret;

}

这个函数还是比较容易理解,包括三个部分的工作:

函数调用超时阻塞时间转换,根据内核的软时钟设置频率将超时时间设置为jiffies标准时间。

调用do_sys_poll,这里完成主要的工作。

如果当前进程有待处理的信号,则先处理信号,这是根据do_sys_poll返回来决定的,事实上在这个调用中会检查当前的进程是否有未处理信号,如果有,就会返回EINTR以处理信号,然后返回-ERESTART_RESTARTBLOCK,这会导致重新调用。

进入到do_sys_poll函数中

[cpp]

int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, s64*timeout)

{

    structpoll_wqueues table;

    int err =-EFAULT, fdcount, len, size;

   

    longstack_pps[POLL_STACK_ALLOC/sizeof(long)];

    structpoll_list *const head = (struct poll_list*)stack_pps;

    structpoll_list *walk = head;

    unsignedlong todo = nfds;



    if (nfds>current->signal->rlim[RLIMIT_NOFILE].rlim_cur)

       return -EINVAL;



    len =min_t(unsigned int, nfds, N_STACK_PPS);

    for (;;){

       walk->next = NULL;

       walk->len = len;

       if (!len)

           break;



       if (copy_from_user(walk->entries, ufds +nfds-todo,

                   sizeof(struct pollfd) * walk->len))

           goto out_fds;



       todo -= walk->len;

       if (!todo)

           break;



       len = min(todo, POLLFD_PER_PAGE);

       size = sizeof(struct poll_list) + sizeof(struct pollfd) *len;

       walk = walk->next = kmalloc(size,GFP_KERNEL);

       if (!walk) {

           err = -ENOMEM;

           goto out_fds;

       }

   }

pollfd

   poll_initwait(&table);

    fdcount =do_poll(nfds, head, &table, timeout);

   poll_freewait(&table);



    for (walk =head; walk; walk = walk->next) {

       struct pollfd *fds = walk->entries;

       int j;



       for (j = 0; j < walk->len; j++,ufds++)

           if (__put_user(fds[j].revents,&ufds->revents))

               goto out_fds;

   }



    err =fdcount;

out_fds:

    walk =head->next;

    while (walk){

       struct poll_list *pos = walk;

       walk = walk->next;

       kfree(pos);

   }



    returnerr;

}

为了加快处理速度和提高系统性能,这里优先使用已经定好的一个栈空间,其大小为POLL_STACK_ALLOC,在我系统上,其值为256,大小为256个字节的栈空间转换为structpoll_list结构,以存储需要被检测的socket描述符,struct poll_list的结构如下:

[cpp]

struct poll_list {

    structpoll_list *next;

    intlen;

    structpollfd entries[0];

};

上面可以看到该结构的entries为一个数组,结构为structpollfd,这个有点眼熟,没错,它就是存储poll调用中需要被检测的socket描述符。那么前面分配的栈空间能存储多少个structpollfd呢?这计算如下:

[cpp]

len = min_t(unsigned int, nfds,N_STACK_PPS);

式中的N_STACK_PPS就是计算前面默认的固定栈大小能够存储多少个struct pollfd的

[cpp] view plaincopy

#define N_STACK_PPS ((sizeof(stack_pps) - sizeof(structpoll_list))  / \

           sizeof(struct pollfd))

然后就复制len个structpollfd至内核空间,这里有细心的用户就会发现:如果nfds比N_STACK_PPS大的话,怎么办呢?注意上面的函数,是一个循环,如果nfds比N_STACK_PPS大(事实上,一般都会比这里大),那么会再请求内存,然后接着复制,就是这个代码片段:

[cpp]

             len = min(todo, POLLFD_PER_PAGE);

size = sizeof(struct poll_list) + sizeof(struct pollfd) *len;

walk = walk->next = kmalloc(size,GFP_KERNEL);

if (!walk) {

    err =-ENOMEM;

    gotoout_fds;

}

POLLFD_PER_PAGE表示一页的内存能够存储多少个struct pollfd,可以计算一下,一页是4K,而structpollfd的内存占用8个字节,就是一页的内存可以将近存储512个socket描述符。如果在分配一页的内存之后,还不够nfds来用,没关系,循环不会退出的,会再分配一个页,并且所有分配的块都被structpoll_list链接起来,上面可以看到,这个结构有一个next域,就是专门做这个的。

在这之后,就会形成一个以stack_pps存储空间为头,然后一页一页分配的内存为接点的链表,这个链表上就存储了poll调用时传入的所有的socket描述符。

接下来调用一个很重要的部分

[cpp]

poll_initwait(&table);

fdcount = do_poll(nfds, head, &table,timeout);

poll_freewait(&table);

这是最重要的部分,因为接下来的部分比较容易理解,在这之后,做两件事:

将链表上的所有structpollfd中的revents的状态写入到用户空间(记得之前也从用户空间写入过内核空间,这是因为内核态地址,用户空间应用不能访问),所以需要写入到用户空间中去。

之前调用kmalloc分配了很多内存,现在要释放了,所以要从stack_pps地址处的head开始,顺着next不断的释放内存。

再回到最重要的部分,先看poll_initwait调用,下面是主要相关的数据结构

[cpp]

struct poll_wqueues {

    poll_tablept;

    structpoll_table_page * table;

    interror;

    intinline_index;

    structpoll_table_entryinline_entries[N_INLINE_POLL_ENTRIES];

};

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *,struct poll_table_struct *);

typedef struct poll_table_struct {

   poll_queue_proc qproc;

} poll_table;

poll_initwait函数如下:

[cpp]

void poll_initwait(struct poll_wqueues*pwq)

{

   init_poll_funcptr(&pwq->pt,__pollwait);//设置poll_table结构中的qproc函数指针为__pollwait函数,就是pwq->pt->qproc=__pollwait。这个函数是一个回调函数,基本上这种机制的实现,就是依靠回调函数了。

   pwq->error = 0;

   pwq->table = NULL;

   pwq->inline_index = 0;

}

所以poll_initwait就是初始化了poll_wqueuestable,主要是将其结构中的函数指针设置为__pollwait函数。那么这个函数是做什么的呢?我们先看poll_initwait之后调用的函数,就是do_poll函数,其实现如下:

注意下面函数在调用时的参数,参数有这么几个nfds, head, &table,timeout,参数就容易理解了:nfds表示poll调用时传入的数组中structpollfd的个数,head其实是表示将poll调用时传入的数组,因为全部都表示为structpoll_list链表了(前面分析的,还记得吧),table是刚刚初始化的一个,里面暂时就只是包含一个回调函数的指针,就是__pollwait函数。timeout表示超时时间。

[cpp]

static int do_poll(unsigned int nfds,  structpoll_list *list,

          struct poll_wqueues *wait, s64 *timeout)

{

    int count =0;

    poll_table*pt = &wait->pt;



   

    if(!(*timeout))

       pt = NULL;



    for (;;){

       struct poll_list *walk;

       long __timeout;



       set_current_state(TASK_INTERRUPTIBLE);

       for (walk = list; walk != NULL; walk = walk->next){

           struct pollfd * pfd, * pfd_end;



           pfd = walk->entries;

           pfd_end = pfd + walk->len;

           for (; pfd != pfd_end; pfd++) {

               

               if (do_pollfd(pfd, pt)) {

                   count++;

                   pt = NULL;

               }

           }

       }

        

       pt = NULL;

       if (!count) {

           count = wait->error;

           if (signal_pending(current))

               count = -EINTR;

       }

       if (count || !*timeout)

           break;



       if (*timeout < 0) {

            

           __timeout = MAX_SCHEDULE_TIMEOUT;

       } else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT-1)){

            

           __timeout = MAX_SCHEDULE_TIMEOUT - 1;

           *timeout -= __timeout;

       } else {

           __timeout = *timeout;

           *timeout = 0;

       }



       __timeout = schedule_timeout(__timeout);

       if (*timeout >= 0)

           *timeout += __timeout;

   }

   __set_current_state(TASK_RUNNING);

    returncount;

}

这个函数有以下几个要注意的点:

信号处理保障。在这个函数中先将当前进程设置为可以被信号中断,就是set_current_state(TASK_INTERRUPTIBLE)这一行,后面还会检查是否有需要处理的信号signal_pending(current)。这里的意思是就算是poll调用进入到sys_poll系统调用之后,也可以接收外部信号,从而退出当前系统调用(因为我们知道一般的系统调用都不会被中断的,所以系统调用一般都尽量很快的返回)。

外部大循环退出的条件,外部大循环退出的条件只有if (count || !*timeout)break;后面的条件容易理解,就是超时,前面的count是什么意思?它在每次调用do_pollfd函数之后,都有可能会加1,其实调用do_pollfd就是检查socket描述符状态的变化,如果有变化,就会使count加1,所以在结束内部遍历之后,count保存了所有的有状态变化的socket描述符数量。

这个函数会对之前以head为头结点的链表进行遍历,然后链表上每个结点中都包含很多很多的structpollfd进行遍历(这些struct pollfd都被存储在struct poll_list结构的数组字段struct pollfdentries里面。

然后对每个structpollfd调用do_pollfd(这会调用很多次,根据你传入多少个socket描述符而定),这个函数需要两个参数,一个是structpollfd,这没得说的,另一个是刚刚初始化的table,就是那个暂时只是包含__pollwait回调指针的结构,还记得吧。

我们再进入do_pollfd,了解这个函数是做什么的?

[cpp]

static inline unsigned int do_pollfd(struct pollfd *pollfd,poll_table *pwait)

{

    unsigned intmask;

    intfd;



    mask =0;

    fd =pollfd->fd;

    if (fd >=0) {

       int fput_needed;

       struct file * file;



       file = fget_light(fd, &fput_needed);

       mask = POLLNVAL;

       if (file != NULL) {

           mask = DEFAULT_POLLMASK;

           if (file->f_op &&file->f_op->poll)

               mask = file->f_op->poll(file, pwait);

            

           mask &= pollfd->events | POLLERR |POLLHUP;

           fput_light(file, fput_needed);

       }

   }

   pollfd->revents = mask;



    returnmask;

}

这个函数很简单,先根据socket描述符或者是文件句柄找到进程对应的struct file*file结构,然后调用file->f_op->poll(file,pwait),这是这个函数的核心调用,这其实也是linux的VFS的一部分,这会根据当前的文件是什么类型的文件来选择调用的入口,如file是socket网络文件,此时调用的就是由网络驱动设备来实现的poll,如果file是ext3等文件系统上打开的一个文件,那就会调用由该文件系统来实现的poll函数,我们以tcp_poll为例来了解一般poll完成什么工作;

注意下面的参数,file和wait是由file->f_op->poll调用传入的参数,而structsocket为socket连接的进程方面表示。

[cpp]

unsigned int tcp_poll(struct file *file, struct socket *sock,poll_table *wait)

{

    unsigned intmask;

    struct sock*sk = sock->sk;

    structtcp_sock *tp = tcp_sk(sk);



   poll_wait(file, sk->sk_sleep, wait);

    if(sk->sk_state == TCP_LISTEN)

       return inet_csk_listen_poll(sk);



   



    mask =0;

    if(sk->sk_err)

       mask = POLLERR;



   

    if(sk->sk_shutdown == SHUTDOWN_MASK || sk->sk_state ==TCP_CLOSE)

       mask |= POLLHUP;

    if(sk->sk_shutdown & RCV_SHUTDOWN)

       mask |= POLLIN | POLLRDNORM | POLLRDHUP;



   

    if ((1<< sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)){

        

       if ((tp->rcv_nxt != tp->copied_seq)&&

           (tp->urg_seq != tp->copied_seq ||

            tp->rcv_nxt != tp->copied_seq + 1 ||

            sock_flag(sk, SOCK_URGINLINE) ||!tp->urg_data))

           mask |= POLLIN | POLLRDNORM;



       if (!(sk->sk_shutdown & SEND_SHUTDOWN)){

           if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)){

               mask |= POLLOUT | POLLWRNORM;

           } else {   

               set_bit(SOCK_ASYNC_NOSPACE,

                   &sk->sk_socket->flags);

               set_bit(SOCK_NOSPACE,&sk->sk_socket->flags);



               

               if (sk_stream_wspace(sk) >=sk_stream_min_wspace(sk))

                   mask |= POLLOUT | POLLWRNORM;

           }

       }



       if (tp->urg_data & TCP_URG_VALID)

           mask |= POLLPRI;

   }

    returnmask;

}

上面的tcp_poll看上去很长,但核心的的调用是:

[cpp]

poll_wait(file, sk->sk_sleep,wait);

这个函数的file和wait是我们在poll调用过程中传入的参数,sk->sk_sleep是什么呢?这里解释一下

sk的值是

[cpp]

struct sock *sk = sock->sk;

struct sock是socket连接的内核表示,sk->sk_sleep是structwait_queue_head_t结构类型,这表示的是socket的等待队列,每一个socket都有自己的一个等待队列,由内核结构structsock来维护。

其实大多数驱动实现的时候,此时都调用这个函数,这个函数也很简单,实现如下:

[cpp]

static inline void poll_wait(struct file * filp, wait_queue_head_t* wait_address, poll_table *p)

{

    if (p&& wait_address)

       p->qproc(filp, wait_address, p);

}

现在一个转折点出现了,前面我们说过初始化table的函数指针为__pollwait,那么此时调用的就是__pollwait(filp,wait_address,p),这里的参数分别表示为进程表示文件结构structfile,socket或设备的等待队列wait_queue_head_t,和poll_table。

再回顾一下,到此为止,从我们调用poll函数开始,然后复制数据至内核、将struct pollfd表示为内核的structpoll_list链表、初始化poll_table变量、然后调用do_pollfd函数等过程,其实都是为了检查poll传递的每个structpollfd是否有状态变化,也就是调用VFS的file->f_op->poll函数,这就到了__pollwait函数这里来了,这个函数会往等待队列上添加一个新的结点。

__pollwait的实现

[cpp]

static void __pollwait(struct file *filp, wait_queue_head_t*wait_address,

               poll_table *p)

{

    structpoll_table_entry *entry =poll_get_entry(p);

    if(!entry)

       return;

   get_file(filp);

   entry->filp = filp;

   entry->wait_address = wait_address;

   init_waitqueue_entry(&entry->wait,current);

   add_wait_queue(wait_address,&entry->wait);

}

我们现在来分析一下,__pollwait调用完成之后,内核做了什么?先看一下poll_get_entry(p);

[cpp]

static struct poll_table_entry *poll_get_entry(poll_table*_p)

{

    structpoll_wqueues *p = container_of(_p, struct poll_wqueues,pt);

    structpoll_table_page *table = p->table;



    if(p->inline_index <N_INLINE_POLL_ENTRIES)

       return p->inline_entries +p->inline_index++;



    if (!table|| POLL_TABLE_FULL(table)) {

       struct poll_table_page *new_table;



       new_table = (struct poll_table_page *)__get_free_page(GFP_KERNEL);

       if (!new_table) {

           p->error = -ENOMEM;

           __set_current_state(TASK_RUNNING);

           return NULL;

       }

       new_table->entry =new_table->entries;

       new_table->next = table;

       p->table = new_table;

       table = new_table;

   }



    returntable->entry++;

}

这个函数会根据情况创建structpoll_table_page结构,因为__pollwait在系统中是会被多次调用的,所以可能会有多个structpoll_table_page结构,这个结构是对struct poll_table_entry的一个封装,其结构如下所示:

[cpp]

struct poll_table_page {

    structpoll_table_page * next;

    structpoll_table_entry * entry;

    structpoll_table_entry entries[0];

};

struct poll_table_entry {

    struct file* filp;

    wait_queue_twait;

   wait_queue_head_t * wait_address;

};

所以在调用poll_get_entry之后,会返回一个新的poll_table_entry,这也是每次调用__pollwait都会产生的。接下来调用init_waitqueue_entry函数将这个新建的structpoll_table_entry和当前的进程绑定起来,再将structpoll_table_entry加入到socket的等待队列。这样就将当前进程和socket的等待队列联系,说白了,就是把current挂到等待队列上。

因为一旦有数据就绪,就会叫醒等待队列上的进程。可以看代码

[cpp]

static inline void init_waitqueue_entry(wait_queue_t *q, structtask_struct *p)

{

    q->flags= 0;

   q->private = p;

    q->func =default_wake_function;

}

这里同时,注册了一个数据就绪时的叫醒函数

[cpp]

int default_wake_function(wait_queue_t *curr, unsigned mode, intsync,

             void *key)

{

    returntry_to_wake_up(curr->private, mode,sync);

}

这就完成了调用。再来所有回顾一下

调用poll函数。

进入sys_poll等系列内核调用。

准备数据:,注册__pollwait(这是通过初始化poll_wqueues来完成的),复制数据至内核,重新组织成structpoll_list等等。

对所有的struct pollfd循环,以调用do_pollfd函数。

do_pollfd调用file->f_op->poll函数。

然后调用__pollwait创建一个struct poll_table_entry,并将其与当前进程绑定。

将当前进程挂在socket的等待队列上。

有数据就绪时唤醒进程。

使用特权

评论回复

相关帖子

发新帖 我要提问
您需要登录后才可以回帖 登录 | 注册

本版积分规则

379

主题

379

帖子

0

粉丝