`
jzhihui
  • 浏览: 268139 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

RabbitMQ源码分析 – 持久化机制

阅读更多

(注:分析代码基于RabbitMQ 2.8.2) 

当消息需要持久化(相应队列首先必须是durable)或者因为内存吃紧,需要把消息转移到磁盘的时候就会触发持久化操作。Rabbit中两部分信息涉及到持久化操作:一个是消息本身,由msg_store模块负责([$RABBIT_SRC/src/rabbit_msg_store.erl]),另一个是消息在队列中的位置,由queue_index模块负责([$RABBIT_SRC/src/rabbit_queue_index.erl])。


消息持久化(msg_store)
msg_store采用的是一种Server-Client的模式,每个结点上有两个msg_store服务器:msg_store_persistent,msg_store_transient。msg_store_persistent负责带持久化属性的消息的持久化(重启后消息不会丢失);msg_store_transient负责不带持久化属性的消息的持久化(因为内存吃紧引起,重启后消息会丢失)。

每个队列在初始化时会通过rabbit_msg_store:client_init/4向各个msg_store服务器注册成为一个客户端(下文混用队列和客户端,都代表相应的队列),服务器会为每一个客户端创建一个client_msstate结构,保存msg_store服务器的各种缓存和文件信息,可以供客户端直接访问。

msg_store服务器以文件的形式保存需要持久化的消息,不同的服务器使用不同的子目录,文件名以数字命名,从0开始,每创建一个文件,文件名递增。每个文件有个文件大小(file_size_limit)限制,大于这个数字时,新的文件就会创建。为了方便的访问和写入数据到持久化文件,Rabbit使用了两个数据结构:
Index: MsgId到#msg_location结构的映射,说明消息在文件中的位置:
        {MsgId, RefCount, File, Offset, TotalSize}
        默认使用ets保存, 可扩展.
RefCount代表消息的引用计数;File代表消息保存的文件名(数字);Offset代表消息在文件中的偏移量;TotalSize代表消息的大小。
(注:这里的index代表一个消息在持久化文件中的位置,而消息索引(queue_index)代表消息在队列中位置)

FileSummary: File 到 #file_summary{}的映射,说明持久化文件的信息:
        {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
File代表持久化文件名称(数字);ValitTotalSize代表文件中的有效消息大小;Left代表该文件左右的文件名;Right代表该文件右边的文件(Left与Right的存在主要与文件的合并有关,前面提到文件以数字命名,并连续递增,但文件合并会引起文件名称不连续);FileSize代表文件当前大小;Locked代表当前文件是否被锁定,当有GC在操作该文件时,文件被锁定;Readers代表该文件当前的读者数量。

同时,为了加快访问和写入的速度,rabbit提供了两个数据结构:一个是cur_file_cache_ets,保存写入到当前文件中的消息,每个元素的结构为{MsgId, Msg, CacheRefCount},由客户端写入,其它客户端可访问。对于写入到当前文件中消息的访问都是从该缓存结构中读取的。每当新的持久化文件生成时,缓存中CacheRefCount为0的消息会被清除。
另一个是flying_ets,保存每个队列中正在等待执行的操作:读消息或者删除消息,每个元素的结构为{{MsgId, CRef}, Diff},其中CRef代表队列对应进程,Diff代表一个差值:+1代表所有的操作可以合并成一个写操作,-1代表所有的操作可以合并成一个删除操作,0代表什么也不用做。这个数据结构的主要作用是尽量减少对磁盘文件的写入和删除。考虑这样一个场景,一个队列需要持久化消息,在收到生产者的消息后,队列首先需要把消息写入磁盘,然后把消息投递到消费者,收到消费者的ack后,要从队列中删除该消息,假如ack回来时,消息的写入操作还未被执行,则这时候可以直接略过写入和读取操作。

下面对消息的写入、读取以及文件合并操作的主要流程进行分析。
消息写入
消息的写入由rabbit_msg_store:write/3或者rabbit_msg_store:write_flow/3来完成,这两个函数首先会调用rabbit_msg_store:clien_write/4:
client_write(MsgId, Msg, Flow,
             CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
                                        client_ref         = CRef }) ->
	%% 更新flying_ets表中对应{MsgId, CRef}的Diff,因为是写操作,所以加1
ok = client_update_flying(+1, MsgId, CState),
%% 更新cur_file_cache_ets表中对应MsgId的记录,CacheRefCount加1
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
%% 调用服务端write操作
ok = server_cast(CState, {write, CRef, MsgId, Flow}).
服务端write操作(异步操作)的处理如下:
handle_cast({write, CRef, MsgId, Flow},
            State = #msstate { cur_file_cache_ets = CurFileCacheEts,
                               clients            = Clients }) ->
case Flow of
	%% 流控相关处理
        flow   -> {CPid, _, _} = dict:fetch(CRef, Clients),
                  credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
        noflow -> ok
end,
%% cur_file_cache_ets对应MsgId记录,CacheRefCount减1,一般情况下,前面的client_write/4会对CachedRefCount加1,这里减1 后,CacheRefCount为0,在下面write_message/4操作完成后,如果要创建新的持久化文件,CacheRefCount为0的所有记录都会被清除(cur_file_cache_ets一般只保存当前文件里的消息)
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
%% 根据flying_ets表中Diff的值来确定是否要执行实际的写入操作,如果update_flying/4返回ignore,则表示不需要写入
    case update_flying(-1, MsgId, CRef, State) of
        process ->
            [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
            noreply(write_message(MsgId, Msg, CRef, State));
        ignore ->
            %% A 'remove' has already been issued and eliminated the
            %% 'write'.
            State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
                                   ignored, State),
			%% 如果要处理的消息不在当前文件,则清除cur_file_cache_ets中关于该消息的记录
            case index_lookup(MsgId, State1) of
                [#msg_location { file = File }]
                  when File == State1 #msstate.current_file ->
                    ok;
                _ ->
                    true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
            end,
            noreply(State1)
    end.
 write_messag/4最终会通过write_message/3来完成实际的消息写入,代码如下:
write_message(MsgId, Msg,
              State = #msstate { current_file_handle = CurHdl,
                                 current_file        = CurFile,
                                 sum_valid_data      = SumValid,
                                 sum_file_size       = SumFileSize,
                                 file_summary_ets    = FileSummaryEts }) ->
	%% 定位到当前文件的最后
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
%% 将消息内容添加到当前文件后面
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
%% 更新索引信息 
    ok = index_insert(
           #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
                           offset = CurOffset, total_size = TotalSize }, State),
    [#file_summary { right = undefined, locked = false }] =
        ets:lookup(FileSummaryEts, CurFile),
	%% 更新当前文件大小的统计信息
    [_,_] = ets:update_counter(FileSummaryEts, CurFile,
                               [{#file_summary.valid_total_size, TotalSize},
                                {#file_summary.file_size,        TotalSize}]),
	%% 更新所有文件的统计信息,并判断是否需要创建一个新的持久化文件
    maybe_roll_to_new_file(CurOffset + TotalSize,
                           State #msstate {
                             sum_valid_data = SumValid    + TotalSize,
                             sum_file_size  = SumFileSize + TotalSize }).
从上面的逻辑来看,实际的写入操作其实很简单,就是先写入消息内容到当前文件,再判断当前文件的大小,如果需要,则创建一个新的持久化文件。

消息读取
消息的读取通过rabbit_msg_store:read/2来完成,具体代码如下:
read(MsgId,
     CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
    %% 从当前文件缓存表中查找,如果找到则返回
    case ets:lookup(CurFileCacheEts, MsgId) of
        [] ->
            Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
			%% 从Index中查找,如果找到一个引用数大于0的消息,则调用client_read1/3读取实际的消息;否则认为消息没找到,通过上面的Defer函数,调用服务端read操作(消息引用数为0时,消息有可能会处于GC处理状态)
            case index_lookup_positive_ref_count(MsgId, CState) of
                not_found   -> Defer();
                MsgLocation -> client_read1(MsgLocation, Defer, CState)
            end;
        [{MsgId, Msg, _CacheRefCount}] ->
            {{ok, Msg}, CState}
end.
client_read1/3经过调用链client_read2/5 -> client_read3/3最终通过read_from_disk/2从磁盘实际读取消息。调用链中的逻辑主要是一些状态处理(GC相关)。read_from_disk/2的读取逻辑就是从找到的#msg_location结构提取消息所在的文件(File)、位移(Offset)和大小(TotalSize),并按上述信息读取消息。这里的client_read3和通过服务端read操作读取消息的实现类似,都是要从磁盘读取消息,只不过有些情况client_read3/3无法处理(比如消息所在的文件被GC进程锁定)时,就会通过服务端的read操作为完成(服务端read操作为同步操作)。

消息删除
消息的删除由rabbit_msg_store:remove/2来完成:
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
	%% 删除操作,fly_ets表中的Diff减1
    [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
    server_cast(CState, {remove, CRef, MsgIds}).
服务端remove操作:
handle_cast({remove, CRef, MsgIds}, State) ->
    {RemovedMsgIds, State1} =
        lists:foldl(
          fun (MsgId, {Removed, State2}) ->
				  %% 根据flying_ets中的Diff值决定是否需要执行删除操作
                  case update_flying(+1, MsgId, CRef, State2) of
                      process -> {[MsgId | Removed],
                                  remove_message(MsgId, CRef, State2)};
                      ignore  -> {Removed, State2}
                  end
          end, {[], State}, MsgIds),
	%% maybe_compact检查是否需要执行文件合并
    noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
                                         ignored, State1)));
remove_message/3的主要逻辑就是更新对应消息的引用计数,更新要删除消息所在文件的统计数据,如果消息所在文件已经没有有效数据,则删除该文件。

文件合并
当所有文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率。合并操作发生时,首先要做的就是要找到两个可合并的文件。代码参见rabbit_msg_store:find_files_to_combine/3。
rabbit_msg_store:find_files_to_combine/3会从第一个文件开始,找到符合下面条件的两个文件A=Dst, B=Src:
1) A.right=B;// 两个文件必须相连,但并不意味着文件名连续,且B必须在A的右边
2) B.right != undefined;// B的右边必须还有文件存在,也就是必须有三个文件存在
3) B.valid_total_size + A.valid_total_size <= file_size_limit;// A、B的有效数据之和必须小于一
// 个文件的最大容量,也就是说两// 个文件可以合并成一个文件
4) B.valid_total_size > 0 && A.valit_total_size > 0;// 两个文件中都有有效数据
5) B.locked = false && A.locked=false;// 两个文件都没有处于锁定状态(没有进行GC);

找到符合条件的文件A、B后,锁定两个文件,并通过GC进程对两个文件中的数据进行合并。最终B中的有效数据会转移到A中,也就是说有效数据会逐渐向文件名称较小的文件移动。合并代码参见[$RABBIT_SRC/src/rabbit_msg_store.erl –> combine_files/3]。主要流程如下:
%% 计算两个文件中总的有效数据大小,后续合并后的文件会扩展到这个大小
TotalValidData = SourceValid + DestinationValid,
    {DestinationWorkList, DestinationValid} =
        load_and_vacuum_message_file(Destination, State), %% 从目标文件读取有效消息数据
    {DestinationContiguousTop, DestinationWorkListTail} =
        drop_contiguous_block_prefix(DestinationWorkList), %% 检查从目标文件开始的连续有效消息区间
case DestinationWorkListTail of
	%% 如果目标文件中的所有有效消息数据都是连续,只要把目标文件扩展到合并后的大小,并将写入位置定位到目标文件的最后
        [] -> ok = truncate_and_extend_file(
                     DestinationHdl, DestinationContiguousTop, TotalValidData);
		%% 如果目标文件中有空洞,则:1)将除了第一个连续区间内的消息以外的所有有效消息先读到一个临时文件;2)把目标文件扩展到合并后的大小;3)把临时文件中的有效数据拷贝到目标文件
        _  -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
              {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
              ok = copy_messages(
                     DestinationWorkListTail, DestinationContiguousTop,
                     DestinationValid, DestinationHdl, TmpHdl, Destination,
                     State),
              TmpSize = DestinationValid - DestinationContiguousTop,
              %% so now Tmp contains everything we need to salvage
              %% from Destination, and index_state has been updated to
              %% reflect the compaction of Destination so truncate
              %% Destination and copy from Tmp back to the end
              {ok, 0} = file_handle_cache:position(TmpHdl, 0),
              ok = truncate_and_extend_file(
                     DestinationHdl, DestinationContiguousTop, TotalValidData),
              {ok, TmpSize} =
                  file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
              %% position in DestinationHdl should now be DestinationValid
              ok = file_handle_cache:sync(DestinationHdl),
              ok = file_handle_cache:delete(TmpHdl)
end,

%% 从源文件中加载所有有效消息数据
{SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State),
%% 将源文件中的所有有效数据拷贝到目标文件
    ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData,
                       SourceHdl, DestinationHdl, Destination, State).
 
索引持久化(queue_index)
queue_index将消息索引(SeqId)写入到segment,每个segment对应一个文件(文件所在的目录由队列名称md5及mnesia数据目录决定)。每个文件最多可以保存SEGMENT_ENTRY_COUNT个消息索引信息。与消息本身的持久化相似,这些文件也是以整数来命名。那某一个消息对应的消息索引写在哪个segment文件怎么确定呢?其实很简单:用消息索引本身对SEGMENT_ENTRY_COUNT取整(只能确定一个消息的索引在一个segment文件中,无法确定具体的偏移量,每一次的写入只是append到文件的末尾)。

对应一个消息,queue_index模块在消息publish,deliver或者ack的时候,都要写入一块信息(分别以PUB_PERSIST_JPREFIX(或PUB_TRANS_JPREFIX)、DEL_JPREFIX、ACK_JPREFIX为前缀)。当一个segment文件中的publish次数和ack次数相同时,这个segment文件会被删除(这里还没想清楚,为什么要每个操作写一个信息,而不是在publish时写一次,ack时删除一次,求达人解释)。

为了避免过多的磁盘寻址,queue_index在segment文件之前又添加了一层缓冲:journal文件。在一个journal中的entry达到max_jounal_entries之前,所有的操作信息(publish,deliver,ack)都只是添加到文件的末尾;到达max_jounal_entries之后,会将journal中的所有操作信息写入到相应的segment文件,并清空journal文件。但是,这时的写入,并不是从journal文件中一条记录一条记录的读取,并写入到相应的segment文件,而是在写入journal时,在对应消息索引所属的segment信息中添加一个相应的entry(#segment.journal_entries,存在于内存中)。当要写入的#segment.journal_entries中,publish次数和ack次数相同的,那么就不需要写入任何数据。那有人肯定要问了,既然写入到segment文件时,不需要从journal文件里面读取数据,那还要那个journal文件做啥?直接写在#segment.journal_entries不就好了。问题是,只保存内存里,进程挂掉或者系统异常,这些数据就丢了,写入journal文件可以保证进程重新后,能够正确的恢复索引信息。

总结以上内容,消息索引在需要写入磁盘时,实际的执行流程如下:1. 将操作对应的消息索引信息append到journal文件;2. 将操作对应的消息索引信息添加到#segment.journal_entries;3. 检查是否需要把journal中的数据写入到segment文件中(也就是journal中的记录数是否达到max_jounal_entries),如果需要,则从#segment. journal_entries中读取每个entry并写入到相应的segment,并将journal文件和#segment. journal_entries清空。具体代码就不贴了,基本上与上述总结的流程一致。
1
5
分享到:
评论
3 楼 huotianjun 2015-10-13  
关于:为什么要每个操作写一个信息,而不是在publish时写一次,ack时删除一次

我的分析是这样的:当segments过大的时候,内存bistate->segments->segment->jentries
数组会全部flush到segment文件(append方式)。所以存在这样的情况:当一个消息ack时,segments 的jentries全部flush到文件了,这个消息的publish记录在segment中是没有的。所以不能简单的publish写一次,ack时删除一次(当publish记录还在segment 内存中的时候,确实是直接reset删除的)。
2 楼 jzhihui 2013-05-03  
csic_422 写道
请教一个问题,对于rabbitmq的持久化机制中的:exchange持久化、queue持久化、消息持久化之间在具体的实现上有什么区别?

exchange及queue的持久化(durable)主要是指元数据在RabbitMQ重启后是不是还存在,而消息持久化(persistent)是指是否要将消息内容持久化到磁盘,其实是两个不同的概念的问题,实现上类似,都是要写到磁盘,只不过exchange及queue的持久化是通过写数据库持久化到磁盘上的
1 楼 csic_422 2013-05-02  
请教一个问题,对于rabbitmq的持久化机制中的:exchange持久化、queue持久化、消息持久化之间在具体的实现上有什么区别?

相关推荐

    rabbitmq + spring boot demo 消息确认、持久化、备用交换机、死信交换机等代码

    本教程将详细介绍如何在Spring Boot应用中结合RabbitMQ实现消息确认、消息持久化、备用交换机以及死信交换机等功能。 首先,让我们理解这些概念: 1. **消息确认**:在RabbitMQ中,消息确认(Message ...

    消息队列:RabbitMQ:RabbitMQ消息持久化策略.docx

    消息队列:RabbitMQ:RabbitMQ消息持久化策略.docx

    RabbitMQ源码和客户端工具

    **RabbitMQ源码分析** RabbitMQ是一个开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步处理、任务队列以及服务间通信。源码分析有助于深入理解其内部...

    springboot+rabbitmq源码

    在“springboot+rabbitmq源码”项目中,我们可以看到两个关键部分:mq_producer和mq_consumer,这分别代表了生产者和消费者。在消息队列系统中,生产者是发送消息的应用,而消费者则是接收和处理这些消息的应用。 ...

    RabbitMQ-c源码

    **RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...

    NET Core 使用RabbitMQ源码.rar

    对于需要在RabbitMQ重启后仍保留的消息,可以设置消息和队列的持久化属性。这确保即使服务器崩溃,消息也不会丢失。 10. **监控和管理**: 使用RabbitMQ的Web管理界面,可以监控服务器状态、查看队列和交换机信息...

    NET Core 使用RabbitMQ源码 LPNETCORERABBITMQ.rar

    NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...

    Release_rabbitmq_C#源码_

    1. **RabbitMQ基础**:RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,提供高可用性、可扩展性和持久化消息存储。它支持多种编程语言,包括C#,并且可以通过安装RabbitMQ .NET客户端库(如`RabbitMQ....

    NET Core 使用RabbitMQ源码 NETCORERABBITMQ.rar

    NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...

    Java rabbitMQ源码

    Java RabbitMQ 源码分析 RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统、微服务架构中,用于解耦生产者和消费者,实现异步处理和负载均衡。...

    RabbitMQ 的消息持久化与 Spring AMQP 的实现详解

    RabbitMQ 的消息持久化是指在 RabbitMQ Server 中保留消息的机制,以便在 Server 崩溃或重启后可以恢复消息。消息持久化是通过在交换器、队列和消息三个方面实现的。 第一步,交换器的持久化。交换器是 RabbitMQ 中...

    RabbitMQ流量控制机制分析1

    "RabbitMQ流量控制机制分析" RabbitMQ 的流量控制机制是指 RabbitMQ 为了避免接收消息的速率过快,导致消息队列过长,影响系统性能的机制。该机制主要通过三部分来实现:开关闸门、关闭闸门和开启闸门。 一、开关...

    RabbitMQ源码学习资料

    8. **持久化与高可用**:RabbitMQ支持消息和队列的持久化,即使服务器重启,也能恢复之前的状态。此外,通过镜像队列,可以实现队列数据在多个节点间的复制,提高数据的安全性。 9. **插件系统**:RabbitMQ拥有丰富...

    RabbitMQ_Mirror机制分析

    RabbitMQ Mirror机制是RabbitMQ中的一种高可用性机制,旨在提供消息队列的高可用性和持久化。Mirror机制的核心是镜像队列(Mirror Queue),它是一个特殊的Backing Queue,内部包裹了一个普通的Backing Queue,用于...

    rabbitmq c++ 封装源码

    Rabbitmq c++ 封装 全部源码 自己测试过几天几夜,性能很好。 包含内容:布消息 消费消息 读取消息 取消息队列属性 文件列表: rabbitmq cpp rabbitmq h ampq h ampq cpp 具体平台的dll请大家下载后自己编译,源码...

    RabbitMQ详细讲解

    * 消息持久化:RabbitMQ 提供了消息持久化机制,确保消息的安全。 demo 下面是一些使用 RabbitMQ 的 demo 例子: demo(1):简单的消息队列 * 生产者发送消息到 RabbitMQ 服务器。 * 消费者从 RabbitMQ 服务器...

    rabbitmq源码包(rabbitmq+erlang)

    在提供的压缩包中,包含了RabbitMQ的源码以及运行所需的Erlang环境。让我们深入探讨这两个关键组件以及相关配置文件。 1. **Erlang**: Erlang是一种并发、分布式、容错的编程语言,是RabbitMQ的基础。版本25.3提供...

    RabbitMQ源码安装包

    3. **下载源码**:从RabbitMQ官方网站下载最新版本的源码包,这里提到的是`rabbitmq-server-3.1.5`。将下载的源码包解压到适当目录。 4. **构建与安装**:进入源码目录,执行配置、编译和安装命令: - `./...

Global site tag (gtag.js) - Google Analytics