RabbitMQ下高性能和高扩展性的路由拓扑
为一个高度可扩展性的系统设计一个好的路由拓扑就像是映射一张图。许多事情需要考虑到,比如故障,环境的约束,消息的具体实现,还有性能策略。我们经常遇到的困难是在给我们的需求选择合适的路由时缺少灵活性和表现力。RabbitMQ在此时脱颖而出。
基本概念
熟悉一般意义上”消息“的读者应该知道路由消息的概念(消息由A路由到B),路由可以很简单,也可以相当复杂,当为一个可扩展的,复杂的系统设计一个路由拓扑时,这个路由拓扑必须是优雅的。保持干净的和解藕的,组件能够自如地应对多变的负载。路由拓扑可以用一个简单的映射或者复杂的图来描述。在最简单的一种形式中,一个路由拓扑可以用一堆节点描述,比如
分层节点:
对于那些刚接触RabbitMQ或者AMQP(注意Rabbit支持很多协议,包括STOMP,HTTP,HTTPS,XMPP,还有SMTP),下面列出一些基本组件的描述:
-
交换(Exchange) 服务端的一个实体,接收来自生产者应用发过来的消息并且可选的将这些消息路由到服务端的队列里。
-
交换类型(Exchange type) 交换的一个指定模型的算法及实现。与交换实体(exchange instance)不同,后者是一个在服务端接收并路由消息的实体。
-
消息队列(Message queue) 一个命名了的实体,它处理消息并将它们发往消费者应用。
-
绑定(Binding) 一个连接消息队列和交换的实体。
-
路由匙(Routing key) 一个虚拟的地址,交换可以用它来决定该如何路由一个特定的消息。
对于点对点(point-to-point)路由,路由匙(routing key)通常就是一个消息队列的名字。对于主题 发布订阅(topic pub-sub)路由,路由匙(routing key)通常实质上是分层的:
api.agents.agent-{id}.operations.{operationName}
在更复杂的情形下路由匙(routing key)可以由路由消息的头域(header fields)和内容(body content)组成。一个交换(exchange)检查一个消息的属性(properties),头域(header fields),内容(body content)以及可能来自其他资源的数据,然后决定如何路由消息。源自上面路由匙(routing key)想法的一个绑定模式(binding pattern)可能看起来像api.agents.*.operations.*,在这个模式中,我们用绑定模式(binding pattern)api.agents.*.operations.*绑定交换(exchange) E1到队列(queue) Q1,因此任何发往E1的消息将会路由至Q1,只要它们的路由匙(routing key)符合这个绑定模式(binding pattern)。
Rabbit broker在结构上不同于JMS broker。每个RabbitMQ服务都由至少一个节点(node)组成(broker),或者更典型的,一个集群(cluster)里的节点。每个节点有一个默认的虚拟主机,“/”,可以创建更多的虚拟主机,如“/development”。Rabbit虚拟主机跟Tomcat的虚拟主机相似,它们都将broker的数据划分到sub-sets里。交换(exchange)和队列(queue)就在这些虚拟主机中。当一个用户认证连接时,他连接的是一个Rabbit节点上的虚拟主机。
下面演示一些代码,我们连接一个Rabbit节点,声明一个发布消息用的交换(exchange),一个消费消息用的队列(queue),一个绑定模式(binding pattern),然后发布一些消息,使用
RabbitMQ java client api:
package org.demo.simple.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public final class RocketSender {
public void sendRockets() throws IOException {
List<String> rocketsWithRoutings = new RocketRouter().build();
Connection connection = new ConnectionFactory().newConnection();
Channel channel = connection.createChannel();
String rocketExchange = "rockets.launched";
channel.exchangeDeclare(rocketExchange, "topic");
String rocketQueue = channel.queueDeclare().getQueue();
channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");
for (String rocketTo : rocketsWithRoutings) {
channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
}
channel.close();
connection.close();
}
}
一个简单的“消费”使火箭“着陆”,如下:
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);
int landed = 0;
while (landed < launched) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
String rocketLanded = new String(delivery.getBody());
if (rocketLanded.equalsIgnoreCase("Alderaan")) {
System.out.println("That's no moon, that's a space station.");
}
landed++;
}
问题
考虑在可扩展的环境下什么样的路由策略表现得最好,并且它可以使性能也得到改进,这里有很多选择。通常关于“消息”很棒的一点就是,由于“消息”配置的多样性,我们总是可以找到一条合适的配置来解决当前和增长中的需求。
我们让事情简单点,考虑两种策略:
- 采用分层的路由匙(routing key),将路由划分的很细,更少的主题交换(topic exchange)。
- 大量的直接交换(direct exchange)和队列(queue),对应少得多的路由划分(routing partitions)。
每个场景基于这样一个用例:每个应用必须同时在生产者端和消费者端都是可扩展的(scale):
从哪开始
在探究一个路由(routing)解决方案前对你的环境和组件进行评估是一个好主意,这个解决方案会随着时间推移干净并有效地实现可扩展。比如,什么是可扩展的?通常的,解耦,分布式,异步,并行,抽象分级以及其他一些并不直接关联的东西都叫可扩展的。然后考虑什么要素是当前或者潜在的瓶颈。一个基本的原则是高traffic/volume路需要更多有效的吞吐量,否则在你的分布式应用中,你将承受瓶颈的风险。一个尝试是根据traffic重新排列或者组成一个heat map。下一步,你能将你的traffic分类吗,有没有overarching模式,主题(topics)相似的消息类型,以及它们之间有什么关系?现在开始考虑整合,如何,哪里可以有效地得到改进,应用测试模式解决那些热点(heat points),为了可扩展解耦以及增强性能。
通用的路由考虑点
所有的交换类型(exchange types)有不同的表现形式。下面列出一些通用的规则:
- 在一个应用的图里,如果你的域(domain)只能容纳有限的路由匙(routing key),那么生成许多扇面交换(fanout exchange)应该是个正确的选择(1比1的映射,一个交换(exchang)对应一个路由匙(routing key))
- 如果你拥有永久的,无限数量的路由匙(routing key),考虑主题交换(topic exchange)。
- 对于主题路由(topic routing),性能会随着绑定(binding)的增长而下降。
- 扇面交换(fanout exchange)非常快,因为它没有路由这个过程,然而如果大量的队列(queue)绑定在这个交换(exchange)上的话,一切就变了(意指扇面交换(fanout exchange)就没有想象中那么快了 译者注)。
- 直接交换(direct exchange)是主题交换(topic exchange)中比较快的一种类型,在你不需要考虑未知因素(wild card)的情况下可以使用它。
- 解决跨100,000+的队列(queue)产生的问题是非常单调乏味的,与之相对的如果面对的是更多的绑定(binding),更少的交换(exchange)和队列(queue)产生的拓扑,情况将为大为改观。
- 一个非常高数量的交换(exchange)和队列(queue)需要更多的成本,这个也许是有意义的,但也要看情况。
RabbitMQ 2.4.0的版本于2011年3月23日发布了,一个优化过的新的主题路由(topic routing)算法可以获取到了,它比之前的主题(topic)算法快60倍,据此,一个推荐的方案出现了,那就是更少的交换(exchange)和队列(queue),更多的绑定(routing),因为路由耗时现在降到了最小。
性能
什么是便宜的?
根据内存成本,交换(exchange)和绑定(binding)。RabbitMQ构建在
Erlang之上,每个节点(node)(broker)是Erlang中的一个进程(process),每个队列(queue)同样如此,在Erlang里,默认Erlang VM 进程(process)的限制是1M,这个可以增长。然而,交换(exchange)由于可扩展性的原因,并不是一个进程(process),它简单的是RabbitMQ内置Mnesia数据库的一条纪录。在集群(cluster)里,声明一个交换(exchange)会导致它出现在这个集群的所有节点上,而声明一个队列(queue)仅仅创建在这些节点中的一个上面。这就解释了为什么交换(exchange)存活着的节点可以重启或者在一个集群里创建一个节点,但是队列(queue)却不能这样做。
对绑定搅动(binding churn)的担心。在策略2中,如果你创建了许多新队列(queue)和它们的绑定(binding),不管消费者什么时候取消息,你都可能出问题。比如,给定交换(exchange) E1...En,许多消息发布至这些交换(exchange)中,不管什么时候消费者Cm连接上来时,它会通过它自己的队列(queue)创建绑定(binding)连接到所有E1...En,依赖于连接的速度,这可能会导致问题。
为了缓解绑定搅动(binding churn),考虑exchange-to-exchange绑定,来自2.3.1版本的新东西。每个消费者可以拥有自己的二级交换(secondary exchange) Ym,该交换(exchange)必须非auto-delete的。然后绑定所有的E1...En至Ym。用这种方式这些绑定(binding)总是存在。在这种情景下,不管什么时候消费者Cm连接上来时,它简单的只需要声明它自己的队列(queue)然后将这个队列(queue)绑定到Ym上。如果Ym是一个扇面交换(fanout exchange),它将会非常快并且可以减少绑定搅动(binding churn)速率至每个连接一次,而不是潜在的每个连接n次。
用例
exchange-exchange 可扩展性用例
考虑一个带有自主agent的server端应用。每个agent在一个虚拟主机上,虚拟主机作为一个elastically-scaled系统的一部分。当每个agent启动时,它发送消息给server,说我在线了,许多其他消息如认证和数据传输的消息紧接而至。如果我们有1,000个agent,每个声明50个直接交换(direct exchange),队列(queue)和绑定(binding),然后每个agent必须知道服务端的队列(queue),因为它需要通过queue.declare操作履行绑定合约。那并不是一个可扩展性的解决方案。
现在考虑创建一个共享的主题交换(topic exchange):一个为agent到服务端路径提供,另外一个为服务端到agent路径提供,第三个处理未认证的agent,它路由至那些不需要安全认证的队列。现在我们划分绑定模式(binding pattern),消息路由匙(routing key),然后将其中的一套应用到每个服务端上,这个服务端被所有agent连接所共享。然后,在它最简单的形式下,当每个agent在线了,它声明一个私有的交换(exchange)和队列(queue),绑定这个交换(exchage)到共享的主题交换(topic exchange)上。
现在关系图可以通过exchange-to-exchange映射来描述,这种映射减少了搅动速率并且将agent从不得不“知道”服务端的queue状况下解藕了。使用这种模式的系统是干净,解藕并且可扩展的。
Elastic-Scaling用例
让我们将前一个场景往前再推进一步。我们已经使用了情形2中提到的主题 发布订阅(topic pub-sub)路由:很多直接的路由。现在我们说系统需要在数据中心里的采用50,000或更多的agent来猛增服务端应用的规模集群。我们如何应对突然变化的负载?
认证的客户端将路由消息从agent交换至服务端。它处理发布消息至单个消费者队列的所有操作,包括生成极频繁使用消息的那些队列。在10,000个客户端每分钟产生大约60,000个消息或者每天86,400,000个消息的情况下,当前拓扑会有潜在的瓶颈。解决方法非常简单,RabbitMQ通过调整配置,可以每天处理超过1,000,000,000个消息,比如你选择是否持久化消息。
我们的服务端应用现在运行在一个RabbitMQ集群上。记住在一个集群里,声明一个交换(exchange)会导致这个交换(exchange)出现在所有的节点上,而声明一个队列(queue)仅仅在这些节点中的一个中创建,因此我们不得不配置一个方案。
生产者和消费者之间的负载均衡
当更多客户端应用(Agents)在线时,为了有效地处理这些潜在的非常高的负载,我们可以用一些方法修改拓扑。首先,跨越一个Rabbit集群从之前说的配置到负载均衡消息的优化。我们可以在Rabbit集群内部给每个节点创建一个队列(queue)。如果我们拥有4个节点,为每个hight-traffic队列(queue),我们创建 hfq.{0,1,2,3}来执行操作。现在每个agent可以从0到3中随机选一个数来选择对应的节点,或者更缜密的使用round-robin的实现,采用RabbitMQ发布消息的话,有一个RPC调用,或者你使用
Rabbit management插件得到节点数量,然后你可以将之用在你的round-robin算法上。
采用Round-Robin分发的worker queues
worker queues,或者task queues,通常被用来将耗时的任务分布在多个worker上,它很容易并行工作。此外,这个拓扑也可以应用于忽略资源密集型任务的需求,同时它必须是阻塞的直到任务完成。运行几个worker queue可以允许这些任务在它们之间分布。
采用worker queues,默认,Rabbit使用一个round-robin分发方法,依次发送每个消息至下一个消费者。每个消费者大约接收相同数量的消息。如果你声明一个队列(queue),这个队列(queue)有3个竞争的消费者,将之绑定到交换(exchange),然后发送20,000个消息,消息0将会路由到消费者1,消息1到消费者2,消息2到消费者3,以此类推。如果我们有一堆积压的任务,我们可以简单的添加更多的worker,这样就很容易地让系统可扩展。
性能
内存
以上选择中没有一个必然地减少地RabbitMQ的高负载。这里没有对交换(exchange)和队列(queue)数量有严格的限制,一个人在一个broker上创建,运行100,00个队列(queue)是没有问题的。使用正确的跳转和足够的RAM,你可以很好地运行超过1000,000的队列(queue)。
RabbitMQ动态推送消息至磁盘来释放RAM,因此一个队列(queue)的内存足迹不是依赖于它的内容。在一个队列(queue)空闲了10秒或更多时间后,它将“休眠”,同时导致那个队列(queue)上的GC。结果是,一个队列(queue)需要的内存的数量可以动态地收缩。比如,可能1000个空的,空闲的队列(queue)占了10MB的RAM。当它们处于激活状态(Active)(即使是空的),依赖于内存碎片,它们可能当然消耗了多得多的内存。将它们强制休眠来测试行为是困难的,因为Erlang VM并不是立刻将内存返还给OS。
然而,你可以观察一个大型进程(process)的休眠, 这个过程会产生非常碎片化的内存,来达到这个目的,因为回收的数量可以足够的强制VM将内存返还给OS。如果你运行一段测试代码,稳定地增加Rabbit中地内存足迹,你可以观察到对闲置进程(process)休眠的影响,因为它减少了内存使用速率的增长。
事务
在10,000个消息上事务的发布需要花费4分钟。RabbitMQ的一个新特性,被称为“
Publisher Confirms”,可以使同样一段带带有事务性的代码快100倍以上。如果你不是明确地需要实现事务,但是需要这个验证,你可以考虑这个选择。
外卖
最后这里有一些外卖在你实现地基础上帮助你获得最好地性能:
- 新topic路由算法的优化最多可以比之前的快60倍。
- topic biding模式使用wildcards ‘*’,这个符号匹配单个词,比‘#’快多了,后者匹配0或者更多的词。在路由桌面上wildcards‘#’的行进要比‘*’花费更多的时间。
- exchange-to-exchange绑定改进了解耦,增加了拓扑的灵活性,减少了绑定搅动,并且提升了性能。
- RabbitMQ Publisher Confirm比AMQP事务快100倍以上。
- 一个queue空闲超过10秒以上后,它将“休眠”,queue上的GC会减少,导致queue的内存需求会戏剧性的减少。
- worker queues在并行和分布式工作负载上有所帮助。
- Rabbit集群里的分布式worker queue可以帮助实现scale。
- 负载均衡你的拓扑。
在这个主题上,本篇文章毫无意义(作者自谦语 译者),事实上还有很多模式,拓扑和性能的细节需要考虑。一个策略,通常,依赖如此多的因素,但是我希望本文足够概括,可以在正确的方向上帮助到我们或者至少引起我们思考。
获取它
Maven
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
<exclusions>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
(全文完)
分享到:
相关推荐
RabbitMQ是一个高度可扩展且跨平台的消息代理系统,它基于开放标准——高级消息队列协议...通过深入理解和熟练使用RabbitMQ,开发者可以构建出健壮的微服务架构,处理大量并发的异步任务,以及实现系统的解耦和扩展性。
8. **集群(Clusters)**:RabbitMQ可以通过组建集群提高可用性和扩展性,多个节点共享队列和数据,实现故障转移和负载均衡。 9. **网络代理(Network Topologies)**:RabbitMQ可以配置为不同的网络拓扑结构,如...
将RabbitMQ驱动与.NET Core结合,开发者可以在这些平台上构建可扩展、高并发的应用,利用消息队列来解耦各个组件,提高系统的可靠性和可维护性。 标签"rabbitmq"和".net core"突出了这个驱动的核心特性:它是为...
RabbitMQ的使用确保了数据的可靠传输和系统的可扩展性。 实现Zipkin_RabbitMQ链路追踪的步骤通常包括以下几个部分: 1. **集成Zipkin客户端**:在微服务中引入Zipkin的客户端库,如Java的Brave、Python的open...
8. **线性可扩展性**:随着业务的增长,可以通过增加更多的RabbitMQ节点来线性提升处理能力,实现水平扩展。 9. **语言支持**:RabbitMQ提供了多种编程语言的客户端库,如Java、Python、Ruby、.NET、JavaScript等,...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它主要用于解耦系统组件,提高系统的可扩展性和容错性。RabbitMQ作为一款广泛使用的开源消息代理和队列服务器,是实现这一功能的理想选择。在这个...
RabbitMQ系统升级是一个涉及多个组件和配置变更的过程,确保服务的稳定性和兼容性。在升级过程中,尤其需要注意的是RabbitMQ内部的数据存储系统——mnesia数据库的升级操作。Mnesia是Erlang语言的标准分布式数据库,...
总结,RabbitMQ通过分布式消息队列的实现,有效地解决了系统间的解耦和异步处理,提高了系统的稳定性和可扩展性。理解其核心原理和部署策略,对于构建高性能、高可用的分布式系统至关重要。通过深入学习RabbitMQ的...
3. **智能路由与网络拓扑感知**:ZeroMQ套接字能够自动处理路由和网络拓扑变化,这意味着单个ZeroMQ套接字可以绑定到多个端口并监听它们的入站请求消息。同样,也可以通过简单的API调用来向多个套接字发送数据。 4....
- **云盘规格**:定义了云盘的大小和性能等级。 2. **硬件设施**:涉及物理服务器、存储设备等硬件资源。 - **区域**:指地理上不同的数据中心位置。 - **集群**:一组逻辑上相关的物理服务器集合。 - **物理机*...
- 高可靠性的消息传递和路由。 文档《Spring AMQP Reference》提供了使用Spring AMQP进行消息队列操作的全面指导,覆盖了从基本消息交互到高级配置和最佳实践的各个方面,是Spring应用开发者在集成AMQP消息服务时...
在IT行业中,Go语言因其高性能、并发能力强以及简洁的语法特性,被广泛应用于微服务架构的设计和实现...开发者可以利用Go语言的强大性能和微服务架构的灵活性,构建出能够承受高并发、支持复杂业务逻辑的大型在线游戏。
它支持虚拟网络、端口、路由和安全组,使得用户可以自定义其虚拟网络拓扑,确保网络隔离和安全。 在实战教程中,你将学习到以下步骤: 1. **环境准备**:确保你有一个合适的基础环境,通常包括Linux操作系统(如...
- **可伸缩性**:0MQ可以轻松地扩展到数千个节点,支持多种网络拓扑结构。 - **容错性**:支持断线重连、消息持久化和故障恢复机制。 - **多语言支持**:0MQ提供了多种编程语言的绑定,包括C、C++、Python、Java...
- 分布式系统需要处理各种异常情况,并保持良好的扩展性。 **一致性:** - 在分布式系统中保证数据的一致性是非常重要的挑战之一。 **MVCC和CAP理论:** - **MVCC**(多版本并发控制):允许多个版本的数据存在,...
虽然微服务架构带来了很多好处,如提高开发效率、增强系统的弹性和可扩展性,但也存在一定的挑战,比如增加了运维复杂度、网络延迟等问题。因此,在决定是否采用微服务架构时,需要权衡利弊,并结合自身项目的实际...
这意味着你可以利用AMQP的特性,如事务、持久化、发布/确认以及多级交换机等,来增强你的RPC服务的健壮性和可靠性。此外,由于AMQP的广泛支持,amqprpc也使得你的Go服务可以与使用其他语言(如Python的RabbitMQ...
它允许服务之间通过发布和订阅模式进行通信,而无需直接相互依赖,从而提高系统的可扩展性和可靠性。"micro-broker-service" 可能采用了诸如 RabbitMQ、NATS 或 Apache Kafka 这样的消息队列/代理技术,但具体实现取...
【meshr:为 HFWF 启用...总的来说,meshr项目利用Java的强大力量,构建了一个支持HFWF的网状网络解决方案,涵盖了网络通信、分布式系统、安全性和可扩展性等多个核心领域,为复杂网络环境提供了高效且可靠的通信能力。