1 什么是RabbitMQ?
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:
单向解耦
双向解耦(如:RPC)
例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成。
使用RabbitMQ server需要:
1. ErLang语言包;
2. RabbitMQ安装包;
RabbitMQ同时提供了java的客户端(一个jar包)。
2 概念和特性
2.1 交换机(exchange):
1. 接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanout
direct:转发消息到routigKey指定的队列
topic:按规则转发消息(最灵活)
headers:(这个还没有接触到)
fanout:转发消息到所有绑定队列
2. 如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。
3. 一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。
4. topic类型交换器通过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配routing key:usd.stcok和eur.stock.db,但是不匹配stock.nana。
还有一些其他的交换器类型,如header、failover、system等,现在在当前的RabbitMQ版本中均未实现。
5. 因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。
6. 交换器的属性:
- 持久性:如果启用,交换器将会在server重启前都有效。
- 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
- 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
2.2 队列(queue):
1. 队列是RabbitMQ内部对象,存储消息。相同属性的queue可以重复定义。
2. 临时队列。channel.queueDeclare(),有时不需要指定队列的名字,并希望断开连接时删除队列。
3. 队列的属性:
- 持久性:如果启用,队列将会在server重启前都有效。
- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
- 排他性:如果启用,队列只能被声明它的消费者使用。
这些性质可以用来创建例如排他和自删除的transient或者私有队列。这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉。它们只是短暂地连接到server,但是可以用于实现例如RPC或者在AMQ上的对等通信。4. RPC的使用是这样的:RPC客户端声明一个回复队列,唯一命名(例如用UUID),并且是自删除和排他的。然后它发送请求给一些交换器,在消息的reply-to字段中包含了之前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to作为routing key(默认绑定器会绑定所有的队列到默认交换器,名称为“amp.交换器类型名”)发送到默认交换器。注意这仅仅是惯例而已,可以根据和RPC服务器的约定,它可以解释消息的任何属性(甚至数据体)来决定回复给谁。
2.3 消息传递:
1. 消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,可以动态的增加消费者以提高消息的处理能力。
2. 为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。
channel.basic_qos(prefetch_count=1)
注意:要防止如果所有的消费者都在处理中,则队列中的消息会累积的情况。
3. 消息有14个属性,最常用的几种:
deliveryMode:持久化属性
contentType:编码
replyTo:指定一个回调队列
correlationId:消息id
实例代码:
4. 消息生产者可以选择是否在消息被发送到交换器并且还未投递到队列(没有绑定器存在)和/或没有消费者能够立即处理的时候得到通知。通过设置消息的mandatory和/或immediate属性为真,这些投递保障机制的能力得到了强化。
5. 此外,一个生产者可以设置消息的persistent属性为真。这样一来,server将会尝试将这些消息存储在一个稳定的位置,直到server崩溃。当然,这些消息肯定不会被投递到非持久的队列中。
2.4 高可用性(HA):
1. 消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
channel.basicConsume(queuename, noAck=false, consumer);
2. 消息和队列的持久化。定义队列时可以指定队列的持久化属性(问:持久化队列如何删除?)
channel.queueDeclare(queuename, durable=true, false, false, null);
发送消息时可以指定消息持久化属性:
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
这样,即使RabbitMQ服务器重启,也不会丢失队列和消息。
3. publisher confirms
4. master/slave机制,配合Mirrored Queue,这种情况下,publisher会正常发送消息和接收消息的confirm,但对于subscriber来说,需要接收Consumer Cancellation Notifications来得到主节点失败的通知,然后re-consume from the queue,此时要求client有处理重复消息的能力。注意:如果queue在一个新加入的节点上增加了一个slave,此时slave上没有此前queue的信息(目前还没有同步机制)。
(通过命令行或管理插件可以查看哪个slave是同步的:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
当一个slave重新加入mirrored-queue时,如果queue是durable的,则会被清空。
2.5 集群(cluster):
1. 不支持跨网段(如需支持,需要shovel或federation插件)
2. 可以随意的动态增加或减少、启动或停止节点,允许节点故障
3. 集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。
4. 集群的配置可以通过命令行,也可以通过配置文件,命令行优先。
3 使用
3.1 简易使用流程
3.2 RabbitMQ在OpenStack中的使用
在Openstack中,组件之间对RabbitMQ使用基本都是“Remote Procedure Calls”的方式。每一个Nova服务(比如计算服务、存储服务等)初始化时会创建两个队列,一个名为“NODE-TYPE.NODE-ID”,另一个名为“NODE-TYPE”,NODE-TYPE是指服务的类型,NODE-ID指节点名称。
从抽象层面上讲,RabbitMQ的组件的使用类似于下图所示:
每个服务会绑定两个队列到同一个topic类型的exchange,从不同的队列中接收不同类型的消息。消息的发送者如果关心消息的返回值,则会监听另一个队列,该队列绑定在一个direct类型的exchange。接受者收到消息并处理后,会将消息的返回发送到此exchange。
在Openstack中,如果不关心消息返回,消息的流程图如下:
如果关心消息返回值,流程图如下:
3.3 为什么要使用RabbitMQ?
曾经有过一个人做过一个测试(http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html),发送1百万个并发消息,对性能有很高的需求,于是作者对比了RabbitMQ、MSMQ、ActiveMQ、ZeroMQueue,整个过程共产生1百万条1K的消息。测试的执行是在一个Windows Vista上进行的,测试结果如下:
虽然ZeroMQ性能较高,但这个产品不提供消息持久化,需要自己实现审计和数据恢复,因此在易用性和HA上不是令人满意,通过测试结果可以看到,RabbitMQ的性能确实不错。
我在本机也做了一些测试,但我的测试是基于组件的原生配置,没有做任何的配置优化,因此总觉的不靠谱。我只测试了RabbitMQ和ActiveMQ两款产品,虽然网上都说ActiveMQ性能不如前者,但平心而论,ActiveMQ提供了很多配置,存在很大的调优空间,也许修改一个配置参数就会使组件的性能有一个质的飞跃。
- 大小: 14.9 KB
- 大小: 15.3 KB
- 大小: 17.7 KB
- 大小: 2.5 KB
- 大小: 24.6 KB
- 大小: 38.1 KB
- 大小: 43.9 KB
- 大小: 29.9 KB
- 大小: 40 KB
- 大小: 43.1 KB
分享到:
相关推荐
【ARM架构】 RabbitMQ_3.8.3版本 Docker镜像
x86架构,rabbitmq镜像
ARM64架构,银河麒麟linux系统,RabbitMQ的所有离线安装包
arm架构的rabbitmq没有可用的版本,只能安装适应arm架构的版本的rabbitmq
在ARM64架构的设备上安装RabbitMQ时,由于ARM架构的特殊性,可能需要特定版本的依赖包以确保软件的正常运行。在本案例中,我们关注的是离线安装依赖包`unixodbc_2.3.1-4.1-arm64.deb`,这是一个用于连接ODBC(Open ...
在ARM64架构的设备上安装RabbitMQ时,可能会遇到一些挑战,因为不是所有的软件包都有针对这种架构的预编译版本。本话题将详细介绍如何在ARM64架构的Linux系统上离线安装RabbitMQ及其依赖包`odbcinst1debian2_2.3.1-...
这个场景中提到的是在基于ARM64架构的Linux系统上安装RabbitMQ,一个广泛应用的消息队列服务。首先,我们需要理解几个核心概念: 1. **ARM64架构**:也称为AArch64,是ARM公司的64位指令集架构,广泛应用于移动设备...
银河麒麟server v10 sp3版本amd架构rabbitmq相关依赖和安装rpm包
本毕业设计项目为基于微服务架构的影院售票系统后端源码,共包含328个文件,涉及81个GIF动画、70个JavaScript文件、62个Python脚本、35个...系统后端采用nameko、flask和RabbitMQ进行开发,前端则基于layui框架构建。
rabbitmq docker镜像压缩包
RabbitMQ 双活架构设计 RabbitMQ 是一种流行的消息队列服务,它可以实现业务之间的解耦,提高系统的可靠性和可扩展性。在本文中,我们将介绍 RabbitMQ 双活架构设计的实现,包括消息 Publish 可靠性、消息 Consume ...
10. **实战应用**:本书可能涵盖了RabbitMQ在微服务架构、日志收集、任务调度、事件驱动系统等多个场景下的应用实例,帮助读者理解如何在实际项目中有效利用RabbitMQ。 通过深入学习和实践《RabbitMQ实战:高效部署...
RabbitMQ镜像文件
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在实际项目中...通过学习本书,开发者不仅能提升自身的技术能力,还能为所在项目的架构设计带来更多的可能性。
1. **微服务通信**: 在微服务架构中,RabbitMQ可作为服务间的通信桥梁,实现服务解耦和异步处理。 2. **批量处理**: 处理大量数据时,可以将任务放入队列,由后台服务按需处理。 3. **日志收集**: 日志生产者发送...
开发者需要熟悉 Kettle 的 API 和插件架构,以便创建能够与 RabbitMQ 通信的组件。 4. **RabbitMQ 配置**:设置 Kettle 与 RabbitMQ 的连接,包括创建连接配置,如主机名、端口、用户名和密码等。 5. **发送数据到...
4. **理解RabbitMQ-c架构** - **AMQP协议**: RabbitMQ-c实现了AMQP协议的客户端部分,包括连接、频道、交换机、队列、消息等概念的API接口。 - **网络通信**: 库使用OpenSSL库提供安全的TCP连接,支持TLS/SSL加密...
1. **RabbitMQ架构** RabbitMQ采用Erlang语言开发,Erlang以其并发性和容错性而闻名。RabbitMQ服务器由多个节点组成,这些节点可以是集群的一部分,以提供高可用性和负载均衡。 2. **AMQP协议** AMQP是一种二进制...
标题中的“rabbitmq3.8.8.zip”表明这是一个关于RabbitMQ的软件包,版本为3.8.8。RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中处理消息...
总的来说,RabbitMQ_Windows版为开发者提供了一个强大且灵活的消息传递解决方案,无论是在微服务架构还是传统系统中,都能发挥其作用,提高应用的可扩展性和容错性。通过熟练掌握RabbitMQ的原理和使用,我们可以构建...