`
thecloud
  • 浏览: 911504 次
文章分类
社区版块
存档分类
最新评论

Glusterfs之rpc模块源码分析(中)之Glusterfs的rpc模块实现(1)

 
阅读更多

我的新浪微博:http://weibo.com/freshairbrucewoo

欢迎大家相互交流,共同提高技术。


二、Glusterfsrpc模块实现

第一节、rpc服务器端实现原理及代码分析

1.rpc服务初始化

Rpc服务的初始化工作在函数rpcsvc_init中实现的,实现代码如下:

rpcsvc_t*rpcsvc_init(glusterfs_ctx_t*ctx,dict_t*options)

{

rpcsvc_t*svc=NULL;//所有rpc服务的全局状态描述对象

intret=-1,poolcount=0;

svc=GF_CALLOC(1,sizeof(*svc),gf_common_mt_rpcsvc_t);//分配内存资源

pthread_mutex_init(&svc->rpclock,NULL);//初始化锁

INIT_LIST_HEAD(&svc->authschemes);//初始化权限模式链表

INIT_LIST_HEAD(&svc->notify);//初始化通知回调函数链表

INIT_LIST_HEAD(&svc->listeners);//初始化监听链表

INIT_LIST_HEAD(&svc->programs);//初始化所有程序链表

ret=rpcsvc_init_options(svc,options);//初始化rpc服务的可选项信息

poolcount=RPCSVC_POOLCOUNT_MULT*svc->memfactor;//计算内存池大小

svc->rxpool=mem_pool_new(rpcsvc_request_t,poolcount);//分配内存池空间

ret=rpcsvc_auth_init(svc,options);//初始化权限信息

svc->options=options;//可选项保存

svc->ctx=ctx;//所属ctx

gluster_dump_prog.options=options;//让描述程序的对象保存选项信息

ret=rpcsvc_program_register(svc,&gluster_dump_prog);//注册rpc服务

returnsvc;//返回初始化后的所有rpc服务的全局描述对象

}

初始化的工作主要就是为描述一个所有rpc服务的全局对象设置一些初始化的值,这些信息一直保存到整个rpc服务结束,也就是glusterfs集群管理程序等主进程结束。

2.注册回调通知函数rpcsvc_register_notify

其实这个也是算初始化的一部分,只不过它只初始化一种功能,就是注册某些事件产生时的通知回调函数,通知的含义就是当这个事件发生了通知别的部分这里发生了事件,你应该根据这个事情做相应的处理,说简单一点就是我这里发生的事件可能对你也有影响,所以通知你一下以便你也好做相应的处理。主要代码如下:

wrapper=rpcsvc_notify_wrapper_alloc();//为通知函数的包装对象分配资源

svc->mydata=mydata;//设置属于哪一个xlator

wrapper->data=mydata;//通知函数的包装对象也保存属于哪一个xlator

wrapper->notify=notify;//保存通知的回调函数

pthread_mutex_lock(&svc->rpclock);//svc是全局对象,访问需要枷锁

{

list_add_tail(&wrapper->list,&svc->notify);//添加到svc的通知函数列表

svc->notify_count++;//通知函数个数加1

}

pthread_mutex_unlock(&svc->rpclock);//释放锁

3.创建监听器的函数rpcsvc_create_listeners

这个函数是重点中的重点,因为它的工作基本上把rpc服务建立起来了,就开始等待服务器的链接了,而且会加载底层的通信协议,主要包括rdma和tcp。所以这个函数做的工作会比较多,但是都很重要,下面详细分析这个过程,先看这个函数实现的主要代码:

rpcsvc_create_listener(svc,options,transport_name);

这个函数的实现大部分代码都是在解析底层的传输类型,然后针对每一个解析出来的类型分别创建监听程序,也就是上面那一句代码。继续看这个函数的实现,主要代码如下:

trans=rpcsvc_transport_create(svc,options,name);//根据传输名称创建传输描述对象

listener=rpcsvc_listener_alloc(svc,trans);//为监听对象分配资源

if(!listener&&trans){//如果分配监听资源失败并且传输链接已建立成功

rpc_transport_disconnect(trans);//断开传输链接

}

继续看传输对象的创建函数rpcsvc_transport_create,主要代码如下:

trans=rpc_transport_load(svc->ctx,options,name);//装载传输层的库并初始化

ret=rpc_transport_listen(trans);//建立传输层链接监听,执行装载库后的listen函数

ret=rpc_transport_register_notify(trans,rpcsvc_notify,svc);//注册通知函数

if((ret==-1)&&(trans)){

rpc_transport_disconnect(trans);//断开底层传输链接

trans=NULL;

}

上面调用的几个函数就rpc_transport_load还有点意思,不过rpc_transport_listen函数执行了装载后具体的协议(rdma和tcp)的listen函数来开始监听客户端的请求。先看看rpc_transport_load函数的主要实现代码:

trans=GF_CALLOC(1,sizeof(structrpc_transport),gf_common_mt_rpc_trans_t);

trans->name=gf_strdup(trans_name);//复制传输层的名字

trans->ctx=ctx;//传输所属的ctx

type=str;//传输类型

is_tcp=strcmp(type,"tcp");

is_unix=strcmp(type,"unix");

is_ibsdp=strcmp(type,"ib-sdp");

if((is_tcp==0)||(is_unix==0)||(is_ibsdp==0)){

if(is_unix==0)//如果是unix通信协议就设置unix协议族

ret=dict_set_str(options,"transport.address-family","unix");

if(is_ibsdp==0)//如果是ib-sdb通信协议就设置inet-sdp协议族

ret=dict_set_str(options,"transport.address-family","inet-sdp");

ret=dict_set_str(options,"transport-type","socket");//设置传输类型socket

}

}

ret=dict_get_str(options,"transport-type",&type);//设置传输类型选项

ret=gf_asprintf(&name,"%s/%s.so",RPC_TRANSPORTDIR,type);//格式化库路径字符串

handle=dlopen(name,RTLD_NOW|RTLD_GLOBAL);//打开库文件

trans->ops=dlsym(handle,"tops");//取ops函数组

trans->init=dlsym(handle,"init");//取初始化函数

trans->fini=dlsym(handle,"fini");//取析构函数

trans->reconfigure=dlsym(handle,"reconfigure");//取重新配置函数

vol_opt=GF_CALLOC(1,sizeof(volume_opt_list_t),gf_common_mt_volume_opt_list_t);

vol_opt->given_opt=dlsym(handle,"options");//取可选项数组

trans->options=options;

pthread_mutex_init(&trans->lock,NULL);//初始化锁

trans->xl=THIS;//设置属于哪一个xlator

ret=trans->init(trans);//执行初始化函数

由上面代码可以看出,先动态加载了库函数到对应的对象中去,最后执行了这个传输对象的初始化函数init,上面提到过注册函数也是执行这里加载的一个listen函数,不过它是在一个函数数组集合里面ops,所以就是使用trans->ops->listen()来调用。下面就分析一下rdma和tcp传输协议的这两个函数执行情况,先分析tcp的,因为tcp是比较熟悉的协议,tcp协议的init函数的主要代码如下:

socket_init(this);

这个函数的代码主要就是调用socket_init函数,而socket_init函数主要根据选项信息初始化一些socket对象描述的结构体,例如是否是非阻塞IO、链接超时等。继续看看tcp的listen函数(在tcp实现为socket_listen),主要代码如下:

//根据选项信息设置本地sockaddr结构信息(协议族、端口号等)

ret=socket_server_get_local_sockaddr(this,SA(&sockaddr),

&sockaddr_len,&sa_family);

pthread_mutex_lock(&priv->lock);//加锁

{

memcpy(&myinfo->sockaddr,&sockaddr,sockaddr_len);//复制sockaddr地址

myinfo->sockaddr_len=sockaddr_len;

priv->sock=socket(sa_family,SOCK_STREAM,0);//创建socket

//设置接收系统缓存区大小

if(setsockopt(priv->sock,SOL_SOCKET,SO_RCVBUF,

&priv->windowsize,sizeof(priv->windowsize))<0){

}

//设置发送数据系统缓冲区大小

if(setsockopt(priv->sock,SOL_SOCKET,SO_SNDBUF,

&priv->windowsize,sizeof(priv->windowsize))<0){

}

if(priv->nodelay){

ret=__socket_nodelay(priv->sock);//设置无延迟

}

ret=__socket_server_bind(this);//设置地址重用并且绑定端口

if(priv->backlog)

ret=listen(priv->sock,priv->backlog);//设置并发接收数量

else

ret=listen(priv->sock,10);//默认10个并发连接

rpc_transport_ref(this);//引用加1

//注册读取网络事件

priv->idx=event_register(ctx->event_pool,priv->sock,

socket_server_event_handler,this,1,0);

}

pthread_mutex_unlock(&priv->lock);//解锁

经过listen函数真个监听服务就建立起来了,客户端就可以发起请求了,客户端的请求就由上面注册的函数socket_server_event_handler处理,这个函数接收连接并且建立新的socket通信,并注册另外一个函数进行数据传输的处理,基本上所有的网络事件模型都是这个流程。下面继续看rdma(rdm协议和工作原理相关内容就看附近)传输有什么不同的地方,还是从它的init函数开始分析,主要代码如下:

rdma_private_t*priv=NULL;//定义一个rdma的私有数据描述对象

priv=GF_CALLOC(1,sizeof(*priv),gf_common_mt_rdma_private_t);//分配内存

this->private=priv;//私有数据保存到传输描述对象中

priv->sock=-1;//sock初始化为-1

if(rdma_init(this)){//初始化infiniBand设备

}

这段代码分配还内存资源以后就调用rdma_init初始化一些infiniBand相关内容,这些内容都是调用相应库函数实现的,而且infiniBand需要特殊网络设备才能支持(也需要相关的设备驱动程序)。当这些内容都初始化好了以后,后面的过程就和上面的完全tcp传输过程完全一样,就是先注册监听端口等待客户端的链接,当链接建立以后就注册读写事件并在相应的注册函数中处理相应的事件响应。

继续回到rpcsvc_transport_create函数,当它执行了rpc_transport_load和rpc_transport_listen函数以后,基本基于配置协议的(tcp和rdma)的监听程序都已初始化完毕并且开始监听,不过还有最后一步就是注册传输对象的通知回调函数,在函数rpc_transport_register_notify中实现,注册的回调函数是rpcsvc_notify(就是把函数的地址保存到传输对象的notify成员中,把rpc状态描述对象保存到mydata中),下面看看注册的回调函数rpcsvc_notify的主要实现代码,如下:

switch(event){

caseRPC_TRANSPORT_ACCEPT://新的客户端链接请求已经被接收

new_trans=data;

ret=rpcsvc_accept(svc,trans,new_trans);//执行接收后的处理工作

break;

caseRPC_TRANSPORT_DISCONNECT://链接被断开

ret=rpcsvc_handle_disconnect(svc,trans);

break;

caseRPC_TRANSPORT_MSG_RECEIVED://消息已经接收

msg=data;

ret=rpcsvc_handle_rpc_call(svc,trans,msg);

break;

caseRPC_TRANSPORT_MSG_SENT://消息已经发送

break;

caseRPC_TRANSPORT_CLEANUP://清理链接

listener=rpcsvc_get_listener(svc,-1,trans->listener);//取得listener

rpcsvc_program_notify(listener,RPCSVC_EVENT_TRANSPORT_DESTROY,trans);

break;

}

主要的功能就是根据事件类型做相应的处理工作,相应的事件处理又有可能根据是哪一个监听对象调用监听对象以前保存的rpc状态对象里面注册的notify函数。

到此为止整个rpc服务基本上已经建立完成,开始提供rpc服务,不过具体提供哪些程序的服务还需要最后一步,就是在已经建立的rpc服务上注册服务程序(就是提供客户端调用的程序集,每一个程序里面提供多个函数),不过这些程序的注册是在具体启动这个rpc服务的地方进行注册的,下面以glusterd程序启动rpc服务为例,看注册了哪些程序,注册程序代码如下:

ret=glusterd_program_register(this,rpc,&glusterd1_mop_prog);

ret=glusterd_program_register(this,rpc,&gd_svc_cli_prog);

ret=glusterd_program_register(this,rpc,&gd_svc_mgmt_prog);

ret=glusterd_program_register(this,rpc,&gluster_pmap_prog);

ret=glusterd_program_register(this,rpc,&gluster_handshake_prog);

Glusterd进程注册了5个程序集,那么客户端就可以请求这5个程序集里面的函数调用。一个完整的rpc服务就这样完全建立了。

总结:可以看出整个rpc服务的建立过程还是比较复杂的,下面用一个完整的图来解析整个rpc的建立过程,图如下:



分享到:
评论

相关推荐

    gluster源码分析

    ### GlusterFS源码分析 #### 一、GlusterFS概览 GlusterFS是一种分布式文件系统,设计用于实现高性能和高扩展性。它能够支持大规模的数据存储需求,适用于多种应用场景,如云存储、大数据处理等。在进行深入的源码...

    Glusterfs 用户手册1

    1. **客户端**:GlusterFS客户端库(libglusterfs)允许应用程序透明地访问GlusterFS卷。客户端通过标准的文件系统接口(如POSIX)与GlusterFS交互,而无需知道底层的分布式特性。 2. **服务器/Brick节点**:每个...

    glusterfs原理.ppt

    GlusterFS的独特之处在于它并不依赖于传统的硬件存储设备或专用的存储网络,而是利用普通的TCP/IP或InfiniBand RDMA网络将分散在不同位置的存储资源聚合起来,形成一个统一的全局命名空间,使得用户可以透明地访问和...

    glusterfs:Gluster文件系统:在几分钟内建立您的分布式存储

    不要在运行“生产” glusterfs的机器上运行此程序,因为它会在每次运行中盲目杀死所有gluster进程。 如果要发送补丁程序,并且想要验证一个或几个特定测试,请通过运行以下命令来运行单个测试。 bash# /bin/bash ...

    一种基于RDMA多播机制的分布式持久性内存文件系统.pdf

    同时,MTFS还提供了一种轻量级的一致性保证机制,通过设计崩溃恢复机制、数据验证模块和重传方案,MTFS能够快速从故障中恢复,并通过错误检测和数据校正来实现文件系统的可靠性和数据一致性。 实验结果表明,相比于...

    软件开发应知应会.doc

    - 在 Struts 框架实现的 MVC 架构中,Action 类包含了执行方法(execute),负责调用模型的方法,并控制应用程序的流程。 **总结**:选项 A“Action”正确指出了 Struts 框架中负责执行逻辑并控制流程的类。 #### 十...

    一种基于RDMA多播机制的分布式持久性内存文件系统.docx

    6. **性能比较**:实验结果显示,MTFS相比于GlusterFS和NOVA在吞吐量和性能上有显著提升,尤其在Redis数据库工作负载下和多线程测试中表现出良好的可扩展性。 综上所述,MTFS是一种创新的分布式文件系统,它融合了...

    Linux的服务器集群

    - 共享存储:为了实现数据一致性,集群通常需要共享存储解决方案,如NFS(网络文件系统)、GlusterFS或Ceph。 - 网络配置:网络必须支持低延迟、高带宽和容错,可能需要特殊的网络拓扑如胖树或全互联。 - 安全措施...

    全球100款大数据工具汇总

    - 解决了 MapReduce 1.x 中的性能瓶颈问题。 - 支持多租户,增强了集群的灵活性。 #### 四、Mesos:开源集群管理软件 - **起源**:由加州大学伯克利分校 AMPLab 开发。 - **核心功能**: - 统一管理数据中心内的...

    架构解密从分布式到微服务(Leaderus著)

    然后简明扼要地介绍了进行系统架构所必需的网络基础,并详细介绍了分布式系统中的经典理论、设计套路及RPC通信,对内存、SOA架构、分布式存储、分布式计算等进行了深度解析,后详细介绍了全文检索与消息队列中间件,...

    homework_lesson07_NFS-

    【标题】:“homework_lesson07_NFS-”这个标题显然...通过完成这个“homework_lesson07_NFS-”,学生将全面了解NFS的工作机制,掌握配置、管理、优化和故障排除的技能,这对于在IT环境中实现高效的数据共享至关重要。

    分布式文件系统的历史与现状.

    NFS通过使用虚拟文件系统(VFS)机制,将客户端的文件访问请求转化为标准的远程过程调用(RPC),再由服务器端的本地文件系统完成实际的数据处理工作。 - **AFS**:由卡内基梅隆大学开发,主要应用于教育和研究...

Global site tag (gtag.js) - Google Analytics