`
longgangbai
  • 浏览: 7332256 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ Conneccton中的队列的监听的使用和测试

阅读更多

   使用DestinationSource监听当前的Connection中的queue和topic的个数和信息的监听的时间。

 

package easyway.app.activemq.demo.monitors;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.DestinationEvent;
import org.apache.activemq.advisory.DestinationListener;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.advisory.ProducerEvent;
import org.apache.activemq.advisory.ProducerEventSource;
import org.apache.activemq.advisory.ProducerListener;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 针对当前Conneccton中的队列的监听的使用和测试
 * 
 * 监控ActiveMQ的各种信息
 * @author longgangbai
 */
public class DestinationSourceMonitor implements DestinationListener ,ProducerListener{
	    private static final transient Logger LOG = LoggerFactory.getLogger(DestinationSourceMonitor.class);
	    protected ActiveMQConnection connection;
	    protected ActiveMQConnectionFactory  connectionFactory;
	    protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar");
	    protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese");
	    protected Boolean useTopic=true;
	    protected int consumerCounter;
	    protected BrokerService broker;
	    protected Session consumerSession1;
	    protected Session consumerSession2;
	    protected ProducerEventSource producerEventSource;
	    protected List<ActiveMQDestination> newDestinations = new ArrayList<ActiveMQDestination>();
        String bindAddress="tcp://localhost:61619";//ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
        protected BlockingQueue<ProducerEvent> eventQueue = new ArrayBlockingQueue<ProducerEvent>(1000);
        protected ActiveMQDestination destination;
        /**
         * 
         * 根据DestinationSource获取当前Connection中的队列的信息
         * @throws Exception
         */
	    public void destiationSourceHasInitialDestinations() throws Exception {
            //创建DestinationSource
	        DestinationSource destinationSource = connection.getDestinationSource();
	        //获取connection的p2p的队列数
	        Set<ActiveMQQueue> queues = destinationSource.getQueues();
	        //获取Connection的topic的队列数
	        Set<ActiveMQTopic> topics = destinationSource.getTopics();

	        //获取connection的p2p的队列数
	        Set<ActiveMQTempQueue> tmpqueues = destinationSource.getTemporaryQueues();
	        //获取Connection的topic的队列数
	        Set<ActiveMQTempTopic> tmptopics = destinationSource.getTemporaryTopics();
	        LOG.info("Number of Queues: " + queues.size());
	        LOG.info("Queues: " + queues);
	        
	        
	        LOG.info("Number of topics: " + topics.size());
	        LOG.info("Topics: " + topics);
	        for (ActiveMQTempTopic topic : tmptopics) {
	        	 LOG.info("topic: " + topic);
			}
	        LOG.info("Number of ActiveMQTempQueue: " + tmpqueues.size());
	        LOG.info("ActiveMQTempQueue: " + tmpqueues);
	        
	        LOG.info("Number of ActiveMQTempTopic: " + tmptopics.size());
	        LOG.info("ActiveMQTempTopic: " + tmptopics);
	        
	        
	        LOG.info("The queues should not be empty!"+" ,"+!queues.isEmpty());
	        LOG.info("The topics should not be empty!"+" ,"+ !topics.isEmpty());

	        LOG.info("the connection contains initial queue: " + queues+","+queues.contains(sampleQueue));
	        LOG.info("the connection  contains initial topic: " + queues+" ,"+topics.contains(sampleTopic));
	        
	        destinationSource.start();
	    }
	    
	    /**
	     * 针对ProductorListener的测试
	     * @throws Exception
	     */
	    public void productorMonitor() throws Exception{
	    	
	         consumerSession1 = createSession();
	         
	         consumerSession2 = createSession();

	         producerEventSource.start();
	         assertConsumerEvent(2, true);

	         consumerSession1.close();
	         consumerSession1 = null;
	         assertConsumerEvent(1, false);

	         consumerSession2.close();
	         consumerSession2 = null;
	         assertConsumerEvent(0, false);
	         
	    }
	    
	    
	    protected Session createSession() throws JMSException {
	        final String consumerText = "Consumer: " + (++consumerCounter);
	        LOG.info("Creating consumer: " + consumerText + " on destination: " + destination);

	        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	        return answer;
	    }
	    
	    
	    
	    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
	        ProducerEvent event = waitForProducerEvent();
	        LOG.info("Producer count ="+count+ ", "+event.getProducerCount());
	        LOG.info(" Producer started"+" ="+started+" ,"+ event.isStarted());
	    }


	    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
	        ProducerEvent answer = eventQueue.poll(100000, TimeUnit.MILLISECONDS);
	        LOG.info("Should have received a consumer event!"+" ,"+ (answer != null));
	        return answer;
	    }

	    /**
	     * 测试的方法
	     * @param args
	     * @throws Exception
	     */
		public static void main(String[] args) throws Exception {
			DestinationSourceMonitor monitor=new DestinationSourceMonitor();
			monitor.init();
			monitor.destiationSourceHasInitialDestinations();
			monitor.productorMonitor();
			monitor.stopBroker();
		
		}
        /**
         * 创建的连接
         * @return
         * @throws Exception
         */
	    protected Connection createConnection() throws Exception {
	        return connectionFactory.createConnection();
	    }
	    /**
	     * 创建的activemq的目的对象
	     * @param subject
	     * @return
	     */
	    protected ActiveMQDestination createDestination(String subject) {
	        if (useTopic) {
	            return new ActiveMQTopic(subject);
	        } else {
	            return new ActiveMQQueue(subject);
	        }
	    }

	    /**
	     * Returns the name of the destination used in this test case
	     */
	    protected String getDestinationString() {
	        return getClass().getName() + "." +"activemq";
	    }

	    /**
	     * Factory method to create a new {@link ConnectionFactory} instance
	     * 
	     * @return a newly created connection factory
	     */
	    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
	        return new ActiveMQConnectionFactory(bindAddress);
	    }

        /**
         * 启动的方法
         * @throws Exception
         */
	    protected void startBroker() throws Exception {
	        broker.start();
	    }

        /**
         * 停止的方法
         * @throws Exception
         */
	    public void stopBroker() throws Exception{
	    	broker.stop();
	    }

	    /**
	     * @return whether or not persistence should be used
	     */
	    protected boolean isPersistent() {
	        return false;
	    }
	    /**
	     * 队列的时间的方法
	     */
	    public void onDestinationEvent(DestinationEvent event) {
	        ActiveMQDestination destination = event.getDestination();
	        if (event.isAddOperation()) {
	            LOG.info("Added:   " + destination);
	            newDestinations.add(destination);
	        }
	        else {
	            LOG.info("Removed: " + destination);
	            newDestinations.remove(destination);
	        }
	    }
        /**
         * 初始化
         * @throws Exception
         */
	    protected void init() throws Exception {
	    	
	    	   if (broker == null) {
	               broker = createBroker();
	           }
	           startBroker();

	           connectionFactory = createConnectionFactory();

	           destination = createDestination();
	           
	           
	           
	           //创建DestinationSource
	           connection = (ActiveMQConnection) createConnection();
	           connection.start();
	           connection.getDestinationSource().setDestinationListener(this);
	           
	           
	           
	           //创建ProducerEventSource
	           producerEventSource = new ProducerEventSource(connection, destination);
	           producerEventSource.setProducerListener(this);
	           producerEventSource.start();
	    }
	    
	    protected ActiveMQDestination createDestination() {
	        return createDestination(getDestinationString());
	    }
        /**
         * 创建broker对象
         * @return
         * @throws Exception
         */
	    protected BrokerService createBroker() throws Exception {
	    	broker = new BrokerService();
	    	broker.setPersistent(isPersistent());
	    	broker.addConnector(bindAddress);
	        broker.setDestinations(new ActiveMQDestination[]{
	                sampleQueue,
	                sampleTopic
	        });
	        return broker;
	    }
        /**
         * 关闭连接
         * @throws Exception
         */
	    protected void destory() throws Exception {
	        if (producerEventSource != null) {
	            producerEventSource.stop();
	        }
	        if (connection != null) {
	            connection.close();
	        }
	    }
	    public void onProducerEvent(ProducerEvent event) {
	        eventQueue.add(event);
	    }
	}

 

分享到:
评论

相关推荐

    activemq消息发送和监听

    项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。

    activemq配置组合队列(复制)、负载均衡

    在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它负责在不同服务之间异步传输数据,从而提高系统的响应速度和可扩展性。ActiveMQ是Apache出品的一款开源、高性能、支持多种协议的消息中间件...

    JMS之Spring +activeMQ实现消息队列

    "JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步处理中的...

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。...通过这些步骤,可以测试 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386

    消息队列介绍和SpringBoot2.x整合RockketMQ、ActiveMQ

    本文介绍了消息队列的基本概念和作用,重点讲解了ActiveMQ5.x的主要特性和SpringBoot2.x中如何整合及实现点对点消息的实战案例。通过对这些知识点的理解和实践,开发者能够更好地理解和应用消息队列技术,提升系统...

    Jmeter测试ActiveMQ性能报告

    本报告详细阐述了使用JMeter对ActiveMQ进行性能测试的过程和结果,旨在评估ActiveMQ在JMS(Java消息服务)环境下的性能表现。JMeter作为一个强大的负载和性能测试工具,被广泛用于测试各种应用程序,包括消息中间件...

    ActiveMQ连接和使用测试工程

    本测试工程旨在帮助开发者理解和掌握如何连接并使用ActiveMQ,包括数据的插入和读取。 首先,我们需要了解ActiveMQ的基本概念。消息队列(MQ)是一种应用程序间的通信方法,允许不同速度的应用程序和系统异步交换...

    PHP过滤(selector)接收ActiveMQ的指定队列或者主题消息

    首先,我们需要理解ActiveMQ中的队列和主题概念。队列(Queue)是点对点通信模式,每个消息只能被一个消费者接收并处理,而主题(Topic)则采用发布/订阅模式,一个消息可以被多个订阅者同时接收。在使用PHP接收消息...

    ActiveMQ消息队列主题订阅Spring整合

    可以使用ActiveMQ的Web控制台监控消息队列的状态,查看消息的发送和接收情况。 通过以上步骤,你可以成功地将ActiveMQ消息队列与Spring框架整合,实现基于主题订阅的消息传递。这种整合有助于解耦系统组件,提高...

    ActiveMQ的队列queue模式(事务、应答、转发模式、阻塞消息)

    本文将深入探讨ActiveMQ中的队列(Queue)模式,包括事务、应答、转发以及MessageConsumer的receive阻塞消息处理方式。 ### 1. ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个...

    实验三 消息中间件应用开发:ActiveMQ实现单线程多队列

    在Java编程中,可以使用JMS API与ActiveMQ进行交互,创建生产者和消费者对象,定义消息的格式和内容,以及发送和接收消息。单线程多队列意味着一个线程可以同时处理多个不同的消息队列,这在并发处理和任务调度中有...

    MSMQ、RabbitMQ、ActiveMQ消息队列调试工具

    可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。

    7道消息队列ActiveMQ面试题!

    ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS...了解和掌握这些知识点,有助于面试者在面试中展示对ActiveMQ的深入理解和实际应用能力,同时也是确保在日常开发工作中正确、高效使用消息队列的重要基础。

    mqttjs(activemq测试工具)

    同时,ActiveMQ提供了一个Web管理界面,可以实时查看和管理消息队列,便于监控和调试。 对于更复杂的场景,`mqttjs`库还提供了其他高级功能,如QoS(服务质量)设置,确保消息可靠传递;设置保持连接心跳,防止网络...

    spring 整合activemq实现自定义动态消息队列

    本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...

    activemq的queue队列模式的maven,spring的demo

    在IT行业中,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它实现了多种消息协议,包括JMS(Java Message Service)。在这个“activemq的queue队列模式的maven,spring的demo”中,我们将深入探讨如何...

    ActiveMQ的队列、topic模式

    本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic)。 1. **队列(Queue)模式**: 队列模式遵循“发布/订阅”模型,但是一对一的。每个消息只能被一个消费者接收并处理。当一个消息被...

    SpringBoot快速玩转ActiveMQ消息队列

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。...通过理解和实践上述知识点,你将能够熟练地在SpringBoot项目中运用ActiveMQ实现消息队列的功能。

    ActiveMQ的队列模式

    在ActiveMQ中,队列模式是一种常见且重要的使用方式,尤其对于初学者而言,它是理解消息队列概念的一个良好起点。 在JMS中,队列(Queue)是一种点对点的通信模型,每个消息只会被一个消费者接收并处理,确保了消息...

Global site tag (gtag.js) - Google Analytics