论坛首页 Java企业应用论坛

Java NIO 选择器(Selector) 知识预备 (linux epoll)

浏览 10590 次
该帖已经被评为精华帖
作者 正文
   发表时间:2011-08-10   最后修改:2011-08-10

最近花些功夫在研究Java NIO的JDK源码,发现Selector的实现,除了在唤醒机制上做了手脚,主要依赖操作系统的实现,为了无负担的弄懂Selector,有必要研究一 下操作系统是如何实现选择的。本文主要参考linux-2.6.10内核epoll的实现(poll见上一篇:  Java NIO 选择器(Selector) 知识预备 (linux poll))。

 

本文可能会表现得很肤浅,高手们请直接略过,另外,本文所出现的“政府”字样,乃比喻性质的,或者就认为它是“清政府”好了,请相关人员不要曲解。

 

上回冒充大侠poll府上走了一遭,感觉还不过瘾,于是计划再到它表哥epoll家去闯闯,可是man了一下之后,我有点退却了,丫的,还以为它表哥是一个人,原来是仨儿:

 

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
 
typedef union epoll_data {
    void        *ptr;
    int          fd;
    __uint32_t   u32;
    __uint64_t   u64;
} epoll_data_t;
 
struct epoll_event {
    __uint32_t   events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};
 

首先看看epoll_event是啥玩意儿,应该和pollfd类似吧?
还记得pollfd的定义吗?

 

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};
 

对比一下,发现区别不大,epoll_data_t是一个共用体,至少我们可以认为它可以是一个fd,所以较大的不同点就在于epoll_event没有 revents了,上次探索poll的时候不是发现,这是poll一个很关键的地方吗?最终事件是否发生就看它的值了。决心带着这个疑问去探一探。


这次,我不打算带大家一步一步的串门,因为要理解epoll,最关键的就是它的结构设计,所以这里先从epoll的结构出发,请看下面一副简化的结构及联系图:

 


 

先介绍一下图中涉及到的各种结构体:

  • 先说明一下epitem结构体是什么,顾名思义,即为epollitem,epoll的基本单元,下面分别介绍一下几个主要的变量的含义:
  • struct list_head rdllink,或者取名为ready_list_link你会更容易理解,当epitem对应的fd的存在已经ready的I/O事件,则 ep_poll_callback回调函数会将该结点链接到eventpoll中的rdllist循环链表中去,这样就将ready的epitem都串连 起来了
  • struct epoll_filefd ffd,ffd中只包含一个fd及fd对应的file的指针
  • struct eventpoll *ep,eventpoll的指针,每个epitem都有这样一个指针,它指向对应的eventpoll变量,其实它的作用很简单,我们只要拿到了epitem,就可以根据它拿到eventpoll
  • struct event_poll event,还记得epoll_ctl的参数类型吗?其中就有一个event_poll指针,而该event即用来存放总用户空间拷贝的event_poll
  • 然后说明一下eventpoll结构体中主要的变量的含义:
    • struct list_head rdllist,ready_link_list,表示这是一个链表,事实上它就是一个循环链表,链表中的每个结点即为epitem中的rdllink,rdllist中链接的所有rdllink对应的epitem有事件ready
    • struct rb_root rbr,红黑树的根结点,其实每一个epitem中的第一个变量即为struct rb_node rbn;即表示红黑树的一个结点,所以rbr即是这样一颗红黑树,它的结点都为epitem变量,即相当于一个Set,将所有epitem管理起来,通过 它可以很方便的增删改查epitem
  • 再看eppoll_entry结构体,它主要有这样几个变量:
    • void * base,base指向其对应的epitem
    • wait_queue_t wait,等待队列的项,wait中有一个唤醒回调函数指针,且该指针被初始化为ep_poll_callback,wait会被挂在到设备的等待队列 中,等待设备的唤醒,当设备因状态改变,唤醒wait时,会执行ep_poll_callback,而ep_poll_callback会做这样一件 事:list_add_tail(&epi->rdllink,&ep->rdllist),其中epi即为epitem变 量,通过wait偏移拿到eppoll_entry,然后可以拿到base指针,即拿到了对应的epitem,而ep即为eventpoll变量,通过 epitem的ep指针即可拿到,list_add_tail将epi的rdllink链到ep的rdllist中

