- 浏览: 268092 次
- 性别:
- 来自: 杭州
最新评论
-
pjsong3101:
引用 public static void main ...
JVM中可生成的最大Thread数量 -
lzp459260276:
RabbitMQ源码分析 – 消息生命周期 -
huotianjun:
关于:为什么要每个操作写一个信息,而不是在publish时写一 ...
RabbitMQ源码分析 – 持久化机制 -
puyongjun_1989:
楼主你好, (256 * 1024) / 64 = 409 ...
JVM中可生成的最大Thread数量 -
chuqingq:
很不错,学习!
Erlang并发机制 –进程调度
(注:分析代码基于RabbitMQ 2.8.2)
当rabbit无法直接将消息投递到消费者时,需要暂时将消息入队列,以便重新投递。但是,将消息入队列并不意味着,就是将消息扔在一个队列中就不管了,rabbit提供了一套状态转换的机制来管理消息。
在rabbit中,队列中的消息可能会处理以下四种状态:
alpha:消息内容以及消息在队列中的位置(消息索引)都保存在内存中;
beta:消息内容只在磁盘上,消息索引只在内存中;
gamma:消息内容只在磁盘上,消息索引在内存和磁盘上都存在;
delta:消息的内容和索引都在磁盘上。
注意:对于持久化消息,消息内容和消息索引都必须先保存在磁盘上,然后才处于上述状态中的一种。其中gamma很少被用到,只有持久化的消息才会出现这种状态。
rabbit提供这种分类的主要作用是满足不同的内存和CPU需求。alpha最耗内存,但很少消耗CPU;delta基本不消耗内存,但是要消耗更多的CPU以及磁盘I/O操作(delta需要两次I/O操作,一次读索引,一次读消息内容;delta及gamma只需要一次I/O操作来读取消息内容)。
rabbit在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果内存中的消息数量大于这个数量,就会引起消息的状态转换,转换主要两种:alpha -> beta, beta -> delta。alpha -> beta的转换会将消息的内容写到磁盘(如果是持久化消息,在这一步转换后,消息将会处于gamma状态),beta -> delta的转换会更进一步减少内存消耗,将消息索引也写到磁盘。
运行时怎么体现消息的各种状态呢?在队列的状态vqstate(参见[$RABBIT_SRC/src/rabbit_variable_queue.erl])结构中,存在q1,q2,delta,q3,q4五个队列(使用q1~q4使用Erlang的queue模块,delta存储结构依赖于消息索引的实现),其中q1,q4只包含alpha状态的消息,q2,q3只包含beta和gamma状态的消息,delta包含delta状态的消息。
一般情况下消息会以p1->p2->delta->q3->q4的顺序进行状态变换:消息进入队列时,处于alpha状态并保存在内存中(q1或q4),然后某个时刻发现内存不足,被转换到beta状态(q2,q3)(这时候其实有两个转换q1->q2,q4->q3),如果还是内存不足,被转换到delta状态(delta)(q2->delta,q3->delta);当从队列中消费消息时,会先从处于alpha状态的内存队列(q4)中获取消息,如果q4为空,则从beta状态的队列(q3)中获取消息,如果q3也为空,则会从delta状态的消息队列中读取消息,并将之转移到q3。
上述步骤只是个一般步骤,实际运行时,中间的步骤都是可以跳过的,比如消息可能在一开始被放在q4队列中、q1队列中的元素会直接跳到q4队列中(这两种情况下,q3,delta,q2队列都为空);当delta队列为空时,q2队列可以直接跳到q3队列,q1队列也可以直接跳到q3。
上述这些状态转换主要发生的两个地方为:从队列中获取消息时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> queue_out/1])以及减少内存使用量时([$RABBIT_SRC/src/rabbit_variable_queue.erl –> reduce_memery_use/1])。下面详细说明下这两个操作的逻辑。
$RABBIT_SRC/src/rabbit_variable_queue.erl -> queue_out/1 queue_out(State = #vqstate { q4 = Q4 }) -> case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of {empty, _State1} = Result -> Result; {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end; {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> fetch_from_q3/1 fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4 }) -> case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; {{value, MsgStatus}, Q3a} -> State1 = State #vqstate { q3 = Q3a }, State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> true = ?QUEUE:is_empty(Q2), %% ASSERTION true = ?QUEUE:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; // A {true, false} -> maybe_deltas_to_betas(State1); // B {false, _} -> State1 end, {loaded, {MsgStatus, State2}} end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> maybe_deltas_to_betas/1 maybe_deltas_to_betas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, index_state = IndexState, pending_ack = PA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, end_seq_id = DeltaSeqIdEnd } = Delta, DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, PA, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, case ?QUEUE:len(Q3a) of 0 -> maybe_deltas_to_betas( State1 #vqstate { delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); Q3aLen -> Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> State1 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, q3 = ?QUEUE:join(Q3b, Q2) }; // C N when N > 0 -> Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, end_seq_id = DeltaSeqIdEnd }), State1 #vqstate { delta = Delta1, q3 = Q3b } end end.
$RABBIT_SRC/src/rabbit_variable_queue.erl -> reduce_memory_use/1 reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun push_betas_to_deltas/2, fun limit_ram_acks/2, State), State1 reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, rates = #rates { avg_ingress = AvgIngress, avg_egress = AvgEgress }, ack_rates = #rates { avg_ingress = AvgAckIngress, avg_egress = AvgAckEgress } }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; S1 -> {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> ReduceFun(QuotaN, StateN) end, {S1, State}, case (AvgAckIngress - AvgAckEgress) > (AvgIngress - AvgEgress) of true -> [AckFun, AlphaBetaFun]; false -> [AlphaBetaFun, AckFun] end), {true, State2} end, case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; _ -> {Reduce, State1} end chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> 0; chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
$RABBIT_SRC/src/rabbit_variable_queue.erl -> push_alphas_to_betas/2 push_alphas_to_betas(Quota, State) -> {Quota1, State1} = push_alphas_to_betas( fun ?QUEUE:out/1, fun (MsgStatus, Q1a, State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } end, Quota, State #vqstate.q1, State), {Quota2, State2} = push_alphas_to_betas( fun ?QUEUE:out_r/1, fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}.
发表评论
-
RabbitMQ源码分析 – 持久化机制
2012-08-21 09:34 14691(注:分析代码基于Rabb ... -
RabbitMQ源码分析 – 消息生命周期
2012-06-25 10:53 9977(注:分析代码基于Rabb ... -
RabbitMQ源码分析 – 实体初始化
2012-06-11 16:15 6301(注:分析代码基于RabbitMQ 2.8.2) Con ... -
RabbitMQ源码分析 – 网络层
2012-05-30 13:31 8551(注:分析代码基于Rabb ... -
RabbitMQ源码分析 - 启动
2012-05-24 19:41 8779RabbitMQ是一个消息队列的实现,基于AMQP(Ad ... -
Erlang热部署 – 模块更新
2012-05-21 10:55 5147Erlang的热部署做的很完善,参见Release Hand ... -
Erlang并发机制 – 垃圾回收
2012-05-02 10:34 3899Erlang中每个进程都有独立的堆内存,默认的大小是23 ... -
Erlang并发机制 – 消息传递
2012-05-02 10:24 7514Erlang系统中,进程之间的通信是通过消息传递来完成的 ... -
Erlang并发机制 – 任务迁移算法
2012-05-02 10:22 3111一般情况下,在SMP环 ... -
Erlang并发机制 –进程调度
2012-04-10 22:43 8999Erlang调度器主要完成对Erlang进程的调度,它是 ... -
Erlang并发机制 - 进程
2012-03-26 21:24 7825在了解Erlang的并发机制之前,我们先来看一下Erlang与 ... -
CentOS6下编译Erlang R15B with wxWidgets
2012-02-23 16:41 19359如果不需要安装wxWidgets的话,很简单,./con ... -
Tsung源码分析(五):Tsung数据统计
2012-02-22 19:59 3034上一篇说明Tsung的服务器监控机制的时候提到,收集到监 ... -
Tsung源码分析(四):Tsung服务器监控
2012-02-21 22:18 4471Tsung在进行压力测试同时,也可以监控服务器结点上的C ... -
Tsung源码分析(三):Tsung插件式协议支持
2012-02-17 16:15 5119在Websocket for Tsung一文中有提到如 ... -
Tsung源码分析(二):Tsung压力生成过程
2012-02-17 12:55 3694上一篇讲到ts_config_server:newbeams通 ... -
Tsung源码分析(一):Tsung启动过程
2012-02-17 12:51 6837一方面,分析Tsung的架 ... -
Websocket for Tsung
2012-02-11 22:11 5720这篇博文距离上次提到要写差不多快两个月了,一方面时间不多 ... -
Windows下vimerl的配置以及扩展
2011-12-11 22:03 3596最近开始学习Erlang,一方面出于对其主要语言特征(高 ...
相关推荐
1. **消息队列原理**:RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,它提供了一种可靠的消息传递机制,确保消息在生产者和消费者之间正确传输,即使在高并发、网络不稳定或服务暂时不可用的情况下。...
**RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...
源码分析: 1. **源码结构**:解压后,你将看到一个典型的项目结构,包括src、erlang、plugins、docs等目录。`src`目录包含了RabbitMQ的核心源码,用Erlang语言编写;`erlang`包含了Erlang编译器需要的库和依赖;`...
7. **最佳实践与性能调优**: 在实际应用中,理解如何设置合理的队列大小、消费速率、心跳间隔等参数,以及如何利用RabbitMQ的日志和监控数据进行性能分析和调优,是提升系统性能的关键。 8. **安全与权限控制**: ...
**RabbitMQ源码分析** RabbitMQ是一个开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步处理、任务队列以及服务间通信。源码分析有助于深入理解其内部...
**正文** RabbitMQ延迟队列插件,即rabbitmq_delayed_message_exchange-3.8.0,是一个针对...在源码分析和软件开发过程中,深入理解这个插件的工作机制,有助于我们更好地利用RabbitMQ延迟队列来优化我们的应用架构。
通过分析源码和数据库的交互,我们可以更透彻地掌握RabbitMQ的核心机制和优化技巧。 RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的一款开源消息中间件,它提供了高可用、高可靠的消息传递服务。...
学习RabbitMQ源码能帮助我们理解其内部工作流程,如消息的发布、接收、存储和分发机制,以及服务器如何处理网络通信、并发和故障恢复。这对于优化性能、调试问题以及自定义扩展功能都非常有帮助。在分析mq-demo这个...
Java RabbitMQ 源码分析 RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统、微服务架构中,用于解耦生产者和消费者,实现异步处理和负载均衡。...
总之,JMeter-RabbitMQ源码提供了测试和优化RabbitMQ服务器性能的有效途径,不仅适合测试工程师进行性能评估,也为开发人员提供了深入理解消息队列工作原理和优化其性能的宝贵资源。通过源码学习和实践,我们可以更...
在“museum-rabbitmq-demo.zip”中,包含的源码可能涵盖了RabbitMQ的Java客户端库(如`com.rabbitmq.client`)的使用,展示了如何创建连接、声明交换器、绑定队列、发送和接收消息等操作。通过阅读和运行这些示例...
**RabbitMQ消息服务器 v3.8.12 源码分析** RabbitMQ是一款广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,为分布式系统提供了可靠的异步通信解决方案。源码学习是理解其内部机制、...
通过以上分析,我们可以看到,`rabbitmq-demo`源码示例涵盖了RabbitMQ实现定时任务的关键步骤,包括消息的创建、发送、延迟处理和消费。理解并实践这些步骤,可以帮助开发者在自己的项目中灵活地运用RabbitMQ进行...
通过分析源代码,你可以了解到如何在Java、Python、Ruby等语言中与RabbitMQ进行交互,如何创建和配置Exchanges、Queues、Bindings,以及如何处理各种异常情况。同时,这也有助于理解RabbitMQ内部的并发控制、网络...
通过分析这些源码,我们可以深入了解RabbitMQ如何利用Erlang的特性来实现消息的发布、订阅、路由和存储等功能。同时,源码还可能包含配置文件、脚本和文档,帮助开发者了解和定制RabbitMQ的行为。 深入研究RabbitMQ...
对于“源码”标签,我们可以深入研究 SpringBoot 的自动配置原理,以及 RabbitMQ 的客户端 API 实现。而“工具”可能指的是 IDE 插件、监控工具(如 RabbitMQ Management Console)或者日志分析工具等,这些工具能...
在这个"6个消息队列项目源码.rar"压缩包中,我们有六个不同的项目,它们都是基于C#语言实现的消息队列应用。以下是关于这些知识点的详细解释: 1. **消息队列原理**: - 消息生产者:创建并发送消息到队列。 - ...
在源码分析中,我们可以关注以下几个关键点: - 连接工厂类的创建方法,了解如何配置RabbitMQ连接参数。 - 连接池的初始化过程,包括如何设置最大连接数和超时策略。 - 获取和释放连接的逻辑,以及对象池策略的实现...
这些文件提供了一个全面的学习环境,涵盖了RabbitMQ的消费者和生产者端的实现,以及可能的Web应用集成,适合那些希望在Java项目中实施消息队列机制的开发者。通过分析和运行这些示例,用户可以深入了解RabbitMQ的...
### nova-compute源码分析 #### 一、Nova概述及工作职责 **1.1 Nova的角色与任务** Nova是OpenStack项目中一个至关重要的组成部分,它主要负责虚拟机实例的生命周期管理,包括创建、调度、运行和销毁等功能。具体...