`

WSAD环境下JMS异步通信全攻略 (3)

    博客分类:
  • JMS
阅读更多

3.5 消息驱动的Bean

  在前文讨论JMS消息接收处理逻辑的过程中,我们看到的代码仅仅适用于Servlet、JSP以及普通的Java应用程序,但不适用于EJB,因为在JMS的接收端使用EJB存在一些技术问题。一般地,JMS程序的交互模式分两种:

  ■ 发送-遗忘:JMS客户程序发出消息,但不需要应答。从性能的角度来看,这是最理想的处理模式,发送程序不需要等待对方响应请求,可以继续执行自己的任务。

  ■ 同步请求-答复:JMS客户程序发出消息并等待答复。在JMS中,这种交互方式通过执行一个伪同步的接收方法(前文已经讨论)实现。然而,这里可能出现问题。如果EJB模块在一个事务上下文之内执行(通常总是如此),单一事务之内无法执行请求-答复处理,这是因为发送者发出一个消息之后,只有当发送者提交了事务,接收者才能收到消息。因此,在单一事务之内是不可能得到应答的,因为在一个未提交的事务上下文之内接收程序永远收不到消息,当然也不可能作出应答了。解决的办法是请求-答复必须通过两个不同的事务执行。

  对于EJB来说,在JMS通信的接收端还可能出现另一个EJB特有的问题。在异步通信中,何时能够获得应答是不可预料的,异步通信的主要优点之一就是发送方能够在发出消息之后继续当前的工作,即使接收方当时根本不能接收消息也一样,但请求-答复模式却隐含地假定了EJB组件(假定是一个会话Bean)应该在发出消息之后等待答复。J2EE实际上是一个基于组件技术的事务处理环境,它的设计目标是处理大量短期生存的任务,却不是针对那些可能会被长期阻塞来等待应答的任务。

  为了解决这个问题,Sun在EJB 2.0规范中加入了一种新的EJB类型,即MDB。MDB是专门为JMS异步消息环境中(接收端)EJB组件面临的问题而设计的,其设计思路是把监听消息是否到达的任务从EJB组件移到容器,由于这个原因,MDB组件总是在容器的控制之下运行,容器代替MDB监听着特定的队列或主题,当消息到达该队列或者主题,容器就激活MDB,调用MDB的onMessage方法,将收到的消息作为参数传递给onMessage方法。

  MDB是一种异步组件,它的工作方式和其他同步工作的EJB组件(会话Bean,实体Bean)不同,MDB没有Remote接口和Home接口,客户程序不能直接激活MDB,MDB只能通过收到的消息激活。MDB的另一个重要特点是它对事务和安全上下文的处理方式与众不同,MDB已经彻底脱离了客户程序,因此也不使用客户程序的事务和安全上下文。

  发送JMS消息的远程客户程序完全有可能运行在不同的环境之中--非J2EE的环境,例如普通的Java应用程序,可能根本没有任何安全或事务上下文。因此,发送方的事务和安全上下文永远不会延续到接收方的MDB组件。由于MDB永远不会由客户程序直接激活,所以它们也永远不可能在客户程序的事务上下文之内运行。由于这个原因,下面这些事务属性对于MDB来说没有任何意义--Supports,RequiresNew,Mandatory,以及None,因为这些事务属性隐含地意味着延用客户程序的事务上下文。MDB可以使用的事务属性只有两个:NotSupported和Required。如果一个MDB组件有NotSupported事务属性,则它的消息处理过程不属于任何事务上下文。

  就象其他类型的EJB一样,MDB可能参与到两种类型的事务:Bean管理的事务,容器管理的事务。MDB组件的所有方法中,只有onMessage方法可以参与到事务上下文。如果让MDB参与到Bean管理的事务上下文,则表示允许MDB在onMessage方法之内开始和结束一个事务。这种策略的问题在于,收到的消息总是位于onMessage方法之内开始的事务之外(要让消息参与其中已经太迟了)。在这种情况下,如果由于某种原因必须回退,则消息必须手工处理。

  如果让MDB参与到容器管理的事务上下文,情况就完全不同了。只要设置了Required事务属性,容器就可以在收到消息时开始一个事务,这样,消息就成为事务的一部分,当事务成功提交时,或者当事务被回退而消息被返回到发送队列时,都可以获得明确的确认信息。

  事务可能在两种情形下回退:第一种情形是程序显式地调用setRollBackOnly方法,第二种情形是在onMessage方法之内抛出一个系统异常(注意,抛出应用程序异常不会触发回退动作)。当事务回退时,消息就被返回到原来的队列,监听器将再次发送消息要求处理。一般地,这种情况会引起无限循环,最后导致应用运行不正常。Max_retries属性可以用来控制监听器再次提取重发消息的次数(这个属性在配置监听器端口的时候设置),超过Max_retries规定的限制值之后,监听器将停止处理该消息(显然,这算不上最理想的处理方式)。

  如果用WebSphere MQ作为JMS提供者,还有一种更好的解决办法。我们可以配置WebSphere MQ,使其尝试一定的次数之后停止发送同一消息,并将消息放入一个特定的错误队列或Dead.Letter.Queue。记住,MDB是无状态的组件,也就是说它不会在两次不同的方法调用之间维持任何状态信息(就象Web服务器处理HTTP请求的情况一样),同时,由于客户程序永远不会直接调用MDB组件,所以MDB组件不能识别任何特定的客户程序,在这个意义上,可以认为MDB组件是"匿名"运行的。所有MDB组件都必须实现javax.ejb.MessageDrivenBean接口和javax.jms.MessageListener接口。

  除了onMessage方法之外,MDB还有其他几个回调方法--由容器调用的方法:

  ■ ejbCreate方法:容器调用该方法来创建MDB的实例。在ejbCreate方法中可以放入一些初始化的逻辑。

  ■ setMessageDrivenContext方法:当Bean第一次被加入到MDB Bean的缓冲池时容器将调用该方法。setMessageDrivenContext方法通常用来捕获MessageDrivenContext,并将setMessageDrivenContext保存到类里面的变量,例如:

public void setMessageDrivenContext(java.ejb.
MessageDrivenContext mdbContext)  {
    messageDrivenContext = mdbContext;
}



  ■ ejbRemove方法:容器把Bean从缓冲池移出并拆除时会调用该方法。一般地,ejbRemove方法会执行一些清理操作。

  一般而言,在onMessage方法之内执行业务逻辑是不推荐的,业务方面的操作最好委派给其他EJB组件执行。

  MDB容器会自动控制好并发处理多个消息的情形。每一个MDB实例处理一个消息,在onMessage方法把控制返回给容器之前,不会要求MDB同时处理另一个消息。如果有多个消息必须并行处理,容器会激活同一MDB的多个实例。

  从WebSphere 5.0开始,开发环境(WSAD 5.0)和运行环境(WAS 5.0)都开始全面支持MDB。

  下面的代码片断显示了一个MDB的概念框架。

// MDB概念框架
// package 声明略 
import javax.jms.Message;
import javax.jms.MapMessage;
import javax.naming.InitialContext;
import java.util.*;
public class LibraryNotificationBean
    implements javax.ejb.MessageDrivenBean, 
    javax.jms.MessageListener {
    MessageDrivenContext messageDrivenContext;
    Context jndiContext;
    public void setMessageDrivenContext(MessageDrivenContext
    msgDrivenContext) {
        messageDrivenContext = msgDrivenContext;
        try {
            jndiContext = new InitialContext();
        } catch (NamingException ne) {
            throw new EJBException(ne);
        }
    }

    public void ejbCreate() {
    }
    public void onMessage(Message notifyMsg) {
        try {
            MapMessage notifyMessage = (MapMessage) notifyMsg;
            String bookTitle = (String) notifyMessage.
	     getString("BookTitle");
            String bookAuthor = (String) notifyMessage.
	     getString("BookAuthor");
            String bookCatalogNumber =
                (String) notifyMessage.getString
		 ("bookCatalogNumber");
            Integer bookQuantity =
             (Integer) notifyMessage.getInteger("BookQuantity");
            // 处理消息...(调用其他EJB组件)
        } catch (Exception e) {
            throw new EJBException(e);
        }
    }
    public void ejbRemove() {
        try {
            jndiContext.close();
            jndiContext = null;
        } catch (NamingException ne) {
            // 异常处理
        }
    }
}



  3.6 消息持久化

  消息可以是持久性的,也可以是非持久性的,持久性的消息和非持久性的消息可以放入同一个队列。持久性的消息会被写入磁盘,即使系统出现故障也可以恢复。当然,正象其他场合的持久化操作一样,消息的持久化也会增加一定的开销,持久性消息大约要慢7%。控制消息持久化的办法之一是定义队列时设置其属性,如果没有显式地设置持久化属性,系统将采用默认值。另外,JMS应用程序本身也可以定义持久化属性:

  ■ PERSISTENCE(QDEF):继承队列的默认持久化属性值。

  ■ PERSISTENCE(PERS):持久性的消息。

  ■ PERSISTENCE(NON):非持久性的消息。

  消息的持久性还可以通过消息属性头JMSDeliveryMode来调节,JMSDeliveryMode可以取值DeliveryMode.PERSISTENT或DeliveryMode.NON_PERSISTENT,前者表示消息必须持久化,后者表示消息不需持久化。在事务化会话中处理的消息总是持久性的。

  3.7 消息选择器

  JMS提供了从队列选取一个消息子集的机制,能够过滤掉所有不满足选择条件的消息。选择消息的条件不仅可以引用消息头的域,还可以引用消息属性的域。下面是如何利用这一功能的例子:

QueueReceiver queueReceiver =
    queueSession.createReceiver(requestQueue, 
      "BookTitle = 'Windows 2000'");
QueueBrowser queueBrowser = queueSession.createBrowser
(requestQueue, "BookTitle = 'Windows 2000'
AND BookAuthor = 'Robert Lee'");



  注意,字符串(例如'Windows 2000')被一个双引号之内的单引号对包围。

  四、JMS Pub/Sub编程

  Pub/Sub通信方式的编程与P2P通信编程相似,两者最主要的差别是消息发送的目的地对象。在Pub/Sub通信方式中,发布消息的目的地和消费消息的消息源是一个称为主题(Topic)的JMS对象。主题对象的作用就象是一个虚拟通道,其中封装了一个Pub/Sub的目的地(Destination)对象。

  在P2P通信方式中,(发送到队列的)消息只能由一个消息消费者接收;但在Pub/Sub通信方式中,消息生产者发布到主题的消息可以分发到多个消息消费者,而且,消息的生产者和消费者之间的结合是如此宽松,以至于生产者根本不必了解任何有关消息消费者的情况,消息生产者和消费者都只要知道一个公共的目的地(也就是双方"交谈"的主题)。

  由于这个原因,消息的生产者通常称为出版者,消息的消费者则相应地称为订阅者。出版者为某一主题发布的所有消息都会传递到该主题的所有订阅者,订阅者将收到它订阅的主题上的所有消息,每一个订阅者都会收到一个消息的副本。订阅可以是耐久性的(Durable)或非耐久性(Nondurable)。非耐久性的订阅者只能收到订阅之后才发布的消息。

  耐久性订阅者则不同,它可以中断之后再连接,仍能收到在它断线期间发布的消息。Put/Sub通信方式中耐久性连接(在某种程度上)类似于P2P通信中的持久性消息/队列。出版者和订阅者永远不会直接通信,Pub/Sub代理的作用就象是一个信息发布中心,把所有消息发布给它们的订阅者。

  ● 注意:从WebSphere MQ 5.2开始,加装了MA88和MA0C扩展的WebSphere MQ可以当作JMS Pub/Sub通信代理。此外,WebSphere MQ Integrator也能够作为一个代理用。从MQ 5.3开始,MA88成了MQ基本软件包的一部分,因此只要在MQ 5.3的基础上安装MA0C就可以了。在MQ JMS环境中,要让Pub/Sub通信顺利运行,运行Pub/Sub代理的队列管理器上必须创建一组系统队列。

  MQ JMS MA0C扩展模块提供了一个工具来构造所有必需的Pub/Sub系统队列,这个工具就是MQJMS_PSQ.mqsc,位于\java\bin目录之下。要构造Pub/Sub通信方式所需的系统队列,只需从上述目录执行命令:runmqsc < MQJMS_PSQ.mqsc。

  多个主题可以组织成一种树形的层次结构,树形结构中主题的名称之间用一个斜杠(/)分隔--例如,Books/UnixBooks/SolarisBooks。如果要订阅一个以上的主题,可以在指定主题名字的时候使用通配符,例如,"Books/#"就是一个使用通配符的例子。

  下面给出了一个JMS Pub/Sub通信的例子(为简单计,这里省略了try/catch代码块)。在这个例子中,Books/ UnixBooks/SolarisBooks主题的订阅者将收到所有发布到SolarisBooks的消息,Books/#主题的订阅者将收到所有有关Books的消息(包括发布到UnixBooks和SolarisBooks主题的消息)。

// JMS Pub/Sub通信
import javsx.jms.*;
import javax.naming.*;
import javax.ejb.*;
public class PublisherExample implements javax.ejb.SessionBean {
    private TopicConnectionFactory topicConnFactory = null;
    private TopicConnection topicConnection = null;
    private TopicPublisher topicPublisher = null;
    private TopicSession topicSession = null;
    private SessionContext sessionContext = null;
    public void setSessionContext(SessionContext ctx) {
        sessionContext = cts;
    }
    public void ejbCreate() throws CreateException {
        InitialContext initContext = new InitialContext();
        // 从JNDI查找主题的连接工厂
        topicConnFactory =(TopicConnectionFactory) 
	initContext.lookup("java:comp/env/TCF");
        // 从JNDI查找主题
        Topic unixBooksTopic =
          (Topic) initContext.lookup("java:comp/env/UnixBooks");
        Topic javaBooksTopic =
          (Topic) initContext.lookup("java:comp/env/JavaBooks");
        Topic linuxBooksTopic =
          (Topic) initContext.lookup("java:comp/env/LinuxBooks");
        Topic windowsBooksTopic =
          (Topic) initContext.lookup("java:comp/env/WindowsBooks");
        Topic allBooksTopic =
          (Topic) initContext.lookup("java:comp/env/AllBooks");
        // 创建连接
        topicConnection = topicConnFactory.createTopicConnection();
        topicConn.start();
        // 创建会话
        topicSession =
            topicConn.createTopicSession(false, 
	      Session.AUTO_ACKNOWLEDGE);
    }
    public void publishMessage(String workMessage, 
      String topicToPublish) {
        // 创建一个消息
        TextMessage message = topicSession.createTextMessage
	 (workMessage);   // 创建出版者,发布消息
        if ((topicToPublish.toLowerCase()).equals("java")) {
            TopicPublisher javaBooksPublisher =
                topicSession.createPublisher(javaBooksTopic);
            javaBooksPublisher.publish(message);
        }
        if ((topicToPublish.toLowerCase()).equals("unix")) {
            TopicPublisher unixBooksPublisher =
                topicSession.createPublisher(unixBooksTopic);
            J2EE Enterprise Messaging 475 unixBooksPublisher.
	      publish(message);
        }
        if ((topicToPublish.toLowerCase()).equals("linux")) {
            TopicPublisher linuxBooksPublisher =
                topicSession.createPublisher(linuxBooksTopic);
            linuxBooksPublisher.publish(message);
        }
        if ((topicToPublish.toLowerCase()).equals("windows")) {
            TopicPublisher windowsBooksPublisher =
                topicSession.createPublisher(windowsBooksTopic);
            windowsBooksPublisher.publish(message);
        }
        TopicPublisher allBooksPublisher =
            topicSession.createPublisher(allBooksTopic);
        allBooksPublisher.publish(message);
    }
    public void ejbActivate() {
    }
    public void ejbPassivate() {
    }
    public void ejbRemove() {
        // 清理工作...
        if (javaBooksPublisher != null) {
            javaBooksPublisher.close();
            javaBooksPublisher = null;
        }
        if (unixBooksPublisher != null) {
            unixBooksPublisher.close();
            Chapter 9 476 unixBooksPublisher = null;
        }
        if (linuxBooksPublisher != null) {
            linuxBooksPublisher.close();
            linuxBooksPublisher = null;
        }
        if (windowsBooksPublisher != null) {
            windowsBooksPublisher.close();
            windowsBooksPublisher = null;
        }
        if (allBooksPublisher != null) {
            allBooksPublisher.close();
            allBooksPublisher = null;
        }
        if (topicSession != null) {
            topicSession.close();
            topicSession = null;
        }
        if (topicConnection != null) {
            topicConnection.stop();
            topicConnection.close();
            topicConnection = null;
        }
    }



  这段代码比较简单,想来不需要多余的解释了。唯一值得一提的地方是如何将一个消息发布到不同的主题:对于每一个特定的主题,分别创建一个对应的出版者,然后用它将消息发布到主题。

  如果一个MDB组件只负责接收消息,把所有其他的消息处理操作都委托给专用业务组件(这意味着MDB之内不包含消息发送或发布逻辑),MDB的代码就相当于P2P通信方式中的处理代码,使用MDB唯一的改变之处是将监听端口从监听一个队列改为监听一个主题。有兴趣的读者可以自己试验一下双重监听的实现方式。

  五、二阶段提交的事务

  在企业级处理任务中,为了保证JMS或非JMS代码处理业务逻辑过程的完整性(对于一个单元的工作,要么成功提交所有的处理步骤,要么全部回退所有处理步骤),操作一般要在一个事务上下文之内进行。除了将消息放入队列的过程之外,如果还要向数据库插入记录(二阶段提交,要么全部成功,要么全部失败),事务上下文的重要性尤其突出。

  为了支持二阶段提交,JMS规范定义了下列XA版的JMS对象:XAConnectionFactory、XAQueueConnectionFactory、XASession、XAQueueSession、XATopicConnectionFactory、XATopicConnection,以及XATopicSession。另外,凡是参与全局事务的所有资源均应该使用其XA版。特别地,对于JDBC资源,必须使用JDBC XADataSource。最后一点是,全局事务由JTA TransactionManager控制。下面的代码显示了建立全局事务所需的步骤。

// 配置全局事务

// 从JNDI名称空间获取一个JTA TransactionManager
TransactionManager globalTxnManager =
    jndiContext.lookup("java:comp/env/txt/txnmgr");
// 启动全局事务
globalTxnManager.begin();
// 获取事务对象
Transaction globalTxn = globalTxnManager.getTransaction();
// 获取XA数据资源
XADatasource xaDatasource = jndiContext.lookup
("java:comp/env/jdbc/datasource");

// 获取一个连接
XAConnection jdbcXAConn = xaDatasource.getConnection();
// 从XA连接获取XAResource
XAResource jdbcXAResource = jdbcXAConn.getXAResource();
// 在全局事务中"征募"XAResource
globalTxn.enlist(jdbcXAResource);
// 获取XA队列连接
XAQueueConnectionFactory xaQueueConnectionFactory =
    JndiContext.lookup("java:comp/env/jms/xaQCF")
    XAQueueConnection xaQueueConnection =
        XaQueueConnectionFactory.createXAQueueConnection();
// 获取XA队列会话
XAQueueSession xaQueueSession = xaQueueConnection.
createXAQueueSession();
// 从会话获取XA资源
XAResource jmsXAResource = xaQueueSession.getXAResource();
// 在全局事务中"征募"该XAResource
globalTxn.enlist(jmsXAResource);
// 其他处理工作...
// 提交全局事务
globalTxn.commit();



  总结:本文介绍了JMS及其在WSAD环境下的编程,探讨了JMS异步通信的主要优点,以及两种通信方式(P2P和Pub/Sub)、MDB、JMS事务、二阶段提交全局事务等方面的概念。希望本文的介绍对你有所帮助。

分享到:
评论

相关推荐

    WSAD环境下JMS异步通信全攻略

    如前所述,在JMS之前,J2EE原来是一个建立在Java RMI-IIOP通信协议基础上的同步环境,但MDB却具有接收异步消息的能力。  异步通信使得企业应用能够建立在一种全新的通信机制之上,它具有如下重要优点:  ■ ...

    JMS异步通信

    在WSAD(WebSphere Application Developer)环境下,开发者可以利用集成工具进行JMS的开发、测试和调试,包括创建和配置JMS资源,如队列和主题,以及开发和部署MDB。通过截图和教程,WSAD提供了一个全面的平台,帮助...

    WebSphere Studio Application DeveloperV5.0入门&WSAD环境下CTP的搭建和调试

    WSAD5.0学习文档 主要讲解WebSphere Studio Application DeveloperV5.0。包括:产品概述,应用程序开发工具,工作台基本概念,“Struts贸易样本”简介,Struts...还有WSAD环境下的CTP调试方法,CTP在WSAD环境上的搭建等

    WSAD开发EJB.pdf

    - **Java Messaging Service (JMS)**:描述了JMS如何支持应用程序之间基于消息的异步通信。 - **JavaMail**:本书也简要提及了JavaMail API,它用于发送电子邮件。 通过以上内容,我们可以看出《WSAD开发EJB》这...

    使用WSAD开发WEB应用(IBM Software Institute)

    5. 服务层:J2EE平台提供了多种服务,如JMS(Java Message Service)用于异步通信,JTA(Java Transaction API)管理事务,JNDI(Java Naming and Directory Interface)进行对象查找等。 WSAD作为IBM的开发工具,...

    wsad 操作指南 英文文档

    "IBM WebSphere Studio Application Developer应用开发指南"是了解和掌握WSAD的权威参考资料,涵盖了从基础操作到高级特性的全方位指导。同时,IBM官方网站提供了丰富的在线帮助和社区支持,帮助开发者解决问题和...

    !!!!!!!!WSAD安装及使用

    在IT开发领域,WSAD(WebSphere Studio Application Developer)是IBM提供的一款强大的集成开发环境,尤其适用于基于Java的Web应用程序和企业级应用的开发。本文将详细阐述WSAD的安装过程以及如何使用它来构建和发布...

    2.1_WSAD基本操作

    WSAD(WebSphere Studio Application Developer)是一款强大的集成开发环境,特别适合于 Web 应用的开发。通过对工作台、透视图、视图、编辑器等核心概念的理解,以及掌握如何打开、关闭和保存透视图、如何使用视图...

    wsadwsadwsad

    3. **集成过程**:学习如何在Websphere环境中设置和启用wsad控制,可能涉及到的配置文件修改、服务器插件安装等步骤。 4. **操作指南**:掌握wsad控制的具体操作,如启动、停止、监控服务器状态、部署应用等。 5. ...

    WSAD开发讲解教程

    ### 3. Web 应用模型 - **定义**:Web 应用模型主要基于浏览器/服务器(B/S)结构。 - **组成**:包括客户端浏览器、Web 服务器和 Web 应用服务器。 - **客户端浏览器**:如 Microsoft Internet Explorer 或 Netscape...

    允许选择工作区 for WSAD 5.0&5.1

    允许选择工作区 for WSAD 5.0&5.1

    VC++全局钩子WSAD控制鼠标

    在VC++(Visual C++)环境下,实现全局钩子通常需要以下步骤: 1. **定义钩子函数**:创建一个函数,该函数将在钩子被触发时被调用。对于键盘和鼠标事件,这个函数通常会接收`KBDLLHOOKSTRUCT`或`MSLLHOOKSTRUCT`...

    websphere+wsad+db2教程介绍

    在Windows或Unix/Linux环境下,这个过程可能有所不同,但通常包括配置TCP/IP网络连接、设置数据库参数和启动服务。 【安装Websphere】 Websphere的安装通常包括选择安装类型(如服务器仅安装、开发工具安装等)、...

    webSphere教材wsad5入门介绍

    IBM WebSphere Studio Application Developer (WSAD) V5.1 是一款功能强大的集成开发环境(IDE),专为Java应用开发而设计。它支持Java EE标准,并提供了丰富的工具集来简化Java应用程序的开发、测试和部署过程。...

    WSAD和WAS的使用总结

    7. 源代码管理:在WSAD中发布时选择“包含源代码”,可以在发布目录下的EAR文件中直接添加或修改JSP和class文件,操作方式类似于Tomcat。 8. 调试环境:WSAD自带了一个WAS5的服务器运行环境,可以直接进行调试,...

    wsad上下左右0000000000000



    WSAD512toJSE8.pdf

    3. **IDE 插件**:某些特定于 WSAD 的插件可能不再适用于 JSE 8,需要寻找替代方案或开发自定义插件。 4. **代码规范**:不同的 IDE 可能有不同的编码风格指南,开发者需要根据新环境的要求调整代码风格。 #### 七...

    Websphere.Application.Developer(WSAD)使用外置WinCVS解决方案

    Websphere Application Developer(WSAD),作为一款功能全面的J2EE集成开发环境(IDE),内置了多种开发工具,包括版本控制系统Concurrent Version System(CVS)的客户端。然而,WSAD内置的CVS客户端存在操作不便...

Global site tag (gtag.js) - Google Analytics