`

rabbitmq & spring amqp

    博客分类:
  • J2EE
阅读更多
 
My main AMQP post on blogger:
http://wuaner.blogspot.com/2012/10/messaging.html
sudo rabbitmqctl add_user admin admin
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
sudo rabbitmqctl set_user_tags admin administrator


AMQP的好处:
1 network protocal(TCP),P、Broker、C 可以在不同的主机上。
2 作为 P or C 的 client 可以是任意的编程语言。
3 提供队列的持久化机制,保证消息的安全性和可靠性。
4 支持插件,灵活的可扩展性。
5 Two-directional authentication,良好的安全机制。
5 开源,标准开放。


Difference between activeMQ  n rabbitMQ:
http://stackoverflow.com/questions/7044157/switching-from-activemq-to-rabbitmq
activeMQ 是以 Java JSM 标准API 作为默认的消息实现的;而 rabbitMQ 是 AMQP 的实现;比较 activeMQ  n rabbitMQ 的区别,比较 JMS 和 AMQP 就可以大概看出点来,见 wiki:
引用
http://en.wikipedia.org/wiki/AMQP
JMS, the Java messaging service, is often compared to AMQP. However, JMS is an API specification (part of the Java EE specification) that defines how message producers and consumers are implemented. JMS does not guarantee interoperability between implementations, and the JMS-compliant messaging system in use may need to be deployed on both client and server. On the other hand, AMQP is a wire-level protocol specification. In theory AMQP provides interoperability as different AMQP-compliant software can be deployed on the client and server sides. Note that, like HTTP and XMPP, AMQP does not have a standard API.
当然,activeMQ 现如今也加入了对 AMQP 以及其他面向消息协议的支持。



关于 rabbitmq 的 rabbitmq.config & rabbitmq-env.conf:
http://www.rabbitmq.com/configure.html#configuration-file


rabbitMQ 集群:
How To Cluster Rabbit-MQ:
http://www.godlikemouse.com/2010/12/14/how-to-cluster-rabbit-mq/
Clustering Guide:
http://www.rabbitmq.com/clustering.html
Highly Available Queues:
http://www.rabbitmq.com/ha.html
Distributed RabbitMQ brokers:
http://www.rabbitmq.com/distributed.html
RabbitMQ, backing stores, databases and disks:
http://www.rabbitmq.com/blog/2011/01/20/rabbitmq-backing-stores-databases-and-disks/

疑问:
RabbitMQ application 、node 、broker 间的关系?


在 consumer 和 producer(publisher)分离的场景下(即 consumer 和 producer 被分别启动,在不同的 app 下),queue / exchange / binding 的 declare,在哪端做比较合适?
http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2012-February/018105.html



关于 rabbitmq 的 Qos(即spring amqp 中的 prefetchCount):
http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/


Dead Letter Exchanges:
http://www.rabbitmq.com/dlx.html
Dead lettering with RabbitMQ – Strategies:
http://blog.craftforge.net/dead-lettering-with-rabbitmq-strategies/


exclusive, auto-delete, and TTL:
http://yabfog.com/blog/2013/04/23/rabbitmq-queue-auto-delete
https://www.rabbitmq.com/ttl.html
引用
auto-delete 和 Queue TTL 在对 queue 做删除时,都是只考虑该 queue 未被 consume 的时间,而不考虑 queue 是否为空。所以,这两项配置是会将一个不会空的 queue 连同其中的 messages 一起被删除的,这点需要注意。他们的不同在于:
queue 的 auto-delete 为 true 时:只在 consumer 的数量减至 0 时,才会删除该 queue;
为 queue 配置了 TTL 时:只要该 queue 未被 consume 的时间达到了 TTL 配置的时间,该 queue 会被删除。



Policy:
http://www.rabbitmq.com/parameters.html#policies
引用
#增加一个名为 expiry 的 policy,功能为配置所有以 a.b.c. 开头的 queue(且后面至少还有一个字符) 的 TTL(since unused,及多长时间未被 consume) 为 60 秒:
$ sudo rabbitmqctl set_policy expiry "^a\.b\.c\..+$" '{"expires":60000}' --apply-to queues



retry that based-on Spring-Retry:
Consumer发生business exception时,可以使用Spring-Retry尝试重发(指定次数)。
retry操作是需要Message有messageId(org.springframework.amqp.core.MessageProperties.messageId)作为retry的依据的;如果你可以控制Producer端Message的生成,那请为生成的Message添加messageId,如果控制不了,可以使用下面的MissingMessageIdAdvice:
https://github.com/garyrussell/spring-amqp/commit/a8f06c6ff1225e5e65a94d16621b6f99db6f8ba6
retry 范例:
http://forum.springsource.org/showthread.php?125717-AMQP-1-0-1-SNAPSHOT-DLQ-and-Retry-Backoff-policy



Performance of rabbitMQ:
http://stackoverflow.com/questions/10030227/maximize-throughput-with-rabbitmq
引用
Use a larger prefetch count. Small values hurt performance.
A topic exchange is slower than a direct or a fanout exchange.
Make sure queues stay short. Longer queues impose more processing overhead.
If you care about latency and message rates then use smaller messages. Use an efficient format (e.g. avoid XML) or compress the payload.
Experiment with HiPE, which helps performance.
Avoid transactions and persistence. Also avoid publishing in immediate or mandatory mode. Avoid HA. Clustering can also impact performance.
You will achieve better throughput on a multi-core system if you have multiple queues and consumers.
Use at least v2.8.1, which introduces flow control. Make sure the memory and disk space alarms never trigger.
Virtualisation can impose a small performance penalty.
Tune your OS and network stack. Make sure you provide more than enough RAM. Provide fast cores and RAM.



RabbitMQ Flow Control:
http://www.rabbitmq.com/memory.html

http://www.cnblogs.com/zhengyun_ustc/archive/2012/08/25/flowcontrol.html
引用
1. Per-Connection Flow Control
是面向每一个连接做的流量控制。
即,RabbitMQ 会主动阻塞(Block)那些发布消息太快的连接(Connections),无需做任何配置。
如果连接被阻塞了,那么它在 rabbitmqctl 控制台上会显示一个blocked的状态。
RabbitMQ 的流量控制机制是基于信用证(Credit)的拥塞控制机制。
2. Memory-Based Flow Control
RabbitMQ 会在启动时检测机器的物理内存数值。默认当 MQ 占用 40% 以上内存时,MQ 会主动抛出一个内存警告并阻塞所有连接(Connections)。
你也可以通过修改 rabbitmq.config 文件来调整内存阈值,默认值是 0.4,如下所示:
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
当 MQ 启动时,该内存阈值也会写入到  RABBITMQ_NODENAME.log 文件里,如下所示:
Memory limit set to 2048MB.
如果 MQ Server 不能识别你的系统,或者你在用 Windows 系统,那么它会写一个警告信息到 RABBITMQ_NODENAME.log 文件里,如下所示:
=WARNING REPORT==== 29-Oct-2009::17:23:44 ===
Unknown total memory size for your OS {unix,magic_homebrew_os}. Assuming memory size is 1024MB.
3. Disk-Based Flow Control
默认情况,如果剩余磁盘空间在 1GB 以下,RabbitMQ 主动阻塞所有的生产者。这个阈值也是可调的。



Spring AMQP - Reference Documentation:
http://static.springsource.org/spring-amqp/reference/html/index.html
Spring Integration 的 AMQP 支持:
http://static.springsource.org/spring-amqp/reference/html/spring-integration-amqp.html
引用
In Spring Integration, "Channel Adapters" are unidirectional (one-way) whereas "Gateways" are bidirectional (request-reply). We provide an inbound-channel-adapter, outbound-channel-adapter, inbound-gateway, and outbound-gateway.
[Spring-based app] <---> [AMQP Broker]。inbound、outbound 之 进、出,都是站在[Spring-based app]的角度考虑:
amqp:inbound-channel-adapter:To receive AMQP Messages from a Queue
amqp:outbound-channel-adapter:To send AMQP Messages to an Exchange
amqp:inbound-gateway:To receive an AMQP Message from a Queue, and respond to its reply-to address
amqp:outbound-gateway:To send AMQP Messages to an Exchange and receive back a response from a remote client
adapter 和 gateway 的区别,举个例子,以 inbound 为例,如果你得到 message 后只是对其做处理,那就用adapter;如果在处理之外,还会有处理之后的返回值,那针对这个返回值,可以使用 gateway,gateway 的 reply-channel 就是放返回值的地方!



常用插件plugins:
Management Plugin 及  Management Command Line Tool:
http://stackoverflow.com/questions/10709533/is-it-possible-to-view-rabbitmq-message-contents-directly-from-the-command-line
http://www.rabbitmq.com/management.html
http://www.rabbitmq.com/plugins.html
http://www.rabbitmq.com/man/rabbitmq-plugins.1.man.html
#enable management plugin:
$ rabbitmq-plugins enable rabbitmq_management
#lists all plugins, on one line each.
$ rabbitmq-plugins list
#Disables the specified plugins and all plugins that depend on them.
$ rabbitmq-plugins disable amqp_client
http://www.rabbitmq.com/management-cli.html
引用
访问 http://server-name:15672/cli/ 下载 rabbitmq management 命令行工具,将其 cp 到 /usr/local/bin 即可在命令行下使用 rabbitmqadmin 命令。如:
//查看rabbitmqadmin帮助
$ rabbitmqadmin -h (or --help)
//查看可用的子命令
$ rabbitmqadmin help subcommands
//声明一个名为 test 的队列(可指定auto_delete(默认false)、durable(默认true)等参数):
$ rabbitmqadmin  declare queue name=test
//列出所有exchanges:
$ rabbitmqadmin list exchanges;
//发布一条 message 到队列 test(使用的是没有名字的默认exchange):
$ rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello, world 1"
//取出队列 test 中的一条 message(默认 requeue 为 true,即重新放回原队列头(谨记不是requeue入队列尾,而是头;下次还是会取到该message)!)
$ rabbitmqadmin get queue=test requeue=false



spring amqp ReturnCallback的使用:
http://forum.springsource.org/showthread.php?127065-Support-for-ReturnListener-on-RabbitTemplate
关于amqp中消息的 mandatory 和 immediate:
http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
引用
bit mandatory
This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.
The server SHOULD implement the mandatory flag.
bit immediate
This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.
http://answerpot.com/showthread.php?1119669-default+queue+for+unroutable+messages
引用
The mandatory flag causes a basic.return if the message wasn't routed to a queue.
The immediate flag causes a basic.return if the message got to some queues but there were no consumers waiting for it.
https://groups.google.com/forum/?fromgroups=#!topic/rabbitmq-discuss/_SSqSLE65vo
引用
mandatory->"unroutable", immediate->"undelivered".
关于 rabbitmq Java API 中 Channel.basicNack() 和 Channel.basicReject 的区别:
http://www.rabbitmq.com/nack.html
引用
The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.
To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.
To reject messages in bulk, clients set the multiple flag of the basic.nack method to true. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag field of the basic.nack method. In this respect, basic.nack complements the bulk acknowledgement semantics of basic.ack.



cluster 环境下的 failover(故障转移) & HA(High Availability):
http://www.rabbitmq.com/ha.html
http://www.rabbitmq.com/clustering.html#clients
Failover with RabbitMQ, the sender's story:
http://blog.zenika.com/index.php?post/2012/03/14/Failover-with-RabbitMQ%2C-the-sender-s-story

"Messaging for Modern Applications" 相关:
http://forum.springsource.org/showthread.php?123977-Connection-Failover
https://github.com/tmccuch

Spring Integration + Spring Retry:
http://forum.springsource.org/showthread.php?120707-Failproof-spring-amqp
分享到:
评论

相关推荐

    java rabbitmq spring springAMQP 代码包 project

    这个“java rabbitmq spring springAMQP 代码包 project”显然是一个综合性的项目,旨在展示如何在Java环境中集成和使用RabbitMQ消息队列服务,结合Spring框架以及Spring AMQP的高级抽象来实现。接下来,我们将详细...

    RabbitMq与Spring整合实例

    将RabbitMQ与Spring整合,可以方便地在Spring应用中使用消息队列,实现异步通信和任务调度。 本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ...

    spring rabbitmq amqp

    Spring RabbitMQ AMQP 是一个基于Java的开源框架,它整合了RabbitMQ消息中间件,实现了高级消息队列协议(AMQP)。这个框架是Spring生态的一部分,为Spring Boot应用程序提供了强大的消息处理能力。AMQP是一种标准的...

    rabbitmq 路由spring-amqp实现

    Spring框架为与RabbitMQ的集成提供了`spring-amqp`模块,使得在Java应用中使用RabbitMQ变得更加便捷。本文将深入探讨如何使用Spring AMQP来实现RabbitMQ的路由功能。 首先,让我们理解RabbitMQ中的路由概念。在...

    spring amqp 配置实现rabbitmq 路由

    在本文中,我们将深入探讨如何使用Spring AMQP配置来实现RabbitMQ的路由。Spring AMQP是Spring框架的一个模块,它提供了与RabbitMQ消息中间件集成的能力,使得在Java应用中处理AMQP(Advanced Message Queuing ...

    Spring AMQP 1.5.3.RELEASE API

    Spring AMQP 1.5.3.RELEASE API是Spring框架中的一个重要组成部分,专注于RabbitMQ消息中间件的集成。Spring AMQP(Advanced Message Queuing Protocol)允许开发者利用AMQP协议来构建可扩展、高可用的消息驱动系统...

    Spring AMQP 2 中文 参考手册 中文文档

    想要使用Spring AMQP,开发者需要有一个运行中的RabbitMQ代理。RabbitMQ是一个流行的开源消息代理,它可以作为中间件来实现分布式系统中的消息队列。 在依赖管理方面,Spring AMQP项目允许开发者通过构建工具(如...

    rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用

    在本主题"rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用"中,我们将深入探讨如何使用RabbitMQ作为消息中间件,结合Spring-AMQP库实现RPC模式。 RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP...

    rabbitmq学习10:使用spring-amqp发送消息及异步接收消息

    标题中的“rabbitmq学习10:使用spring-amqp发送消息及异步接收消息”表明了本次讨论的主题,即如何在Spring框架中利用Spring AMQP组件与RabbitMQ进行交互,实现消息的发送和异步接收。RabbitMQ是一个开源的消息代理...

    Spring AMQP hello world

    Spring AMQP(Advanced Message Queuing Protocol)是Spring框架的一个扩展,用于支持RabbitMQ等AMQP消息代理。在这个“Spring AMQP Hello World”示例中,我们将深入探讨如何使用Spring AMQP来创建一个简单的消息...

    rabbitmq + spring boot demo 消息确认、持久化、备用交换机、死信交换机等代码

    3. **创建消息模板**:使用`RabbitTemplate`,这是Spring AMQP提供的核心类,用于发送和接收消息。配置消息确认模式(`mandatory`或`ackMode`),启用消息持久化。 4. **定义交换机和队列**:在Java配置类中,创建...

    spring-amqp文档.zip

    - `RabbitTemplate`:这是Spring AMQP的主要入口点,提供发送和接收消息的方法,简化了与RabbitMQ服务器的交互。 - `Message`和`MessageProperties`:`Message`代表AMQP的消息,`MessageProperties`包含消息的元...

    rabbitmq和spring集成

    1. **添加依赖**:在Spring项目的Maven或Gradle配置文件中,引入RabbitMQ的客户端库和Spring AMQP库。例如,在Maven的pom.xml中,你需要添加如下依赖: ```xml &lt;groupId&gt;com.rabbitmq&lt;/groupId&gt; &lt;artifactId&gt;...

    SpringAMQP-支持使用AMQP的Spring编程模型

    3. **容器管理的消费者**:Spring AMQP 提供了 RabbitMQ 容器,它能够自动创建和管理消息消费者。你可以通过注解来配置消费者的绑定,以及如何处理接收到的消息。这极大地简化了消费者的生命周期管理。 4. **...

    Rabbitmq与Spring整合

    1. 添加依赖:在你的`pom.xml`或`build.gradle`文件中添加RabbitMQ和Spring AMQP的依赖。 2. 配置RabbitMQ:在Spring的配置文件(如`application.properties`或`application.yml`)中设置RabbitMQ服务器的连接信息...

    rabbitmq+spring需要的jar包

    其次,`spring-rabbit-1.4.5.RELEASE.jar`是Spring与RabbitMQ之间的桥梁,它扩展了Spring AMQP,提供了具体的实现细节,如RabbitTemplate,用于发送和接收消息,以及RabbitAdmin,用于管理RabbitMQ的实体,如交换器...

    rabbitmq与spring集成示例demo

    2. **Spring AMQP**:Spring框架提供了一个名为Spring AMQP的模块,它为RabbitMQ提供了一种简单的方式来配置和操作AMQP代理。通过Spring AMQP,我们可以声明Exchange(交换机)、Queue(队列)和Binding(绑定),并...

    rabbitmq&swagger;&sqllite;

    RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议实现。RabbitMQ允许应用程序之间异步通信,提高了系统的可扩展性和可靠性。在Spring Boot中集成RabbitMQ,可以方便...

Global site tag (gtag.js) - Google Analytics