- 浏览: 1589211 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (289)
- java 语法基础 (51)
- spring (8)
- mvc struct /Ant --build.xml (8)
- SOA (0)
- oracle 9i/10g (23)
- sql server 2000-2005 (3)
- 数据库基础知识 (6)
- 设计模式与软件架构 (10)
- Hibernate 持久化 (9)
- J2SE/J2EE/J2ME/AJAX 技术 (8)
- JSF 技术 (3)
- JAVA 图形化 (0)
- JMS (40)
- Eclipse 3.2 IDE 开发技巧 (13)
- 项目处理方法集合 (2)
- html/jsp/javascript (2)
- Unix/Linux (9)
- j2me/ARM/windriver/嵌入式 (4)
- 电信科学 (8)
- jsp (1)
- c/c++ (1)
- LZW压缩算法(java) (2)
- Android (77)
- 版本管理git/svn (2)
最新评论
-
huihai:
有demo吗?
NamingStrategy实现动态表名映射 -
cangbaotu:
推荐给大家一些有用的爬虫源码:https://github.c ...
网络爬虫(源代码参考) -
tuspark:
除了.classpath文件以外,.project文件也应该了 ...
Eclipse .classpath文件浅谈 -
tuspark:
造成eclipse自动关闭的原因有很多,这里有很多介绍:ecl ...
eclipse 自动关闭 解决方案 -
DEMONU:
网上都是这些,这种文章。。。
ActiveMQ中的消息持久性
ActiveMQ in Action(6)
关键字: activemq
2.6 Features
ActiveMQ包含了很多功能强大的特性,下面简要介绍其中的几个。
2.6.1 Exclusive Consumer
Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作。
ActiveMQ从4.x版本起开始支持Exclusive Consumer (或者说Exclusive Queues)。 Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其它的consumer。
可以通过Destination Options 来创建一个Exclusive Consumer,如下:
- queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
- consumer = session.createConsumer(queue);
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);顺便说一下,可以给consumer设置优先级,以便针对网络情况(如network hops)进行优化,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true &consumer.priority=10");
2.6.2 Message Groups
用Apache官方文档的话说,Message Groups rock!它是Exclusive Consumer功能的增强。逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message group。Message Groups特性保证所有具有相同JMSXGroupID 的消息会被分发到相同的consumer(只要这个consumer保持active)。另外一方面,Message Groups特性也是一种负载均衡的机制。
在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker 会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:
- Consumer被关闭。
- Message group被关闭。通过发送一个消息,并设置这个消息的JMSXGroupSeq为0。
从4.1版本开始,ActiveMQ支持一个布尔字段JMSXGroupFirstForConsumer 。当某个message group的第一个消息被发送到consumer的时候,这个字段被设置。如果客户使用failover transport连接到broker。在由于网络问题等造成客户重新连接到broker的时候,相同message group的消息可能会被分发到不同与之前的consumer,因此JMSXGroupFirstForConsumer字段也会被重新设置。
以下是使用message groups的例子:
- Mesasge message = session.createTextMessage("<foo>hey</foo>");
- message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
- ...
- producer.send(message);
Mesasge message = session.createTextMessage("<foo>hey</foo>"); message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05"); ... producer.send(message);
2.6.3 JMS Selectors
JMS Selectors用于在订阅中,基于消息属性对进行消息的过滤。JMS Selectors由SQL92语法定义。以下是个Selectors的例子:
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,例如:
LIKE '12%3' ('123' true,'12993' true,'1234' false)
LIKE 'l_se' ('lose' true,'loose' false)
LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换,例如:
myMessage.setStringProperty("NumberOfOrders", "2");"NumberOfOrders > 1" 求值结果是false。关于JMS Selectors的详细文档请参考javax.jms.Message的javadoc。
上一小节介绍的Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,例如:
设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:
- producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。
- 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。
2.6.4 Pending Message Limit Strategy
首先简要介绍一下prefetch机制。ActiveMQ通过prefetch机制来提高性能,这意味这客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由prefetch limit来控制。当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认。可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQPrefetchPolicy对象来配置prefetch policy。也可以通过connection options或者destination options来配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
prefetch size的缺省值如下:
- persistent queues (default value: 1000)
- non-persistent queues (default value: 1000)
- persistent topics (default value: 100)
- non-persistent topics (default value: Short.MAX_VALUE -1)
慢消费者会在非持久的topics上导致问题:一旦消息积压起来,会导致broker把大量消息保存在内存中,broker也会因此而变慢。未来ActiveMQ可能会实现磁盘缓存,但是这也还是会存在性能问题。目前ActiveMQ使用Pending Message Limit Strategy来解决这个问题。除了prefetch buffer之外,你还要配置缓存消息的上限,超过这个上限后,新消息到来时会丢弃旧消息。通过在配置文件的destination map中配置PendingMessageLimitStrategy,可以为不用的topic namespace配置不同的策略。目前有以下两种:
- ConstantPendingMessageLimitStrategy。这个策略使用常量限制。
例如:<constantPendingMessageLimitStrategy limit="50"/> - PrefetchRatePendingMessageLimitStrategy。这个策略使用prefetch size的倍数限制。
例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
在以上两种方式中,如果设置0意味着除了prefetch之外不再缓存消息;如果设置-1意味着禁止丢弃消息。
此外,你还可以配置消息的丢弃策略,目前有以下两种:
- oldestMessageEvictionStrategy。这个策略丢弃最旧的消息。
- oldestMessageWithLowestPriorityEvictionStrategy。这个策略丢弃最旧的,而且具有最低优先级的消息。
以下是个ActiveMQ配置文件的例子:
- <broker persistent="false" brokerName="${brokername}" xmlns="http://activemq.org/config/1.0">
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <policyEntry topic="PRICES.>">
- <!-- 10 seconds worth -->
- <subscriptionRecoveryPolicy>
- <timedSubscriptionRecoveryPolicy recoverDuration="10000" />
- </subscriptionRecoveryPolicy>
- <!-- lets force old messages to be discarded for slow consumers -->
- <pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="10"/>
- </pendingMessageLimitStrategy>
- </policyEntry>
- </policyEntries>
- </policyMap>
- </destinationPolicy>
- ...
- </broker>
<broker persistent="false" brokerName="${brokername}" xmlns="http://activemq.org/config/1.0"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="PRICES.>"> <!-- 10 seconds worth --> <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="10000" /> </subscriptionRecoveryPolicy> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="10"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> ... </broker>
2.6.5 Composite Destinations
从1.1版本起, ActiveMQ支持composite destinations。它允许用一个虚拟的destination 代表多个destinations。例如你可以通过composite destinations在一个操作中同时向12个queue发送消息。在composite destinations中,多个destination之间采用","分割。例如:
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
以下是ActiveMQ配置文件进行配置的一个例子:
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <compositeQueue name="MY.QUEUE">
- <forwardTo>
- <queue physicalName="FOO" />
- <topic physicalName="BAR" />
- </forwardTo>
- </compositeQueue>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <queue physicalName="FOO" /> <topic physicalName="BAR" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
可以在转发前,先通过JMS Selector判断一个消息是否需要转发,例如:
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <compositeQueue name="MY.QUEUE">
- <forwardTo>
- <filteredDestination selector="odd = 'yes'" queue="FOO"/>
- <filteredDestination selector="i = 5" topic="BAR"/>
- </forwardTo>
- </compositeQueue>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <filteredDestination selector="odd = 'yes'" queue="FOO"/> <filteredDestination selector="i = 5" topic="BAR"/> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
2.6.6 Mirrored Queues
每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。
ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序可以订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一个例子:
- <broker xmlns="http://activemq.org/config/1.0" brokerName="MirroredQueuesBroker1" useMirroredQueues="true">
- <transportConnectors>
- <transportConnector uri="tcp://localhost:61616"/>
- </transportConnectors>
- <destinationInterceptors>
- <mirroredQueue copyMessage = "true" prefix="Mirror.Topic"/>
- </destinationInterceptors>
- ...
- </broker>
<broker xmlns="http://activemq.org/config/1.0" brokerName="MirroredQueuesBroker1" useMirroredQueues="true"> <transportConnectors> <transportConnector uri="tcp://localhost:61616"/> </transportConnectors> <destinationInterceptors> <mirroredQueue copyMessage = "true" prefix="Mirror.Topic"/> </destinationInterceptors> ... </broker>假如某个producer向名为Foo.Bar的queue中发送消息,那么你可以通过订阅名为Mirror.Topic.Foo.Bar的topic来获得发送到Foo.Bar中的所有消息。
发表评论
-
基于jms使用ActiveMQ实现异步日志功能.消息持久到oracle 10g 数据库
2008-09-18 15:05 9935package askyaya.entity;import j ... -
activemq 重新连接的机制
2008-09-18 14:39 23220最近一个项目要用到ActiveMq,并且需要最大程度的保证消息 ... -
ActiveMQ stop inactivity read check
2008-09-17 16:01 10649You can do the following to fix ... -
WSAD环境下JMS异步通信全攻略 (3)
2008-09-11 14:18 32263.5 消息驱动的Bean 在前文讨论JMS消息接收处理逻 ... -
WSAD环境下JMS异步通信全攻略 (2)
2008-09-11 13:58 3771三、JMS P2P编程 在JMS P2P通信方式中,发送程 ... -
WSAD环境下JMS异步通信全攻略 (1)
2008-09-11 13:57 4711一、JMS基本概念 1.1 P2P通信 1.2 Pub ... -
topicpublisher (jms)
2008-09-10 18:55 2919目的地类型JNDI名字连接工厂类型Topic/Queuejav ... -
ProducerTool /MessageBroker /getConnectionFactoryF
2008-09-10 18:12 2474lib: jms1.1.jar activemq-all-5. ... -
activemq例子代码 发送BytesMessage消息
2008-09-10 13:55 18380import javax.jms.Connection; im ... -
ActiveMQ in Action(7)
2008-09-10 13:44 3949ActiveMQ in Action(7) 关键字 ... -
ActiveMQ in Action(5)
2008-09-10 13:42 3924ActiveMQ in Action(5) 关键字: acti ... -
ActiveMQ in Action(4)
2008-09-10 13:40 3475ActiveMQ in Action(4) 关键字: acti ... -
ActiveMQ in Action(3)
2008-09-10 13:39 3676ActiveMQ in Action(3) 关键字: acti ... -
ActiveMQ in Action(2)
2008-09-10 13:38 4208ActiveMQ in Action(2) 关键字: acti ... -
ActiveMQ in Action(1)
2008-09-10 13:37 5826ActiveMQ in Action(1) 关键字: acti ... -
为ActiveMQ服务器添加简单验证机制
2008-09-09 23:48 4192为ActiveMQ服务器添加简单验证机制 关键字: Java, ... -
Sender/receiver 消息
2008-09-09 20:28 2124Sender:import java.io.BufferedR ... -
activema.xml 配置
2008-09-09 17:44 3811/***作者:andyao,email:andyaoy@gma ... -
JMX 与系统管理
2008-09-08 10:52 1861Java SE 6 新特性: JMX 与系统管理 ... -
ActiveMQ中的消息持久性
2008-09-05 12:24 3389ActiveMQ中的消息持久性 ActiveMQ很 ...
相关推荐
ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...
《ActiveMQ In Action》是一本深入探讨Apache ActiveMQ的权威指南,这本书主要涵盖了消息中间件在实际应用中的各种场景和解决方案。ActiveMQ是Apache软件基金会开发的一个开源消息代理,它支持多种消息协议,如...
根据书籍提供的简介,读者可以预期从《ActiveMQ in Action》中获得从安装和配置到深入开发的全面指导,包括连接到ActiveMQ、消息存储、安全性配置以及构建使用ActiveMQ的应用程序的案例。书籍同样着重介绍了如何利用...
《ActiveMQ in Action》2010版是Manning Publications在2009年发布的一本关于Apache ActiveMQ的专业书籍,旨在深入介绍消息中间件和Java消息服务(JMS)的概念,以及如何有效地使用ActiveMQ。书中包含了第7、9、11、...
《ActiveMQ in Action》是一本深入探讨Apache ActiveMQ的专著,由Michael Burman和Peter Monks撰写。这本书详尽地介绍了如何使用ActiveMQ这一强大的开源消息代理来构建高效、可扩展和可靠的分布式应用程序。英文版是...
《ActiveMQ in Action》是一本深受读者欢迎的关于Apache ActiveMQ的权威指南,它深入浅出地介绍了如何在实际环境中运用这一开源消息中间件。ActiveMQ是Java消息服务(JMS)的一个实现,广泛用于分布式系统中的异步...
ActiveMQ in Action 英文无水印pdf pdf所有页面使用FoxitReader和PDF-XChangeViewer测试都可以打开 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除
ActiveMQ 不仅实现了 JMS 规范中定义的所有特性,也额外提供了一些特有且有用的特性。我们会在 3.1 小节详细列说这些特性,并且书中剩余的章节还会继续讨论这些特性。为了演示这些特性,我们创建了两个基于实际业务...
以上概述了《ActiveMQ In Action》这本书的主要知识点,涵盖了从基础知识到实践案例再到高级配置等各个方面。通过学习这些内容,开发者可以深入了解ActiveMQ的工作原理,并能够灵活地将其应用于各种业务场景中。
### ActiveMQ In Action翻译笔记-更新版2011 关键知识点详解 #### 一、ActiveMQ概述 **ActiveMQ**是Apache旗下的一款开源、高性能、功能丰富的消息中间件,支持多种消息传递模式,包括点对点(PTP)和发布/订阅...
6. **管理工具**:ActiveMQ提供Web控制台和命令行工具,方便管理员监控和管理消息队列。 三、使用ActiveMQ开发JMS 1. **安装配置**:首先,下载并安装ActiveMQ,配置相关参数,如端口、日志位置等,并启动服务器。 ...
根据给定的文件信息,以下是从“ActiveMQ In Action精简版”中提炼出的关键IT知识点,主要聚焦于ActiveMQ的介绍、配置、使用及高级功能。 ### ActiveMQ概述 ActiveMQ是Apache软件基金会下的一个开源项目,它是一款...
《ActiveMQ in Action 2011》是2011年出版的一本关于Apache ActiveMQ的权威指南,由行业专家撰写,旨在帮助读者深入理解并有效地使用这一强大的消息中间件。ActiveMQ是Apache软件基金会的一个开源项目,它提供了一个...
《ActiveMQ in Action》中文版是一个关于Apache ActiveMQ的详细指南,这是一款广泛使用的开源消息代理,也是Java消息服务(JMS)实现的领先者。这本书深入探讨了ActiveMQ的各个方面,包括其核心功能、使用场景以及...
ActiveMQ in Action是一本介绍ActiveMQ的书籍,虽然作者未知,但它是对ActiveMQ入门到一般管理非常有帮助的中文资料。该书在英文原版基础上增加了书签功能,方便读者阅读。书中首先介绍了JMS(Java消息服务)规范,...
《ActiveMQ in Action》这本书是了解和掌握Apache ActiveMQ这一开源消息代理的宝贵资源。ActiveMQ是Apache软件基金会开发的一款企业级的消息中间件,广泛应用于分布式系统中的消息传递和解耦。通过深入阅读这本书,...
《ActiveMQ In Action》这本书是了解和掌握Apache ActiveMQ这一开源消息中间件的绝佳资源。ActiveMQ是Apache软件基金会开发的一款高效、灵活且可靠的开源消息代理,它支持多种消息协议,如OpenWire、AMQP、STOMP等,...