`
jzhihui
  • 浏览: 268430 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

RabbitMQ源码分析 – 网络层

阅读更多

(注:分析代码基于RabbitMQ 2.8.2

 

网络层的启动也是作为上一篇文章中提到的一个启动步骤来启动的,入口为[$RABBIT_SRC/src/rabbit_networking.erl --> boot/0],代码如下:

boot() ->
    ok = start(),
    ok = boot_tcp(),
    ok = boot_ssl().

[$RABBIT_SRC/src/rabbit_networking.erl --> start/0]构建rabbit_client_sup子监控树,[$RABBIT_SRC/src/rabbit_networking.erl --> boot_tcp/0],启动TCP监听,并构建成tcp_listener_sup子监控树,这两个子树都属于rabbit最顶层监控结点rabbit_supSSL的类似)。最终的监控树如下:

 

(注:方框代表supervisor类型,圆角代表worker;虚线灰底的文字代表相应层子进程重启策略;各结点以各自所在的模块命名,实际监控树中的结点名有部分不同。)

[$RABBIT_SRC/src/tcp_listener.erl --> init/1]通过gen_tcp:listen/2启动监听,并根据ConcurrentAcceptorCount参数启动指定数量个tcp_acceptor(当前这个数量是1)。

     [$RABBIT_SRC/src/tcp_acceptor.erl]通过prim_inet:async_accept/2开始异步接受来自客户端的连接,主要代码如下:

handle_cast(accept, State) ->
    ok = file_handle_cache:obtain(),
    accept(State);
handle_info({inet_async, LSock, Ref, {ok, Sock}},
            State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
    %% patch up the socket so it looks like one we got from
    %% gen_tcp:accept/1
    {ok, Mod} = inet_db:lookup_socket(LSock),
    inet_db:register_socket(Sock, Mod),

    %% handle
    file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
    ok = file_handle_cache:obtain(),
    %% accept more
accept(State);

accept(State = #state{sock=LSock}) ->
    case prim_inet:async_accept(LSock, -1) of
        {ok, Ref} -> {noreply, State#state{ref=Ref}};
        Error     -> {stop, {cannot_accept, Error}, State}
end.

[$RABBIT_SRC/src/ file_handle_cache.erl]在这里的主要作用是管理每个进程拥有的文件描述符数量,确保每个进程不会超过设定的上限。其中,obtain/0方法增加拥有的文件描述符数量,transfer/1将文件描述符的拥有权转移到另外一个进程(原进程拥有的打开描述符数量减1,转移目标进程拥有的文件描述符数量加1,这里的目标进程是通过callback返回的rabbit_reader进程)。

 

state结构中的callback是由[$RABBIT_SRC/src/rabbit_networking.erl]在启动TCP监听端口时指定的,其值为[$RABBIT_SRC/src/rabbit_networking.erl --> start_client/1],每个客户端连接上来后,会通过apply方法调用(见上面代码中的apply(M, F, A)),实际调用[$RABBIT_SRC/src/rabbit_networking.erl --> start_client/2],主要代码如下:

start_client(Sock, SockTransform) ->
    {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
    ok = rabbit_net:controlling_process(Sock, Reader),
    Reader ! {go, Sock, SockTransform},
    Reader.

(这里rabbit_tcp_client_sup对应上图中rabbit_client_sup结点)

 

supervisor:start_child(rabbit_tcp_client_sup, [])会按照上图中rabbit_client_sup下的监控树结构依次构造各个子结点。也就是说在每一次服务端接受一个新的连接时,都会创建一个rabbit_client_sup的子进程(simple_one_for_one),并依次构建各个子结点(上图中黄色区域)。并最终返回创建的rabbit_reader

 

rabbit_net:controlling_process/2将上一步返回的rabbit_reader进程作为接收socket消息的进程(调用gen_tcp:controlling_process/2实现,原始接受消息的是tcp_acceptor进程)。

 

Reader ! {go, Sock, SockTransform}向目标rabbit_reader进程发送消息,指示开始接受从客户端发送来的数据。

 

rabbit_reader进程在创建后,会通过如下代码等待上述的go消息:

receive
        {go, Sock, SockTransform} ->
            start_connection(
              Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
              SockTransform)
end

(参见[$RABBIT_SRC/src/rabbit_reader.erl --> init/4]

 

收到go消息后,rabbit_reader开始与客户端进行消息交互。(参见[$RABBIT_SRC/src/rabbit_reader.erl --> start_connection /7])主要代码如下:

start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
                 Sock, SockTransform) ->
    process_flag(trap_exit, true),
    ConnStr = name(Sock),
    log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
    ClientSock = socket_op(Sock, SockTransform),
    erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
                      handshake_timeout),
    State = #v1{parent = Parent,…}, // 初始化state,部分代码省略
    try
        recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
                                       State, #v1.stats_timer),
                                      handshake, 8)),
        log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
    catch
        // 异常处理
    after
        rabbit_net:maybe_fast_close(ClientSock),
        rabbit_event:notify(connection_closed, [{pid, self()}])
    end,
done.

从上面的log语句可以看出,这时候与客户端的连接已经确定,要开始AMQP的握手过程。接收消息的主循环由recvloop/2来完成,其中第二个参数State保存当前连接的各种状态。recvloop按照AMQP协议对数据进行解析。

 

recvloop并没有使用任何一个OTP的行为模式,而是自己设计的一种状态转换的逻辑,该逻辑的实现依赖于AMQP协议本身,其中关键的数据是State里的callback以及recv_lenAMQP协议要求,客户端建立一个连接时,必需向服务端发送8个字节的协议头(包含AMQP4个字符以及客户端使用的AMQP版本),所以rabbit在一开始的时候,设定的recv_len=8,也就是说rabbit希望从客户端收到8个字节的数据,然后对这8个字节的处理对应handshake处理逻辑(见上面代码的recvloop/2调用)。handshake完成后,rabbit会根据AMQP协议定义的frame格式等待客户端发送来的数据:先是7个字符的frame头(recv_len=7, callback=frame_header),然后是长度声明在头中的负载数据(recv_len=PayloadSize+1callback=frame_payload,每个frame都有个结束符,所以这里负载长度要加1),收到负载数据后,会根据负载数据做对应处理,处理完成后,继续待续下一个frame。所以整个协议解析的过程就如下图所示:

rabbit会根据客户端发送的请求进行相应的操作,比如创建channel,创建exchange,创建queue等等。下篇会分析在创建每个实体时,都会做哪些工作。

  • 大小: 28.3 KB
  • 大小: 16.7 KB
分享到:
评论

相关推荐

    RabbitMQ源码和客户端工具

    分析源码还可以发现RabbitMQ如何优化内存管理、网络通信效率以及并发处理能力,这对于大型系统的性能调优至关重要。 总的来说,对RabbitMQ源码的深入研究能够帮助开发者更好地理解消息中间件的工作原理,提升在...

    springboot+rabbitmq源码

    在“springboot+rabbitmq源码”项目中,我们可以看到两个关键部分:mq_producer和mq_consumer,这分别代表了生产者和消费者。在消息队列系统中,生产者是发送消息的应用,而消费者则是接收和处理这些消息的应用。 ...

    RabbitMQ-c源码

    **RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...

    NET Core 使用RabbitMQ源码 LPNETCORERABBITMQ.rar

    NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...

    NET Core 使用RabbitMQ源码.rar

    .NET Core使用RabbitMQ是一个广泛应用于微服务架构中的消息队列技术,用于实现应用程序之间的异步通信和解耦。RabbitMQ是一个开源的消息代理,它遵循Advanced Message Queuing Protocol(AMQP)标准,允许不同语言的...

    NET Core 使用RabbitMQ源码 NETCORERABBITMQ.rar

    NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...

    Release_rabbitmq_C#源码_

    标题中的"Release_rabbitmq_C#源码_"表明这是一个关于RabbitMQ的C#源代码发布版本,可能包含了实现RabbitMQ在C#环境中使用的示例或者库。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中,用于...

    Java rabbitMQ源码

    Java RabbitMQ 源码分析 RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统、微服务架构中,用于解耦生产者和消费者,实现异步处理和负载均衡。...

    RabbitMQ源码学习资料

    学习RabbitMQ源码能帮助我们理解其内部工作流程,如消息的发布、接收、存储和分发机制,以及服务器如何处理网络通信、并发和故障恢复。这对于优化性能、调试问题以及自定义扩展功能都非常有帮助。在分析mq-demo这个...

    rabbitmq c++ 封装源码

    Rabbitmq c++ 封装 全部源码 自己测试过几天几夜,性能很好。 包含内容:布消息 消费消息 读取消息 取消息队列属性 文件列表: rabbitmq cpp rabbitmq h ampq h ampq cpp 具体平台的dll请大家下载后自己编译,源码...

    RabbitMQ源码安装包

    3. **下载源码**:从RabbitMQ官方网站下载最新版本的源码包,这里提到的是`rabbitmq-server-3.1.5`。将下载的源码包解压到适当目录。 4. **构建与安装**:进入源码目录,执行配置、编译和安装命令: - `./...

    rabbitmq源码包(rabbitmq+erlang)

    在提供的压缩包中,包含了RabbitMQ的源码以及运行所需的Erlang环境。让我们深入探讨这两个关键组件以及相关配置文件。 1. **Erlang**: Erlang是一种并发、分布式、容错的编程语言,是RabbitMQ的基础。版本25.3提供...

    RabbitMQ的五种模式源码+纯手工打的代码

    本资源包含RabbitMQ的五种核心模式的源码,全部为手工编写,有助于深入理解和实践这些模式。 1. **简单模式(Simple)**: 在这个模式中,生产者发送消息到RabbitMQ,消费者从队列中接收并处理这些消息。这是一种...

    RabbitMQ消息中间件技术精讲(包含源码等).xlsx

    RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等)

    基于ARM64架构linux系统的RabbitMQ源码安装教程以及安装包.zip

    ARM64架构,银河麒麟linux系统,RabbitMQ的所有离线安装包

    spring data rabbitmq 源码

    RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,AMQP就是一个协议,是一个高级抽象层消息通信协议。 虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),...

    Jmeter--RabbitMQ(源码)

    4. **故障恢复测试**:模拟网络故障、服务器宕机等异常情况,验证RabbitMQ的恢复机制。 总之,JMeter-RabbitMQ源码提供了测试和优化RabbitMQ服务器性能的有效途径,不仅适合测试工程师进行性能评估,也为开发人员...

    rabbitmqDEMO.rar

    RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中,用于解耦应用程序,实现异步处理和提高系统的可扩展性。在这个名为"rabbitmqDEMO.rar"的压缩包文件中,包含了两个主要的组件——"mblog-provider"和...

    rabbitmq c++ 2.0封装源码

    **RabbitMQ C++ 2.0 封装源码详解** RabbitMQ 是一个流行的开源消息代理,它基于 Advanced Message Queuing Protocol (AMQP) 实现,用于在分布式系统中高效地处理消息传递。本封装是针对 RabbitMQ 的 C++ 客户端库...

    C# rabbitmq源码实战+多种场景(生产者消费者,发布订阅等)

    C# rabbitmq项目实战源码,在网上找了大量的MQ资料用C#语言开发的各种场景示例,从路由及列队的配置,到场景代码的开发,使用场景基本上都是通过生产者与消费者,发布订阅模式的示例,程序使用WindowForm开发的重要...

Global site tag (gtag.js) - Google Analytics