`

JMS 基本知识及与Spring结合

阅读更多
转自http://www.blogjava.net/Unmi/archive/2010/04/10/317947.html
JMS(Java Message Service) 是 Java 为面向消息中间件(MOM)定义的接口。JMS 的通信管道就是消息队列,说到消息队列,历史就悠久,在 MS 系统中很早就有 MSMQ,譬如邮件、群组就是些消息队列。JMS 因其异步,所以可用来解决高并发的问题,分布式可对负载进行均衡。

JMS 已成为 J2EE 规范中的一部分,所以在 J2EE 应用服务器中都有 JMS 核心部分 MQ 的实现,MQ 也有独立的产品,如 ActiveMQ、JBoss MQ(已更名为 JBoss Messaging)、WebSphere MQ 等。

如果我们蒙着头来理解,JMS 消息通信中的主要角色应该有:消息生产者(Producer)、消息消费者(Consumer)、它们间的消息队列(Queue)、以及所传送的消息(Message)。

由于 JMS 有两种通信模式:端到端(P2P) 和主题/订阅模式,所以还有个角色就是主题(Topic)。通信中还需要处理诸如连接(Connection),面向连接就会产生会话(Session),而连接一般都是通过连接工厂(Connection Factory) 来获得。

而 MQ 中间件的存在,使得端与端之间不需要直接相连来建立队列,且对于主题/订阅模式更是不可能,所以消息生产者和消费者它们都是指向到 MQ  中间件上的,它们在 MQ 上所指向的队列或主题被抽像为目的地(Destination),对于消息生产者称之为目的地可以理解,但作为消费者来说也叫做目的地中文描述上有些欠妥,谓之消息来源地(Source) 较好理解。还有,因为是异步的消息通信,所以就要注册消息监听器(MessageListener)

通过上面的理解,我们把 JMS 中所有的角色都串联起来了,我们在编程中要处理的基本就那些角色(对象),下面是 JMS 所定义的所有接口关系图(异常接口类未列于其中)。






现在来说说 JMS  中那两种通信模式的区别。首先来看最简单的 P2P 或队列模型。它就像我们邮件发送那样的方式,P 用户发往 C 用户的邮件只有 C 能收到,P 可以在多个邮件客户端发邮件给 C,C 也可以开多个邮件客户端来接收,某一个接收端收取了邮件 M 的话,则另一个接收端就收不到邮件 M 了。腾讯的 Foxmail 默认行为除外,它收完邮件后还会在服务器端保留,经常造成公司邮箱占用过大。用个图来描述:




这种 P2P 的通信息方式应该是数据传输在实际中可能不需要经过 MQ 服务器,而是直接在两个客户端间进行。
只有一个消费者将获得消息
生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
每一个成功处理的消息都由接收者签收

另一种传送模型是 主题发布/订阅模型。这种方式就类似于邮件列表,一般用户会就某个主题加入邮件列表,即订阅了该主题,当就该主题发布的信息,其它订阅者就都能收到,而且接收方之间是不受影响的。用个图来揣测一下 MQ Server 对该种模型的实现方式:




其实在 ActiveMQ 的管理控制台的 Topics 标签页中,也还是可以看到存在 Queue 的概念,这取决于 JMS 产品针对 Pub/Sub 模型的实现行为上。

MQ 除了完成上面消息转发或分发的任务之外,有时候称这个为目的地管理器 (DestinationManager),可以认为是 MQ 的的核心,除此之外还要负责消息缓存、状态、持久化、事物、安全性的管理。消息的持久性还是一项很重要的服务,消费方未启动时,假如有消息到来,消费方再次连接也可以接收到消息,即使 MQ 重启后消息也不会丢失。

MQ 产品一般还支持集群,以及与 MSMQ 或其他 MQ 产品桥接起来,也允许用其他语言编写客户端程序。

最后来看下以上两种消息传送模型中,消息生产者与消费者实现的主要步骤,两种模型的步骤差不多,只是一个是创建队列(Queue),一个是创建主题(Topic)。

消息生产者实现主要步骤:

  //1. 由ConnectionFactory 创建连接,一般 ConnectionFactory 从 JNDI 中获得
  Connection connection = connectionFactory.createConnection(); 
  
  //2. 创建 Session,
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  
  //3. P2P 中创建 Destination, 这里创建了一个 Queue,队列名为 "Hello.Unmi"
  Destination destination = session.createQueue("Hello.Unmi"); //实际应用中队列是从 JNDI 中获得
  //3. Sub/Pub 模型时,创建 Destination, 创建了一个 Topic,主题为"Unmi.Learn.ActiveMQ"  
  Destination destination = session.createTopic("Unmi.Learn.ActiveMQ");  

  //4. 创建 Producer,
  MessageProducer producer = session.createProducer(destination);
  
  //5. 创建 Message,这里创建的是一个文本消息,可创建多种类型的消息
  Message message = session.createTextMessage("Hello JMS Sended.");
  
  //6. 发送消息
  producer.send(message); 

消息消费者实现主要步骤:

  //1. 创建 Connection,//ActiveMQConnection 实现了 QueueConnection, TopicConnection
  Connection connection = connectionFactory.createConnection();
  
  //2. 创建 Session
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  
  //3. 创建 Destination, 一个 Queue,队列名与上同,这样就能接收到前面生产者发来的消息
  Destination destination = session.createQueue("Hello.Unmi");//实际应用中队列是从 JNDI 中获得
 //3. Sub/Pub 模型时,创建 Destination, 一个 Topic,主题名同上,可接上前面发布的消息  
  Destination destination = session.createTopic("Unmi.Learn.ActiveMQ");  //实际应用中队列是从 JNDI 中获得
  
  //4. 创建 Consumer
  MessageConsumer consumer = session.createConsumer(destination);
  
  //5. 注册消息监听器,当消息到达时被触发并处理消息,也可阻塞式监听 consumer.receive()
  consumer.setMessageListener(new MessageListener() {
      public void onMessage(Message message) {
              //Do something with the message.
      }
   });



上面代码全部是用最顶层的接口类型来引用的变量,实际应用中会用具体类型来引用变量,以使用更方便的方法。如直接用到 TopicPublisher、TopicSender、QueueSession、QueueConnection 等。而且还可能直接用某个 MQ 产品特定的实现类,如 ActiveMQConnection、等。如果为了便于切换不同的 MQ 产品,当然用最上层的接口去引用类型,但不得不要用到某个 MQ 产品的有利特性的时候,程序代码与该 MQ 产品存在这种高耦合也是不可避免的。


下面我么借助Mom4J来实现JMS
public final class Mom4jUtil {
    public static void startJmsServer() throws Exception {
      File f = new File("durable.dat.lck");
      f.delete();
        Mom4jFactory.start(new CustomConfig(), true);
        System.setProperty(Context.INITIAL_CONTEXT_FACTORY, 
          "org.mom4j.jndi.InitialCtxFactory");
        System.setProperty(Context.PROVIDER_URL, "xcp://localhost:8001");
    }
}

class CustomConfig implements Mom4jConfig {

    public int getPort() { return 4444; }
    public int getAdminPort() { return 8888; }
    public int getJndiPort() { return 8001; }
    public int getThreadCount() { return 3; }
    public int getSyncInterval() { return 1000; }
    public int getAsyncInterval() { return 5000; }

    public List getUsers() { return new ArrayList(); }
    public List getContextHandlers() { return new ArrayList(); }
    
    public File getMessageStore() {
        File dir = new File("store/");
        dir.mkdirs();
        return dir;
    }

    public File getDurablesStore() {
        return new File("durable.dat");
    }

    @SuppressWarnings("unchecked")
    public List getDestinations() {
        List list = new ArrayList(1);
        list.add(new Mom4jDestination() {
            public String getName() {
                return "jms/queue";
            }
            public int getType() {
                return Mom4jDestination.QUEUE;
            }});
        return list;
    }
}  

public class Sender extends Thread {
  public void run() {
    try {
      Context ctx = new InitialContext();
      ConnectionFactory factory = (ConnectionFactory) ctx.lookup("QueueConnectionFactory");
      // Destination 即消息的目的地,消息发到何处
      Destination destination = (Destination) ctx.lookup("jms/queue");
      for (int i = 0;; i++) {
        Connection connection = null;
        try {
          connection = factory.createConnection();
          Session session = connection.createSession(false, 
            Session.AUTO_ACKNOWLEDGE);
          MessageProducer producer = session.createProducer(destination);
          String text = "Hello, it is " + new Date();
          System.out.println("   Send: " + text);
          Message message = session.createTextMessage(text);
          producer.send(message);
          producer.close();
          session.close();
          if(i%10 ==0)
            TimeUnit.SECONDS.sleep(10);
        } catch (JMSException e) {
          throw new RuntimeException(e);
        } finally {
          if (connection != null)
            connection.close();
        }
        TimeUnit.MILLISECONDS.sleep(200);
      } 
    } catch (NamingException e) {
      throw new RuntimeException(e);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

public class Receiver extends Thread implements MessageListener {
  public void run() {
    try {
      Context ctx = new InitialContext();
      ConnectionFactory factory = (ConnectionFactory) ctx.lookup("QueueConnectionFactory");
      // Destination 即消息的来源地,从何处接受消息
      Destination destination = (Destination) ctx.lookup("jms/queue");
      Connection connection = null;
      try {
        connection = factory.createConnection();
        Session session = connection.createSession(false, 
          Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(this);
        connection.start();
        Thread.sleep(20000);
      } catch (JMSException e) {
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } finally {
        if (connection != null)
          connection.close();
      }
    } catch (NamingException e) {
      throw new RuntimeException(e);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  }
  // MessageListener 的实现类是线程安全的
  public void onMessage(Message message) {
    try {
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e1) {
      throw new RuntimeException(e1);
    }
    if (message instanceof TextMessage) {
      TextMessage text = (TextMessage) message;
      try {
        System.out.println("Receive: " + text.getText());
      } catch (JMSException e) {
        e.printStackTrace();
      }
    }
  }
}

public class Main {
  public static void main(String[] args) throws Exception {
    Mom4jUtil.startJmsServer();
    new Sender().start();
    new Receiver().start();
  }
}



Spring提供JmsTemplate,简化JMS操作
  <bean id="jmsConnectionFactory" 
      class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="QueueConnectionFactory" />
  </bean>

  <bean id="jmsQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="jms/queue" />
  </bean>

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="defaultDestination" ref="jmsQueue" />
  </bean>

  <bean id="sender" class="example.chapter9.Sender">
    <property name="jmsTemplate" ref="jmsTemplate" />
  </bean>

  <bean id="receiver" class="example.chapter9.Receiver" />

  <bean id="listenerContainer" 
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="jmsQueue" />
    <property name="messageListener" ref="receiver" />
  </bean>


public class Sender {
  private JmsTemplate jmsTemplate;
  public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }
  public void send(final String text) {
    System.out.println("   Send: " + text);
    jmsTemplate.send(new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(text);
      }
    });
  }
}

public class Receiver implements MessageListener {
  public void onMessage(Message message) {
    if (message instanceof TextMessage) {
      TextMessage text = (TextMessage) message;
      try {
        System.out.println("Receive: " + text.getText());
      } catch (JMSException e) {
      }
    }
  }
}


大多数时候,除了简单的TextMessage外,需要发送的消息都应该被封装到Java类中,Spring提供了一个MessageConverter接口,方便实现Java类和JMS消息的转化
  • 大小: 85.8 KB
  • 大小: 41.8 KB
  • 大小: 44.4 KB
分享到:
评论

相关推荐

    SpringJMS示例代码

    SpringJMS是Spring框架的一部分,它提供了一种与Java消息服务(JMS)...通过理解以上知识点,并结合提供的`demo`代码,你可以更好地掌握SpringJMS与ActiveMQ的集成,从而在实际项目中构建高效、可靠的消息传递系统。

    ActiveMQ学习笔记(二) JMS与Spring

    在本篇ActiveMQ学习笔记中,我们将探讨JMS(Java Message Service)与Spring框架的集成。JMS是一种标准API,用于在分布式环境中进行异步消息传递,而Spring框架则为开发人员提供了强大的依赖注入和管理服务的能力。...

    spring-jms入门

    本文将深入探讨Spring-JMS的基础知识,包括它的核心概念、配置以及如何与ActiveMQ这样的消息中间件进行集成。 **1. JMS简介** Java消息服务(Java Message Service,简称JMS)是一个标准,定义了应用程序如何创建、...

    activemq +jms(原生和集成spring-jms)

    在"activemq + jms(原生和集成spring-jms)"的主题中,我们将探讨如何使用ActiveMQ原生API以及结合Spring-JMS框架来实现消息队列的创建与使用,主要涵盖以下几个核心知识点: 1. **ActiveMQ的基本概念**:包括Broker...

    JMS+ActiveMQ+Spring 完整样例代码

    在这个"JMS+ActiveMQ+Spring 完整样例代码"中,我们将会探讨如何将这三者结合起来,实现一个简单的消息传递系统。以下是关键的知识点: 1. **JMS接口** JMS定义了两种主要的消息模型:点对点(Point-to-Point,P2P...

    Spring+JMS+消息处理

    ### Spring+JMS+消息处理知识点详解 #### 一、Spring JMS框架概述 Spring JMS框架是Spring项目的一部分,其主要目标是简化Java Message Service (JMS) API的使用,使得开发人员能够更容易地与消息中间件进行交互。...

    jms Spring+ActiveMQ 5.4.2

    7. **事务管理**:Spring JMS可以与Spring的声明式事务管理结合,确保消息发送在事务中完成,从而提供事务的一致性。 8. **持久化**:ActiveMQ支持消息持久化,即使在服务器重启后,也能保证消息不丢失。 9. **...

    jms+activeMq+spring学习简单例子

    3. **Spring与JMS集成**:Spring框架提供了JmsTemplate类,简化了发送和接收消息的过程。此外,通过配置XML或Java配置,可以轻松地设置连接工厂、目的地和消息监听器。 4. **Spring与ActiveMQ集成**:配置Spring以...

    Spring JMS 消息处理-基于JNDI

    本篇文章将深入探讨Spring JMS与JNDI(Java命名和目录接口)的结合使用,以及它们如何在实际项目中发挥作用。 首先,理解JMS是理解Spring JMS的关键。JMS是一种标准接口,允许应用程序创建、发送、接收和读取消息。...

    JMS1.1规范培训教程&&spring框架

    总结,JMS 1.1规范与Spring框架的结合,为Java开发者提供了强大的消息传递能力,并结合Spring的高级特性,使得在构建可扩展、高可用性的分布式系统时更加得心应手。通过阅读"JMS规范培训教程.pdf",可以深入理解JMS...

    消息中间件ActiveMQ及Spring整合JMS.docx

    【ActiveMQ和Spring整合JMS】的文档主要介绍了消息中间件的基本概念,特别是重点讨论了ActiveMQ和JMS的相关知识。消息中间件是用于不同分布式系统之间数据交流的工具,通过消息传递机制来扩展进程间的通信。ActiveMQ...

    Spring3.X编程技术与应用,完整扫描版

    Spring的任务执行与调度、Spring Web应用的国际化支持、AJAX和Spring结合的访问模式、利用Spring发 送电子邮件、Spring JMS消息应用编程、教学资源全文检索设计、Java应用的报表打印、网络考试系统设 计、Spring应用...

    spring-jms-4.3.4.RELEASE.zip

    Spring JMS是Spring框架的一个重要组成部分,它提供了一个用于与JMS兼容的消息传递系统的抽象层,使开发者能够更容易地在应用程序中使用消息传递。4.3.4.RELEASE是该模块的一个发行版本,通常包含bug修复、性能优化...

    spring整合其他框架

    以上就是Spring与其他框架整合的主要知识点,通过这些整合,开发者可以构建出更加复杂且高效的Java应用,同时利用Spring的优秀特性,如依赖注入、AOP、事务管理等,提高代码质量,降低维护成本。

    ActiveMQ+Camel+Spring+jms Demo(一)

    在JMS集成中,Spring提供了一套丰富的JMS抽象层,简化了与消息代理的交互,如声明式消息监听容器和模板类。 5. **pom.xml**:Maven的配置文件,定义了项目依赖关系、构建过程等信息。在这个项目中,pom.xml会包含...

    spring2.5jar包

    Spring AOP通过代理模式实现了这一功能,可以与Spring的DI无缝结合。 8. **spring-core.jar**:核心工具模块,包含了Spring框架的基础类和通用工具,如类型转换系统、反射和资源加载等。 9. **spring-tx.jar**:...

    Spring技术内幕:深入解析Spring架构与设计原理(第2版).pdf

    虽然描述部分为空白,但是结合标题和标签“spring”,我们可以深入探讨Spring框架的相关知识点。下面将详细介绍Spring框架的基础概念、核心模块、设计模式以及实际应用中的关键技巧。 ### Spring框架简介 Spring...

    spring+MQ消息队列

    在这个“spring+activeMQ消息队列”的主题中,我们将深入探讨Spring如何与ActiveMQ结合使用,以及相关的知识点。 首先,让我们了解什么是ActiveMQ。ActiveMQ是Apache软件基金会的一个开源项目,它是一个完全支持JMS...

    spring集成activemq例子demo

    以下将详细介绍如何进行Spring与ActiveMQ的集成,并提供一些关键知识点。 1. **安装配置ActiveMQ** - 首先,需要下载并安装ActiveMQ服务器。可以从官方网站(https://activemq.apache.org/)获取最新版本。 - 启动...

    spring+cxf 整合jar包

    5. **Spring MVC集成**:如果你的项目使用了Spring MVC,你可以将CXF服务与控制器结合,提供RESTful API,使得前后端交互更加便捷。 6. **异常处理**:Spring和CXF结合,可以统一处理服务调用中的异常,提供更优雅...

Global site tag (gtag.js) - Google Analytics