- 浏览: 338275 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
右手边的你:
你确定你测试过batch是没问题的???
使用DbUtils进行数据库操作 -
电点mxn:
...
使用DbUtils进行数据库操作 -
sljackson:
顶
Spring事务管理 -
geyaandy:
...
Oracle行列互换 横表和纵表 -
weizou_china:
Request URL:http://localhost:80 ...
Spring mvc3的ajax
转自http://www.blogjava.net/Unmi/archive/2010/04/10/317947.html
JMS(Java Message Service) 是 Java 为面向消息中间件(MOM)定义的接口。JMS 的通信管道就是消息队列,说到消息队列,历史就悠久,在 MS 系统中很早就有 MSMQ,譬如邮件、群组就是些消息队列。JMS 因其异步,所以可用来解决高并发的问题,分布式可对负载进行均衡。
JMS 已成为 J2EE 规范中的一部分,所以在 J2EE 应用服务器中都有 JMS 核心部分 MQ 的实现,MQ 也有独立的产品,如 ActiveMQ、JBoss MQ(已更名为 JBoss Messaging)、WebSphere MQ 等。
如果我们蒙着头来理解,JMS 消息通信中的主要角色应该有:消息生产者(Producer)、消息消费者(Consumer)、它们间的消息队列(Queue)、以及所传送的消息(Message)。
由于 JMS 有两种通信模式:端到端(P2P) 和主题/订阅模式,所以还有个角色就是主题(Topic)。通信中还需要处理诸如连接(Connection),面向连接就会产生会话(Session),而连接一般都是通过连接工厂(Connection Factory) 来获得。
而 MQ 中间件的存在,使得端与端之间不需要直接相连来建立队列,且对于主题/订阅模式更是不可能,所以消息生产者和消费者它们都是指向到 MQ 中间件上的,它们在 MQ 上所指向的队列或主题被抽像为目的地(Destination),对于消息生产者称之为目的地可以理解,但作为消费者来说也叫做目的地中文描述上有些欠妥,谓之消息来源地(Source) 较好理解。还有,因为是异步的消息通信,所以就要注册消息监听器(MessageListener)
通过上面的理解,我们把 JMS 中所有的角色都串联起来了,我们在编程中要处理的基本就那些角色(对象),下面是 JMS 所定义的所有接口关系图(异常接口类未列于其中)。
现在来说说 JMS 中那两种通信模式的区别。首先来看最简单的 P2P 或队列模型。它就像我们邮件发送那样的方式,P 用户发往 C 用户的邮件只有 C 能收到,P 可以在多个邮件客户端发邮件给 C,C 也可以开多个邮件客户端来接收,某一个接收端收取了邮件 M 的话,则另一个接收端就收不到邮件 M 了。腾讯的 Foxmail 默认行为除外,它收完邮件后还会在服务器端保留,经常造成公司邮箱占用过大。用个图来描述:
这种 P2P 的通信息方式应该是数据传输在实际中可能不需要经过 MQ 服务器,而是直接在两个客户端间进行。
只有一个消费者将获得消息
生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
每一个成功处理的消息都由接收者签收
另一种传送模型是 主题发布/订阅模型。这种方式就类似于邮件列表,一般用户会就某个主题加入邮件列表,即订阅了该主题,当就该主题发布的信息,其它订阅者就都能收到,而且接收方之间是不受影响的。用个图来揣测一下 MQ Server 对该种模型的实现方式:
其实在 ActiveMQ 的管理控制台的 Topics 标签页中,也还是可以看到存在 Queue 的概念,这取决于 JMS 产品针对 Pub/Sub 模型的实现行为上。
MQ 除了完成上面消息转发或分发的任务之外,有时候称这个为目的地管理器 (DestinationManager),可以认为是 MQ 的的核心,除此之外还要负责消息缓存、状态、持久化、事物、安全性的管理。消息的持久性还是一项很重要的服务,消费方未启动时,假如有消息到来,消费方再次连接也可以接收到消息,即使 MQ 重启后消息也不会丢失。
MQ 产品一般还支持集群,以及与 MSMQ 或其他 MQ 产品桥接起来,也允许用其他语言编写客户端程序。
最后来看下以上两种消息传送模型中,消息生产者与消费者实现的主要步骤,两种模型的步骤差不多,只是一个是创建队列(Queue),一个是创建主题(Topic)。
消息生产者实现主要步骤:
上面代码全部是用最顶层的接口类型来引用的变量,实际应用中会用具体类型来引用变量,以使用更方便的方法。如直接用到 TopicPublisher、TopicSender、QueueSession、QueueConnection 等。而且还可能直接用某个 MQ 产品特定的实现类,如 ActiveMQConnection、等。如果为了便于切换不同的 MQ 产品,当然用最上层的接口去引用类型,但不得不要用到某个 MQ 产品的有利特性的时候,程序代码与该 MQ 产品存在这种高耦合也是不可避免的。
下面我么借助Mom4J来实现JMS
Spring提供JmsTemplate,简化JMS操作
大多数时候,除了简单的TextMessage外,需要发送的消息都应该被封装到Java类中,Spring提供了一个MessageConverter接口,方便实现Java类和JMS消息的转化
JMS(Java Message Service) 是 Java 为面向消息中间件(MOM)定义的接口。JMS 的通信管道就是消息队列,说到消息队列,历史就悠久,在 MS 系统中很早就有 MSMQ,譬如邮件、群组就是些消息队列。JMS 因其异步,所以可用来解决高并发的问题,分布式可对负载进行均衡。
JMS 已成为 J2EE 规范中的一部分,所以在 J2EE 应用服务器中都有 JMS 核心部分 MQ 的实现,MQ 也有独立的产品,如 ActiveMQ、JBoss MQ(已更名为 JBoss Messaging)、WebSphere MQ 等。
如果我们蒙着头来理解,JMS 消息通信中的主要角色应该有:消息生产者(Producer)、消息消费者(Consumer)、它们间的消息队列(Queue)、以及所传送的消息(Message)。
由于 JMS 有两种通信模式:端到端(P2P) 和主题/订阅模式,所以还有个角色就是主题(Topic)。通信中还需要处理诸如连接(Connection),面向连接就会产生会话(Session),而连接一般都是通过连接工厂(Connection Factory) 来获得。
而 MQ 中间件的存在,使得端与端之间不需要直接相连来建立队列,且对于主题/订阅模式更是不可能,所以消息生产者和消费者它们都是指向到 MQ 中间件上的,它们在 MQ 上所指向的队列或主题被抽像为目的地(Destination),对于消息生产者称之为目的地可以理解,但作为消费者来说也叫做目的地中文描述上有些欠妥,谓之消息来源地(Source) 较好理解。还有,因为是异步的消息通信,所以就要注册消息监听器(MessageListener)
通过上面的理解,我们把 JMS 中所有的角色都串联起来了,我们在编程中要处理的基本就那些角色(对象),下面是 JMS 所定义的所有接口关系图(异常接口类未列于其中)。
现在来说说 JMS 中那两种通信模式的区别。首先来看最简单的 P2P 或队列模型。它就像我们邮件发送那样的方式,P 用户发往 C 用户的邮件只有 C 能收到,P 可以在多个邮件客户端发邮件给 C,C 也可以开多个邮件客户端来接收,某一个接收端收取了邮件 M 的话,则另一个接收端就收不到邮件 M 了。腾讯的 Foxmail 默认行为除外,它收完邮件后还会在服务器端保留,经常造成公司邮箱占用过大。用个图来描述:
这种 P2P 的通信息方式应该是数据传输在实际中可能不需要经过 MQ 服务器,而是直接在两个客户端间进行。
只有一个消费者将获得消息
生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
每一个成功处理的消息都由接收者签收
另一种传送模型是 主题发布/订阅模型。这种方式就类似于邮件列表,一般用户会就某个主题加入邮件列表,即订阅了该主题,当就该主题发布的信息,其它订阅者就都能收到,而且接收方之间是不受影响的。用个图来揣测一下 MQ Server 对该种模型的实现方式:
其实在 ActiveMQ 的管理控制台的 Topics 标签页中,也还是可以看到存在 Queue 的概念,这取决于 JMS 产品针对 Pub/Sub 模型的实现行为上。
MQ 除了完成上面消息转发或分发的任务之外,有时候称这个为目的地管理器 (DestinationManager),可以认为是 MQ 的的核心,除此之外还要负责消息缓存、状态、持久化、事物、安全性的管理。消息的持久性还是一项很重要的服务,消费方未启动时,假如有消息到来,消费方再次连接也可以接收到消息,即使 MQ 重启后消息也不会丢失。
MQ 产品一般还支持集群,以及与 MSMQ 或其他 MQ 产品桥接起来,也允许用其他语言编写客户端程序。
最后来看下以上两种消息传送模型中,消息生产者与消费者实现的主要步骤,两种模型的步骤差不多,只是一个是创建队列(Queue),一个是创建主题(Topic)。
消息生产者实现主要步骤:
//1. 由ConnectionFactory 创建连接,一般 ConnectionFactory 从 JNDI 中获得 Connection connection = connectionFactory.createConnection(); //2. 创建 Session, Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //3. P2P 中创建 Destination, 这里创建了一个 Queue,队列名为 "Hello.Unmi" Destination destination = session.createQueue("Hello.Unmi"); //实际应用中队列是从 JNDI 中获得 //3. Sub/Pub 模型时,创建 Destination, 创建了一个 Topic,主题为"Unmi.Learn.ActiveMQ" Destination destination = session.createTopic("Unmi.Learn.ActiveMQ"); //4. 创建 Producer, MessageProducer producer = session.createProducer(destination); //5. 创建 Message,这里创建的是一个文本消息,可创建多种类型的消息 Message message = session.createTextMessage("Hello JMS Sended."); //6. 发送消息 producer.send(message); 消息消费者实现主要步骤: //1. 创建 Connection,//ActiveMQConnection 实现了 QueueConnection, TopicConnection Connection connection = connectionFactory.createConnection(); //2. 创建 Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //3. 创建 Destination, 一个 Queue,队列名与上同,这样就能接收到前面生产者发来的消息 Destination destination = session.createQueue("Hello.Unmi");//实际应用中队列是从 JNDI 中获得 //3. Sub/Pub 模型时,创建 Destination, 一个 Topic,主题名同上,可接上前面发布的消息 Destination destination = session.createTopic("Unmi.Learn.ActiveMQ"); //实际应用中队列是从 JNDI 中获得 //4. 创建 Consumer MessageConsumer consumer = session.createConsumer(destination); //5. 注册消息监听器,当消息到达时被触发并处理消息,也可阻塞式监听 consumer.receive() consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { //Do something with the message. } });
上面代码全部是用最顶层的接口类型来引用的变量,实际应用中会用具体类型来引用变量,以使用更方便的方法。如直接用到 TopicPublisher、TopicSender、QueueSession、QueueConnection 等。而且还可能直接用某个 MQ 产品特定的实现类,如 ActiveMQConnection、等。如果为了便于切换不同的 MQ 产品,当然用最上层的接口去引用类型,但不得不要用到某个 MQ 产品的有利特性的时候,程序代码与该 MQ 产品存在这种高耦合也是不可避免的。
下面我么借助Mom4J来实现JMS
public final class Mom4jUtil { public static void startJmsServer() throws Exception { File f = new File("durable.dat.lck"); f.delete(); Mom4jFactory.start(new CustomConfig(), true); System.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.mom4j.jndi.InitialCtxFactory"); System.setProperty(Context.PROVIDER_URL, "xcp://localhost:8001"); } } class CustomConfig implements Mom4jConfig { public int getPort() { return 4444; } public int getAdminPort() { return 8888; } public int getJndiPort() { return 8001; } public int getThreadCount() { return 3; } public int getSyncInterval() { return 1000; } public int getAsyncInterval() { return 5000; } public List getUsers() { return new ArrayList(); } public List getContextHandlers() { return new ArrayList(); } public File getMessageStore() { File dir = new File("store/"); dir.mkdirs(); return dir; } public File getDurablesStore() { return new File("durable.dat"); } @SuppressWarnings("unchecked") public List getDestinations() { List list = new ArrayList(1); list.add(new Mom4jDestination() { public String getName() { return "jms/queue"; } public int getType() { return Mom4jDestination.QUEUE; }}); return list; } } public class Sender extends Thread { public void run() { try { Context ctx = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) ctx.lookup("QueueConnectionFactory"); // Destination 即消息的目的地,消息发到何处 Destination destination = (Destination) ctx.lookup("jms/queue"); for (int i = 0;; i++) { Connection connection = null; try { connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); String text = "Hello, it is " + new Date(); System.out.println(" Send: " + text); Message message = session.createTextMessage(text); producer.send(message); producer.close(); session.close(); if(i%10 ==0) TimeUnit.SECONDS.sleep(10); } catch (JMSException e) { throw new RuntimeException(e); } finally { if (connection != null) connection.close(); } TimeUnit.MILLISECONDS.sleep(200); } } catch (NamingException e) { throw new RuntimeException(e); } catch (JMSException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } } } public class Receiver extends Thread implements MessageListener { public void run() { try { Context ctx = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) ctx.lookup("QueueConnectionFactory"); // Destination 即消息的来源地,从何处接受消息 Destination destination = (Destination) ctx.lookup("jms/queue"); Connection connection = null; try { connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(this); connection.start(); Thread.sleep(20000); } catch (JMSException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { if (connection != null) connection.close(); } } catch (NamingException e) { throw new RuntimeException(e); } catch (JMSException e) { throw new RuntimeException(e); } } // MessageListener 的实现类是线程安全的 public void onMessage(Message message) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { throw new RuntimeException(e1); } if (message instanceof TextMessage) { TextMessage text = (TextMessage) message; try { System.out.println("Receive: " + text.getText()); } catch (JMSException e) { e.printStackTrace(); } } } } public class Main { public static void main(String[] args) throws Exception { Mom4jUtil.startJmsServer(); new Sender().start(); new Receiver().start(); } }
Spring提供JmsTemplate,简化JMS操作
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="QueueConnectionFactory" /> </bean> <bean id="jmsQueue" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="jms/queue" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="defaultDestination" ref="jmsQueue" /> </bean> <bean id="sender" class="example.chapter9.Sender"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean> <bean id="receiver" class="example.chapter9.Receiver" /> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="jmsQueue" /> <property name="messageListener" ref="receiver" /> </bean>
public class Sender { private JmsTemplate jmsTemplate; public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void send(final String text) { System.out.println(" Send: " + text); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(text); } }); } } public class Receiver implements MessageListener { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage text = (TextMessage) message; try { System.out.println("Receive: " + text.getText()); } catch (JMSException e) { } } } }
大多数时候,除了简单的TextMessage外,需要发送的消息都应该被封装到Java类中,Spring提供了一个MessageConverter接口,方便实现Java类和JMS消息的转化
发表评论
-
Spring异常备忘录
2010-09-09 09:32 2811引用org.springframework.beans.fac ... -
Spring mvc3的ajax
2010-08-06 19:44 29303在 Spring mvc3中,响应、接受 JSON都十分方便。 ... -
spring mvc
2010-08-05 21:55 0package demo.spring.mvc; impor ... -
集成任务调度服务
2010-06-09 23:36 1200JDK中任务调度核心是Timer类的schedule方法,传递 ... -
Spring与axis结合调用 WebService
2010-06-09 21:49 2648在这篇文章中http://xace.iteye.com/blo ... -
Spring与Xfire结合发布WebService
2010-06-08 23:17 5830Web服务的发布者首先必 ... -
Spring发送邮件
2010-06-08 21:58 2011JavaMail API为Java提供了邮件发送和接受服务,支 ... -
Spring实现RMI调用
2010-06-07 23:19 3481传统的实现RMI,需要 1.服务接口必须从Remote派生,每 ... -
Spring事务管理
2010-06-04 23:47 5069事务的ACID特性: Atomic ... -
Spring数据访问策略
2010-02-19 15:40 2867Spring封装的数据访问异常 与SQLException是一 ... -
Spring AOP
2010-02-18 20:28 1496AOP的实现原理 编译期 ... -
Spring IOC 笔记
2010-02-07 21:11 1469依赖注入的3种方式 接 ...
相关推荐
SpringJMS是Spring框架的一部分,它提供了一种与Java消息服务(JMS)...通过理解以上知识点,并结合提供的`demo`代码,你可以更好地掌握SpringJMS与ActiveMQ的集成,从而在实际项目中构建高效、可靠的消息传递系统。
在本篇ActiveMQ学习笔记中,我们将探讨JMS(Java Message Service)与Spring框架的集成。JMS是一种标准API,用于在分布式环境中进行异步消息传递,而Spring框架则为开发人员提供了强大的依赖注入和管理服务的能力。...
本文将深入探讨Spring-JMS的基础知识,包括它的核心概念、配置以及如何与ActiveMQ这样的消息中间件进行集成。 **1. JMS简介** Java消息服务(Java Message Service,简称JMS)是一个标准,定义了应用程序如何创建、...
在"activemq + jms(原生和集成spring-jms)"的主题中,我们将探讨如何使用ActiveMQ原生API以及结合Spring-JMS框架来实现消息队列的创建与使用,主要涵盖以下几个核心知识点: 1. **ActiveMQ的基本概念**:包括Broker...
在这个"JMS+ActiveMQ+Spring 完整样例代码"中,我们将会探讨如何将这三者结合起来,实现一个简单的消息传递系统。以下是关键的知识点: 1. **JMS接口** JMS定义了两种主要的消息模型:点对点(Point-to-Point,P2P...
### Spring+JMS+消息处理知识点详解 #### 一、Spring JMS框架概述 Spring JMS框架是Spring项目的一部分,其主要目标是简化Java Message Service (JMS) API的使用,使得开发人员能够更容易地与消息中间件进行交互。...
7. **事务管理**:Spring JMS可以与Spring的声明式事务管理结合,确保消息发送在事务中完成,从而提供事务的一致性。 8. **持久化**:ActiveMQ支持消息持久化,即使在服务器重启后,也能保证消息不丢失。 9. **...
3. **Spring与JMS集成**:Spring框架提供了JmsTemplate类,简化了发送和接收消息的过程。此外,通过配置XML或Java配置,可以轻松地设置连接工厂、目的地和消息监听器。 4. **Spring与ActiveMQ集成**:配置Spring以...
本篇文章将深入探讨Spring JMS与JNDI(Java命名和目录接口)的结合使用,以及它们如何在实际项目中发挥作用。 首先,理解JMS是理解Spring JMS的关键。JMS是一种标准接口,允许应用程序创建、发送、接收和读取消息。...
总结,JMS 1.1规范与Spring框架的结合,为Java开发者提供了强大的消息传递能力,并结合Spring的高级特性,使得在构建可扩展、高可用性的分布式系统时更加得心应手。通过阅读"JMS规范培训教程.pdf",可以深入理解JMS...
【ActiveMQ和Spring整合JMS】的文档主要介绍了消息中间件的基本概念,特别是重点讨论了ActiveMQ和JMS的相关知识。消息中间件是用于不同分布式系统之间数据交流的工具,通过消息传递机制来扩展进程间的通信。ActiveMQ...
Spring的任务执行与调度、Spring Web应用的国际化支持、AJAX和Spring结合的访问模式、利用Spring发 送电子邮件、Spring JMS消息应用编程、教学资源全文检索设计、Java应用的报表打印、网络考试系统设 计、Spring应用...
Spring JMS是Spring框架的一个重要组成部分,它提供了一个用于与JMS兼容的消息传递系统的抽象层,使开发者能够更容易地在应用程序中使用消息传递。4.3.4.RELEASE是该模块的一个发行版本,通常包含bug修复、性能优化...
以上就是Spring与其他框架整合的主要知识点,通过这些整合,开发者可以构建出更加复杂且高效的Java应用,同时利用Spring的优秀特性,如依赖注入、AOP、事务管理等,提高代码质量,降低维护成本。
在JMS集成中,Spring提供了一套丰富的JMS抽象层,简化了与消息代理的交互,如声明式消息监听容器和模板类。 5. **pom.xml**:Maven的配置文件,定义了项目依赖关系、构建过程等信息。在这个项目中,pom.xml会包含...
Spring AOP通过代理模式实现了这一功能,可以与Spring的DI无缝结合。 8. **spring-core.jar**:核心工具模块,包含了Spring框架的基础类和通用工具,如类型转换系统、反射和资源加载等。 9. **spring-tx.jar**:...
虽然描述部分为空白,但是结合标题和标签“spring”,我们可以深入探讨Spring框架的相关知识点。下面将详细介绍Spring框架的基础概念、核心模块、设计模式以及实际应用中的关键技巧。 ### Spring框架简介 Spring...
在这个“spring+activeMQ消息队列”的主题中,我们将深入探讨Spring如何与ActiveMQ结合使用,以及相关的知识点。 首先,让我们了解什么是ActiveMQ。ActiveMQ是Apache软件基金会的一个开源项目,它是一个完全支持JMS...
以下将详细介绍如何进行Spring与ActiveMQ的集成,并提供一些关键知识点。 1. **安装配置ActiveMQ** - 首先,需要下载并安装ActiveMQ服务器。可以从官方网站(https://activemq.apache.org/)获取最新版本。 - 启动...
5. **Spring MVC集成**:如果你的项目使用了Spring MVC,你可以将CXF服务与控制器结合,提供RESTful API,使得前后端交互更加便捷。 6. **异常处理**:Spring和CXF结合,可以统一处理服务调用中的异常,提供更优雅...