`

MQ study

 
阅读更多
一、简述
EJB2.0开始,引进了消息驱动的EJB,简称MDB(message driven bean)。当MOM收到消息时,能够自动传达给这种Bean。跟事件驱动一个道理。注:只是接收驱动。
EJB2.1,MDB又得到了加强,能够处理非JMS的消息,如:MailMessage,SMSMessage,SOAPMessage。

二、例子
见上章提到的源码。
(1)MDBQueueBean.java  这是EJB,它不需要其它什么 Home,Remote等接口。
(2)ejb-jar.xml  EJB描述。跟其它类型的EJB的描述大不一样。
(3)jboss.xml   JBOSS部署的描述。
   需要部署的就以上三个文件。见myMDB.jar
(4)MDBClient.java 这是一个客户端程序,用来发送信息的,执行后,服务端的MDB就能接收消息作出响应了。

三、MDB的不同之处:
在编程上,对比普通程序的JMS,MDB只是在初始化时少了定位消息目标和注册监听的语句而已:
      QueueReceiver queueReceiver = queueSession.createReceiver("queue/testQueue");
      queueReceiver.setMessageListener(this);

感觉:版本这东西真麻烦。买了一本《J2EE应用开发详解》(电子工业出版社,飞思科技监制),里面的例子行不通,主要是配置文件,简直连编译也通不过。
   摸了一天多,终于在jboss网站找到帮助文件:救星呀:
http://docs.jboss.org/jbossas/admindevel326/html/ch6.chapt.html#d0e11155
   不过,在我的环境中,我还得做一点点小修改。见源码。。。



附:
(1)MDBQueueBean.java
package study.jms;

import javax.ejb.MessageDrivenBean;
import javax.jms.MessageListener;
import javax.ejb.MessageDrivenContext;
//import javax.ejb.CreateException;
import javax.jms.Message;

/**add by zengabo */
import javax.ejb.EJBException;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import java.util.HashMap;
//import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.TextMessage;

public class MDBQueueBean
    implements MessageDrivenBean, MessageListener {
  MessageDrivenContext messageDrivenContext;

  /** add by zengabo */
  private Context myContext;
  private QueueConnectionFactory queueConnectionFactory;
  private String queueConnectionFactoryName = "java:comp/env/jms/QCF";
  private QueueConnection queueConnection;
  private QueueSession queueSession;
  private int i_count = 0 ;

  public MDBQueueBean(){

  }
  public void ejbCreate() { //throws CreateException {
    System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count));
    try{
       init();
    }catch (Exception e) {
      System.out.println(" init QueueMDB error"+e.toString());
      throw new EJBException(" init QueueMDB error", e);
    }
 }
  private void init() throws JMSException, NamingException {
    myContext =  new InitialContext();
    /** lookup 1 */
    Object obj = myContext.lookup(queueConnectionFactoryName);
    queueConnectionFactory = (QueueConnectionFactory) obj;
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueConnection.start();
    queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
    /** in MDB , these are not use .
      QueueReceiver queueReceiver =null;
      queueReceiver = queueSession.createReceiver("queue/testQueue");
      queueReceiver.setMessageListener(this);
     *
     * */
  }

  public void ejbRemove() {
      this.messageDrivenContext = null ;
      try{
      if (queueSession != null) {
         queueSession.close();
       }
       if (queueConnection != null) {
         queueConnection.close();
       }
     }catch (Exception e) {
    }
  }

  public void onMessage(Message message) {
    if (message instanceof javax.jms.BytesMessage) {
      javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
      /** @todo Process bytesMessage here */
      System.out.println("**** get bytesMessage:") ;
      System.out.println(bytesMessage.toString()) ;
    }
    else if (message instanceof javax.jms.MapMessage) {
      javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;
      /** @todo Process mapMessage here */
      System.out.println("**** get mapMessage:") ;
      System.out.println(mapMessage.toString()) ;
    }
    else if (message instanceof javax.jms.ObjectMessage) {
      javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;
      /** @todo Process objectMessage here */
      System.out.println("**** get ObjectMessage:") ;
      HashMap hm = null;
      try {
        hm = (HashMap) objectMessage.getObject();
      }
      catch (JMSException ex2) {
        System.out.println(ex2.toString());
      }
      System.out.println(hm.get("name")) ;
      System.out.println(hm.get("age")) ;
      System.out.println(hm.get("sex")) ;
    }
    else if (message instanceof javax.jms.StreamMessage) {
      javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;
      /** @todo Process streamMessage here */
      System.out.println("**** get StreamMessage:") ;
      System.out.println(streamMessage.toString()) ;
    }
    else if (message instanceof javax.jms.TextMessage) {
      javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message;
      /** @todo Process textMessage here */
      String rev = "";
      System.out.println("**** get textMessage:") ;
      try {
        rev =  objectMessage.getText();
        System.out.println(rev);
      }
      catch (JMSException ex1) {
        System.out.println(ex1.toString());
      }
    }
    else{
      System.out.println("**** Message is unknow type") ;
      return ;
    }
    /** 回复 */
    try{
      Queue dest = (Queue) message.getJMSReplyTo();
      sendReply(" jboss recv ok", dest);
    }catch(Exception ee){
      System.out.println("**** Reply Message Error"+ee.toString()) ;
    }
  }
  private void sendReply(String text, Queue dest) throws JMSException {
    QueueSender sender = queueSession.createSender(dest);
    TextMessage tm = queueSession.createTextMessage(text);
    sender.send(tm);
    sender.close();
  }

  public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
    this.messageDrivenContext = messageDrivenContext;
  }
}

(2)ejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd">
<ejb-jar>
  <display-name>myMDB</display-name>
  <enterprise-beans>
    <message-driven>
      <ejb-name>MDBQueue</ejb-name>
      <ejb-class>study.jms.MDBQueueBean</ejb-class>
      <transaction-type>Container</transaction-type>
      <message-driven-destination>
        <destination-type>javax.jms.Queue</destination-type>
      </message-driven-destination>
      <resource-ref>
        <description>test mdb</description>
        <res-ref-name>jms/QCF</res-ref-name>
        <res-type>javax.jms.QueueConnectionFactory</res-type>
        <res-auth>Container</res-auth>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
  <assembly-descriptor>
    <container-transaction>
      <method>
        <ejb-name>MDBQueue</ejb-name>
        <method-name>*</method-name>
      </method>
      <trans-attribute>Required</trans-attribute>
    </container-transaction>
  </assembly-descriptor>
</ejb-jar>

(3)jboss.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd">
<jboss>
  <enterprise-beans>
    <message-driven>
      <ejb-name>MDBQueue</ejb-name>
      <destination-jndi-name>queue/B</destination-jndi-name>
      <configuration-name>Standard Message Driven Bean</configuration-name>
      <resource-ref>
        <res-ref-name>jms/QCF</res-ref-name>
        <jndi-name>ConnectionFactory</jndi-name>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
</jboss>

(4)MDBClient.java
package study.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

import java.util.Hashtable;
import javax.naming.Context;

/**
 *  A complete JMS client example program that sends N TextMessages to
 *  a Queue B and asynchronously receives the messages as modified by
 *  TextMDB from Queue A.
 *
 *  @author Scott.Stark@jboss.org
 *  @version $Revision: 1.10 $
 */
public class MDBClient
{
    static final int N = 10;
    static CountDown done = new CountDown(N);

    QueueConnection conn;
    QueueSession session;
    Queue queA;
    Queue queB;

    public static class ExListener
        implements MessageListener
    {
        public void onMessage(Message msg)
        {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text="+tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void setupPTP()
        throws JMSException, NamingException
    {
//        InitialContext iniCtx =  new InitialContext();
        InitialContext iniCtx =  getInitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        queA = (Queue) iniCtx.lookup("queue/A");
        queB = (Queue) iniCtx.lookup("queue/B");
        session = conn.createQueueSession(false,
                                          QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    private InitialContext getInitialContext() throws NamingException {
      Hashtable environment = new Hashtable();
      environment.put(Context.INITIAL_CONTEXT_FACTORY,
                      "org.jnp.interfaces.NamingContextFactory");
      environment.put(Context.URL_PKG_PREFIXES,
                      "org.jboss.naming:org.jnp.interfaces");
      environment.put(Context.PROVIDER_URL, "jnp://localhost:1099");
      return new InitialContext(environment);
    }

    public void sendRecvAsync(String textBase)
        throws JMSException, NamingException, InterruptedException
    {
        System.out.println("Begin sendRecvAsync");

        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener for queA
        QueueReceiver recv = session.createReceiver(queA);
        recv.setMessageListener(new ExListener());

        // Send a few text msgs to queB
        QueueSender send = session.createSender(queB);

        for(int m = 0; m < 10; m ++) {
            TextMessage tm = session.createTextMessage(textBase+"#"+m);
            tm.setJMSReplyTo(queA);
            send.send(tm);
            System.out.println("sendRecvAsync, sent text="+tm.getText());
        }
        System.out.println("End sendRecvAsync");
    }

    public void stop()
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    public static void main(String args[])
        throws Exception
    {
        System.out.println("Begin MDBClient,now=" +
                           System.currentTimeMillis());
        MDBClient client = new MDBClient();
        client.sendRecvAsync("A text msg");
        client.done.acquire();
        client.stop();
        System.exit(0);
        System.out.println("End MDBClient");
    }

}
分享到:
评论

相关推荐

    Ex4_to_mq4-225

    【描述】"Ex4_to_mq4 -225 use for study only" 暗示这个版本可能是一个学习或研究用的工具,不推荐用于商业或实际交易环境。使用此工具的目的是为了学习和理解EA的工作原理,或者对现有的EA进行修改和优化。由于它...

    rabbitmq 路由 java 实现

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,用于解耦应用程序的不同组件,提高系统的可扩展性和稳定性。RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的开源消息代理软件,广泛应用...

    spring-boot-study:spring boot note--我自己的学习笔记,还有一些demo。包含docker、攻击技术、设计模式、安全通讯、redis集群等。还有一些mq的使用、aop日志、批量操作sql等

    web web项目继承自spring-boot-study api 接口统一定义,model统一定义 common 公共模块 controller 控制器 dao 数据访问层 service 服务层 #启动项目说明: 要安装mysql,创建数据库,导入doc目录下的sql文件 启动...

    study-archive:专题研究材料

    在消息队列(MQ)中,"兔子MQ"(RabbitMQ)是一种广泛使用的开源消息代理,它遵循AMQP(Advanced Message Queuing Protocol)协议,能够实现高可用性和可扩展性。作为动物园管理员,我们需要理解和掌握如何设置、...

    IBM Redbook - Case Study: AIX and WebSphere in an Enterprise Infrastructure

    WebSphere MQ则被用作中间件,提供高可用性的消息传输服务,确保数据在不同系统间的无缝流动。WebSphere Portal Server与Web Services for Remote Portlets(WSRP)结合,为用户提供统一的门户体验,集中管理各种...

    elasticsearch-study:开个新坑学习ElasticSearch,过几天写个项目把最近学的都整合进去

    ELK已经学习完毕,目前考虑将最近所学的ELK整合Cloud结合MQ和EasyExcel 做一个文件管理项目,或报表项目。目前整合想法是,L同步数据库到ES上, 发生增删改,同时调用L,然后有数据优先找ES,没有就Redis缓存,再...

    rabbitmq-microservice-study:微服务中rabbitmq的使用

    1.rabbitMQ消息中间件(注意:安装mq之前先安装erlang,原因在于RabbitMQ服务端代码是使用并发式语言erlang编写的) 2.Redis(这里redis的目的是保存消费次数) 二、Mq消息机制: 1、mq消息机制几个重要的组件: 1)、...

    ocp_java11_complete_study_guide.txt

    改后缀 epub

    springboot_study:springboot学习

    springboot_studyspringboot学习springboot-security springboot整合权限认证springboot_MQ 整合RabbitMQspringboot_rabbir 整合RabbitMQspringboot_RocketMQ 整合RocketMQ 入门 事务处理springboot_design 设计模式...

    【Java初学者】消息中间件得使用实践.zip

    在本压缩包中,mq-study-master文件夹可能包含了多种示例代码、教程、配置文件等资源,这些资源有助于Java初学者了解和掌握消息中间件的使用。例如,RabbitMQ的教程可能包括如何安装和配置RabbitMQ服务器,如何使用...

    企业软件架构与设计模式ppt

    “第五讲:企业应用集成模式”可能详细介绍了不同的集成策略,如API Gateway用于统一接口,企业级集成平台(如IBM WebSphere MQ或Apache Kafka)用于消息传递,以及适配器模式以兼容不同系统的接口。 “第六讲:...

    学习英语链接.txt

    - **ESL**: &lt;http://www.nceltr.mq.edu.au/ESL&gt; - **Eleaston.com**: - **Study.com**: &lt;http://www.study.com/resources.html&gt; - **TeachingEnglish**: - **Online Courses**: - **NLL**: - **Wicked Stuff**: ...

Global site tag (gtag.js) - Google Analytics