下面结合这幅图大致讲解一下epoll_create、epoll_ctl、epoll_wait都在做些什么:

  • 首先,epoll_create会创建一个epoll的文件(epfd),同时创建并初始化一个struct eventpoll,其中file的private_data指针即指向了eventpoll变量,因此,知道epfd就可以拿到file,即拿到了 eventpoll变量,这就是epoll_create所做的工作
  • epoll_ctl又做了什么事呢?首先大家看到了eventpoll中的rb_root红黑树吗?epoll_ctl其实就是在操作这颗红黑树,epoll_ctl有三种操作:
  • EPOLL_CTL_ADD:往红黑树中创建并添加一个epitem,对应处理函数为ep_insert
    在添加epitem时,也就是在ep_insert过程中,会创建一个eppoll_entry,并将wait_queue挂载到设备的等待队列上,其中 该wait_queue的唤醒回调函数为ep_poll_callback,当设备有事件ready而唤醒wait_queue时,就会执行 ep_poll_callback将当前epitem链接到eventpoll中的rdllist中去,另外,如果在挂载wait_queue时就发现设 备有事件ready了,同样会将epitem链接到rdllist中去
  • EPOLL_CTL_MOD:修改对应的epitem,对应处理函数为ep_modify
    在ep_modify过程中,处理会修改epitem对应的event值,同样会先查看一下对应设备的当前状态,如果有ready事件发生,则会将当前epitem链接到rdllist中去
  • EPOLL_CTL_DEL:从红黑树中删除对应的epitem,对应处理函数为ep_remove
    释放钩子、链接、资源空间等,如epitem所占的空间
  • 其实epoll_ctl已经将绝大部分事情都做了,epoll_wait有只需要收集结果就行了,它的目标也很单一,就看rdllist中是否有 元素即可,当然,它还需要控制timeout,及结果转移,因为对于rdllist链接的epitem,只能说明其对应的fd有事件ready,但是哪些 事件是不知道的,因此epoll_ctl再收集结果时,会亲自查看一下对应file的ready状态,来写回events

在给大家大致讲解了epoll涉及到的结构及epoll三兄弟大概在做些什么之后,开始我们的探索之旅吧:

 

epoll_create

先看sys_epoll_create系统调用:

asmlinkage long sys_epoll_create(int size)
{
    int error, fd;
    struct inode *inode;
    struct file *file;
 
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
             current, size));
 
    /* Sanity check on the size parameter */
    error = -EINVAL;
    if (size         goto eexit_1;
 
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure, and inode and a free file descriptor.
     */
    error = ep_getfd(&fd, &inode, &file);
    if (error)
        goto eexit_1;
 
    /* Setup the file internal data structure ( "struct eventpoll" ) */
    error = ep_file_init(file);
    if (error)
        goto eexit_2;
 
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",
             current, size, fd));
 
    return fd;
 
eexit_2:
    sys_close(fd);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n",
             current, size, error));
    return error;
}
 

我们只需要注意到两个函数:ep_getfd和ep_file_init

  • ep_getfd其实就是在创建文件,我们这里不讲文件是如何创建的,大家只需要知道,调用了这个函数之后,除非出错,否则epoll文件就会被创建出来
  • ep_file_init我们到可以讲一下:
    顾名思义,它就是初始化刚才创建的文件的,下面看看它究竟初始化了哪些内容:

 

static int ep_file_init(struct file *file)
{
    struct eventpoll *ep;
 
    if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)))
        return -ENOMEM;
 
    memset(ep, 0, sizeof(*ep));
    rwlock_init(&ep->lock);
    init_rwsem(&ep->sem);
    init_waitqueue_head(&ep->wq);
    init_waitqueue_head(&ep->poll_wait);
    INIT_LIST_HEAD(&ep->rdllist);
    ep->rbr = RB_ROOT;
 
    file->private_data = ep;
 
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_file_init() ep=%p\n",
             current, ep));
    return 0;
}
 

从这几行代码可以看出,ep_file_init就做了两件事:

  • 创建并初始化一个eventpoll结构体变量
  • 指定file的private_data指针指向刚创建的eventpoll变量,这样,只要根据epoll文件描述符epfd就可以拿到file进而就拿到了eventpoll变量了,该eventpoll就是epoll_ctl和epoll_wait工作的场所

对外看来,epoll_create就做了一件事,那就是创建一个epoll文件,事实上,更关键的是,它创建了一个eventpoll结构体变量,该变量为epoll_ctl和epoll_wait的工作打下了基础。

epoll_ctl

