`
zb_86
  • 浏览: 43172 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq 网络层启动代码分析

 
阅读更多

<!--?xml version="1.0" encoding="UTF-8" standalone="no"?-->

 

networking module

rabbit_networking.erl:

start() ->

    {ok,_} = supervisor2:start_child(

               rabbit_sup,

               {rabbit_tcp_client_sup,

                {rabbit_client_sup, start_link,

                 [{local, rabbit_tcp_client_sup},

                  {rabbit_connection_sup,start_link,[]}]},

                transient, infinity, supervisor, [rabbit_client_sup]}),

    ok.

 

====>

rabbit_sup->rabbit_client_sup->rabbit_connection_sup:

start_link() ->

    {ok, SupPid} = supervisor2:start_link(?MODULE, []),

    {ok, Collector} =

        supervisor2:start_child(

          SupPid,

          {collector, {rabbit_queue_collector, start_link, []},

           intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),

    {ok, ChannelSupSupPid} =

        supervisor2:start_child(

          SupPid,

          {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},

           intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),

    {ok, ReaderPid} =

        supervisor2:start_child(

          SupPid,

          {reader, {rabbit_reader, start_link,

                    [ChannelSupSupPid, Collector,

                     rabbit_heartbeat:start_heartbeat_fun(SupPid)]},

           intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),

    {ok, SupPid, ReaderPid}.


rabbit_channel_sup_sup.erl:
start_link() ->
    supervisor2:start_link(?MODULE, []).

init([]) ->
    {ok, {{simple_one_for_one_terminate, 0, 1},
          [{channel_sup, {rabbit_channel_sup, start_link, []},
            temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.

supervisor2.erl:
%% SupName = self
%% SupFlags = {simple_one_for_one_terminate, 0, 1}
%% Mod = rabbit_channel_sup
%% Args = []
     case init_state(SupName, SupFlags, Mod, Args) of
          {ok, State} when ?is_simple(State) ->  %% 这里判断的结果确实是simple
              init_dynamic(State, StartSpec);
          {ok, State} ->
              init_children(State, StartSpec);
          Error ->
              {stop, {supervisor_data, Error}}
      end;

==>  {ok, State} = 
{ok, #state{name = supname(SupName,Mod),
            strategy = Strategy,
            intensity = MaxIntensity,
            period = Period,
            module = Mod,
            args = Args}};

==> 
到这里为止,目前rabbit_channel_sup_sup并没有把rabbit_channel_sup启动起来
%% StartSpec: {channel_sup, {rabbit_channel_sup, start_link, []},
            temporary, infinity, supervisor, [rabbit_channel_sup]}
init_dynamic(State, [StartSpec]) ->
    case check_startspec([StartSpec]) of
        {ok, Children} ->
         {ok, State#state{children = Children}};
        Error ->
            {stop, {start_spec, Error}}
    end;


==> (由rabbit_reader.erl启动channel):
init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
    Deb = sys:debug_options([]),
    receive                                             %% 这里由rabbit_networking.erl主动发起 Reader ! {go, Sock, SockTransform} 
                                                            %% 以下boot_tcp成功后才通知Reader
        {go, Sock, SockTransform} ->
            start_connection(
              Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
              SockTransform)
    end.

==>start_connection:
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
                                       State, #v1.stats_timer),
                                      handshake, 8))

==>recvloop:
recvloop(Deb, handle_input(State#v1.callback, Data,
                               State#v1{buf = [Rest],
                                        buf_len = BufLen - RecvLen})).

==>handle_input:
switch_callback(handle_frame(Type, Channel, Payload, State),
                            frame_header, 7);

==>handle_frame:
true  -> send_to_new_channel(
                                   Channel, AnalyzedFrame, State);

==>send_to_new_channel:
rabbit_channel_sup_sup:start_channel(
          ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
                       VHost, Capabilities, Collector}),

由此,这里才开始start rabbit_channel_sup


所有(包括networking)都是rabbit.erl 的-rabbit_boot_step加载步骤加载起来

rabbit_networking.erl  ==> boot_tcp:
start_tcp_listener -> start_listener -> start_listener0:
{ok,_} = supervisor:start_child(
               rabbit_sup,
               {Name,
                {tcp_listener_sup, start_link,
                 [IPAddress, Port, [Family | tcp_opts()],
                  {?MODULE, tcp_listener_started, [Protocol]},
                  {?MODULE, tcp_listener_stopped, [Protocol]},
                  OnConnect, Label]},          %% OnConnect=rabbit_networking:start_client
                transient, infinity, supervisor, [tcp_listener_sup]}).
[IPAddress, Port, [Family | tcp_opts()],
=> tcp_listener_sup:start_link(Args).  
%% Args=
               [IPAddress, Port, [Family | tcp_opts()],
                  {?MODULE, tcp_listener_started, [Protocol]},
                  {?MODULE, tcp_listener_stopped, [Protocol]},
                  OnConnect, Label]

{ok, {{one_for_all, 10, 10},
          [{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,
                               [Name, AcceptCallback]},
            transient, infinity, supervisor, [tcp_acceptor_sup]},
           {tcp_listener, {tcp_listener, start_link,
                           [IPAddress, Port, SocketOpts,
                            ConcurrentAcceptorCount, Name,
                            OnStartup, OnShutdown, Label]},
            transient, 16#ffffffff, worker, [tcp_listener]}]}}.

=>tcp_acceptor_sup.erl:
{ok, {{simple_one_for_one, 10, 10},
          [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
            transient, brutal_kill, worker, [tcp_acceptor]}]}}.

这里tcp_acceptor_sup start_link后并不会start tcp_acceptor。看代码可以知道"simple_one_for_one"调用init_dynamic,只是把child信息记录,不会调用start_link

下一个tcp_listener start的时候
{ok, _APid} = supervisor:start_child(
                                                  AcceptorSup, [LSock])  %% AcceptorSup就是上面的tcp_acceptor_sup
在这里再次调用sup的start_child方法,看supervisor可以知道start_child会把这次和上次保存的child参数合并:
Child = hd(State#state.children),
    #child{mfargs = {M, F, A}} = Child,
    Args = A ++ EArgs,
    case do_start_child_i(M, F, Args) of
故最终调用tcp_acceptor:start_link方法,解释了为什么tcp_acceptor的start_link两个参数的原因。




rabbit_channel_sup.erl: 通过Reader启动之后
1,先对于Reader和Channel启动一个Writer(rabbit_writer.erl)
 [{writer, {rabbit_writer, start_link,
               [Sock, Channel, FrameMax, Protocol, ReaderPid]},

2, start channel:
{ok, ChannelPid} =
        supervisor2:start_child(
          SupPid,
          {channel, {rabbit_channel, start_link,
                     [Channel, ReaderPid, WriterPid, ReaderPid, Protocol,
                      User, VHost, Capabilities, Collector,
                      rabbit_limiter:make_token(LimiterPid)]},
           intrinsic, ?MAX_WAIT, worker, [rabbit_channel]})
分享到:
评论

相关推荐

    rabbitMQ代码案例 简单入门

    RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中的消息传递,尤其是在微服务架构中。...记得结合源代码分析和动手实践,理论与实践相结合,将有助于更好地理解和掌握RabbitMQ的精髓。

    使用websocket连接rabbitmqtt搭建IM聊天

    WebSocket 和 RabbitMQ 是两种在现代网络应用中广泛使用的通信技术。WebSocket 提供了全双工、低延迟的双向通信机制,常用于实时性要求较高的应用,如在线聊天、游戏等。RabbitMQ 是一个开源的消息队列系统,基于 ...

    AMQPServer:这是RabbitMQ服务器(AMQP)的python实现

    在AMQPServer项目中,`AMQPServer-main`可能是项目的主入口文件,包含了启动服务器、配置连接和通道、声明交换机和队列,以及处理消息的核心代码。开发者可以基于这个实现来构建自己的消息处理系统,实现发布/订阅...

    springCloud

    一 服务启动 此项目集成了:Feign,Spring Cloud Bus,hystrix,swagger-ui,zuul-filter,配置中心功能 1)安装rabbitMQ 2)启动cloud—eureka :此时可访问 localhost:8761 3)启动 cloud-config 此处为配置中心 ...

    wmq-go http protocol

    本文将深入解析WMQ-GO的设计原理、核心功能以及实现细节。 首先,我们来看WMQ-GO的核心组件。在提供的压缩包文件中,可以看到`message.go`、`mq.go`、`api.go`等关键文件,这些都是WMQ-GO实现功能的基础。 1. `...

    23-IDEA-p2p.zip 分布式项目p2p金融项目完整版代码

    分布式项目p2p金融系统是基于点对点网络技术构建的一种现代金融模式,它通过将借贷双方直接连接,消除了传统金融机构的中介角色,降低了交易成本。在这个项目中,我们将会探讨P2P金融系统的核心技术和实现细节。 一...

    工作笔记.docx

    5. Git错误处理:当Git提示"ssh: Could not resolve hostname github.com: Name or service not known."时,通常是DNS解析问题,可以通过检查网络设置或更新hosts文件来解决。 6. GCC编译错误:“未找到命令g cc”...

    40-Migrate Instance 操作详解 1

    nova-api 接收到请求后,会通过 Messaging 层(通常是 RabbitMQ)发送一条消息,告知需要迁移实例。在 OpenStack 的代码实现中,实际调用的是 `resize` 方法,而不是直接的 `migrate`。这是因为 Migrate 实际上是...

    网络购物中心项目java源码.zip

    《网络购物中心项目Java源码深度解析》 网络购物中心项目是一个典型的电子商务系统,其核心功能包括商品展示、购物车管理、订单处理、用户管理等。在这个项目中,Java语言被广泛运用,展示了其在构建大型复杂系统上...

    数据采集系统

    三、代码结构分析 代码结构通常会反映出系统的模块化设计,包括数据源接口、预处理类、采集器类、传输类、存储接口以及监控模块。每个模块都有其特定的职责,通过合理的类和方法设计实现松耦合。 四、调试与优化 在...

    中国移动系统

    2. 消息队列:在高并发场景下,消息队列如RabbitMQ或Kafka可用于解耦系统组件,提高系统响应速度和容错能力。 3. RESTful API:系统对外提供的接口遵循RESTful风格,使得不同服务之间可以轻松地进行数据交换。 4. ...

    SpringBoot开发理财产品系统.zip

    5. 具体源码分析:压缩包中的"springboot_ym"可能是项目源码,其中可能包含主启动类、配置文件、实体类、控制器类、服务类和持久层接口等。通过对这些源码的学习,可以深入理解SpringBoot如何实现实例化、依赖注入、...

    spring-amqp-1.2.2.RELEASE.zip

    9. **开源社区**: 开源项目意味着有活跃的社区支持,开发者可以获取源代码、参与讨论、提交问题或贡献代码。 10. **集成能力**: imixs-workflow通常与其他企业应用集成,如ERP、CRM系统,以实现业务流程自动化。 ...

    ceph知识树.pdf

    - TCP/IP模型包括应用层、传输层、网络层和链路层,是互联网的基础。 **路由和路由策略:** - **路由**:决定数据包如何在网络中传输。 - **路由策略**:通过配置来控制路由行为,实现流量优化和安全策略。 **VLAN...

    python入门到高级全栈工程师培训 第3期 附课件代码

    08 网络层和arp协议 09 传输层和应用层 第2章 01 上节课复习 02 arp协议复习 03 字符编码 第3章 01 网络基础和dos命令 02 为何学习linux 03 课程内容介绍 04 操作系统内核与系统调用 05 操作系统安装原理 06 ...

    性能测试岗位常见面试题.docx

    - 分析TPS上不去的原因,可能涉及服务器配置、网络延迟、代码优化等多方面。 - 选择测试环境与生产环境的配比,需考虑实际负载、成本和风险。 - 分析瓶颈时,采取自上而下、由浅入深的方法,确定问题根源。 - ...

    基于SpringBoot2.6.4的电商系统源码.zip

    2. **Web层**:使用Spring MVC处理HTTP请求,Controller层处理路由,视图解析器如Thymeleaf或Freemarker渲染页面。 3. **数据库交互**:Spring Data JPA提供了ORM(对象关系映射)功能,与数据库进行交互。配置...

    springboot+dubbo

    它集成了大量常用的第三方库配置,如 JDBC、MongoDB、JPA、RabbitMQ、Quartz 等,开发者可以“零配置”地快速启动项目。 - **特性**:自动配置、内嵌 Servlet 容器(如 Tomcat)、提供起步依赖、健康检查、运行时...

    课设毕设基于SpringBoot+Vue的广场舞团A LW+PPT+源码可运行.zip

    在“SPRINGBOOT-广场舞团 LW PPT.zip”中,SPRINGBOOT部分可能包含了SpringBoot项目的所有源代码,包括但不限于:主配置类、控制器、服务层、数据访问层等,这些代码可能使用Maven或Gradle进行构建,并依赖Spring ...

    50 Android Hacks - Hack2

    12. **App启动优化**:了解如何减少启动时间,如延迟初始化非必需组件,以及使用Splash Screen提供更好的启动体验。 13. **本地化与国际化**:学习如何为应用添加多语言支持,以及如何使用Android的资源系统进行...

Global site tag (gtag.js) - Google Analytics