- 浏览: 7332256 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (1546)
- 企业中间件 (236)
- 企业应用面临的问题 (236)
- 小布Oracle学习笔记汇总 (36)
- Spring 开发应用 (54)
- IBatis开发应用 (16)
- Oracle基础学习 (23)
- struts2.0 (41)
- JVM&ClassLoader&GC (16)
- JQuery的开发应用 (17)
- WebService的开发应用 (21)
- Java&Socket (44)
- 开源组件的应用 (254)
- 常用Javascript的开发应用 (28)
- J2EE开发技术指南 (163)
- EJB3开发应用 (11)
- GIS&Mobile&MAP (36)
- SWT-GEF-RCP (52)
- 算法&数据结构 (6)
- Apache开源组件研究 (62)
- Hibernate 学习应用 (57)
- java并发编程 (59)
- MySQL&Mongodb&MS/SQL (15)
- Oracle数据库实验室 (55)
- 搜索引擎的开发应用 (34)
- 软件工程师笔试经典 (14)
- 其他杂项 (10)
- AndroidPn& MQTT&C2DM&推技术 (29)
- ActiveMQ学习和研究 (38)
- Google技术应用开发和API分析 (11)
- flex的学习总结 (59)
- 项目中一点总结 (20)
- java疑惑 java面向对象编程 (28)
- Android 开发学习 (133)
- linux和UNIX的总结 (37)
- Titanium学习总结 (20)
- JQueryMobile学习总结 (34)
- Phonegap学习总结 (32)
- HTML5学习总结 (41)
- JeeCMS研究和理解分析 (9)
最新评论
-
lgh1992314:
[u][i][b][flash=200,200][url][i ...
看看mybatis 源代码 -
尼古拉斯.fwp:
图片根本就不出来好吧。。。。。。
Android文件图片上传的详细讲解(一)HTTP multipart/form-data 上传报文格式实现手机端上传 -
ln94223:
第一个应该用排它网关吧 怎么是并行网关, 并行网关是所有exe ...
工作流Activiti的学习总结(八)Activiti自动执行的应用 -
ZY199266:
获取不到任何消息信息,请问这是什么原因呢?
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息 -
xiaoyao霄:
DestinationSourceMonitor 报错 应该导 ...
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息
使用DestinationSource监听当前的Connection中的queue和topic的个数和信息的监听的时间。
package easyway.app.activemq.demo.monitors; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.DestinationEvent; import org.apache.activemq.advisory.DestinationListener; import org.apache.activemq.advisory.DestinationSource; import org.apache.activemq.advisory.ProducerEvent; import org.apache.activemq.advisory.ProducerEventSource; import org.apache.activemq.advisory.ProducerListener; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 针对当前Conneccton中的队列的监听的使用和测试 * * 监控ActiveMQ的各种信息 * @author longgangbai */ public class DestinationSourceMonitor implements DestinationListener ,ProducerListener{ private static final transient Logger LOG = LoggerFactory.getLogger(DestinationSourceMonitor.class); protected ActiveMQConnection connection; protected ActiveMQConnectionFactory connectionFactory; protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar"); protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese"); protected Boolean useTopic=true; protected int consumerCounter; protected BrokerService broker; protected Session consumerSession1; protected Session consumerSession2; protected ProducerEventSource producerEventSource; protected List<ActiveMQDestination> newDestinations = new ArrayList<ActiveMQDestination>(); String bindAddress="tcp://localhost:61619";//ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected BlockingQueue<ProducerEvent> eventQueue = new ArrayBlockingQueue<ProducerEvent>(1000); protected ActiveMQDestination destination; /** * * 根据DestinationSource获取当前Connection中的队列的信息 * @throws Exception */ public void destiationSourceHasInitialDestinations() throws Exception { //创建DestinationSource DestinationSource destinationSource = connection.getDestinationSource(); //获取connection的p2p的队列数 Set<ActiveMQQueue> queues = destinationSource.getQueues(); //获取Connection的topic的队列数 Set<ActiveMQTopic> topics = destinationSource.getTopics(); //获取connection的p2p的队列数 Set<ActiveMQTempQueue> tmpqueues = destinationSource.getTemporaryQueues(); //获取Connection的topic的队列数 Set<ActiveMQTempTopic> tmptopics = destinationSource.getTemporaryTopics(); LOG.info("Number of Queues: " + queues.size()); LOG.info("Queues: " + queues); LOG.info("Number of topics: " + topics.size()); LOG.info("Topics: " + topics); for (ActiveMQTempTopic topic : tmptopics) { LOG.info("topic: " + topic); } LOG.info("Number of ActiveMQTempQueue: " + tmpqueues.size()); LOG.info("ActiveMQTempQueue: " + tmpqueues); LOG.info("Number of ActiveMQTempTopic: " + tmptopics.size()); LOG.info("ActiveMQTempTopic: " + tmptopics); LOG.info("The queues should not be empty!"+" ,"+!queues.isEmpty()); LOG.info("The topics should not be empty!"+" ,"+ !topics.isEmpty()); LOG.info("the connection contains initial queue: " + queues+","+queues.contains(sampleQueue)); LOG.info("the connection contains initial topic: " + queues+" ,"+topics.contains(sampleTopic)); destinationSource.start(); } /** * 针对ProductorListener的测试 * @throws Exception */ public void productorMonitor() throws Exception{ consumerSession1 = createSession(); consumerSession2 = createSession(); producerEventSource.start(); assertConsumerEvent(2, true); consumerSession1.close(); consumerSession1 = null; assertConsumerEvent(1, false); consumerSession2.close(); consumerSession2 = null; assertConsumerEvent(0, false); } protected Session createSession() throws JMSException { final String consumerText = "Consumer: " + (++consumerCounter); LOG.info("Creating consumer: " + consumerText + " on destination: " + destination); Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); return answer; } protected void assertConsumerEvent(int count, boolean started) throws InterruptedException { ProducerEvent event = waitForProducerEvent(); LOG.info("Producer count ="+count+ ", "+event.getProducerCount()); LOG.info(" Producer started"+" ="+started+" ,"+ event.isStarted()); } protected ProducerEvent waitForProducerEvent() throws InterruptedException { ProducerEvent answer = eventQueue.poll(100000, TimeUnit.MILLISECONDS); LOG.info("Should have received a consumer event!"+" ,"+ (answer != null)); return answer; } /** * 测试的方法 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { DestinationSourceMonitor monitor=new DestinationSourceMonitor(); monitor.init(); monitor.destiationSourceHasInitialDestinations(); monitor.productorMonitor(); monitor.stopBroker(); } /** * 创建的连接 * @return * @throws Exception */ protected Connection createConnection() throws Exception { return connectionFactory.createConnection(); } /** * 创建的activemq的目的对象 * @param subject * @return */ protected ActiveMQDestination createDestination(String subject) { if (useTopic) { return new ActiveMQTopic(subject); } else { return new ActiveMQQueue(subject); } } /** * Returns the name of the destination used in this test case */ protected String getDestinationString() { return getClass().getName() + "." +"activemq"; } /** * Factory method to create a new {@link ConnectionFactory} instance * * @return a newly created connection factory */ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { return new ActiveMQConnectionFactory(bindAddress); } /** * 启动的方法 * @throws Exception */ protected void startBroker() throws Exception { broker.start(); } /** * 停止的方法 * @throws Exception */ public void stopBroker() throws Exception{ broker.stop(); } /** * @return whether or not persistence should be used */ protected boolean isPersistent() { return false; } /** * 队列的时间的方法 */ public void onDestinationEvent(DestinationEvent event) { ActiveMQDestination destination = event.getDestination(); if (event.isAddOperation()) { LOG.info("Added: " + destination); newDestinations.add(destination); } else { LOG.info("Removed: " + destination); newDestinations.remove(destination); } } /** * 初始化 * @throws Exception */ protected void init() throws Exception { if (broker == null) { broker = createBroker(); } startBroker(); connectionFactory = createConnectionFactory(); destination = createDestination(); //创建DestinationSource connection = (ActiveMQConnection) createConnection(); connection.start(); connection.getDestinationSource().setDestinationListener(this); //创建ProducerEventSource producerEventSource = new ProducerEventSource(connection, destination); producerEventSource.setProducerListener(this); producerEventSource.start(); } protected ActiveMQDestination createDestination() { return createDestination(getDestinationString()); } /** * 创建broker对象 * @return * @throws Exception */ protected BrokerService createBroker() throws Exception { broker = new BrokerService(); broker.setPersistent(isPersistent()); broker.addConnector(bindAddress); broker.setDestinations(new ActiveMQDestination[]{ sampleQueue, sampleTopic }); return broker; } /** * 关闭连接 * @throws Exception */ protected void destory() throws Exception { if (producerEventSource != null) { producerEventSource.stop(); } if (connection != null) { connection.close(); } } public void onProducerEvent(ProducerEvent event) { eventQueue.add(event); } }
发表评论
-
ActiveMQ的拦截器插件
2011-07-22 09:29 6601ActiveMQ拦截器使用和原 ... -
ActiveMQ的各种表SQL的管理
2011-07-20 20:58 3504在ActiveMQ为了方便的切换数据库,更为了深入 ... -
ActiveMQ中advisory的使用和原理
2011-07-20 18:46 2902在ActiveMQ中的监控和管理也可以通过Advisory实现 ... -
ActiveMQ的异步转发(DispatchAsync)功能
2011-07-20 11:29 47021. 消息者异步转发功能 针对正常情况下,在一个 ... -
ActiveMQ 的独占消费(Exclusive Consumer)
2011-07-20 11:26 4096我们经常希望维持队列中的消息,按一定次序转发给消息者。然而当有 ... -
ActiveMQ5.5在Tomcat6.0中部署
2011-07-19 22:27 2922在ActiveMQ中监控管理Web组件为ActiveMQCon ... -
Window 下ActiveMQ端口冲突,负载均衡,主备配置
2011-07-17 16:03 5526在Java 学习中Window操作系 ... -
ActiveMQ中消息权限策略
2011-07-17 00:31 2668在ActiveMQ发送消息的时候,可以通过MessageAut ... -
ActiveMQ和Jetty整合使用
2011-07-07 22:49 5586在ActiveMQ中的activemq.b ... -
ActiveMQ 和Commons-Daemon整合
2011-07-07 20:13 2941在一般的java项目中,如果在linu ... -
关于ActiveMQ中怎么实现一对多发送消息讨论
2011-07-07 19:50 6067无 ... -
ActiveMQ 中ActiveMQBlobMessage的接收和发送
2011-07-05 10:47 5122在ActiveMQ中对比较大的消息采用一 ... -
ActiveMQ 和JAXWS整合
2011-07-04 22:02 2212在多个系统中可能考虑到远程访问等的,采用WebServ ... -
ActiveMQ-Camel的使用
2011-07-02 10:27 10223在一个电子系统中可能接受来自不同供应商的 ... -
ActiveMQ模板和Velocity整合使用
2011-07-01 19:50 2306ActiveMQ模板使用 在ActiveMQ中AMQ ... -
ActiveMQ中消息游标
2011-06-30 18:16 2657在 ActiveMQ 5.0的之前版本中,b ... -
ActiveMQ和Tomcat的整合应用
2011-06-30 17:00 11210在ActiveMQ的资源让容器Tomcat管理时 ... -
ActiveMQ关于文件传输需要注意哪些方面?
2011-06-18 22:11 6177最近一直在关注一些文件传输中间件的实现,想用Acti ... -
关于ActiveMQ中Session和Connection资源的管理
2011-06-15 23:43 25099配置完了持久化之后,我们就可以使用代码来发送 ... -
ActiveMQ中关于文件锁的机制的学习
2011-06-14 23:31 3334在ActiveMQ中提供了文件数据库机 ...
相关推荐
项目使用springboot2.0.4搭建,一个父项目包含两个子项目:发送服务;监听服务;消息服务使用ActiveMQ 5.14.3,在docker中运行。 项目中有两种协议消息:activemq和mqtt。
在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它负责在不同服务之间异步传输数据,从而提高系统的响应速度和可扩展性。ActiveMQ是Apache出品的一款开源、高性能、支持多种协议的消息中间件...
"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步处理中的...
ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。...通过这些步骤,可以测试 ActiveMQ 队列消息过期时间设置和自动清除的解决方案。
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386
本文介绍了消息队列的基本概念和作用,重点讲解了ActiveMQ5.x的主要特性和SpringBoot2.x中如何整合及实现点对点消息的实战案例。通过对这些知识点的理解和实践,开发者能够更好地理解和应用消息队列技术,提升系统...
本报告详细阐述了使用JMeter对ActiveMQ进行性能测试的过程和结果,旨在评估ActiveMQ在JMS(Java消息服务)环境下的性能表现。JMeter作为一个强大的负载和性能测试工具,被广泛用于测试各种应用程序,包括消息中间件...
本测试工程旨在帮助开发者理解和掌握如何连接并使用ActiveMQ,包括数据的插入和读取。 首先,我们需要了解ActiveMQ的基本概念。消息队列(MQ)是一种应用程序间的通信方法,允许不同速度的应用程序和系统异步交换...
首先,我们需要理解ActiveMQ中的队列和主题概念。队列(Queue)是点对点通信模式,每个消息只能被一个消费者接收并处理,而主题(Topic)则采用发布/订阅模式,一个消息可以被多个订阅者同时接收。在使用PHP接收消息...
可以使用ActiveMQ的Web控制台监控消息队列的状态,查看消息的发送和接收情况。 通过以上步骤,你可以成功地将ActiveMQ消息队列与Spring框架整合,实现基于主题订阅的消息传递。这种整合有助于解耦系统组件,提高...
本文将深入探讨ActiveMQ中的队列(Queue)模式,包括事务、应答、转发以及MessageConsumer的receive阻塞消息处理方式。 ### 1. ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个...
在Java编程中,可以使用JMS API与ActiveMQ进行交互,创建生产者和消费者对象,定义消息的格式和内容,以及发送和接收消息。单线程多队列意味着一个线程可以同时处理多个不同的消息队列,这在并发处理和任务调度中有...
可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。
ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS...了解和掌握这些知识点,有助于面试者在面试中展示对ActiveMQ的深入理解和实际应用能力,同时也是确保在日常开发工作中正确、高效使用消息队列的重要基础。
同时,ActiveMQ提供了一个Web管理界面,可以实时查看和管理消息队列,便于监控和调试。 对于更复杂的场景,`mqttjs`库还提供了其他高级功能,如QoS(服务质量)设置,确保消息可靠传递;设置保持连接心跳,防止网络...
本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...
在IT行业中,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它实现了多种消息协议,包括JMS(Java Message Service)。在这个“activemq的queue队列模式的maven,spring的demo”中,我们将深入探讨如何...
本文将深入探讨ActiveMQ中的两种主要消息模式:队列(Queue)和主题(Topic)。 1. **队列(Queue)模式**: 队列模式遵循“发布/订阅”模型,但是一对一的。每个消息只能被一个消费者接收并处理。当一个消息被...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。...通过理解和实践上述知识点,你将能够熟练地在SpringBoot项目中运用ActiveMQ实现消息队列的功能。
在ActiveMQ中,队列模式是一种常见且重要的使用方式,尤其对于初学者而言,它是理解消息队列概念的一个良好起点。 在JMS中,队列(Queue)是一种点对点的通信模型,每个消息只会被一个消费者接收并处理,确保了消息...