展示一下epoll_ctl系统调用先:

asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p)\n",
             current, epfd, op, fd, event));
    error = -EFAULT;
    if (EP_OP_HASH_EVENT(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        goto eexit_1;
    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    file = fget(epfd);
    if (!file)
        goto eexit_1;
    /* Get the "struct file *" for the target file */
    tfile = fget(fd);
    if (!tfile)
        goto eexit_2;
    /* The target file descriptor must support poll */
    error = -EPERM;
    if (!tfile->f_op || !tfile->f_op->poll)
        goto eexit_3;
    /*
     * We have to check that the file structure underneath the file descriptor
     * the user passed to us _is_ an eventpoll file. And also we do not permit
     * adding an epoll file descriptor inside itself.
     */
    error = -EINVAL;
    if (file == tfile || !IS_FILE_EPOLL(file))
        goto eexit_3;
    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    ep = file->private_data;
    down_write(&ep->sem);
    /* Try to lookup the file inside our hash table */
    epi = ep_find(ep, tfile, fd);
    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
    /*
     * The function ep_find() increments the usage count of the structure
     * so, if this is not NULL, we need to release it.
     */
    if (epi)
        ep_release_epitem(epi);
    up_write(&ep->sem);
eexit_3:
    fput(tfile);
eexit_2:
    fput(file);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n",
             current, epfd, op, fd, event, error));
    return error;
}
 

