`
zhaohaolin
  • 浏览: 1017051 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ActiveMQ学习笔记----ActiveMQ和JBossMQ性能对比测试代码

    博客分类:
  • JMS
阅读更多

本文描述了对ActiveMQ进行性能测试的代码。性能测试用源代码共包含3个文件,分别是:
JMS消息发送类:ActiveMQProducer.java
JMS消息接收类:ActiveMQConsumer.java
JMS消息收发测试主类:ActiveMQTest.java
下面分别介绍这三个类。

1. JMS消息发送类 ActiveMQProducer.java 的源码如下:
/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2007</p>
 * <p>Company: </p>
 * @author mqboss
 * @version 1.0
 */

import javax.jms.*;

import org.apache.activemq.*;

public class ActiveMQProducer {
 public final static int MAX_SEND_TIMES = 100;

 private String user = ActiveMQConnection.DEFAULT_USER;

 private String password = ActiveMQConnection.DEFAULT_PASSWORD;

 private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

 private String subject = "TOOL.DEFAULT";

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageProducer producer = null;

 /**
  * 初始化
  * 
  * @throws JMSException
  * @throws java.lang.Exception
  */
 private void initialize() throws JMSException, Exception {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    user, password, url);
  connection = connectionFactory.createConnection();
  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  destination = session.createQueue(subject);
  producer = session.createProducer(destination);
  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  connection.start();
 }

 /**
  * 发送消息
  * 
  * @param message
  * @throws JMSException
  * @throws java.lang.Exception
  */
 public void produceMessage(String message) throws JMSException, Exception {
  initialize();
  TextMessage msg = session.createTextMessage(message);
  
  long beginTime = System.currentTimeMillis();

  System.out.println("Producer:->Sending message: ");

  for (int i = 0; i < MAX_SEND_TIMES; i++) {
   producer.send(msg);
   if ((i + 1) % 1000 == 0) {
    System.out.println("This is the " + i + " message!");
   }
  }

  System.out.println("Producer:->Message sent complete!");
  long endTime = System.currentTimeMillis();
  long executeTime = endTime - beginTime;
  System.out.println("ActiveMQ send" + MAX_SEND_TIMES + " messages used: "
    + executeTime + " ms");
 }

 /**
  * 关闭连接
  * 
  * @throws JMSException
  */
 public void close() throws JMSException {
  System.out.println("Producer:->Closing connection");
  if (producer != null) {
   producer.close();
  }
  if (session != null) {
   session.close();
  }
  if (connection != null) {
   connection.close();
  }
 }
}


2. JMS消息接收类 ActiveMQConsumer.java 的源码如下
/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2007</p>
 * <p>Company: </p>
 * @author mqboss
 * @version 1.0
 */

import javax.jms.*;

import org.apache.activemq.*;

public class ActiveMQConsumer implements MessageListener {
 public static int RECEIVED_MSG_NUM = 0;
 long beginReceiveTime = 0;
 long endReceiveTime = 0;
 long receiveDuringTime = 0;

 private String user = ActiveMQConnection.DEFAULT_USER;

 private String password = ActiveMQConnection.DEFAULT_PASSWORD;

 private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

 private String subject = "TOOL.DEFAULT";

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageConsumer consumer = null;

 /**
  * 初始化
  * 
  * @throws JMSException
  * @throws java.lang.Exception
  */
 private void initialize() throws JMSException, Exception {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    user, password, url);
  connection = connectionFactory.createConnection();
  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  destination = session.createQueue(subject);
  consumer = session.createConsumer(destination);
 }

 /**
  * 接收消息
  * 
  * @throws JMSException
  * @throws java.lang.Exception
  */
 public void consumeMessage() throws JMSException, Exception {
  initialize();
  connection.start();
  System.out.println("Consumer:->Begin listening...");
  // 设置消息监听
  consumer.setMessageListener(this);
 }

 /**
  * 关闭连接
  * 
  * @throws JMSException
  */
 public void close() throws JMSException {
  System.out.println("Consumer:->Closing connection");
  if (consumer != null) {
   consumer.close();
  }
  if (session != null) {
   session.close();
  }
  if (connection != null) {
   connection.close();
  }
 }

 /**
  * 收到消息的处理
  * 
  * @param message
  */
 public void onMessage(Message message) {
  try {
   if (message instanceof TextMessage) {

    TextMessage txtMsg = (TextMessage) message;
    String msg = txtMsg.getText();

    // receive the first message
    if (RECEIVED_MSG_NUM == 0) {
     beginReceiveTime = System.currentTimeMillis();
    }

    RECEIVED_MSG_NUM++;

    // print one String when received 1000 message
    if ((RECEIVED_MSG_NUM + 1) % 1000 == 0) {
     System.out.println("Consumer:->Received: "
       + RECEIVED_MSG_NUM);
    }

    // receive the last message
    if (RECEIVED_MSG_NUM == ActiveMQProducer.MAX_SEND_TIMES - 1) {
     endReceiveTime = System.currentTimeMillis();
     receiveDuringTime = endReceiveTime - beginReceiveTime;
     System.out.println("ActiveMQ Receive "
       + ActiveMQProducer.MAX_SEND_TIMES
       + " messages used: " + receiveDuringTime + " ms");
    }

   } else {
    System.out.println(System.currentTimeMillis()
      + "Consumer:->Received: " + message);
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}

3. JMS消息收发测试主类:ActiveMQTest.java
/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2007</p>
 * <p>Company: </p>
 * @author mqboss
 * @version 1.0
 */
import javax.jms.*;

public class ActiveMQTest {

 /**
  * 
  * @param args
  * @throws JMSException
  * @throws java.lang.Exception
  */
 public static void main(String[] args) throws JMSException, Exception {
  ActiveMQConsumer consumer = new ActiveMQConsumer();
  ActiveMQProducer producer = new ActiveMQProducer();
  char[] tempChars=new char[1024];
  for(int i=0;i<1024;i++){
   tempChars[i]='a';
  }
  String tempMsg=String.valueOf(tempChars);
  // 开始监听
  consumer.consumeMessage();
  producer.produceMessage(tempMsg);
  producer.close();

  // 延时5000ms后关闭连接
  Thread.sleep(5000);
  consumer.close();
 }
}

为了与JBossMQ进行性能对比测试,下面把JBossMQ收发消息的测试代码也一并附上。
为了保证测试代码的一致性,JBossMQ性能测试的代码也包含3个文件,分别是:
JMS消息发送类:JBossMQProducer.java
JMS消息接收类:JBossMQConsumer.java
JMS消息收发测试主类:JBossMQTest.java
下面分别介绍这三个类。

1. JMS消息发送类 JBossMQProducer.java 的源码如下:
/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2007</p>
 * <p>Company: </p>
 * @author mqboss
 * @version 1.0
 */

import java.util.Properties;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;

public class JbossMQProducer {
 public final static int MAX_SEND_TIMES = 100;

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageProducer producer = null;

 /**
  * 初始化
  * 
  * @throws JMSException
  * @throws java.lang.Exception
  */
 private void initialize() throws JMSException, Exception {
  Properties props = new Properties();
  props.put(Context.INITIAL_CONTEXT_FACTORY,
    "org.jnp.interfaces.NamingContextFactory");
  props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
  props.put(Context.PROVIDER_URL, "localhost:1099");

  InitialContext jmsContext = new InitialContext(props);
  ConnectionFactory connectionFactory = (ConnectionFactory) jmsContext
    .lookup("ConnectionFactory");
  connection = connectionFactory.createConnection();
  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  destination = (Queue) jmsContext.lookup("queue/A");
  producer = session.createProducer(destination);
  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  connection.start();
 }

 /**
  * 发送消息
  * 
  * @param message
  * @throws JMSException
  * @throws java.lang.Exception
  */
 public void produceMessage(String message) throws JMSException, Exception {
  initialize();
  TextMessage msg = session.createTextMessage(message);

  long beginTime = System.currentTimeMillis();

  System.out.println("Producer:->Sending message: ");

  for (int i = 0; i < MAX_SEND_TIMES; i++) {
   if ((i + 1) % 1000 == 0) {
    System.out.println("This is the " + i + " message!");
   }
   producer.send(msg);
  }

  System.out.println("Producer:->Message sent complete!");
  long endTime = System.currentTimeMillis();
  long executeTime = endTime - beginTime;
  System.out.println("JbossMQ send" + MAX_SEND_TIMES + " messages used: "
    + executeTime + " ms");
 }

 /**
  * 关闭连接
  * 
  * @throws JMSException
  */
 public void close() throws JMSException {
  System.out.println("Producer:->Closing connection");
  if (producer != null) {
   producer.close();
  }
  if (session != null) {
   session.close();
  }
  if (connection != null) {
   connection.close();
  }
 }
}

2. JMS消息接收类 JBossMQConsumer.java 的源码如下
/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2007</p>
 * <p>Company: </p>
 * @author mqboss
 * @version 1.0
 */

import java.util.Properties;

import javax.jms.*;
import javax.naming.*;

public class JbossMQConsumer implements MessageListener {
 public static int RECEIVED_MSG_NUM = 0;
 long beginReceiveTime = 0;
 long endReceiveTime = 0;
 long receiveDuringTime = 0;

 private Destination destination = null;

 private Connection connection = null;

 private Session session = null;

 private MessageConsumer consumer = null;

 /**
  * 初始化
  * 
  * @throws JMSException
  * @throws java.lang.Exception
  */
 private void initialize() throws JMSException, Exception {
  Properties props = new Properties();
  props.put(Context.INITIAL_CONTEXT_FACTORY,
    "org.jnp.interfaces.NamingContextFactory");
  props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
  props.put(Context.PROVIDER_URL, "localhost:1099");

  // get naming context
  InitialContext jmsContext = new InitialContext(props);
  ConnectionFactory connectionFactory = (ConnectionFactory) jmsContext
    .lookup("ConnectionFactory");

  connection = connectionFactory.createConnection();
  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  destination = (Queue) jmsContext.lookup("queue/A");
  consumer = session.createConsumer(destination);
 }

 /**
  * 接收消息
  * 
  * @throws JMSException
  * @throws java.lang.Exception
  */
 public void consumeMessage() throws JMSException, Exception {
  initialize();
  connection.start();
  System.out.println("Consumer:->Begin listening...");
  // 设置监听
  consumer.setMessageListener(this);
 }

 /**
  * 关闭连接
  * 
  * @throws JMSException
  */
 public void close() throws JMSException {
  System.out.println("Consumer:->Closing connection");
  if (consumer != null) {
   consumer.close();
  }
  if (session != null) {
   session.close();
  }
  if (connection != null) {
   connection.close();
  }
 }

 /**
  * 处理消息
  * 
  * @param message
  */
 public void onMessage(Message message) {
  try {
   if (message instanceof TextMessage) {

    TextMessage txtMsg = (TextMessage) message;
    String msg = txtMsg.getText();

    // receive the first message
    if (RECEIVED_MSG_NUM == 0) {
     beginReceiveTime = System.currentTimeMillis();
    }

    RECEIVED_MSG_NUM++;

    //print one String when received 1000 message
    if ((RECEIVED_MSG_NUM + 1) % 1000 == 0) {
     System.out.println("Consumer:->Received: "
       + RECEIVED_MSG_NUM);
    }

    // receive the last message
    if (RECEIVED_MSG_NUM == JbossMQProducer.MAX_SEND_TIMES - 1) {
     endReceiveTime = System.currentTimeMillis();
     receiveDuringTime = endReceiveTime - beginReceiveTime;
     System.out.println("JbossMQ receive "
       + JbossMQProducer.MAX_SEND_TIMES
       + " messages used: " + receiveDuringTime + " ms");
    }
        
   } else {
    System.out.println(System.currentTimeMillis()
      + "Consumer:->Received: " + message);
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}

3. JMS消息收发测试主类:JBossMQTest.java
import javax.jms.JMSException;

/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2007</p>
 * <p>Company: </p>
 * @author mqboss
 * @version 1.0
 */

public class JbossMQTest {

 /**
  * 
  * @param args
  * @throws JMSException
  * @throws java.lang.Exception
  */
 public static void main(String[] args) throws JMSException, Exception {
  JbossMQConsumer consumer = new JbossMQConsumer();
  JbossMQProducer producer = new JbossMQProducer();
  char[] tempChars = new char[1024];
  for (int i = 0; i < 1024; i++) {
   tempChars[i] = 'a';
  }
  String tempMsg = String.valueOf(tempChars);
  // 启动消息监听
  consumer.consumeMessage();
  producer.produceMessage(tempMsg);
  producer.close();
  // 5000ms后关闭连接
  Thread.sleep(5000);
  consumer.close();
 }
}

以上是把ActiveMQ集成到JBoss以后,对ActiveMQ和JBossMQ进行性能对比测试的源代码。
很显然,以上源代码基本一致,所以性能测试结果可以很好的说明ActiveMQ和JBossMQ的性能对比情况。

参考文献:
[1]ActiveMQ的一个简单示例

分享到:
评论

相关推荐

    activemq-protobuf-1.1-API文档-中文版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。

    activemq-cpp-library-3.9.5 编译的windows库文件,支持vs2015、vs2017

    ActiveMQ-CPP Library 3.9.5是一款专为C++开发者设计的、用于与Apache ActiveMQ集成的库,它提供了丰富的API接口,使得在C++环境中能够方便地发送和接收消息。这个版本的库文件特别针对Visual Studio 2015和2017进行...

    activemq-cpp-library-3.9.5-src.zip

    同时,完整的API文档和示例代码可以帮助快速理解和使用ActiveMQ-CPP库。 总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过...

    apache-activemq-5.9.0-bin

    这个“apache-activemq-5.9.0-bin”压缩包包含了Apache ActiveMQ 5.9.0版本的完整二进制文件,用于在本地或网络环境中安装和运行。 Apache ActiveMQ的核心功能包括: 1. **消息队列**:ActiveMQ支持多种消息模式,...

    apache-activemq-5.15.8-bin.zip

    这个"apache-activemq-5.15.8-bin.zip"文件包含了ActiveMQ的可执行版本,用于在本地计算机上安装和运行ActiveMQ服务。 首先,我们需要了解ActiveMQ的核心概念。它是一个消息代理,扮演着消息生产者与消费者之间的...

    activemq-all-5.2.0.jar包

    activemq-all-5.2.0.JAR包,欢迎下载。编写java中间件的时候会用到。这是activemq实现的jms中间件。希望能帮助到你。

    activemq-web-console-5.11.2

    其实activemq-web-console完全可以和activemq-broker分开来部署。 activemq-web-console包含3个apps, 1.一个是admin,用来显示和管理所有的queue、topic、connection等等。 2.一个是demo,有一些使用jms和activemq...

    apache-activemq-5.8.0-bin.zip

    这个压缩包"apache-activemq-5.8.0-bin.zip"包含了ActiveMQ 5.8.0版本的二进制发行版,供用户在本地计算机上安装和运行。 1. **Apache ActiveMQ简介** - Apache ActiveMQ是业界广泛使用的消息代理,提供可靠的消息...

    activemq-core-5.7.0-API文档-中英对照版.zip

    赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar;...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。 双语对照,边学技术、边学英语。

    apache-activemq-5.14.3-bin.zip

    这个"apache-activemq-5.14.3-bin.zip"压缩包包含了在Windows环境下部署和运行ActiveMQ所需的所有文件。让我们深入探讨一下这个版本的ActiveMQ及其在Java消息服务中的应用。 首先,Java消息服务(JMS)是一种标准...

    apache-activemq-5.13.2-bin.tar.gz

    解压后,你会得到一个名为`apache-activemq-5.13.2`的目录,里面包含了所有必要的可执行文件和配置文件。这个目录结构包括了`bin`、`conf`、`lib`等子目录,分别用于存放可执行脚本、配置文件和依赖库。 在`bin`...

    activemq-web-4.0-M3.jar.zip

    在提供的压缩包"activemq-web-4.0-M3.jar.zip"中,有两个主要文件:"activemq-web-4.0-M3.jar"和"license.txt"。"activemq-web-4.0-M3.jar"是核心的Java档案文件,包含了运行ActiveMQ Web UI所需的所有类和资源。这...

    activemq-protobuf-1.1.jar

    activemq-protobuf-1.1.jar;activemq-protobuf-1.1.jar

    activemq-core-5.7.0-API文档-中文版.zip

    赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources....人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。

    activemq-all-5.8.0.jar

    activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载

    activemq-all-5.6.0.jar

    activemq-all-5.6.0.jar activemq-all-5.6.0.jar activemq-all-5.6.0.jar activemq-all-5.6.0.jar

    activemq-all-5.2.0-jar包

    使用`activemq-all-5.2.0.jar`,开发者可以通过JMS API来创建连接、生产者、消费者和消息。此外,ActiveMQ还提供了XML配置文件,用于设置服务器、网络连接、安全策略等。 **部署和运行** 在Java环境中,将`activemq...

    activemq-protobuf-1.1-API文档-中英对照版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar;...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。 双语对照,边学技术、边学英语。

    activemq-rar-5.10.0.rar

    标题中的"activemq-rar-5.10.0.rar"指的是Apache ActiveMQ的RAR资源包,版本为5.10.0。ActiveMQ是业界广泛使用的开源消息中间件,它遵循Java Message Service (JMS) 规范,提供高可靠的消息传递服务,能够帮助应用...

    activemq-all-5.2.0-sources.jar

    activemq-all-5.2.0-sources.jar

Global site tag (gtag.js) - Google Analytics