- 浏览: 188734 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
ZZX19880809:
没看到有[x] Received 'hello word!0' ...
rabbitmq学习3:Publish/Subscribe -
ZZX19880809:
根本就没有. 应该输入first message.
rabbitmq学习2:Work Queues -
jiaofuyou:
独孤日日也 写道我亲自试验了并没有实现啊,第一个worker输 ...
rabbitmq学习2:Work Queues -
独孤日日也:
我亲自试验了并没有实现啊,第一个worker输出: Wait ...
rabbitmq学习2:Work Queues -
jiaofuyou:
想问个问题,象这种任务分发的工作队列,你举的例子是一个队列被多 ...
rabbitmq学习2:Work Queues
http://www.infoq.com/cn/articles/AMQP-RabbitMQ
准备开始
高级消息队列协议(AMQP1)是一个异步消息传递所使用的应用层协议规范。作为线路层协议,而不是API(例如JMS2),AMQP客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器3和客户端可以投入使用4。
相关厂商内容
IBM 360°讲师团招募:每个爱技术乐分享的人都有机会
AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。
本文中区别发布/订阅是为了将生产者和消费者拆分开来:生产者无需知道消费者按照什么标准接受消息。队列是一个先入先出的数据结构。路由封装了消息队列中的消息的相关信息,这些信息决定了消息在异步消息系统中的最终展现形式。
在这里,我尝试解释一下这个模型的一些概念,Aman Gupta使用Ruby5实现了AMQP模型6。它使用的是一种事件驱动架构(基于EventMachine7),在阅读和使用的时候都会让人觉得有些不太熟悉。但是API的设计表明了在AMQ模型实体之间通信的方式是非常简单的,因此,即便开发者对Ruby并不熟悉,他同样也可以得到收获。
应该注意到,至少有三个或者更多的Ruby客户端8, 9, 10可供选择。其中的一个客户端Carrot很明显使用了非事件驱动的同步Ruby架构,因此,这个客户端在使用事件驱动的Ruby API的时候,风格非常简洁。
本文中的AMQP服务器是使用Erlang11编写的RabbitMQ。它实现了AMQP规范0-8版的内容,并且将在近期实现0-9-1版的内容12。
在开始之前再交代一些东西:异步消息是一个非常普通并且广泛使用的技术,从例如Skype或者XMPP/Jabber这样各种各样的即时消息协议到古老的email。但是,这些服务都有如下特征:
- 它们会在传输消息的时候或多或少加入一些随意的内容(例如一封email可能会包含一个文本和关于办公室笑话的PPT)和一些比较正式的路由信息(例如email地址)。
- 它们都是异步的,也就是说它们将生产者和消费者区分开来,因此可能将消息加入队列(例如某人发给你一条消息,但是你不在线或者你的邮箱会收到一封email)。
- 生产者和消费者是具有不同知识的不同角色。我不需要知道你的IMAP用户名和密码就能够给你发送email。事实上,我甚至不需要知道你的email地址是否是一个马甲或者“真实”地址。这个特性意味着生产者不能控制什么内容被阅读或者订阅了 - 就像我的email客户端会舍弃掉大多数主动发送给我的医药广告。
AMQP是一个抽象的协议(也就是说它不负责处理具体的数据),这个事实并不会将事情变得更复杂。反而,Internet使得消息无处不在。人们通常使用它们和异步消息简单灵活地解决很多问题。而且构建AMQ中的异步消息架构模型最困难的地方在于上手的时候,一旦这些困难被克服,那么构建过程将变得简单。
你可能需要安装一些软件来自己动手实现这些例子。如果你已经在系统上安装了Ruby,那么只需要不到十分钟的设置时间。RabbitMQ网站也有许多信息13帮助你尽快开始。你只需做这些准备工作:
- Erlang/OTP包。下载地址是 http://erlang.org/download.html,安装说明在 http://www.erlang.org/doc/installation_guide/part_frame.html 。
- RabbitMQ。下载地址是 http://www.rabbitmq.com/download.html,安装说明在 http://www.rabbitmq.com/install.html。
- 一个Ruby虚拟机。如果在你的系统平台上没有可供选择的Ruby解释器,你可能需要下载Ruby MRI VM。在 http://www.ruby-lang.org/en/downloads/可以找到下载地址和安装说明。
- 两个Ruby “gem”(已打包的库)。gem工具应该会随着你的Ruby安装包一起分发。
- 如果你需要全新安装或者不确定它是不是当前版本,那么你可以选择升级gem工具。输入gem update --system。在BSD/UNIX系统中,你可能需要有超级用户的权限才能运行此命令(以及后续指令)。
- 告诉gem在GitHub搜索包:gem sources -a http://gems.github.com。
- 安装AMQPgem:gem install tmm1-amqp。这也会安装event-machine gem。
现在你需要做的就是启动RabbitMQ服务器14。
AMQ模型
在AMQ规范中描述了一些实体。一个用来分辨这些实体的方法是检查它们是否由服务器管理员配置或者由客户端在运行的时候声明。
可配置的实体有:
- 消息协商器(Message Broker),它在TCP/IP等端口监听AMQ消息。
- 将消息协商数据划分到多个不同集合的虚拟主机,它很像webserver中的虚拟主机,例如Apache的http守护进程。
- 使用安全凭据连接到虚拟主机的用户。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 # connect to the rabbitmq demonstration broker server (http://www.rabbitmq.com/examples.html#demoserver) 13 14 AMQP.start :host => 'dev.rabbitmq.com', :port => 5672, :user => 'guest', :password => 'guest', :vhost => 'localhost' 15 16 event_loop.join
值得注意的是,规范中仅仅授予用户访问虚拟主机的权限,并没有采纳其他比这高级的访问控制措施,因此RabbitMQ并不支持这些高级访问控制措施。一个由厂商开发的解决方法15期望会加入到下个主要版本中。但是,这个功能16可以通过使用Mercurial代码库的默认branch17来实现,而且已经有一些RabbitMQ用户在使用了。
为了和协商器交流,一个客户端需要建立一个或者多个连接。这些连接只是限于连接用户和虚拟主机。客户端默认使用guest/guest访问权限和访问虚拟主机的根目录,这些默认实现也是RabbitMQ的默认安装选项。
在一个连接中,客户端声明了一个通道。这个通道是消息协商器的网络连接中的一个逻辑连接。这种多工机制是必要的,因为协议中的某些操作是需要这样的通道。因此,通过单一连接到协商器的并发控制需要建立一个可靠的模型,这里可以使用通道池和串行访问或者例如线程本地通道这样的线程并发模型。在这个例子中,Ruby API对用户隐藏了通道管理这样的细节。
如果需要在一个通道上进行操作,那么客户端需要声明AMQ组件。声明组件是断言特定的组件存在于协商器中──如果不存在的话,那么在运行时创建。
这些组件包括:
- 交换器(Exchange),它是发送消息的实体。
- 队列(Queue),这是接收消息的实体。
- 绑定器(Bind),将交换器和队列连接起来,并且封装消息的路由信息。
所有这些组件的属性各不相同,但是只有交换器和队列同样被命名。客户端可以通过交换器的名字来发送消息,也可以通过队列的名字收取信息。因为AMQ协议没有一个通用的标准方法来获得所有组件的名称,所以客户端对队列和交换器的访问被限制在仅能使用熟知的或者只有自己知道的名字(参见18了解这种访问控制的信息)。
绑定器没有名字,它们的生命期依赖于所紧密连接的交换器和队列。如果这两者任意一个被删除掉,那么绑定器便失效了。这就说明,若要知道交换器和队列的名字,还需要设置消息路由。
消息是一个不透明的数据包,这些包有如下性质:
- 元数据,例如内容的编码或者表明来源的字段。
- 标志位,标记消息投递时候的一些保障机制。
- 一个特殊的字段叫做routing key。
2.1 接受和发送消息:交换器类型
发送消息是一个非常简单的过程。客户端声明一个它想要发送消息的目的交换器,然后将消息传递给交换器。
接受消息的最简单办法是设置一个订阅。客户端需要声明一个队列,并且使用一个绑定器将之前的交换器和队列绑定起来,这样的话,订阅就设置完毕。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue 13 14 exchange = MQ.fanout('my-fanout-exchange') 15 queue = MQ.queue('my-fanout-queue') 16 17 queue.bind(exchange).subscribe do |header, body| 18 yield header, body 19 end 20 21 end 22 23 def send_to_exchange(message) 24 25 exchange = MQ.fanout('my-fanout-exchange') 26 exchange.publish message 27 28 end 29 30 subscribe_to_queue do |header, body| 31 p "I received a message: #{body}" 32 end 33 34 send_to_exchange 'Hello' 35 send_to_exchange 'World' 36 37 event_loop.join
三个标准决定了一条消息是否真的被投递到了队列中:
交换器的类型。在这个例子中类型是fanout。
消息的属性。在这个例子中,消息没有任何属性,只是有内容(首先是Hello,然后World)。
给定的绑定器的唯一可选属性:键值。在这个例子中绑定器没有任何键值。
交换器的类型决定了它如何解释这个连接。我们的例子中,fanout交换器不会解释任何东西:它只是将消息投递到所有绑定到它的队列中。
没有绑定器,哪怕是最简单的消息,交换器也不能将其投递到队列中,只能抛弃它。通过订阅一个队列,消费者能够从队列中获取消息,然后在使用过后将其从队列中删除。
下列交换器类型都在规范中被提及。随后我会由浅入深地介绍它们。
- direct交换器将消息根据其routing-key属性投递到包含对应key属性的绑定器上。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue(key) 13 14 exchange = MQ.direct('my-direct-exchange') 15 queue = MQ.queue('my-direct-queue') 16 17 queue.bind(exchange, :key => key).subscribe do |header, body| 18 yield header, body 19 end 20 21 end 22 23 def send_to_exchange(message, key) 24 25 exchange = MQ.direct('my-direct-exchange') 26 exchange.publish message, :routing_key => key 27 28 end 29 30 subscribe_to_queue('hello_world') do |header, body| 31 p "I received a message: #{body}" 32 end 33 34 send_to_exchange 'Hello', 'hello_world' 35 send_to_exchange 'Cruel', 'ignored' 36 send_to_exchange 'World', 'hello_world' 37 38 event_loop.join
- topic交换器用过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key *.stock.#匹配routing key usd.stcok和eur.stock.db,但是不匹配stock.nasdaq。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue(key) 13 14 exchange = MQ.topic('my-topic-exchange') 15 queue = MQ.queue('my-topic-queue') 16 17 queue.bind(exchange, :key => key).subscribe do |header, body| 18 yield header, body 19 end 20 21 end 22 23 def send_to_exchange(message, key) 24 25 exchange = MQ.topic('my-topic-exchange') 26 exchange.publish message, :routing_key => key 27 28 end 29 30 subscribe_to_queue('hello.*.message.#') do |header, body| 31 p ”I received a message: #{body}” 32 end 33 34 send_to_exchange 'Hello', 'hello.world.message.example.in.ruby' 35 send_to_exchange 'Cruel', 'cruel.world.message' 36 send_to_exchange 'World', 'hello.world.message' 37 38 event_loop.join
- 在规范中还有其他的交换器被提及,例如header交换器(它根据应用程序消息的特定属性进行匹配,这些消息可能在binding key中标记为可选或者必选),failover和system交换器。但是这些交换器现在在当前RabbitMQ版本中均未实现。
不同于队列的是,交换器有相应的类型,表明它们的投递方式(通常是在和绑定器协作的时候)。因为交换器是命名实体,所以声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。
交换器也有一些性质:
- 持久性:如果启用,交换器将会在协商器重启前都有效。
- 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
- 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
2.2 默认交换器和绑定器
AMQP协商器都会对其支持的每种交换器类型(为每一个虚拟主机)声明一个实例。这些交换器的命名规则是amq.前缀加上类型名。例如 amq.fanout。空的交换器名称等于amq.direct。对这个默认的direct交换器(也仅仅是对这个交换器),协商器将会声明一个绑定了系统中所有队列的绑定器。
这个特点告诉我们,在系统中,任意队列都可以和默认的direct交换器绑定在一起,只要其routing-key等于队列名字。
2.3 队列属性和多绑定器
默认绑定器的行为揭示了多绑定器的存在 - 将一个或者多个队列和一个或者多个交换器绑定起来。这使得可以将发送到不同交换器的具有不同routing key(或者其他属性)的消息发送到同一个队列中。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue(*keys) 13 14 exchange = MQ.direct('my-direct-exchange') 15 queue = MQ.queue('my-direct-queue-with-multiple-bindings') 16 17 bindings = keys.map do |key| 18 queue.bind(exchange, :key => key) 19 end 20 21 bindings.last.subscribe do |header, body| 22 yield header, body 23 end 24 25 end 26 27 def send_to_exchange(message, key) 28 29 exchange = MQ.direct('my-direct-exchange') 30 exchange.publish message, :routing_key => key 31 32 end 33 34 subscribe_to_queue('foo', 'bar', 'wee') do |header, body| 35 p "I received a message: #{body}" 36 end
37 38 send_to_exchange 'Hello', 'foo' 39 send_to_exchange 'You', 'gee' 40 send_to_exchange 'Cruel', 'bar' 41 send_to_exchange 'World', 'wee' 42 43 event_loop.join
虽然不能被命名,但是队列也有以下属性,这些属性和交换器所具有的属性类似。
- 持久性:如果启用,队列将会在协商器重启前都有效。
- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
- 排他性:如果启用,队列只能被声明它的消费者使用。
这些性质可以用来创建例如排他和自删除的transient或者私有队列。这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉 - 它们只是短暂地连接到协商器,但是可以用于实现例如RPC或者在AMQ上的对等通信。
AMQP上的RPC是这样的:RPC客户端声明一个回复队列,唯一命名(例如用UUID19),并且是自删除和排他的。然后它发送请求给一些交换器,在消息的reply-to字段中包含了之前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to作为routing key(之前提到过默认绑定器会绑定所有的队列到默认交换器)发送到默认交换器。注意仅仅是惯例而已。根据和RPC服务器的约定,它可以解释消息的任何属性(甚至数据体)来决定回复给谁。
队列也可以是持久的,可共享,非自动删除以及非排他的。使用同一个队列的多个用户接收到的并不是发送到这个队列的消息的一份拷贝,而是这些用户共享这队列中的一份数据,然后在使用完之后删除掉。
2.4 消息投递的保障机制
消费者会显式或者隐式地通知消息的使用完毕。当隐式地通知的时候,消息被认为在投递之后便被消耗掉。否则客户端需要显式地发送一个验证信息。只有这个验证信息收到之后,消息才会被认为已经收到并且从队列中删除。如果没有收到,那么协商器会在通道20关闭之前尝试着重新投递消息。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue 13 14 exchange = MQ.fanout('my-fanout-exchange-with-acks') 15 queue = MQ.queue('my-fanout-queue-with-acks') 16 17 queue.bind(exchange).subscribe(:ack => true) do |header, body| 18 yield header, body 19 header.ack unless body == 'Cruel' 20 end 21 22 end 23 24 def send_to_exchange(message) 25 26 exchange = MQ.fanout('my-fanout-exchange-with-acks') 27 exchange.publish message 28 29 end 30 31 subscribe_to_queue do |header, body| 32 p "I received a message: #{body}" 33 end 34 35 send_to_exchange 'Hello' 36 send_to_exchange 'Cruel' 37 send_to_exchange 'World' 38 39 event_loop.join 40 41 __END__ 42 43 First run: 44 45 "I received a message: Hello" 46 "I received a message: Cruel" 47 "I received a message: World" 48 49 Second run: 50 51 "I received a message: Cruel" 52 "I received a message: Hello" 53 "I received a message: Cruel" 54 "I received a message: World" 55 56 ... and so forth
消息生产者可以选择是否在消息被发送到交换器并且还未投递到队列(没有绑定器存在)和/或没有消费者能够立即处理的时候得到通知。通过设置消息的mandatory和/或immediate属性为真,这些投递保障机制的能力得到了强化。
现在在本文例子中使用的Ruby AMQP API还不完全支持这些标志位。但是,在GitHub上已经有两个patch21, 22展示了完全支持之后的情况。
此外,一个生产者可以设置消息的persistent属性为真。这样一来,协商器将会尝试将这些消息存储在一个稳定的位置,直到协商器崩溃。当然,这些消息肯定不会被投递到非持久的队列中。
2.5 拥塞控制
在给出的例子中,对消息的使用永远看做是一个订阅。那么考虑到了拥塞控制吗?规范制定了QoS23特性,限制了通过一个通道发送到一个消费者的消息总量。很不幸的是,这个特性在当前RabbitMQ的版本中还不支持(计划在1.6),但是在原则上是应该被AMQP API支持的。
作为一个替代方案,客户端可以选择从队列中取出消息而不是通过订阅。当使用这种方法的时候,拥塞控制可以手动地实现。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(5) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue 13 14 exchange = MQ.fanout('my-fanout-exchange') 15 queue = MQ.queue('my-fanout-queue') 16 17 queue.bind(exchange).pop do |header, body| 18 yield header, body 19 end 20 21 EM.add_periodic_timer(0.25) do 22 queue.pop 23 end 24 25 end 26 27 def send_to_exchange(message) 28 29 exchange = MQ.fanout('my-fanout-exchange') 30 exchange.publish message 31 32 end 33 34 received = 0 35 36 subscribe_to_queue do |header, body| 37 p "I received a message: #{body}" 38 end 39 40 send_to_exchange 'Hello' 41 send_to_exchange 'World' 42 43 event_loop.join
一个模型样例
想像一下你想创建一个普通的聊天应用,那么应该有以下几个基本特性:
- 聊天 - 两个用户应该可以相互发送消息。
- 一个好友系统 - 用户能够控制谁给他发送消息。
我们假设在协商器上有两种消费者:好友服务器和聊天客户端。
3.1 成为好友
为了成为Bob的好友,Alice首先得发送一个消息给fanout交换器iends,我们假设这个交换器是访问受限24的:普通用户不能够将队列绑定到它。在这个消息中,Alice表示想和Bob成为朋友。
在协商器上有大量的聊天服务器,从绑定到friends交换器的一个单一持久队列中持续地取出消息。这个队列的名字是例如friends.298F2DBC6865-4225-8A73-8FF6175D396D这样的,这难以猜测的名字能够阻止聊天客户端直接取出信息 - 记住:不知道队列的名字,就不能设置订阅。
当一个聊天服务器收到Alice的消息(只有一个会得到这个消息,虽然它们都是从同一个队列中获取),决定这个请求是否有效,然后将其(也许是做过一些调整或者参数化)发送到默认交换器(可以是直接的或者持久的)。它使用另外一个只有Bob知道的routing key来投递。当Bob上线的时候(或者一个服务器做了这件事),他会声明一个队列,这个队列的名字就是之前的routing key(记住在虚拟主机上的默认绑定器是将所有的队列和默认交换器绑定在一起)。
Bob的聊天客户端现在询问Bob是否想和Alice成为朋友。在她的请求消息中,有一个特殊的属性叫做reply-to - 这个属性包括了一个持久和排他的好友队列的名字,这个队列是Alice声明将用于和Bob的未来聊天用。如果Bob想和Alice成为朋友,他会使用这个队列的名字作为routing key,发送一个消息到默认交换器。他也会需要声明一个持久和排他的好友队列,将其名字设为reply-to的值。
例如:Alice和Bob的好友队列的名字是B5725C4A-6621463E-AAF1-8222AA3AD601。Bob发送给Alice的消息的routing-key的值便是这个名字,也是Alice发送给Bob的消息中reply-to的值。
因为好友队列是持久的,因此发送到消息在用户离线的时候也不会丢失。当用户上线之后,所有的在好友队列的消息将会发送到用户,然后才去获取新的消息。
当Bob不再想和Alice成为好友,他可以简单地删除掉为Alice声明的好友队列。在她使用mandatory标志位发送消息的时候,Alice也会注意到Bob已经不再想是她的好友。因为交换器会将她的消息认为不可投递而返回。
仍未提及的事情
仍然有很多本文没有介绍的东西,例如事务语义,关于信息的重路由,header交换器的规范以及不同AMQP规范之间的差异 - 尤其是在1.0版本之前的模型改变。为了简介起见,一个聊天的模型同样也被略过了。
这里也没有介绍了整个系统的管理,因为还不清楚AMQP和RabbitMQ将会走向何方。现在有一个课题,关于在保留的amq命名空间中可用的交换器,它能获取协商器所有的日志信息。但是,能够列出现在已经声明的组件和已连接的用户的工具是用rabbitmqctl命令行接口而不是AMQ实体来实现的。
1 require 'rubygems' 2 require 'mq' 3 4 PATH_TO_RABBITMQCTL = '/usr/local/sbin/rabbitmqctl' 5 6 event_loop = Thread.new { EM.run } 7 8 def subscribe_to_logger 9 10 random_name = (0...50).map{ ('a'..'z').to_a[rand(26)] }.join 11 12 exchange = MQ.topic('amq.rabbitmq.log') 13 queue = MQ.queue(random_name, :autodelete => true, :exclusive => true) 14 binding = queue.bind(exchange, :key => '#') 15 16 binding.subscribe do |header, body| 17 body.split("\n").each do |message| 18 yield header, message 19 end 20 end 21 22 end 23 24 def exchange_info(vhost = '/') 25 info :exchange, vhost, %w(name type durable auto_delete arguments) 26 end 27 28 def queue_info(vhost = '/') 29 info :queue, vhost, %w(name durable auto_delete arguments node messages_ready messages_unacknowledged messages_uncommitted messages acks_uncommitted consumers transactions memory) 30 end 31 32 def binding_info(vhost = '/') 33 info :binding, vhost 34 end 35 36 def connection_info 37 info :exchange, nil, %w(node address port peer_address peer_port state channels user vhost timeout frame_max recv_oct recv_cnt send_oct send_cnt send_pend) 38 end 39 40 def info(about, vhost = nil, items = []) 41 42 column_length = 20 43 44 puts "#{about} info\n" 45 46 cmd = "#{PATH_TO_RABBITMQCTL} list_#{about}s" 47 cmd << " -p #{vhost}" if vhost 48 cmd << " #{items.join(' ')} 2>&1" 49 50 pipe = IO.popen(cmd) 51 52 pipe.readlines.map { |line| line.chomp.split("\t").map { |item| item.ljust(column_length)[0, column_length] } }.slice(1..-2).each do |exchange| 53 print exchange.join(' ') + "\n" 54 end 55 56 end 57 58 subscribe_to_logger do |message| 59 p "RabbitMQ logger: #{message}" 60 end 61 62 %w(connection exchange queue binding).each do |method| 63 self.send "#{method}_info".to_sym 64 end 65 66 event_loop.join
必须提及的是,已经有一些使用AMQP(或者RabbitMQ)的分布式架构。这些架构(例如Nanite25或者Lizzy26)在AMQP的顶部引入了一些抽象层,这样简化了一些操作,例如cluster中在Ruby客户端之间工作的分配。
4.1 下一步该做什么?
要想使用本地的协商器来玩玩好友和邮件列表,第一步应该是学习有关AMQP和RabbitMQ的知识。不仅仅应该在RabbitMQ网站上阅读幻灯片和文章,还应该通过在IRC的#rabbitmq通道上和社区成员交流或者阅读关于RabbitMQ和/或AMQP的网志27, 28,例如LShift的博客。在Twitter上也可以通过#rabbitmq或#amqp这两个标签找到很多关于AMQP或者RabbitsMQ的内容29, 30, 31, 32, 33, 34, 35。
欢迎进入异步信息的世界,祝您玩得愉快!
1 http://www.infoq.com/amqp
2 ttp://java.sun.com/products/jms/
3 Advanced Message Queuing Protocol/Implementations
4 http://www.rabbitmq.com/how.html#clients
5 http://github.com/tmm1/amqp/tree/master
6 http://www.infoq.com/ruby/
7 http://rubyeventmachine.com/
8 http://github.com/famoseagle/carrot/tree/master
9 http://github.com/celldee/bunny/tree/master
10 http://qpid.apache.org/download.html
11 Erlang
12 http://www.rabbitmq.com/specification.html
13 http://www.rabbitmq.com/how.html
14 http://www.rabbitmq.com/how.html
15 AccessControlDesign
16 ACLs
17 http://www.rabbitmq.com/mercurial.html#defaultbranch
18 Minimum Air Induction
19 Universally Unique Identifier
20 或者关联到这个通道的连接。
21 somic/amqp
22 yawn/amqp
23 BasicQosDesign
24 作为替代方案,Alice可以可以收到另一个特殊的routing-key来请求加为好友。所有的聊天服务器将会使用聊天系统中所有用户的 binding-key绑定到friends.298F2DBC-6865-4225-8A73-8FF6175D396D。也有其他的可行方案 - 事实上本例子中所有的行为都有多种实现方法。
25 ezmobius / nanite
26 bmizerany / lizzy
27 lists.rabbitmq.com Mailing Lists
28 RabbitMQ
29 RabbitM - Highlights: presentations, blogs and code
30 freenode
31 Minimum Air Induction
32 Kirk's Rants blogspot
33 http://somic.org/category/rabbitmq/ 34 http://www.lshift.net/blog/category/lshift-sw/rabbitmq
35 Twitter
查看英文原文:Getting started with AMQP and RabbitMQ。
准备开始
高级消息队列协议(AMQP1)是一个异步消息传递所使用的应用层协议规范。作为线路层协议,而不是API(例如JMS2),AMQP客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器3和客户端可以投入使用4。
相关厂商内容
IBM 360°讲师团招募:每个爱技术乐分享的人都有机会
AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具。因此,面向消息的中间件(MOM)系统,例如发布/订阅队列,没有作为基本元素实现。反而通过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一部分,形成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如之前提到的发布/订阅,队列,事务以及流数据,并且添加了额外的特性,例如更易于扩展,基于内容的路由。
本文中区别发布/订阅是为了将生产者和消费者拆分开来:生产者无需知道消费者按照什么标准接受消息。队列是一个先入先出的数据结构。路由封装了消息队列中的消息的相关信息,这些信息决定了消息在异步消息系统中的最终展现形式。
在这里,我尝试解释一下这个模型的一些概念,Aman Gupta使用Ruby5实现了AMQP模型6。它使用的是一种事件驱动架构(基于EventMachine7),在阅读和使用的时候都会让人觉得有些不太熟悉。但是API的设计表明了在AMQ模型实体之间通信的方式是非常简单的,因此,即便开发者对Ruby并不熟悉,他同样也可以得到收获。
应该注意到,至少有三个或者更多的Ruby客户端8, 9, 10可供选择。其中的一个客户端Carrot很明显使用了非事件驱动的同步Ruby架构,因此,这个客户端在使用事件驱动的Ruby API的时候,风格非常简洁。
本文中的AMQP服务器是使用Erlang11编写的RabbitMQ。它实现了AMQP规范0-8版的内容,并且将在近期实现0-9-1版的内容12。
在开始之前再交代一些东西:异步消息是一个非常普通并且广泛使用的技术,从例如Skype或者XMPP/Jabber这样各种各样的即时消息协议到古老的email。但是,这些服务都有如下特征:
- 它们会在传输消息的时候或多或少加入一些随意的内容(例如一封email可能会包含一个文本和关于办公室笑话的PPT)和一些比较正式的路由信息(例如email地址)。
- 它们都是异步的,也就是说它们将生产者和消费者区分开来,因此可能将消息加入队列(例如某人发给你一条消息,但是你不在线或者你的邮箱会收到一封email)。
- 生产者和消费者是具有不同知识的不同角色。我不需要知道你的IMAP用户名和密码就能够给你发送email。事实上,我甚至不需要知道你的email地址是否是一个马甲或者“真实”地址。这个特性意味着生产者不能控制什么内容被阅读或者订阅了 - 就像我的email客户端会舍弃掉大多数主动发送给我的医药广告。
AMQP是一个抽象的协议(也就是说它不负责处理具体的数据),这个事实并不会将事情变得更复杂。反而,Internet使得消息无处不在。人们通常使用它们和异步消息简单灵活地解决很多问题。而且构建AMQ中的异步消息架构模型最困难的地方在于上手的时候,一旦这些困难被克服,那么构建过程将变得简单。
你可能需要安装一些软件来自己动手实现这些例子。如果你已经在系统上安装了Ruby,那么只需要不到十分钟的设置时间。RabbitMQ网站也有许多信息13帮助你尽快开始。你只需做这些准备工作:
- Erlang/OTP包。下载地址是 http://erlang.org/download.html,安装说明在 http://www.erlang.org/doc/installation_guide/part_frame.html 。
- RabbitMQ。下载地址是 http://www.rabbitmq.com/download.html,安装说明在 http://www.rabbitmq.com/install.html。
- 一个Ruby虚拟机。如果在你的系统平台上没有可供选择的Ruby解释器,你可能需要下载Ruby MRI VM。在 http://www.ruby-lang.org/en/downloads/可以找到下载地址和安装说明。
- 两个Ruby “gem”(已打包的库)。gem工具应该会随着你的Ruby安装包一起分发。
- 如果你需要全新安装或者不确定它是不是当前版本,那么你可以选择升级gem工具。输入gem update --system。在BSD/UNIX系统中,你可能需要有超级用户的权限才能运行此命令(以及后续指令)。
- 告诉gem在GitHub搜索包:gem sources -a http://gems.github.com。
- 安装AMQPgem:gem install tmm1-amqp。这也会安装event-machine gem。
现在你需要做的就是启动RabbitMQ服务器14。
AMQ模型
在AMQ规范中描述了一些实体。一个用来分辨这些实体的方法是检查它们是否由服务器管理员配置或者由客户端在运行的时候声明。
可配置的实体有:
- 消息协商器(Message Broker),它在TCP/IP等端口监听AMQ消息。
- 将消息协商数据划分到多个不同集合的虚拟主机,它很像webserver中的虚拟主机,例如Apache的http守护进程。
- 使用安全凭据连接到虚拟主机的用户。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 # connect to the rabbitmq demonstration broker server (http://www.rabbitmq.com/examples.html#demoserver) 13 14 AMQP.start :host => 'dev.rabbitmq.com', :port => 5672, :user => 'guest', :password => 'guest', :vhost => 'localhost' 15 16 event_loop.join
值得注意的是,规范中仅仅授予用户访问虚拟主机的权限,并没有采纳其他比这高级的访问控制措施,因此RabbitMQ并不支持这些高级访问控制措施。一个由厂商开发的解决方法15期望会加入到下个主要版本中。但是,这个功能16可以通过使用Mercurial代码库的默认branch17来实现,而且已经有一些RabbitMQ用户在使用了。
为了和协商器交流,一个客户端需要建立一个或者多个连接。这些连接只是限于连接用户和虚拟主机。客户端默认使用guest/guest访问权限和访问虚拟主机的根目录,这些默认实现也是RabbitMQ的默认安装选项。
在一个连接中,客户端声明了一个通道。这个通道是消息协商器的网络连接中的一个逻辑连接。这种多工机制是必要的,因为协议中的某些操作是需要这样的通道。因此,通过单一连接到协商器的并发控制需要建立一个可靠的模型,这里可以使用通道池和串行访问或者例如线程本地通道这样的线程并发模型。在这个例子中,Ruby API对用户隐藏了通道管理这样的细节。
如果需要在一个通道上进行操作,那么客户端需要声明AMQ组件。声明组件是断言特定的组件存在于协商器中──如果不存在的话,那么在运行时创建。
这些组件包括:
- 交换器(Exchange),它是发送消息的实体。
- 队列(Queue),这是接收消息的实体。
- 绑定器(Bind),将交换器和队列连接起来,并且封装消息的路由信息。
所有这些组件的属性各不相同,但是只有交换器和队列同样被命名。客户端可以通过交换器的名字来发送消息,也可以通过队列的名字收取信息。因为AMQ协议没有一个通用的标准方法来获得所有组件的名称,所以客户端对队列和交换器的访问被限制在仅能使用熟知的或者只有自己知道的名字(参见18了解这种访问控制的信息)。
绑定器没有名字,它们的生命期依赖于所紧密连接的交换器和队列。如果这两者任意一个被删除掉,那么绑定器便失效了。这就说明,若要知道交换器和队列的名字,还需要设置消息路由。
消息是一个不透明的数据包,这些包有如下性质:
- 元数据,例如内容的编码或者表明来源的字段。
- 标志位,标记消息投递时候的一些保障机制。
- 一个特殊的字段叫做routing key。
2.1 接受和发送消息:交换器类型
发送消息是一个非常简单的过程。客户端声明一个它想要发送消息的目的交换器,然后将消息传递给交换器。
接受消息的最简单办法是设置一个订阅。客户端需要声明一个队列,并且使用一个绑定器将之前的交换器和队列绑定起来,这样的话,订阅就设置完毕。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue 13 14 exchange = MQ.fanout('my-fanout-exchange') 15 queue = MQ.queue('my-fanout-queue') 16 17 queue.bind(exchange).subscribe do |header, body| 18 yield header, body 19 end 20 21 end 22 23 def send_to_exchange(message) 24 25 exchange = MQ.fanout('my-fanout-exchange') 26 exchange.publish message 27 28 end 29 30 subscribe_to_queue do |header, body| 31 p "I received a message: #{body}" 32 end 33 34 send_to_exchange 'Hello' 35 send_to_exchange 'World' 36 37 event_loop.join
三个标准决定了一条消息是否真的被投递到了队列中:
交换器的类型。在这个例子中类型是fanout。
消息的属性。在这个例子中,消息没有任何属性,只是有内容(首先是Hello,然后World)。
给定的绑定器的唯一可选属性:键值。在这个例子中绑定器没有任何键值。
交换器的类型决定了它如何解释这个连接。我们的例子中,fanout交换器不会解释任何东西:它只是将消息投递到所有绑定到它的队列中。
没有绑定器,哪怕是最简单的消息,交换器也不能将其投递到队列中,只能抛弃它。通过订阅一个队列,消费者能够从队列中获取消息,然后在使用过后将其从队列中删除。
下列交换器类型都在规范中被提及。随后我会由浅入深地介绍它们。
- direct交换器将消息根据其routing-key属性投递到包含对应key属性的绑定器上。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue(key) 13 14 exchange = MQ.direct('my-direct-exchange') 15 queue = MQ.queue('my-direct-queue') 16 17 queue.bind(exchange, :key => key).subscribe do |header, body| 18 yield header, body 19 end 20 21 end 22 23 def send_to_exchange(message, key) 24 25 exchange = MQ.direct('my-direct-exchange') 26 exchange.publish message, :routing_key => key 27 28 end 29 30 subscribe_to_queue('hello_world') do |header, body| 31 p "I received a message: #{body}" 32 end 33 34 send_to_exchange 'Hello', 'hello_world' 35 send_to_exchange 'Cruel', 'ignored' 36 send_to_exchange 'World', 'hello_world' 37 38 event_loop.join
- topic交换器用过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key *.stock.#匹配routing key usd.stcok和eur.stock.db,但是不匹配stock.nasdaq。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue(key) 13 14 exchange = MQ.topic('my-topic-exchange') 15 queue = MQ.queue('my-topic-queue') 16 17 queue.bind(exchange, :key => key).subscribe do |header, body| 18 yield header, body 19 end 20 21 end 22 23 def send_to_exchange(message, key) 24 25 exchange = MQ.topic('my-topic-exchange') 26 exchange.publish message, :routing_key => key 27 28 end 29 30 subscribe_to_queue('hello.*.message.#') do |header, body| 31 p ”I received a message: #{body}” 32 end 33 34 send_to_exchange 'Hello', 'hello.world.message.example.in.ruby' 35 send_to_exchange 'Cruel', 'cruel.world.message' 36 send_to_exchange 'World', 'hello.world.message' 37 38 event_loop.join
- 在规范中还有其他的交换器被提及,例如header交换器(它根据应用程序消息的特定属性进行匹配,这些消息可能在binding key中标记为可选或者必选),failover和system交换器。但是这些交换器现在在当前RabbitMQ版本中均未实现。
不同于队列的是,交换器有相应的类型,表明它们的投递方式(通常是在和绑定器协作的时候)。因为交换器是命名实体,所以声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。
交换器也有一些性质:
- 持久性:如果启用,交换器将会在协商器重启前都有效。
- 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
- 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
2.2 默认交换器和绑定器
AMQP协商器都会对其支持的每种交换器类型(为每一个虚拟主机)声明一个实例。这些交换器的命名规则是amq.前缀加上类型名。例如 amq.fanout。空的交换器名称等于amq.direct。对这个默认的direct交换器(也仅仅是对这个交换器),协商器将会声明一个绑定了系统中所有队列的绑定器。
这个特点告诉我们,在系统中,任意队列都可以和默认的direct交换器绑定在一起,只要其routing-key等于队列名字。
2.3 队列属性和多绑定器
默认绑定器的行为揭示了多绑定器的存在 - 将一个或者多个队列和一个或者多个交换器绑定起来。这使得可以将发送到不同交换器的具有不同routing key(或者其他属性)的消息发送到同一个队列中。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue(*keys) 13 14 exchange = MQ.direct('my-direct-exchange') 15 queue = MQ.queue('my-direct-queue-with-multiple-bindings') 16 17 bindings = keys.map do |key| 18 queue.bind(exchange, :key => key) 19 end 20 21 bindings.last.subscribe do |header, body| 22 yield header, body 23 end 24 25 end 26 27 def send_to_exchange(message, key) 28 29 exchange = MQ.direct('my-direct-exchange') 30 exchange.publish message, :routing_key => key 31 32 end 33 34 subscribe_to_queue('foo', 'bar', 'wee') do |header, body| 35 p "I received a message: #{body}" 36 end
37 38 send_to_exchange 'Hello', 'foo' 39 send_to_exchange 'You', 'gee' 40 send_to_exchange 'Cruel', 'bar' 41 send_to_exchange 'World', 'wee' 42 43 event_loop.join
虽然不能被命名,但是队列也有以下属性,这些属性和交换器所具有的属性类似。
- 持久性:如果启用,队列将会在协商器重启前都有效。
- 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
- 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
- 排他性:如果启用,队列只能被声明它的消费者使用。
这些性质可以用来创建例如排他和自删除的transient或者私有队列。这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉 - 它们只是短暂地连接到协商器,但是可以用于实现例如RPC或者在AMQ上的对等通信。
AMQP上的RPC是这样的:RPC客户端声明一个回复队列,唯一命名(例如用UUID19),并且是自删除和排他的。然后它发送请求给一些交换器,在消息的reply-to字段中包含了之前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to作为routing key(之前提到过默认绑定器会绑定所有的队列到默认交换器)发送到默认交换器。注意仅仅是惯例而已。根据和RPC服务器的约定,它可以解释消息的任何属性(甚至数据体)来决定回复给谁。
队列也可以是持久的,可共享,非自动删除以及非排他的。使用同一个队列的多个用户接收到的并不是发送到这个队列的消息的一份拷贝,而是这些用户共享这队列中的一份数据,然后在使用完之后删除掉。
2.4 消息投递的保障机制
消费者会显式或者隐式地通知消息的使用完毕。当隐式地通知的时候,消息被认为在投递之后便被消耗掉。否则客户端需要显式地发送一个验证信息。只有这个验证信息收到之后,消息才会被认为已经收到并且从队列中删除。如果没有收到,那么协商器会在通道20关闭之前尝试着重新投递消息。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(1) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue 13 14 exchange = MQ.fanout('my-fanout-exchange-with-acks') 15 queue = MQ.queue('my-fanout-queue-with-acks') 16 17 queue.bind(exchange).subscribe(:ack => true) do |header, body| 18 yield header, body 19 header.ack unless body == 'Cruel' 20 end 21 22 end 23 24 def send_to_exchange(message) 25 26 exchange = MQ.fanout('my-fanout-exchange-with-acks') 27 exchange.publish message 28 29 end 30 31 subscribe_to_queue do |header, body| 32 p "I received a message: #{body}" 33 end 34 35 send_to_exchange 'Hello' 36 send_to_exchange 'Cruel' 37 send_to_exchange 'World' 38 39 event_loop.join 40 41 __END__ 42 43 First run: 44 45 "I received a message: Hello" 46 "I received a message: Cruel" 47 "I received a message: World" 48 49 Second run: 50 51 "I received a message: Cruel" 52 "I received a message: Hello" 53 "I received a message: Cruel" 54 "I received a message: World" 55 56 ... and so forth
消息生产者可以选择是否在消息被发送到交换器并且还未投递到队列(没有绑定器存在)和/或没有消费者能够立即处理的时候得到通知。通过设置消息的mandatory和/或immediate属性为真,这些投递保障机制的能力得到了强化。
现在在本文例子中使用的Ruby AMQP API还不完全支持这些标志位。但是,在GitHub上已经有两个patch21, 22展示了完全支持之后的情况。
此外,一个生产者可以设置消息的persistent属性为真。这样一来,协商器将会尝试将这些消息存储在一个稳定的位置,直到协商器崩溃。当然,这些消息肯定不会被投递到非持久的队列中。
2.5 拥塞控制
在给出的例子中,对消息的使用永远看做是一个订阅。那么考虑到了拥塞控制吗?规范制定了QoS23特性,限制了通过一个通道发送到一个消费者的消息总量。很不幸的是,这个特性在当前RabbitMQ的版本中还不支持(计划在1.6),但是在原则上是应该被AMQP API支持的。
作为一个替代方案,客户端可以选择从队列中取出消息而不是通过订阅。当使用这种方法的时候,拥塞控制可以手动地实现。
1 require 'rubygems' 2 require 'mq' 3 4 event_loop = Thread.new do 5 EM.run do 6 EM.add_timer(5) do 7 EM.stop 8 end 9 end 10 end 11 12 def subscribe_to_queue 13 14 exchange = MQ.fanout('my-fanout-exchange') 15 queue = MQ.queue('my-fanout-queue') 16 17 queue.bind(exchange).pop do |header, body| 18 yield header, body 19 end 20 21 EM.add_periodic_timer(0.25) do 22 queue.pop 23 end 24 25 end 26 27 def send_to_exchange(message) 28 29 exchange = MQ.fanout('my-fanout-exchange') 30 exchange.publish message 31 32 end 33 34 received = 0 35 36 subscribe_to_queue do |header, body| 37 p "I received a message: #{body}" 38 end 39 40 send_to_exchange 'Hello' 41 send_to_exchange 'World' 42 43 event_loop.join
一个模型样例
想像一下你想创建一个普通的聊天应用,那么应该有以下几个基本特性:
- 聊天 - 两个用户应该可以相互发送消息。
- 一个好友系统 - 用户能够控制谁给他发送消息。
我们假设在协商器上有两种消费者:好友服务器和聊天客户端。
3.1 成为好友
为了成为Bob的好友,Alice首先得发送一个消息给fanout交换器iends,我们假设这个交换器是访问受限24的:普通用户不能够将队列绑定到它。在这个消息中,Alice表示想和Bob成为朋友。
在协商器上有大量的聊天服务器,从绑定到friends交换器的一个单一持久队列中持续地取出消息。这个队列的名字是例如friends.298F2DBC6865-4225-8A73-8FF6175D396D这样的,这难以猜测的名字能够阻止聊天客户端直接取出信息 - 记住:不知道队列的名字,就不能设置订阅。
当一个聊天服务器收到Alice的消息(只有一个会得到这个消息,虽然它们都是从同一个队列中获取),决定这个请求是否有效,然后将其(也许是做过一些调整或者参数化)发送到默认交换器(可以是直接的或者持久的)。它使用另外一个只有Bob知道的routing key来投递。当Bob上线的时候(或者一个服务器做了这件事),他会声明一个队列,这个队列的名字就是之前的routing key(记住在虚拟主机上的默认绑定器是将所有的队列和默认交换器绑定在一起)。
Bob的聊天客户端现在询问Bob是否想和Alice成为朋友。在她的请求消息中,有一个特殊的属性叫做reply-to - 这个属性包括了一个持久和排他的好友队列的名字,这个队列是Alice声明将用于和Bob的未来聊天用。如果Bob想和Alice成为朋友,他会使用这个队列的名字作为routing key,发送一个消息到默认交换器。他也会需要声明一个持久和排他的好友队列,将其名字设为reply-to的值。
例如:Alice和Bob的好友队列的名字是B5725C4A-6621463E-AAF1-8222AA3AD601。Bob发送给Alice的消息的routing-key的值便是这个名字,也是Alice发送给Bob的消息中reply-to的值。
因为好友队列是持久的,因此发送到消息在用户离线的时候也不会丢失。当用户上线之后,所有的在好友队列的消息将会发送到用户,然后才去获取新的消息。
当Bob不再想和Alice成为好友,他可以简单地删除掉为Alice声明的好友队列。在她使用mandatory标志位发送消息的时候,Alice也会注意到Bob已经不再想是她的好友。因为交换器会将她的消息认为不可投递而返回。
仍未提及的事情
仍然有很多本文没有介绍的东西,例如事务语义,关于信息的重路由,header交换器的规范以及不同AMQP规范之间的差异 - 尤其是在1.0版本之前的模型改变。为了简介起见,一个聊天的模型同样也被略过了。
这里也没有介绍了整个系统的管理,因为还不清楚AMQP和RabbitMQ将会走向何方。现在有一个课题,关于在保留的amq命名空间中可用的交换器,它能获取协商器所有的日志信息。但是,能够列出现在已经声明的组件和已连接的用户的工具是用rabbitmqctl命令行接口而不是AMQ实体来实现的。
1 require 'rubygems' 2 require 'mq' 3 4 PATH_TO_RABBITMQCTL = '/usr/local/sbin/rabbitmqctl' 5 6 event_loop = Thread.new { EM.run } 7 8 def subscribe_to_logger 9 10 random_name = (0...50).map{ ('a'..'z').to_a[rand(26)] }.join 11 12 exchange = MQ.topic('amq.rabbitmq.log') 13 queue = MQ.queue(random_name, :autodelete => true, :exclusive => true) 14 binding = queue.bind(exchange, :key => '#') 15 16 binding.subscribe do |header, body| 17 body.split("\n").each do |message| 18 yield header, message 19 end 20 end 21 22 end 23 24 def exchange_info(vhost = '/') 25 info :exchange, vhost, %w(name type durable auto_delete arguments) 26 end 27 28 def queue_info(vhost = '/') 29 info :queue, vhost, %w(name durable auto_delete arguments node messages_ready messages_unacknowledged messages_uncommitted messages acks_uncommitted consumers transactions memory) 30 end 31 32 def binding_info(vhost = '/') 33 info :binding, vhost 34 end 35 36 def connection_info 37 info :exchange, nil, %w(node address port peer_address peer_port state channels user vhost timeout frame_max recv_oct recv_cnt send_oct send_cnt send_pend) 38 end 39 40 def info(about, vhost = nil, items = []) 41 42 column_length = 20 43 44 puts "#{about} info\n" 45 46 cmd = "#{PATH_TO_RABBITMQCTL} list_#{about}s" 47 cmd << " -p #{vhost}" if vhost 48 cmd << " #{items.join(' ')} 2>&1" 49 50 pipe = IO.popen(cmd) 51 52 pipe.readlines.map { |line| line.chomp.split("\t").map { |item| item.ljust(column_length)[0, column_length] } }.slice(1..-2).each do |exchange| 53 print exchange.join(' ') + "\n" 54 end 55 56 end 57 58 subscribe_to_logger do |message| 59 p "RabbitMQ logger: #{message}" 60 end 61 62 %w(connection exchange queue binding).each do |method| 63 self.send "#{method}_info".to_sym 64 end 65 66 event_loop.join
必须提及的是,已经有一些使用AMQP(或者RabbitMQ)的分布式架构。这些架构(例如Nanite25或者Lizzy26)在AMQP的顶部引入了一些抽象层,这样简化了一些操作,例如cluster中在Ruby客户端之间工作的分配。
4.1 下一步该做什么?
要想使用本地的协商器来玩玩好友和邮件列表,第一步应该是学习有关AMQP和RabbitMQ的知识。不仅仅应该在RabbitMQ网站上阅读幻灯片和文章,还应该通过在IRC的#rabbitmq通道上和社区成员交流或者阅读关于RabbitMQ和/或AMQP的网志27, 28,例如LShift的博客。在Twitter上也可以通过#rabbitmq或#amqp这两个标签找到很多关于AMQP或者RabbitsMQ的内容29, 30, 31, 32, 33, 34, 35。
欢迎进入异步信息的世界,祝您玩得愉快!
1 http://www.infoq.com/amqp
2 ttp://java.sun.com/products/jms/
3 Advanced Message Queuing Protocol/Implementations
4 http://www.rabbitmq.com/how.html#clients
5 http://github.com/tmm1/amqp/tree/master
6 http://www.infoq.com/ruby/
7 http://rubyeventmachine.com/
8 http://github.com/famoseagle/carrot/tree/master
9 http://github.com/celldee/bunny/tree/master
10 http://qpid.apache.org/download.html
11 Erlang
12 http://www.rabbitmq.com/specification.html
13 http://www.rabbitmq.com/how.html
14 http://www.rabbitmq.com/how.html
15 AccessControlDesign
16 ACLs
17 http://www.rabbitmq.com/mercurial.html#defaultbranch
18 Minimum Air Induction
19 Universally Unique Identifier
20 或者关联到这个通道的连接。
21 somic/amqp
22 yawn/amqp
23 BasicQosDesign
24 作为替代方案,Alice可以可以收到另一个特殊的routing-key来请求加为好友。所有的聊天服务器将会使用聊天系统中所有用户的 binding-key绑定到friends.298F2DBC-6865-4225-8A73-8FF6175D396D。也有其他的可行方案 - 事实上本例子中所有的行为都有多种实现方法。
25 ezmobius / nanite
26 bmizerany / lizzy
27 lists.rabbitmq.com Mailing Lists
28 RabbitMQ
29 RabbitM - Highlights: presentations, blogs and code
30 freenode
31 Minimum Air Induction
32 Kirk's Rants blogspot
33 http://somic.org/category/rabbitmq/ 34 http://www.lshift.net/blog/category/lshift-sw/rabbitmq
35 Twitter
查看英文原文:Getting started with AMQP and RabbitMQ。
发表评论
-
rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用
2013-12-10 10:27 0此远程接口调用是基于RPC的 先来看看提供暴 ... -
rabbitmq学习10:使用spring-amqp发送消息及异步接收消息
2013-12-26 14:58 4516前面我们已经学习了发送消息及同步接收消息的例子了。下面我们 ... -
rabbitmq学习9:使用spring-amqp发送消息及同步接收消息
2013-12-26 14:57 3959通过对spring-amqp看重要类的认识,下面来通过sp ... -
rabbitmq学习8:spring-amqp的重要类的认识
2013-12-26 14:57 1885对于大多数应用来说都做了与spring整合,对于rabbi ... -
rabbitmq学习7:ConntectionFactory与Conntection的认知
2013-12-26 14:57 1477从前面几小节的学习,我们可能知道在发送和接收消息重要的类C ... -
rabbitmq学习6:RPC
2013-12-26 14:56 1391在《rabbitmq学习2:Work Queues 》 ... -
rabbitmq学习5:Topics
2013-12-10 10:21 1099在前面的《rabbitmq学习4:Routing 》中 ... -
rabbitmq学习4:Routing
2013-12-10 10:20 1392在《rabbitmq学习3:Publish/Sub ... -
rabbitmq学习3:Publish/Subscribe
2013-12-10 10:19 1454在前面的Work Queue中的 ... -
rabbitmq学习2:Work Queues
2013-12-10 10:17 1531在前面的已经提到了一对一的情况;现在一个生产者与多个消费者的 ... -
rabbitmq学习1:hello world
2013-12-10 10:16 1814rabbitMQ是一个在AMQP基础上完整的,可服用的企业消 ... -
JMS、AMQP实例讲解
2013-02-23 20:27 2605使用Git从GitHub上将samples代码拷贝到本机, ... -
RabbitMQ概念导论
2013-02-22 09:44 17061 什么是RabbitMQ ... -
rabbitmq 学习-7- 官方rabbitmq+spring进行远程接口调用
2013-02-22 09:45 629到http://github.com/momania/spri ... -
rabbitmq 学习-6- 发送接收消息示例
2013-02-22 09:45 619这里是同步发送消息,异步接收消息 接收有两种方式:http:/ ... -
rabbitmq 学习-5-
2013-02-22 09:45 488RpcClient发送消息和同步接收消息原理 本身使用R ... -
rabbitmq 学习-4-rabbitmq基础
2013-02-22 09:44 86rabbitmq的中文资料真少,和同事lucas经过两周的学习 ... -
rabbitmq 学习-3-server管理
2013-02-21 09:29 155RabbitMQ Server Administrator's ... -
rabbitmq 学习-2-初试
2013-02-21 09:29 588本例是一个简单的异步发送消息实例 1,发送端 @Test(gr ... -
rabbitmq 学习-1
2013-02-21 09:29 841AMQP,即Advanced Message Queuing ...
相关推荐
【RabbitMQ 入门到精通】:RabbitMQ 是一款流行的消息中间件,它基于 AMQP(Advanced Message Queuing Protocol)协议实现,用于在分布式系统中高效地传输消息,从而实现异步处理、解耦和流量控制。本教程旨在帮助...
【标题】:“rabbitMQ入门” 在IT行业中,消息队列是一种常见的中间件技术,用于解耦应用程序组件,提高系统的可扩展性和可靠性。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中。本篇文章将带...
消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心概念,如何通过控制台进行管理,以及如何在Spring Cloud框架下创建消息生产者和消费者。 首先,让我们了解RabbitMQ的基本概念。RabbitMQ的...
【RabbitMQ入门操作手册】提供了全面的RabbitMQ学习指南,从基础概念到实际操作,帮助初学者快速掌握这个强大的消息队列系统。RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理,其核心...
这个“rabbitMQ代码案例 简单入门”的资料包为初学者提供了了解和学习RabbitMQ的基础知识。 首先,让我们了解一下RabbitMQ的基本概念: 1. **消息队列**:RabbitMQ的核心是消息队列,它负责存储和转发消息。生产者...
最近整理学习的RabbitMQ入门Dome,文件是一个普通java项目导入完成后在lib文件夹中amqp-client-5.2.0.jar,slf4j-api-1.7.25.jar添加进去即可,里面有5个dome分是 dome1 : 简单队列,dome2 :work模式,dome3 : 订阅...
### RabbitMQ从入门到放弃——理解消息队列与RabbitMQ #### 消息队列简介 消息队列(Message Queue, MQ)作为一种重要的中间件技术,它提供了应用程序间的一种通信方式,通过写入和读取出入列队的消息来进行通信,...
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言...RabbitMQ(支持更多语言,基于AMQP规范)
RabbitMQ 和消息传递系统通常会使用特定的术语来描述其中的关键组件。 - **生产者(Producer)**:生产者是指发送消息的程序或应用。在RabbitMQ 的图形表示中,我们通常用 "P" 来标记它。 - **队列(Queue)**:队列是...
RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息队列系统,它被广泛应用于分布式系统中的消息传递和任务调度。RabbitMQ 提供了高可用性、可扩展性和可靠性的消息中间件服务,能够有效...
RabbitMQ 是一款开源的消息代理软件,也是 AMQP(Advanced Message Queuing Protocol)标准的一个实现。它支持多种消息传递模式,包括点对点(Direct)、发布/订阅(Fanout)、主题(Topic)等,并且可以通过插件的...
在这个“RabbitMQ入门代码”中,我们将深入探讨如何使用Java来与RabbitMQ进行交互,包括队列持久化、消息持久化、Direct交换机、Fanout交换机和Topic交换机的基础测试代码。 首先,让我们从基础开始,了解如何在...
RabbitMQ是一款开源的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地路由和传递消息。本教程将带你走进RabbitMQ的世界,了解其基本概念、安装过程,以及如何创建消息...
通过这个“RabbitMQ入门视频”,初学者将能够建立起对RabbitMQ基本操作和用法的全面认识,为进一步深入学习和实际项目应用打下坚实的基础。在后续的学习中,还可以探索RabbitMQ的高级特性,如死信队列、延迟队列、...
以上就是RabbitMQ极速入门的关键点,通过学习和实践,你将能够快速掌握这个强大的消息中间件,并将其应用于实际项目中,提升系统性能和稳定性。记得尝试创建自己的第一个消息队列,发布和消费消息,从而加深理解。
RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,支持多种客户端(语言),用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征\ RabbitMQ是采用 Erlang语言开发的,所以系统...
RabbitMQ 开发入门 RabbitMQ 是一个基于 AMQP 协议的消息队列中间件,广泛应用于分布式系统中实现异步通信、解...以上是 RabbitMQ 开发入门的一些重要知识点,希望能够帮助您快速了解 RabbitMQ 的开发原理和配置方法。
要求入门想运行我的演示代码: 克隆存储库: git clone < Repository>cd rabbitmq-nodejs-demos 安装所有依赖项: npm install 想要运行自己的代码: npm init -ynpm install amqplib本地运行在terminal 1 docker...
这个指南将引导你建立一个RabbitMQ AMQP服务器发布和订阅消息的过程。 声明 可以使用本人阿里云安装好的RabbitMQ服务器 host:http://120.27.114.229 username:root password:root port:5672 web management: ...
RabbitMQ是一种开源的消息代理和队列服务器,它允许不同的应用程序之间通过AMQP协议共享数据。AMQP是一种二进制协议,是一套应用层协议的规范,允许各种消息中间件产品遵循此规范并实现相同的功能。RabbitMQ使用...