0 0

activemq 怎么用编程方式得到 queue上未处理的消息?5

JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
ObjectName activeMQ = new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Broker");
BrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, activeMQ,BrokerViewMBean.class, true);

for (ObjectName name : mbean.getQueues()) {
//	List<?> mlist = (List<?>) conn.invoke(name, "browseMessages", new Object[] {}, new String[] {});
	CompositeData[] messages = (CompositeData[]) conn.invoke(name, "browse", new Object[] {}, new String[] {});
	for( CompositeData message :messages){
		System.out.println(message.get("Text"));
	}
}


我目前是用这种方式查询queue的。但只有消息为 TextMessage时,message.get("Text")才能取到值。
我猜是因为QueueViewMBean的browse方法得到的是CompositeData[],和消息差距较大。

查看api发现的browseMessages
List<?>	browseMessages() 
          Browses the current destination returning a list of messages

看起来这个更适合,但是一用就保错了。
Exception in thread "main" java.rmi.UnmarshalException: error unmarshalling return; nested exception is: 
	java.io.WriteAbortedException: writing aborted; java.io.NotSerializableException: org.apache.activemq.command.ActiveMQObjectMessage
	at sun.rmi.server.UnicastRef.invoke(Unknown Source)
	at com.sun.jmx.remote.internal.PRef.invoke(Unknown Source)
	at javax.management.remote.rmi.RMIConnectionImpl_Stub.invoke(Unknown Source)
	at javax.management.remote.rmi.RMIConnector$RemoteMBeanServerConnection.invoke(Unknown Source)
	at org.DestinationSourceM.main(DestinationSourceM.java:49)
Caused by: java.io.WriteAbortedException: writing aborted; java.io.NotSerializableException: org.apache.activemq.command.ActiveMQObjectMessage
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readObject(Unknown Source)
	at java.util.ArrayList.readObject(Unknown Source)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
	at java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.io.ObjectInputStream.readObject(Unknown Source)
	at sun.rmi.server.UnicastRef.unmarshalValue(Unknown Source)
	... 5 more
Caused by: java.io.NotSerializableException: org.apache.activemq.command.ActiveMQObjectMessage
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
	at java.util.ArrayList.writeObject(ArrayList.java:570)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
	at sun.rmi.server.UnicastRef.marshalValue(UnicastRef.java:274)
	at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:315)
	at sun.rmi.transport.Transport$1.run(Transport.java:159)
	at java.security.AccessController.doPrivileged(Native Method)
	at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
	at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
2012年6月04日 00:18

3个答案 按时间排序 按投票排序

0 0

搞定了没,楼主?能否分享下

2015年6月24日 15:29
0 0

List<?>接受的应该是可序列化的对象

2012年6月04日 09:58
0 0

用jmx调用 mq的消息api我还是第一次看到,你不会是把jms当成了jmx了吧?

2012年6月04日 09:13

