JMS 简介:
JMS的全称是Java Message Service,即Java消息服务。是JAVA平台中关于面向消息中间件的API.JMS提供了应用之间的异步通信机制。
JMS优点:
- 异步通信;当使用JMS发送消息时,客户端不必等待消息被处理;客户端只需要将消息发送给消息代理,就可以确保消息会被发送给相应的目的地。
- 面向消息和解耦;使用JMS进行异步通信,客户端不要要知道目的地的地址。
- 确保投递
消息的传递方式有:
- 点对点 (Point-to-Point PTP ):一条消息只能传递给一个客户端,也就是说只要有一个客户端消费了该条消息其他客户端都无法接受到该条消息。
- 发布/订阅(Publish/subscribe pub/sub):一条消息可以发送给多个客户端
PTP 类 javax.jms.Queue 和 pub/sub 类 javax.jms.Topic 都扩展 javax.jms.Destination 类。
消息的组成
消息传递系统的中心就是消息。一条 Message 分为三个组成部分 :
· 消息头( header ):是个标准字段集,客户机和供应商都用它来标识和路由消息。头信息包括:
JMSMessageID: 标识提供者发送的每一条消息 , 发送过程中由提供者设置
JMSDestination: 消息发送的 Destination, 由提供者设置
JMSDeliveryMode: 包括 DeliveryMode.PERSISTENT( 被且只被传输一次 ) 和
DeliveryMode.NON_PERSISTENT( 最多被传输一次 )
JMSTimestamp: 提供者发送消息的时间 , 由提供者设置
JMSExpiration: 消息失效的时间 , 是发送方法的生存时间和当前时间值的和 , 0 表明消息不会过期
JMSPriority: 由提供者设置 , 0 最低 , 9 最高
JMSCorrelationID: 用来链接响应消息和请求消息 , 由发送消息的 JMS 程序设置
JMSReplyTo: 请求程序用它来指出回复消息应发送的地方
JMSType: JMS 程序用来指出消息的类型
JMSRedelivered: 消息被过早的发送给了 JMS 程序 , 程序不知道消息的接受者是谁
· 消息属性( property ) :支持把可选头字段添加到消息。如果您的应用程序需要不使用标准头字段对消息编目和分类,您就可以添加一个属性到消息以实现这个编目和分类。提供 set<Type>Property(...) 和 get<Type>Property(...) 方法以设置和获取各种 Java 类型的属性,包括 Object 。 JMS 定义了一个供应商选择提供的标准属性集:
JMSXUserID: 发送消息的用户的身份
JMSXAppID: 发送消息的应用程序的身份
JMSXDeliveryCount: 尝试发送消息的次数
JMSXGroupID: 该消息所属的消息组的身份
JMSXGroupSeq: 该消息在消息组中的序号
JMSXProducerTxID: 生成该消息的事物的身份
JMSXConsumerTxID: 使用该消息的事物的身份
JMSXRcvTimestamp: JMS 将消息发送给客户的时间
· 消息体( body ):包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。 JMS 为不同类型的内容提供了它们各自的消息类型,但是所有消息都派生自 Message 接口。消息类型有下列几种:
StreamMessage :包含 Java 基本数值流,用标准流操作来顺序的填充和读取。
MapMessage :包含一组名 / 值对;名称为 string 类型,而值为 Java 的基本类型。
TextMessage :包含一个 String 。
ObjectMessage :包含一个 Serializable Java 对象;能使用 JDK 的集合类。
BytesMessage :包含未解释字节流 : 编码主体以匹配现存的消息格式。
ActiveMQMapMessage {commandId = 6, responseRequired = false, messageId = ID:zhangwei-52158-1422932380502-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:zhangwei-52158-1422932380502-1:2:1:1, destination = topic://notifyTopic, transactionId = TX:ID:zhangwei-52158-1422932380502-1:2:1, expiration = 0, timestamp = 1422932388705, arrival = 0, brokerInTime = 1422932388707, brokerOutTime = 1422932388710, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1a0c529, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQMapMessage{ theTable = {} }
ActiveMQ
ActiveMQ 是最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
下载ActiveMQ
首先去http://activemq.apache.org/download.html 下载稳定版本5.10.0
解压后目录如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
+data (默认是空的)
+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)
+example (几个例子
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
-NOTICE.txt
-README.txt
-user-guide.html
启动ActiveMQ
可以使用bin\activemq.bat(activemq) 启动
使用JMS
JMS(Java Message Service) 可以提供应用的伸缩性,可以有效地避免服务被压垮。在Java EE 平台中,为了发送和接收JMS消息,必要的步骤有如下七步:
- 在一个消息代理上创建一个JMS连接工厂
- 从JMS连接工厂中打开一个JMS连接
- 创建一个JMS的目的地,可以是一个消息队列也可以是一个Topic
- 从连接中获取一个JMS会话
- 用消息生产者或者消息消费者发送或接收一个消息
- 处理JMSException
- 关闭JMS会话和连接
public class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
// 1创建 factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
// 2、创建factory
connection = connectionFactory.createConnection();
// 3、创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地
destination = session.createQueue(subject);
// 5.创建消息消生产者
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
/**
* 将传入的String 以TextMessage 格式发送出去
* @param message
* @throws JMSException
* @throws Exception
*/
public void produceMessage(String message) throws JMSException, Exception {
initialize();
// 创建文本消息
TextMessage msg = session.createTextMessage(message);
// 打开连接
connection.start();
// 发送消息
producer.send(msg);
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
监听JMS
public class ConsumerTool implements MessageListener {
/**
* JSM 用户
*/
private String user = ActiveMQConnection.DEFAULT_USER;
/**
* JSM 用户密码
*/
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
/**
* JSM 主题
*/
private String subject = "TOOL.DEFAULT";
/**
* 目的地
*/
private Destination destination = null;
/**
* 连接
*/
private Connection connection = null;
/**
* 会话
*/
private Session session = null;
/**
* 消息消费者
*/
private MessageConsumer consumer = null;
/**
* 初始化方法
* 1、通过user ,password,url 创建 connenctionFactory
* 2、通过connectionFactory 创建session
* 3、通过session创建目的地
* 5、创建指定主题消息的消费者
* @throws JMSException
* @throws Exception
*/
private void initialize() throws JMSException, Exception {
// 1、创建Factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
// 2、创建 connection
connection = connectionFactory.createConnection();
// 3、 创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建 目的地
destination = session.createQueue(subject);
// 5、创建消息消费者
consumer = session.createConsumer(destination);
}
/**
* 消费消息方法,向消息消费者添加消息到达监听器
* @throws JMSException
* @throws Exception
*/
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
// 增加消息监听器
consumer.setMessageListener(this);
}
/**
* close 关闭
* @throws JMSException
*/
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
/**
* 接收到消息:sendTo:13919306243;content:hello!
* 当接收到消息的处理方法。将接收到的信息按照指定格式截取组装成SMS消息发送出去。
*/
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
String[] msgs = FileUtil.splitMessage(msg);
try {
if (null != msgs && !msgs[1].equals("")) {
for (String s : FileUtil.buildContent(msgs[1])) {
SMSSender.getInstance().send(
new OutboundMessage(msgs[0], s));
}
}
} catch (GatewayException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
可以发现实现一个简单的发送消息需要许多编码。Spring提供了一个基于模板的的解决方案,用于简化JMS消息实现的代码
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cathy</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>3.1.4.RELEASE</spring.version>
<cxf.version>2.3.0</cxf.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
</dependency>
<!-- activeMq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.10.0</version>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId> org.aspectj</groupId>
<artifactId> aspectjweaver</artifactId>
<version> 1.6.11</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-jaxws</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-jaxrs</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-bindings-xml</artifactId>
<version>${cxf.version}</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>javax.xml.ws</groupId>
<artifactId>jaxws-api</artifactId>
<version>2.2.11</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:task="http://www.springframework.org/schema/task" xmlns:util="http://www.springframework.org/schema/util"
xmlns:jaxws="http://cxf.apache.org/jaxws"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://cxf.apache.org/jaxws
http://cxf.apache.org/schemas/jaxws.xsd
">
<aop:aspectj-autoproxy />
<context:annotation-config />
<context:component-scan base-package="com.cathy.demo.jms.*" />
<!-- connectionFactory -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- jmsTemplate -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="receiveTimeout" value="60000"/>
</bean>
<!-- 队列目的地 -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="message.queue"/>
</bean>
<!-- 主题 -->
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="notifyTopic"/>
</bean>
</beans>
创建一个消息发送者,jmsTemplate 中可以设置默认的destination,如果发送的地址固定此处的属性可以省略,直接使用默认地址。
/**
*
* @author zhangwei_david
* @version $Id: ProducerImpl.java, v 0.1 2015年1月31日 下午8:25:36 zhangwei_david Exp $
*/
@Component
public class ProducerImpl implements Producer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination destination;
/**
*/
public void send() {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("key", "test");
return mapMessage;
}
});
}
}
同步接收消息,receive()方法会一直阻塞到消息到达为止
/**
*
* @author zhangwei_david
* @version $Id: ReceiverImpl.java, v 0.1 2015年1月31日 下午8:53:49 zhangwei_david Exp $
*/
@Component
public class ReceiverImpl implements Receiver {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination destination;
/**
* @see com.cathy.demo.jms.receiver.Receiver#receive()
*/
public void receive() {
MapMessage mapMessage = (MapMessage) jmsTemplate.receive(destination);
if (mapMessage != null) {
System.out.println(mapMessage);
}
}
}
/**
*
* @author zhangwei_david
* @version $Id: Sender.java, v 0.1 2015年1月31日 下午8:47:18 zhangwei_david Exp $
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath*:META-INF/spring/jms-beans.xml")
public class Sender {
@Autowired
private Producer producer;
@Autowired
private Receiver receiver;
@Test
public void testSend() {
producer.send();
}
@Test
public void testReceive() {
receiver.receive();
}
}
分享到:
相关推荐
通过配置`ConnectionFactory`、`JmsTemplate`和消息监听器,我们可以方便地实现消息的发送和接收,从而提高系统的可扩展性和解耦性。同时,Spring提供的事务管理和异步处理功能进一步增强了JMS的实用性。在实际项目...
WebLogic Server是一款由Oracle公司提供的企业级应用服务器,它支持Java Message Service (JMS) 规范,允许在分布式环境中可靠地发送和接收消息。JMS是Java平台上的标准接口,用于实现应用程序间的异步通信。本文将...
《Spring In Action:使用Spring发送和接收JMS消息》 在Java消息服务(Java Message Service,JMS)中,Spring框架提供了一种高效且灵活的方式来处理消息传递。JMS允许应用程序在分布式环境中发送和接收消息,它...
使用Spring-JMS发送消息** 通过`JmsTemplate`,可以方便地发送消息到消息队列或主题。例如: ```java JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.convertAndSend(queueName, ...
3. **创建消息模板**:Spring的`JmsTemplate`是发送和接收JMS消息的核心工具。在配置文件中创建一个`JmsTemplate`的bean,并注入连接工厂。 ```xml <bean id="jmsTemplate" class="org.springframework.jms.core....
接下来,JMS是Java平台的标准API,用于在分布式环境中发送和接收消息。JMS提供了一种方式,使得应用程序可以在不关心对方何时在线的情况下进行通信,增强了系统的可靠性和可扩展性。在这个实例中,我们可能会使用`...
模板类提供了发送消息、同步接收消息以及访问JMS会话和消息生产者的辅助方法。在需要更复杂操作时,模板类会将控制权交给用户实现的回调接口,如`MessageCreator`和`SessionCallback`,以实现自定义的行为。 `org....
在压缩包文件"shanhy-springjms"中,可能包含了以下内容: 1. `pom.xml`:Maven项目的配置文件,定义了项目依赖,包括Spring、JMS和JTA相关的库。 2. `src/main/java`:源代码目录,可能有Spring配置类、消息生产者...
在这个"spring-jms使用queue发送消息简单例子"中,我们将深入探讨如何使用Spring JMS与ActiveMQ结合,通过队列(Queue)来发送和接收消息。 首先,`pom.xml`文件是Maven项目的配置文件,它包含了项目所依赖的库。...
SpringJMS允许开发者捕获和处理JMS异常,例如消息无法发送或接收时,可以通过Spring的异常处理机制进行适当的响应。 10. **性能优化** 调整SpringJMS和ActiveMQ的配置,例如设置连接池大小、消息缓存等,可以优化...
在Spring框架中,我们可以使用`JMSTemplate`来简化JMS操作,包括发送和接收消息。 集成Spring和WebLogic 9.2发送JMS消息的步骤如下: 1. **配置JMS连接工厂**:在Spring的配置文件中,你需要定义一个JMS连接工厂。...
它们负责管理消息监听器的生命周期,并自动处理JMS会话和消息消费。开发者只需要实现`MessageListener`接口,即可专注于处理接收到的消息。 3. **事务支持**:Spring允许开发者将JMS操作与数据库事务结合,确保在...
在这个测试中,你可以看到如何使用Spring JMS API来发送和接收消息,验证ActiveMQ配置的正确性,以及消息传递的有效性。 总的来说,这个示例涵盖了以下关键知识点: 1. ActiveMQ的安装和运行。 2. Spring框架中JMS...
6. **发送和接收消息**:使用`JmsTemplate`可以方便地发送消息,而消息的接收则由监听器容器自动处理。 7. **事务管理**:Spring支持与JMS的事务整合,可以配置事务边界,确保消息发送与业务操作原子性。 8. **...
1. **JMS模板(JmsTemplate)**: 这是Spring提供的一个核心工具类,简化了发送和接收JMS消息的过程。它提供了同步和异步发送消息的方法,支持点对点(P2P)和发布/订阅(Pub/Sub)模型。 2. **MessageListener容器*...
在IT行业中,Spring框架是Java领域最常用的轻量级应用框架之一,而JMS(Java Message Service)则是Java平台上的消息中间件标准,用于应用程序之间的异步通信。本篇文章将详细探讨如何通过Spring框架整合JMS,特别是...
- 可以使用Spring的`JmsTemplate`进行发送消息的测试,或者通过WebLogic Server的管理控制台查看JMS资源的状态和消息队列。 通过以上知识,开发者可以构建一个基于Spring的Java应用,有效地利用WebLogic Server的...
Spring JMS提供了对JMS API的高度封装,简化了消息生产者和消费者的实现,同时也支持事务管理和消息确认机制,极大地提升了开发效率和代码的可维护性。 首先,我们来看看Spring JMS的核心组件。主要包括...
综上所述,Spring整合JMS和ActivemQ提供了一套完整的解决方案,帮助开发者轻松地在应用中实现消息的发送和接收。通过这种方式,可以构建出高可用、松耦合、可扩展的分布式系统,提高系统的稳定性和响应速度。在实际...