- 浏览: 7332044 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (1546)
- 企业中间件 (236)
- 企业应用面临的问题 (236)
- 小布Oracle学习笔记汇总 (36)
- Spring 开发应用 (54)
- IBatis开发应用 (16)
- Oracle基础学习 (23)
- struts2.0 (41)
- JVM&ClassLoader&GC (16)
- JQuery的开发应用 (17)
- WebService的开发应用 (21)
- Java&Socket (44)
- 开源组件的应用 (254)
- 常用Javascript的开发应用 (28)
- J2EE开发技术指南 (163)
- EJB3开发应用 (11)
- GIS&Mobile&MAP (36)
- SWT-GEF-RCP (52)
- 算法&数据结构 (6)
- Apache开源组件研究 (62)
- Hibernate 学习应用 (57)
- java并发编程 (59)
- MySQL&Mongodb&MS/SQL (15)
- Oracle数据库实验室 (55)
- 搜索引擎的开发应用 (34)
- 软件工程师笔试经典 (14)
- 其他杂项 (10)
- AndroidPn& MQTT&C2DM&推技术 (29)
- ActiveMQ学习和研究 (38)
- Google技术应用开发和API分析 (11)
- flex的学习总结 (59)
- 项目中一点总结 (20)
- java疑惑 java面向对象编程 (28)
- Android 开发学习 (133)
- linux和UNIX的总结 (37)
- Titanium学习总结 (20)
- JQueryMobile学习总结 (34)
- Phonegap学习总结 (32)
- HTML5学习总结 (41)
- JeeCMS研究和理解分析 (9)
最新评论
-
lgh1992314:
[u][i][b][flash=200,200][url][i ...
看看mybatis 源代码 -
尼古拉斯.fwp:
图片根本就不出来好吧。。。。。。
Android文件图片上传的详细讲解(一)HTTP multipart/form-data 上传报文格式实现手机端上传 -
ln94223:
第一个应该用排它网关吧 怎么是并行网关, 并行网关是所有exe ...
工作流Activiti的学习总结(八)Activiti自动执行的应用 -
ZY199266:
获取不到任何消息信息,请问这是什么原因呢?
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息 -
xiaoyao霄:
DestinationSourceMonitor 报错 应该导 ...
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息
How can I monitor ActiveMQ
In ActiveMQ 4.x you can monitor the broker to see what destinations are being used, their activity along with connections and subscriptions using the following tools
- JMX and a JMX console such as jConsole
- The Web Console
- the Advisory Message feature (using JMS messages to monitor the system)
- The Command Agent; ActiveMQ.Agent topic that you query for status
- The Visualisation plug-in
- The Statistics plug-in (from 5.3)
package easyway.app.activemq.demo.monitors; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.management.ObjectName; import javax.management.QueryExp; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerView; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.ConnectionViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; import org.apache.activemq.broker.jmx.ManagedRegionBroker; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean; import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.StoreUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.TempUsage; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 监控ActiveMQ的各种信息Broker,Connection,Queue,Topic的数量和压栈和出栈 * @author longgangbai * */ public class ActiveMQMonitor { private static final transient Logger LOG = LoggerFactory.getLogger(DestinationSourceMonitor.class); protected static final int MESSAGE_COUNT = 2000; protected BrokerService brokerService; protected Connection connection; protected String bindAddress ="tcp://localhost:61619"; //ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected int topicCount; /** * 获取Broker 的AdminView对象 * @return * @throws Exception */ public BrokerViewMBean getBrokerAdmin() throws Exception { return brokerService.getAdminView(); } /** * 获取所有的QueueViewMBean的 * @return * @throws Exception */ public Collection<QueueViewMBean> getQueues() throws Exception { BrokerViewMBean broker = getBrokerAdmin(); if (broker == null) { return Collections.EMPTY_LIST; } ObjectName[] queues = broker.getQueues(); return getManagedObjects(queues, QueueViewMBean.class); } /** * 获取所有TopicViewMBean * @return * @throws Exception */ public Collection<TopicViewMBean> getTopics() throws Exception { BrokerViewMBean broker = getBrokerAdmin(); if (broker == null) { return Collections.EMPTY_LIST; } ObjectName[] queues = broker.getTopics(); return getManagedObjects(queues, TopicViewMBean.class); } /** * 获取所有DurableSubscriptionViewMBean * @return * @throws Exception */ public Collection<DurableSubscriptionViewMBean> getDurableTopicSubscribers() throws Exception { BrokerViewMBean broker = getBrokerAdmin(); if (broker == null) { return Collections.EMPTY_LIST; } ObjectName[] queues = broker.getDurableTopicSubscribers(); return getManagedObjects(queues, DurableSubscriptionViewMBean.class); } /** * 获取所有DurableSubscriptionViewMBean * @return * @throws Exception */ public Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers() throws Exception { BrokerViewMBean broker = getBrokerAdmin(); if (broker == null) { return Collections.EMPTY_LIST; } ObjectName[] queues = broker.getInactiveDurableTopicSubscribers(); return getManagedObjects(queues, DurableSubscriptionViewMBean.class); } /** * 根据queueName获取queue相关的信息 * @return * @throws Exception */ public QueueViewMBean getQueue(String name) throws Exception { return (QueueViewMBean) getDestinationByName(getQueues(), name); } /** * 根据topicName获取Topic相关的信息 * @return * @throws Exception */ public TopicViewMBean getTopic(String name) throws Exception { return (TopicViewMBean) getDestinationByName(getTopics(), name); } /** * 获取DestinationViewMBean * @return * @throws Exception */ protected DestinationViewMBean getDestinationByName( Collection<? extends DestinationViewMBean> collection, String name) { Iterator<? extends DestinationViewMBean> iter = collection.iterator(); while (iter.hasNext()) { DestinationViewMBean destinationViewMBean = iter.next(); if (name.equals(destinationViewMBean.getName())) { return destinationViewMBean; } } return null; } /** * 获取所有Mananage * @return * @throws Exception */ @SuppressWarnings("unchecked") protected <T> Collection<T> getManagedObjects(ObjectName[] names, Class<T> type) throws Exception { List<T> answer = new ArrayList<T>(); for (int i = 0; i < names.length; i++) { ObjectName name = names[i]; T value = (T) newProxyInstance(name, type, true); if (value != null) { answer.add(value); } } return answer; } /** * 获取所有ConnectionViewMBean * @return * @throws Exception */ @SuppressWarnings("unchecked") public Collection<ConnectionViewMBean> getConnections() throws Exception { String brokerName = getBrokerName(); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=Connection,*"); Set<ObjectName> queryResult = queryNames(query, null); return getManagedObjects(queryResult.toArray(new ObjectName[queryResult .size()]), ConnectionViewMBean.class); } /** * 获取所有Connections * @return * @throws Exception */ @SuppressWarnings("unchecked") public Collection<String> getConnections(String connectorName) throws Exception { String brokerName = getBrokerName(); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=Connection,ConnectorName=" + connectorName + ",*"); Set<ObjectName> queryResult = queryNames(query, null); Collection<String> result = new ArrayList<String>(queryResult.size()); for (ObjectName on : queryResult) { String name = StringUtils.replace(on.getKeyProperty("Connection"), "_", ":"); result.add(name); } return result; } /** * 获取所有ConnectionViewMBean * @return * @throws Exception */ @SuppressWarnings("unchecked") public ConnectionViewMBean getConnection(String connectionName) throws Exception { connectionName = StringUtils.replace(connectionName, ":", "_"); String brokerName = getBrokerName(); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=Connection,*,Connection=" + connectionName); Set<ObjectName> queryResult = queryNames(query, null); if (queryResult.size() == 0) return null; ObjectName objectName = queryResult.iterator().next(); return (ConnectionViewMBean) newProxyInstance(objectName, ConnectionViewMBean.class, true); } @SuppressWarnings("unchecked") public Collection<String> getConnectors() throws Exception { String brokerName = getBrokerName(); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=Connector,*"); Set<ObjectName> queryResult = queryNames(query, null); Collection<String> result = new ArrayList<String>(queryResult.size()); for (ObjectName on : queryResult) result.add(on.getKeyProperty("ConnectorName")); return result; } public ConnectorViewMBean getConnector(String name) throws Exception { String brokerName = getBrokerName(); ObjectName objectName = new ObjectName( "org.apache.activemq:BrokerName=" + brokerName + ",Type=Connector,ConnectorName=" + name); return (ConnectorViewMBean) newProxyInstance(objectName, ConnectorViewMBean.class, true); } @SuppressWarnings("unchecked") public Collection<NetworkConnectorViewMBean> getNetworkConnectors() throws Exception { String brokerName = getBrokerName(); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=NetworkConnector,*"); Set<ObjectName> queryResult = queryNames(query, null); return getManagedObjects(queryResult.toArray(new ObjectName[queryResult .size()]), NetworkConnectorViewMBean.class); } public Collection<NetworkBridgeViewMBean> getNetworkBridges() throws Exception { String brokerName = getBrokerName(); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=NetworkBridge,*"); Set<ObjectName> queryResult = queryNames(query, null); return getManagedObjects(queryResult.toArray(new ObjectName[queryResult .size()]), NetworkBridgeViewMBean.class); } @SuppressWarnings("unchecked") public Collection<SubscriptionViewMBean> getQueueConsumers(String queueName) throws Exception { String brokerName = getBrokerName(); queueName = StringUtils.replace(queueName, "\"", "_"); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=Subscription,destinationType=Queue,destinationName=" + queueName + ",*"); Set<ObjectName> queryResult = queryNames(query, null); return getManagedObjects(queryResult.toArray(new ObjectName[queryResult .size()]), SubscriptionViewMBean.class); } @SuppressWarnings("unchecked") public Collection<SubscriptionViewMBean> getConsumersOnConnection( String connectionName) throws Exception { connectionName = StringUtils.replace(connectionName, ":", "_"); String brokerName = getBrokerName(); ObjectName query = new ObjectName("org.apache.activemq:BrokerName=" + brokerName + ",Type=Subscription,clientId=" + connectionName + ",*"); Set<ObjectName> queryResult = queryNames(query, null); return getManagedObjects(queryResult.toArray(new ObjectName[queryResult .size()]), SubscriptionViewMBean.class); } /** * 获取定时执行的队列的信息 * @return * @throws Exception */ public JobSchedulerViewMBean getJobScheduler() throws Exception { ObjectName name = getBrokerAdmin().getJMSJobScheduler(); return (JobSchedulerViewMBean) newProxyInstance(name, JobSchedulerViewMBean.class, true); } public String getBrokerName() throws Exception { return brokerService.getBrokerName(); } /** * 获取Broker对象 * @return * @throws Exception */ public Broker getBroker() throws Exception { return brokerService.getBroker(); } public ManagementContext getManagementContext() { return brokerService.getManagementContext(); } public ManagedRegionBroker getManagedBroker() throws Exception { BrokerView adminView = brokerService.getAdminView(); if (adminView == null) { return null; } return adminView.getBroker(); } public void purgeQueue(ActiveMQDestination destination) throws Exception { Set destinations = getManagedBroker().getQueueRegion().getDestinations(destination); for (Iterator i = destinations.iterator(); i.hasNext();) { Destination dest = (Destination) i.next(); if (dest instanceof Queue) { Queue regionQueue = (Queue) dest; regionQueue.purge(); } } } /** * * @param name * @param query * @return * @throws Exception */ public Set queryNames(ObjectName name, QueryExp query) throws Exception { return getManagementContext().queryNames(name, query); } /** * 通过JMX获取ActiveMQ各种信息 * @param objectName * @param interfaceClass * @param notificationBroadcaster * @return */ public Object newProxyInstance(ObjectName objectName, Class interfaceClass, boolean notificationBroadcaster) { return getManagementContext().newProxyInstance(objectName, interfaceClass, notificationBroadcaster); } /** * 监控内存信息 * @throws Exception */ public void monitorMermeryUsage() throws Exception{ SystemUsage proSystemUsage=brokerService.getProducerSystemUsage(); printSystemUsage(proSystemUsage); SystemUsage syUage=brokerService.getSystemUsage(); printSystemUsage(syUage); SystemUsage consumsyUage=brokerService.getConsumerSystemUsage(); printSystemUsage(consumsyUage); } /** * 打印内存信息 * @param syUage */ public void printSystemUsage(SystemUsage syUage){ String name=syUage.getName(); LOG.info("SystemUsage name ="+name); MemoryUsage memeryUsage =syUage.getMemoryUsage(); LOG.info("memeryUsage PercentUsage name ="+memeryUsage.getPercentUsage()); LOG.info("memeryUsage Limit name ="+memeryUsage.getLimit()); LOG.info("memeryUsage Usage name ="+memeryUsage.getUsage()); TempUsage tempUsage =syUage.getTempUsage(); LOG.info("tempUsage PercentUsage name ="+tempUsage.getPercentUsage()); LOG.info("tempUsage Limit name ="+tempUsage.getLimit()); LOG.info("tempUsage Usage name ="+tempUsage.getUsage()); StoreUsage storeUsage=syUage.getStoreUsage(); LOG.info("storeUsage PercentUsage name ="+storeUsage.getPercentUsage()); LOG.info("storeUsage Limit name ="+storeUsage.getLimit()); LOG.info("storeUsage Usage name ="+storeUsage.getUsage()); } /** * 监控消息的方法 * @throws Exception */ public void monitorQueueAndTopic() throws Exception{ LOG.info("==========Connection ================="); Collection<ConnectionViewMBean> conVBean=getConnections(); for (ConnectionViewMBean bean : conVBean) { LOG.info("remoteAddress:"+bean.getRemoteAddress()); LOG.info("isActive:"+bean.isActive()); LOG.info("isConnected:"+bean.isConnected()); } LOG.info("=============Topic ================="); Collection<TopicViewMBean> topicVBean=getTopics(); for (TopicViewMBean topicbean : topicVBean) { LOG.info("beanName ="+topicbean.getName()); LOG.info("ConsumerCount ="+topicbean.getConsumerCount()); LOG.info("DequeueCount ="+topicbean.getDequeueCount()); LOG.info("EnqueueCount ="+topicbean.getEnqueueCount()); LOG.info("DispatchCount ="+topicbean.getDispatchCount()); LOG.info("ExpiredCount ="+topicbean.getExpiredCount()); LOG.info("MaxEnqueueTime ="+topicbean.getMaxEnqueueTime()); LOG.info("ProducerCount ="+topicbean.getProducerCount()); LOG.info("MemoryPercentUsage ="+topicbean.getMemoryPercentUsage()); LOG.info("MemoryLimit ="+topicbean.getMemoryLimit()); } LOG.info("============Queue==================="); Collection<QueueViewMBean> queuqVBeanList=getQueues(); for (QueueViewMBean queuebean : queuqVBeanList) { LOG.info(" queue beanName ="+queuebean.getName()); LOG.info("ConsumerCount ="+queuebean.getConsumerCount()); LOG.info("DequeueCount ="+queuebean.getDequeueCount()); LOG.info("EnqueueCount ="+queuebean.getEnqueueCount()); LOG.info("DispatchCount ="+queuebean.getDispatchCount()); LOG.info("ExpiredCount ="+queuebean.getExpiredCount()); LOG.info("MaxEnqueueTime ="+queuebean.getMaxEnqueueTime()); LOG.info("ProducerCount ="+queuebean.getProducerCount()); LOG.info("MemoryPercentUsage ="+queuebean.getMemoryPercentUsage()); LOG.info("MemoryLimit ="+queuebean.getMemoryLimit()); } } public void test() throws Exception{ //获取初始化信息 init(); for (int i = 0; i < 10; i++) { sendTopic(i); } for (int i = 0; i < 10; i++) { sendPS(i); } monitorQueueAndTopic(); Thread.sleep(5000); receiveTopic(); receivePS(); } /** * P2P发送方式 * @throws JMSException */ public void sendTopic(int i) throws JMSException{ Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); Destination topic=session.createQueue("activemq.queue"+i); MessageProducer productor=(MessageProducer) session.createProducer(topic); TextMessage txtMessage =session.createTextMessage(); txtMessage.setText("this is a topic message "+i); productor.send(txtMessage); } /** * Sub/Pub发送方式 * @throws JMSException */ public void sendPS(int i) throws JMSException{ Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); Destination topic=session.createTopic("activemq.topic"+i); MessageProducer productor=(MessageProducer) session.createProducer(topic); TextMessage txtMessage =session.createTextMessage(); txtMessage.setText("this is a topic message "+i); productor.send(txtMessage); } /** * P2P接受方式 * @throws JMSException */ public void receiveTopic() throws JMSException{ Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); Destination topic=session.createQueue("activemq.queue"); MessageConsumer consumer=(MessageConsumer) session.createConsumer(topic); TextMessage txtMessage =(TextMessage)consumer.receive(); System.out.println("txtMessage ="+txtMessage.getText()); } /** * Sub/Pub接受方式 * @throws JMSException */ public void receivePS() throws JMSException{ Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); Destination topic=session.createQueue("activemq.topic"); MessageConsumer consumer=(MessageConsumer) session.createConsumer(topic); TextMessage txtMessage =(TextMessage)consumer.receive(); System.out.println("txtMessage ="+txtMessage.getText()); } /** * 初始化消息的方法 * @throws Exception */ protected void init() throws Exception { if (brokerService == null) { brokerService = createBroker(); } ActiveMQConnectionFactory factory = createConnectionFactory(); connection = factory.createConnection(); //添加Connection 的状态监控的方法 monitorConnection(connection); //启动连接 connection.start(); } /** * 监控台ActiveMQConnection的状态的方法 * @param connection */ public void monitorConnection(Connection connection){ ActiveMQConnection activemqconnection =(ActiveMQConnection)connection; //添加ActiveMQConnection的监听类 activemqconnection.addTransportListener(new TransportListener(){ public void onCommand(Object object) { LOG.info("onCommand object "+object); } public void onException(IOException ex) { LOG.info("onException ="+ex.getMessage()); } public void transportInterupted() { LOG.info("transportInterupted ="); } public void transportResumed() { LOG.info("transportResumed ........."); } }); } protected void destory() throws Exception { connection.close(); if (brokerService != null) { brokerService.stop(); } } /** * 创建ActiveMQConnectionFactory * @return * @throws Exception */ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( bindAddress); return cf; } /*** * 创建一个Broker监听进程 * @return * @throws Exception */ protected BrokerService createBroker() throws Exception { //创建BrokerService对象 BrokerService answer = new BrokerService(); //配置监听相关的信息 configureBroker(answer); //启动Broker的启动 answer.start(); return answer; } /** * 配置Broker * @param answer * @throws Exception */ protected void configureBroker(BrokerService answer) throws Exception { //创建持久化信息 answer.setPersistent(false); //设置采用JMX管理 answer.setUseJmx(true); ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); strategy.setLimit(10); PolicyEntry tempQueueEntry = createPolicyEntry(strategy); tempQueueEntry.setTempQueue(true); PolicyEntry tempTopicEntry = createPolicyEntry(strategy); tempTopicEntry.setTempTopic(true); PolicyMap pMap = new PolicyMap(); final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>(); policyEntries.add(tempQueueEntry); policyEntries.add(tempTopicEntry); pMap.setPolicyEntries(policyEntries); answer.setDestinationPolicy(pMap); //绑定url answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); } /** * 创建一个配置策略 * @param strategy * @return */ private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) { PolicyEntry policy = new PolicyEntry(); policy.setAdvisdoryForFastProducers(true); policy.setAdvisoryForConsumed(true); policy.setAdvisoryForDelivery(true); policy.setAdvisoryForDiscardingMessages(true); policy.setAdvisoryForSlowConsumers(true); policy.setAdvisoryWhenFull(true); policy.setProducerFlowControl(false); policy.setPendingMessageLimitStrategy(strategy); return policy; } public void object2string(Object object ){ ToStringBuilder.reflectionToString(object, ToStringStyle.MULTI_LINE_STYLE); } }
package easyway.app.activemq.demo.monitors; public class ActiveMQMonitorTest { public static void main(String[] args) throws Exception { ActiveMQMonitor monitor=new ActiveMQMonitor(); monitor.test(); } }
评论
17 楼
ZY199266
2018-01-31
获取不到任何消息信息,请问这是什么原因呢?
16 楼
xiaoyao霄
2018-01-29
DestinationSourceMonitor 报错 应该导入那个包
15 楼
zzd0058
2017-07-26
你好我想知道getConnections为啥是空的
14 楼
longgangbai
2012-07-16
myfwz 写道
你有没有做过类似生产的速度与消费者(一或多个)的速度对比的工具。
没有响应的工具
13 楼
longgangbai
2012-07-16
myfwz 写道
1099这个端口在哪个配置文件里面?需要手动执行activemq-admin start来开启这个JMs监控端口是吗?
actviemq的admin管理端 好像有一个bat文件可以编辑的。你修改配置一下
12 楼
myfwz
2012-07-10
你有没有做过类似生产的速度与消费者(一或多个)的速度对比的工具。
11 楼
myfwz
2012-07-10
1099这个端口在哪个配置文件里面?需要手动执行activemq-admin start来开启这个JMs监控端口是吗?
10 楼
longgangbai
2012-07-10
myfwz 写道
我在本地运行的时候,不管是连我自己的MQ,还是远程的MQ,都没法运行起来。
本地我用netstat看过了,远程也一样使用netsat -ntl看过了,没有起1099这个端口。
另外:1099这个端口是ActiveMQ启动的时候拉起来的,还是监控程序设置的一个端口?
本地我用netstat看过了,远程也一样使用netsat -ntl看过了,没有起1099这个端口。
另外:1099这个端口是ActiveMQ启动的时候拉起来的,还是监控程序设置的一个端口?
activemq启动时候的使用的端口。
默认的是tcp://localhost:61616
1099为jms监控端口:
activemq-admin stop --jmxurl service:jmx:rmi:///jndi/rmi://remotehost:1099/jmxrmi --all
9 楼
longgangbai
2012-07-10
myfwz 写道
呵呵,非常感谢你的回复,只是我想了解一下,你在运行这个类的时候是连接本地MQ还是远程的MQ,是否需要在MQ上面做相应的设置,如果有的话,麻烦提供相应的配置文件给我学习下。
activemq安装之后有一个web版你发送多个消息,不要点击接受,之后,开启这个程序运行。我当时是这样测试的。
8 楼
myfwz
2012-07-10
我在本地运行的时候,不管是连我自己的MQ,还是远程的MQ,都没法运行起来。
本地我用netstat看过了,远程也一样使用netsat -ntl看过了,没有起1099这个端口。
另外:1099这个端口是ActiveMQ启动的时候拉起来的,还是监控程序设置的一个端口?
本地我用netstat看过了,远程也一样使用netsat -ntl看过了,没有起1099这个端口。
另外:1099这个端口是ActiveMQ启动的时候拉起来的,还是监控程序设置的一个端口?
7 楼
myfwz
2012-07-10
呵呵,非常感谢你的回复,只是我想了解一下,你在运行这个类的时候是连接本地MQ还是远程的MQ,是否需要在MQ上面做相应的设置,如果有的话,麻烦提供相应的配置文件给我学习下。
6 楼
longgangbai
2012-07-10
myfwz 写道
1099这个端口是ActiveMQ提供出来的吗?我的电脑重启后,再去连接也会报这个错呢。
java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
这句话的中文意思,端口1099已经使用,JVM_Bind已经使用该地址。
5 楼
myfwz
2012-07-09
1099这个端口是ActiveMQ提供出来的吗?我的电脑重启后,再去连接也会报这个错呢。
4 楼
longgangbai
2012-07-04
myfwz 写道
你好,我在本地运行一个ActiveMQ,再运行这个程序报了以下异常:
(1 ms) [main] DEBUG: org.apache.activemq.broker.jmx.ManagementContext#createConnector : Failed to create local registry
java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:310)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:218)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:393)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:129)
at sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:190)
at sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:92)
at sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:68)
at java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:222)
(1 ms) [main] DEBUG: org.apache.activemq.broker.jmx.ManagementContext#createConnector : Failed to create local registry
java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:310)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:218)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:393)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:129)
at sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:190)
at sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:92)
at sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:68)
at java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:222)
JMS对应的本地注册的本地端口意见使用,netstat -a | find "1099" 查看那个程序占用,或者重启机器,再试!
3 楼
myfwz
2012-06-25
你好,我在本地运行一个ActiveMQ,再运行这个程序报了以下异常:
(1 ms) [main] DEBUG: org.apache.activemq.broker.jmx.ManagementContext#createConnector : Failed to create local registry
java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:310)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:218)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:393)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:129)
at sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:190)
at sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:92)
at sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:68)
at java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:222)
(1 ms) [main] DEBUG: org.apache.activemq.broker.jmx.ManagementContext#createConnector : Failed to create local registry
java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:310)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:218)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:393)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:129)
at sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:190)
at sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:92)
at sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:68)
at java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:222)
2 楼
longgangbai
2012-05-25
962685987962685987 写道
你好,我想问一下,我在本地搭建的 activeMQ WEB 界面中 发送了几个消息(queue),但是通过上面的程序却无法获取消息信息 ,即是说:getQueues() 获取不到任何消息信息,请问这是什么原因呢?
你可能要检查你的activemq配置是否支持jmx,如果不支持是监控不到的,上面的demo是基于jmx实现的,请检查相关的配置。
1 楼
962685987962685987
2012-05-24
你好,我想问一下,我在本地搭建的 activeMQ WEB 界面中 发送了几个消息(queue),但是通过上面的程序却无法获取消息信息 ,即是说:getQueues() 获取不到任何消息信息,请问这是什么原因呢?
发表评论
-
ActiveMQ的拦截器插件
2011-07-22 09:29 6601ActiveMQ拦截器使用和原 ... -
ActiveMQ的各种表SQL的管理
2011-07-20 20:58 3504在ActiveMQ为了方便的切换数据库,更为了深入 ... -
ActiveMQ中advisory的使用和原理
2011-07-20 18:46 2902在ActiveMQ中的监控和管理也可以通过Advisory实现 ... -
ActiveMQ的异步转发(DispatchAsync)功能
2011-07-20 11:29 47021. 消息者异步转发功能 针对正常情况下,在一个 ... -
ActiveMQ 的独占消费(Exclusive Consumer)
2011-07-20 11:26 4093我们经常希望维持队列中的消息,按一定次序转发给消息者。然而当有 ... -
ActiveMQ5.5在Tomcat6.0中部署
2011-07-19 22:27 2922在ActiveMQ中监控管理Web组件为ActiveMQCon ... -
Window 下ActiveMQ端口冲突,负载均衡,主备配置
2011-07-17 16:03 5526在Java 学习中Window操作系 ... -
ActiveMQ中消息权限策略
2011-07-17 00:31 2668在ActiveMQ发送消息的时候,可以通过MessageAut ... -
ActiveMQ和Jetty整合使用
2011-07-07 22:49 5585在ActiveMQ中的activemq.b ... -
ActiveMQ 和Commons-Daemon整合
2011-07-07 20:13 2940在一般的java项目中,如果在linu ... -
关于ActiveMQ中怎么实现一对多发送消息讨论
2011-07-07 19:50 6067无 ... -
ActiveMQ 中ActiveMQBlobMessage的接收和发送
2011-07-05 10:47 5121在ActiveMQ中对比较大的消息采用一 ... -
ActiveMQ 和JAXWS整合
2011-07-04 22:02 2212在多个系统中可能考虑到远程访问等的,采用WebServ ... -
ActiveMQ-Camel的使用
2011-07-02 10:27 10223在一个电子系统中可能接受来自不同供应商的 ... -
ActiveMQ模板和Velocity整合使用
2011-07-01 19:50 2306ActiveMQ模板使用 在ActiveMQ中AMQ ... -
ActiveMQ中消息游标
2011-06-30 18:16 2657在 ActiveMQ 5.0的之前版本中,b ... -
ActiveMQ和Tomcat的整合应用
2011-06-30 17:00 11209在ActiveMQ的资源让容器Tomcat管理时 ... -
ActiveMQ关于文件传输需要注意哪些方面?
2011-06-18 22:11 6176最近一直在关注一些文件传输中间件的实现,想用Acti ... -
关于ActiveMQ中Session和Connection资源的管理
2011-06-15 23:43 25099配置完了持久化之后,我们就可以使用代码来发送 ... -
ActiveMQ中关于文件锁的机制的学习
2011-06-14 23:31 3334在ActiveMQ中提供了文件数据库机 ...
相关推荐
通过JMX,可以创建和管理QUEUE和TOPIC。 1. 连接JMX:使用JConsole或JMX Console等工具连接到运行中的ActiveMQ服务器。 2. 查找MBean:在MBean浏览器中找到ActiveMQ相关的MBeans,如`org.apache.activemq:type=...
- **定义:** 配置JMX以获取ActiveMQ的相关监控信息。 - **步骤:** 在 `conf/activemq.xml` 文件中启用JMX监控。 **10.2 JMX 的 JAVA 端获取信息:** - **定义:** 通过Java客户端访问JMX接口来获取监控数据。 - **...
2. JMX:通过Java管理扩展(JMX)接口进行远程管理。 3. REST API:ActiveMQ还提供了RESTful API,便于集成到其他系统中。 七、安全配置 1. 用户认证:配置users.properties和groups.properties文件,设置用户和...
- 可以通过ActiveMQ的Web管理界面或JMX监控工具来查看消息队列的状态,如消息数量、消费者状态等。 综上所述,Spring Boot与ActiveMQ的集成为我们的应用带来了强大的消息处理能力,使得我们可以构建出更健壮、可...
同时,ActiveMQ还具有强大的监控和管理工具,通过JMX或者Web管理控制台,可以实时查看和调整队列、主题的状态。 总结来说,"ActiveMQ实例Demo"为我们展示了如何利用ActiveMQ构建消息传递系统,无论是点对点还是发布...
3. **Management Clients**: 使用JMX API,开发人员可以编写客户端应用程序来远程访问和操作MBean,从而监控和管理ActiveMQ服务器。 4. **Notifications**: MBeans可以发送通知事件,这些事件可以被注册的监听器...
另外,ActiveMQ 还提供了 JMX(Java Management Extensions)管理工具,方便监控和管理消息代理的状态。 总之,ActiveMQ 是一个强大的消息中间件,具有丰富的特性和广泛的协议支持,可以帮助开发者构建健壮、高效的...
此外,Spring还支持JMX(Java Management Extensions),可以通过JMX管理ActiveMQ组件。 整合ActiveMQ与Spring有助于构建可扩展、高可用的分布式系统。通过合理配置和使用消息队列,可以提高系统的响应速度,降低...
4. **监控与日志**:定期查看日志,利用监控工具如JMX进行性能监控。 六、实战应用 1. **微服务间通信**:在微服务架构中,ActiveMQ可以作为服务间通信的桥梁。 2. **任务调度**:将任务消息放入队列,由后台任务...
ActiveMQ 提供了丰富的配置选项和特性,比如通过网络集群实现高可用性,通过策略控制消息分发,以及通过 JMX 进行监控等。 总结来说,ActiveMQ 是一个强大的消息中间件,提供了灵活的消息传递模型和丰富的功能,...
- 通过JMX(Java Management Extensions)进行远程管理和监控。 这个Java项目代码应该包括了创建连接、发送和接收消息的示例,涵盖了队列和主题的基本操作。通过运行这些代码,你可以更好地理解ActiveMQ的工作原理...
此外,还可以通过JMX工具进行远程管理和监控。 7. **高级特性** ActiveMQ提供了一些高级特性,如持久化、消息分页、优先级、时间戳、事务支持和死信队列等,以提高系统的可靠性和灵活性。例如,持久化确保即使在...
此外,ActiveMQ还支持各种高级特性,如持久化消息、优先级、消息分页、消息组、事务等,以及通过JMX进行管理和监控。对于大型分布式系统,这些特性有助于提高系统的可靠性和性能。 总的来说,本示例旨在演示如何...
- **API和集成**:ActiveMQ可以与其他编程语言(如.NET、Python等)集成,并通过JMX进行管理。 5. **高级用法**: - **消息优先级**:允许指定消息优先级,确保关键任务先处理。 - **消息分页**:当队列或主题中...
6. **管理工具与监控**:ActiveMQ提供了Web控制台,用户可以通过浏览器界面进行配置、监控和管理消息队列。此外,还可以通过JMX(Java Management Extensions)进行远程管理。 7. **安全与认证**:ActiveMQ支持多种...
同时,可以通过监控工具,如Prometheus和Grafana,对ActiveMQ的性能进行实时监控。 **五、高级特性** 1. **消息选择器(Message Selector)**:允许消费者仅接收满足特定条件的消息。 2. **事务(Transactions)*...
9. **JMX(Java Management Extensions)管理**:ActiveMQ API提供JMX接口,允许管理员监控和管理消息代理的状态,如查看队列长度、消费者数量等。 10. **安全与认证**:ActiveMQ API支持用户身份验证和权限控制,...
- 通过JMX(Java Management Extensions)接口,可以远程监控和管理ActiveMQ。 8. **最佳实践** - 适当配置消费者的并发数,避免资源浪费和消息堆积。 - 注意消息的大小和数量,避免内存溢出。 - 根据业务场景...
- **管理工具**:提供Web控制台和JMX(Java Management Extensions)管理接口,方便监控和管理队列、主题和连接。 **3. 安装与配置** 下载并解压"apache-activemq-5.15.3-bin.zip"后,会得到包含bin、conf、lib等...
6. **管理工具**:ActiveMQ提供了Web控制台和JMX管理接口,便于监控和管理消息队列、连接和策略。 7. **灵活性**:可以部署在各种环境,包括云、容器和传统服务器,适应不同的IT架构需求。 在压缩包中,"output....