`

activemq消费者

阅读更多

java代码:

 

package com.yanzhi.system;

import com.yanzhi.tools.C;
import com.yanzhi.tools.Global;
import com.yanzhi.tools.StringUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.connection.SingleConnectionFactory;

import javax.jms.*;

import java.sql.Timestamp;
import java.util.List;

/**
 * Created by xiaoyunlian on 2016/2/14.
 * activemq消费者实例
 */
public class ConsumerApp implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApp.class);

    public static void start(){
       try {
           //连接
           ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory();
           targetConnectionFactory.setBrokerURL(Global.getBrokerURL());
           targetConnectionFactory.setTrustAllPackages(true);
           SingleConnectionFactory connectionFactory = new SingleConnectionFactory();
           connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接
           Connection connection = connectionFactory.createConnection();
           connection.start();
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           //队列
           ActiveMQQueue yanzhiQueueDestination = (ActiveMQQueue) session.createQueue("testQueue1,testQueue2,testQueue3");
           ConsumerApp consumerMessageListener = new ConsumerApp();

           MessageConsumer consumer = session.createConsumer(yanzhiQueueDestination);
           consumer.setMessageListener(consumerMessageListener);
           System.err.println("~~~~~~~~~~~~~~~~~~~~~~ active MQ 消费者监听器 启动成功~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
       }catch (Exception e){
           e.printStackTrace();
       }

    }

    @Override
    public void onMessage(Message msg) {
        try {
            Destination dest = msg.getJMSDestination();
            if (msg instanceof TextMessage) {
                ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)msg;
                String name = activeMQTextMessage.getDestination().getPhysicalName();
                TextMessage message = (TextMessage) msg;
                System.err.println("队列:"+name+"接收者接到一个String消息:"+message.getText());
            } else if (msg instanceof MapMessage) {
                ActiveMQMapMessage activeMQMapMessage = (ActiveMQMapMessage) msg;
                String destinationName = activeMQMapMessage.getDestination().getPhysicalName();
            } else if (msg instanceof StreamMessage) {
                StreamMessage message = (StreamMessage) msg;
                System.out.println("------Received StreamMessage------");
                System.out.println(message.readString());
                System.out.println(message.readBoolean());
                System.out.println(message.readLong());
            } else if (msg instanceof ObjectMessage) {
                ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)msg;
                String destinationName = activeMQObjectMessage.getDestination().getPhysicalName();
                if (Global.getPkQuene().equals(destinationName)){
                    ObjectMessage objectMessage = (ObjectMessage) msg;
                    Object object = objectMessage.getObject();
                    if (object instanceof List){
                        // do something
                    }
                    
                }
                if (Global.getFaceValueReportQuene().equals(destinationName)){
                    // do something
                }
                if (Global.getRegUserQuene().equals(destinationName)){
                    // do something
                }
            } else if (msg instanceof BytesMessage) {
                System.out.println("------Received BytesMessage------");
                BytesMessage message = (BytesMessage) msg;
                byte[] byteContent = new byte[1024];
                int length = -1;
                StringBuffer content = new StringBuffer();
                while ((length = message.readBytes(byteContent)) != -1) {
                    content.append(new String(byteContent, 0, length));
                }
                System.out.println(content.toString());
            } else {
                System.out.println(msg);
            }
        } catch (JMSException e) {
            LOGGER.error("error {}", e);
        }
    }
}

调用上面的start()方法,即可启动消息队列的消费者。 

 

0
5
分享到:
评论

相关推荐

    springboot整合activemq 消费者 ACK手动确认 &消息重发

    springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。

    动态创建ActiveMQ消费者

    在这个场景下,我们将探讨如何在Java环境中动态创建ActiveMQ消费者。 动态创建ActiveMQ消费者意味着程序可以在运行时根据需求创建或销毁消费者,而不是在编译时静态配置。这种方式提供了更大的灵活性,可以应对系统...

    基于SpringBoot的ActiveMQ生产者/消费者

    在本项目中,我们探讨的是如何使用SpringBoot集成Apache ActiveMQ来构建一个生产者和消费者的应用。SpringBoot以其简洁的配置和快速启动特性,成为现代Java应用开发的首选框架之一,而ActiveMQ则是流行的消息中间件...

    ActiveMQ整合Spring(多消费者)

    在ActiveMQ中,可以通过设置消费者的订阅类型(Durable Subscription或Shared Subscription)来实现消息的多消费者分发策略。 6. **事务管理**: Spring与ActiveMQ整合时,还可以支持JMS事务,确保消息的一致性。`...

    activemq生产者和消费者案例代码.zip

    本案例代码包含了一个基本的ActiveMQ生产者和消费者的应用示例,帮助开发者理解如何使用ActiveMQ进行消息传递。 1. **JMS(Java Message Service)简介** JMS是Java平台上的一个标准API,它定义了生产、发送、接收...

    am_spring_consumer:ActiveMQ 消费者实例

    本实例主要关注如何在Spring框架中配置和使用ActiveMQ消费者。 首先,我们来理解`am_spring_consumer`项目的基本结构。这个项目很可能包含了一个Spring配置文件(如`applicationContext.xml`),用于定义与ActiveMQ...

    ActiveMQ的队列、topic模式

    - 消费确认:ActiveMQ支持自动和手动确认,手动确认需要消费者显式通知服务器消息已被处理。 6. **优化和最佳实践**: - 使用PooledConnectionFactory可以减少资源开销,提高性能。 - 对于高并发场景,考虑使用...

    spring 整合 activemq 生产者和消费者 案例源码

    Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...

    ActiveMQ之C++与C#通信

    ActiveMQ作为消息代理,接收生产者发送的消息,并存储在消息队列中,等待消费者来消费。消息可以是点对点(Point-to-Point)模型,即每个消息只被一个消费者接收,也可以是发布/订阅(Publish/Subscribe)模型,其中...

    activemq_activemq_doublezoo_源码

    2. **ActiveMQ消费者API**: 消费者是从消息队列接收消息的组件。消费者同样需要`ConnectionFactory`来创建`Connection`,然后创建`Session`。在`Session`中,消费者会创建`MessageConsumer`。当消息到达时,`...

    自己实现的 ActiveMQ 多线程客户端 包含生产消息客户端和消费者消息客户端

    - **Amq_Consumer.cpp**:消费者客户端的实现,负责接收和处理来自 ActiveMQ 服务器的消息。 - **time.cpp**:可能包含与时间相关的函数,如计时器或延迟发送等,用于消息处理的时间控制。 4. **跨平台兼容性**:...

    activeMQ生产者和消费者代码

    在分布式系统中,ActiveMQ作为消息代理,负责接收、存储和转发消息,从而实现生产者与消费者之间的解耦。 生产者和消费者是JMS中的核心概念。生产者是发送消息的应用,而消费者则是接收这些消息的应用。在ActiveMQ...

    springboot整合activemq 生产者 一对一,一对多

    springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用

    ActiveMQ开发教程

    **1.10 ActiveMQ消费者特性** ActiveMQ提供了丰富的消费者特性,包括但不限于消息预取机制、自动重新连接机制以及消息签收等。 **1.11 ActiveMQ消息预取机制** 消息预取是指消费者可以预先从Broker处获取一定数量...

    activeMQ 详细教程与源码(包含消费者与生产者)

    Spring 提供了与 ActiveMQ 集成的便捷方式,使得在 Spring 应用中创建消息生产者和消费者变得简单。 **消息生产者**(Producer)是发送消息的组件,通常在业务处理完成后,将结果或者事件封装为消息发送到消息队列...

    activemq生产消费的Demo

    java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况

    activemq实战项目,同ssh框架整合(生产者+消费者)

    - **消息中间件**:ActiveMQ作为消息中间件,它的主要任务是接收、存储和转发消息,使得生产者和消费者可以解耦,提高系统的可扩展性和可靠性。 - **消息模型**:ActiveMQ支持多种消息模型,如点对点(Queue)和...

    springboot整合activemq案例

    Queue是一种点对点的消息模型,每个消息仅被一个消费者接收;而Topic则遵循发布/订阅模型,一个消息可以被多个消费者接收。 首先,我们需要在项目中添加ActiveMQ的相关依赖。在`pom.xml`文件中,添加以下Maven依赖...

    ActiveMQ生产者

    现在可以编写一个消费者程序来接收这个消息,或者通过ActiveMQ Web控制台检查队列中的消息。 6. **高级特性** ActiveMQ支持多种高级特性,如事务处理、消息持久化、优先级、时间戳等,可以根据需求进一步定制生产...

    ActiveMQ高并发处理方案

    本文将详细介绍ActiveMQ在高并发环境下的优化策略,包括异常处理、连接池使用、消费者公平调度以及系统整体扩展等方面。 #### 二、高并发发送消息异常及其解决 ##### 现象描述 当使用多个线程(如10个)以一定频率...

Global site tag (gtag.js) - Google Analytics