java代码:
package com.yanzhi.system; import com.yanzhi.tools.C; import com.yanzhi.tools.Global; import com.yanzhi.tools.StringUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.pool.PooledConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.connection.SingleConnectionFactory; import javax.jms.*; import java.sql.Timestamp; import java.util.List; /** * Created by xiaoyunlian on 2016/2/14. * activemq消费者实例 */ public class ConsumerApp implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApp.class); public static void start(){ try { //连接 ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory(); targetConnectionFactory.setBrokerURL(Global.getBrokerURL()); targetConnectionFactory.setTrustAllPackages(true); SingleConnectionFactory connectionFactory = new SingleConnectionFactory(); connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接 Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //队列 ActiveMQQueue yanzhiQueueDestination = (ActiveMQQueue) session.createQueue("testQueue1,testQueue2,testQueue3"); ConsumerApp consumerMessageListener = new ConsumerApp(); MessageConsumer consumer = session.createConsumer(yanzhiQueueDestination); consumer.setMessageListener(consumerMessageListener); System.err.println("~~~~~~~~~~~~~~~~~~~~~~ active MQ 消费者监听器 启动成功~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); }catch (Exception e){ e.printStackTrace(); } } @Override public void onMessage(Message msg) { try { Destination dest = msg.getJMSDestination(); if (msg instanceof TextMessage) { ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)msg; String name = activeMQTextMessage.getDestination().getPhysicalName(); TextMessage message = (TextMessage) msg; System.err.println("队列:"+name+"接收者接到一个String消息:"+message.getText()); } else if (msg instanceof MapMessage) { ActiveMQMapMessage activeMQMapMessage = (ActiveMQMapMessage) msg; String destinationName = activeMQMapMessage.getDestination().getPhysicalName(); } else if (msg instanceof StreamMessage) { StreamMessage message = (StreamMessage) msg; System.out.println("------Received StreamMessage------"); System.out.println(message.readString()); System.out.println(message.readBoolean()); System.out.println(message.readLong()); } else if (msg instanceof ObjectMessage) { ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)msg; String destinationName = activeMQObjectMessage.getDestination().getPhysicalName(); if (Global.getPkQuene().equals(destinationName)){ ObjectMessage objectMessage = (ObjectMessage) msg; Object object = objectMessage.getObject(); if (object instanceof List){ // do something } } if (Global.getFaceValueReportQuene().equals(destinationName)){ // do something } if (Global.getRegUserQuene().equals(destinationName)){ // do something } } else if (msg instanceof BytesMessage) { System.out.println("------Received BytesMessage------"); BytesMessage message = (BytesMessage) msg; byte[] byteContent = new byte[1024]; int length = -1; StringBuffer content = new StringBuffer(); while ((length = message.readBytes(byteContent)) != -1) { content.append(new String(byteContent, 0, length)); } System.out.println(content.toString()); } else { System.out.println(msg); } } catch (JMSException e) { LOGGER.error("error {}", e); } } }
调用上面的start()方法,即可启动消息队列的消费者。
相关推荐
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。
在这个场景下,我们将探讨如何在Java环境中动态创建ActiveMQ消费者。 动态创建ActiveMQ消费者意味着程序可以在运行时根据需求创建或销毁消费者,而不是在编译时静态配置。这种方式提供了更大的灵活性,可以应对系统...
在本项目中,我们探讨的是如何使用SpringBoot集成Apache ActiveMQ来构建一个生产者和消费者的应用。SpringBoot以其简洁的配置和快速启动特性,成为现代Java应用开发的首选框架之一,而ActiveMQ则是流行的消息中间件...
在ActiveMQ中,可以通过设置消费者的订阅类型(Durable Subscription或Shared Subscription)来实现消息的多消费者分发策略。 6. **事务管理**: Spring与ActiveMQ整合时,还可以支持JMS事务,确保消息的一致性。`...
本案例代码包含了一个基本的ActiveMQ生产者和消费者的应用示例,帮助开发者理解如何使用ActiveMQ进行消息传递。 1. **JMS(Java Message Service)简介** JMS是Java平台上的一个标准API,它定义了生产、发送、接收...
本实例主要关注如何在Spring框架中配置和使用ActiveMQ消费者。 首先,我们来理解`am_spring_consumer`项目的基本结构。这个项目很可能包含了一个Spring配置文件(如`applicationContext.xml`),用于定义与ActiveMQ...
- 消费确认:ActiveMQ支持自动和手动确认,手动确认需要消费者显式通知服务器消息已被处理。 6. **优化和最佳实践**: - 使用PooledConnectionFactory可以减少资源开销,提高性能。 - 对于高并发场景,考虑使用...
Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...
ActiveMQ作为消息代理,接收生产者发送的消息,并存储在消息队列中,等待消费者来消费。消息可以是点对点(Point-to-Point)模型,即每个消息只被一个消费者接收,也可以是发布/订阅(Publish/Subscribe)模型,其中...
2. **ActiveMQ消费者API**: 消费者是从消息队列接收消息的组件。消费者同样需要`ConnectionFactory`来创建`Connection`,然后创建`Session`。在`Session`中,消费者会创建`MessageConsumer`。当消息到达时,`...
- **Amq_Consumer.cpp**:消费者客户端的实现,负责接收和处理来自 ActiveMQ 服务器的消息。 - **time.cpp**:可能包含与时间相关的函数,如计时器或延迟发送等,用于消息处理的时间控制。 4. **跨平台兼容性**:...
在分布式系统中,ActiveMQ作为消息代理,负责接收、存储和转发消息,从而实现生产者与消费者之间的解耦。 生产者和消费者是JMS中的核心概念。生产者是发送消息的应用,而消费者则是接收这些消息的应用。在ActiveMQ...
springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用
**1.10 ActiveMQ消费者特性** ActiveMQ提供了丰富的消费者特性,包括但不限于消息预取机制、自动重新连接机制以及消息签收等。 **1.11 ActiveMQ消息预取机制** 消息预取是指消费者可以预先从Broker处获取一定数量...
Spring 提供了与 ActiveMQ 集成的便捷方式,使得在 Spring 应用中创建消息生产者和消费者变得简单。 **消息生产者**(Producer)是发送消息的组件,通常在业务处理完成后,将结果或者事件封装为消息发送到消息队列...
java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况
- **消息中间件**:ActiveMQ作为消息中间件,它的主要任务是接收、存储和转发消息,使得生产者和消费者可以解耦,提高系统的可扩展性和可靠性。 - **消息模型**:ActiveMQ支持多种消息模型,如点对点(Queue)和...
Queue是一种点对点的消息模型,每个消息仅被一个消费者接收;而Topic则遵循发布/订阅模型,一个消息可以被多个消费者接收。 首先,我们需要在项目中添加ActiveMQ的相关依赖。在`pom.xml`文件中,添加以下Maven依赖...
现在可以编写一个消费者程序来接收这个消息,或者通过ActiveMQ Web控制台检查队列中的消息。 6. **高级特性** ActiveMQ支持多种高级特性,如事务处理、消息持久化、优先级、时间戳等,可以根据需求进一步定制生产...
本文将详细介绍ActiveMQ在高并发环境下的优化策略,包括异常处理、连接池使用、消费者公平调度以及系统整体扩展等方面。 #### 二、高并发发送消息异常及其解决 ##### 现象描述 当使用多个线程(如10个)以一定频率...