JMS 概要
JMS 是 SUN 公司开发的一套访问 MOM(Message-Oriented-Middleware) 消息服务中间件的标准 API
MON 提供消息接收和转发的服务 , 对消息进行缓存和持久操作 , 保证消息的安全性 ,JMS 让开发都无须了解远程过程调用的细节和网络通信协议的细节就可以通过 JMS 向 MOM 发送消息 , 借助消息我们可以松散耦合的方式集成不同的应用
JMS 异步通信
JMS 支持异步消息机制 , 消息生产者发送消息到 MOM, 消息消费者甚至可以不在线 ,MOM 会暂存消息 , 当消息消费者上后 ,MOM 会将消息发给消息消费者 , 消息生产者不必等消息消费者 , 这种异步消息机制在很多系统中是非常重要的
引用 TaskExecuter 实现消息异常传输
JMS 的两种消息类型
PTP (Point to Point) 点对点
Pub/Sub (Publish/Subscribe) 发布 / 订阅
PTP
PTP 生产的消息会放在一个队列中 , 消费者从队列中取走消息 , 消息一旦被取走 , 就会从队列中删除 , 多个观察都观察一个队列 , 但一个消息只能被一个消费都取走
Pub/Sub
用称为主题 (Topic) 的内容分层结构代替了 PTP 模型中的唯一的目的地 , 一个发布者可以将消息发布到某个主题下面 , 订阅了该主题的订阅者就可以收到这个消息 .
JMS 受管理的对象
在 JMS 之前 , 各 MOM 产品都提供了专有的 API 访问其产品 ,JMS 通过 MOM 产品为 Java 程序提供了一个发送消息接受消息的标准 , 便利的方法 , 用 JMS 写的应用可以在任何实现 JMS 标准的 MOM 产品上运行 .
各 MOM 产品实现技术和机制存在很大的差别 , 为保持 JMS 客户端的可移植性 , 实现了 JMS 接口的对象必须与 MOM 产品的专胡技术进行隔离 , 完成这项工作的机制是管理对象 .
这些 JMS 接口的对象由提供者消息系统管理员创建 , 并放置在 JNDI 的空间中 , 然后由 JMS 检索这些对象 , 通过 JMS 接口访问这些对象 ,JMS 提供者必须提供创建管理对象的方法 , 受管对象存放于 JNDI
两个重要的受管对象
ConnectionFacory 用于创建到提供者底层消息系统的连接
Destination 用于指定 JMS 客户端发送消息的目的地或接收消息的来源
JMS 程序只要知道受管对象的 JNDI 名称和 JMS 接口类型就可以了
JMS 的一些重要接口
JMS 定义了一系列的封装消息的高级接口 , 这些接口又分两个消息域
PTP 和 Pub/Sub javax.jms 包下的接口
ConnectionFactory
Connection
Destination
Session 发送消息或接收消息的单线程环境
MessageProducer
MessageConsumer
JMS 的两个版本 JMS1.1 JMS1.02
区别 :JMS1.1 将两种消息统一进行处理 , 透明地操作两种消息域
JMS1.02 区别对待两种消息域 , 针对每种消息域提供了高级接口的子接口
高级接口 PTP 域子接口 Pub/Sub 域子接口
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver QueueBrowser TopicSubscriber
JSM 程序需要基本步骤
1. 通过 JNDI 查找 ConnectionFactory
2. 通过 ConnectionFactory 创建一个 Connection
3. 用 Connection 创建一个或者多个 Session
4. 通过 JNDI 查询一个或多个 Destination
5. 用 Session 和 Destination 创建对应的 MessageProducer 或 MessageConsumer
6. 启动 Conneciton
7. 发送或接收消息
JMS 消息的结构
Header Properties Body 三部分
Header 消息头部
JMSMessageID String
JMSDestination destination
JMSDeliveryMode int 消息持久化设置 Persistent
JMSTimestamp long
JMSExpiration long
JMSPriority int
JMSCorrelationID int
JMSReplyTo String
JMSType Destination 请求程序用它来指定回复消息就发送的地方 , 由
发送消息的 JMS 程序设置
JMSRedelivered String
Message 接口提供了访问 Header 的 API
Properties 消息属性
JMSX 开头的是 JMS 专用的 ,JSM_ 开头的是提供者专用的
Body 消息内容
JMS 有一种消息类型 , 通过五个接口来实现
TextMessage 字符串
ObjectMessage 实现了 Seralizable 接口的对象
MapMessage 是一个 Map
BytesMessage 消息是一个二进制数组
StreamMessage 消息是一组原始数据类型 , 这此数据按标准进行操作 , 按顺序进行填充或读取
消息的收发机制
JMS 事务
JMS 使用 Session 控制事务 , 如何需要事务可以在创建 Session 时标注需要事务
消息确认
消息确认是收到消息发送一个回执确认收到消息
三种确认方式
Session.ATUO_ACKNOWLEDGE 接收到消息自动发送这一个确认
Session.CLIENT_ACKNOWLEDGE 调用 Meeage#acknowledge() 方法 , 显示发送确认
Session.DUPS_OK_ACKNOWLEDGE 延时发送消息 , 但可能有重复接收消息的问题
消息选择 ( 消息过滤 )
选择条件可以通过 Header 和 Properties 进行匹配度设置
MessageConsumer consumer=session.createConsumer(destination,”JMSType=’car’”);
JMSType 和 color weigh 属性
“JMSType=’car’ AND color=’blue’ AND weigh>2500”
JMSCorrelationID=’12312312’ AND phone LIKE ‘12%e3’”;
Phone IS NOT NULL AND age>23
发送消息
MessageSender.java 消息发送器
package cn.com.snt.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class MessageSender extends ApplicationContextWare{
private ConnectionFactory factory=null;
private Destination dest=null;
public void setFactory(ConnectionFactory factory) {
this.factory = factory;
}
public void setDest(Destination dest) {
this.dest = dest;
}
public void send(String msgText){
Connection connection=null;
try {
// ApplicationContext ctx = new FileSystemXmlApplicationContext("src/cn/com/snt/jms/applicationContext.xml");
// ConnectionFactory factory=(ConnectionFactory) ctx.getBean("connectionFactory");
// Destination dest=(Destination)ctx.getBean("dest");
System.out.println(factory+" "+dest);
// Context ctx=new InitialContext();
// ConnectionFactory factory=(ConnectionFactory)ctx.lookup("jndi/jmsConn");
// Destination dest=(Destination)ctx.lookup("jndi/dest");
// ctx.close();
// 创建一个到 MOM 的连接
connection=factory.createConnection();
connection.start();
// 创建一个会话实例 , 不使用事务 , 采用自动确认消息的机制
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个指向特定目标地址的消息发送者
MessageProducer sender=session.createProducer(dest);
TextMessage message=session.createTextMessage(msgText);
sender.send(message);
System.out.println("sender success!");
}catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) {
((MessageSender)(getApplicationContext().getBean("messageSender"))).send("name:lilp;gender:male");
}
}
ApplicationContextWare.java 为了使用消息发送者和接收者同用一个 IoC 容器
package cn.com.snt.jms;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class ApplicationContextWare {
private static ApplicationContext ctx;
static{
ctx= new FileSystemXmlApplicationContext("src/cn/com/snt/jms/applicationContext.xml");
}
public static ApplicationContext getApplicationContext(){
return ctx;
}
}
MessageReceiver.java
package cn.com.snt.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class MessageReceiver extends ApplicationContextWare implements MessageListener {
private ConnectionFactory factory=null;
private Destination dest=null;
public void setFactory(ConnectionFactory factory) {
this.factory = factory;
}
public void setDest(Destination dest) {
this.dest = dest;
}
public void reveive(){
Connection connection=null;
try {
// Context ctx=new InitialContext();
// // 从 JNDI 中获取 ConnectionFactory
// ConnectionFactory factory=(ConnectionFactory)ctx.lookup("jndi/jmsConn");
// // 从 JNDI 中获取消息发送目标地
// Destination dest=(Destination)ctx.lookup("jndi/dest");
// ctx.close();
// ApplicationContext ctx = new FileSystemXmlApplicationContext("src/cn/com/snt/jms/applicationContext.xml");
// ConnectionFactory factory=(ConnectionFactory) ctx.getBean("connectionFactory");
// Destination dest=(Destination)ctx.getBean("dest");
System.out.println(factory+" "+dest);
// 创建一个到 MOM 的连接
connection=factory.createConnection();
connection.start();
// 创建一个会话实例 , 该会话不使用事务 , 采用自动确认消息的机制
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个指向一个特定目标地址的消息消费者
MessageConsumer receiver=session.createConsumer(dest);
receiver.setMessageListener(this);
}catch (JMSException e) {
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
TextMessage textMsg=(TextMessage)message;
System.out.println("MESSAGE:"+textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
((MessageReceiver)(getApplicationContext().getBean("messageReceiver"))).reveive();
}
}
两个受管对象可以从 JNDI 服务器中获取也可以在 IoC 容器中获取 , 上面使用的 Spring 的 IoC 容器
applicationContext.xml 文件内容如下 :
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
xmlns:amq = "http://activemq.org/config/1.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd" >
< bean id = "connectionFactory" class = "org.apache.activemq.pool.PooledConnectionFactory"
destroy-method = "stop" >
< property name = "connectionFactory" >
< bean class = "org.apache.activemq.ActiveMQConnectionFactory" >
< property name = "brokerURL" value = "tcp://127.0.0.1:61616" />
</ bean >
</ property >
</ bean >
< bean id = "dest" class = "org.apache.activemq.command.ActiveMQQueue" >
< constructor-arg value = "TOOL.DEFAULT" />
</ bean >
< bean id = "messageReceiver" class = "cn.com.snt.jms.MessageReceiver" >
< property name = "factory" ref = "connectionFactory" />
< property name = "dest" ref = "dest" />
</ bean >
< bean id = "messageSender" class = "cn.com.snt.jms.MessageSender" >
< property name = "factory" ref = "connectionFactory" />
< property name = "dest" ref = "dest" />
</ bean >
</ beans >
上面采用的 PTP 消息域类型 , 也可以采用 Pub/Sub 模式 , 发送什么类型的消息由 Destination 决定的
如果是 JMS1.02 需要区别对待 PTP 域和 Pub/Sub 域的消息 , 使用 QueueSender 发送 PTP 消息 , 并且目标地址为 Queue, 使用 TopicPublisher 发送 Pub/Sub 域消息 , 目标地址必须是 Topic 对象
使用 QueueReceiver 接收 PTP 域消息 , 地址通过 Queue 指定 , 使用 TopicSubscriber 接收 Pub/Sub 域的消息 , 地址通过 Topic 指定
JMS 接收消息的两种方式
同步接收与异步接收消息
使用消息接收器的 receive() 方法接收方式是同步接收的方式 , 当 receiver() 方法接收不到消息时 , 一直会阻塞 . 可能 receiver(long ) 指定等待多长时间 , 接收不到消息就返回 null,0 永不超时 , 也可以使用 receiverNoWait() 不等待
在实际的应用 , 异常消息接收更有意义 , 可以通过 注册 MessageListener 监听器 , 实现消息异常接收 , 上面的例子就是采用异步接收消息的方式实现的 .
JMS 的提供者 ActiveMQ 默认工作在 61616 商品上
JBOSS Mesaging
相关推荐
JMS入门文档通常会包含以下几个关键知识点: 1. **JMS基本概念**: - **消息**:JMS中的基本单元,它是一个数据对象,包含要传递的信息。 - **消息生产者**(Message Producer):负责创建和发送消息的组件。 - ...
下面我们将详细探讨JMS的基础知识、核心概念以及入门项目可能涉及的方面。 1. **JMS基本概念** - **消息**:在JMS中,数据通过消息进行传递。消息包含要传输的数据,以及描述数据的元数据。 - **消息生产者**:...
Java消息服务(Java Message Service,简称...例如,Test文件可能包含了JMS的入门示例代码,展示了如何创建消息、设置消息代理、发送和接收消息的过程。通过学习和实践这些示例,开发者能更好地理解和掌握JMS的使用。
Spring-JMS是Spring框架的一部分,专门用于处理Java消息服务(JMS)的集成。它提供了一个简单的API,使得开发者能够方便地在应用中使用消息传递功能。本文将深入探讨Spring-JMS的基础知识,包括它的核心概念、配置...
总的来说,JMS入门实例结合WebLogic的安装和配置,为初学者提供了一个了解和实践Java消息服务的完整流程。通过这个过程,开发者可以学习到如何在WebLogic这样的企业级应用服务器上构建可靠的消息传递系统,以及如何...
**JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...
【JMS入门】这篇文章主要介绍了Java消息服务(Java Message Service,简称JMS)的基本概念和如何使用开源的JMS服务器OpenJMS进行实践操作。JMS是一种标准接口,用于应用程序之间的异步通信,特别是在分布式环境中,...
该属性值默认为false,这样JMS在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时JMS就会对接收到的消息进行回滚, 对于SessionAwareMessageListener在接收到消息后发送一个返回消息时也处于...
【WebLogic Server 8.1 JMS 入门】 WebLogic Server 8.1 是一款符合J2EE 1.3规范的中间件,其中包含了WebLogic JMS Server,它是基于Java Message Service (JMS) 的实现,允许Java应用程序进行可靠的消息交换。JMS...
**JMS(Java Message Service)入门** JMS是Java平台中用于消息传递的一个标准接口,它定义了在Java应用程序之间交换消息的API。通过JMS,开发者可以创建、发送、接收和阅读异步消息,这对于分布式系统中的解耦和...
本教程将深入探讨如何将这两个强大的工具结合在一起,以创建一个简单的发送JMS消息的入门实例。 首先,我们需要理解ActiveMQ的基本概念。ActiveMQ是Apache软件基金会开发的一个开源消息代理,它实现了JMS规范,提供...
在"JMS入门级的蹩脚篇.ppt"这个文件中,可能包含了以下内容:JMS的基本概念解释,如何创建消息,如何设置消息队列和主题,如何编写生产者和消费者代码示例,以及如何配置和运行JMS应用程序。这些内容对于初学者理解...
在这个"spring jms tomcat 异步消息传递入门实例"中,我们将探讨如何在Spring框架下利用JMS和Tomcat实现异步消息传递,以提高系统性能和可扩展性。 首先,理解异步消息传递的概念至关重要。在同步通信中,发送方...
本教程将带你逐步了解如何利用Tomcat、Spring和JMS(Java Message Service)构建一个简单的异步消息传递入门实例。 首先,让我们来理解一下核心组件: 1. **Tomcat**:这是一个流行的开源Java Servlet容器,用于...
理解如何创建消息对象,设置其属性,并通过Session和Destination接口进行发送和接收是JMS入门的关键。 2. **网关设置** 实例中可能涉及到的消息网关配置,如ActiveMQ,是JMS实现的核心组件。ActiveMQ是Apache软件...
Java消息服务(Java Message ..."JMS简明教程.pdf"这份文档应该详细介绍了这些概念,并可能包括实例代码和最佳实践,可以帮助初学者快速入门JMS。学习JMS不仅可以提升你的专业技能,也有助于解决复杂的企业级问题。