`
jzhihui
  • 浏览: 268092 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

RabbitMQ源码分析 - 队列机制

阅读更多

 

(注:分析代码基于RabbitMQ 2.8.2)

当rabbit无法直接将消息投递到消费者时,需要暂时将消息入队列,以便重新投递。但是,将消息入队列并不意味着,就是将消息扔在一个队列中就不管了,rabbit提供了一套状态转换的机制来管理消息。

 

在rabbit中,队列中的消息可能会处理以下四种状态:

alpha:消息内容以及消息在队列中的位置(消息索引)都保存在内存中;

beta:消息内容只在磁盘上,消息索引只在内存中;

gamma:消息内容只在磁盘上,消息索引在内存和磁盘上都存在;

delta:消息的内容和索引都在磁盘上。

注意:对于持久化消息,消息内容和消息索引都必须先保存在磁盘上,然后才处于上述状态中的一种。其中gamma很少被用到,只有持久化的消息才会出现这种状态。

 

rabbit提供这种分类的主要作用是满足不同的内存和CPU需求。alpha最耗内存,但很少消耗CPU;delta基本不消耗内存,但是要消耗更多的CPU以及磁盘I/O操作(delta需要两次I/O操作,一次读索引,一次读消息内容;delta及gamma只需要一次I/O操作来读取消息内容)。

 

rabbit在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果内存中的消息数量大于这个数量,就会引起消息的状态转换,转换主要两种:alpha -> beta, beta -> delta。alpha -> beta的转换会将消息的内容写到磁盘(如果是持久化消息,在这一步转换后,消息将会处于gamma状态),beta -> delta的转换会更进一步减少内存消耗,将消息索引也写到磁盘。

 

运行时怎么体现消息的各种状态呢?在队列的状态vqstate(参见[$RABBIT_SRC/src/rabbit_variable_queue.erl])结构中,存在q1,q2,delta,q3,q4五个队列(使用q1~q4使用Erlang的queue模块,delta存储结构依赖于消息索引的实现),其中q1,q4只包含alpha状态的消息,q2,q3只包含beta和gamma状态的消息,delta包含delta状态的消息。

 

一般情况下消息会以p1->p2->delta->q3->q4的顺序进行状态变换:消息进入队列时,处于alpha状态并保存在内存中(q1或q4),然后某个时刻发现内存不足,被转换到beta状态(q2,q3)(这时候其实有两个转换q1->q2,q4->q3),如果还是内存不足,被转换到delta状态(delta)(q2->delta,q3->delta);当从队列中消费消息时,会先从处于alpha状态的内存队列(q4)中获取消息,如果q4为空,则从beta状态的队列(q3)中获取消息,如果q3也为空,则会从delta状态的消息队列中读取消息,并将之转移到q3。

 

上述步骤只是个一般步骤,实际运行时,中间的步骤都是可以跳过的,比如消息可能在一开始被放在q4队列中、q1队列中的元素会直接跳到q4队列中(这两种情况下,q3,delta,q2队列都为空);当delta队列为空时,q2队列可以直接跳到q3队列,q1队列也可以直接跳到q3。

 

上述这些状态转换主要发生的两个地方为:从队列中获取消息时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> queue_out/1])以及减少内存使用量时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> reduce_memery_use/1])。下面详细说明下这两个操作的逻辑。

$RABBIT_SRC/src/rabbit_variable_queue.erl -> queue_out/1
queue_out(State = #vqstate { q4 = Q4 }) ->
    case ?QUEUE:out(Q4) of
        {empty, _Q4} ->
            case fetch_from_q3(State) of
                {empty, _State1} = Result     -> Result;
                {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
            end;
        {{value, MsgStatus}, Q4a} ->
            {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
    end.
逻辑很简单,尝试从q4队列中获取一个消息,如果成功,则返回获取到的消息,如果失败,则尝试通过试用fetch_from_q3/1从q3队列获取消息,成功则返回,如果为空则返回空。
$RABBIT_SRC/src/rabbit_variable_queue.erl -> fetch_from_q3/1
fetch_from_q3(State = #vqstate { q1    = Q1,
                                 q2    = Q2,
                                 delta = #delta { count = DeltaCount },
                                 q3    = Q3,
                                 q4    = Q4 }) ->
    case ?QUEUE:out(Q3) of
        {empty, _Q3} ->
            {empty, State};
        {{value, MsgStatus}, Q3a} ->
            State1 = State #vqstate { q3 = Q3a },
            State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
                         {true, true} ->
                             true = ?QUEUE:is_empty(Q2), %% ASSERTION
                             true = ?QUEUE:is_empty(Q4), %% ASSERTION
                             State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; // A
                         {true, false} ->
                             maybe_deltas_to_betas(State1);	// B
                         {false, _} ->
                             State1
                     end,
            {loaded, {MsgStatus, State2}}
    end.
从case语句的第一个分支可以看到,当发现q3为空时,就直接返回了空。也就说当q3和q4为空时,整个队列为空(后续解释)。再看case的第二个分支,当q3非空,并且成功的从q3中取出一条消息后,需要检查取出消息后的队列状态。内嵌的case检查取出消息后q3的队列长度和当前delta队列的长度:1)当这两个队列都为空时,可以确认q2也为空(后续解释),也就是这时候,q2,q3,delta,q4都为空,那么,q1队列的消息可以直接转移到q4,下次获取消息时就可以直接从q4获取;2)q3空,delta非空,这时候就需要从delta队列(内容与索引都在磁盘上,通过maybe_deltas_to_betas/1调用)读取消息,并转移到q3队列;3)q3非空,直接返回,下次获取消息还可以从q3获取。
$RABBIT_SRC/src/rabbit_variable_queue.erl -> maybe_deltas_to_betas/1
maybe_deltas_to_betas(State = #vqstate {
                        q2                   = Q2,
                        delta                = Delta,
                        q3                   = Q3,
                        index_state          = IndexState,
                        pending_ack          = PA,
                        transient_threshold  = TransientThreshold }) ->
    #delta { start_seq_id = DeltaSeqId,
             count        = DeltaCount,
             end_seq_id   = DeltaSeqIdEnd } = Delta,
    DeltaSeqId1 =
        lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
                   DeltaSeqIdEnd]),
    {List, IndexState1} =
        rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
    {Q3a, IndexState2} =
        betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
    State1 = State #vqstate { index_state = IndexState2 },
    case ?QUEUE:len(Q3a) of
        0 ->
            maybe_deltas_to_betas(
              State1 #vqstate {
                delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
        Q3aLen ->
            Q3b = ?QUEUE:join(Q3, Q3a),
            case DeltaCount - Q3aLen of
                0 ->
                    State1 #vqstate { q2    = ?QUEUE:new(),
                                      delta = ?BLANK_DELTA,
                                      q3    = ?QUEUE:join(Q3b, Q2) };	// C
                N when N > 0 ->
                    Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
                                        count        = N,
                                        end_seq_id   = DeltaSeqIdEnd }),
                    State1 #vqstate { delta = Delta1,
                                      q3    = Q3b }
            end
    end.
在case语句之前,主要的逻辑就是从delta队列中读取消息,Q3a为读取到的消息(按索引分段读取,后面讲索引的时候会详细说明),直到读取的消息不为空为止(在执行这步时,可以确认delta中肯定有消息存在)。当读取到的消息个数与delta中保存的数量一样时(这时delta中所有消息都被读出),delta被重置(消息个数为0),q2中的消息被合并到q3中;不一样的话,就将读出的消息转移到q3。
$RABBIT_SRC/src/rabbit_variable_queue.erl -> reduce_memory_use/1
reduce_memory_use(State) ->
    {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
                                    fun push_betas_to_deltas/2,
                                    fun limit_ram_acks/2,
                                    State),
    State1
reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
                  State = #vqstate {
                    ram_ack_index    = RamAckIndex,
                    ram_msg_count    = RamMsgCount,
                    target_ram_count = TargetRamCount,
                    rates            = #rates { avg_ingress = AvgIngress,
                                                avg_egress  = AvgEgress },
                    ack_rates        = #rates { avg_ingress = AvgAckIngress,
                                                avg_egress  = AvgAckEgress }
                   }) ->

    {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
        case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
                        TargetRamCount) of
            0  -> {false, State};
            S1 -> {_, State2} =
                      lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                                          ReduceFun(QuotaN, StateN)
                                  end,
                                  {S1, State},
                                  case (AvgAckIngress - AvgAckEgress) >
                                      (AvgIngress - AvgEgress) of
                                      true  -> [AckFun, AlphaBetaFun];
                                      false -> [AlphaBetaFun, AckFun]
                                  end),
                  {true, State2}
        end,

    case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
                    permitted_beta_count(State1)) of
        ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
        _                   -> {Reduce, State1}
    end
chunk_size(Current, Permitted)
  when Permitted =:= infinity orelse Permitted >= Current ->
    0;
chunk_size(Current, Permitted) ->
    lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
也就是说当内存中的消息数量(RamMsgCount)及内存中的等待ack的消息数量(RamAckIndex)的和大于允许的内存消息数量(TargetRamCount)时,多余数量的消息内容会被写到磁盘中,调用AckFun(limit_ram_acks/2)或者AlphaBetaFun(push_alphas_to_betas/2)来完成(保证每次的写操作个数不超过IO_BATCH_SIZE),先调用哪个由当前的消息速度来决定:如果当前ack的速度大于消息进入队列的速度,则调用AckFun,否则调用AlphaBetaFun。当beta状态的消息(q2,q3)多于允许的数量时,会调用BetaDeltaFun(push_betas_to_deltas/2)来将多余的消息索引写入磁盘,同时可以看到,只有这个多余的数量达到IO_BATCH_SIZE(值为64),才会引用写操作(文档对此的解释是:这个值不能太小也不能太大:太大,一次写操作会将整个队列阻塞更长的时间,太小,频繁的写操作,也会对q2、q3队列产生不利影响)。

$RABBIT_SRC/src/rabbit_variable_queue.erl -> limit_ram_acks /2
逻辑很简单,从pending_ack中拿到最大序号(seq_id,序号越大,等待的时间越短)的消息,并将消息内容写入磁盘,直到pending_ack中的消息数为0,或者要写入的消息数量已经完成。

如果处理完成正在等待ack的内存消息后,要写入磁盘的消息数量还没有完成(pending_ack中的消息数量已为0),则会调用push_alphas_to_betas/2。
$RABBIT_SRC/src/rabbit_variable_queue.erl -> push_alphas_to_betas/2
push_alphas_to_betas(Quota, State) ->
    {Quota1, State1} =
        push_alphas_to_betas(
          fun ?QUEUE:out/1,
          fun (MsgStatus, Q1a,
               State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
                  State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
              (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
                  State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
          end, Quota, State #vqstate.q1, State),
    {Quota2, State2} =
        push_alphas_to_betas(
          fun ?QUEUE:out_r/1,
          fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
                  State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
          end, Quota1, State1 #vqstate.q4, State1),
    {Quota2, State2}.
逻辑也很简单,q1向q2或者q3转移(取决于delta队列是否为空:如果为空,则向q3转移,否则向q2转移),q4向q3转移。push_alphas_to_betas/4的实现基本上就是把消息内容写到磁盘上,然后更新相关状态(消息的,队列的)。


现在来解释下前面的疑问:
q4和q3为空时,为什么整个队列为空:如果delta非空,则在q3变为空之前(取q3中最后一条消息时),B处代码最终引起delta中的消息转移到q3,并最终导致q3非空,与前提矛盾,所以delta肯定为空;同理,q2也必空,不然的话,通过B处的调用,最终将使q2的消息转移到q3(C),从而导致q3非空;如果q1非空,则A处的代码将使q1的消息转移到q4,从而导致q4非空,矛盾,所以q1为空。

q4为空,q3变空,delta已为空的情况下,为什么q2必空:首先可以肯定,在q3变空前,delta已为空。那么,在q3非空,delta变空的时刻,B处的代码调用通过C会将q2中的消息转移到q3,q2变为空(当然在q2变为空与q3变为空之间有一个时间段,这段时间内,唯一有机会使q2非空的操作是push_alphas_to_betas/2,但是因为delta队列为空,如果有转移,q1也只会转移到q3)。 

 

分享到:
评论

相关推荐

    官网的rabbitmq-server-3.10.0

    1. **消息队列原理**:RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,它提供了一种可靠的消息传递机制,确保消息在生产者和消费者之间正确传输,即使在高并发、网络不稳定或服务暂时不可用的情况下。...

    RabbitMQ-c源码

    **RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...

    rabbitmq-server-3.7.5.zip

    源码分析: 1. **源码结构**:解压后,你将看到一个典型的项目结构,包括src、erlang、plugins、docs等目录。`src`目录包含了RabbitMQ的核心源码,用Erlang语言编写;`erlang`包含了Erlang编译器需要的库和依赖;`...

    otp-win64-25.1.2 rabbitmq-server-3.11.2

    7. **最佳实践与性能调优**: 在实际应用中,理解如何设置合理的队列大小、消费速率、心跳间隔等参数,以及如何利用RabbitMQ的日志和监控数据进行性能分析和调优,是提升系统性能的关键。 8. **安全与权限控制**: ...

    RabbitMQ源码和客户端工具

    **RabbitMQ源码分析** RabbitMQ是一个开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步处理、任务队列以及服务间通信。源码分析有助于深入理解其内部...

    rabbitmq_delayed_message_exchange-3.8.0 延迟队列插件

    **正文** RabbitMQ延迟队列插件,即rabbitmq_delayed_message_exchange-3.8.0,是一个针对...在源码分析和软件开发过程中,深入理解这个插件的工作机制,有助于我们更好地利用RabbitMQ延迟队列来优化我们的应用架构。

    GitChat分享会-RabbitMQ典型场景实战-源码数据库

    通过分析源码和数据库的交互,我们可以更透彻地掌握RabbitMQ的核心机制和优化技巧。 RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的一款开源消息中间件,它提供了高可用、高可靠的消息传递服务。...

    RabbitMQ源码学习资料

    学习RabbitMQ源码能帮助我们理解其内部工作流程,如消息的发布、接收、存储和分发机制,以及服务器如何处理网络通信、并发和故障恢复。这对于优化性能、调试问题以及自定义扩展功能都非常有帮助。在分析mq-demo这个...

    Java rabbitMQ源码

    Java RabbitMQ 源码分析 RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统、微服务架构中,用于解耦生产者和消费者,实现异步处理和负载均衡。...

    Jmeter--RabbitMQ(源码)

    总之,JMeter-RabbitMQ源码提供了测试和优化RabbitMQ服务器性能的有效途径,不仅适合测试工程师进行性能评估,也为开发人员提供了深入理解消息队列工作原理和优化其性能的宝贵资源。通过源码学习和实践,我们可以更...

    museum-rabbitmq-demo.zip

    在“museum-rabbitmq-demo.zip”中,包含的源码可能涵盖了RabbitMQ的Java客户端库(如`com.rabbitmq.client`)的使用,展示了如何创建连接、声明交换器、绑定队列、发送和接收消息等操作。通过阅读和运行这些示例...

    RabbitMQ消息服务器 v3.8.12-源码.zip

    **RabbitMQ消息服务器 v3.8.12 源码分析** RabbitMQ是一款广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,为分布式系统提供了可靠的异步通信解决方案。源码学习是理解其内部机制、...

    基于RabbitMQ实现定时任务

    通过以上分析,我们可以看到,`rabbitmq-demo`源码示例涵盖了RabbitMQ实现定时任务的关键步骤,包括消息的创建、发送、延迟处理和消费。理解并实践这些步骤,可以帮助开发者在自己的项目中灵活地运用RabbitMQ进行...

    rabbitMQ 源代码实例

    通过分析源代码,你可以了解到如何在Java、Python、Ruby等语言中与RabbitMQ进行交互,如何创建和配置Exchanges、Queues、Bindings,以及如何处理各种异常情况。同时,这也有助于理解RabbitMQ内部的并发控制、网络...

    rabbitmq - erlang

    通过分析这些源码,我们可以深入了解RabbitMQ如何利用Erlang的特性来实现消息的发布、订阅、路由和存储等功能。同时,源码还可能包含配置文件、脚本和文档,帮助开发者了解和定制RabbitMQ的行为。 深入研究RabbitMQ...

    springboot整合RabbitMQ

    对于“源码”标签,我们可以深入研究 SpringBoot 的自动配置原理,以及 RabbitMQ 的客户端 API 实现。而“工具”可能指的是 IDE 插件、监控工具(如 RabbitMQ Management Console)或者日志分析工具等,这些工具能...

    6个消息队列项目源码.rar

    在这个"6个消息队列项目源码.rar"压缩包中,我们有六个不同的项目,它们都是基于C#语言实现的消息队列应用。以下是关于这些知识点的详细解释: 1. **消息队列原理**: - 消息生产者:创建并发送消息到队列。 - ...

    RabbitMQ客户端连接池的原理及源码

    在源码分析中,我们可以关注以下几个关键点: - 连接工厂类的创建方法,了解如何配置RabbitMQ连接参数。 - 连接池的初始化过程,包括如何设置最大连接数和超时策略。 - 获取和释放连接的逻辑,以及对象池策略的实现...

    javaIdeaWorkSpace.zip

    这些文件提供了一个全面的学习环境,涵盖了RabbitMQ的消费者和生产者端的实现,以及可能的Web应用集成,适合那些希望在Java项目中实施消息队列机制的开发者。通过分析和运行这些示例,用户可以深入了解RabbitMQ的...

    nova-compute源码分析

    ### nova-compute源码分析 #### 一、Nova概述及工作职责 **1.1 Nova的角色与任务** Nova是OpenStack项目中一个至关重要的组成部分,它主要负责虚拟机实例的生命周期管理,包括创建、调度、运行和销毁等功能。具体...

Global site tag (gtag.js) - Google Analytics