JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
ObjectName activeMQ = new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Broker");
BrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, activeMQ,BrokerViewMBean.class, true);
for (ObjectName name : mbean.getQueues()) {
// List<?> mlist = (List<?>) conn.invoke(name, "browseMessages", new Object[] {}, new String[] {});
CompositeData[] messages = (CompositeData[]) conn.invoke(name, "browse", new Object[] {}, new String[] {});
for( CompositeData message :messages){
System.out.println(message.get("Text"));
}
}
我目前是用这种方式查询queue的。但只有消息为 TextMessage时,message.get("Text")才能取到值。
我猜是因为QueueViewMBean的browse方法得到的是CompositeData[],和消息差距较大。
查看api发现的browseMessages
List<?> browseMessages()
Browses the current destination returning a list of messages
看起来这个更适合,但是一用就保错了。
Exception in thread "main" java.rmi.UnmarshalException: error unmarshalling return; nested exception is:
java.io.WriteAbortedException: writing aborted; java.io.NotSerializableException: org.apache.activemq.command.ActiveMQObjectMessage
at sun.rmi.server.UnicastRef.invoke(Unknown Source)
at com.sun.jmx.remote.internal.PRef.invoke(Unknown Source)
at javax.management.remote.rmi.RMIConnectionImpl_Stub.invoke(Unknown Source)
at javax.management.remote.rmi.RMIConnector$RemoteMBeanServerConnection.invoke(Unknown Source)
at org.DestinationSourceM.main(DestinationSourceM.java:49)
Caused by: java.io.WriteAbortedException: writing aborted; java.io.NotSerializableException: org.apache.activemq.command.ActiveMQObjectMessage
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at java.util.ArrayList.readObject(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at sun.rmi.server.UnicastRef.unmarshalValue(Unknown Source)
... 5 more
Caused by: java.io.NotSerializableException: org.apache.activemq.command.ActiveMQObjectMessage
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at java.util.ArrayList.writeObject(ArrayList.java:570)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at sun.rmi.server.UnicastRef.marshalValue(UnicastRef.java:274)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:315)
at sun.rmi.transport.Transport$1.run(Transport.java:159)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
相关推荐
### Apache ActiveMQ Queue & Topic 详解 ...综上所述,Apache ActiveMQ 不仅为开发者提供了强大的消息处理能力,还具备灵活的部署选项和广泛的语言支持,是构建高性能分布式应用的理想选择之一。
9. **反馈机制**:如果发送者希望在发送消息后得到接收者的确认信息,可以通过设置消息属性或者使用特定协议(如AMQP的Acknowledgement模式)来实现。ActiveMQ支持回调机制,允许在消息被成功接收后返回确认信息给...
在本文中,我们将深入探讨如何使用Go语言实现与ActiveMQ的通信,主要关注消息的收发功能。ActiveMQ是Apache软件基金会开发的一款开源消息中间件,支持多种协议,包括我们这里提到的STOMP(Simple Text Oriented ...
3. **创建队列**:在ActiveMQ中,可以通过管理控制台或编程方式创建队列。例如,通过JMS API,可以创建一个`Queue`对象来表示队列,并向其发送和接收消息。 4. **生产者(Publisher)**:生产者是发送消息的实体。...
分别实现生产者-消费者模式和发布-订阅模式,作为java编程发送消息和消费消息的基础示例。 源码主要包含如下内容: 1.spring boot配置初始化activeMQ 2.队列类型queue,生产者发送队列消息,以及消费者消费相关队列...
5. **消息的持久化**:ActiveMQ允许配置消息的持久性,即使服务器重启,未被消费的消息也不会丢失。在Spring配置中,可以通过设置`JmsTemplate`的`deliveryPersistent`属性为`true`来实现。 6. **事务管理**:...
在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...
这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...
8. **接收消息**:使用消费者监听消息,可以采用阻塞或非阻塞方式。对于点对点模型,通常使用`receive()`方法;对于发布/订阅模型,可能需要注册一个消息监听器。 9. **处理消息**:消费者接收到消息后,进行相应的...
- **Session**:单线程上下文,用于发送和接收消息,支持事务处理。 - **Message**:消息对象,包括消息头(必填)、属性(可选)和消息体(可选),有五种类型:TextMessage、MapMessage、ByteMessage、...
5. **消息持久化**:讲解ActiveMQ如何实现消息的持久化,确保在网络故障或服务器重启后仍能恢复未处理的消息。 6. **高级特性**:探讨ActiveMQ的事务管理、消息优先级、DLQ(死信队列)和消息重试等高级特性,以及...
- **Number of pending messages**:未处理的消息总数。 - **Number of consumers**:当前消费者的数量。 - **Messages enqueued**:已发送到队列的消息总数。 - **Messages dequeued**:已从队列中移除的消息...
- **启动Broker的方法**:可以通过命令行工具 `activemq start` 启动 Broker,也可以通过编程方式启动。 - **单独应用的开发**:编写 Java 应用程序使用 ActiveMQ 的 JMS API 进行消息收发。 - **结合Spring的开发**...
2. **JMS编程**:使用JMS API与ActiveMQ交互,创建ConnectionFactory,然后创建Connection,Session,Destination(Queue或Topic),最后创建MessageProducer和MessageConsumer。 3. **Web控制台**:ActiveMQ内置了...
在IT行业中,消息队列(Message Queue,简称MQ)是一种重要的中间件,它在分布式系统中扮演着数据通信的关键角色。ActiveMQ是Apache软件基金会开发的一款开源MQ产品,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等...
7. **消息持久化**:ActiveMQ支持消息的持久化存储,即使在服务器重启后,未被消费的消息也不会丢失。 8. **监控与管理**:ActiveMQ提供了一个Web控制台,可以实时查看消息队列的状态、监控性能等。 在实际项目中...
9. **Spring注解**:在提供的实例中,可能包含了使用注解的方式配置Spring与ActiveMQ的集成,如`@EnableJms`启动JMS支持,`@JmsListener`定义消息监听器等。 10. **Tomcat服务器**:Tomcat是一个流行的Java Web...
Spring集成ActiveMQ是将Spring框架与ActiveMQ消息中间件相结合,实现异步处理和解耦应用程序的关键技术。在本文中,我们将深入探讨如何配置和使用这一组合,以及它在实际项目中的应用。 首先,让我们了解Spring框架...
**正文** ...综上所述,ActiveMQ是一个强大的消息中间件,它在分布式系统中起到了关键的角色,提供可靠的异步通信机制。通过学习和掌握ActiveMQ,开发者能够构建出更加高效、稳定、可扩展的系统架构。
点对点模式则使用队列(Queue),一个生产者发送消息到一个队列,一个或多个消费者可以从队列中取出并处理消息。但每个消息只被一个消费者消费,实现了一对一的通信。 ActiveMQ-CPP 库提供了以下关键组件和概念: ...