`
m635674608
  • 浏览: 5028462 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

rocketmq 延迟队列的实现

 
阅读更多

流程描述:

 

1. producer发消息,设置一个延迟level值. 

 

“设置消息延时 10s 消费”的 Producer 端代码如下:

 

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

 

[java] view plain copy在CODE上查看代码片派生到我的代码片
  1. Message msg = newMessage(topic, tags, keys, body);  
  2. msg.setDelayTimeLevel(3);  
  3. ...  
  4. SendResult sendResult = getMQProducer().send(msg);  

 

2. broker 保存消息时替换了topic,和queueId(一个level计算得到一个queueId,并将实际的topic和queueId作为properties保存).

 

3. broker有定时任务(其实是个consumer)消费延迟消息,如果到达延迟时间,将消息取出,改回原来的topic和queueId,放入到commitLog中,然后被真正的消费者.

 

疑难点:

 

 问: 如何保证rocketMq的offset移动和延迟消息不冲突?

 

 答: rocketMq当消息真正要消费的时候才把消息放到对应的topic中. 中间先保存到其他地方(利用原有的存储,消费机制,自然是一个topic). rocketMq很巧妙的将用户配置的level和queueId进行了一对一映射. 这样就能保证同一个queue下的消息肯定是顺序消费的. 

 

 

 

代码细节:

 

1. producer 发 消息,设置一个level值.

 

服务端MessageStoreConfig.messageDelayLevel 默认值是

 

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

 


2. 服务端接受到消息后
MessageConst.PROPERTY_REAL_TOPI
commitLog.putMessage(MessageExtBrokerInner)里,会把配置了延迟level的消息,存到
ScheduleMessageService.SCHEDULE_TOPIC(值为SCHEDULE_TOPIC_XXXX) ,
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());中
真正的topic和queueId作为msg暂时存起来.
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,
    String.valueOf(msg.getQueueId()));
3.有个定时任务模拟消费者消费该queue
ScheduleMessageService.DeliverDelayedMessageTimerTask 内,判断是否可消息,可以就取出消息,将topic和quueeId还原,放到commitLog中
PutMessageResult putMessageResult =
        ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
//
其中
private static final long FIRST_DELAY_TIME = 1000L; //定时任务第一次启动时延迟时间
private static final long DELAY_FOR_A_WHILE = 100L;  // 死循环轮训时延迟时间. 又生成一个task,这样避免很多线程一直在执行.比较好的死循环策略.
private static final long DELAY_FOR_A_PERIOD = 10000L; // put到commitLog出错时的延迟时间
 

 

 

参考文献:http://www.tuicool.com/articles/aU7JRz7
       主要根据该文章的关键字去看对应的源代码,https://github.com/alibaba/RocketMQ

http://blog.csdn.net/fei33423/article/details/51189430

 

分享到:
评论

相关推荐

    RocketMQ消息队列demo

    这个demo可能是通过一个简单的客户端应用,允许用户输入服务器的IP地址和端口号,然后就能与RocketMQ服务器进行交互,实现消息的发布和订阅功能。 1. ** RocketMQ的基本概念 ** - **主题(Topic)**:主题是消息...

    互联网延迟队列解决方案设计.pdf

    Redis具有轻量级、高性能、支持持久化以及易于水平扩展的特点,非常适合用作延迟队列系统的实现基础。利用Redis的发布订阅模型和数据结构(如List、Zset等),可以高效地实现延迟队列功能。 延迟队列系统的设计应该...

    RocketMq消息队列实施计划方案_.doc

    RocketMQ是一款针对大规模分布式系统的高吞吐量、低延迟、高可用的消息中间件,提供丰富的消息类型和强大的管理控制台。 5. **RockerMQ集群部署** - **单个Master**:简单但不冗余,服务中断风险高。 - **多...

    阿里 Rocketmq4.2 安装包下载

    - **发布/订阅模型**:RocketMQ支持发布者将消息发送到主题,而多个订阅者可以订阅同一个主题,实现一对多的消息分发。 - **顺序消息**:在特定的Topic下,RocketMQ能够保证消息的严格顺序,这对于处理交易流水等...

    RocketMQ使用与实现.pdf

    - **顺序模式**:确保消息按照指定顺序发送到同一个队列中,实现方式是通过单个队列实现。 - **事务模式**: - **Prepared消息发送**:发送预处理消息,在本地事务提交前完成。 - **本地事务执行**:根据业务逻辑...

    Python RocketMQ

    **Python RocketMQ** 在分布式系统中,消息队列(Message Queue)扮演着至关重要的角色,它能够有效地解耦系统组件,...通过`jpype1`库,我们可以在Python环境中利用RocketMQ的强大功能,实现高效的消息传递和处理。

    RocketMQ技术内幕:RocketMQ架构设计与实现原理.docx

    RocketMQ技术内幕:RocketMQ架构设计与实现原理 本文概述了RocketMQ的技术内幕,涵盖了其架构设计、实现原理、性能评估和应用展望等多个方面。RocketMQ是一款开源的分布式消息中间件,由阿里巴巴团队开发并开源,...

    rocketmq-dashboard.zip

    4. **延迟消息**:RocketMQ提供延迟消息功能,允许消息在特定时间后才被消费,适用于定时任务或者事务补偿场景。 5. **分布式事务**:RocketMQ支持分布式事务,通过半消息和全局顺序消息实现事务一致性。 6. **...

    rocketmq安装包及RocketMQ 控制台JAR包

    总结来说,RocketMQ是一个强大的消息中间件,通过安装`rocketmq4.9.2.zip`和运行`rocketmq-dashboard-1.0.1-SNAPSHOT.jar`,我们可以实现对RocketMQ实例的管理和监控,更好地理解和优化我们的分布式系统。...

    商城秒杀项目,基于springboot开发,使用redis和rocketmq技术增加高并发能力

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,它具有高吞吐量、低延迟和高可用性的特点,适合处理大规模并发消息。在秒杀项目中,RocketMQ主要扮演以下角色: - 异步处理:将秒杀请求作为消息发送到RocketMQ,...

    rocketmq可视化界面,rocketmq-console

    通过修改配置文件中的集群IP地址,你可以将这个控制台连接到你的RocketMQ集群,从而实现对生产者、消费者、主题(Topic)和队列(Queue)等关键元素的实时监控和管理。这对于调试、优化以及故障排查来说至关重要。 ...

    RocketMQ学习笔记 1

    RocketMQ有多种优点,如高性能、低延迟、高可用性、可扩展性等。它支持JMS规范,提供了丰富的API和工具来帮助开发者快速构建消息队列系统。 9. RocketMQ的安装和配置 RocketMQ可以在多种操作系统上安装和配置,如...

    微服务SpringBoot整合Redis基于Redis的Stream消息队列实现异步秒杀下单

    总结,本文深入介绍了如何利用SpringBoot和Redis Stream构建一个高效率、低延迟的异步秒杀系统,通过消息队列实现了生产者和消费者间的解耦,提高了系统的可靠性和可扩展性。对于需要处理大量并发请求的微服务应用,...

    rocketmq安装包,rocketmq-all-5.1.3-bin-release

    11. **延时消息与定时消息**:RocketMQ允许Producer设置消息的延迟级别或指定精确的消费时间,使得消息在指定时间后才被消费,这对于实现某些特定业务逻辑非常有用。 12. **消息幂等性**:为了防止重复消费同一消息...

    RocketMQ技术内幕.rar

    RocketMQ作为一款高效、稳定、可扩展的消息队列服务,广泛应用于大数据处理、实时计算、微服务架构等领域。以下是根据书名和描述所涵盖的一些关键知识点的详细说明: 一、RocketMQ简介 RocketMQ起源于阿里巴巴内部...

    rocketmq32位动态库

    1. **消息队列管理**:RocketMQ允许开发者创建、管理和删除消息队列,以实现异步通信和解耦应用程序。 2. **消息发送与接收**:通过调用动态库中的API,开发者可以方便地向消息队列发送消息,并从队列中消费消息。...

    rocketmq-5.1.3.zip

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域,具有高可用、高并发、低延迟等特性。RocketMQ 5.1.3是该产品的一个重要版本,提供了诸多增强和优化,旨在提高性能和稳定...

    RocketMq快速入门文档

    RocketMQ是一款由阿里巴巴开源并捐赠给Apache基金会的分布式消息中间件,源自阿里的MetaQ项目,具有高性能、高可用、低延迟等特性,尤其在处理大规模消息流转方面表现出色。RocketMQ支持十万级别的数据吞吐量,处理...

    RocketMQ学习笔记1

    RocketMQ可以实现消息分发,生产者可以将消息发送到队列中,消费者可以根据需求订阅感兴趣的消息,实现解耦。 消息一致性 RocketMQ可以保证消息的一致性,确保消息的可靠传输。 动态扩容 RocketMQ可以实现动态扩...

    rocketMq可视化界面

    在项目文件"rocketmq-dashboard-master"中,包含了实现RocketMQ可视化界面的所有源代码。开发人员可以研究这些源码,学习如何与RocketMQ服务进行交互,获取和展示相关数据。同时,由于项目是基于Spring Boot构建的,...

Global site tag (gtag.js) - Google Analytics