`
zb_86
  • 浏览: 43304 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq 网络层启动代码分析

 
阅读更多

<!--?xml version="1.0" encoding="UTF-8" standalone="no"?-->

 

networking module

rabbit_networking.erl:

start() ->

    {ok,_} = supervisor2:start_child(

               rabbit_sup,

               {rabbit_tcp_client_sup,

                {rabbit_client_sup, start_link,

                 [{local, rabbit_tcp_client_sup},

                  {rabbit_connection_sup,start_link,[]}]},

                transient, infinity, supervisor, [rabbit_client_sup]}),

    ok.

 

====>

rabbit_sup->rabbit_client_sup->rabbit_connection_sup:

start_link() ->

    {ok, SupPid} = supervisor2:start_link(?MODULE, []),

    {ok, Collector} =

        supervisor2:start_child(

          SupPid,

          {collector, {rabbit_queue_collector, start_link, []},

           intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),

    {ok, ChannelSupSupPid} =

        supervisor2:start_child(

          SupPid,

          {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},

           intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),

    {ok, ReaderPid} =

        supervisor2:start_child(

          SupPid,

          {reader, {rabbit_reader, start_link,

                    [ChannelSupSupPid, Collector,

                     rabbit_heartbeat:start_heartbeat_fun(SupPid)]},

           intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),

    {ok, SupPid, ReaderPid}.


rabbit_channel_sup_sup.erl:
start_link() ->
    supervisor2:start_link(?MODULE, []).

init([]) ->
    {ok, {{simple_one_for_one_terminate, 0, 1},
          [{channel_sup, {rabbit_channel_sup, start_link, []},
            temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.

supervisor2.erl:
%% SupName = self
%% SupFlags = {simple_one_for_one_terminate, 0, 1}
%% Mod = rabbit_channel_sup
%% Args = []
     case init_state(SupName, SupFlags, Mod, Args) of
          {ok, State} when ?is_simple(State) ->  %% 这里判断的结果确实是simple
              init_dynamic(State, StartSpec);
          {ok, State} ->
              init_children(State, StartSpec);
          Error ->
              {stop, {supervisor_data, Error}}
      end;

==>  {ok, State} = 
{ok, #state{name = supname(SupName,Mod),
            strategy = Strategy,
            intensity = MaxIntensity,
            period = Period,
            module = Mod,
            args = Args}};

==> 
到这里为止,目前rabbit_channel_sup_sup并没有把rabbit_channel_sup启动起来
%% StartSpec: {channel_sup, {rabbit_channel_sup, start_link, []},
            temporary, infinity, supervisor, [rabbit_channel_sup]}
init_dynamic(State, [StartSpec]) ->
    case check_startspec([StartSpec]) of
        {ok, Children} ->
         {ok, State#state{children = Children}};
        Error ->
            {stop, {start_spec, Error}}
    end;


==> (由rabbit_reader.erl启动channel):
init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
    Deb = sys:debug_options([]),
    receive                                             %% 这里由rabbit_networking.erl主动发起 Reader ! {go, Sock, SockTransform} 
                                                            %% 以下boot_tcp成功后才通知Reader
        {go, Sock, SockTransform} ->
            start_connection(
              Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
              SockTransform)
    end.

==>start_connection:
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
                                       State, #v1.stats_timer),
                                      handshake, 8))

==>recvloop:
recvloop(Deb, handle_input(State#v1.callback, Data,
                               State#v1{buf = [Rest],
                                        buf_len = BufLen - RecvLen})).

==>handle_input:
switch_callback(handle_frame(Type, Channel, Payload, State),
                            frame_header, 7);

==>handle_frame:
true  -> send_to_new_channel(
                                   Channel, AnalyzedFrame, State);

==>send_to_new_channel:
rabbit_channel_sup_sup:start_channel(
          ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
                       VHost, Capabilities, Collector}),

由此,这里才开始start rabbit_channel_sup


所有(包括networking)都是rabbit.erl 的-rabbit_boot_step加载步骤加载起来

rabbit_networking.erl  ==> boot_tcp:
start_tcp_listener -> start_listener -> start_listener0:
{ok,_} = supervisor:start_child(
               rabbit_sup,
               {Name,
                {tcp_listener_sup, start_link,
                 [IPAddress, Port, [Family | tcp_opts()],
                  {?MODULE, tcp_listener_started, [Protocol]},
                  {?MODULE, tcp_listener_stopped, [Protocol]},
                  OnConnect, Label]},          %% OnConnect=rabbit_networking:start_client
                transient, infinity, supervisor, [tcp_listener_sup]}).
[IPAddress, Port, [Family | tcp_opts()],
=> tcp_listener_sup:start_link(Args).  
%% Args=
               [IPAddress, Port, [Family | tcp_opts()],
                  {?MODULE, tcp_listener_started, [Protocol]},
                  {?MODULE, tcp_listener_stopped, [Protocol]},
                  OnConnect, Label]

{ok, {{one_for_all, 10, 10},
          [{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,
                               [Name, AcceptCallback]},
            transient, infinity, supervisor, [tcp_acceptor_sup]},
           {tcp_listener, {tcp_listener, start_link,
                           [IPAddress, Port, SocketOpts,
                            ConcurrentAcceptorCount, Name,
                            OnStartup, OnShutdown, Label]},
            transient, 16#ffffffff, worker, [tcp_listener]}]}}.

=>tcp_acceptor_sup.erl:
{ok, {{simple_one_for_one, 10, 10},
          [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
            transient, brutal_kill, worker, [tcp_acceptor]}]}}.

这里tcp_acceptor_sup start_link后并不会start tcp_acceptor。看代码可以知道"simple_one_for_one"调用init_dynamic,只是把child信息记录,不会调用start_link

下一个tcp_listener start的时候
{ok, _APid} = supervisor:start_child(
                                                  AcceptorSup, [LSock])  %% AcceptorSup就是上面的tcp_acceptor_sup
在这里再次调用sup的start_child方法,看supervisor可以知道start_child会把这次和上次保存的child参数合并:
Child = hd(State#state.children),
    #child{mfargs = {M, F, A}} = Child,
    Args = A ++ EArgs,
    case do_start_child_i(M, F, Args) of
故最终调用tcp_acceptor:start_link方法,解释了为什么tcp_acceptor的start_link两个参数的原因。




rabbit_channel_sup.erl: 通过Reader启动之后
1,先对于Reader和Channel启动一个Writer(rabbit_writer.erl)
 [{writer, {rabbit_writer, start_link,
               [Sock, Channel, FrameMax, Protocol, ReaderPid]},

2, start channel:
{ok, ChannelPid} =
        supervisor2:start_child(
          SupPid,
          {channel, {rabbit_channel, start_link,
                     [Channel, ReaderPid, WriterPid, ReaderPid, Protocol,
                      User, VHost, Capabilities, Collector,
                      rabbit_limiter:make_token(LimiterPid)]},
           intrinsic, ?MAX_WAIT, worker, [rabbit_channel]})
分享到:
评论

相关推荐

    rabbitMQ代码案例 简单入门

    RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中的消息传递,尤其是在微服务架构中。...记得结合源代码分析和动手实践,理论与实践相结合,将有助于更好地理解和掌握RabbitMQ的精髓。

    使用websocket连接rabbitmqtt搭建IM聊天

    WebSocket 和 RabbitMQ 是两种在现代网络应用中广泛使用的通信技术。WebSocket 提供了全双工、低延迟的双向通信机制,常用于实时性要求较高的应用,如在线聊天、游戏等。RabbitMQ 是一个开源的消息队列系统,基于 ...

    AMQPServer:这是RabbitMQ服务器(AMQP)的python实现

    在AMQPServer项目中,`AMQPServer-main`可能是项目的主入口文件,包含了启动服务器、配置连接和通道、声明交换机和队列,以及处理消息的核心代码。开发者可以基于这个实现来构建自己的消息处理系统,实现发布/订阅...

    springCloud

    一 服务启动 此项目集成了:Feign,Spring Cloud Bus,hystrix,swagger-ui,zuul-filter,配置中心功能 1)安装rabbitMQ 2)启动cloud—eureka :此时可访问 localhost:8761 3)启动 cloud-config 此处为配置中心 ...

    wmq-go http protocol

    本文将深入解析WMQ-GO的设计原理、核心功能以及实现细节。 首先,我们来看WMQ-GO的核心组件。在提供的压缩包文件中,可以看到`message.go`、`mq.go`、`api.go`等关键文件,这些都是WMQ-GO实现功能的基础。 1. `...

    23-IDEA-p2p.zip 分布式项目p2p金融项目完整版代码

    分布式项目p2p金融系统是基于点对点网络技术构建的一种现代金融模式,它通过将借贷双方直接连接,消除了传统金融机构的中介角色,降低了交易成本。在这个项目中,我们将会探讨P2P金融系统的核心技术和实现细节。 一...

    工作笔记.docx

    5. Git错误处理:当Git提示"ssh: Could not resolve hostname github.com: Name or service not known."时,通常是DNS解析问题,可以通过检查网络设置或更新hosts文件来解决。 6. GCC编译错误:“未找到命令g cc”...

    40-Migrate Instance 操作详解 1

    nova-api 接收到请求后,会通过 Messaging 层(通常是 RabbitMQ)发送一条消息,告知需要迁移实例。在 OpenStack 的代码实现中,实际调用的是 `resize` 方法,而不是直接的 `migrate`。这是因为 Migrate 实际上是...

    网络购物中心项目java源码.zip

    《网络购物中心项目Java源码深度解析》 网络购物中心项目是一个典型的电子商务系统,其核心功能包括商品展示、购物车管理、订单处理、用户管理等。在这个项目中,Java语言被广泛运用,展示了其在构建大型复杂系统上...

    数据采集系统

    三、代码结构分析 代码结构通常会反映出系统的模块化设计,包括数据源接口、预处理类、采集器类、传输类、存储接口以及监控模块。每个模块都有其特定的职责,通过合理的类和方法设计实现松耦合。 四、调试与优化 在...

    中国移动系统

    2. 消息队列:在高并发场景下,消息队列如RabbitMQ或Kafka可用于解耦系统组件,提高系统响应速度和容错能力。 3. RESTful API:系统对外提供的接口遵循RESTful风格,使得不同服务之间可以轻松地进行数据交换。 4. ...

    SpringBoot开发理财产品系统.zip

    5. 具体源码分析:压缩包中的"springboot_ym"可能是项目源码,其中可能包含主启动类、配置文件、实体类、控制器类、服务类和持久层接口等。通过对这些源码的学习,可以深入理解SpringBoot如何实现实例化、依赖注入、...

    spring-amqp-1.2.2.RELEASE.zip

    9. **开源社区**: 开源项目意味着有活跃的社区支持,开发者可以获取源代码、参与讨论、提交问题或贡献代码。 10. **集成能力**: imixs-workflow通常与其他企业应用集成,如ERP、CRM系统,以实现业务流程自动化。 ...

    ceph知识树.pdf

    - TCP/IP模型包括应用层、传输层、网络层和链路层,是互联网的基础。 **路由和路由策略:** - **路由**:决定数据包如何在网络中传输。 - **路由策略**:通过配置来控制路由行为,实现流量优化和安全策略。 **VLAN...

    基于ResNet与UserCF融合推荐的在线购物商城项目源码.zip

    为了进一步克服传统协同过滤技术在新用户或新商品上的冷启动问题,引入了基于深度学习的图像识别技术,通过应用 ResNet50 神经网络,系统能够分析用户的浏览图像,并从中检索出视觉上相似的商品。两者结合,从多维度...

    python入门到高级全栈工程师培训 第3期 附课件代码

    08 网络层和arp协议 09 传输层和应用层 第2章 01 上节课复习 02 arp协议复习 03 字符编码 第3章 01 网络基础和dos命令 02 为何学习linux 03 课程内容介绍 04 操作系统内核与系统调用 05 操作系统安装原理 06 ...

    性能测试岗位常见面试题.docx

    - 分析TPS上不去的原因,可能涉及服务器配置、网络延迟、代码优化等多方面。 - 选择测试环境与生产环境的配比,需考虑实际负载、成本和风险。 - 分析瓶颈时,采取自上而下、由浅入深的方法,确定问题根源。 - ...

    基于SpringBoot2.6.4的电商系统源码.zip

    2. **Web层**:使用Spring MVC处理HTTP请求,Controller层处理路由,视图解析器如Thymeleaf或Freemarker渲染页面。 3. **数据库交互**:Spring Data JPA提供了ORM(对象关系映射)功能,与数据库进行交互。配置...

    springboot+dubbo

    它集成了大量常用的第三方库配置,如 JDBC、MongoDB、JPA、RabbitMQ、Quartz 等,开发者可以“零配置”地快速启动项目。 - **特性**:自动配置、内嵌 Servlet 容器(如 Tomcat)、提供起步依赖、健康检查、运行时...

    课设毕设基于SpringBoot+Vue的广场舞团A LW+PPT+源码可运行.zip

    在“SPRINGBOOT-广场舞团 LW PPT.zip”中,SPRINGBOOT部分可能包含了SpringBoot项目的所有源代码,包括但不限于:主配置类、控制器、服务层、数据访问层等,这些代码可能使用Maven或Gradle进行构建,并依赖Spring ...

Global site tag (gtag.js) - Google Analytics