相关推荐

    Apache ActiveMQ Queue Topic 详解

    ### Apache ActiveMQ Queue & Topic 详解 ...综上所述,Apache ActiveMQ 不仅为开发者提供了强大的消息处理能力,还具备灵活的部署选项和广泛的语言支持,是构建高性能分布式应用的理想选择之一。

    activeMQ发送消息返回消息

    9. **反馈机制**:如果发送者希望在发送消息后得到接收者的确认信息,可以通过设置消息属性或者使用特定协议(如AMQP的Acknowledgement模式)来实现。ActiveMQ支持回调机制,允许在消息被成功接收后返回确认信息给...

    go语言实现使用activemq 收发消息

    在本文中,我们将深入探讨如何使用Go语言实现与ActiveMQ的通信,主要关注消息的收发功能。ActiveMQ是Apache软件基金会开发的一款开源消息中间件,支持多种协议,包括我们这里提到的STOMP(Simple Text Oriented ...

    简单的activemq点对点的同步消息模型

    3. **创建队列**:在ActiveMQ中,可以通过管理控制台或编程方式创建队列。例如,通过JMS API,可以创建一个`Queue`对象来表示队列,并向其发送和接收消息。 4. **生产者(Publisher)**:生产者是发送消息的实体。...

    JAVA编程之Spring boot-activeMQ示例

    分别实现生产者-消费者模式和发布-订阅模式,作为java编程发送消息和消费消息的基础示例。 源码主要包含如下内容: 1.spring boot配置初始化activeMQ 2.队列类型queue,生产者发送队列消息,以及消费者消费相关队列...

    Spring+ActiveMQ消息队列+前台接收消息

    5. **消息的持久化**:ActiveMQ允许配置消息的持久性,即使服务器重启,未被消费的消息也不会丢失。在Spring配置中,可以通过设置`JmsTemplate`的`deliveryPersistent`属性为`true`来实现。 6. **事务管理**:...

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    ActiveMQ实现的消息收发案例

    8. **接收消息**:使用消费者监听消息,可以采用阻塞或非阻塞方式。对于点对点模型,通常使用`receive()`方法;对于发布/订阅模型,可能需要注册一个消息监听器。 9. **处理消息**:消费者接收到消息后,进行相应的...

    ActiveMQ使用入门.pdf

    - **Session**:单线程上下文,用于发送和接收消息,支持事务处理。 - **Message**:消息对象,包括消息头(必填)、属性(可选)和消息体(可选),有五种类型:TextMessage、MapMessage、ByteMessage、...

    activemq消息中间件-视频教程

    5. **消息持久化**:讲解ActiveMQ如何实现消息的持久化,确保在网络故障或服务器重启后仍能恢复未处理的消息。 6. **高级特性**:探讨ActiveMQ的事务管理、消息优先级、DLQ(死信队列)和消息重试等高级特性,以及...

    activemq的简单配置

    - **Number of pending messages**:未处理的消息总数。 - **Number of consumers**:当前消费者的数量。 - **Messages enqueued**:已发送到队列的消息总数。 - **Messages dequeued**:已从队列中移除的消息...

    activemq 配置说明与activemq入门讲解

    2. **JMS编程**:使用JMS API与ActiveMQ交互,创建ConnectionFactory,然后创建Connection,Session,Destination(Queue或Topic),最后创建MessageProducer和MessageConsumer。 3. **Web控制台**:ActiveMQ内置了...

    ActiveMQ消息队列主题订阅Spring整合

    在IT行业中,消息队列(Message Queue,简称MQ)是一种重要的中间件,它在分布式系统中扮演着数据通信的关键角色。ActiveMQ是Apache软件基金会开发的一款开源MQ产品,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等...

    spring + activemq 消息sample

    7. **消息持久化**:ActiveMQ支持消息的持久化存储,即使在服务器重启后,未被消费的消息也不会丢失。 8. **监控与管理**:ActiveMQ提供了一个Web控制台,可以实时查看消息队列的状态、监控性能等。 在实际项目中...

    Spring和ActiveMQ的整合实例源码

    9. **Spring注解**:在提供的实例中,可能包含了使用注解的方式配置Spring与ActiveMQ的集成,如`@EnableJms`启动JMS支持,`@JmsListener`定义消息监听器等。 10. **Tomcat服务器**:Tomcat是一个流行的Java Web...

    Spring集成ActiveMQ配置

    Spring集成ActiveMQ是将Spring框架与ActiveMQ消息中间件相结合,实现异步处理和解耦应用程序的关键技术。在本文中,我们将深入探讨如何配置和使用这一组合,以及它在实际项目中的应用。 首先,让我们了解Spring框架...

    ActiveMQ.zip

    **正文** ...综上所述,ActiveMQ是一个强大的消息中间件,它在分布式系统中起到了关键的角色,提供可靠的异步通信机制。通过学习和掌握ActiveMQ,开发者能够构建出更加高效、稳定、可扩展的系统架构。

    ActiveMQ开发实例-1

    点对点模式则使用队列(Queue),一个生产者发送消息到一个队列,一个或多个消费者可以从队列中取出并处理消息。但每个消息只被一个消费者消费,实现了一对一的通信。 ActiveMQ-CPP 库提供了以下关键组件和概念: ...

    39、ActiveMQ.pdf

    在本文中,我们将深入探讨ActiveMQ,这是一个广泛使用的开源消息中间件,属于Apache软件基金会的项目。ActiveMQ是基于Java的消息传递平台,它允许应用程序通过消息传递进行通信,从而实现解耦、流量控制和异步处理。...

Global site tag (gtag.js) - Google Analytics