`
357029540
  • 浏览: 738515 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

springboot实现rabbit mq的持久化和消息确认

阅读更多

首先实现生产者发送消息和队列的持久化,这部分摘抄自http://blog.720ui.com/2017/rabbitmq_action_durable/

 

要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化。如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可。

  • 交换器必须是持久化。
  • 队列必须是持久化的。
  • 消息必须是持久化的。

原生的实现方式

原生的 RabbitMQ 客户端需要完成三个步骤。

第一步,交换器的持久化。

// 参数1 exchange :交换器名
// 参数2 type :交换器类型
// 参数3 durable :是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

 第二步,队列的持久化。

// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

 第三步,消息的持久化。

// 参数1 exchange :交换器
// 参数2 routingKey : 路由键
// 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 参数4 body : 消息体
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

 

 

Spring AMQP 的实现方式

Spring AMQP 是对原生的 RabbitMQ 客户端的封装。一般情况下,我们只需要定义交换器的持久化和队列的持久化。

其中,交换器的持久化配置如下。

// 参数1 name :交互器名
// 参数2 durable :是否持久化
// 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
new TopicExchange(name, durable, autoDelete)

 此外,还需要再配置队列的持久化。

// 参数1 name :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
new Queue(name, durable, exclusive, autoDelete);

 至此,RabbitMQ 的消息持久化配置完毕。

 

那么,消息的持久化难道不需要配置么?确实如此,我们来看下源码。

一般情况下,我们会通过这种方式发送消息。

rabbitTemplate.convertAndSend(exchange, routeKey, message);

 其中,调用了 convertAndSend(String exchange, String routingKey, final Object object) 方法。

@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
    convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}

 接着,用调用了 convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) 方法。

public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }

 此时,最关键的方法出现了,它是 convertMessageIfNecessary(final Object object)。

protected Message convertMessageIfNecessary(final Object object) {
    if (object instanceof Message) {
        return (Message) object;
    }
    return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}

 其中,关键的是 MessageProperties 类,它持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的。

public class MessageProperties implements Serializable {
    public MessageProperties() {
        this.deliveryMode = DEFAULT_DELIVERY_MODE;
        this.priority = DEFAULT_PRIORITY;
    }
 
    static {
        DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
        DEFAULT_PRIORITY = Integer.valueOf(0);
    }
}

 

在消费者端,我这里采用的是默认的exchange,所以有很多配置没有使用,可以参考其他的

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Component
@RabbitListener(queues = "rotork_websocketQueue")
@Slf4j
public class WebsocketConsumer {

    @Resource
    private SimpMessagingTemplate simpMessagingTemplate;

    @Resource
    private Executor poolTaskExecutor;

    @RabbitHandler
    public void send(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag){
        CompletableFuture.runAsync(() ->{
                    try {
                        simpMessagingTemplate.convertAndSend("/topic/msg", msg);
                        //不需要重新发送消息
                        channel.basicAck(tag,false);
                    }catch (Exception e){
                        try {
                            // 消费失败,重新发送消息
                            channel.basicNack(tag, false, true);
                        } catch (IOException ioe) {
                            log.error("websocket消费重新获取消费消息错误:{}", ioe);
                        }
                        throw new RuntimeException(e);
                    }
                }, poolTaskExecutor).exceptionally(e -> {
            log.error("websocket消费消息失败", e);
            throw new RuntimeException(e);
        });
    }
}

 

 

分享到:
评论

相关推荐

    java基于SpringBoot+RabbitMQ用户注册实现异步发送验证码源码.zip

    基于SpringBoot+RabbitMQ用户注册实现异步发送验证码源码。基于SpringBoot+RabbitMQ用户注册实现异步发送验证码源码。基于SpringBoot+RabbitMQ用户注册实现异步发送验证码源码。...基于SpringBoot+Rabbit

    springboot + rabbit + hikari

    在"springboot-rabbit"这个项目中,我们可以预期找到以下文件结构和内容: 1. `pom.xml`:包含Spring Boot、Spring AMQP和HikariCP相关的依赖。 2. `application.properties`/`application.yml`:配置RabbitMQ...

    springboot整合rabbit+redis+mysql实现用户购买商品时提交订单后的过程

    springboot整合rabbit+redis+mysql实现用户购买商品时提交订单后的过程; 用户提交后,生成订单,用户有XX分钟时间完成支付 如果用户XX分钟内完成支付,则更新订单为已支付,仓库扣库存,发送短信 如果用户XX分钟后...

    springboot整合rabbit_Demo.zip

    通过整合这两者,我们可以利用消息队列的优势,实现应用之间的异步通信和解耦。 首先,要开始这个整合过程,你需要确保已经安装并运行了RabbitMQ服务器。可以从官方网站下载适合你操作系统的版本,并按照官方文档的...

    springboot Rabbit死信队列实现,rocketMq重试消息实现

    springboot Rabbit死信队列实现,rocketMq重试消息实现 基于springboot2.15版本,最新rabbit和rocktMq 中间件实例,亲测可用

    SpringBoot+WebSocket+RabbitMQ实时消息推送

    rabbitmq+websocket(SpringBoot版)实现分布式消息推送 本来想用websocket做一个消息推送 可是分布式环境下不支持session共享因为服务器不同 所以采用 rabbitMQ+webSocket实现分布式消息推送 生产者将消息 发送给 ...

    RabbitMQ实战-多线程-springboot-rabbit.zip

    在本项目"RabbitMQ实战-多线程-springboot-rabbit.zip"中,我们将深入探讨如何使用RabbitMQ作为消息中间件,结合Spring Boot框架和多线程技术来实现高效、可扩展的分布式系统。Spring Boot简化了RabbitMQ的集成,而...

    rabbit mq server windows 64

    rabbit mq windows 64 较新,很实用,消息中间件服务端。

    rabbit_mq的demo

    在IT行业中,消息队列(MQ)是一种常用于应用程序间通信的技术,它允许不同系统之间异步传递消息,从而解耦各个组件,提高系统的可扩展性和可靠性。RabbitMQ是其中一个广泛使用的开源消息代理和队列服务器,它实现了...

    简单封装spring-rabbit实现mq组件化

    本篇文章将围绕"简单封装spring-rabbit实现mq组件化"这一主题,探讨如何通过Spring框架和RabbitMQ来构建可复用的MQ组件。 首先,我们需要了解Spring框架的Spring AMQP模块,这是Spring对RabbitMQ的支持。它提供了...

    SpringBoot使用Rabbit详解含完整代码

    我们将会从基础概念出发,逐步深入到具体实践,包括如何配置 Spring Boot 项目以连接和操作 RabbitMQ,如何创建和管理消息队列、发布和订阅消息,以及处理消息确认和异常等高级特性。 #### 二、RabbitMQ 基础知识 ...

    springboot集成rabbitmq的简单使用

    springboot集成rabbitmq的简单使用,介绍了springboot集成rabbitmq的使用,利用的交换机、队列、路由key来实现的例子

    spring boot 集成rabbit mq 成功demo

    spring boot 集成rabbit mq 成功demo,spring boot 集成rabbit mq 成功demo

    RabbitMQ连接池+SpringBoot实现

    RabbitMQ连接池+SpringBoot实现。通过连接池实现将高效的管理RabbitMQ的Connection,并与springboot进行整合,实现消息发送,获取队列列表等功能。基于此可以进行更多功能的扩充。

    rabbit mq server 常用命令

    rabbit mq server 常用命令 rabbit mq server 常用命令 rabbit mq server 常用命令

    rabbit mq的demo更新

    在这个“rabbit mq的demo更新”中,我们将探讨如何通过编程实现与RabbitMQ的连接,发送和接收消息,以及如何配置消息的持久化和客户端订阅。 首先,连接RabbitMQ通常需要一个客户端库,如Java的`rabbitmq-amqp-...

    rabbit mq入门例子

    3. 持久化:对于重要的消息,可以设置队列和消息的持久化,保证在服务器重启后不丢失数据。 4. 高可用性:通过RabbitMQ集群提高服务的可用性和容错性。 总的来说,RabbitMQ是一个强大的消息中间件,它允许应用程序...

    rabbit mq demo spring java

    docker 安裝 rabbit mq 並測試 http://knight-black-bob.iteye.com/blog/2395713

    springboot redis zookeeperlock rabbit实现的分布式锁

    本项目基于SpringBoot框架,结合Redis、Zookeeper和RabbitMQ实现了分布式锁,让我们一起深入探讨这三个组件在分布式锁中的作用和实现机制。 **SpringBoot** 是一个轻量级的Java开发框架,它简化了新Spring应用的...

    rabbit-mq-ack-direct-consumer.zip

    本资料包"rabbit-mq-ack-direct-consumer.zip"包含的是关于RabbitMQ中消费者实现的代码示例,特别关注了消息确认(Message Acknowledgement)和Direct交换机模式。 首先,我们来理解RabbitMQ中的消息确认机制。在...

Global site tag (gtag.js) - Google Analytics