记得前文提到过eventpoll结构体包含一个变量struct rb_root rbr,这就是一颗红黑树的根结点,epoll_ctl的ADD、DEL、MOD操作,就是在操作这颗红黑树。先分析一下代码流程:

  • 首先copy_from_user将用户传入的event_poll拷贝到epds中,以供自己使用
  • file = fget(epfd),根据epoll文件的描述符拿到对应的文件file
  • tfile = fget(fd),同理,根据fd拿到目标文件tfile
  • ep = file->private_data,即拿到了epoll_create创建的eventpoll结构体变量,准备开始工作
  • epi = ep_find(ep, tfile, fd),这里不详细讲解ep_find源码,只需要说明一下即可,ep_find即从ep中的红黑树中根据tfile和fd来查找epitem,还记得 epitem结构体吗,这是epoll的基本单元,每个被epoll_ctl添加过的fd都会保存在一个epitem变量中,每个epitem变量都是红 黑树的结点,如果不理解红黑树也不要紧,就简单把它看做一个Map,其Key为tfile+fd,Value即为epitem的指针,因此能够根据 ep_find查找到tfile+fd对应的epitem,当然,如果找到的epi==NULL,自然表明不存在了
  • 接着根据op的三种类型分别操作:
    • EPOLL_CTL_ADD
      首先epds.events |= POLLERR | POLLHUP确保“出错、连接挂起”被当做感兴趣事件,因为底层有义务将出错信息返回给应用;然后调用ep_insert生成一个epitem并插入到 ep对应的红黑树中;这里详细看一下ep_insert的实现:

    struct ep_pqueue {
        // poll_table结构体在讲解poll实现那篇有说明,内部只包含一个回调函数指针
        poll_table pt;
        // epitem指针
        struct epitem *epi;
    };
    static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                 struct file *tfile, int fd)
    {
        int error, revents, pwake = 0;
        unsigned long flags;
        // 待创建的epitem变量的指针
        struct epitem *epi;
        // ep_pqueue结构体变量
        struct ep_pqueue epq;
     
        error = -ENOMEM;
        // 为epi分配空间
        if (!(epi = EPI_MEM_ALLOC()))
            goto eexit_1;
     
        /* Item initialization follow here ... */
        EP_RB_INITNODE(&epi->rbn); // 初始化红黑树结点
        // 初始化各种链表
        INIT_LIST_HEAD(&epi->rdllink);
        INIT_LIST_HEAD(&epi->fllink);
        INIT_LIST_HEAD(&epi->txlink);
        INIT_LIST_HEAD(&epi->pwqlist);
        // 初始化epi的ep指针指向ep,这样一来,只要拿到epi就可以拿到ep了
        epi->ep = ep;
        // 初始化epi的ffd变量,该变量包含一个文件指针+文件描述符
        EP_SET_FFD(&epi->ffd, tfile, fd);
        // 初始化event
        epi->event = *event;
        atomic_set(&epi->usecnt, 1);
        epi->nwait = 0;
     
        /* Initialize the poll table using the queue callback */
        // 初始化ep_pqueue变量的epi指针为刚创建的epq
        epq.epi = epi;
        // 初始化epq中回调函数指针指向ep_ptable_queue_proc
        // ep_ptable_queue_proc见下文
        init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
     
        /*
         * Attach the item to the poll hooks and get current event bits.
         * We can safely use the file* here because its usage count has
         * been increased by the caller of this function.
         */
        // 这里再说明一下为什么会有ep_pqueue这个结构体,它很像一个
        // 中转变量,我可以看到tfile->f_op->poll(tfile, &epq.pt)
        // 好像只用到了回调函数指针而已,那有必要用epq吗?这其实就是
        // 内核的一种惯用手法,在回调函数中,通过poll_table指针偏移
        // 即可拿到ep_pqueue,进而拿到对应的epitem指针
        // tfile对应poll方法中,epq.pt方法会被调用,即ep_ptable_queue_proc
        // 会被执行,从而将等待队列项(见ep_ptable_queue_proc)挂载到
        // 设备的等待队列上,当设备唤醒等待队列项时,ep_poll_callback将会被执行
        revents = tfile->f_op->poll(tfile, &epq.pt);
     
        /*
         * We have to check if something went wrong during the poll wait queue
         * install process. Namely an allocation for a wait queue failed due
         * high memory pressure.
         */
        if (epi->nwait < 0)
            goto eexit_2;     
     
        spin_lock(&tfile->f_ep_lock);
        // 将epi的fllink链接到tfile的f_ep_links上
        list_add_tail(&epi->fllink, &tfile->f_ep_links);
        spin_unlock(&tfile->f_ep_lock);
     
        /* We have to drop the new item inside our item list to keep track of it */
        write_lock_irqsave(&ep->lock, flags);
     
        /* Add the current item to the rb-tree */
        // 将创建并初始化好的epitem插入到eventpoll的红黑树中
        ep_rbtree_insert(ep, epi);
     
        /* If the file is already "ready" we drop it inside the ready list */
        // 因为刚才的file->f_op->poll执行之后,有可能对应file已经是ready状态了
        // 如果发现的确是感兴趣的事件发生,并且当前epitem没有链接(即没有被收集到ep的
        // rdllist中,简单说,不需要重复收集),则就将其链接到ep的rdllist上
        if ((revents & event->events) && !EP_IS_LINKED(&epi->rdllink)) {
            // 将epi的rdllink结点链接到ep的rdllist头结点上
            list_add_tail(&epi->rdllink, &ep->rdllist);
     
            /* Notify waiting tasks that events are available */
            // 将ep从等待队列中唤醒,或者这样理解,这里已经找到满意结果了,不用在等待了
            if (waitqueue_active(&ep->wq))
                wake_up(&ep->wq);
            if (waitqueue_active(&ep->poll_wait))
                pwake++;
        }
     
        write_unlock_irqrestore(&ep->lock, flags);
     
        /* We have to call this outside the lock */
        if (pwake)
            ep_poll_safewake(&psw, &ep->poll_wait);
     
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_insert(%p, %p, %d)\n",
                 current, ep, tfile, fd));
     
        return 0;
     
    eexit_2:
        ep_unregister_pollwait(ep, epi);
     
        /*
         * We need to do this because an event could have been arrived on some
         * allocated wait queue.
         */
        write_lock_irqsave(&ep->lock, flags);
        if (EP_IS_LINKED(&epi->rdllink))
            EP_LIST_DEL(&epi->rdllink);
        write_unlock_irqrestore(&ep->lock, flags);
     
        EPI_MEM_FREE(epi);
    eexit_1:
        return error;
    }
     
    // 回调函数
    static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                     poll_table *pt)
    {
        // 通过pt拿到epitem指针
        struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
        // 待创建的eppoll_entry结构体变量指针
        struct eppoll_entry *pwq;
     
        if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
            // 这个为epoll的一个关键的地方,给pwq中的等待队列项初始化唤醒
            // 回调函数,这里初始化为ep_poll_callback
            init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
            // 等待队列的头指针,例如,当执行file->f_op->poll,则whead即
            // 为file对应设备的等待队列头指针
            pwq->whead = whead;
            // pwq的base指针指向epi,这样只要拿到eppoll_entry就能拿到epitem了
            pwq->base = epi;
            // 挂载pwq中等待队列项,当设备唤醒该项时,wait中回调函数会被调用
            add_wait_queue(whead, &pwq->wait);
            // 将pwq的llink链接到epi的pwqlist上
            list_add_tail(&pwq->llink, &epi->pwqlist);
            epi->nwait++;
        } else {
            /* We have to signal that an error occurred */
            epi->nwait = -1;
        }
    }
    static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
    {
        int pwake = 0;
        unsigned long flags;
        // 通过wait拿到eppoll_entry中的base,即拿到了epitem
        struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
        // 通过epi的ep指针即拿到了eventpoll
        struct eventpoll *ep = epi->ep;
     
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
                 current, epi->file, epi, ep));
     
        write_lock_irqsave(&ep->lock, flags);
     
        /*
         * If the event mask does not contain any poll(2) event, we consider the
         * descriptor to be disabled. This condition is likely the effect of the
         * EPOLLONESHOT bit that disables the descriptor when an event is received,
         * until the next EPOLL_CTL_MOD will be issued.
         */
        if (!(epi->event.events & ~EP_PRIVATE_BITS))
            goto is_disabled;
     
        /* If this file is already in the ready list we exit soon */
        if (EP_IS_LINKED(&epi->rdllink))
            goto is_linked;
     
        // 因为被设备唤醒,则说明当前epi对应的fd有事件ready
        // 则将其链接到ep的rdllist上
        list_add_tail(&epi->rdllink, &ep->rdllist);
     
    is_linked:
        /*
         * Wake up ( if active ) both the eventpoll wait list and the ->poll()
         * wait list.
         */
        // 已经找到结果了,不需要等待了,就notify一下吧,告诉大家不用再等了
        if (waitqueue_active(&ep->wq))
            wake_up(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
     
    is_disabled:
        write_unlock_irqrestore(&ep->lock, flags);
     
        /* We have to call this outside the lock */
        if (pwake)
            ep_poll_safewake(&psw, &ep->poll_wait);
     
        return 1;
    }
     
  • ep_insert相关函数是整个epoll的核心,结合上面的图再来 看,ep_pqueue结构体及ep_ptable_queue_proc方法其实就是起着桥梁的作用,通过它们,有着ep_poll_callback 等待队列项被挂在到设备的等待队列上,当设备唤醒该等待队列项时,自然就将当前epitem链接到eventpoll的rdllist链表上。

  • EPOLL_CTL_DEL
    调用的函数为epoll_remove,这里不打算详细讲解,其实很容易理解,就是将epitem从eventpoll的红黑树中移除,起到取消注册的作用。
  • EPOLL_CTL_MOD
    调用的函数为epoll_modify,这里先看看它的实现:

 

