`
longgangbai
  • 浏览: 7338828 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ 通过JMX监控Connection,Queue,Topic的信息

阅读更多

How can I monitor ActiveMQ

In ActiveMQ 4.x you can monitor the broker to see what destinations are being used, their activity along with connections and subscriptions using the following tools

 

package easyway.app.activemq.demo.monitors;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import javax.management.QueryExp;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectionViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 监控ActiveMQ的各种信息Broker,Connection,Queue,Topic的数量和压栈和出栈
 * @author longgangbai
 *
 */
public class ActiveMQMonitor {
	   private static final transient Logger LOG = LoggerFactory.getLogger(DestinationSourceMonitor.class);
		
	    protected static final int MESSAGE_COUNT = 2000;
	    protected BrokerService brokerService;
	    protected Connection connection;
	    protected String bindAddress ="tcp://localhost:61619";
	    //ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
	    protected int topicCount;

        /**
         * 获取Broker 的AdminView对象
         * @return
         * @throws Exception
         */
    	public BrokerViewMBean getBrokerAdmin() throws Exception {
    		return brokerService.getAdminView();
    	}
    	/**
    	 * 获取所有的QueueViewMBean的
    	 * @return
    	 * @throws Exception
    	 */
    	public Collection<QueueViewMBean> getQueues() throws Exception {
    		BrokerViewMBean broker = getBrokerAdmin();
    		if (broker == null) {
    			return Collections.EMPTY_LIST;
    		}
    		ObjectName[] queues = broker.getQueues();
    		return getManagedObjects(queues, QueueViewMBean.class);
    	}
    	
    	
        /**
         * 获取所有TopicViewMBean
         * @return
         * @throws Exception
         */
    	public Collection<TopicViewMBean> getTopics() throws Exception {
    		BrokerViewMBean broker = getBrokerAdmin();
    		if (broker == null) {
    			return Collections.EMPTY_LIST;
    		}
    		ObjectName[] queues = broker.getTopics();
    		return getManagedObjects(queues, TopicViewMBean.class);
    	}
        /**
         * 获取所有DurableSubscriptionViewMBean
         * @return
         * @throws Exception
         */
    	public Collection<DurableSubscriptionViewMBean> getDurableTopicSubscribers()
    			throws Exception {
    		BrokerViewMBean broker = getBrokerAdmin();
    		if (broker == null) {
    			return Collections.EMPTY_LIST;
    		}
    		ObjectName[] queues = broker.getDurableTopicSubscribers();
    		return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
    	}
        /**
         * 获取所有DurableSubscriptionViewMBean
         * @return
         * @throws Exception
         */
    	public Collection<DurableSubscriptionViewMBean> getInactiveDurableTopicSubscribers()
    			throws Exception {
    		BrokerViewMBean broker = getBrokerAdmin();
    		if (broker == null) {
    			return Collections.EMPTY_LIST;
    		}
    		ObjectName[] queues = broker.getInactiveDurableTopicSubscribers();
    		return getManagedObjects(queues, DurableSubscriptionViewMBean.class);
    	}
        /**
         * 根据queueName获取queue相关的信息
         * @return
         * @throws Exception
         */
    	public QueueViewMBean getQueue(String name) throws Exception {
    		return (QueueViewMBean) getDestinationByName(getQueues(), name);
    	}
        /**
         * 根据topicName获取Topic相关的信息
         * @return
         * @throws Exception
         */
    	public TopicViewMBean getTopic(String name) throws Exception {
    		return (TopicViewMBean) getDestinationByName(getTopics(), name);
    	}
        /**
         * 获取DestinationViewMBean
         * @return
         * @throws Exception
         */
    	protected DestinationViewMBean getDestinationByName(
    			Collection<? extends DestinationViewMBean> collection, String name) {
    		Iterator<? extends DestinationViewMBean> iter = collection.iterator();
    		while (iter.hasNext()) {
    			DestinationViewMBean destinationViewMBean = iter.next();
    			if (name.equals(destinationViewMBean.getName())) {
    				return destinationViewMBean;
    			}
    		}
    		return null;
    	}
        /**
         * 获取所有Mananage
         * @return
         * @throws Exception
         */
    	@SuppressWarnings("unchecked")
    	protected <T> Collection<T> getManagedObjects(ObjectName[] names,
    			Class<T> type) throws Exception {
    		List<T> answer = new ArrayList<T>();
    		for (int i = 0; i < names.length; i++) {
    			ObjectName name = names[i];
    			T value = (T) newProxyInstance(name, type, true);
    			if (value != null) {
    				answer.add(value);
    			}
    		}
    		return answer;
    	}
        /**
         * 获取所有ConnectionViewMBean
         * @return
         * @throws Exception
         */
    	@SuppressWarnings("unchecked")
    	public Collection<ConnectionViewMBean> getConnections() throws Exception {
    		String brokerName = getBrokerName();
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName + ",Type=Connection,*");
    		Set<ObjectName> queryResult = queryNames(query, null);
    		return getManagedObjects(queryResult.toArray(new ObjectName[queryResult
    				.size()]), ConnectionViewMBean.class);
    	}
        /**
         * 获取所有Connections
         * @return
         * @throws Exception
         */
    	@SuppressWarnings("unchecked")
    	public Collection<String> getConnections(String connectorName)
    			throws Exception {
    		String brokerName = getBrokerName();
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName + ",Type=Connection,ConnectorName="
    				+ connectorName + ",*");
    		Set<ObjectName> queryResult = queryNames(query, null);
    		Collection<String> result = new ArrayList<String>(queryResult.size());
    		for (ObjectName on : queryResult) {
    			String name = StringUtils.replace(on.getKeyProperty("Connection"),
    					"_", ":");
    			result.add(name);
    		}
    		return result;
    	}
        /**
         * 获取所有ConnectionViewMBean
         * @return
         * @throws Exception
         */
    	@SuppressWarnings("unchecked")
    	public ConnectionViewMBean getConnection(String connectionName)
    			throws Exception {
    		connectionName = StringUtils.replace(connectionName, ":", "_");
    		String brokerName = getBrokerName();
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName + ",Type=Connection,*,Connection="
    				+ connectionName);
    		Set<ObjectName> queryResult = queryNames(query, null);
    		if (queryResult.size() == 0)
    			return null;
    		ObjectName objectName = queryResult.iterator().next();
    		return (ConnectionViewMBean) newProxyInstance(objectName,
    				ConnectionViewMBean.class, true);
    	}

    	@SuppressWarnings("unchecked")
    	public Collection<String> getConnectors() throws Exception {
    		String brokerName = getBrokerName();
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName + ",Type=Connector,*");
    		Set<ObjectName> queryResult = queryNames(query, null);
    		Collection<String> result = new ArrayList<String>(queryResult.size());
    		for (ObjectName on : queryResult)
    			result.add(on.getKeyProperty("ConnectorName"));
    		return result;
    	}

    	public ConnectorViewMBean getConnector(String name) throws Exception {
    		String brokerName = getBrokerName();
    		ObjectName objectName = new ObjectName(
    				"org.apache.activemq:BrokerName=" + brokerName
    						+ ",Type=Connector,ConnectorName=" + name);
    		return (ConnectorViewMBean) newProxyInstance(objectName,
    				ConnectorViewMBean.class, true);
    	}

    	@SuppressWarnings("unchecked")
    	public Collection<NetworkConnectorViewMBean> getNetworkConnectors()
    			throws Exception {
    		String brokerName = getBrokerName();
    		
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName + ",Type=NetworkConnector,*");
    		Set<ObjectName> queryResult = queryNames(query, null);
    		return getManagedObjects(queryResult.toArray(new ObjectName[queryResult
    				.size()]), NetworkConnectorViewMBean.class);
    	}

    	public Collection<NetworkBridgeViewMBean> getNetworkBridges()
    			throws Exception {
    		String brokerName = getBrokerName();
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName + ",Type=NetworkBridge,*");
    		Set<ObjectName> queryResult = queryNames(query, null);
    		return getManagedObjects(queryResult.toArray(new ObjectName[queryResult
    				.size()]), NetworkBridgeViewMBean.class);
    	}

    	@SuppressWarnings("unchecked")
    	public Collection<SubscriptionViewMBean> getQueueConsumers(String queueName)
    			throws Exception {
    		String brokerName = getBrokerName();
    		queueName = StringUtils.replace(queueName, "\"", "_");
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName
    				+ ",Type=Subscription,destinationType=Queue,destinationName="
    				+ queueName + ",*");
    		Set<ObjectName> queryResult = queryNames(query, null);
    		return getManagedObjects(queryResult.toArray(new ObjectName[queryResult
    				.size()]), SubscriptionViewMBean.class);
    	}

    	@SuppressWarnings("unchecked")
    	public Collection<SubscriptionViewMBean> getConsumersOnConnection(
    			String connectionName) throws Exception {
    		connectionName = StringUtils.replace(connectionName, ":", "_");
    		String brokerName = getBrokerName();
    		ObjectName query = new ObjectName("org.apache.activemq:BrokerName="
    				+ brokerName + ",Type=Subscription,clientId=" + connectionName
    				+ ",*");
    		Set<ObjectName> queryResult = queryNames(query, null);
    		return getManagedObjects(queryResult.toArray(new ObjectName[queryResult
    				.size()]), SubscriptionViewMBean.class);
    	}
        /**
         * 获取定时执行的队列的信息
         * @return
         * @throws Exception
         */
    	public JobSchedulerViewMBean getJobScheduler() throws Exception {
    		ObjectName name = getBrokerAdmin().getJMSJobScheduler();
    		return (JobSchedulerViewMBean) newProxyInstance(name,
    				JobSchedulerViewMBean.class, true);
    	}

    	public String getBrokerName() throws Exception {
    		return brokerService.getBrokerName();
    	}
    	/**
    	 * 获取Broker对象
    	 * @return
    	 * @throws Exception
    	 */
    	public Broker getBroker() throws Exception {
    		return brokerService.getBroker();
    	}
    	public ManagementContext getManagementContext() {
    		return brokerService.getManagementContext();
    	}

    	public ManagedRegionBroker getManagedBroker() throws Exception {
    		BrokerView adminView = brokerService.getAdminView();
    		if (adminView == null) {
    			return null;
    		}
    		return adminView.getBroker();
    	}

        public void purgeQueue(ActiveMQDestination destination) throws Exception {
            Set destinations = getManagedBroker().getQueueRegion().getDestinations(destination);
            for (Iterator i = destinations.iterator(); i.hasNext();) {
                Destination dest = (Destination) i.next();
                if (dest instanceof Queue) {
                    Queue regionQueue = (Queue) dest;
                    regionQueue.purge();
                }
            }
        }
        /**
         * 
         * @param name
         * @param query
         * @return
         * @throws Exception
         */
        public Set queryNames(ObjectName name, QueryExp query) throws Exception {
            return getManagementContext().queryNames(name, query);
        }
        /**
         * 通过JMX获取ActiveMQ各种信息
         * @param objectName
         * @param interfaceClass
         * @param notificationBroadcaster
         * @return
         */
        public Object newProxyInstance(ObjectName objectName, Class interfaceClass, boolean notificationBroadcaster) {
            return getManagementContext().newProxyInstance(objectName, interfaceClass, notificationBroadcaster);
        }
        /**
         * 监控内存信息
         * @throws Exception
         */
        public void monitorMermeryUsage() throws Exception{
        	SystemUsage proSystemUsage=brokerService.getProducerSystemUsage();
        	printSystemUsage(proSystemUsage);
        	SystemUsage syUage=brokerService.getSystemUsage();
        	printSystemUsage(syUage);
        	SystemUsage consumsyUage=brokerService.getConsumerSystemUsage();
        	printSystemUsage(consumsyUage);
        }
        
        /**
         * 打印内存信息
         * @param syUage
         */
        public void printSystemUsage(SystemUsage syUage){
        	String name=syUage.getName();
        	LOG.info("SystemUsage name ="+name);
        	MemoryUsage memeryUsage =syUage.getMemoryUsage();
        	LOG.info("memeryUsage PercentUsage name ="+memeryUsage.getPercentUsage());
        	LOG.info("memeryUsage Limit name ="+memeryUsage.getLimit());
        	LOG.info("memeryUsage Usage name ="+memeryUsage.getUsage());
        	TempUsage tempUsage =syUage.getTempUsage();
        	LOG.info("tempUsage PercentUsage name ="+tempUsage.getPercentUsage());
        	LOG.info("tempUsage Limit name ="+tempUsage.getLimit());
        	LOG.info("tempUsage Usage name ="+tempUsage.getUsage());
        	StoreUsage storeUsage=syUage.getStoreUsage();
        	LOG.info("storeUsage PercentUsage name ="+storeUsage.getPercentUsage());
        	LOG.info("storeUsage Limit name ="+storeUsage.getLimit());
        	LOG.info("storeUsage Usage name ="+storeUsage.getUsage());
        }
	    /**
	     * 监控消息的方法
	     * @throws Exception
	     */
	    public void monitorQueueAndTopic() throws Exception{
	    	LOG.info("==========Connection =================");
	    	Collection<ConnectionViewMBean> conVBean=getConnections();
	    	for (ConnectionViewMBean bean : conVBean) {
	    		LOG.info("remoteAddress:"+bean.getRemoteAddress());
	    		LOG.info("isActive:"+bean.isActive());
	    		LOG.info("isConnected:"+bean.isConnected());
			}
	    	LOG.info("=============Topic =================");
	    	Collection<TopicViewMBean>  topicVBean=getTopics();
	    	for (TopicViewMBean topicbean : topicVBean) {
	    		LOG.info("beanName ="+topicbean.getName());
	    		LOG.info("ConsumerCount ="+topicbean.getConsumerCount());
	    		LOG.info("DequeueCount ="+topicbean.getDequeueCount());
	    		LOG.info("EnqueueCount ="+topicbean.getEnqueueCount());
	    		LOG.info("DispatchCount ="+topicbean.getDispatchCount());
	    		LOG.info("ExpiredCount ="+topicbean.getExpiredCount());
	    		LOG.info("MaxEnqueueTime ="+topicbean.getMaxEnqueueTime());
	    		LOG.info("ProducerCount ="+topicbean.getProducerCount());
	    		LOG.info("MemoryPercentUsage ="+topicbean.getMemoryPercentUsage());
	    		LOG.info("MemoryLimit ="+topicbean.getMemoryLimit());
			}
	    	LOG.info("============Queue===================");
	    	Collection<QueueViewMBean> queuqVBeanList=getQueues();
	      	for (QueueViewMBean queuebean : queuqVBeanList) {
	    		LOG.info(" queue beanName ="+queuebean.getName());
	    		LOG.info("ConsumerCount ="+queuebean.getConsumerCount());
	    		LOG.info("DequeueCount ="+queuebean.getDequeueCount());
	    		LOG.info("EnqueueCount ="+queuebean.getEnqueueCount());
	    		LOG.info("DispatchCount ="+queuebean.getDispatchCount());
	    		LOG.info("ExpiredCount ="+queuebean.getExpiredCount());
	    		LOG.info("MaxEnqueueTime ="+queuebean.getMaxEnqueueTime());
	    		LOG.info("ProducerCount ="+queuebean.getProducerCount());
	    		LOG.info("MemoryPercentUsage ="+queuebean.getMemoryPercentUsage());
	    		LOG.info("MemoryLimit ="+queuebean.getMemoryLimit());
			}
	    }
	    
	    public void test() throws Exception{
	       	//获取初始化信息
	    	init();
	    	for (int i = 0; i < 10; i++) {
	    		sendTopic(i);
			}
	    	for (int i = 0; i < 10; i++) {
	    		sendPS(i);
			}
            monitorQueueAndTopic();	    	
	    	Thread.sleep(5000);
            receiveTopic();
	    	receivePS();
	    }
	    /**
	     * P2P发送方式
	     * @throws JMSException
	     */
	    public void sendTopic(int i) throws JMSException{
	    	Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
	    	Destination topic=session.createQueue("activemq.queue"+i);
	    	MessageProducer productor=(MessageProducer) session.createProducer(topic);
	    	TextMessage txtMessage =session.createTextMessage();
	    	txtMessage.setText("this is a topic message "+i);
	    	productor.send(txtMessage);
	    }
	    /**
	     * Sub/Pub发送方式
	     * @throws JMSException
	     */
	    public void sendPS(int i) throws JMSException{
	    	Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
	    	Destination topic=session.createTopic("activemq.topic"+i);
	    	MessageProducer productor=(MessageProducer) session.createProducer(topic);
	    	TextMessage txtMessage =session.createTextMessage();
	    	txtMessage.setText("this is a topic message "+i);
	    	productor.send(txtMessage);
	    }
	    
	    /**
	     * P2P接受方式
	     * @throws JMSException
	     */
	    public void receiveTopic() throws JMSException{
	    	Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
	    	Destination topic=session.createQueue("activemq.queue");
	    	
	    	MessageConsumer consumer=(MessageConsumer) session.createConsumer(topic);
	    	TextMessage txtMessage =(TextMessage)consumer.receive();
	    	System.out.println("txtMessage ="+txtMessage.getText());
	    }
	    /**
	     * Sub/Pub接受方式
	     * @throws JMSException
	     */
	    public void receivePS() throws JMSException{
	       	Session session=connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
	    	Destination topic=session.createQueue("activemq.topic");
	    	
	    	MessageConsumer consumer=(MessageConsumer) session.createConsumer(topic);
	    	TextMessage txtMessage =(TextMessage)consumer.receive();
	    	System.out.println("txtMessage ="+txtMessage.getText());
	    }
	    
	    
	    /**
	     * 初始化消息的方法
	     * @throws Exception
	     */
	    protected void init() throws Exception {
	        if (brokerService == null) {
	            brokerService = createBroker();
	        }
	        ActiveMQConnectionFactory factory = createConnectionFactory();
	        connection = factory.createConnection();
	        //添加Connection 的状态监控的方法
	        monitorConnection(connection);
	        //启动连接
	        connection.start();
	    }
	    
	    /**
	     * 监控台ActiveMQConnection的状态的方法
	     * @param connection
	     */
	    public void monitorConnection(Connection connection){
	    	ActiveMQConnection activemqconnection =(ActiveMQConnection)connection;
	    	//添加ActiveMQConnection的监听类
	    	activemqconnection.addTransportListener(new TransportListener(){

				public void onCommand(Object object) {
					LOG.info("onCommand  object "+object);
					
				}

				public void onException(IOException ex) {
					LOG.info("onException ="+ex.getMessage());
				}

				public void transportInterupted() {
					LOG.info("transportInterupted =");
				}

				public void transportResumed() {
					LOG.info("transportResumed .........");					
				}
	    		
	    	});
	    }

	    protected void destory() throws Exception {
	        connection.close();
	        if (brokerService != null) {
	            brokerService.stop();
	        }
	    }
        /**
         * 创建ActiveMQConnectionFactory
         * @return
         * @throws Exception
         */
	    protected ActiveMQConnectionFactory createConnectionFactory()
	            throws Exception {
	        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
	                bindAddress);
	        return cf;
	    }

	    /***
	     * 创建一个Broker监听进程
	     * @return
	     * @throws Exception
	     */
	    protected BrokerService createBroker() throws Exception {
	    	//创建BrokerService对象
	        BrokerService answer = new BrokerService();
	        //配置监听相关的信息
	        configureBroker(answer);
	        //启动Broker的启动
	        answer.start();
	        
	        return answer;
	    }

	    /**
	     * 配置Broker
	     * @param answer
	     * @throws Exception
	     */
	    protected void configureBroker(BrokerService answer) throws Exception {
	        //创建持久化信息
	    	answer.setPersistent(false);
	    	//设置采用JMX管理
	    	answer.setUseJmx(true);
	        ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
	        strategy.setLimit(10);
	        PolicyEntry tempQueueEntry = createPolicyEntry(strategy);
	        tempQueueEntry.setTempQueue(true);
	        PolicyEntry tempTopicEntry = createPolicyEntry(strategy);
	        tempTopicEntry.setTempTopic(true);
	        PolicyMap pMap = new PolicyMap();
	        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
	        policyEntries.add(tempQueueEntry);
	        policyEntries.add(tempTopicEntry);
	        pMap.setPolicyEntries(policyEntries);
	        answer.setDestinationPolicy(pMap);
	        //绑定url
	        answer.addConnector(bindAddress);
	        answer.setDeleteAllMessagesOnStartup(true);
	    }
        /**
         * 创建一个配置策略
         * @param strategy
         * @return
         */
	    private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) {
	        PolicyEntry policy = new PolicyEntry();
	        policy.setAdvisdoryForFastProducers(true);
	        policy.setAdvisoryForConsumed(true);
	        policy.setAdvisoryForDelivery(true);
	        policy.setAdvisoryForDiscardingMessages(true);
	        policy.setAdvisoryForSlowConsumers(true);
	        policy.setAdvisoryWhenFull(true);
	        policy.setProducerFlowControl(false);
	        policy.setPendingMessageLimitStrategy(strategy);
	        return policy;
	    }
	    
	    public void object2string(Object object ){
	    	ToStringBuilder.reflectionToString(object, ToStringStyle.MULTI_LINE_STYLE);
	    }
	    
}

 

package easyway.app.activemq.demo.monitors;

public class ActiveMQMonitorTest {
	public static void main(String[] args) throws Exception {
		ActiveMQMonitor monitor=new ActiveMQMonitor();
		monitor.test();
		
	}
}

 

分享到:
评论
17 楼 ZY199266 2018-01-31  
获取不到任何消息信息,请问这是什么原因呢?
16 楼 xiaoyao霄 2018-01-29  
DestinationSourceMonitor 报错 应该导入那个包
15 楼 zzd0058 2017-07-26  
你好我想知道getConnections为啥是空的
14 楼 longgangbai 2012-07-16  
myfwz 写道
你有没有做过类似生产的速度与消费者(一或多个)的速度对比的工具。

没有响应的工具
13 楼 longgangbai 2012-07-16  
myfwz 写道
1099这个端口在哪个配置文件里面?需要手动执行activemq-admin start来开启这个JMs监控端口是吗?

actviemq的admin管理端 好像有一个bat文件可以编辑的。你修改配置一下
12 楼 myfwz 2012-07-10  
你有没有做过类似生产的速度与消费者(一或多个)的速度对比的工具。
11 楼 myfwz 2012-07-10  
1099这个端口在哪个配置文件里面?需要手动执行activemq-admin start来开启这个JMs监控端口是吗?
10 楼 longgangbai 2012-07-10  
myfwz 写道
我在本地运行的时候,不管是连我自己的MQ,还是远程的MQ,都没法运行起来。
本地我用netstat看过了,远程也一样使用netsat -ntl看过了,没有起1099这个端口。
另外:1099这个端口是ActiveMQ启动的时候拉起来的,还是监控程序设置的一个端口?

activemq启动时候的使用的端口。
              默认的是tcp://localhost:61616
1099为jms监控端口:
activemq-admin stop --jmxurl service:jmx:rmi:///jndi/rmi://remotehost:1099/jmxrmi --all
9 楼 longgangbai 2012-07-10  
myfwz 写道
呵呵,非常感谢你的回复,只是我想了解一下,你在运行这个类的时候是连接本地MQ还是远程的MQ,是否需要在MQ上面做相应的设置,如果有的话,麻烦提供相应的配置文件给我学习下。

activemq安装之后有一个web版你发送多个消息,不要点击接受,之后,开启这个程序运行。我当时是这样测试的。
8 楼 myfwz 2012-07-10  
我在本地运行的时候,不管是连我自己的MQ,还是远程的MQ,都没法运行起来。
本地我用netstat看过了,远程也一样使用netsat -ntl看过了,没有起1099这个端口。
另外:1099这个端口是ActiveMQ启动的时候拉起来的,还是监控程序设置的一个端口?
7 楼 myfwz 2012-07-10  
呵呵,非常感谢你的回复,只是我想了解一下,你在运行这个类的时候是连接本地MQ还是远程的MQ,是否需要在MQ上面做相应的设置,如果有的话,麻烦提供相应的配置文件给我学习下。
6 楼 longgangbai 2012-07-10  
myfwz 写道
1099这个端口是ActiveMQ提供出来的吗?我的电脑重启后,再去连接也会报这个错呢。

java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
这句话的中文意思,端口1099已经使用,JVM_Bind已经使用该地址。
5 楼 myfwz 2012-07-09  
1099这个端口是ActiveMQ提供出来的吗?我的电脑重启后,再去连接也会报这个错呢。
4 楼 longgangbai 2012-07-04  
myfwz 写道
你好,我在本地运行一个ActiveMQ,再运行这个程序报了以下异常:

(1   ms)   [main]   DEBUG:   org.apache.activemq.broker.jmx.ManagementContext#createConnector   :   Failed to create local registry
java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:310)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:218)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:393)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:129)
at sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:190)
at sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:92)
at sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:68)
at java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:222)

JMS对应的本地注册的本地端口意见使用,netstat -a | find "1099" 查看那个程序占用,或者重启机器,再试!
3 楼 myfwz 2012-06-25  
你好,我在本地运行一个ActiveMQ,再运行这个程序报了以下异常:

(1   ms)   [main]   DEBUG:   org.apache.activemq.broker.jmx.ManagementContext#createConnector   :   Failed to create local registry
java.rmi.server.ExportException: Port already in use: 1099; nested exception is:
java.net.BindException: Address already in use: JVM_Bind
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:310)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:218)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:393)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:129)
at sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:190)
at sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:92)
at sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:68)
at java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:222)
2 楼 longgangbai 2012-05-25  
962685987962685987 写道
你好,我想问一下,我在本地搭建的 activeMQ WEB 界面中 发送了几个消息(queue),但是通过上面的程序却无法获取消息信息 ,即是说:getQueues() 获取不到任何消息信息,请问这是什么原因呢?

你可能要检查你的activemq配置是否支持jmx,如果不支持是监控不到的,上面的demo是基于jmx实现的,请检查相关的配置。
1 楼 962685987962685987 2012-05-24  
你好,我想问一下,我在本地搭建的 activeMQ WEB 界面中 发送了几个消息(queue),但是通过上面的程序却无法获取消息信息 ,即是说:getQueues() 获取不到任何消息信息,请问这是什么原因呢?

相关推荐

    activeMQ JMS 3种创建方式

    通过JMX,可以创建和管理QUEUE和TOPIC。 1. 连接JMX:使用JConsole或JMX Console等工具连接到运行中的ActiveMQ服务器。 2. 查找MBean:在MBean浏览器中找到ActiveMQ相关的MBeans,如`org.apache.activemq:type=...

    ActiveMQ使用手册(中文版)

    - **定义:** 配置JMX以获取ActiveMQ的相关监控信息。 - **步骤:** 在 `conf/activemq.xml` 文件中启用JMX监控。 **10.2 JMX 的 JAVA 端获取信息:** - **定义:** 通过Java客户端访问JMX接口来获取监控数据。 - **...

    ActiveMQ实践入门指南_ActiveMQ实践入门指南_源码

    2. JMX:通过Java管理扩展(JMX)接口进行远程管理。 3. REST API:ActiveMQ还提供了RESTful API,便于集成到其他系统中。 七、安全配置 1. 用户认证:配置users.properties和groups.properties文件,设置用户和...

    Springboot ActiveMQ 集成.rar

    - 可以通过ActiveMQ的Web管理界面或JMX监控工具来查看消息队列的状态,如消息数量、消费者状态等。 综上所述,Spring Boot与ActiveMQ的集成为我们的应用带来了强大的消息处理能力,使得我们可以构建出更健壮、可...

    ActiveMQ实例Demo

    同时,ActiveMQ还具有强大的监控和管理工具,通过JMX或者Web管理控制台,可以实时查看和调整队列、主题的状态。 总结来说,"ActiveMQ实例Demo"为我们展示了如何利用ActiveMQ构建消息传递系统,无论是点对点还是发布...

    ActiveMQ-jms jar包

    3. **Management Clients**: 使用JMX API,开发人员可以编写客户端应用程序来远程访问和操作MBean,从而监控和管理ActiveMQ服务器。 4. **Notifications**: MBeans可以发送通知事件,这些事件可以被注册的监听器...

    activeMq技术手册1

    另外,ActiveMQ 还提供了 JMX(Java Management Extensions)管理工具,方便监控和管理消息代理的状态。 总之,ActiveMQ 是一个强大的消息中间件,具有丰富的特性和广泛的协议支持,可以帮助开发者构建健壮、高效的...

    ActiveMQ整合Spring(多消费者)

    此外,Spring还支持JMX(Java Management Extensions),可以通过JMX管理ActiveMQ组件。 整合ActiveMQ与Spring有助于构建可扩展、高可用的分布式系统。通过合理配置和使用消息队列,可以提高系统的响应速度,降低...

    activeMQ实例

    4. **监控与日志**:定期查看日志,利用监控工具如JMX进行性能监控。 六、实战应用 1. **微服务间通信**:在微服务架构中,ActiveMQ可以作为服务间通信的桥梁。 2. **任务调度**:将任务消息放入队列,由后台任务...

    ActiveMQ运行说明

    ActiveMQ 提供了丰富的配置选项和特性,比如通过网络集群实现高可用性,通过策略控制消息分发,以及通过 JMX 进行监控等。 总结来说,ActiveMQ 是一个强大的消息中间件,提供了灵活的消息传递模型和丰富的功能,...

    java操作activeMQ(java项目代码及jar包可运行,队列和订阅模式)

    - 通过JMX(Java Management Extensions)进行远程管理和监控。 这个Java项目代码应该包括了创建连接、发送和接收消息的示例,涵盖了队列和主题的基本操作。通过运行这些代码,你可以更好地理解ActiveMQ的工作原理...

    activemq学习入门第一步

    此外,还可以通过JMX工具进行远程管理和监控。 7. **高级特性** ActiveMQ提供了一些高级特性,如持久化、消息分页、优先级、时间戳、事务支持和死信队列等,以提高系统的可靠性和灵活性。例如,持久化确保即使在...

    JMS.rar_activemq_jms_jms activemq

    此外,ActiveMQ还支持各种高级特性,如持久化消息、优先级、消息分页、消息组、事务等,以及通过JMX进行管理和监控。对于大型分布式系统,这些特性有助于提高系统的可靠性和性能。 总的来说,本示例旨在演示如何...

    ActiveMQ_in_Action

    - **API和集成**:ActiveMQ可以与其他编程语言(如.NET、Python等)集成,并通过JMX进行管理。 5. **高级用法**: - **消息优先级**:允许指定消息优先级,确保关键任务先处理。 - **消息分页**:当队列或主题中...

    activemq-demo.zip

    6. **管理工具与监控**:ActiveMQ提供了Web控制台,用户可以通过浏览器界面进行配置、监控和管理消息队列。此外,还可以通过JMX(Java Management Extensions)进行远程管理。 7. **安全与认证**:ActiveMQ支持多种...

    ActiveMQ操作指南

    同时,可以通过监控工具,如Prometheus和Grafana,对ActiveMQ的性能进行实时监控。 **五、高级特性** 1. **消息选择器(Message Selector)**:允许消费者仅接收满足特定条件的消息。 2. **事务(Transactions)*...

    activeMQ-API 5_2_0_0 api

    9. **JMX(Java Management Extensions)管理**:ActiveMQ API提供JMX接口,允许管理员监控和管理消息代理的状态,如查看队列长度、消费者数量等。 10. **安全与认证**:ActiveMQ API支持用户身份验证和权限控制,...

    ActiveMQ+Spring完整详解例子

    - 通过JMX(Java Management Extensions)接口,可以远程监控和管理ActiveMQ。 8. **最佳实践** - 适当配置消费者的并发数,避免资源浪费和消息堆积。 - 注意消息的大小和数量,避免内存溢出。 - 根据业务场景...

    apache-activemq-5.15.3-bin.zip

    - **管理工具**:提供Web控制台和JMX(Java Management Extensions)管理接口,方便监控和管理队列、主题和连接。 **3. 安装与配置** 下载并解压"apache-activemq-5.15.3-bin.zip"后,会得到包含bin、conf、lib等...

    archive_ ActiveMQ消息服务器 v5.17.5 [江西新余电信].zip.zip

    6. **管理工具**:ActiveMQ提供了Web控制台和JMX管理接口,便于监控和管理消息队列、连接和策略。 7. **灵活性**:可以部署在各种环境,包括云、容器和传统服务器,适应不同的IT架构需求。 在压缩包中,"output....

Global site tag (gtag.js) - Google Analytics