`
hanzhdy
  • 浏览: 28574 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ延迟消息的配置与使用

 
阅读更多
RocketMQ支持延迟/定时消息,但并不支持任意的时间精度,而是支持特定的level,例如5s,10s,1m等。其中level=0表示不延时,level=1表示1级延时,level=2表示2级延时,以此类推。

延迟级别配置
在rocketmq的broker端的属性配置文件中加入以下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

描述了各级别与延时时间的对应映射关系。
1、这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
2、时间单位支持:s、m、h、d,分别表示秒、分、时、天;
3、默认值就是上面声明的,可手工调整


可能存在的问题:在conf/broker.conf中添加messageDelayLevel配置,并修改为自定义的延时级别。测试时发现实际延迟时间仍按默认的配置生效

解决:如borker在启动时未指定配置文件路径,则使用默认配置,而不是使用conf/broker.conf。可以在启动命令中增加-c指定配置文件路径,如:
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true -c conf/broker.conf &

消息发送
只需要在生产者客户端中设置待发送消息的延时级别即可发送延时消息。

注意:如果项目中引入了spring-boot-starter-rocketmq,那么无法直接使用RocketMQTemplate设置延时级别。
可参考如下代码:

EventDTO eventDTO = new EventDTO();
// 设置eventDTO相关属性
String jsonstr = rocketMQTemplate.getObjectMapper().writeValueAsString(eventDTO);
byte[] bytes = jsonstr.getBytes(Charset.forName("UTF-8"));

DefaultMQProducer producer = rocketMQTemplate.getProducer();
org.apache.rocketmq.common.message.Message message = 
  new org.apache.rocketmq.common.message.Message("topic", "", bytes);
// 设置延迟级别
message.setDelayTimeLevel(4);
producer.send(message, (long)producer.getSendMsgTimeout());


扩展
在RocketMQ中,消息体存储格式为byte[],spring-boot-starter-rocketmq对发送对消息体处理如下:
* 1、如果发送的消息体为String,则将String转化为UTF-8格式的byte[]
* 2、如果发送的消息体不是String,则首先将对象通过jackson-databind转化为JSON String,再将JSON String转化为UTF-8格式的byte[]

原文如下:
How is the message content body being serialized and deserialized?
RocketMQ's message body is stored as byte[]. When the business system message content body if it is java.lang.String type, unified in accordance with utf-8 code into byte []; If the business system message content is not java.lang.String Type, then use jackson-databind serialized into the JSON format string, and then unified in accordance with utf-8 code into byte[].

参考:https://blog.csdn.net/u014380653/article/details/52883356
分享到:
评论

相关推荐

    spingboot-rocketmq事务消息

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,具有高吞吐量、低延迟、高可用性以及可扩展性等特点。 **什么是RocketMQ事务消息** RocketMQ的事务消息主要目的是为了实现分布式事务的一致性。事务消息不同于普通...

    rocketmq:RocketMq支持任意延迟时间的延时消息

    RocketMq支持任意延迟的延时消息方案 主要特性 支持任意延迟的延时消息,精确到秒,最长延迟时间为1年。 使用方法 配置broker.conf segmentScale=60 ##每个时间桶的时间范围,单位分钟,默认60,可选值0-60之间,...

    RocketMQ消息队列demo

    在“RocketMQ消息队列demo”中,我们可以理解这是一个演示如何使用RocketMQ进行消息发布和订阅的实例。这个demo可能是通过一个简单的客户端应用,允许用户输入服务器的IP地址和端口号,然后就能与RocketMQ服务器进行...

    RocketMQ使用手册

    * 多 Master 多 Slave 模式,异步复制:每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级 * 多 Master 多 Slave 模式,同步双写:每个 Master 配置一个 Slave,...

    rocketmq消息中间件.zip

    - **延迟与定时消息**:支持消息的定时和延时发送,满足不同业务需求。 - **事务消息**:提供分布式事务解决方案,确保事务的一致性。 - **消息回溯**:允许消费者回溯到历史消息,方便问题排查。 4. **RocketMQ...

    rocketmq使用.zip

    除了基本使用和事务消息,RocketMQ还有其他高级功能,如消息回溯、延迟消息、顺序消息、消息过滤等。消息回溯可以在不影响正常消息消费的情况下,追溯历史消息。延迟消息可以设定消息在未来的某个时间点才被消费。...

    springboot1.5.10.RELEASE集成rocketmq4.3.1消息服务demo,多个消费者多监听

    在本文中,我们将深入探讨如何将SpringBoot 1.5.10.RELEASE与RocketMQ 4.3.1集成,构建一个支持多个消费者监听的消息服务示例。RocketMQ是阿里巴巴开源的一款分布式消息中间件,它具有高吞吐量、低延迟、高可用性和...

    rocketmq可视化界面,rocketmq-console

    使用RocketMQConsole时,你需要对RocketMQ的基本概念有一定了解,并且熟悉如何配置和启动Console服务。在实际操作中,你可能还需要关注网络环境、权限设置以及日志分析等方面的问题,以确保RocketMQ的稳定运行。通过...

    RocketMQ黑马笔记

    二、安装与配置RocketMQ RocketMQ的安装和配置主要包括下载安装包、配置环境变量、启动NameServer和Broker等步骤。需要注意的是,需要先启动NameServer,然后再启动Broker,否则会导致启动失败。 三、快速运行...

    rocketmq管理工具,项目之前名字rocketmq-console后来项目独立并更名为rocketmq-dashboard

    在本文中,我们将深入探讨RocketMQ Dashboard的相关知识点,包括其功能、特性、安装与使用以及它如何帮助用户更好地管理RocketMQ集群。 1. **RocketMQ Dashboard的功能**: - **监控**: 提供实时的消费者和生产者...

    RocketMQ 5.2.0

    - **主题(Topic)与队列(Queue)**: RocketMQ 使用主题和队列的概念来组织消息。一个主题可以包含多个队列,每个队列存储一部分消息。这种设计允许消费者并行消费,提高处理速度。 - **发布/订阅模型**: 支持点...

    RocketMQ:发送消息与接收实例

    为了理解并使用这些功能,你需要了解 RocketMQ 的基本架构,包括 NameServer(路由注册与发现)、Broker(消息存储与转发)、Producer(消息生产者)和 Consumer(消息消费者)。其中,Producer 负责创建消息并发送...

    rocketmq安装包及RocketMQ 控制台JAR包

    3. **配置与连接**:在控制台上,你需要配置RocketMQ集群的NameServer地址,然后可以查看和管理你的主题(Topic)、队列(Queue)等资源,监控消费状态,进行消息查询等操作。 在实际应用中,RocketMQ支持多种部署...

    阿里 Rocketmq4.2 安装包下载

    - 对于大规模并发场景,可以考虑使用消息的批量发送和消费,以提高效率。 - 为保证事务消息的正确性,需合理设计事务回滚和补偿机制。 通过这些深入理解,开发者可以更好地利用RocketMQ 4.2构建高可靠、高性能的...

    商城秒杀项目,基于springboot开发,使用redis和rocketmq技术增加高并发能力

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,它具有高吞吐量、低延迟和高可用性的特点,适合处理大规模并发消息。在秒杀项目中,RocketMQ主要扮演以下角色: - 异步处理:将秒杀请求作为消息发送到RocketMQ,...

    rocketmq-dashboard.zip

    4. **延迟消息**:RocketMQ提供延迟消息功能,允许消息在特定时间后才被消费,适用于定时任务或者事务补偿场景。 5. **分布式事务**:RocketMQ支持分布式事务,通过半消息和全局顺序消息实现事务一致性。 6. **...

    rocketMQ常用命令

    Apache RocketMQ是一款开源的分布式消息中间件,由阿里巴巴开源并贡献给了Apache社区,它主要用于企业级消息通讯领域,提供低延迟、高可靠的消息传递服务。在本文中,我们将详细介绍如何使用RocketMQ的一些常用命令...

    kafka开发和rocketmq消息技术文档

    通过阅读《Kafka 权威指南》和《RocketMQ 实战与原理解析》这两本书,你可以深入理解这两个消息中间件的架构、配置、最佳实践以及如何在实际项目中应用它们。对于分布式消息系统的设计和实现,这两份文档将提供宝贵...

    RocketMQ讲义-03.pdf

    RocketMQ起源于阿里巴巴内部的分布式消息系统,后来成为Apache顶级项目,具有高可用、高吞吐、低延迟等特点。 **1.4 各种MQ产品的比较** RocketMQ与其他MQ产品如Kafka、RabbitMQ等在性能、功能、易用性等方面有...

Global site tag (gtag.js) - Google Analytics