`

值得借鉴的JMS程序代码

阅读更多
(1)MDBQueueBean.java
package study.jms;

import javax.ejb.MessageDrivenBean;
import javax.jms.MessageListener;
import javax.ejb.MessageDrivenContext;
//import javax.ejb.CreateException;
import javax.jms.Message;

/**add by zengabo */
import javax.ejb.EJBException;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import java.util.HashMap;
//import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.TextMessage;

public class MDBQueueBean
    implements MessageDrivenBean, MessageListener {
  MessageDrivenContext messageDrivenContext;

  /** add by zengabo */
  private Context myContext;
  private QueueConnectionFactory queueConnectionFactory;
  private String queueConnectionFactoryName = "java:comp/env/jms/QCF";
  private QueueConnection queueConnection;
  private QueueSession queueSession;
  private int i_count = 0 ;

  public MDBQueueBean(){

  }
  public void ejbCreate() { //throws CreateException {
    System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count));
    try{
       init();
    }catch (Exception e) {
      System.out.println(" init QueueMDB error"+e.toString());
      throw new EJBException(" init QueueMDB error", e);
    }
}
  private void init() throws JMSException, NamingException {
    myContext =  new InitialContext();
    /** lookup 1 */
    Object obj = myContext.lookup(queueConnectionFactoryName);
    queueConnectionFactory = (QueueConnectionFactory) obj;
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueConnection.start();
    queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
    /** in MDB , these are not use .
      QueueReceiver queueReceiver =null;
      queueReceiver = queueSession.createReceiver("queue/testQueue");
      queueReceiver.setMessageListener(this);
     *
     * */
  }

  public void ejbRemove() {
      this.messageDrivenContext = null ;
      try{
      if (queueSession != null) {
         queueSession.close();
       }
       if (queueConnection != null) {
         queueConnection.close();
       }
     }catch (Exception e) {
    }
  }

  public void onMessage(Message message) {
    if (message instanceof javax.jms.BytesMessage) {
      javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
      /** @todo Process bytesMessage here */
      System.out.println("**** get bytesMessage:") ;
      System.out.println(bytesMessage.toString()) ;
    }
    else if (message instanceof javax.jms.MapMessage) {
      javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;
      /** @todo Process mapMessage here */
      System.out.println("**** get mapMessage:") ;
      System.out.println(mapMessage.toString()) ;
    }
    else if (message instanceof javax.jms.ObjectMessage) {
      javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;
      /** @todo Process objectMessage here */
      System.out.println("**** get ObjectMessage:") ;
      HashMap hm = null;
      try {
        hm = (HashMap) objectMessage.getObject();
      }
      catch (JMSException ex2) {
        System.out.println(ex2.toString());
      }
      System.out.println(hm.get("name")) ;
      System.out.println(hm.get("age")) ;
      System.out.println(hm.get("sex")) ;
    }
    else if (message instanceof javax.jms.StreamMessage) {
      javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;
      /** @todo Process streamMessage here */
      System.out.println("**** get StreamMessage:") ;
      System.out.println(streamMessage.toString()) ;
    }
    else if (message instanceof javax.jms.TextMessage) {
      javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message;
      /** @todo Process textMessage here */
      String rev = "";
      System.out.println("**** get textMessage:") ;
      try {
        rev =  objectMessage.getText();
        System.out.println(rev);
      }
      catch (JMSException ex1) {
        System.out.println(ex1.toString());
      }
    }
    else{
      System.out.println("**** Message is unknow type") ;
      return ;
    }
    /** 回复 */
    try{
      Queue dest = (Queue) message.getJMSReplyTo();
      sendReply(" jboss recv ok", dest);
    }catch(Exception ee){
      System.out.println("**** Reply Message Error"+ee.toString()) ;
    }
  }
  private void sendReply(String text, Queue dest) throws JMSException {
    QueueSender sender = queueSession.createSender(dest);
    TextMessage tm = queueSession.createTextMessage(text);
    sender.send(tm);
    sender.close();
  }

  public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
    this.messageDrivenContext = messageDrivenContext;
  }
}

(2)ejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd">
<ejb-jar>
  <display-name>myMDB</display-name>
  <enterprise-beans>
    <message-driven>
      <ejb-name>MDBQueue</ejb-name>
      <ejb-class>study.jms.MDBQueueBean</ejb-class>
      <transaction-type>Container</transaction-type>
      <message-driven-destination>
        <destination-type>javax.jms.Queue</destination-type>
      </message-driven-destination>
      <resource-ref>
        <description>test mdb</description>
        <res-ref-name>jms/QCF</res-ref-name>
        <res-type>javax.jms.QueueConnectionFactory</res-type>
        <res-auth>Container</res-auth>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
  <assembly-descriptor>
    <container-transaction>
      <method>
        <ejb-name>MDBQueue</ejb-name>
        <method-name>*</method-name>
      </method>
      <trans-attribute>Required</trans-attribute>
    </container-transaction>
  </assembly-descriptor>
</ejb-jar>

(3)jboss.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd">
<jboss>
  <enterprise-beans>
    <message-driven>
      <ejb-name>MDBQueue</ejb-name>
      <destination-jndi-name>queue/B</destination-jndi-name>
      <configuration-name>Standard Message Driven Bean</configuration-name>
      <resource-ref>
        <res-ref-name>jms/QCF</res-ref-name>
        <jndi-name>ConnectionFactory</jndi-name>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
</jboss>

(4)MDBClient.java
package study.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

import java.util.Hashtable;
import javax.naming.Context;

/**
*  A complete JMS client example program that sends N TextMessages to
*  a Queue B and asynchronously receives the messages as modified by
*  TextMDB from Queue A.
*
*  @author Scott.Stark@jboss.org
*  @version $Revision: 1.10 $
*/
public class MDBClient
{
    static final int N = 10;
    static CountDown done = new CountDown(N);

    QueueConnection conn;
    QueueSession session;
    Queue queA;
    Queue queB;

    public static class ExListener
        implements MessageListener
    {
        public void onMessage(Message msg)
        {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text="+tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void setupPTP()
        throws JMSException, NamingException
    {
//        InitialContext iniCtx =  new InitialContext();
        InitialContext iniCtx =  getInitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        queA = (Queue) iniCtx.lookup("queue/A");
        queB = (Queue) iniCtx.lookup("queue/B");
        session = conn.createQueueSession(false,
                                          QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    private InitialContext getInitialContext() throws NamingException {
      Hashtable environment = new Hashtable();
      environment.put(Context.INITIAL_CONTEXT_FACTORY,
                      "org.jnp.interfaces.NamingContextFactory");
      environment.put(Context.URL_PKG_PREFIXES,
                      "org.jboss.naming:org.jnp.interfaces");
      environment.put(Context.PROVIDER_URL, "jnp://localhost:1099");
      return new InitialContext(environment);
    }

    public void sendRecvAsync(String textBase)
        throws JMSException, NamingException, InterruptedException
    {
        System.out.println("Begin sendRecvAsync");

        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener for queA
        QueueReceiver recv = session.createReceiver(queA);
        recv.setMessageListener(new ExListener());

        // Send a few text msgs to queB
        QueueSender send = session.createSender(queB);

        for(int m = 0; m < 10; m ++) {
            TextMessage tm = session.createTextMessage(textBase+"#"+m);
            tm.setJMSReplyTo(queA);
            send.send(tm);
            System.out.println("sendRecvAsync, sent text="+tm.getText());
        }
        System.out.println("End sendRecvAsync");
    }

    public void stop()
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    public static void main(String args[])
        throws Exception
    {
        System.out.println("Begin MDBClient,now=" +
                           System.currentTimeMillis());
        MDBClient client = new MDBClient();
        client.sendRecvAsync("A text msg");
        client.done.acquire();
        client.stop();
        System.exit(0);
        System.out.println("End MDBClient");
    }

}
分享到:
评论

相关推荐

    java JMS代码示例

    下面我们将通过一个简单的Java JMS代码示例来理解其基本用法: 1. **JMS提供者选择** 在开始之前,你需要选择一个JMS提供者,例如Apache ActiveMQ、IBM WebSphere MQ或RabbitMQ。这里假设我们使用ActiveMQ,首先...

    SpringJMS示例代码

    在本文中,我们将深入探讨SpringJMS的基本概念、如何与ActiveMQ集成,以及如何通过示例代码理解其工作原理。 1. **SpringJMS简介** SpringJMS是Spring框架对JMS API的包装,它简化了生产者和消费者之间的消息通信...

    jms示例代码

    本示例代码将帮助我们深入理解JMS的基本用法和概念。 JMS的核心组件包括生产者(Producer)、消费者(Consumer)和消息队列(Message Queue)。生产者创建并发送消息,消费者接收并处理这些消息,而消息队列则作为...

    javax.jms.jar包与JMS实例代码

    `javax.jms.jar` 包是实现JMS规范的核心库,包含了JMS接口和实现类,使得开发者可以轻松地在Java应用程序中集成消息服务。 JMS主要包含两种消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe...

    jms代码

    在IT领域,特别是Java企业级应用开发中,JMS(Java Message Service)是一个不可或缺的技术,它为应用程序提供了异步通信的能力。从给定的代码片段来看,主要涉及到JMS消息驱动Bean(MDB)的实现,这在Java EE环境中...

    JMS规范培训教程、JMS源代码

    Java消息服务(Java Message Service,简称JMS)是...通过深入学习这个教程和研究源代码,开发者可以更好地理解JMS的工作方式,并能在实际项目中有效地利用JMS进行应用程序间的数据交换,提高系统的可靠性和扩展性。

    基于Java通讯开发jms源代码 (jms通讯开发源码)

    基于Java通讯开发jms源代码 (jms通讯开发源码) java,net,socket,通讯开发,jms /* * @(#)Message.java 1.60 02/04/09 * * Copyright 1997-2002 Sun Microsystems, Inc. All Rights Reserved. * * SUN ...

    spring-jms 源代码包

    spring-jmsspring-jmsspring-jmsspring-jmsspring-jmsspring-jms

    flex用到JMS代码下载

    在“flex用到JMS代码下载”这个主题中,我们讨论的是如何在Flex应用中集成JMS来实现分布式通信。以下是一些关键知识点: 1. **Flex与JMS的结合**: Flex作为客户端技术,通过AMF(Action Message Format)或HTTP等...

    JMS代码示例及配置相关资料

    JMS允许应用程序创建、发送、接收和读取消息,它是异步通信的一种方式,能够有效地解耦发送者和接收者,使得系统更加健壮和可扩展。 在JMS中,主要有两种类型的消息模型:点对点(Point-to-Point,P2P)和发布/订阅...

    JMS简单示例1

    在实际开发中,开发者会使用JMS API或者第三方库(如Spring JMS)来简化上述步骤,提高代码的可读性和可维护性。通过学习和理解JMS的基础知识,我们可以构建出稳定、可靠的分布式系统,实现不同服务间的解耦和异步...

    jms-1.1接口定义代码

    JMS允许应用程序在不关心彼此的位置、速度或执行时间的情况下进行通信,这在分布式系统中尤为重要,因为它们可以处理大量并发请求并确保数据的可靠传输。 JMS的核心概念包括以下几点: 1. **消息**:消息是数据的...

    java聊天程序代码

    Java聊天程序代码是一个基于JavaEE技术栈开发的实时通信应用,它允许用户通过网络进行文本、语音甚至视频的交流。这个完整的压缩包包含了所有必要的组件,使得开发者可以在解压后直接在特定环境中运行,比如Eclipse...

    javax.jms.jar

    Classes contained in javax.jms.jar: javax.transaction.xa.XAResource.class javax.jms.BytesMessage.class javax.jms.Message.class javax.jms.JMSException.class javax.jms.Destination.class javax.jms....

    JMS ActiveMQ演示代码

    **JMS(Java Message Service)** 是一个Java平台上的标准接口,它定义了一种统一的API,使得应用程序可以与各种消息中间件进行交互。JMS主要用于在分布式环境中传递消息,提供可靠的异步通信机制。 **ActiveMQ** ...

    weblogic中配置JMS及其测试程序

    JMS是Java平台中用于异步通信的标准API,它允许应用程序创建、发送、接收和读取消息。在WebLogic环境中配置JMS并进行测试是确保分布式系统可靠性和可扩展性的重要步骤。以下将详细讲解这个过程。 1. **JMS基本概念*...

    javax.jms-1.1.jar

    通过JMS,开发人员能够编写不依赖于特定消息服务提供商的应用程序,实现跨平台和跨应用的通信。 2. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub...

    JMS消息服务代码(java message service)

    Java Message Service(JMS)是Java平台中用于创建、发送、接收和读取消息的应用程序接口。它提供了一种标准的方式来解耦...在实践中,理解并熟练运用JMS消息服务代码,能够有效提升Java企业级应用的开发效率和质量。

    javax.jms.jar下载

    javax.jms.BytesMessage.class javax.jms.Connection.class javax.jms.ConnectionConsumer.class javax.jms.ConnectionFactory.class javax.jms.ConnectionMetaData.class javax.jms.DeliveryMode.class javax.jms....

    jms-1.1.jar(jms工具jar包)

    `jms-1.1.jar` 是一个包含了JMS 1.1版本规范实现的库文件,是开发基于JMS的应用程序所必需的依赖。 JMS的核心概念主要包括以下几点: 1. **消息**:在JMS中,消息是数据传输的基本单元,它封装了要传递的信息。...

Global site tag (gtag.js) - Google Analytics