static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)
{
    int pwake = 0;
    unsigned int revents;
    unsigned long flags;
 
    /*
     * Set the new event interest mask before calling f_op->poll(), otherwise
     * a potential race might occur. In fact if we do this operation inside
     * the lock, an event might happen between the f_op->poll() call and the
     * new event set registering.
     */
    // 这个就是modify需要修改的地方,即修改对应的events
    epi->event.events = event->events;
 
    /*
     * Get current event bits. We can safely use the file* here because
     * its usage count has been increased by the caller of this function.
     */
    // 这个地方不要感到奇怪,说明几点后大家应该就容易理解了:
    // 1、既然是modify则说明之前已经被add过,不需要重复挂等待队列,因此回调函数为NULL
    // 2、同时因为NULL参数,即说明不需要回调,也不会有挂等待队列的操作
    // 该调用其实就是去file那里收集一下事件而已
    revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
 
    write_lock_irqsave(&ep->lock, flags);
 
    /* Copy the data member from inside the lock */
    // 这个就是modify需要修改的地方,即修改对应的data
    epi->event.data = event->data;
 
    /*
     * If the item is not linked to the hash it means that it's on its
     * way toward the removal. Do nothing in this case.
     */
    // 这个if不准备详细讲,其实很简单,前面不是已经问过file得到revents吗?
    // 如果当前epi已经被链接的话,就看是否是感兴趣事件发生,如果是,则同样将其
    // 添加到eventpoll的rdllist链表中,并notify
    if (EP_RB_LINKED(&epi->rbn)) {
        /*
         * If the item is "hot" and it is not registered inside the ready
         * list, push it inside. If the item is not "hot" and it is currently
         * registered inside the ready list, unlink it.
         */
        if (revents & event->events) {
            if (!EP_IS_LINKED(&epi->rdllink)) {
                list_add_tail(&epi->rdllink, &ep->rdllist);
 
                /* Notify waiting tasks that events are available */
                if (waitqueue_active(&ep->wq))
                    wake_up(&ep->wq);
                if (waitqueue_active(&ep->poll_wait))
                    pwake++;
            }
        }
    }
 
    write_unlock_irqrestore(&ep->lock, flags);
 
    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&psw, &ep->poll_wait);
 
    return 0;
}
 

