- 浏览: 268422 次
- 性别:
- 来自: 杭州
最新评论
-
pjsong3101:
引用 public static void main ...
JVM中可生成的最大Thread数量 -
lzp459260276:
RabbitMQ源码分析 – 消息生命周期 -
huotianjun:
关于:为什么要每个操作写一个信息,而不是在publish时写一 ...
RabbitMQ源码分析 – 持久化机制 -
puyongjun_1989:
楼主你好, (256 * 1024) / 64 = 409 ...
JVM中可生成的最大Thread数量 -
chuqingq:
很不错,学习!
Erlang并发机制 –进程调度
(注:分析代码基于RabbitMQ 2.8.2)
当客户端通过basic.publish命令(AMQP定义)发布一个消息时,rabbit需要经过以下几个步骤处理消息:
1) 根据客户端传来的消息内容及相关属性(目标exchange,routing keys,mandatory及immediate属性)构造一个消息实体;
2) 根据要投递的exchange及routing keys匹配消息的目标投递队列名称;
3) 根据队列名称找到对应的处理进程ID;
4) 通过deliver消息(Erlang消息)向目标进程投递消息实体;
5) 队列对应进程收到投递的消息后,会试图向关联到此队列的消费者投递消息,如果投递失败,或者直接丢弃,或者将消息入队列,入队列时会根据消息及队列的属性(是否durable)判断是否需要写入磁盘,;
6) 保存到队列的消息,在下次有消费者关联到对应队列时,试图重新投递,直到投递成功,或者因为过期被丢弃掉。
本文的目的主要是分析上面各个步骤中的主要逻辑。
创建消息实体
消息实体由结构#basic_message表示,包含exchange_name,content,id,is_persistent,routing_keys几个域组成。没什么复杂逻辑,主要就是从客户端拿到这些信息后,再组织成一个#basic_message结构。
在实际投递消息之前,其实还有一层封装:#delivery结构,它只是在#basic_message的基础上封装了几个根据投递行为相关的属性,包含:mandatory(此值影响消息在未能成功路由到一个队列时的应对策略,如果为true,则会向客户端返回一个“无法路由(unroutable)”的错误消息;如果为false,则会直接丢弃消息),immediate(此值影响消息无法立即投递到消费者时的应对策略,如果为true,则会向客户端返回一个“无法投递(undeliverable)”的错误消息;如果为false,则服务器会将消息入队列,但是不保证消息最终会被消费者消费),sender(处理此消息的channel进程),message(对应上面的basic_message),msg_seq_no(从1开始的序列,每收到一个消息加1,此值只有存在AMQP中定义的事务的时候才有效)。
匹配目标队列
主要完成由客户端指定的routing keys匹配到目标队列的功能。在创建消费者时,消费者会指定在目标exchange上的绑定关系(bindings,通过queue.bind命令创建),匹配主要就基于这种绑定关系。在AMQP中,有四种最基本的exchange:direct,fanout,topic,headers。在匹配目标队列时,分成三类:direct和fanout的匹配类似,topic与headers各为一类。
direct与fanout匹配算法
主要代码如下:
match_routing_key(SrcName, [RoutingKey]) -> find_routes(#route{binding = #binding{source = SrcName, destination = '$1', key = RoutingKey, _ = '_'}}, []); match_routing_key(SrcName, [_|_] = RoutingKeys) -> find_routes(#route{binding = #binding{source = SrcName, destination = '$1', key = '$2', _ = '_'}}, [list_to_tuple(['orelse' | [{'=:=', '$2', RKey} || RKey <- RoutingKeys]])]). find_routes(MatchHead, Conditions) -> ets:select(rabbit_route, [{MatchHead, Conditions, ['$1']}]).
(参见[$RABBIT_SRC/src/rabbit_router.erl --> match_routing_key/2])
SrcName为目标exchange的名称;direct的exchange在匹配时,传入的第二个参数是客户端发送的routing keys,fanout传入的是[‘_’]。
从上面代码可以看出,匹配主要使用ets:select/2函数来完成。只有一个routing key时,match_routing_key函数完成的功能很简单:按照exchange名称(source),routing key(key)在rabbit_route表中进行匹配,并把匹配记录的队列名称(destination)返回。其中rabbit_route表是在客户端通过queue.bind命令(AMQP定义)绑定队列与exchange时,由rabbit写入的,参见[$RABBIT_SRC/src/rabbit_binding.erl --> add/2]。
fanout类型的exchange在匹配时传入[‘_’],会匹配到关联到exchange的所有队列。
当有多个routing keys时,find_routing中最终传入的第二个参数会类似如下形式:
[‘orelse’, {‘=:=’, ‘$2’, RKey1}, {‘=:=’, ‘$2’, RKey2}, {‘=:=’, ‘$2’, RKey1}, {‘=:=’, ‘$2’, RKey3}]
什么意思呢,{‘=:=’, ‘$2’, RKey1}是说 RKey1与$2所代表的变量要精确相等,第一个元素’orelse’代表其它的元素是“或”关系,也就是说只要#route结构中的key匹配RKey1, RKey2, RKey3中的任意一个就会返回匹配的队列名称。
topic匹配算法
topic的exchange基于类似正则表达式的方式来说明routing pattern(同bindings)。AMQP对用于topic类型exchange的routing key(由生产者在发布消息时指定)有如下规定:由0个或多个以”.”分隔的单词;每个单词只能以一字母或者数字组成。routing pattern增加如下规则:*可以匹配单个单词;#可以匹配0个或者多个单词。例如:routing pattern “*.stock.#”会匹配到routing key “usd.stock”和“eru.stock.db”,但不会匹配“stock.nasdaq”。
rabbit在收到一个topic类型exchange的绑定请求时,会根据routing pattern生成一个Trie结构:其中的边为以“.”分隔的单词,结点唯一编号。一般的Trie结构会是个树形结构,但是在AMQP的场景下,退化成类似一个链表。对于“*.abc.xyz.#.end”会生成如下结构:
(代码参见[$RABBIT_SRC/src/rabbit_exchange_type_topic.erl --> internal_add_binding/1])
在收到一个消息时,rabbit会根据绑定时创建的trie结构进行搜索,比如对于routing key为test.abc.xyz.123.456.end搜索路径如下:
从node1到node2的路径,rabbit会首先尝试用“test”匹配,发现没到直达路径,然后尝试以“*”匹配,成功,node2到node3以“abc”匹配成功,node3到node4同理,node4到node5以“#”号匹配,然后在node5结点要跳过任何不能匹配node5到node6路径的单词,这里是“456”,最后匹配到“end”。
rabbit在这个算法的实现上有点奇怪,例如从node2到node3的匹配,即使“abc”路径已经匹配,但还是会尝试通过“*”和“#”匹配,增加了很多无意义的比较。
(代码参见[$RABBIT_SRC/src/rabbit_exchange_type_topic.erl --> trie_match/2])
headers匹配算法
AMQP协议对headers类型的exchange的匹配算法有如下规定:由“x-match”头来控制匹配模式,分两种,all和any,类似于布尔运算里的AND和OR:如果匹配模式是all,则目标消息中的头信息的值必须匹配所有在绑定时指定的头信息;如果为any,则只要有一个头信息的值匹配就可以。其中的匹配是指,要么绑定时指定的头信息值为空,要么目标消息中的头信息的值与绑定时指定的值完全一致。
rabbit在实现时,会对绑定时指定的头信息和目标消息中的头信息进行排序(以头信息的键升序排列),然后逐个比对。
想不通这里为什么进行字母序排序(代码里也称这里的匹配算法是Horrendous matching algorithm,具体参见[$RABBIT_SRC/src/rabbit_exchange_type_headers.erl --> headers_match/2]),基于hash map类的数据结构更高效一些。
查找队列进程
每一个队列在创建时,会在rabbit_queue数据表中写入#amqqueue,其中跟进程相关的两个域是pid和slave_pids(在HA策略下slave_pids才有效,代表各个slave结点上的镜像队列的进程ID),pid代表的进程由[$RABBIT_SRC/src/rabbit_amqqueue_process.erl]。
查找的过程其实很简单,就是从rabbit_queue数据表中,根据队列名称找到相应的#amqqueue记录,并将相应的pid和slave_pids全部返回。
投递消息
找到队列对应的处理进程后,通过代理的方式(见[$RABBIT_SRC/src/delegate.erl])向各个队列进程(包含镜像进程)发送deliver消息。rabbit_amqqueue_process在收到deliver消息后,会尝试将消息投递到某个消费者(参见[$RABBIT_SRC/src/rabbit_amqqueue_process --> attempt_delivery/3])。最终会通过[$RABBIT_SRC/src/rabbit_writer.erl --> send_command_and_notify/5]调用以basic.deliver命令(AMQP)将消息内容发送给消费者。如果投递失败,有两种可能的结果:一种直接丢弃,另一种会将消息保存在队列中,等待后续有新的消费者加入时重新投递。保存在队列中的消息会根据durable属性来判断是不是需要写入磁盘,一般情况下,此值为false,消息只保存在内存中,如果需要持久化,此值为true,消息会写入磁盘。
队列机制
跟这部分相关的涉及到rabbit_msg_store模块和rabbit_queue_index模块。而且backing queue为rabbit_mirror_queue_master的队列还涉及到GM(Guaranteed Multicast)相关的东西,所以打算专门写一篇分析这一块。
发表评论
-
RabbitMQ源码分析 – 持久化机制
2012-08-21 09:34 14700(注:分析代码基于Rabb ... -
RabbitMQ源码分析 - 队列机制
2012-07-09 10:10 13323(注:分析代码基于RabbitMQ 2.8.2) 当r ... -
RabbitMQ源码分析 – 实体初始化
2012-06-11 16:15 6307(注:分析代码基于RabbitMQ 2.8.2) Con ... -
RabbitMQ源码分析 – 网络层
2012-05-30 13:31 8562(注:分析代码基于Rabb ... -
RabbitMQ源码分析 - 启动
2012-05-24 19:41 8783RabbitMQ是一个消息队列的实现,基于AMQP(Ad ... -
Erlang热部署 – 模块更新
2012-05-21 10:55 5152Erlang的热部署做的很完善,参见Release Hand ... -
Erlang并发机制 – 垃圾回收
2012-05-02 10:34 3912Erlang中每个进程都有独立的堆内存,默认的大小是23 ... -
Erlang并发机制 – 消息传递
2012-05-02 10:24 7526Erlang系统中,进程之间的通信是通过消息传递来完成的 ... -
Erlang并发机制 – 任务迁移算法
2012-05-02 10:22 3114一般情况下,在SMP环 ... -
Erlang并发机制 –进程调度
2012-04-10 22:43 9020Erlang调度器主要完成对Erlang进程的调度,它是 ... -
Erlang并发机制 - 进程
2012-03-26 21:24 7846在了解Erlang的并发机制之前,我们先来看一下Erlang与 ... -
CentOS6下编译Erlang R15B with wxWidgets
2012-02-23 16:41 19382如果不需要安装wxWidgets的话,很简单,./con ... -
Tsung源码分析(五):Tsung数据统计
2012-02-22 19:59 3045上一篇说明Tsung的服务器监控机制的时候提到,收集到监 ... -
Tsung源码分析(四):Tsung服务器监控
2012-02-21 22:18 4478Tsung在进行压力测试同时,也可以监控服务器结点上的C ... -
Tsung源码分析(三):Tsung插件式协议支持
2012-02-17 16:15 5124在Websocket for Tsung一文中有提到如 ... -
Tsung源码分析(二):Tsung压力生成过程
2012-02-17 12:55 3700上一篇讲到ts_config_server:newbeams通 ... -
Tsung源码分析(一):Tsung启动过程
2012-02-17 12:51 6856一方面,分析Tsung的架 ... -
Websocket for Tsung
2012-02-11 22:11 5726这篇博文距离上次提到要写差不多快两个月了,一方面时间不多 ... -
Windows下vimerl的配置以及扩展
2011-12-11 22:03 3606最近开始学习Erlang,一方面出于对其主要语言特征(高 ...
相关推荐
源码分析可以帮助理解队列的生命周期管理,包括创建、删除、绑定、解绑等操作。 6. **消息(Message)** 消息是通过RabbitMQ传输的基本单元,包含头信息和体。源码中会展示消息的序列化和反序列化过程,以及如何保证...
综上所述,"springboot+rabbitmq源码"项目展示了如何使用Spring Boot和RabbitMQ进行消息传递。通过`mq_producer`和`mq_consumer`模块,我们看到了生产者如何发送消息以及消费者如何接收和处理这些消息。此外,Maven...
.NET Core使用RabbitMQ是一个广泛应用于微服务架构中的消息队列技术,用于实现应用程序之间的异步通信和解耦。RabbitMQ是一个开源的消息代理,它遵循Advanced Message Queuing Protocol(AMQP)标准,允许不同语言的...
**RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...
NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...
标题中的"Release_rabbitmq_C#源码_"表明这是一个关于RabbitMQ的C#源代码发布...通过这个C#源码,开发者可以学习如何在RabbitMQ环境中实现数据落地和文件上传的完整流程,这对于构建可靠、高效的消息驱动系统至关重要。
NET Core 使用RabbitMQ源码 一、源码描述 .net core 使用RabbitMQ的demo。 二、功能介绍 RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布...
【标题】:“rabbitmq发送&接收消息” 在IT行业中,消息队列(Message Queue)是一种常用的技术,用于处理异步任务、解耦系统组件以及缓解高并发时的系统压力。RabbitMQ是一款开源的消息代理软件,它基于AMQP...
RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等) RabbitMQ消息中间件技术精讲(包含源码等)
RabbitMQ是目前最流行的消息队列系统之一,它基于AMQP(Advanced Message Queuing Protocol)协议,提供可靠的消息传递服务。本篇文章将深入探讨如何使用RabbitMQ实现消息插队,以"多人投资"为例,重点讲解手动投资...
Java RabbitMQ 源码分析 RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统、微服务架构中,用于解耦生产者和消费者,实现异步处理和负载均衡。...
RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中,用于解耦应用...在分析mq-demo这个示例项目时,我们可以看到如何在实际应用中使用RabbitMQ,包括创建连接、声明交换器和队列、发送和接收消息等基本操作。
Rabbitmq c++ 封装 全部源码 自己测试过几天几夜,...包含内容:布消息 消费消息 读取消息 取消息队列属性 文件列表: rabbitmq cpp rabbitmq h ampq h ampq cpp 具体平台的dll请大家下载后自己编译,源码是完整的。
RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于解耦应用组件、处理异步任务和提供高可用性。RabbitMQ由Erlang编程语言开发,以其稳定...
RabbitMQ是一款开源的消息队列系统...学习这部分源码,不仅可以提升你对RabbitMQ的理解,还能够帮助你在实际项目中灵活运用消息队列技术,优化系统的性能和可靠性。记得在实践中不断调试和测试,加深对各种模式的掌握。
在提供的压缩包中,包含了RabbitMQ的源码以及运行所需的Erlang环境。让我们深入探讨这两个关键组件以及相关配置文件。 1. **Erlang**: Erlang是一种并发、分布式、容错的编程语言,是RabbitMQ的基础。版本25.3提供...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度探讨RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在分布式环境中高效地部署和使用RabbitMQ消息队列。RabbitMQ作为一款广泛使用的开源消息中间件,是实现应用...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,专注于帮助读者理解和掌握如何在实际项目中高效地部署和使用这个强大的消息中间件。RabbitMQ作为开源的消息代理和队列服务器,广泛应用...
《RabbitMQ高效部署分布式消息队列实战篇》是一份深度解析RabbitMQ技术的教程,结合实际应用案例,旨在帮助读者深入理解并熟练掌握如何在分布式环境中高效部署和运用消息队列。RabbitMQ作为业界广泛采用的消息中间件...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在实际项目中高效地运用这一强大的消息中间件。书中不仅涵盖了RabbitMQ的基础知识,还深入探讨了其在分布式...