`

简单的JMS实例,包括点对点和主题订阅

    博客分类:
  • JAVA
阅读更多
程序代码:
JMSTest.java

package com.lizongbo.jmsdemo;

import javax.jms.MessageListener;
import javax.naming.Context;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.QueueReceiver;
import javax.jms.Queue;
import java.io.Serializable;
import java.util.Properties;
import javax.jms.Message;
import javax.jms.JMSException;

/**
 <p>Title:JMS demo  </p>

 <p>Description: </p>

 <p>Copyright: Copyright (c) 2005</p>

 <p>Company: </p>

 @author lizongbo
 @version 1.0


 * Use this class to send and receive point-to-point messages.
 * To send a text message:
 * <code>
 * JMSTest jMSTest = new JMSTest();
 * jMSTest.sendText("Hello world");
 * jMSTest.close(); //Release resources
 * </code>

 * To receive a message:
 * <code>
 * JMSTest jMSTest = new JMSTest();
 * jMSTest.getQueueReceiver();
 * </code>
 */

public class JMSTest
    implements MessageListener {
  private static Context context;
  private boolean transacted = true;
  private int acknowledgementMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
  private QueueConnectionFactory queueConnectionFactory;
  private QueueConnection queueConnection;
  private QueueSession queueSession;
  private QueueSender queueSender;
  private QueueReceiver queueReceiver;
  private Queue queue;
  private String queueConnectionFactoryName = "testjms";
  private String sendQueueName = "com.lizongbo.jms001";
  private String recvQueueName = "com.lizongbo.jms001";
  public void setTransacted(boolean transacted) {
    this.transacted = transacted;
  }

  public void sendObject(Serializable message) throws Exception {
    javax.jms.ObjectMessage objectMessage = getQueueSession().
        createObjectMessage();
    objectMessage.clearBody();
    objectMessage.setObject(message);
    getQueueSender().send(objectMessage);
    if (isTransacted()) {
      getQueueSession().commit();
    }
  }

  public void sendText(String message) throws Exception {
    javax.jms.TextMessage textMessage = getQueueSession().createTextMessage();
    textMessage.clearBody();
    textMessage.setText(message);
    getQueueSender().send(textMessage);
    if (isTransacted()) {
      getQueueSession().commit();
    }
  }

  public QueueReceiver getQueueReceiver() throws Exception {
    if (queueReceiver == null) {
      queueReceiver = getQueueSession().createReceiver(getRecvQueue());
      queueReceiver.setMessageListener(this);
    }
    return queueReceiver;
  }

  public QueueSender getQueueSender() throws Exception {
    if (queueSender == null) {
      queueSender = getQueueSession().createSender(getSendQueue());
    }
    return queueSender;
  }

  public Queue getRecvQueue() throws Exception {
    if (queue == null) {
      Object obj = getContext().lookup(recvQueueName);
      queue = (Queue) obj;
    }
    return queue;
  }

  public Queue getSendQueue() throws Exception {
    if (queue == null) {
      Object obj = getContext().lookup(sendQueueName);
      queue = (Queue) obj;
    }
    return queue;
  }

  public QueueSession getQueueSession() throws Exception {
    if (queueSession == null) {
      queueSession = getQueueConnection().createQueueSession(isTransacted(),
          getAcknowledgementMode());
    }
    return queueSession;
  }

  public QueueConnection getQueueConnection() throws Exception {
    if (queueConnection == null) {
      queueConnection = getQueueConnectionFactory().createQueueConnection();
      queueConnection.start();
    }
    return queueConnection;
  }

  public QueueConnectionFactory getQueueConnectionFactory() throws Exception {
    if (queueConnectionFactory == null) {
      Object obj = getContext().lookup(queueConnectionFactoryName);
      queueConnectionFactory = (QueueConnectionFactory) obj;
    }
    return queueConnectionFactory;
  }

  public void setRecvQueueName(String recvQueueName) {
    this.recvQueueName = recvQueueName;
  }

  public String getRecvQueueName() {
    return recvQueueName;
  }

  public void setSendQueueName(String sendQueueName) {
    this.sendQueueName = sendQueueName;
  }

  public String getSendQueueName() {
    return sendQueueName;
  }

  public void setQueueConnectionFactoryName(String queueConnectionFactoryName) {
    this.queueConnectionFactoryName = queueConnectionFactoryName;
  }

  public String getQueueConnectionFactoryName() {
    return queueConnectionFactoryName;
  }

  public void setAcknowledgementMode(int acknowledgementMode) {
    this.acknowledgementMode = acknowledgementMode;
  }

  public int getAcknowledgementMode() {
    return acknowledgementMode;
  }

  public boolean isTransacted() {
    return transacted;
  }

  public static void main(String[] args) throws Exception {
    JMSTest jt = new JMSTest();
    jt.sendText("test jms Text  message!!!--lizongbo " + new java.util.Date().toString());
    jt.sendText("测试jms文本消息!!!--lizongbo" + new java.util.Date().toString());
  }

  public Context getInitialContext() throws Exception {
//    String url = "t3://*.*:7001";
//    String user = null;
//    String password = null;
     String url = "t3://127.0.0.1:7001";
    String user = "weblogic";
    String password = "weblogic";

    Properties properties;
    try {
      properties = new Properties();
      properties.put(Context.INITIAL_CONTEXT_FACTORY,
                     "weblogic.jndi.WLInitialContextFactory");
      properties.put(Context.PROVIDER_URL, url);
      if (user != null) {
        properties.put(Context.SECURITY_PRINCIPAL, user);
        properties.put(Context.SECURITY_CREDENTIALS,
                       password == null ? "" : password);
      }
      return new javax.naming.InitialContext(properties);
    }
    catch (Exception e) {
      System.out.println("Unable to connect to WebLogic server at " + url);
      System.out.println("Please make sure that the server is running.");
      throw e;
    }
  }

  private Context getContext() throws Exception {
    if (context == null) {
      try {
        context = getInitialContext();
      }
      catch (Exception ex) {
        ex.printStackTrace();
        throw ex;
      }
    }
    return context;
  }

  public void onMessage(Message message) {
    if (message instanceof javax.jms.BytesMessage) {
      javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
      System.out.println("这是一个BytesMessage,内容是:" + bytesMessage);
      /** @todo Process bytesMessage here */

    }
    else if (message instanceof javax.jms.MapMessage) {
      javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;
      System.out.println("这是一个MapMessage,内容是:" + mapMessage);
      /** @todo Process mapMessage here */
    }
    else if (message instanceof javax.jms.ObjectMessage) {
      javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;
      System.out.println("这是一个objectMessage,内容是:" + objectMessage);
      /** @todo Process objectMessage here */
    }
    else if (message instanceof javax.jms.StreamMessage) {
      javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;
      System.out.println("这是一个StreamMessage,内容是:" + streamMessage);
      /** @todo Process streamMessage here */
    }
    else if (message instanceof javax.jms.TextMessage) {
      javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message;
      System.out.println("这是一个TextMessage,内容是:" + objectMessage);
      /** @todo Process textMessage here */
    }
    if (isTransacted()) {
      try {
        getQueueSession().commit();
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
    }
  }

  public void close() throws Exception {
    if (queueSender != null) {
      queueSender.close();
    }
    if (queueReceiver != null) {
      queueReceiver.close();
    }
    if (queueSession != null) {
      queueSession.close();
    }
    if (queueConnection != null) {
      queueConnection.close();
    }
  }
}


JMSFetchTest.java

package com.lizongbo.jmsdemo;
/**
 * 
 <p>Title:JMS demo  </p>

 <p>Description: </p>

 <p>Copyright: Copyright (c) 2005</p>

 <p>Company: </p>

 @author lizongbo
 @version 1.0
 */

public class JMSFetchTest {
  public JMSFetchTest() {
    super();
  }

  public static void main(String[] args) throws Exception {
    JMSTest jt = new JMSTest();
    jt.getQueueReceiver();
    synchronized (jt) {
      jt.wait(10000);
    }
    System.out.println("jieshu!!!");
    //jt.sendObject("test jms message!!!--lizongbo");
    //jt.sendObject("测试jms消息!!!--lizongbo");
  }

}



JMSpublicTest.java


package com.lizongbo.jmsdemo;

import javax.jms.MessageListener;
import javax.naming.Context;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.Topic;
import java.util.Properties;
import java.io.Serializable;
import javax.jms.Message;

/**
 <p>Title:JMS demo  </p>

 <p>Description: </p>

 <p>Copyright: Copyright (c) 2005</p>

 <p>Company: </p>

 @author lizongbo
 @version 1.0


 * Use this class to publish and subscribe to messages.
 * To send a text message:
 * <code>
 * JMSpublicTest jMSpublicTest = new JMSpublicTest();
 * jMSpublicTest.publishText("Hello world");
 * jMSpublicTest.close(); //Release resources
 * </code>

 * To receive a message:
 * <code>
 * JMSpublicTest jMSpublicTest = new JMSpublicTest();
 * jMSpublicTest.getTopicSubscriber();
 * </code>
 */

public class JMSpublicTest
    implements MessageListener {
  private static Context context;
  private boolean transacted = false;
  private int acknowledgementMode = javax.jms.Session.AUTO_ACKNOWLEDGE;
  private TopicConnectionFactory topicConnectionFactory;
  private TopicConnection topicConnection;
  private TopicSession topicSession;
  private TopicPublisher topicPublisher;
  private TopicSubscriber topicSubscriber;
  private Topic topic;
  private String topicConnectionFactoryName = "testjms";
  private String publishTopicName = "com.lizongbo.jms002";
  private String subscribeTopicName = "com.lizongbo.jms002";
  private String clientId = "";
  private String durableName = "";
  private boolean durable = false;
  public void setTransacted(boolean transacted) {
    this.transacted = transacted;
  }

  public TopicSubscriber getTopicSubscriber() throws Exception {
    if (topicSubscriber == null) {
      if (isDurable()) {
        topicSubscriber = getTopicSession(true).createDurableSubscriber(
            getSubscribeTopic(), getDurableName());
      }
      else {
        topicSubscriber = getTopicSession(true).createSubscriber(
            getSubscribeTopic());
      }
      topicSubscriber.setMessageListener(this);
      getTopicConnection(true).start();
    }
    return topicSubscriber;
  }

  public TopicPublisher getTopicPublisher() throws Exception {
    if (topicPublisher == null) {
      topicPublisher = getTopicSession(false).createPublisher(getPublishTopic());
    }
    return topicPublisher;
  }

  public Topic getPublishTopic() throws Exception {
    if (topic == null) {
      Object obj = getContext().lookup(publishTopicName);
      topic = (Topic) obj;
    }
    return topic;
  }

  public TopicSession getTopicSession(boolean consumer) throws Exception {
    if (topicSession == null) {
      topicSession = getTopicConnection(consumer).createTopicSession(
          isTransacted(), getAcknowledgementMode());
    }
    return topicSession;
  }

  public TopicConnection getTopicConnection(boolean consumer) throws Exception {
    if (topicConnection == null) {
      topicConnection = getTopicConnectionFactory().createTopicConnection();


      if (isDurable() && consumer) {
        topicConnection.setClientID(clientId);
      }
      topicConnection.start();
    }
    return topicConnection;
  }

  public TopicConnectionFactory getTopicConnectionFactory() throws Exception {
    if (topicConnectionFactory == null) {
      Object obj = getContext().lookup(topicConnectionFactoryName);
      topicConnectionFactory = (TopicConnectionFactory) obj;
    }
    return topicConnectionFactory;
  }

  public void setDurable(boolean durable) {
    this.durable = durable;
  }

  public boolean isDurable() {
    return durable;
  }

  public void setDurableName(String durableName) {
    this.durableName = durableName;
  }

  public String getDurableName() {
    return durableName;
  }

  public void setClientId(String clientId) {
    this.clientId = clientId;
  }

  public String getClientId() {
    return clientId;
  }

  public void setSubscribeTopicName(String subscribeTopicName) {
    this.subscribeTopicName = subscribeTopicName;
  }

  public String getSubscribeTopicName() {
    return subscribeTopicName;
  }

  public void setPublishTopicName(String publishTopicName) {
    this.publishTopicName = publishTopicName;
  }

  public String getPublishTopicName() {
    return publishTopicName;
  }

  public void setTopicConnectionFactoryName(String topicConnectionFactoryName) {
    this.topicConnectionFactoryName = topicConnectionFactoryName;
  }

  public String getTopicConnectionFactoryName() {
    return topicConnectionFactoryName;
  }

  public void setAcknowledgementMode(int acknowledgementMode) {
    this.acknowledgementMode = acknowledgementMode;
  }

  public int getAcknowledgementMode() {
    return acknowledgementMode;
  }

  public boolean isTransacted() {
    return transacted;
  }

  public static void main(String[] args) throws InterruptedException, Exception {

    JMSpublicTest jtc = new JMSpublicTest();
    jtc.setDurable(true);
    jtc.setClientId("ip192.168.9.226");
    jtc.setDurableName("test226aa");
    jtc.getTopicSubscriber();
    //开始发布一个消息
    JMSpublicTest jtp = new JMSpublicTest();
    jtp.setDurable(true);
    jtp.setClientId("ip192.168.9249");
    jtp.setDurableName("test226aa");
    jtp.publishText("发布一个消息!!!");
    jtp.close();
    //在这里保持监听来获取消息
    synchronized (jtc) {
      jtc.wait(10000);
    }
    jtc.close();
    System.out.println(jtp.getClientId());
    System.out.println("jieshu!!!");

  }

  public Context getInitialContext() throws Exception {
//    String url = "t3://*.*:7001";
//    String user = null;
//    String password = null;
     String url = "t3://127.0.0.1:7001";
    String user = "weblogic";
    String password = "weblogic";

    Properties properties;
    try {
      properties = new Properties();
      properties.put(Context.INITIAL_CONTEXT_FACTORY,
                     "weblogic.jndi.WLInitialContextFactory");
      properties.put(Context.PROVIDER_URL, url);
      if (user != null) {
        properties.put(Context.SECURITY_PRINCIPAL, user);
        properties.put(Context.SECURITY_CREDENTIALS,
                       password == null ? "" : password);
      }
      return new javax.naming.InitialContext(properties);
    }
    catch (Exception e) {
      System.out.println("Unable to connect to WebLogic server at " + url);
      System.out.println("Please make sure that the server is running.");
      throw e;
    }
  }

  private Context getContext() throws Exception {
    if (context == null) {
      try {
        context = getInitialContext();
      }
      catch (Exception ex) {
        ex.printStackTrace();
        throw ex;
      }
    }
    return context;
  }

  public Topic getSubscribeTopic() throws Exception {
    if (topic == null) {
      Object obj = getContext().lookup(subscribeTopicName);
      topic = (Topic) obj;
    }
    return topic;
  }

  public void publishText(String message) throws Exception {
    javax.jms.TextMessage textMessage = getTopicSession(false).
        createTextMessage();
    textMessage.clearBody();
    textMessage.setText(message);
    getTopicPublisher().publish(textMessage);
    if (isTransacted()) {
      getTopicSession(false).commit();
    }
  }

  public void publishObject(Serializable message) throws Exception {
    javax.jms.ObjectMessage objectMessage = getTopicSession(false).
        createObjectMessage();
    objectMessage.clearBody();
    objectMessage.setObject(message);
    getTopicPublisher().publish(objectMessage);
    if (isTransacted()) {
      getTopicSession(false).commit();
    }
  }

  public void onMessage(Message message) {
    if (message instanceof javax.jms.BytesMessage) {
      javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
      System.out.println("这是一个BytesMessage,内容是:" + bytesMessage);
      /** @todo Process bytesMessage here */

    }
    else if (message instanceof javax.jms.MapMessage) {
      javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;
      System.out.println("这是一个MapMessage,内容是:" + mapMessage);
      /** @todo Process mapMessage here */
    }
    else if (message instanceof javax.jms.ObjectMessage) {
      javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;
      System.out.println("这是一个objectMessage,内容是:" + objectMessage);
      /** @todo Process objectMessage here */
    }
    else if (message instanceof javax.jms.StreamMessage) {
      javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;
      System.out.println("这是一个StreamMessage,内容是:" + streamMessage);
      /** @todo Process streamMessage here */
    }
    else if (message instanceof javax.jms.TextMessage) {
      javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message;
      System.out.println("这是一个TextMessage,内容是:" + objectMessage);
      /** @todo Process textMessage here */
    }
    if (isTransacted()) {
      try {
        getTopicSession(false).commit();
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
    }
  }

  public void close() throws Exception {
    if (topicPublisher != null) {
      topicPublisher.close();
    }
    if (topicSubscriber != null) {
      topicSubscriber.close();
    }
    if (topicSession != null) {
      topicSession.close();
    }
    if (topicConnection != null) {
      topicConnection.close();
    }
  }
}
分享到:
评论
1 楼 hanhg 2009-02-02  
   堆积了太多的代码,缺少介绍.
   如果把介绍加进来会好些.

相关推荐

    JMS实例 实例源码 点对点 主题 同步异步

    通过以上的JMS实例源码,我们可以看到如何在Java程序中创建、发送和接收消息,以及如何选择适合的通信模型(点对点或发布/订阅)和通信方式(同步或异步)。在实际开发中,根据业务需求选择合适的模型和通信方式,...

    activeMQ-JMS实例

    JMS是一种标准接口,用于Java平台上的消息传递,提供了可靠的消息传递机制,确保消息的顺序和持久性,同时也支持点对点和发布/订阅两种消息模型。 在Spring MVC中集成ActiveMQ,我们可以利用Spring的JMS抽象层,它...

    javax.jms.jar包与JMS实例代码

    JMS主要包含两种消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者接收,每个消息只被一个消费者处理...

    JMS消息发送及订阅

    在这个主题中,我们将深入探讨JMS消息的发送和订阅,以及如何通过Apache Camel这一集成框架来实现。 **JMS核心概念** 1. **消息**: JMS中的基本单元,它包含了要传递的数据。 2. **生产者**: 创建并发送消息的应用...

    JBOSS建立JMS应用实例

    2. 消息模型:JMS支持两种消息模型——点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)。前者基于队列,后者基于主题。 3. JMS实体:主要包括Message(消息)、MessageProducer(消息生产者)、...

    ActiveMq-JMS简单实例

    在"ActiveMq-JMS简单实例使用tomcat.doc"中,可能详细介绍了如何配置Tomcat和ActiveMQ,包括启动和停止ActiveMQ服务的步骤,以及如何通过WebConsole管理队列和主题。 接下来,JMS规范培训教程.pdf可能会涵盖JMS的...

    Java-JMS实例

    - **点对点(Point-to-Point, PTP)**:在这种模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者从该队列中取出并消费。每个消息只被一个消费者接收。 - **发布/订阅(Publish/Subscribe, Pub/Sub...

    基于WebSphere MQ发送消息的简单JMS实例

    JMS提供了两种主要的消息模型:点对点(Point-to-Point, P2P)和发布/订阅(Publish/Subscribe, Pub/Sub)。 在WebSphere MQ中,我们可以通过以下步骤创建一个简单的JMS发送消息的实例: 1. **配置WebSphere MQ**...

    java-jms小例子

    3. **消息队列与主题**:JMS提供两种类型的消息传输模式——点对点(Queue)和发布/订阅(Topic)。消息队列遵循点对点模型,每个消息仅由一个消费者接收,适合一对一的通信。消息主题遵循发布/订阅模型,多个消费者...

    ActiveMq 点对点 发布订阅demo

    在本文中,我们将深入探讨ActiveMQ中的两种主要通信模式:点对点(Point-to-Point)和发布/订阅(Publish/Subscribe),并基于提供的"ActiveMq 点对点 发布订阅demo"进行分析。 1. **点对点通信模式(Point-to-...

    Flex + BlazeDS + Java + JMS 通信实例(附源码)

    5. **消息模型**:在JMS中,有两种主要的消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe)。点对点模型中,消息由一个生产者发送到队列,然后由一个消费者接收;发布/订阅模型中,消息由...

    JMS和MDB的实例代码

    JMS支持两种主要的消息模式:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。 点对点模式中,消息由一个生产者发送到一个队列,然后由一个消费者接收。每个消息只被一个消费者接收,...

    应用openJMS实现JMS消息发布于订阅

    JMS定义了两种消息传递模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。在点对点模型中,消息由一个生产者发送到一个队列,然后由一个消费者接收;而在发布/订阅模型中,消息由发布...

    JMS-ActiveMQ入门实例

    实例中的"queue"文件夹可能包含了队列接收和发送的示例代码,而"topic"文件夹则对应主题的代码。在这些代码中,你会看到如何使用JMS API创建`ConnectionFactory`,以及如何通过`Connection`、`Session`、`...

    JMS公用接口

    它支持同步和异步通信,以及点对点(队列)和发布/订阅(主题)两种模式,广泛应用于分布式系统、微服务架构和事件驱动的系统中。理解并熟练使用JMS接口,能帮助开发者构建更加健壮、高效的应用程序。

    JMS与MDB介绍

    它支持点对点和发布订阅两种消息模型,分别对应消息队列和主题。JMS客户端通过ConnectionFactory创建连接,Session进行消息操作,MessageProducer发送消息,MessageConsumer接收消息。MDB(Message Driven Bean)是...

    JMS 文档实例讲解

    JMS主要包含两个通信模型:点到点(P2P)模型和发布/订阅(Pub/Sub)模型。 1. **点到点模型(P2P)** 在点到点模型中,消息通过Queue(队列)进行传输。一个消息生产者将消息放入队列,然后一个或多个消息接收者...

    简单的activemq点对点的同步消息模型

    在“简单的activemq点对点的同步消息模型”中,我们将探讨如何构建一个基本的、基于ActiveMQ的点对点消息传递系统,以及它的工作原理。 1. **点对点模型**:在JMS中,点对点模型(P2P)是一种消息传递模式,其中...

Global site tag (gtag.js) - Google Analytics