可以看到,修改操作其实就修改红黑树中对应的epitem的event值,有个细节点需要注意,也就是内核不放弃任何一次机会,修改过程中也不忘问一下file的事件状态,如果有事件ready则同样将其链接到rdllist链表中。

 

epoll_wait

在讲解了epoll_ctl的过程之后,epoll_wait的确没什么内容了,也不想贴一大堆源码什么的,这里分几个点将其描述一下:

 

前文已经多次出现一个链表rdllist,该链表位于eventpoll结构体变量中,当ep_poll_callback回调函数被调用时,肯 定会将当前epitem链接进来,或者在ep_insert、ep_modify过程中,如果发现file有事件ready也会将当前epitem链接到 rdllist上,因此,我们可以猜测得到epoll_wait在做什么,看下面关键部分代码:

 

// 如果rdllist中还没有epitem时,就开始等待了
if (list_empty(&ep->rdllist)) {
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        // 初始化等待队列,等待队列项对应的线程即为当前线程
        init_waitqueue_entry(&wait, current);
        // 不用多说,先将当前线程挂到等待队列上,之后在调用schedule_timeout
        // 时,就开始了超时等待了
        add_wait_queue(&ep->wq, &wait);
 
        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            // 这块内容比较熟悉,在poll讲解过程中也有说明,它与schedule_timeout配合
            // 因为会被阻塞,这里先设置线程状态为可中断
            set_current_state(TASK_INTERRUPTIBLE);
            // 整个循环的核心,其实就在看rdllist中是否有数据,或者等待超时
            // 应征了前面的说明,epoll_wait只需要等着收集数据即可
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            // 如果被中断。。。后面部分比较简单,可以参照poll那篇
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }
 
            write_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout);
            write_lock_irqsave(&ep->lock, flags);
        }
        remove_wait_queue(&ep->wq, &wait);
 
        set_current_state(TASK_RUNNING);
    }
 

其实还有一点需要说明,大家可能也会想到,rdllist中的epitem只能表示对应fd有事件ready,可是自始至终都没看到有地方回写revents,我们怎么知道到底是哪些事件ready了呢?
在ep_send_events函数中有这么一段代码:

 

list_for_each(lnk, txlist) {
    epi = list_entry(lnk, struct epitem, txlink);
 
    /*
     * Get the ready file event set. We can safely use the file
     * because we are holding the "sem" in read and this will
     * guarantee that both the file and the item will not vanish.
     */
    revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);
 
    /*
     * Set the return event set for the current file descriptor.
     * Note that only the task task was successfully able to link
     * the item to its "txlist" will write this field.
     */
    epi->revents = revents & epi->event.events;
 

看到这段代码,应该很清楚了,只需要遍历链表,再去拿一次就好了,见关键代码:


revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);


这一句在ep_modify中也有出现。

还有一点我这里故意隐瞒,其实不是我特别想说明的点,对理解epoll影响也不大,那就是收集结果不是直接从rdllist中进行的,这中间还有一个转移的过程,在epoll_wait的最后进行,关键代码如下:

 

static int ep_collect_ready_items(struct eventpoll *ep, struct list_head *txlist, int maxevents)
{
    int nepi;
    unsigned long flags;
    // rdllist里存放的就是当前ready的epitem链表,且至少存在一个epitem
    struct list_head *lsthead = &ep->rdllist, *lnk;
    struct epitem *epi;
 
    write_lock_irqsave(&ep->lock, flags);
 
    // 遍历rdllist链表
    for (nepi = 0, lnk = lsthead->next; lnk != lsthead && nepi < maxevents;) {
        // 先拿到epitem
        epi =   list_entry(lnk, struct epitem, rdllink);
        lnk = lnk->next;
 
        /* If this file is already in the ready list we exit soon */
        // 确保不会被重复链接到txlink上
        if (!EP_IS_LINKED(&epi->txlink)) {
            /*
             * This is initialized in this way so that the default
             * behaviour of the reinjecting code will be to push back
             * the item inside the ready list.
             */
            epi->revents = epi->event.events;
 
            /* Link the ready item into the transfer list */
            // 将epi的txlink链接到ep的txlist上,简单的说
            // 将对应的epitem链接到txlist链表上
            list_add(&epi->txlink, txlist);
            nepi++;
 
            /*
             * Unlink the item from the ready list.
             */
            // 因为已经被转移了,所以从rdllist链表中清除
            EP_LIST_DEL(&epi->rdllink);
        }
    }
 
    write_unlock_irqrestore(&ep->lock, flags);
 
    return nepi;
}

 

