`
yuanyu5237
  • 浏览: 162846 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

libevent—异步IO编程

 
阅读更多

原地址:http://www.wangafu.net/~nickm/libevent-book/01_intro.html

关于

本文档旨在教会你如何使用libevent2.0编写高效可移植的异步网络程序,我们假定:

  • 你熟悉C语言。

  • 你熟悉C语言基本网络接口 (比如socket(), connect()等)。

示例程序

本文档所有例程都可以在Linux, FreeBSD, OpenBSD, NetBSD, Mac OS X, Solaris, 和Android平台上运行. 但其中某些可能无法在Windows上正常编译.

 

异步IO编程

多数程序员一开始都是使用阻塞式IO编程。所谓同步IO调用一般是指当你发起一个IO调用操作,应用程序或者在操作完成后返回,或者等待一段时间后返回。比如调用connect函数发起一个TCP连接,除非它从对端收到了一个SYN ACK(连接成功)报文,或者等待了很长一段时间,否则该函数不会返回。 

下面是一个简单的阻塞IO例程,它连接到www.google.com的主机,发送一个简单的http请求,并打印返回结果。

Example: A simple blocking HTTP client
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For gethostbyname */
#include <netdb.h>

#include <unistd.h>
#include <string.h>
#include <stdio.h>

int main(int c, char **v)
{
    const char query[] =
        "GET / HTTP/1.0\r\n"
        "Host: www.google.com\r\n"
        "\r\n";
    const char hostname[] = "www.google.com";
    struct sockaddr_in sin;
    struct hostent *h;
    const char *cp;
    int fd;
    ssize_t n_written, remaining;
    char buf[1024];

    /* Look up the IP address for the hostname.   Watch out; this isn't
       threadsafe on most platforms. */
    h = gethostbyname(hostname);
    if (!h) {
        fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno));
        return 1;
    }
    if (h->h_addrtype != AF_INET) {
        fprintf(stderr, "No ipv6 support, sorry.");
        return 1;
    }

    /* Allocate a new socket */
    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return 1;
    }

    /* Connect to the remote host. */
    sin.sin_family = AF_INET;
    sin.sin_port = htons(80);
    sin.sin_addr = *(struct in_addr*)h->h_addr;
    if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) {
        perror("connect");
        close(fd);
        return 1;
    }

    /* Write the query. */
    /* XXX Can send succeed partially? */
    cp = query;
    remaining = strlen(query);
    while (remaining) {
      n_written = send(fd, cp, remaining, 0);
      if (n_written <= 0) {
        perror("send");
        return 1;
      }
      remaining -= n_written;
      cp += n_written;
    }

    /* Get an answer back. */
    while (1) {
        ssize_t result = recv(fd, buf, sizeof(buf), 0);
        if (result == 0) {
            break;
        } else if (result < 0) {
            perror("recv");
            close(fd);
            return 1;
        }
        fwrite(buf, 1, result, stdout);
    }

    close(fd);
    return 0;
}

上面的所有网络调用都是阻塞式的:gethostbyname只有在解析网址www.google.com成功或失败后才会返回;connect只在连接成功后返回; recv只在接收到数据或者关闭连接后返回;send至少也要等到数据被刷新到内核的写缓冲区之后才会返回。

就目前来说,阻塞式IO没有什么不好。如果只是完成以上这些功能,阻塞式IO是一个不错的选择。但假如你需要编写程序来一次处理多个连接,怎么办?具体点,你想从两个连接中读取数据,但又不知道哪个连接的数据会先到达,你可千万不能像下面这边编写代码:

Bad Example
/* This won't work. */
char buf[1024];
int i, n;
while (i_still_want_to_read()) {
    for (i=0; i<n_sockets; ++i) {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n==0)
            handle_close(fd[i]);
        else if (n<0)
            handle_error(fd[i], errno);
        else
            handle_input(fd[i], buf, n);
    }
}

因为即使描述符fd[2]上的数据先到,你的程序也会等到fd[0]和fd[1]上的数据获取完成后才会去读取fd[2]上的数据。

这个问题可以用多线程或多进程服务器解决。最简单的方式就是为每个连接都建立一个单独的进程(或线程)来处理数据读写。因为每个连接都有自己的进程,等待连接的阻塞调用(accept)就不会阻塞其他连接。

以下是一个简单的服务器程序,在端口47013上等待连接,从socket的输入端读取一行数据,将字母做一个ROT13变换,再在socket的输出端返回这些数据到客户端,该程序使用了unix系统的fork调用来为每个新连接创建进程。

Example: Forking ROT13 server
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>

#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX_LINE 16384

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void
child(int fd)
{
    char outbuf[MAX_LINE+1];
    size_t outbuf_used = 0;
    ssize_t result;

    while (1) {
        char ch;
        result = recv(fd, &ch, 1, 0);
        if (result == 0) {
            break;
        } else if (result == -1) {
            perror("read");
            break;
        }

        /* We do this test to keep the user from overflowing the buffer. */
        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if (ch == '\n') {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

void
run(void)
{
    int listener;
    struct sockaddr_in sin;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }



    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener, (struct sockaddr*)&ss, &slen);
        if (fd < 0) {
            perror("accept");
        } else {
            if (fork() == 0) {
                child(fd);
                exit(0);
            }
        }
    }
}

int
main(int c, char **v)
{
    run();
    return 0;
}

至此,我们似乎有了一次处理多个连接的完美解决方案?其实不然,首先,进程创建(甚至线程创建)在某些平台上的开销是非常昂贵的。事实上,你可能会使用线程池技术,而不是无限制的创建进程。如果你的程序需要一次处理成千上万的连接时,这就会变得非常低效,还不如让每个CPU只处理较少的线程。

但如果多线程不是处理多连接问题的答案,又当如何?在Unix平台上,你可以设置sockets为非阻塞模式。下面的调用就是用来干这个事的:

fcntl(fd, F_SETFL, O_NONBLOCK);

此时fd是一个socket描述符[1] 一旦你设置socket描述符fd为非阻塞模式,之后所有对fd的网络调用都会,或者立即完成操作,然后返回;或者返回一个错误码,表示“操作无法继续进行,请重试”。因此我们的双socket程序可以写成这样(非常天真的想法):

Bad Example: busy-polling all sockets
/* This will work, but the performance will be unforgivably bad. */
int i, n;
char buf[1024];
for (i=0; i < n_sockets; ++i)
    fcntl(fd[i], F_SETFL, O_NONBLOCK);

while (i_still_want_to_read()) {
    for (i=0; i < n_sockets; ++i) {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n == 0) {
            handle_close(fd[i]);
        } else if (n < 0) {
            if (errno == EAGAIN)
                 ; /* The kernel didn't have any data for us to read. */
            else
                 handle_error(fd[i], errno);
         } else {
            handle_input(fd[i], buf, n);
         }
    }
}

因为使用了非阻塞事socket,上面的代码会“勉为其难”地运行。其性能会因为2个原因而糟糕透顶:1,当两个连接上都没有数据时,该循环会无限地自旋(spin),耗尽CPU资源. 2, 如果用这种方法处理一个以上连接时,无论是否有数据到达,每个连接都会产生一个内核调用。因此,正确的做法是采用一种方式告诉内核:等吧,等到某个socket有数据就绪时,告诉我到底是哪一个就行了。

最古老的方式就是用select,现在也还在用。select()接收3个描述符集合fds (以字节数组的方式实现): 一个用来读, 一个用来写, 还有一个表示异常。它会一直等待,直到描述符集中某个socket准备就绪,然后修改此集合,使其只包含该就绪socket,以备后用。

下面是一个使用了select的例子:

Example: Using select
/* If you only have a couple dozen fds, this version won't be awful */
fd_set readset;
int i, n;
char buf[1024];

while (i_still_want_to_read()) {
    int maxfd = -1;
    FD_ZERO(&readset);

    /* Add all of the interesting fds to readset */
    for (i=0; i < n_sockets; ++i) {
         if (fd[i]>maxfd) maxfd = fd[i];
         FD_SET(i, &readset);
    }

    /* Wait until one or more fds are ready to read */
    select(maxfd+1, &readset, NULL, NULL, NULL);

    /* Process all of the fds that are still set in readset */
    for (i=0; i < n_sockets; ++i) {
        if (FD_ISSET(fd[i], &readset)) {
            n = recv(fd[i], buf, sizeof(buf), 0);
            if (n == 0) {
                handle_close(fd[i]);
            } else if (n < 0) {
                if (errno == EAGAIN)
                     ; /* The kernel didn't have any data for us to read. */
                else
                     handle_error(fd[i], errno);
             } else {
                handle_input(fd[i], buf, n);
             }
        }
    }
}

以下,用select重新实现了ROT13 server:

Example: select()-based ROT13 server
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>
/* for select */
#include <sys/select.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

struct fd_state {
    char buffer[MAX_LINE];
    size_t buffer_used;

    int writing;
    size_t n_written;
    size_t write_upto;
};

struct fd_state *
alloc_fd_state(void)
{
    struct fd_state *state = malloc(sizeof(struct fd_state));
    if (!state)
        return NULL;
    state->buffer_used = state->n_written = state->writing =
        state->write_upto = 0;
    return state;
}

void
free_fd_state(struct fd_state *state)
{
    free(state);
}

void
make_nonblocking(int fd)
{
    fcntl(fd, F_SETFL, O_NONBLOCK);
}

int
do_read(int fd, struct fd_state *state)
{
    char buf[1024];
    int i;
    ssize_t result;
    while (1) {
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i=0; i < result; ++i)  {
            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            if (buf[i] == '\n') {
                state->writing = 1;
                state->write_upto = state->buffer_used;
            }
        }
    }

    if (result == 0) {
        return 1;
    } else if (result < 0) {
        if (errno == EAGAIN)
            return 0;
        return -1;
    }

    return 0;
}

int
do_write(int fd, struct fd_state *state)
{
    while (state->n_written < state->write_upto) {
        ssize_t result = send(fd, state->buffer + state->n_written,
                              state->write_upto - state->n_written, 0);
        if (result < 0) {
            if (errno == EAGAIN)
                return 0;
            return -1;
        }
        assert(result != 0);

        state->n_written += result;
    }

    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 1;

    state->writing = 0;

    return 0;
}

void
run(void)
{
    int listener;
    struct fd_state *state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, n, maxfd;
    fd_set readset, writeset, exset;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    for (i = 0; i < FD_SETSIZE; ++i)
        state[i] = NULL;

    listener = socket(AF_INET, SOCK_STREAM, 0);
    make_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }

    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while (1) {
        maxfd = listener;

        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for (i=0; i < FD_SETSIZE; ++i) {
            if (state[i]) {
                if (i > maxfd)
                    maxfd = i;
                FD_SET(i, &readset);
                if (state[i]->writing) {
                    FD_SET(i, &writeset);
                }
            }
        }

        n = select(maxfd+1, &readset, &writeset, &exset, NULL);

        if (FD_ISSET(listener, &readset)) {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
            if (fd < 0) {
                perror("accept");
            } else if (fd > FD_SETSIZE) {
                close(fd);
            } else {
                make_nonblocking(fd);
                state[fd] = alloc_fd_state();
                assert(state[fd]);/*XXX*/
            }
        }

        for (i=0; i < maxfd+1; ++i) {
            int r = 0;
            if (i == listener)
                continue;

            if (FD_ISSET(i, &readset)) {
                r = do_read(i, state[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) {
                r = do_write(i, state[i]);
            }
            if (r) {
                free_fd_state(state[i]);
                state[i] = NULL;
                close(i);
            }
        }
    }
}

int
main(int c, char **v)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}

还没完!因为产生和读取select字节数组所花时间与传入的最大的描述符集合的大小成比例,当socket的数量很大时,select会非常耗时。[2]

不同的操作系统为select提供了不同的替代函数。有poll(), epoll(), kqueue(), evports, 和 /dev/poll。所有这些都比select()性能要好, 除了poll()其余的函数在操作:添加socket, 删除socket,和监视某个socketIO就绪时,都能获得O(1)时间复杂度的性能。

可惜的是,这些高性能接口没有一个统一的标准。Linux对应epoll(), the BSDs (including Darwin)对应kqueue(), Solaris对应evports和/dev/poll…没有一个操作系统会有其他平台上的接口。所以如果想写一个高性能的可移植异步程序,就需要一个所有平台上这些高性能接口的抽象,并且在该平台上使用时一定会提供最高效的那一种接口。

这其实就是libevent的低级API所做的事情,它为不同平台上的select替代函数提供了一个一致性接口,并且保证会使用应用程序所运行平台上可用的最高效版本。

下面是异步ROT13服务器的另一个版本。使用 Libevent 2 而不是select()。 注意,描述符集fd_sets已经不见了,现在,我们把(IO)事件与结构体event_base关联(associate and disassociate),它可能会是select(), poll(), epoll(), kqueue()等中的一种。

Example: A low-level ROT13 server with Libevent
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

struct fd_state {
    char buffer[MAX_LINE];
    size_t buffer_used;

    size_t n_written;
    size_t write_upto;

    struct event *read_event;
    struct event *write_event;
};

struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
    struct fd_state *state = malloc(sizeof(struct fd_state));
    if (!state)
        return NULL;
    state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
    if (!state->read_event) {
        free(state);
        return NULL;
    }
    state->write_event =
        event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);

    if (!state->write_event) {
        event_free(state->read_event);
        free(state);
        return NULL;
    }

    state->buffer_used = state->n_written = state->write_upto = 0;

    assert(state->write_event);
    return state;
}

void
free_fd_state(struct fd_state *state)
{
    event_free(state->read_event);
    event_free(state->write_event);
    free(state);
}

void
do_read(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;
    char buf[1024];
    int i;
    ssize_t result;
    while (1) {
        assert(state->write_event);
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i=0; i < result; ++i)  {
            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            if (buf[i] == '\n') {
                assert(state->write_event);
                event_add(state->write_event, NULL);
                state->write_upto = state->buffer_used;
            }
        }
    }

    if (result == 0) {
        free_fd_state(state);
    } else if (result < 0) {
        if (errno == EAGAIN) // XXXX use evutil macro
            return;
        perror("recv");
        free_fd_state(state);
    }
}

void
do_write(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;

    while (state->n_written < state->write_upto) {
        ssize_t result = send(fd, state->buffer + state->n_written,
                              state->write_upto - state->n_written, 0);
        if (result < 0) {
            if (errno == EAGAIN) // XXX use evutil macro
                return;
            free_fd_state(state);
            return;
        }
        assert(result != 0);

        state->n_written += result;
    }

    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 1;

    event_del(state->write_event);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd < 0) { // XXXX eagain??
        perror("accept");
    } else if (fd > FD_SETSIZE) {
        close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
    } else {
        struct fd_state *state;
        evutil_make_socket_nonblocking(fd);
        state = alloc_fd_state(base, fd);
        assert(state); /*XXX err*/
        assert(state->write_event);
        event_add(state->read_event, NULL);
    }
}

void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    /*XXX check it */
    event_add(listener_event, NULL);

    event_base_dispatch(base);
}

int
main(int c, char **v)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}

代码细节: 我们没有使用"int"型的sockets, 使用了evutil_socket_t。没有使用fcntl(O_NONBLOCK) 来设置sockets为非阻塞模式, 使用了evutil_make_socket_nonblocking。这些变化使代码与不同的平台(比如Win32 networking API)兼容性更好。

说好的简单呢? (Windows呢?)

 

 

 

你可能注意到代码虽然更加高效,但也更加复杂。以前我们创建进程的时候,不需替每个连接都管理缓冲区: 每个进程都有一个独立的栈缓冲区(stack-allocated buffer)。不需要显示地跟踪是否有socket可读或者可写:这些都隐含在代码之中。不需要一个结构体来跟踪每个操作完成了多少,我们只需要循环和从栈上分配的变量(stack variables)即可。

进一步讲,如果你是一个windows编程老手,你会发现如果像上面那样使用libevent,是不会获得最优性能的。在Windows上, 最快的异步IO方式不是使用类似于select()的接口: 相反,它使用了完成端口IOCP (IO Completion Ports) API。不像所有的网络API, 完成端口IOCP不会在socket操作就绪时提示你,此时,程序会通知Windows networking stack 去启动一个(该)网络操作,当该操作完成时,完成端口IOCP才会发出提示。

幸运的是,Libevent 2中的"bufferevents" 解决了这两类问题:它既可以使程序容易编写,也提供了一个在windows和unix上都高效的接口。

下面是最后一版ROT13服务器, 使用bufferevents API.

Example: A simpler ROT13 server with Libevent
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void
readcb(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input, *output;
    char *line;
    size_t n;
    int i;
    input = bufferevent_get_input(bev);
    output = bufferevent_get_output(bev);

    while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
        for (i = 0; i < n; ++i)
            line[i] = rot13_char(line[i]);
        evbuffer_add(output, line, n);
        evbuffer_add(output, "\n", 1);
        free(line);
    }

    if (evbuffer_get_length(input) >= MAX_LINE) {
        /* Too long; just process what there is and go on so that the buffer
         * doesn't grow infinitely long. */
        char buf[1024];
        while (evbuffer_get_length(input)) {
            int n = evbuffer_remove(input, buf, sizeof(buf));
            for (i = 0; i < n; ++i)
                buf[i] = rot13_char(buf[i]);
            evbuffer_add(output, buf, n);
        }
        evbuffer_add(output, "\n", 1);
    }
}

void
errorcb(struct bufferevent *bev, short error, void *ctx)
{
    if (error & BEV_EVENT_EOF) {
        /* connection has been closed, do any clean up here */
        /* ... */
    } else if (error & BEV_EVENT_ERROR) {
        /* check errno to see what error occurred */
        /* ... */
    } else if (error & BEV_EVENT_TIMEOUT) {
        /* must be a timeout event handle, handle it */
        /* ... */
    }
    bufferevent_free(bev);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd < 0) {
        perror("accept");
    } else if (fd > FD_SETSIZE) {
        close(fd);
    } else {
        struct bufferevent *bev;
        evutil_make_socket_nonblocking(fd);
        bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
        bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
        bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
        bufferevent_enable(bev, EV_READ|EV_WRITE);
    }
}

void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    /*XXX check it */
    event_add(listener_event, NULL);

    event_base_dispatch(base);
}

int
main(int c, char **v)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}

真的,性能可以好到什么地步?

 

 

 

XXXX write an efficiency section here. The benchmarks on the libevent page are really out of date.


1. 文件描述符就是当你打开一个socket时,内核分配的序号。你可以使用该序号在该socket上执行unix调用。 
2. 在用户空间,产生和读取字节数组所花的时间与传入select函数的描述符的个数成正比,但在内核空间,读取字节数组所花时间与该字节数组正最大的描述符成正比,而这个最大描述符数字又恰好约等于整个程序所用的描述符总数,而与select中描述符集内的描述符个数无关。
分享到:
评论

相关推荐

    一个简单的异步IO库

    本项目是一个基于C语言在Linux环境下实现的简单异步IO库,旨在提供对socket、文件读取、定时器以及文件状态变更监控的支持。 首先,我们来看一下异步I/O的基本概念。传统的同步I/O模式在执行I/O操作时会阻塞,直到...

    78程序员练级攻略(2018):异步IO模型和lock-Free编程1

    Libevent和Libuv是两个流行的事件库,它们提供了抽象层以简化跨平台的异步I/O编程。Libevent的《libevent 2.0 book》和《Libevent深入浅出》可以帮助理解其设计理念和使用方法,而Libuv的Design Overview提供了更...

    libevent 教程

    libevent 是一个高效的异步 IO 库,提供统一的 API,简化开发。下面是 libevent 教程的知识点总结: 1. libevent 简介 libevent 是一个跨平台的异步 IO 库,提供了统一的 API,支持多种操作系统和事件机制。它可以...

    2014PHP 异步并行编程_韩天峰(PHPCONCHINA2014)

    - **异步编程实践**:通过具体实例(如使用libevent扩展)展示了如何实现异步处理,并给出了具体的代码示例。 #### 结论 - **未来展望**:随着PHP语言的发展和社区的支持,异步编程将成为PHP开发者不可或缺的一项...

    Fast Portable non-blocking network programming with Libevent

    1. 异步IO的入门介绍:这部分简要介绍了什么是同步IO以及异步IO,并解释了为何异步IO对于高性能网络编程至关重要。同步IO会阻塞程序执行直到操作完成或超时,而异步IO则允许程序在I/O操作进行时继续执行其他任务。 ...

    libevent参考手册(中文).pdf

    Libevent 是用于编写高速可移植非阻塞 IO 应用的库,其设计目标是:  可移植性:使用 libevent 编写的程序应该可以在 libevent 支持的所有平台上工作。即使 没有好的方式进行非阻塞 IO,libevent 也应该支持一般的方式...

    libevent文档

    文档提供了一个关于异步IO的简单介绍,指出大多数初学者从阻塞IO调用开始编程。IO调用是同步的,当你调用它时,它会在操作完成前不返回,或者直到足够的时间过去使得你的网络堆栈放弃。例如,在TCP连接上调用...

    libevent-2.1.12-stable.tar.gz

    libevent是一个轻量级的库,用于解决网络编程中的异步事件问题。它将操作系统提供的各种事件模型(如POSIX select、epoll、kqueue等)进行了抽象,为程序员提供了一致的API,使得开发者可以轻松地在不同的操作系统...

    安装 libevent

    对于PHP开发者而言,使用`libevent`可以通过扩展来实现异步IO操作,从而提升应用程序的性能。本文将详细介绍如何在系统上安装`libevent`库及其PHP扩展。 #### 二、安装libevent **步骤1:下载libevent** 首先,...

    libevent官方文档英文版

    总之,Libevent官方文档是一份珍贵的资源,对于那些希望深入了解如何使用Libevent库来处理异步事件和网络编程任务的开发者来说,它提供了理论知识与实践指导。通过遵循文档中的示例和最佳实践,开发者可以构建出健壮...

    libevent_源码剖析

    Libevent的核心功能包括事件通知、IO缓存事件、定时器、信号处理、异步DNS解析和HTTP服务器及RPC框架。事件通知允许在文件描述符可读或可写时执行回调函数,IO缓存事件则提供了自动化的读写操作,简化了用户对I/O的...

    libevent中文参考手册

    ### libevent中文参考手册...总之,libevent作为一个强大的跨平台异步IO库,旨在简化网络编程的同时保持高性能和灵活性。无论是对于构建高性能Web服务器还是开发复杂的企业级应用,libevent都是一个值得考虑的选择。

    libevent.tar.gz

    1. **事件驱动编程**:理解libevent如何使用事件模型处理网络连接和事件,如IO就绪、定时器事件等。 2. **libevent API**:学习如何使用libevent的API创建事件基础架构,如event_base_new()、event_add()、event_del...

    libevent参考手册(中文版).rar

    Libevent是一个开源、跨平台的事件通知库,它允许程序员编写高性能、异步的网络服务。这个中文版的手册为中国的开发人员提供了一个更加便利的了解和学习Libevent的途径。 Libevent的核心功能在于它能够处理大量的...

    libevent:本书要求有一定的服务并发编程基础,了解select和epoll等多路IO复用机制

    在深入探讨libevent之前,让我们首先理解什么是服务并发编程以及IO复用机制。服务并发编程是构建高性能、高并发服务器的关键技术,它允许一个程序同时处理多个客户端请求,提高系统资源的利用率和响应速度。在Linux...

    后端网络框架学习必备:libevent源码剖析

    通过剖析libevent的源码,不仅可以帮助我们掌握网络编程的核心技术,也能提升我们的编程技巧和解决问题的能力。对于那些渴望在后端开发领域有所建树的人来说,《后端网络框架学习必备:libevent源码剖析》无疑是一本...

    crawler:C语言实现网络爬虫

    crawler 计算机应用编程实验2 实验涉及内容 Linux Socket编程 AC自动机提取页面url BloomFilter url去重 Libevent异步IO库并发抓取 PageRank 计算权重最高的十个页面

    libevent.pdf

    它封装了 IO 多路复用技术,如 epoll、poll、select 和 kqueue,提供了事件驱动的编程模型,有助于简化异步网络编程,提高开发效率。Libevent 的核心功能是通过事件基(event_base)来管理事件,并提供了对不同后端...

    计算机网络高级软件编程技术随书光盘内容

    4. **异步IO与事件驱动编程**:介绍非阻塞IO、I/O复用(如select、poll、epoll)和事件驱动编程(如事件循环和回调函数),这些技术能有效提升服务器处理大量并发请求的能力。 5. **HTTP协议详解**:作为最广泛使用...

Global site tag (gtag.js) - Google Analytics