一、介绍
1、异步消息
异步消息是一个非常普通并且广泛使用的技术,例如Skype。这些服务都有如下特征:
- 他们会在传输消息的时候或多或少加入一些随意的内容和一些比较正式的路由信息;
- 他们都是异步的,也是就是说他们将生产者和消费者区分开来,因此可能将消息加入队列(例如某人发送给你一条消息,但是你不在线或者你的邮箱会受到一封Email)。
- 生产者和消费者是具有不同知识的不同角色。
2、AMQP
AMQP是一个异步消息传递所使用的应用层协议规范,是一个抽象的协议。AMQP当中有四个概念非常重要:虚拟主机(virtual host)、交换机(exchange)、队列(queue)和绑定(binding)。一个虚拟主机持有一组交换机、队列和绑定。RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。
队列(queue)是你的消息的终点,可以理解成消息的容器。消息就一直在里面,直到有客户端连接到这个队列并且将其取走为止。
交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键的属性,就是一个简单的字符串。交换机当中有一系列的绑定,即路由规则。每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机用5个核,另外3个核留下来做消息处理。
一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。交换机的类型:
- Fanout Exchange--不处理路由键。你只需要简单的将队列绑定到交换机上。 一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得一份复制的消息。Fanout交换机转发消息是最快的。
- Direct Exchange--处理路由器。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog",则只有被标记为dog的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
- Topic Exchange--将路由键和某模式进行匹配。此时队列需要绑定一个模式上。符号#匹配一个或多个词,符号*匹配不多不少一个词。因此audit.#能够匹配到audit.irs.corporate,但是audiit.*只能匹配到audit.irs。
RabbitMQ是一个Erlang编写的AMQP服务器。他的核心原理非常简单:接收和发送消息。你可以把它想象成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,他处理的不是纸,而是接收、存储和发送二进制的数据--消息。
3、RabbitMQ特点
支持持久化:如果RabbitMQ死掉了,消息并不会丢失,当队列重启,一切都会恢复。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,大多数用户都会选择持久化。消息队列持久化包括3个部分:
- exchange持久化,在声明时指定durable=>1
- queue持久化,在声明时指定durable=>1
- 消息持久化,在投递时指定delivery_mode=>2(1是非持久化)
如果exchange和queue都是持久化的,那么他们之间的binding也是持久化的。
如果exchange和queue两者间有一个持久化,一个非持久化,就不允许建立绑定。
RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放到磁盘。不过,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。
良好的设计架构可以如下:在一个集群里,有3台以上机器,其中1台使用磁盘模式,其他使用内存模式。其他几台为内存模式的节点,无疑速度更快,因此客户端连接访问它们。而磁盘模式的 节点,由于磁盘IO相对较慢,因此仅作为数据备份使用。
二、原理
一个用作发送消息,另一个接受消息并打印消息内容
其中:p为生产者;hello表示队列名称;c为消费者;首先要做的事情就是建立一个到RabbitMQ服务器的连接,在发送消息之前我们要确认队列是存在的,如果我们把消息发送到一个不存在的队列,RabbitMQ会丢弃这条消息。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
三、RabbitMQ安装
RabbitMQ使用的是AMQP协议。要使用它就必须需要一个使用同样协议的库。几乎所有的编程语言都有可选择的库。python也一样,可以从以下几个库中选择:py-amqplib、txAMQP、pika。
下载RabbitMQ:http://www.rabbitmq.com/releases/rabbitmq-server/v3.2.1/rabbitmq-server-3.2.1-1. noarch.rpm
安装:rpm -ivh rabbitmq-server-3.2.1-1.noarch.rpm
四、结构图
Exchange:消息交换机,它指定消息按什么规则,路由到那个队列。
Queue:消息队列载体,每个消息都会被插入到一个或多个队列。
Channel:消息通道,在客户端的每个链接里,可建立多个Channel,每个Channel代表一个会话任务。
routing key:路由关键字,Exchange根据这个关键字进行消息投递。
消息队列的使用过程大概如下:
- 客户端连接到消息队列服务器,打开一个Channel;
- 客户端声明一个Exchange,并设置相关属性;
- 客户端声明一个queue,并设置相关属性;
- 客户端使用routing key,在Exchange和queue之间建立绑定关系。
- 客户端投递消息到Exchange。
五、具体应用
1、服务器段设置
创建用户myuser和密码mypassword: $ rabbitmqctl add_user myuser mypassword 创建虚拟主机名myvhost: $ rabbitmqctl add_vhost myvhost 设置权限: $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*" 启动: ./rabbitmq-server 停止: ./rabbitmqctl stop
2、客户端部分代码
from amqplib import client_0_8 as amqp conn = amqp.Connection(host="localhost:5672 ", userid="guest", password="guest", virtual_host="/", insist=False) chan = conn.channel()
AMQP支持在一个TCP连接上启用多个MQ通讯Channel,每个channel都可以被应用作为通讯流。每个AMQP程序都至少有一个连接和一个channel。
每个channel都被分配一个整数标示,自动由connection()类的.channel()方法维护。或者你可以使用.channel(x)来指定channel标示,其中x是你想要使用的channel标示。通常情况下,推荐使用.channel()方法来自从分配channel标示,以便防止冲突。
前面已经有了一个可用的连接和channel。现在代码将分成两个应用,生产者和消费者。先创建一个消费者程序,包含一个叫做po_box的队列和一个叫sorting_room的交换机。
chan.queue_declare(queue="po_box", durable=True,exclusive=False, auto_delete=False) chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,auto_delete=False,)
首先,创建了一个名叫po_box的队列,它是durable的(重启之后会重新建立),并且最后一个消费者断开的时候不会自动删除(auto_delete=false)。在常见durable的队列的时候,将auto_delete设置为FALSE是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durable和auto_delete都设置成TRUE,只有尚有消费者活动的队列可以在rabbitMQ意外崩溃的时候自动恢复。
另外一个标志exclusive。如果设置为TRUE,只有创建这个队列的消费者程序才允许连接该队列。这种队列对于这个消费者程序是私有的。
还有另外一个交换机声明,创建了一个名字叫”sorting_room“的交换机。auto_delete和durable的含义和队列是一样的。但是.excange_declare()还有另外一个参数type,用来指定要创建的交换机的类型:fanout、direct和topic。
到此为止,已经有了一个可以接受消息的队列和一个可以发送消息的交换机。不过需要创建一个绑定,把他们连接起来:
chan.queue_bind(queue=”po_box”, exchange=”sorting_room”, routing_key=”jason”)
这个绑定的过程非常直接。任何送到交换机“sorting_room”的具有路由键“jason” 的消息都被路由到名为“po_box” 的队列。
现在有两种方法从队列当中取出消息。第一个是调用chan.basic_get(),主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body 会在没有消息的时候崩掉):
msg = chan.basic_get("po_box") print msg.body chan.basic_ack(msg.delivery_tag)
但是如果你想要应用程序在消息到达的时候立即得到通知怎么办?这种情况下不能使用chan.basic_get(),你需要用chan.basic_consume()注册一个新消息到达的回调:
def recv_callback(msg): print 'Received: ' + msg.body chan.basic_consume(queue='po_box', no_ack=True, callback=recv_callback, consumer_tag="testtag") while True: chan.wait() chan.basic_cancel("testtag")
chan.wait() 放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。chan.basic_cancel() 用来注销该回调函数。参数consumer_tag 当中指定的字符串和chan.basic_consume() 注册的一致。在这个例子当中chan.basic_cancel() 不会被调用到,因为上面是个无限循环…… 不过你需要知道这个调用,所以我把它放在了代码里。
需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get()和chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack 参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack()方法,使用消息的delivery_tag属性作为参数。
下面的代码示例表明如何将一个简单消息发送到交换区“sorting_room”,并且标记为路由键“jason” :
msg = amqp.Message("Test message!") msg.properties["delivery_mode"] = 2 chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
你也许注意到我们设置消息的delivery_mode属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。
剩下的最后一件事情(生产者和消费者都需要调用的)是关闭channel和连接:
chan.close() conn.close()
结果:
生产者: python amqp_publisher.py thank python amqp_publisher.py think1 消费者: Received:thank from channel #1 Received:thank1 from channel #1
六、队列模式
1、一个队列对应一个消费者:
2、一个队列对应多个消费者
3、一个交换队列对应两个队列,注意要先建立交换机和队列的绑定,才可以发送消息:
4、一个交换机和一个队列进行绑定,交换机类型为direct
5、一个交换机和一个队列进行多个绑定,交换机类型为topic
6、远程程序调用
相关推荐
RabbitMQ技术详解 RabbitMQ是一个开源的消息队列系统,其主要功能是作为消息中间件,用于在分布式系统中存储和转发消息。它基于AMQP(Advanced Message Queuing Protocol),这是一种开放标准,专为面向消息的...
四、rabbitmq-c API详解 rabbitmq-c库提供了丰富的API接口,包括连接管理、通道操作、队列、交换机、绑定等。例如: 1. **连接管理**:`rmq_connection_init()`初始化连接,`rmq_connection_destroy()`关闭连接。 ...
### AMQP与RabbitMQ详解 #### 一、AMQP概览 AMQP,全称为Advanced Message Queuing Protocol,即高级消息队列协议,是一种开放的应用层标准协议,专为面向消息的中间件设计。AMQP的核心优势在于其标准化的设计,...
RabbitMQ面试题及答案详解 RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的开源消息代理软件,也是面向消息的中间件。它被广泛应用于分布式系统中,用于实现系统之间的通信和数据交换。 ...
RabbitMQ 面试题及答案详解 RabbitMQ 是一种基于消息队列的中间件产品,通过异步处理、应用解耦、流量削锋和日志处理等方式,提高系统吞吐量和可靠性。下面是 RabbitMQ 面试题及答案的详解: 一、什么是 MQ? MQ ...
RabbitMQ 面试题及答案详解 RabbitMQ 是一种基于 AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列技术。其最大特点是消费并不需要确保提供方存在,实现了服务之间的高度解耦。 为什么要使用...
RabbitMQ 延迟队列及消息延迟推送实现详解 RabbitMQ 延迟队列及消息延迟推送实现详解是指在 RabbitMQ 中实现消息延迟推送的功能,即在指定的时间后推送消息到目标队列中。这种机制在实际应用中非常有价值,如在淘宝...
rabbitmq 3.9.3 配置文件
#### 二、生产者确认机制详解 RabbitMQ 提供了两种确认机制,分别为`publisher-confirm`和`publisher-return`。 ##### 1.1.1 修改配置 为了启用生产者确认机制,需要在`application.yml`配置文件中添加以下内容: ...
RabbitMQ 消息中间件示例详解 RabbitMQ 消息中间件是使用 Erlang 语言开发的高级消息队列协议(Advanced Message Queuing Protocol,AMQP),具有高可靠性和一致性。它可以胜任订单处理、秒杀等一致性要求较高的...
【rabbit安装包】详解 RabbitMQ是一种广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,适用于多种编程语言,如Java、Python、Ruby等。在本教程中,我们将重点讲解如何...
RabbitMQ 消息持久化与 Spring AMQP 实现详解 RabbitMQ 的消息持久化是指在 RabbitMQ Server 中保留消息的机制,以便在 Server 崩溃或重启后可以恢复消息。消息持久化是通过在交换器、队列和消息三个方面实现的。 ...
**RabbitMQ 默认配置文件模板详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于解耦应用程序,提高系统的可扩展性和容错性。在...
**RabbitMQ详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的消息传递。它允许应用程序之间进行异步通信,提高了系统的可扩展性和容错...
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
RabbitMQ是一种广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,允许应用程序之间进行异步通信。RabbitMQ在分布式系统、微服务架构以及高并发场景中扮演着关键角色,...
**RabbitMQ学习笔记与软件插件详解** RabbitMQ是一种广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,提供可靠的消息传递服务。在分布式系统中,RabbitMQ扮演着数据交换中心的角色,...
RabbitMQ是一款开源的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地传递消息。在这个“20.消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心...
消息队列:RabbitMQ:RabbitMQ工作模式详解.docx