本文是研究ActiveMQ过程中总结的资料,主要目的是梳理activemq的现有功能帮助记忆。
ActiveMQ是JMS的一种实现,因此对JMS标准的了解非常有助于对ActiveMQ的认识。
目录
1. Jms消息: 4
1.1 PTP模型 4
1.1.1 点对点消息传递域的特点如下: 4
1.1.2 可靠性 4
1.2 PUB/SUB 模型 4
1.2.1 发布/订阅消息传递域的特点如下: 4
1.2.2 订阅 5
1.2.3 恢复和重新派送 5
1.2.4 可靠性(Reliability) 5
1.3 JMS消息体格式 5
1.4 消息的同步和异步接收 6
1.5 消息选择器 6
2. 基本可靠性机制 6
2.1 控制消息的签收(Acknowledgment) 6
2.2 指定消息传送模式 7
2.2.1 PERSISTENT(持久性消息) 7
2.2.2 NON_PERSISTENT(非持久性消息) 7
3. 高级可靠性机制 7
3.1 创建持久订阅 7
3.1.1 建立持久订阅的步骤: 7
3.2 使用本地事务 8
3.3 异步发送消息 8
3.4 独占的消费者 8
4. 配置文件 8
5. 监控 8
在Windows 平台监控 8
5.1 JXM 控制台(JDK1.5 控制台)监控ActiveMQ。 9
5.2 Web Console 被集成到了ActiveMQ 的二进制发布包中,因此缺省访问 9
5.3 Command Agent 从4.2 版本起,ActiveMQ 支持Command Agent 9
6. 移植及集群 9
6.1 Queue consumer clusters 9
6.2 Broker clusters 9
6.3 Master Slave 10
7. 特性 10
7.1 Exclusive Consumer 10
7.2 Message Groups 11
7.3 Composite Destinations 11
7.4 Mirrored Queues 11
7.5 Strict Order Dispatch Policy 11
7.6 Message Transformation 12
7.7 Transport 12
7.8 Persistence 13
7.8.1 AMQ Message Store 13
是ActiveMQ5.0 缺省的持久化存储。 13
7.8.2 Kaha Persistence 13
是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。 13
7.8.3 JDBC Persistence 13
目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB,MySQL, Oracle, Postgresql, SQLServer, Sybase 13
7.8.4 Disable Persistence 13
不应用持久化 13
7.9 Security 13
7.9.1 Simple Authentication Plugin 13
7.9.2 JAAS Authentication Plugin 13
7.9.3 Custom Authentication Implementation 13
7.9.4 Authorization Plugin 13
8. 编程 14
8.1 开发JSM 的步骤 14
8.2 Session 14
8.3 Destination 14
知识积累
1. Jms消息:
1.1 PTP模型
1.1.1 点对点消息传递域的特点如下:
· 每个消息只能有一个消费者。
· 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发
送消息的时候是否处于运行状态,它都可以提取消息。
客户端用Session 创建MessageProducer 和MessageConsumer 对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收。
1.1.2 可靠性
队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。
1.2 PUB/SUB 模型
1.2.1 发布/订阅消息传递域的特点如下:
每个消息可以有多个消费者。
生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费
自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程
度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激
活状态时发送的消息。
JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作主题(topic)。
1.2.2 订阅
消息订阅分为非持久订阅(non-durablesubscription)和持久订阅(durablesubscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider 时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。
1.2.3 恢复和重新派送
非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重新派送一个未签收的消息。
1.2.4 可靠性(Reliability)
当所有的消息必须被接收,则用持久订阅模式。当丢失消息能够被容忍,则用非持久订阅模式。
1.3 JMS消息体格式
JMS 消息由以下三部分组成:
· 消息头。每个消息头字段都有相应的getter 和setter 方法。
· 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。
· 消息体。JMS 定义的消息类型有TextMessage、MapMessage、BytesMessage、
StreamMessage 和 ObjectMessage。
1.4 消息的同步和异步接收
消息的同步接收是指客户端主动去接收消息,客户端可以采用 MessageConsumer的receive 方法去接收下一个消息。
消息的异步接收是指当消息到达时,ActiveMQ 主动通知客户端。客户端可以通过注册一个实现MessageListener 接口的对象到MessageConsumer。 MessageListener 只有一个必须实现的方法—— onMessage,它只接收一个参数,即Message。在为每个发送到Destination 的消息实现onMessage 时,将调用该方法。
1.5 消息选择器
JMS 提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。
生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。
这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。
消息选择器是用于 MessageConsumer 的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。
按照JMS 文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92 的子集。可以将消息选择器作为MessageConsumer 创建的一部分。
2. 基本可靠性机制
2.1 控制消息的签收(Acknowledgment)
对队列来说,如果当一个Session 终止时它接收了消息但是没有签收,那么ActiveMQ 将保留这些消息并将再次传送给下一个进入队列的消费者。
对主题来说,如果持久订阅用户终止时,它已消费未签收的消息也将被保留,直到再次传送给这个用户。对于非持久订阅,AtiveMQ 在用户Session 关闭时将删除这些消息。
如果使用队列和持久订阅,并且Session 没有使用事务,那么可以使用Session 的recover 方法停止Session,再次启动后将收到它第一条没有签收的消息,事实上,重启后Session 一系列消息的传送都是以上一次最后一条已签收消息的下一条为起点。如果这时有消息过期或者高优先级的消息到来,那么这时消息的传送将会和最初的有所不同。对于非持久订阅用户,重启后,ActiveMQ 有可能删除所有没有签收的消息。
2.2 指定消息传送模式
2.2.1 PERSISTENT(持久性消息)
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
2.2.2 NON_PERSISTENT(非持久性消息)
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。
此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。
3. 高级可靠性机制
3.1 创建持久订阅
通过为发布者设置 PERSISTENT 传送模式,为订阅者时使用持久订阅,这样可以保证Pub/Sub 程序接收所有发布的消息。
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscription),非持久订阅只有当客户端处于激活状态,也就是和ActiveMQ 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向ActiveMQ 注册一个识别自己身份的ID,当这个客户端处于离线时,ActiveMQ 会为这个ID 保存所有发送到主题的消息,当客户端再次连接到ActiveMQ 时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。持久订阅会增加开销,同一时间在持久订阅中只有一个激活的用户。
3.1.1 建立持久订阅的步骤:
1. 为连接设置一个客户 ID;
2. 为订阅的主题指定一个订阅名称;
上述组合必须唯一。
3.2 使用本地事务
在事务中生成或使用消息时,ActiveMQ 跟踪各个发送和接收过程,并在客户端发出提交事务的调用时完成这些操作。如果事务中特定的发送或接收操作失败,则出现异常。客户端代码通过忽略异常、重试操作或回滚整个事务来处理异常。
本地事务的范围始终为一个会话。也就是说,可以将单个会话的上下文中执行的一个或多个生产者或消费者操作组成一个本地事务。
不但单个会话可以访问 Queue 或 Topic (任一类型的 Destination ),而且单个会话实例可以用来操纵一个或多个队列以及一个或多个主题,一切都在单个事务中进行。
这意味着单个会话可以(例如)创建队列和主题中的生产者,然后使用单个事务来同时发送队列和主题中的消息。
3.3 异步发送消息
ActiveMQ 支持生产者以同步或异步模式发送消息。使用不同的模式对send 方法的反应时间有巨大的影响,反映时间是衡量ActiveMQ 吞吐量的重要因素,使用异步发送可以提高系统的性能。
3.4 独占的消费者
ActiveMQ 维护队列消息的顺序并顺序把消息分派给消费者。但是如果建立了多个Session 和MessageConsumer,那么同一时刻多个线程同时从一个队列中接收消息时就并不能保证处理时有序。
有时候有序处理消息是非常重要的。ActiveMQ4 支持独占的消费。ActiveMQ 挑选一个MessageConsumer,并把一个队列中所有消息按顺序分派给它。如果消费者发生故障,那么ActiveMQ 将自动故障转移并选择另一个消费者。
4. 配置文件
ActiveMQ 配置文件:$AcrtiveMQ/conf/activemq.xml
5. 监控
在Windows 平台监控
进入%JAVA_HOME%/bin,双击jconsole.exe 即出现如下画面,在对话框中输入ActiveMQ 服务主机的地址,JXM 的端口和主机登陆帐号。
5.1 JXM 控制台(JDK1.5 控制台)监控ActiveMQ。
在使用JMX 监控broker 之前,首先要启用broker 的JMX 监控功能,例如在配置文件中设置useJmx="true"
5.2 Web Console 被集成到了ActiveMQ 的二进制发布包中,因此缺省访问
http://localhost:8161/admin 即可访问Web Console。
5.3 Command Agent 从4.2 版本起,ActiveMQ 支持Command Agent
6. 移植及集群
只需将$ACTIVEMQ_HOME 打包移植到新机器即可。
ActiveMQ 从多种不同的方面提供了集群的支持。
6.1 Queue consumer clusters
ActiveMQ 支持订阅同一个queue 的consumers 上的集群。如果一个consumer失效,那么所有未被确认 (unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer 的处理速度比其它 consumers 更快,那么这个consumer 就会消费更多的消息。
6.2 Broker clusters
一个常见的场景是有多个JMS broker,有一个客户连接到其中一个broker。
如果这个broker 失效,那么客户会自动重新连接到其它的broker。在ActiveMQ
中使 用failover:// 协议来实现这个功能。ActiveMQ3.x 版本的reliable://
协议已经变更为failover://。
如果某个网络上有多个brokers 而且客户使用静态发现(使用Static Transport 或Failover Transport)或动态发现(使用Discovery Transport),
那么客户可以容易地在某个broker 失效的情况下切换到其它的brokers。然而,
stand alone brokers 并不了解其它brokers 上的consumers,也就是说如果某
个broker 上没有consumers,那么这个broker 上的消 息可能会因得不到处理
而积压起来。目前的解决方案是使用Network of brokers,以便在broker 之间
存储转发消息。ActiveMQ 在未来会有更好的特性,用来在客户端处理这个问题。
从ActiveMQ1.1 版本起,ActiveMQ 支持networks of brokers。它支持分布
式的queues 和topics。一个broker 会相同对待所有的订阅(subscription):
不管他们是来自本地的 客户连接,还是来自远程broker,它都会递送有关的消
息拷贝到每个订阅。远程broker 得到这个消息拷贝后,会依次把它递送到其内
部的本地连接上。
6.3 Master Slave
在一个网络内运行多个brokers 或者stand alone brokers 时存在一个问题,这就是消息在物理上只被一个broker 持有,因此当某个broker 失效,那么你只能等待直到它重启后,这个 broker 上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。Master Slave 背后的想法是,
消息被复制到slave broker,因此即使master broker 遇到了像硬件故障之类
的错误,你也可以立即切换到slave broker 而不丢失任何消息。
Master Slave 是目前ActiveMQ 推荐的高可靠性和容错的解决方案
7. 特性
7.1 Exclusive Consumer
queue 中的消息是按照顺序被分发到consumers 的。然而,当你有多个consumers 同时从相同的queue 中提取消息时,你将失去这个保 证。因为这些消息是被多个线程并发的处理.
ActiveMQ 从4.x 版本起开始支持Exclusive Consumer (或者说Exclusive Queues)。 Broker 会从多个consumers 中挑选一个consumer 来处理queue 中所有的消息,从而保证了消息的有序处理。如果这个consumer 失效,那么broker会自动切换到其它的consumer。
7.2 Message Groups
用Apache 官方文档的话说,Message Groups rock!它是Exclusive Consumer功能的增强。逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer。跟所有的消息都由唯一的consumer 处理不同,JMS 消息属性JMSXGroupID 被用来区分message group。Message Groups 特性保证所有具有相同JMSXGroupID 的消息会被分发到相同的consumer(只要这个consumer 保持active)。另外一方面,Message Groups 特性也是一种负载均衡的机制。
7.3 Composite Destinations
从1.1 版本起, ActiveMQ 支持composite destinations。它允许用一个虚拟的destination 代表多个destinations。例如你可以通过composite destinations 在一个操作中同时向12 个queue 发送消息。在composite destinations 中,多个destination 之间采用","分割
如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://
7.4 Mirrored Queues
每个queue 中的消息只能被一个consumer 消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations来建立一个virtual queue 来把消息转发到多个queues 中。但是 为系统中每个queue 都进行如此的配置可能会很麻烦。
7.5 Strict Order Dispatch Policy
有时候需要保证不同的topic consumer 以相同的顺序接收消息。通常ActiveMQ 会保证topic consumer 以相同的顺序接收来自同一个producer 的消息。
然而,由于多线程和异步处理,不同的topic consumer 可能会以不同的顺序接收来自不同producer 的消息。例如有两个producer,分别是P 和Q。差不多是同一时间内,P 发送了 P1、P2 和P3 三个消息;Q 发送了Q1 和Q2 两个消息。两个不同的consumer 可能会以以下顺序接收到消息:
consumer1: P1 P2 Q1 P3 Q2
consumer2: P1 Q1 Q2 P2 P3
Strict order dispatch policy 会保证每个topic consumer 会以相同的顺序接收消息,代价是性能上的损失。
7.6 Message Transformation
有时候需要在JMS provider 内部进行message 的转换。从4.2 版本起,ActiveMQ 提供了一个MessageTransformer 接口用于进行消息转换
在消息被发送到JMS provider 的消息总线前进行转换。通过producerTransform 方法。
在消息到达消息总线后,但是在consumer 接收到消息前进行转换。通过consumerTransform 方法。
7.7 Transport
ActiveMQ 目前支持的transport 有:VM Transport、TCP Transport、SSL
Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport 等
7.8 Persistence
7.8.1 AMQ Message Store
是ActiveMQ5.0 缺省的持久化存储。
7.8.2 Kaha Persistence
是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。
7.8.3 JDBC Persistence
目前支持的数据库有Apache Derby, Axion, DB2, HSQL, Informix, MaxDB,MySQL, Oracle, Postgresql, SQLServer, Sybase
7.8.4 Disable Persistence
不应用持久化
7.9 Security
ActiveMQ 支持可插拔的安全机制,用以在不同的provider 之间切换。
7.9.1 Simple Authentication Plugin
适用于简单的认证需求,或者用于建立测试
环境。它允许在XML 配置文件中指定用户、用户组和密码等信息
7.9.2 JAAS Authentication Plugin
依赖标准的JAAS 机制来实现认证
7.9.3 Custom Authentication Implementation
可以通过编码的方式为ActiveMQ 增加认证功能。例如编写一个类继承自XBeanBrokerService。
7.9.4 Authorization Plugin
可以通过Authorization Plugin 为认证后的用户授权
8. 编程
8.1 开发JSM 的步骤
广义上说,一个 JMS 应用是几个JMS 客户端交换消息,开发JMS 客户端应用由以
下几步构成:
用 JNDI 得到ConnectionFactory 对象;
用 ConnectionFactory 创建Connection 对象;
用 Connection 对象创建一个或多个JMS Session;
用 JNDI 得到目标队列或主题对象,即Destination 对象;
用 Session 和Destination 创建MessageProducer 和MessageConsumer;
通知 Connection 开始传送消息。
8.2 Session
一旦从 ConnectionFactory 中获得一个Connection,就必须从Connection 中创建一个或者多个Session。Session 是一个发送或接收消息的线程,可以使用Session 创建MessageProducer,MessageConsumer 和Message。
Session 可以被事务化,也可以不被事务化,通常,可以通过向Connection 上的适当创建方法传递一个布尔参数对此进行设置。
Java 客户端:
ActiveMQConnection 方法:
Session createSession(boolean transacted, int acknowledgeMode);
其中 transacted 为使用事务标识,acknowledgeMode 为签收模式。
例如:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
8.3 Destination
Destination 是一个客户端用来指定生产消息目标和消费消息来源的对象。在 PTP 模式中,Destination 被称作Queue 即队列;在Pub/Sub 模式,Destination 被称作Topic 即主题。在程序中可以使用多个Queue 和Topic。
分享到:
相关推荐
**Spring与ActiveMQ搭建JMS开发系统示例详解** 在Java世界中,消息队列(JMS,Java Message Service)是一种标准,它定义了API来创建、发送、接收和读取消息,允许应用程序进行异步通信。Spring框架是Java开发中的...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它基于Java Message Service (JMS) API,为分布式系统提供高效、可靠的消息传递服务。在“activeMq代码记录(jmsspring+jmstest)”中,我们将探讨如何将...
ActiveMQ是Apache软件基金会开发的一个开源JMS提供者,它实现了JMS规范,提供了高效、可靠的的消息传递功能。在Java应用中,Spring框架被广泛用来管理依赖注入和简化业务逻辑处理。将ActiveMQ与Spring整合,可以方便...
ActiveMQ作为JMS提供者,Spring通过其JMS模块提供对JMS的全面支持,包括配置、模板以及监听容器。 整合ActiveMQ和Spring的第一步是引入所需的依赖。在Maven项目中,你需要在pom.xml文件中添加ActiveMQ和Spring相关...
Apache ActiveMQ是业界广泛使用的开源消息中间件,它遵循Java Message Service (JMS) 规范,提供高效、可靠的异步消息传递服务。在“activemqactivemq”这个主题下,我们主要关注如何使用ActiveMQ进行消息的接收和...
JMS(Java Message Service)是Java平台中用于与消息传递系统交互的标准API,而ActiveMQ是Apache软件基金会开发的一款开源消息代理,实现了JMS规范。本文将探讨如何利用Spring框架与ActiveMQ实现消息队列。 首先,...
Spring框架提供了一套完整的JMS(Java Message Service)支持,可以方便地与各种消息队列进行整合,ActiveMQ作为JMS的实现,能够帮助我们构建高可靠、高性能的消息系统。 **一、环境准备** 1. **安装ActiveMQ**: ...
ActiveMQ作为JMS的实现,可以通过Spring的配置来轻松集成,使得应用程序能够方便地发送和接收消息。 1. **Spring JMS配置** 在Spring配置文件中,我们需要定义一个`ConnectionFactory`,它是与消息服务器建立连接...
而ActiveMQ则是Apache软件基金会开发的一个开源消息代理,它实现了Java消息服务(JMS)规范,用于处理应用程序之间的异步通信。 标题“spring+activemq”暗示了我们将探讨如何将Spring框架与ActiveMQ集成,以便利用...
在Spring整合ActiveMQ的过程中,首先需要在项目的`pom.xml`文件中添加ActiveMQ和Spring JMS的依赖。这通常涉及到Maven的坐标技术,例如: ```xml <groupId>org.apache.activemq <artifactId>activemq-client ...
在本篇ActiveMQ学习笔记中,我们将探讨JMS(Java Message Service)与Spring框架的集成。JMS是一种标准API,用于在分布式环境中进行异步消息传递,而Spring框架则为开发人员提供了强大的依赖注入和管理服务的能力。...
public class MessageListener implements javax.jms.MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(...
接下来,我们需要配置Spring的ApplicationContext.xml文件,声明一个ActiveMQ的ConnectionFactory和一个JMS模板(JMSTemplate)。ConnectionFactory是创建连接到消息代理的工厂,而JMSTemplate则是Spring提供的发送...
在IT行业中,ActiveMQ是一个非常重要的消息中间件,它基于开放标准JMS(Java Message Service)规范,提供了高效、可靠的异步通信能力。SpringBoot框架则以其简洁的配置和快速的开发体验,成为了现代Java应用的首选...
总结来说,"activemq + spring"的主题涵盖了使用Java原生API连接ActiveMQ以及通过Spring框架实现更高级别的整合,包括利用Spring的JMS支持简化消息发送和接收。这些知识对于构建基于消息驱动的分布式系统,实现解耦...
### Spring+JMS+ActiveMQ+Tomcat 实现消息服务 #### 一、技术栈介绍 在本案例中,我们采用的技术栈为Spring 2.5、ActiveMQ 5.4.0 和 Tomcat 6.0.30。这些技术的结合能够有效地构建一个可靠的消息传递系统。 - **...
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; System.out.println("Received ...
其次,**ActiveMQ** 是Apache软件基金会开发的一个开源消息中间件,实现了Java消息服务(Java Message Service, JMS)标准。它允许应用程序通过消息传递进行异步通信,从而降低系统的耦合度,提高系统的可伸缩性和...
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage txtMsg = (TextMessage) message; System.out.println("Received message: " ...
Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置选项,使得在Spring应用中集成ActiveMQ变得简单。通过使用`JmsTemplate`类,我们可以方便地发送和接收消息。 1. **配置ActiveMQ**:在开始之前,我们...