`

MQ study

 
阅读更多
一、简述
EJB2.0开始,引进了消息驱动的EJB,简称MDB(message driven bean)。当MOM收到消息时,能够自动传达给这种Bean。跟事件驱动一个道理。注:只是接收驱动。
EJB2.1,MDB又得到了加强,能够处理非JMS的消息,如:MailMessage,SMSMessage,SOAPMessage。

二、例子
见上章提到的源码。
(1)MDBQueueBean.java  这是EJB,它不需要其它什么 Home,Remote等接口。
(2)ejb-jar.xml  EJB描述。跟其它类型的EJB的描述大不一样。
(3)jboss.xml   JBOSS部署的描述。
   需要部署的就以上三个文件。见myMDB.jar
(4)MDBClient.java 这是一个客户端程序,用来发送信息的,执行后,服务端的MDB就能接收消息作出响应了。

三、MDB的不同之处:
在编程上,对比普通程序的JMS,MDB只是在初始化时少了定位消息目标和注册监听的语句而已:
      QueueReceiver queueReceiver = queueSession.createReceiver("queue/testQueue");
      queueReceiver.setMessageListener(this);

感觉:版本这东西真麻烦。买了一本《J2EE应用开发详解》(电子工业出版社,飞思科技监制),里面的例子行不通,主要是配置文件,简直连编译也通不过。
   摸了一天多,终于在jboss网站找到帮助文件:救星呀:
http://docs.jboss.org/jbossas/admindevel326/html/ch6.chapt.html#d0e11155
   不过,在我的环境中,我还得做一点点小修改。见源码。。。



附:
(1)MDBQueueBean.java
package study.jms;

import javax.ejb.MessageDrivenBean;
import javax.jms.MessageListener;
import javax.ejb.MessageDrivenContext;
//import javax.ejb.CreateException;
import javax.jms.Message;

/**add by zengabo */
import javax.ejb.EJBException;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import java.util.HashMap;
//import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.TextMessage;

public class MDBQueueBean
    implements MessageDrivenBean, MessageListener {
  MessageDrivenContext messageDrivenContext;

  /** add by zengabo */
  private Context myContext;
  private QueueConnectionFactory queueConnectionFactory;
  private String queueConnectionFactoryName = "java:comp/env/jms/QCF";
  private QueueConnection queueConnection;
  private QueueSession queueSession;
  private int i_count = 0 ;

  public MDBQueueBean(){

  }
  public void ejbCreate() { //throws CreateException {
    System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count));
    try{
       init();
    }catch (Exception e) {
      System.out.println(" init QueueMDB error"+e.toString());
      throw new EJBException(" init QueueMDB error", e);
    }
 }
  private void init() throws JMSException, NamingException {
    myContext =  new InitialContext();
    /** lookup 1 */
    Object obj = myContext.lookup(queueConnectionFactoryName);
    queueConnectionFactory = (QueueConnectionFactory) obj;
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueConnection.start();
    queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
    /** in MDB , these are not use .
      QueueReceiver queueReceiver =null;
      queueReceiver = queueSession.createReceiver("queue/testQueue");
      queueReceiver.setMessageListener(this);
     *
     * */
  }

  public void ejbRemove() {
      this.messageDrivenContext = null ;
      try{
      if (queueSession != null) {
         queueSession.close();
       }
       if (queueConnection != null) {
         queueConnection.close();
       }
     }catch (Exception e) {
    }
  }

  public void onMessage(Message message) {
    if (message instanceof javax.jms.BytesMessage) {
      javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
      /** @todo Process bytesMessage here */
      System.out.println("**** get bytesMessage:") ;
      System.out.println(bytesMessage.toString()) ;
    }
    else if (message instanceof javax.jms.MapMessage) {
      javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;
      /** @todo Process mapMessage here */
      System.out.println("**** get mapMessage:") ;
      System.out.println(mapMessage.toString()) ;
    }
    else if (message instanceof javax.jms.ObjectMessage) {
      javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;
      /** @todo Process objectMessage here */
      System.out.println("**** get ObjectMessage:") ;
      HashMap hm = null;
      try {
        hm = (HashMap) objectMessage.getObject();
      }
      catch (JMSException ex2) {
        System.out.println(ex2.toString());
      }
      System.out.println(hm.get("name")) ;
      System.out.println(hm.get("age")) ;
      System.out.println(hm.get("sex")) ;
    }
    else if (message instanceof javax.jms.StreamMessage) {
      javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;
      /** @todo Process streamMessage here */
      System.out.println("**** get StreamMessage:") ;
      System.out.println(streamMessage.toString()) ;
    }
    else if (message instanceof javax.jms.TextMessage) {
      javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message;
      /** @todo Process textMessage here */
      String rev = "";
      System.out.println("**** get textMessage:") ;
      try {
        rev =  objectMessage.getText();
        System.out.println(rev);
      }
      catch (JMSException ex1) {
        System.out.println(ex1.toString());
      }
    }
    else{
      System.out.println("**** Message is unknow type") ;
      return ;
    }
    /** 回复 */
    try{
      Queue dest = (Queue) message.getJMSReplyTo();
      sendReply(" jboss recv ok", dest);
    }catch(Exception ee){
      System.out.println("**** Reply Message Error"+ee.toString()) ;
    }
  }
  private void sendReply(String text, Queue dest) throws JMSException {
    QueueSender sender = queueSession.createSender(dest);
    TextMessage tm = queueSession.createTextMessage(text);
    sender.send(tm);
    sender.close();
  }

  public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
    this.messageDrivenContext = messageDrivenContext;
  }
}

(2)ejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd">
<ejb-jar>
  <display-name>myMDB</display-name>
  <enterprise-beans>
    <message-driven>
      <ejb-name>MDBQueue</ejb-name>
      <ejb-class>study.jms.MDBQueueBean</ejb-class>
      <transaction-type>Container</transaction-type>
      <message-driven-destination>
        <destination-type>javax.jms.Queue</destination-type>
      </message-driven-destination>
      <resource-ref>
        <description>test mdb</description>
        <res-ref-name>jms/QCF</res-ref-name>
        <res-type>javax.jms.QueueConnectionFactory</res-type>
        <res-auth>Container</res-auth>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
  <assembly-descriptor>
    <container-transaction>
      <method>
        <ejb-name>MDBQueue</ejb-name>
        <method-name>*</method-name>
      </method>
      <trans-attribute>Required</trans-attribute>
    </container-transaction>
  </assembly-descriptor>
</ejb-jar>

(3)jboss.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd">
<jboss>
  <enterprise-beans>
    <message-driven>
      <ejb-name>MDBQueue</ejb-name>
      <destination-jndi-name>queue/B</destination-jndi-name>
      <configuration-name>Standard Message Driven Bean</configuration-name>
      <resource-ref>
        <res-ref-name>jms/QCF</res-ref-name>
        <jndi-name>ConnectionFactory</jndi-name>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
</jboss>

(4)MDBClient.java
package study.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

import java.util.Hashtable;
import javax.naming.Context;

/**
 *  A complete JMS client example program that sends N TextMessages to
 *  a Queue B and asynchronously receives the messages as modified by
 *  TextMDB from Queue A.
 *
 *  @author Scott.Stark@jboss.org
 *  @version $Revision: 1.10 $
 */
public class MDBClient
{
    static final int N = 10;
    static CountDown done = new CountDown(N);

    QueueConnection conn;
    QueueSession session;
    Queue queA;
    Queue queB;

    public static class ExListener
        implements MessageListener
    {
        public void onMessage(Message msg)
        {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text="+tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void setupPTP()
        throws JMSException, NamingException
    {
//        InitialContext iniCtx =  new InitialContext();
        InitialContext iniCtx =  getInitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        queA = (Queue) iniCtx.lookup("queue/A");
        queB = (Queue) iniCtx.lookup("queue/B");
        session = conn.createQueueSession(false,
                                          QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    private InitialContext getInitialContext() throws NamingException {
      Hashtable environment = new Hashtable();
      environment.put(Context.INITIAL_CONTEXT_FACTORY,
                      "org.jnp.interfaces.NamingContextFactory");
      environment.put(Context.URL_PKG_PREFIXES,
                      "org.jboss.naming:org.jnp.interfaces");
      environment.put(Context.PROVIDER_URL, "jnp://localhost:1099");
      return new InitialContext(environment);
    }

    public void sendRecvAsync(String textBase)
        throws JMSException, NamingException, InterruptedException
    {
        System.out.println("Begin sendRecvAsync");

        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener for queA
        QueueReceiver recv = session.createReceiver(queA);
        recv.setMessageListener(new ExListener());

        // Send a few text msgs to queB
        QueueSender send = session.createSender(queB);

        for(int m = 0; m < 10; m ++) {
            TextMessage tm = session.createTextMessage(textBase+"#"+m);
            tm.setJMSReplyTo(queA);
            send.send(tm);
            System.out.println("sendRecvAsync, sent text="+tm.getText());
        }
        System.out.println("End sendRecvAsync");
    }

    public void stop()
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    public static void main(String args[])
        throws Exception
    {
        System.out.println("Begin MDBClient,now=" +
                           System.currentTimeMillis());
        MDBClient client = new MDBClient();
        client.sendRecvAsync("A text msg");
        client.done.acquire();
        client.stop();
        System.exit(0);
        System.out.println("End MDBClient");
    }

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics