简介
在前面的 一些文章里,我曾经对消息队列编程,以及消息队列通信方式做了一些总结。总的来说,那些示例是采用activemq服务器并且使用原生的代码来访问它们。在实际的代码实现中,采用原生api访问会显得非常的繁琐。因为我们要和里面一堆复杂的东西打交道,比如说Connection, ConnectionFactory, Session, Producer, Destination等等。这种复杂的代码结构非常容易出错而且也不容易关注于具体的业务逻辑。于是spring提供了一个jmsTemplate,可以在很大程度上简化它们。
比较
在使用spring开发具体示例前,我们先看一下一个简单发送消息到消息队列的示例:
public class Producer { private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private transient MessageProducer producer; private static int id = 1000000; private String jobs[] = new String[]{"suspend", "delete"}; public Producer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Producer producer = new Producer(); producer.sendMessage(); producer.close(); } public void sendMessage() throws JMSException { Destination destination = session.createQueue("JOBS"); Message message = session.createObjectMessage(id); producer.send(destination, message); } }
在上述的代码里,我们需要定义消息队列服务器的连接地址,然后创建ConnectionFactory, connection, session等等。而真正发消息的动作呢,在sendMessage方法里就一个producer.send()方法。从真正使用者的角度来说,我们是希望将brokerURL, connection, session的创建等东西都封装和隐藏起来,通过提供参数让它们可以很好的配置。这样才能让我们的代码更加简洁。于是在这一点上,spring jmstemplate确实帮了不少忙。
依赖定义
Activemq服务器
为了运行示例,我们需要有一个activemq的服务器。activemq的下载地址如下:http://activemq.apache.org/download.html 下载到本地解压到某个目录。然后进入到bin目录下,运行命令:
./activemq start
这样activemq服务器就运行起来了。这个时候,服务器的默认brokerURL是: tcp://localhost:61616。在服务器运行起来之后,如果我们想要了解它的详细情况,可以通过一个web console来查看。在浏览器里输入如下地址: http://localhost:8161/admin 则可以看到如下的页面。
这是系统提示的登录页面,默认的用户名和密码都是admin。输入之后则可以看到如下的页面:
在图中的Queues和Topics这一栏中我们还可以看到运行时具体的Queue和Topic有哪些。针对具体activemq的配置和管理可以参考activemq的官方文档。这里就不再赘述。
工程依赖定义
在我们使用maven创建的工程里,主要依赖的类库内容如下:
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency>
其中activemq-all主要用来提供访问所有activemq的api,只是提供的都是原生的api。而引入的spring-jms类库则是在它们的基础上提供了的封装。在我们后面的示例里,因为对ConnectionFactory作了连接池封装,所以还额外引入了如下的两个依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.10.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.3</version> </dependency>
定义好了基础的依赖之后,剩下的就是针对消息的发送和接收进行讨论了。
发送消息
既然在前面的讨论里已经提到过,对于spring来说,它收发消息的核心就是jmsTemplate。那么首先应该看看它的配置是怎么样的。下面是一个典型的jmsTemplate配置文件内容:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="location" value="classpath:datasource.properties"/> </bean> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.brokerURL}"/> <property name="userName" value="${jms.userName}"/> <property name="password" value="${jms.password}"/> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${jms.messageTopic}" /> </bean> <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="${jms.messageTopic}" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="destination"/> </bean> <bean id="messageSender" class="com.yunzero.activemq.ActivemqMessageSender"> <constructor-arg index="0" ref="jmsTemplate"/> </bean> </beans>
我们针对前面的内容做一个解释。首先我们需要建立一个类型为org.apache.activemq.ActiveMQConnectionFactory的connectionFactory。这里它定义了我们需要访问的activemq服务器的url以及访问它需要的用户名和密码。
接着我们定义了两个bean, 一个是org.apache.activemq.command.ActiveMQQueue的destination。它表示对应的一个点对点通信的queue。而后面定义的destinationTopic对应的是采用广播方式通信的topic。我们知道,在jms通信的规范里有两种发送消息的方式,一种是基于点对点的queue的方式,主要用于一个发送者发消息给一个接收者的情况。另外一种则是基于广播的topic的方式,主要用于一个发送者发消息给若干个接收者。
接着就是我们要用到的关键部分,spring里面预先定义好了的jmsTemplate,它的类型是org.springframework.jms.core.JmsTemplate。它需要配置的两个主要属性分别就是connectionFactory和destination。这样,通过这个模板就已经解决了往哪个服务器的哪个地方发的问题了。剩下的就是我们定义的一个bean,它封装了jmsTemplate来发送消息。
有了这些配置,我们实际上使用它们的代码则非常简单。我们定义的ActivemqMessageSender的实现代码如下:
package com.yunzero.activemq; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; @Component public class ActivemqMessageSender implements MessageSender { private JmsTemplate jmsTemplate; public ActivemqMessageSender(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Override public void sendMessage(String message) { jmsTemplate.convertAndSend(message); } }
在上述的代码里,实际上jmsTemplate发送消息的方法有若干个, 我们可以参照文档针对发送的不同类型消息来处理。在示例里我们仅仅是发送一个简单的字符串。
这里为了保证一定程度的松耦合,专门定义了一个接口MessageSender:
package com.yunzero.activemq; public interface MessageSender { void sendMessage(String message); }
接下来,我们尝试发送一个简单的消息出去:
public class App { public static void main( String[] args ) { ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("sample1.xml"); MessageSender sender = ctx.getBean(MessageSender.class); sender.sendMessage("Hello"); ctx.close(); } }
在执行完上面这部分代码之后,我们看消息队列服务器里面的消息,则会发现里面多了一个名字为test的队列。因为我们前面对应的配置文件里指定了连接配置消息服务器的信息:
jms.brokerURL=tcp://localhost:61616 jms.userName= jms.password= jms.messageTopic=test
在页面里,显示的内容如下图:
这里表示我们发送成功的一条消息已经保存在服务器了。我们剩下的就是需要有一个接收端来处理这个消息。
接收消息
同步接收消息
我们接收消息的方式其实也有两种方式。一种相当于同步接收消息的方式。这种方式就是利用jmsTemplate提供的receive方法。采用这种方式的一种最简单接收消息的方式如下:
public class ActivemqMessageReceiver implements MessageReceiver { private JmsTemplate jmsTemplate; public ActivemqMessageReceiver(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Override public String receiveMessage() { String result = (String)jmsTemplate.receiveAndConvert(); return result; } }
上述代码中接收消息的方法除了receiveAndConvert,还要其他的方法。具体可以查阅相关的文档。该类实现的接口如下:
public interface MessageReceiver { String receiveMessage(); }
同时,我们需要在配置文件里定义该bean对象,里面需要增加如下部分:
<bean id="messageReceiver" class="com.yunzero.activemq.ActivemqMessageReceiver"> <constructor-arg index="0" ref="jmsTemplate"/> </bean>
当然,我们可以根据文档提供的方法来对接收到的消息做各种处理和类型转换。
这种同步接收消息的方式比较有意思,因为我们在这里每调用它一次,它就接收一条消息。如果我们希望它保持监听某个队列的状态时,一般需要定义一个无限循环。
除了这种方式,还要一种就是消息监听的方式。相当于一个异步的消息处理。
异步接收消息
异步接收消息的方式在用纯jms的api时,我们也可以找到官方的介绍。无非就是定义一个实现MessageListener接口的类。在onMessage方法里实现接收到消息后的处理办法。在spring jms里也很简单。它需要做的改动有两个。一个是定义配置文件,一个典型的配置如下:
<jms:listener-container container-type="default" connection-factory="connectionFactory" acknowledge="auto" concurrency="10"> <jms:listener destination="${jms.messageTopic}" ref="messageListener" method="onMessage" /> </jms:listener-container> <bean id="messageListener" class="com.yunzero.activemq.ActivemqMessageListener"/>
在这里,我们引入了一个jms的命名空间。所以在定义文档的开头,我们需要引入它们的空间定义。文件定义头如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd">
在这里引入了相关的命名空间定义之后。我们需要的就是做一个接收消息的实现。这个实现和纯jms的基本上一样。也是实现MessageListener:
public class ActivemqMessageListener implements MessageListener { @Override public void onMessage(Message msg) { TextMessage textMsg = (TextMessage)msg; try { String message = textMsg.getText(); System.out.println(message); } catch (JMSException e) { e.printStackTrace(); } } }
而执行这部分的代码很简单。只要用applicationContext创建起来,它就相当于被注册了在保持一个运行的状态。完整的实现可以参看附件。
总结
Spring集成jms的过程其实并不复杂。它们交互的核心就是jmsTemplate。对于jmsTemplate的配置需要指定至少两个参数,一个是连接服务器的connectionFactory,另外一个则是发送的目的主题,比如是哪个quue还是topic。另外,消息交互的方式有两种,分别是基于quue和topic的。而接收消息也有两种方式,一种是基于jmsTemplate本身的同步接收消息方法,还要一种是注册事件通知的方式。它是一种异步的接收消息方式。在实际的情况我们可以根据具体业务的需要来选择。
参考材料
http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html
相关推荐
1) 本工程主要演示在SPRING BOOT工程中怎样使用JMS集成IBM-MQ及TLQ两种消息中间件产品 2) 使用SPRING BOOT Conditional机制实现了两种产品按需加载,工程会根据配置文件开关动态加载 3) 实现了普通队列消息发送与...
本项目提供的下载资料可能包含Spring配置文件、Java源代码、测试脚本等,可以帮助学习者了解如何在实际项目中使用Spring与WebLogic JMS的集成。通过深入研究这些文件,你可以了解到如何在企业级环境中利用消息队列...
在本示例中,"jms简单demo"涵盖了两个方面:与Spring框架的集成以及不集成Spring的情况。 首先,让我们深入了解JMS。JMS定义了消息生产者(Producer)、消息消费者(Consumer)和消息代理(Message Broker)之间的...
Spring框架是一个广泛使用的Java应用开发框架,它提供了与多种消息中间件集成的能力,包括WebLogic Server的JMS服务。WebLogic是Oracle公司的一款企业级应用服务器,它支持JMS规范,提供了强大的消息队列和发布/订阅...
在实际应用中,`SpringJMS`可能包含以下示例代码片段: ```xml <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 这里配置具体的JMS提供商实现 -->...
Spring Boot可以通过自动配置与它们集成,只需要在`application.properties`或`application.yml`中添加适当的配置。 ### 7. Spring JMS配置 在Spring Boot应用中,通过`@EnableJms`注解可以启用JMS支持,并使用`...
Spring也提供了对JMS的全面支持,简化了在Spring应用中集成和使用JMS的过程。 在"JMS_Spring集成所需jar"的压缩包中,通常会包含以下关键的JMS和Spring相关的库文件: 1. **ActiveMQ** - 如果你看到有activemq相关...
《Spring技术内幕:深入解析Spring架构与设计原理(第2版)》这本书主要聚焦于Spring框架的核心架构和技术细节,帮助读者全面理解Spring的工作机制、设计理念以及实现方式。下面将根据书名及其描述来展开相关知识点。 ...
本文将深入探讨Spring-JMS的基础知识,包括它的核心概念、配置以及如何与ActiveMQ这样的消息中间件进行集成。 **1. JMS简介** Java消息服务(Java Message Service,简称JMS)是一个标准,定义了应用程序如何创建、...
本项目"jms整合spring工程"是一个已经准备就绪的Java工程,它展示了如何在Spring框架中集成JMS,以便利用消息队列进行通信。主要依赖的是Apache ActiveMQ,这是一款流行的开源JMS提供者,能够实现高效、可靠的实时...
在本文中,我们将深入探讨SpringJMS的基本概念、如何与ActiveMQ集成,以及如何通过示例代码理解其工作原理。 1. **SpringJMS简介** SpringJMS是Spring框架对JMS API的包装,它简化了生产者和消费者之间的消息通信...
Spring JMS可以与其他Spring模块无缝集成,如Spring MVC和Spring Data。例如,你可以在Web应用中使用JMS来处理后台任务,或者将消息队列作为数据持久化的一种方式。 **版本3.1.1.RELEASE特性** 在`3.1.1.RELEASE`...
Spring JMS(Java Message Service)是Spring框架的一部分,专门用于集成JMS消息传递系统,以实现异步通信和解耦应用程序组件。在这个入门级实例中,我们将探讨如何使用Maven、Spring和ActiveMQ来构建一个简单的...
IBM MQ JMS Spring组件该存储库包含有助于向Spring开发人员提供IBM MQ JMS软件包的轻松配置的代码。 该库包含: 用于应用程序的mq-jms-spring-boot-starter安装及使用该软件包的编译版本可以从Maven Central自动下载...
在本篇ActiveMQ学习笔记中,我们将探讨JMS(Java Message Service)与Spring框架的集成。JMS是一种标准API,用于在分布式环境中进行异步消息传递,而Spring框架则为开发人员提供了强大的依赖注入和管理服务的能力。...
Spring Boot 通过JMS方式集成TongLinkQ
Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的标准接口...通过分析和调试代码,你将能深入了解Spring如何与JMS进行集成,以及如何在实际应用中利用它们来实现可靠的异步通信。
以下是关于Spring与JMS集成的关键知识点: 1. **JMS基本概念**: - **消息**:数据传输的载体,包含消息头和消息体。 - **生产者**:创建并发送消息的应用。 - **消费者**:接收并处理消息的应用。 - **队列...
Spring与JMS的集成 Spring通过`org.springframework.jms`包提供对JMS的支持,包括消息生产者(MessageProducer)、消息消费者(MessageConsumer)以及事务管理等。主要涉及以下组件: - `ConnectionFactory`:这...
综上所述,Spring JMS 4.3.4.RELEASE为开发者提供了一套完整的JMS集成方案,通过抽象和封装JMS API,使得消息处理更加简单且易于维护。配合Spring Framework的其他模块,如Spring JDBC、Spring AOP等,可以构建出...