`

利用JMS Topic发布/订阅消息

    博客分类:
  • jms
阅读更多

2003年3月11日那一期(使用JMS Queue) 解释了如何使用Java消息服务(Java Messaging Service,JMS)Queue进行点到点的消息发送。下面这一技巧将解释如何使用JMS Topic实现发布/订阅式的消息发送。

发布/订阅式的消息发送

发布/订阅式的消息发送中,一个发布者利用一个方法调用将每条消息发送给多个预订者。介于发布者和预订者之间的是一台消息服务器。在JMS中,消息服务器被叫做“JMS提供者”。发布者发送消息到JMS提供者,预订者从JMS提供者接收消息。

下图演示了这一方案。

在JMS中,发布/订阅式的消息发送使用JMS管理的一个叫做Topic的对象来管理发布者到预订者的消息流。JMS发布者又叫做消息生产者,而JMS预订者又叫做消息消费者。消息生产者获得服务器上一个JMS Topic的引用,并向该Topic发送消息。当消息到达时,JMS提供者负责通知所有预订了该Topic的消息消费者。JMS提供者每次发送消息后(可选地)将接收到消息收据的确认。

这一过程描述如下:

使用JMS Topic的发布/订阅式消息发送在几方面类似于点到点的消息发送。以下是两种消息发送方式共同的特点:

  • 消息发送可以是面向对象的,允许将整个的对象作为消息发送。
  • 消息发送可以是事务性的。
  • 消息发送可以是同步或异步的。
  • 消息发送可以与基础的第三方产品集成。
  • 消息可以发送给在消息发送时不在运行的消息消费者(即QueueReceiver或TopicSubscriber)。
  • 消息一旦被递送到队列或主题,发送消息的函数调用就会立即返回。
  • 可以显式地或自动地确认收到消息。

发布/订阅式消息发送与点到点消息发送之间也有几点不同:

  • 发布/订阅式消息发送是一对多的,而点到点消息发送是一对一的。
  • 发布的消息只递送给Topic当前的预订者。客户只能接收到他向一个Topic预订了的那些消息。而在点到点消息发送中,永久的消息将一直在Queue中,直到它超时或者某个接收者来取走该消息。
  • 发布/订阅式消息发送中的永久消息是由“耐久的预订”提供的。JMS提供者存储由于预订者出于某种原因不可用而无法递送给预订者的消息。在下次预订者连接上的时候,这些存储的消息将会被递送给他。这确保了客户预订一个Topic之后,所有发布的消息都会递送给他,哪怕该预订者不是总在运行。如果预订不是耐久的,在预订者掉线时发布的任何消息都不会递送到预订者。

发布/订阅消息和点到点消息发送没有优劣之分,它们是相互补充的工具,各自用于不同的目的。点到点消息发送通常用于消息接收者在一个系统内有惟一的标识的情况下。发布/订阅式消息发送更多地用于一个系统中的几个代理需要知道某个事件或条件何时出现这种情况下。

JMS消息发送模型非常类似于常规的Java 2编程中的事件侦听器。点到点消息发送就像一个单播事件侦听器模型,而发布/订阅消息就像一个组播侦听器模型。传统的Java事件侦听器与JMS(不同于编程语法)之间的差别是,事件源和侦听器分别叫做消息生产者和消费者。JMS消息生产者和消费者可以运行在不同的地址空间,甚至是在不同的机器上。JMS消息发送还提供比传统的事件侦听器模型所实现的更高级别的服务。不过基本的消息发送模型是相同的。

该技巧的示例代码由三个程序组成:

  1.  
    1. 一个servlet,名字叫做PublishWeatherServlet,它向JMS Topic发布一个XML格式的天气报告。
    2. 一个命令行Java应用程序,名字叫做WeatherReceiver,它向Topic发出预订并打印接收到的XML消息。
    3. 一个GUI应用客户端,名字叫做WeatherClient,它解析并以图形方式显示XML消息中的数据。

下面是用于发布天气报告的HTML页面和Web表单、运行命令行预订者的终端会话和GUI应用的屏幕快照:

发布消息到Topic

名叫PublishWeatherServlet的servlet从HTML表单接收POST参数,并转化为XML格式,然后使用JMS Topic将产生的XML文档发布到所有的侦听器。该servlet中的大多数代码用于接收POST参数并将它们转化为XML文档。代码的有趣部分在于发布方法。该方法接收一个String参数,其中包含有将发布的XML文本。下面我们来仔细研究发布方法,看它是如何发布到JMS Topic的:

    1.Get a TopicConnectionFactory and a Topic.

                  protected void publish(String text) {              

                     TopicConnectionFactory tcf = null;
                      Topic topic = null;
          

                        try {                 
                         Context jndiContext = new InitialContext();
                         tcf = (TopicConnectionFactory)10.
                         jndiContext.lookup(
    11.
                         "java:comp/env/jms/TopicConnectionFactory");
    12.
                      topic = (Topic) jndiContext.lookup(
    13.
                         "java:comp/env/jms/Topic");
    14.
                   } catch (NamingException nameEx) {

                      System.err.println(nameEx.toString());
    16.
                   }

该代码使用Java命名和目录接口(Naming and Directory Interface,JNDI)API在JMS提供者上查找两个对象:Topic和TopicConnectionFactory。该servlet将发送消息到Topic。TopicConnectionFactory用于创建一个到JMS提供者的连接。请注意该servlet用于查找这些对象的名称。记住,J2EE应用中所有对象的JNDI API名称都应该以java:comp/env/打头

    Create a Connection, Session, and Publisher.

                TopicConnection tc = null;                       

            try {  21.
               tc = tcf.createTopicConnection();
              TopicSession ts =
                 tc.createTopicSession(
                    false, Session.AUTO_ACKNOWLEDGE);
              TopicPublisher tp =
                 ts.createPublisher(topic);

该代码使用从JNDI API得到的TopicConnectionFactory来创建TopicConnection。TopicConnection可用于创建TopicSession。用于创建TopicSession的参数告诉连接创建一个不是事务性的TopicSession,并且自动地确认消息收到。(如果消息递送是事务性的,那么在同一TopicSession中发送的所有消息将形成一个工作单元,该单元可以被提交或回滚)。然后,使用Session创建TopicPublisher,TopicPublisher充当消息发布的通道的角色。

注意,J2EE 2.0规范指出,JMS消息的事务和确认是由J2EE容器管理的。这意味着如果代码运行在容器中,那么这些参数就可以被忽略。遗憾的是,并不是所有的提供商都会按照这一要求去做。如果事务、确认或者这二者对您的应用很重要,请务必检查您的产品文档,或者自己测试这些参数的行为。这些参数应该像在容器外工作一样。

    Create and publish the message.

                   TextMessage textMessage =
                      ts.createTextMessage();

                   textMessage.setText(text);

                   tp.publish(textMessage);

                   ...

            } // End of method publish

发布消息很简单。TopicSession充当创建新的TextMessage的工厂。该代码将TextMessage的文本设置为包含将发送的XML的字符串,然后使用TopicPublisher将消息发布到Topic。

发布方法到此就介绍完了。JMS提供者负责将消息递送给所有的预订者。

Topic预订消息和接收消息

命令行程序WeatherReceiver向Topic预订消息并打印出从该Topic接收到的任何消息。为了简化,预订Topic的过程被封装在辅助类SubscriptionHelper中。WeatherReceiver类充当一个异步消息接收者,并实际执行输出操作。

  1. 向一个Topic预订消息

    以下的代码来自类SubscriptionHelper,创建对一个Topic的预订:
      

    protected TopicConnection _tc;
        ...
       
        public SubscriptionHelper(String tcfName,
                               String topicName,
                               MessageListener listener) {

           // Get references to topic connection factory
           // and topic. \
           _tc = null;

           TopicConnectionFactory tcf = null;
           Topic topic = null;

           try {
              InitialContext ic = new InitialContext();
              tcf = (TopicConnectionFactory)
                 ic.lookup(tcfName);
              topic = (Topic) ic.lookup(topicName);
           } catch (NamingException e) {
              System.err.println(e.toString());
              e.printStackTrace(System.err);
           }

           try {
              // Create a connection and so on

              // Subscribe self to topic--messages will be
              // delivered to this.onMessage()
              _tc = tcf.createTopicConnection();
              TopicSession ts =

                 _tc.createTopicSession(
                   false, Session.AUTO_ACKNOWLEDGE);
              TopicSubscriber tsub =

               ts.createSubscriber(topic);
              tsub.setMessageListener(listener);
           } catch (JMSException e) {
              System.err.println(e.toString());

              e.printStackTrace(System.err);
              close();
           }
        }

SubscriptionHelper类的大部分等同于发布者代码。它使用JNDI API来获得对Topic 和TopicConnectionFactory的引用,并创建TopicConnection和TopicSession对象。但是该类不是创建一个TopicPublisher,而是创建一个TopicSubscriber,并将TopicSubscriber的消息侦听器设置为已经传递进来的MessageListener。从此以后,每当该Topic接收到一条消息,该消息就会被递送到MessageListener的onMessage方法。因为这种方式中使用了一个回调,因此该例子演示了异步消息接收。

2.Receiving Messages

接收消息惟一的要求是类实现接口javax.jms.MessageListener。WeatherReceiver类本身是一个MessageListener。MessageListener接口只有一个方法:onMessage。WeatherReceiver的onMessage方法出现在下面:

       

    public class WeatherReceiver implements
           MessageListener {
           // Print a weather message when it is received
           public void onMessage(Message message) {
              try {
                  if (message instanceof TextMessage) {
                     TextMessage m = (TextMessage) message;
                     System.out.println(
                        "--- Received weather report");
                     System.out.println(m.getText());
                     System.out.println("----------");
                  } else {
                     System.out.println(
                        "Received message of type " +
                         message.getClass().getName());
                  }
             } catch (JMSException e) {
                System.err.println(e.toString());
                e.printStackTrace(System.err);
             }
          }
          ...
          public static void main(String[] args) {
             if (args.length != 2) {
                System.out.println(
                   "Usage: WeatherReceiver " +
                   "topicConnectionFactorName topicName");
                System.exit(1);
             }


            // Create a receiver, then set it up to listen
           // for messages on the topic. Then wait for
            // messages and print them as they come in.
            WeatherReceiver wr = new WeatherReceiver();
            SubscriptionHelper sh =
               new SubscriptionHelper(args[0], args[1], wr);
       
           // Wait for publications...
            System.out.println(
              "Waiting for publications to topic " +
              args[1]);
            sh.waitForMessages();
         }

WeatherReceiver的主方法创建一个WeatherReceiver实例和一个SubscriptionHelper实例。它向SubscriptionHelper传递应用将会使用的WeatherReceiver、Topic名称和TopicConnectionFactory(这些参数在命令行指定)。SubscriptionHelper实例创建预订。然后主方法将WeatherReceiver注册为来自Topic的消息的消费者。

onMessage方法只是在适当的地方将接收到的Messages转化为类TextMessage,并输出接收到的XML文档。

注意,在Web层使用JMS侦听器是一个坏主意。实际上,J2EE 1.3参考实现不允许这样做。服务器端JMS侦听器在EJB层被适当模型化为消息驱动Bean。

部署Web应用

有了JMS,发布/订阅式消息发送的代码就很容易。但是,部署描述文件提出了一个需要解决的问题。PublishWeatherServlet是一个使用JNDI API查找外部组件的Web组件。Web组件使用编码名称查找外部资源(比如Topics和TopicConnectionFactories)。部署描述文件必须将这些编码名称定义为资源引用或资源环境引用。下面从Web应用的部署描述文件web.xml抽取出的代码定义了servlet使用的编码名称(该代码出现在web.xml中的<welcome-file-list>之后):

   <!-- JMS topics and connection factories used -->
  <resource-env-ref>
    <resource-env-ref-name>
      jms/Topic
    </resource-env-ref-name>
    <resource-env-ref-type>
      javax.jms.Topic
    </resource-env-ref-type>
  </resource-env-ref>
  <resource-ref>
    <res-ref-name>
      jms/TopicConnectionFactory

     </res-ref-name>
    <res-type>
      javax.jms.TopicConnectionFactory
    </res-type>
    <res-auth>
      Container
    </res-auth>
  </resource-ref>

resource-env-ref块定义名称“jms/Topic”的类型为javax.jms.Topic。字符串“jms/Topic”是用于查找Topic ("java:comp/env/jms/Topic")的字符串,其中“java:comp/env/”部分删除了。产品的部署工具允许应用部署人员将这一名称映射到环境中的一个Topic。

在J2EE参考实现中,这一映射已经预先配置在文件META-INF/sun-j2ee-ri.xml中的Web档案中。该文件是Web应用的运行时部署描述文件。部署描述文件在名称和内容方面都是特定于提供商的。

resource-ref块定义了TopicConnectionFactory的名称、类型和授权模式。通常,部署人员会使用部署工具将编码名称jms/TopicConnectionFactory与平台中的TopicConnectionFactory相关联。J2EE参考实现预配置了在JNDI中命名空间中叫做jms/TopicConnectionFactory的TopicConnectionFactory。

  • 大小: 12.1 KB
  • 大小: 7.7 KB
  • 大小: 23.4 KB
分享到:
评论
1 楼 qinxcb 2011-10-26  
能否将实例代码发布一下,谢谢。

相关推荐

    应用openJMS实现JMS消息发布于订阅

    3. **发布消息**: 创建一个`MessageProducer`实例,并通过`createTextMessage()`或`createObjectMessage()`方法创建消息。然后,调用`send()`方法将消息发送到指定的主题。 ```java MessageProducer producer = ...

    JMS sub/pub实现聊天系统

    在这个模型中,生产者(publisher)发布消息到一个主题(topic),而消费者(subscriber)则订阅这个主题,从而接收到发布的消息。 1. **JMS基本概念** - **消息**:消息是数据的载体,包含实际要传递的信息。 - ...

    ActiveMQ-Topic订阅发布模式Demo

    5. **发布消息**:使用MessageProducer对象创建并发送消息到Topic。消息可以是文本、二进制数据或其他复杂类型,通常需要设置主题名来指定目标。 6. **订阅Topic**:消费者通过MessageConsumer订阅Topic,可以设置...

    weblogic中使用JMS发送和接受消息

    通过理解并熟练掌握上述内容,你将在WebLogic环境中成功地利用JMS进行消息传递,无论是简单的点对点通信还是复杂的发布/订阅模式,都能游刃有余。请务必根据具体需求进行配置,并确保测试环节充分,以确保JMS服务的...

    activemq和spring整合发布消息和订阅消息demo

    这个"activemq和spring整合发布消息和订阅消息demo"是用于演示如何在Spring环境中配置和使用ActiveMQ来实现实时的数据通信。现在,我们将深入探讨相关的知识点。 首先,ActiveMQ是Apache软件基金会开发的一个开源...

    ESB应用JMS_Topic

    在本案例中,我们将深入探讨如何在JBoss ESB中利用JMS Topic实现消息通信。 #### 二、JMS Topic应用场景 JMS Topic通常用于需要广播消息的应用场景,即一个生产者发送的消息可以被多个消费者接收。这种模式非常...

    JMS 教程 - 消息队列、消息服务

    ### JMS 教程 - 消息队列、消息服务 #### 企业级消息传递与JMS概述 在深入探讨JMS(Java消息服务)之前,我们先来了解下...通过理解这些概念和技术,开发者可以更好地利用JMS构建出高效、可靠的企业级消息服务系统。

    JMS消息服务代码(java message service)

    1. **Message Publisher**:创建并发布消息到一个JMS主题。 2. **Topic**:消息发布的中心,所有的订阅者都会关注这个主题。 3. **Message Subscriber**:订阅特定主题,以接收发布到该主题的所有消息。 JMS API...

    Spring整合JMS

    生产者发布消息到一个主题,所有订阅了该主题的消费者都能收到消息。这种方式适用于一对多的消息传递,如事件通知、消息广播等。 4. Spring整合JMS Spring通过提供一套模板类简化了JMS API的使用,帮助开发者更容易...

    JMS中/英文帮助文档

    5. **主题(Topic)与队列(Queue)**:JMS支持两种消息模式——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。主题适用于广播式通信,多个消费者可以订阅同一个主题并接收消息。队列则遵循一对一的...

    JMS消息队列机制及案例

    5. **主题(Topic)**:主题支持发布/订阅模式,一个消息可以被多个订阅者同时接收,实现广播式通信。 6. **案例分析**: - **订单处理**:当大量订单涌入时,使用JMS队列可以避免系统因处理能力不足而崩溃。订单...

    jms基本用法

    了解了JMS的基本概念和用法后,开发者可以根据具体需求选择合适的消息模型和实现,利用JMS构建健壮、高效的企业级应用。在实际项目中,通常还需要结合其他工具和框架,如Spring JMS,来简化配置和使用过程。总之,...

    ActiveMq 点对点 发布订阅demo

    发布者将消息发送到一个主题(Topic),而多个订阅者可以订阅该主题,从而接收消息。这种模式强调消息的广播特性,每个订阅者都可以接收到消息的副本。在ActiveMQ中,我们通过创建`Topic`对象来实现发布/订阅。 3. ...

    ActiveMQ 中javax.jms的源码 javax.jms-sources-1.1.zip

    而Topic的实现则采用了发布/订阅模型,多个消费者可以同时订阅同一个Topic,消息会被广播给所有订阅者。 通过深入研究ActiveMQ的javax.jms源码,我们可以更深入地理解JMS的工作原理,了解如何利用ActiveMQ实现高效...

    JMS学习笔记精心总结

    **JMS学习笔记精心总结** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准接口。...通过深入学习和实践,我们可以更好地利用JMS来解决复杂系统的异步通信问题。

    Spring发送接收JMS消息

    JMS提供两种类型的消息模型:点对点(Point-to-Point, P2P)和发布/订阅(Publish/Subscribe)。在点对点模型中,消息由一个生产者发送到一个队列,然后由一个或多个消费者接收。发布/订阅模型中,消息发布到一个...

    jms培训PPT

    - **主题(Topic)**: 支持发布/订阅模式,多个消费者可以订阅同一个主题,当一个生产者发送消息到主题时,所有订阅该主题的消费者都能接收到消息。 - **队列(Queue)**: 支持点对点模型,每个消息只能被一个消费...

    Jms基础知识整理创建消息 -> 发送消息 -> 接收消息 -> 读取消息 ()

    在Pub/Sub模型中,生产者发布消息到主题,而多个订阅者可以订阅该主题并接收消息。 ### 使用消息的原因 消息系统提供了一种**分布式、低耦合的通讯方案**。例如,组件A发送消息到一个代理(Agent),组件B作为接收...

    JMS Southbound 指南 jms_southbound_tutorial

    - **Destination**: 目的地是消息发送的目标,可以是Queue(点对点)或Topic(发布/订阅)。 - **Producer**: 生产者创建并发送消息到目的地。 - **Consumer**: 消费者从目的地接收消息。 - **Message Broker**: ...

    JMS 1.1 规范中英文版本 Java Message Service.zip

    JMS提供两种主要的消息模型:点对点(Queue)和发布/订阅(Topic)。点对点模型中,消息被一个消费者接收,而发布/订阅模型下,消息可以被多个订阅者消费。 1. **点对点模型**:在点对点模型中,消息被发送到一个...

Global site tag (gtag.js) - Google Analytics