`
shijian4810
  • 浏览: 20749 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

Spring 整合 RocketMQ

阅读更多

1. 引入jar包

复制代码
        <!-- RocketMQ -->
        <dependency>  
             <groupId>com.alibaba.rocketmq</groupId>  
             <artifactId>rocketmq-all</artifactId>  
             <version>3.2.6</version>  
             <type>pom</type>  
        </dependency>
        <dependency>  
             <groupId>com.alibaba.rocketmq</groupId>  
             <artifactId>rocketmq-client</artifactId>  
             <version>3.2.6</version>  
        </dependency>    
复制代码

2.Spring bean 配置单例

复制代码
  <bean id="myProducer" class="cn.zno.rocketmq.MyProducer"
            init-method="init"  
            destroy-method="destroy"
            scope="singleton">
      <property name="producerGroup" value="MyProducerGroup" />
      <property name="namesrvAddr" value="127.0.0.1:9876" />
    </bean>
    <bean class="cn.zno.rocketmq.MyConsumer" 
            init-method="init" 
            destroy-method="destroy"
            scope="singleton">
      <property name="consumerGroup" value="MyConsumerGroup" />
      <property name="namesrvAddr" value="127.0.0.1:9876" />
    </bean> 
复制代码

 

3. 自定义producer

复制代码
package cn.zno.rocketmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

public class MyProducer {

    private final Logger logger = LoggerFactory.getLogger(MyProducer.class);

    private DefaultMQProducer defaultMQProducer;
    private String producerGroup;
    private String namesrvAddr;

    /**
     * Spring bean init-method
     */
    public void init() throws MQClientException {
        // 参数信息
        logger.info("DefaultMQProducer initialize!");
        logger.info(producerGroup);
        logger.info(namesrvAddr);

        // 初始化
        defaultMQProducer = new DefaultMQProducer(producerGroup);
        defaultMQProducer.setNamesrvAddr(namesrvAddr);
        defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        
        defaultMQProducer.start();

     logger.info("DefaultMQProudcer start success!");

    }

    /**
     * Spring bean destroy-method
     */
    public void destroy() {
        defaultMQProducer.shutdown();
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    // ---------------setter -----------------

    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

}
复制代码

4. 自定义consumer

复制代码
package cn.zno.rocketmq;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class MyConsumer {

    private final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    private DefaultMQPushConsumer defaultMQPushConsumer;
    private String namesrvAddr;
    private String consumerGroup;

    /**
     * Spring bean init-method
     */
    public void init() throws InterruptedException, MQClientException {

        // 参数信息
        logger.info("DefaultMQPushConsumer initialize!");
        logger.info(consumerGroup);
        logger.info(namesrvAddr);

        // 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
        // 注意:ConsumerGroupName需要由应用来保证唯一
        defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
        defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));

        // 订阅指定MyTopic下tags等于MyTag

        defaultMQPushConsumer.subscribe("MyTopic", "MyTag");

        // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 设置为集群消费(区别于广播消费)
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

            // 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("MyTopic")) {
                    // TODO 执行Topic的消费逻辑
                    if (msg.getTags() != null && msg.getTags().equals("MyTag")) {
                        // TODO 执行Tag的消费
                    }
                }
                // 如果没有return success ,consumer会重新消费该消息,直到return success
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
        defaultMQPushConsumer.start();

        logger.info("DefaultMQPushConsumer start success!");
    }

    /**
     * Spring bean destroy-method
     */
    public void destroy() {
        defaultMQPushConsumer.shutdown();
    }

    // ----------------- setter --------------------

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

}
复制代码

 

5. 发消息

复制代码
  @Autowired
    private MyProducer myProducer;

    public void sendMessage() {
        Message msg = new Message("MyTopic", "MyTag", (JSONObject.fromObject(someMessage)).getBytes());
        SendResult sendResult = null;
        try {
            sendResult = myProducer.getDefaultMQProducer().send(msg);
        } catch (MQClientException e) {
            logger.error(e.getMessage() + String.valueOf(sendResult));
        }
        // 当消息发送失败时如何处理
        if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
            // TODO
        }
    }
复制代码
分享到:
评论

相关推荐

    rocketmq-spring-rocketmq-spring-all-2.0.3_rocketmq_

    RocketMQ Spring是阿里巴巴开源的一款基于Apache RocketMQ的消息中间件与Spring框架深度整合的产品,它使得在Spring Boot项目中集成RocketMQ变得极其简便。RocketMQ是一款高性能、高可用、分布式的消息队列服务,常...

    springboot整合rocketmq源码

    【标题】:SpringBoot整合RocketMQ源码解析 在当今的微服务架构中,消息队列(Message Queue)已经成为解耦、异步处理以及提高系统可靠性的关键组件。本主题将深入探讨如何在SpringBoot应用中整合Apache RocketMQ,...

    spring+websocket整合

    在本文中,我们将深入探讨如何将WebSocket技术与Spring框架整合,以实现高效、实时的Web应用功能。 首先,Spring框架为WebSocket提供了一整套支持,包括基于STOMP(Simple Text Oriented Message Protocol)的抽象...

    springboot-rocketmq

    springboot整和rocketmq, 分别通过配置和xml两种方式实现整合.

    SpringBoot整合RocketMq,rocketMq

    **SpringBoot整合RocketMQ** SpringBoot是一款由Pivotal团队提供的快速开发框架,它基于Spring框架,简化了配置,使得开发者能够更快地构建独立的、生产级别的基于Spring的应用程序。RocketMQ是阿里巴巴开源的一款...

    springboot-rocketmq-demo.zip

    标题中的"springboot-rocketmq-demo.zip"表明这是一个关于Spring Boot整合RocketMQ的示例项目。RocketMQ是阿里巴巴开源的一款分布式消息中间件,而Spring Boot是基于Spring框架的高度集成了许多开发工具和配置的轻量...

    springmvc集成rocketmq

    rocketmq集成至springmvc,rocketmq快速上手,快速集成至原有项目进行开发

    Spring Boot优雅使用RocketMQ的方法实例

    主要给大家介绍了关于Spring Boot优雅使用RocketMQ的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧

    rocketmq-spring.rar

    标题"rocketmq-spring.rar"表明这个压缩包包含了Spring Boot整合RocketMQ所需的jar包,这通常包括RocketMQ的客户端库和Spring Boot的适配器。这些jar包使得我们可以在Spring Boot的应用中无缝地使用RocketMQ的服务,...

    Tedu五阶段SpringBoot整合RocketMQ

    ### Tedu五阶段Spring Boot整合RocketMQ #### 一、概览 本篇文章将详细介绍如何在Spring Boot项目中集成RocketMQ消息中间件,并通过具体的代码示例来展示两种消息类型:顺序消息与事务消息的处理方式。此外,还将...

    springcloud-stream-rocketmq多topic示例代码

    springcloud-stream-rocketmq多topic示例代码

    SpringBoot整合rocketmq事务消息

    首先,让我们了解SpringBoot整合RocketMQ的基本步骤: 1. **添加依赖**:在SpringBoot的`pom.xml`文件中,我们需要引入RocketMQ的客户端依赖。这可以通过Maven仓库进行,具体依赖可以根据RocketMQ的最新版本来调整...

    springboot整合RocketMQ的实践代码(生产者、消费者的演示)

    springboot整合RocketMQ的实践代码

    springboot整合rocketmq

    SpringBoot整合RocketMQ是将流行的Java微服务框架Spring Boot与分布式消息中间件RocketMQ结合,以实现高效、可靠的异步通信和解耦。这个项目是一个适用于初学者的Maven工程,可以直接在IDEA中导入并运行,便于学习和...

    springboot系列教程(二十):springboot整合RocketMQ,实现请求异步处理

    springboot整合RocketMQ,实现请求异步处理

    spring-boot-starter-rocketmq

    自己写的一个spring-boot整合rocketmq的starter,以及一个用来测试的项目rocketmq-starter-test。 涉及以下知识的最佳实践: 1、自定义spring boot starter; 2、使用spring的事件传播机制实现bean与bean之间基于...

    RocketMQ代码demo.zip

    本示例将通过两个关键场景来展示如何在Java环境中整合RocketMQ:一是基于原生API的使用,二是结合SpringBoot和SpringCloudStream的集成。 首先,我们来看原生API的使用。在RocketMQ中,生产者(Producer)负责发送...

    RocketMq + Spring 示例

    总之,这个示例提供了在 Spring 应用中使用 RocketMQ 的基础结构和操作流程,对于理解如何在实际项目中整合这两个强大的工具非常有帮助。通过深入学习和实践,你可以更好地掌握分布式消息传递的原理和最佳实践。

    springboot整合RocketMQ,实现请求异步处理

    RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。 (2)、NameServer 消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似...

    若依框架整合RocketMQ,自带原码,只需要修改yaml文件中的数据库连接配置,就可以直接启动

    RuoYi是一个基于Java技术开发的后台管理系统,基于技术组合(SpringBoot+Vue),内置模块有:部门管理、角色用户、菜单即按钮授权、数据权限、系统参数、日志管理、代码生成、表单构建等。支持多数据源、支持分布式...

Global site tag (gtag.js) - Google Analytics