经过这一步,rdllist中当前的结果已经被转移到txlist中,之后如果有新加入到rdllist的话,本次epoll_wait不会再关心,不过可以留到下次再收集。
还记得写回events的过程吗?最后的工作,当前是遍历txlist链表,并将结果写回到用户空间中了。

 

总结

后面详细讲解epoll_create、epoll_ctl、epoll_wait只是为了让大家强化理解前面的那副图,这里讲解epoll并不涉 及到内存映射等优化点,只是为了让大家理解,epoll到底在干什么,到最后,留给大家的,也只是这幅图,或者更简单的一个点:原来回调函数是epoll 比poll高明的地方啊。至于为什么要创建一个文件来承载eventpoll,甚至采用红黑树来保存数据,都只是空间换时间而已。

 

PS. 本文地址:Java NIO 教程 ,请大家关注:黄金档

 

  • 大小: 60.4 KB
   发表时间:2011-08-11  
分析的很深,楼主应该花了不少功夫吧。

期待更多的产出。
0 请登录后投票
   发表时间:2011-08-11  
楼主去看看apache mina 写一边这样的文章就好了
0 请登录后投票
   发表时间:2011-08-11  
现在iteye流行源码风了?
0 请登录后投票
   发表时间:2011-08-11  
NightWatch 写道
现在iteye流行源码风了?

看源码,写感受,挺好的啊。
0 请登录后投票
   发表时间:2011-08-12  
希望楼主多出这样的好文章啊
0 请登录后投票
   发表时间:2011-08-12  
绝对精华贴……
0 请登录后投票
   发表时间:2011-08-12   最后修改:2011-08-12
楼主想向你请教一个问题。
前段时间看了下java nio selector的java层面的实现(没有深入到native code看cpp代码,且也只是粗略的看),发现一个问题,对于每一个注册了相关事件的socketchannel/serversocketchannel,selector都会为他们开启一个等待线程sun.nio.windowSelectorImpl$selectThread,这让我很迷惑:
selector本名多路复用机制,就是为了在一个通道上完成以前需要多个慢速通道才能完成问题,这种机制的好处也不就是在于避免了 thread per socketchannel而频繁导致thread的阴塞与调度吗?
所以对于openjdk里selectorimp.java,最终每个socketchannel一个thread的模式,能给解释下吗?谢谢。
0 请登录后投票
   发表时间:2011-08-13   最后修改:2011-08-13
很好,有时间再来细看
0 请登录后投票
   发表时间:2011-08-15  
littlecar 写道
楼主想向你请教一个问题。
前段时间看了下java nio selector的java层面的实现(没有深入到native code看cpp代码,且也只是粗略的看),发现一个问题,对于每一个注册了相关事件的socketchannel/serversocketchannel,selector都会为他们开启一个等待线程sun.nio.windowSelectorImpl$selectThread,这让我很迷惑:
selector本名多路复用机制,就是为了在一个通道上完成以前需要多个慢速通道才能完成问题,这种机制的好处也不就是在于避免了 thread per socketchannel而频繁导致thread的阴塞与调度吗?
所以对于openjdk里selectorimp.java,最终每个socketchannel一个thread的模式,能给解释下吗?谢谢。


首先要理解windows下的NIO的Selector其实就是用select实现的,select有MAX_SELECTABLE_FDS限制,而NIO的Selector似乎没有这个限制,为什么,这就是SelectThread的作用,SelectThread中有SubSelector可以用于poll,同时WindowsSelector保证了MAX_SELECTABLE_FDS - 1会拥有一个Daemon的SelectThread线程,去完成select工作,因此不是每个Channel会有一个SelectThread,而是每MAX_SELECTABLE_FDS - 1会有一个SelectThread去做select工作。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics