- 浏览: 208547 次
- 性别:
- 来自: 重庆
-
文章分类
最新评论
[liexusong原创]
memcached的网络层处理大概如下:
(1) main函数创建一个侦听服务端的socket, 然后添加到libevent中, 然后当有客户端连接时就会触发libevent事件, 并且调用drive_machine()函数, 就像上面图一样.
(2) drive_machine()函数会根据conn的state字段来判断一个要进行什么操作. memcached中有7种的操作状态, 定义如下:
enum conn_states {
conn_listening,
conn_read,
conn_write,
conn_nread,
conn_swallow,
conn_closing,
conn_mwrite
};
drive_machine()函数大概如下:
void drive_machine(conn *c) {
int exit = 0;
int sfd, flags = 1;
socklen_t addrlen;
struct sockaddr addr;
conn *newc;
int res;
while (!exit) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
exit = 1;
break;
} else {
perror("accept()");
}
break;
}
//设置socket为非阻塞
if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
close(sfd);
break;
}
newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST);
if (!newc) {
if (settings.verbose > 0)
fprintf(stderr, "couldn't create new connection\n");
close(sfd);
break;
}
break;
case conn_read:
if (try_read_command(c)) {
continue;
}
if (try_read_network(c)) {
continue;
}
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
… … …
}
… … …
}
}
drive_machine()函数大概就是根据conn *c的state字段来判断要进行的操作. 每个客户端连接到memcached之后都会把他添加到libevent, 然后就触发事件调用这个函数. 所以说这个函数是最重要的.
[1]conn_listening状态:
这个状态主要是侦听客户端连接, 把他设置为非阻塞, 然后通过调用conn_new()函数把他放到libevent中. conn_new()函数大概如下:
conn *conn_new(int sfd, int init_state, int event_flags) {
conn *c;
if (freecurr > 0) {
//如果在freeconns中有数据, 直接去freeconns中取
c = freeconns[--freecurr];
} else {
if (!(c = (conn *)malloc(sizeof(conn)))) {
perror("malloc()");
return 0;
}
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->rbuf = (char *) malloc(DATA_BUFFER_SIZE);//2048
c->wbuf = (char *) malloc(DATA_BUFFER_SIZE);//2048
c->ilist = (item **) malloc(sizeof(item *)*200);//分配200个item指针
//分配内存失败?
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0) {
if (c->rbuf != 0) free(c->rbuf);
if (c->wbuf != 0) free(c->wbuf);
if (c->ilist !=0) free(c->ilist);
free(c);
perror("malloc()");
return 0;
}
c->rsize = c->wsize = DATA_BUFFER_SIZE;//2048
c->isize = 200;
stats.conn_structs++;//状态中增加一个conn
}
if (settings.verbose > 1) {
if (init_state == conn_listening)
fprintf(stderr, "<%d server listening\n", sfd);
else
fprintf(stderr, "<%d new client connection\n", sfd);
}
c->sfd = sfd;//保存socket文件描述符
c->state = init_state;//初始化的处理状态
c->rlbytes = 0;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;//指向当前的缓存位置
c->rcurr = c->rbuf;
c->icurr = c->ilist;
c->ileft = 0;
c->iptr = c->ibuf;
c->ibytes = 0;
c->write_and_go = conn_read;//写完后进行读操作
c->write_and_free = 0;
c->item = 0;
c->is_corked = 0;
//设置event事件
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
free(c);
return 0;
}
stats.curr_conns++;
stats.total_conns++;
return c;
}
conn_new()函数主要是新建一个conn对象, 并且初始化他的属性.
[2]conn_read状态:
状态conn_read处理中有两个函数比较重要, 就是try_read_network() 和try_read_command(). try_read_network()是从网络端处读取数据, 当读到足够的数据时调用try_read_command()来处理数据, try_read_command()函数调用parse_command()函数来解析命令, 然后根据命令的类型来改变连接的状态.
try_read_network()函数如下:
int try_read_network(conn *c) {
int gotdata = 0;
int res;
while (1) {
if (c->rbytes >= c->rsize) {
char *new_rbuf = realloc(c->rbuf, c->rsize*2);
if (!new_rbuf) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't realloc input buffer\n");
c->rbytes = 0;
out_string(c, "SERVER_ERROR out of memory");
c->write_and_go = conn_closing;
return 1;
}
c->rbuf = new_rbuf; c->rsize *= 2;
}
res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
if (res > 0) {
stats.bytes_read += res;
gotdata = 1;
c->rbytes += res;
continue;
}
if (res == 0) {
c->state = conn_closing;
return 1;
}
if (res == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
else return 0;
}
}
return gotdata;
}
这个函数的主要操作是, 从客服端处读取数据, 直到没有数据可读为止[res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)]. 读取数据的同时改变conn的rbytes字段, 表示已经读到的数据大小. 当read()调用返回0时表示连接已经断开, 修改连接的状态为关闭, 下次操作会从libevent中去掉.
try_read_command()函数如下:
int try_read_command(conn *c) {
char *el, *cont;
if (!c->rbytes)
return 0;
el = memchr(c->rbuf, '\n', c->rbytes);
if (!el)
return 0;
cont = el + 1;
//略过\r\n
if (el - c->rbuf > 1 && *(el - 1) == '\r') {
el--;
}
*el = '\0';
process_command(c, c->rbuf);
if (cont - c->rbuf < c->rbytes) {
memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));
}
c->rbytes -= (cont - c->rbuf);
return 1;
}
try_read_command()函数主要是当从客户端读取的数据足够时才会处理的, 就是当读到有换行符\n时才会处理, 因为命令一般是以换行符结束的. 然后通过调用process_command()来解析命令. 解析完命令后, 把命令从缓存中去掉, 剩下剩余的数据, 操作如下:
if (cont - c->rbuf < c->rbytes) {
memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));
}
效果如下图:
其中绿色段是数据, 黄色段是命令, 处理完命令后, 会用数据段把命令段覆盖, 是通过memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));这条语句来处理的.
process_command()函数如下:
void process_command(conn *c, char *command) {
int comm = 0;
int incr = 0;
if (settings.verbose > 1)
fprintf(stderr, "<%d %s\n", c->sfd, command);
set_cork(c, 1);
if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) ||
(strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
(strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) {
char key[251];
int flags;
time_t expire;
int len, res;
item *it;
//<command name> <key> <flags> <exptime> <bytes>\r\n
//忽略command
res = sscanf(command, "%*s %0s %u %lu %d\n", key, &flags, &expire, &len);
if (res!=4 || strlen(key)==0 ) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
expire = realtime(expire);
it = item_alloc(key, flags, expire, len+2);//alloc new item
if (it == 0) {
out_string(c, "SERVER_ERROR out of memory");
c->write_and_go = conn_swallow;
c->sbytes = len+2;
return;
}
c->item_comm = comm;
c->item = it;
c->rcurr = ITEM_data(it);//point to item data area (指向item的数据域)
c->rlbytes = it->nbytes;
c->state = conn_nread;
return;
}
… …
}
process_command()函数主要是解析命令, 然后进行制定的操作. 如上代码, 就是处理add/set/replace命令的操作.最后完成add/set/replace操作后, 会把连接的状态设置为conn_nread. conn_nread状态的操作是从客户端中读取完整的数据. 然后完成add/set/replace的所有操作, 如把数据存储在item中, 并且连接这个item. 最后把conn的rcurr指针指向item的数据域. 这样做是为了下面的conn_nread操作时可以直接把数据读到item的数据域中.
[3]conn_nread状态:
conn_nread状态是要从客户端连接处读取没有读完的数据, 并且把数据放到在conn_read状态时的item的数据域中.
[4]conn_write状态:
说conn_write状态之前先要说说out_string()函数, 定义如下:
void out_string(conn *c, char *str) {
int len;
if (settings.verbose > 1)
fprintf(stderr, ">%d %s\n", c->sfd, str);
len = strlen(str);
if (len + 2 > c->wsize) {
str = "SERVER_ERROR output line too long";
len = strlen(str);
}
strcpy(c->wbuf, str);
strcat(c->wbuf, "\r\n");
c->wbytes = len + 2;
c->wcurr = c->wbuf;
c->state = conn_write;
c->write_and_go = conn_read;
return;
}
就是把str字符串写到conn的wbuf缓存中, 然后设置conn的状态为conn_write, 表示下次操作要进行conn_write相关的操作, 最后设置conn的write_and_go为conn_read, 表示写操作完毕以后要进行读操作.
而conn_write状态对应的操作就是: 把wbuf缓存的数据写到客户端连接处, 操作如下:
case conn_write:
if (c->wbytes == 0) {
if (c->write_and_free) {
free(c->write_and_free);
c->write_and_free = 0;
}
c->state = c->write_and_go;
if (c->state == conn_read)
set_cork(c, 0);
break;
}
res = write(c->sfd, c->wcurr, c->wbytes);
if (res > 0) {
stats.bytes_written += res;
c->wcurr += res;
c->wbytes -= res;
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
if (settings.verbose > 0)
fprintf(stderr, "Failed to write, and not due to blocking\n");
c->state = conn_closing;
break;
conn_write状态操作一般是调用out_string()函数设置的, 作用一般都是把一些消息发给客户端连接. 调用out_string()函数一般会立即执行conn_read状态对应的操作. 因为out_string()没有修改libevent的事件, 所以要立即执行conn_read状态对应的操作才行, 不然就不会触发相关的事件. 而执行conn_read状态操作时, 如果客户端不可写(write调用返回-1, 并且errno == EAGAIN || errno == EWOULDBLOCK), 那么就会修改libevent的事件为写(EV_WRITE) [ update_event(c, EV_WRITE | EV_PERSIST)], 之后就等libevent事件触发才再次执行conn_write状态对应的操作了.
[5]conn_mwrite状态:
conn_mwrite状态是当用户要执行get命令时设置的. get命令处理如下:
if (strncmp(command, "get ", 4) == 0) {
char *start = command + 4;
char key[251];
int next;
int i = 0;
item *it;
time_t now = time(0);
while(sscanf(start, " %0s%n", key, &next) >= 1) {
start+=next;
stats.get_cmds++;
it = assoc_find(key);
if (it && (it->it_flags & ITEM_DELETED)) {
it = 0;
}
if (settings.oldest_live && it &&
it->time <= settings.oldest_live) {
it = 0;
}
if (it && it->exptime && it->exptime < now) {
item_unlink(it);
it = 0;
}
if (it) {
stats.get_hits++;
it->refcount++;
item_update(it);
*(c->ilist + i) = it;
i++;
if (i > c->isize) {
c->isize *= 2;
c->ilist = realloc(c->ilist, sizeof(item *)*c->isize);
}
} else stats.get_misses++;
}
c->icurr = c->ilist;
c->ileft = i;
if (c->ileft) {
c->ipart = 0;
c->state = conn_mwrite;
c->ibytes = 0;
return;
} else {
out_string(c, "END");
return;
}
}
此操作主要是根据用户提供的key来查找相关的item, 并且把item方法conn的item列表中供conn_mwrite状态操作时使用.
conn_mrwrite操作时最复杂的, 代码如下:
case conn_mwrite:
if (c->ibytes > 0) {
res = write(c->sfd, c->iptr, c->ibytes);
if (res > 0) {
stats.bytes_written += res;
c->iptr += res;
c->ibytes -= res;
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
if (settings.verbose > 0)
fprintf(stderr, "Failed to write, and not due to blocking\n");
c->state = conn_closing;
break;
} else {
item *it;
switch (c->ipart) {
case 1:
it = *(c->icurr);
assert((it->it_flags & ITEM_SLABBED) == 0);
c->iptr = ITEM_data(it);
c->ibytes = it->nbytes;
c->ipart = 2;
break;
case 2:
it = *(c->icurr);
item_remove(it);
c->ileft--;
if (c->ileft <= 0) {
c->ipart = 3;
break;
} else {
c->icurr++;
}
case 0:
it = *(c->icurr);
assert((it->it_flags & ITEM_SLABBED) == 0);
sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
if (settings.verbose > 1)
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
c->iptr = c->ibuf;
c->ibytes = strlen(c->iptr);
c->ipart = 1;
break;
case 3:
out_string(c, "END");
break;
}
}
break;
这个操作主要是把get命令处获取到的item的数据发送到客户端连接.
主要难点是: conn_mwrite操作是遍历所有获取到的item, 然后把item的数据发送给客户端.
遍历的时候使用了一个switch来处理, 主要有4中情况, 如下:
(1) 0是发生key和数据的信息.
(2) 1是发生数据data.
(3) 2是减少item的refcount和把当前item指针向后移动一个, 并且继续执行(1)的操作.
(4) 3是完成操作.
每次ibuf有数据的时候都会进行写操作, 代码如下:
if (c->ibytes > 0) {
res = write(c->sfd, c->iptr, c->ibytes);
if (res > 0) {
stats.bytes_written += res;
c->iptr += res;
c->ibytes -= res;
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
if (settings.verbose > 0)
fprintf(stderr, "Failed to write, and not due to blocking\n");
c->state = conn_closing;
break;
}
当ibuf没有数据时, 才会遍历从get命令操作取得的item列表, 并取得数据复制到ibuf中.
[6] conn_swallow状态:
conn_swallow状态是当内存不足并且调用add/set/replace命令的时候才会出现, 此操作主要是读取客户端发送过来的数据, 并且忽略掉. 操作如下:
case conn_swallow:
if (c->sbytes == 0) {
c->state = conn_read;
break;
}
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
c->sbytes -= tocopy;
if (c->rbytes > tocopy) {
memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
}
c->rbytes -= tocopy;
break;
}
res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
if (res > 0) {
stats.bytes_read += res;
c->sbytes -= res;
break;
}
if (res == 0) {
c->state = conn_closing;
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
if (settings.verbose > 0)
fprintf(stderr, "Failed to read, and not due to blocking\n");
c->state = conn_closing;
break;
至此memcached所有的网络层操作已经全部分析完.
相关推荐
1. **基于C/S架构**:Memcached采用客户端-服务器模型,客户端应用程序通过网络发送请求到Memcached服务器,服务器处理请求并返回结果。这种架构使得Memcached易于部署和扩展。 2. **简单的协议**:Memcached使用...
**Memcached服务器软件详解** Memcached是一款高性能、分布式内存对象缓存系统,广泛应用于Web应用中,用于减轻数据库负载,提高数据访问速度。它的工作原理是将数据存储在内存中,以便快速检索,从而实现高速的...
### Memcached操作指令详解 memcached提供了丰富的操作指令,用于数据的存储、检索和管理: - `get <key>`:获取指定键值对中的数据。例如,`get key`将返回键名为`key`的数据。 - `set <key> <flags> <exptime> ...
**Memcached 安装详解** Memcached 是一款高性能的分布式内存对象缓存系统,主要用于减少数据库负载,提高动态应用的访问速度。它由 Danga Interactive 开发,最初应用于提升 LiveJournal.com 的页面访问速度,现在...
**Memcached 分布式内存详解** Memcached 是一套由 danga.com 开发的高效、分布式的内存对象缓存系统,其主要目标是在高流量的动态应用程序中减轻数据库的负载,从而提升整体系统的性能。这套系统的工作原理是将...
8. **Memcached的适用场景**:适用于需要快速读取大量相同数据的Web应用,如社交网络、视频分享平台等,通过缓存减少对数据库的依赖,提高响应速度。 9. **扩展应用**:除了Memcached,还有其他类似的分布式缓存...
2. 网络层负载均衡:基于IP协议的负载均衡,例如通过修改DNS解析来实现。 3. 传输层负载均衡:基于TCP/UDP协议的负载均衡,例如四层负载均衡器。 4. 应用层负载均衡:考虑应用协议特性,如HTTP、HTTPS,可以做更细...
memcached 是一个高性能、分布式内存对象缓存系统,用于在应用层缓存数据,减轻数据库负载。 **描述详解:** "内存命令" 意味着 memcmd 主要用于处理与内存相关的操作,特别是针对 memcached 实例。它提供了一系列...
**MPPS: Memcached 协议代理服务器详解** MPPS(Memcached Protocol Proxy Server)是一种基于Java实现的Memcached协议代理服务器。这个代理服务器的主要功能是为客户端提供一个中间层,使得客户端可以通过标准的...
- **安全**:Memcached本身并不提供安全性保障,如认证和加密,通常需要在应用层或网络层解决。 - **扩展性**:通过集群部署,Memcached可以轻松扩展到多台服务器,以满足更大规模的缓存需求。 通过阅读1.4.20版本...
- **技术选型**:最初的Memcached原型是用Perl编写的,但由于Perl在性能上的不足(如内存浪费严重、无法高效处理大量网络连接),最终选择C语言重新开发。 - **单进程单线程模型**:为了最大化效率,Memcached采用了...
### 缓存技术详解 #### 一、缓存的基本概念及作用 缓存技术是一种用于提高数据访问速度和系统响应时间的技术。它通过在靠近应用的地方存储数据副本,减少对后端系统的请求次数,从而加快数据获取的速度。缓存在Web...
云计算技术架构是构建高效、可伸缩的网络服务的基础,主要分为四个层次:显示层、中间件层、基础设施层和管理层。以下是对这些层次及其关键技术的详细解释。 **显示层**: 显示层负责向用户提供直观易用的界面,...
【Web网站架构详解】 在构建Web网站的过程中,架构设计至关重要,因为它直接影响到系统的性能、可扩展性和维护性。本文将探讨从单体架构到微服务架构的演变过程,以及涉及的关键技术和工具。 1. **服务分离** ...
### 大型网站架构演变与知识体系详解 #### 架构演变的第一步:物理分离Web服务器与数据库 在网站初创阶段,通常是通过租用或托管单一服务器的方式搭建基础架构。随着时间推移,如果该网站获得了良好的反馈并积累了...
【知识点详解】 1. **负载均衡机制** - **基于DNS的负载均衡**:通过DNS服务器根据预先设定的策略,将客户端的请求分散到不同的服务器上。这种机制简单易实施,但响应时间受DNS解析影响,可能不一致,且安全性较低...
3. **网络层**:“GameServer”在此层,负责处理客户端的连接和通信,通常使用TCP/IP协议,通过心跳机制保持连接。 4. **负载均衡层**:根据服务器负载情况,将玩家请求分发到不同的服务实例,确保服务的高可用性。 ...
- **DR模式**:在同一个局域网内的服务器间直接通信,无需额外的网络层转换。 - **FULLNAT模式**:所有请求都将经过LVS进行源和目标IP的转换,适合跨网络的场景。 - **NAT模式**:只转换客户端到Real Server的连接的...
### LAMP架构详解 #### LAMP架构概述 LAMP架构是一种广泛应用于Web开发领域的开源解决方案组合,其名称来源于四个核心组件的首字母:Linux(操作系统)、Apache(Web服务器)、MySQL(数据库管理系统)以及PHP...
综上所述,构建一个能应对高并发高流量的Linux网站架构需要综合运用多种技术和策略,从网络层到应用层全方位优化,以确保服务的稳定、高效和可扩展。这种架构设计方法不仅适用于大型网站,也对其他面临类似挑战的...