1. 引入jar包
<!-- RocketMQ --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.2.6</version> <type>pom</type> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
2.Spring bean 配置单例
<bean id="myProducer" class="cn.zno.rocketmq.MyProducer" init-method="init" destroy-method="destroy" scope="singleton"> <property name="producerGroup" value="MyProducerGroup" /> <property name="namesrvAddr" value="127.0.0.1:9876" /> </bean> <bean class="cn.zno.rocketmq.MyConsumer" init-method="init" destroy-method="destroy" scope="singleton"> <property name="consumerGroup" value="MyConsumerGroup" /> <property name="namesrvAddr" value="127.0.0.1:9876" /> </bean>
3. 自定义producer
package cn.zno.rocketmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; public class MyProducer { private final Logger logger = LoggerFactory.getLogger(MyProducer.class); private DefaultMQProducer defaultMQProducer; private String producerGroup; private String namesrvAddr; /** * Spring bean init-method */ public void init() throws MQClientException { // 参数信息 logger.info("DefaultMQProducer initialize!"); logger.info(producerGroup); logger.info(namesrvAddr); // 初始化 defaultMQProducer = new DefaultMQProducer(producerGroup); defaultMQProducer.setNamesrvAddr(namesrvAddr); defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis())); defaultMQProducer.start(); logger.info("DefaultMQProudcer start success!"); } /** * Spring bean destroy-method */ public void destroy() { defaultMQProducer.shutdown(); } public DefaultMQProducer getDefaultMQProducer() { return defaultMQProducer; } // ---------------setter ----------------- public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } }
4. 自定义consumer
package cn.zno.rocketmq; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; public class MyConsumer { private final Logger logger = LoggerFactory.getLogger(MyConsumer.class); private DefaultMQPushConsumer defaultMQPushConsumer; private String namesrvAddr; private String consumerGroup; /** * Spring bean init-method */ public void init() throws InterruptedException, MQClientException { // 参数信息 logger.info("DefaultMQPushConsumer initialize!"); logger.info(consumerGroup); logger.info(namesrvAddr); // 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br> // 注意:ConsumerGroupName需要由应用来保证唯一 defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup); defaultMQPushConsumer.setNamesrvAddr(namesrvAddr); defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis())); // 订阅指定MyTopic下tags等于MyTag defaultMQPushConsumer.subscribe("MyTopic", "MyTag"); // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> // 如果非第一次启动,那么按照上次消费的位置继续消费 defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 设置为集群消费(区别于广播消费) defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING); defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); if (msg.getTopic().equals("MyTopic")) { // TODO 执行Topic的消费逻辑 if (msg.getTags() != null && msg.getTags().equals("MyTag")) { // TODO 执行Tag的消费 } } // 如果没有return success ,consumer会重新消费该消息,直到return success return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> defaultMQPushConsumer.start(); logger.info("DefaultMQPushConsumer start success!"); } /** * Spring bean destroy-method */ public void destroy() { defaultMQPushConsumer.shutdown(); } // ----------------- setter -------------------- public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } }
5. 发消息
@Autowired private MyProducer myProducer; public void sendMessage() { Message msg = new Message("MyTopic", "MyTag", (JSONObject.fromObject(someMessage)).getBytes()); SendResult sendResult = null; try { sendResult = myProducer.getDefaultMQProducer().send(msg); } catch (MQClientException e) { logger.error(e.getMessage() + String.valueOf(sendResult)); } // 当消息发送失败时如何处理 if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) { // TODO } }
相关推荐
RocketMQ Spring是阿里巴巴开源的一款基于Apache RocketMQ的消息中间件与Spring框架深度整合的产品,它使得在Spring Boot项目中集成RocketMQ变得极其简便。RocketMQ是一款高性能、高可用、分布式的消息队列服务,常...
【标题】:SpringBoot整合RocketMQ源码解析 在当今的微服务架构中,消息队列(Message Queue)已经成为解耦、异步处理以及提高系统可靠性的关键组件。本主题将深入探讨如何在SpringBoot应用中整合Apache RocketMQ,...
在本文中,我们将深入探讨如何将WebSocket技术与Spring框架整合,以实现高效、实时的Web应用功能。 首先,Spring框架为WebSocket提供了一整套支持,包括基于STOMP(Simple Text Oriented Message Protocol)的抽象...
springboot整和rocketmq, 分别通过配置和xml两种方式实现整合.
**SpringBoot整合RocketMQ** SpringBoot是一款由Pivotal团队提供的快速开发框架,它基于Spring框架,简化了配置,使得开发者能够更快地构建独立的、生产级别的基于Spring的应用程序。RocketMQ是阿里巴巴开源的一款...
标题中的"springboot-rocketmq-demo.zip"表明这是一个关于Spring Boot整合RocketMQ的示例项目。RocketMQ是阿里巴巴开源的一款分布式消息中间件,而Spring Boot是基于Spring框架的高度集成了许多开发工具和配置的轻量...
rocketmq集成至springmvc,rocketmq快速上手,快速集成至原有项目进行开发
主要给大家介绍了关于Spring Boot优雅使用RocketMQ的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
标题"rocketmq-spring.rar"表明这个压缩包包含了Spring Boot整合RocketMQ所需的jar包,这通常包括RocketMQ的客户端库和Spring Boot的适配器。这些jar包使得我们可以在Spring Boot的应用中无缝地使用RocketMQ的服务,...
### Tedu五阶段Spring Boot整合RocketMQ #### 一、概览 本篇文章将详细介绍如何在Spring Boot项目中集成RocketMQ消息中间件,并通过具体的代码示例来展示两种消息类型:顺序消息与事务消息的处理方式。此外,还将...
springcloud-stream-rocketmq多topic示例代码
首先,让我们了解SpringBoot整合RocketMQ的基本步骤: 1. **添加依赖**:在SpringBoot的`pom.xml`文件中,我们需要引入RocketMQ的客户端依赖。这可以通过Maven仓库进行,具体依赖可以根据RocketMQ的最新版本来调整...
springboot整合RocketMQ的实践代码
SpringBoot整合RocketMQ是将流行的Java微服务框架Spring Boot与分布式消息中间件RocketMQ结合,以实现高效、可靠的异步通信和解耦。这个项目是一个适用于初学者的Maven工程,可以直接在IDEA中导入并运行,便于学习和...
springboot整合RocketMQ,实现请求异步处理
自己写的一个spring-boot整合rocketmq的starter,以及一个用来测试的项目rocketmq-starter-test。 涉及以下知识的最佳实践: 1、自定义spring boot starter; 2、使用spring的事件传播机制实现bean与bean之间基于...
本示例将通过两个关键场景来展示如何在Java环境中整合RocketMQ:一是基于原生API的使用,二是结合SpringBoot和SpringCloudStream的集成。 首先,我们来看原生API的使用。在RocketMQ中,生产者(Producer)负责发送...
总之,这个示例提供了在 Spring 应用中使用 RocketMQ 的基础结构和操作流程,对于理解如何在实际项目中整合这两个强大的工具非常有帮助。通过深入学习和实践,你可以更好地掌握分布式消息传递的原理和最佳实践。
RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。 (2)、NameServer 消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似...
RuoYi是一个基于Java技术开发的后台管理系统,基于技术组合(SpringBoot+Vue),内置模块有:部门管理、角色用户、菜单即按钮授权、数据权限、系统参数、日志管理、代码生成、表单构建等。支持多数据源、支持分布式...