`
m635674608
  • 浏览: 5041870 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ批量消费、消息重试、消费模式、刷盘方式

    博客分类:
  • MQ
 
阅读更多

一、Consumer 批量消费

可以通过

 

[java] view plain copy
 
  1. consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条  

这里需要分为2种情况1、Consumer端先启动  2、Consumer端后启动.   正常情况下:应该是Consumer需要先启动

 

1、Consumer端先启动

Consumer代码如下

 

[java] view plain copy
 
  1. package quickstart;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  9. import com.alibaba.rocketmq.client.exception.MQClientException;  
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  11. import com.alibaba.rocketmq.common.message.MessageExt;  
  12.   
  13. /** 
  14.  * Consumer,订阅消息 
  15.  */  
  16. public class Consumer {  
  17.   
  18.     public static void main(String[] args) throws InterruptedException, MQClientException {  
  19.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
  20.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
  21.         consumer.setConsumeMessageBatchMaxSize(10);  
  22.         /** 
  23.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
  24.          * 如果非第一次启动,那么按照上次消费的位置继续消费 
  25.          */  
  26.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  27.   
  28.         consumer.subscribe("TopicTest""*");  
  29.   
  30.         consumer.registerMessageListener(new MessageListenerConcurrently() {  
  31.   
  32.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  33.                   
  34.                 try {  
  35.                     System.out.println("msgs的长度" + msgs.size());  
  36.                     System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
  37.                 } catch (Exception e) {  
  38.                     e.printStackTrace();  
  39.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
  40.                 }  
  41.                   
  42.                   
  43.                   
  44.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  45.             }  
  46.         });  
  47.   
  48.         consumer.start();  
  49.   
  50.         System.out.println("Consumer Started.");  
  51.     }  
  52. }  



 



 

由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1):

 

 

 

2、Consumer端后启动,也就是Producer先启动

由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条    

所以这段代码就生效了测试结果如下(每次size最多是10):

二、消息重试机制:消息重试分为2种1、Producer端重试 2、Consumer端重试

1、Producer端重试 

也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数

 

[java] view plain copy
 
  1. package quickstart;  
  2.   
  3. import com.alibaba.rocketmq.client.exception.MQClientException;  
  4. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
  5. import com.alibaba.rocketmq.client.producer.SendResult;  
  6. import com.alibaba.rocketmq.common.message.Message;  
  7.   
  8. /** 
  9.  * Producer,发送消息 
  10.  *  
  11.  */  
  12. public class Producer {  
  13.     public static void main(String[] args) throws MQClientException, InterruptedException {  
  14.         DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");  
  15.         producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
  16.         producer.setRetryTimesWhenSendFailed(10);//失败的 情况发送10次  
  17.         producer.start();  
  18.   
  19.         for (int i = 0; i < 1000; i++) {  
  20.             try {  
  21.                 Message msg = new Message("TopicTest",// topic  
  22.                         "TagA",// tag  
  23.                         ("Hello RocketMQ " + i).getBytes()// body  
  24.                 );  
  25.                 SendResult sendResult = producer.send(msg);  
  26.                 System.out.println(sendResult);  
  27.             } catch (Exception e) {  
  28.                 e.printStackTrace();  
  29.                 Thread.sleep(1000);  
  30.             }  
  31.         }  
  32.   
  33.         producer.shutdown();  
  34.     }  
  35. }  



 

 

 

2、Consumer端重试

2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等

上面的代码中消费异常的情况返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试

正常则返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

 

 

[java] view plain copy
 
  1. package quickstart;  
  2.   
  3.   
  4. import java.util.List;  
  5.   
  6. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  8. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  9. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  10. import com.alibaba.rocketmq.client.exception.MQClientException;  
  11. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  12. import com.alibaba.rocketmq.common.message.MessageExt;  
  13.   
  14. /** 
  15.  * Consumer,订阅消息 
  16.  */  
  17. public class Consumer {  
  18.   
  19.     public static void main(String[] args) throws InterruptedException, MQClientException {  
  20.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
  21.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
  22.         consumer.setConsumeMessageBatchMaxSize(10);  
  23.         /** 
  24.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
  25.          * 如果非第一次启动,那么按照上次消费的位置继续消费 
  26.          */  
  27.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  28.   
  29.         consumer.subscribe("TopicTest""*");  
  30.   
  31.         consumer.registerMessageListener(new MessageListenerConcurrently() {  
  32.   
  33.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  34.   
  35.                 try {  
  36.                     // System.out.println("msgs的长度" + msgs.size());  
  37.                     System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
  38.                     for (MessageExt msg : msgs) {  
  39.                         String msgbody = new String(msg.getBody(), "utf-8");  
  40.                         if (msgbody.equals("Hello RocketMQ 4")) {  
  41.                             System.out.println("======错误=======");  
  42.                             int a = 1 / 0;  
  43.                         }  
  44.                     }  
  45.   
  46.                 } catch (Exception e) {  
  47.                     e.printStackTrace();  
  48.                     if(msgs.get(0).getReconsumeTimes()==3){  
  49.                         //记录日志  
  50.                           
  51.                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
  52.                     }else{  
  53.                           
  54.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
  55.                     }  
  56.                 }  
  57.   
  58.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
  59.             }  
  60.         });  
  61.   
  62.         consumer.start();  
  63.   
  64.         System.out.println("Consumer Started.");  
  65.     }  
  66. }  

 

打印结果:



假如超过了多少次之后我们可以让他不再重试记录 日志。

if(msgs.get(0).getReconsumeTimes()==3){
//记录日志 
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}

 

2.2超时的情况,这种情况MQ会无限制的发送给消费端。

就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;这样的就认为没有到达Consumer端。

这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ

 

[java] view plain copy
 
  1. package model;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  9. import com.alibaba.rocketmq.client.exception.MQClientException;  
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  11. import com.alibaba.rocketmq.common.message.MessageExt;  
  12.   
  13. /** 
  14.  * Consumer,订阅消息 
  15.  */  
  16. public class Consumer {  
  17.   
  18.     public static void main(String[] args) throws InterruptedException, MQClientException {  
  19.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");  
  20.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
  21.         consumer.setConsumeMessageBatchMaxSize(10);  
  22.         /** 
  23.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
  24.          * 如果非第一次启动,那么按照上次消费的位置继续消费 
  25.          */  
  26.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  27.   
  28.         consumer.subscribe("TopicTest""*");  
  29.   
  30.         consumer.registerMessageListener(new MessageListenerConcurrently() {  
  31.   
  32.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  33.   
  34.                 try {  
  35.   
  36.                     // 表示业务处理时间  
  37.                     System.out.println("=========开始暂停===============");  
  38.                     Thread.sleep(60000);  
  39.   
  40.                     for (MessageExt msg : msgs) {  
  41.                         System.out.println(" Receive New Messages: " + msg);  
  42.                     }  
  43.   
  44.                 } catch (Exception e) {  
  45.                     e.printStackTrace();  
  46.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
  47.                 }  
  48.   
  49.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
  50.             }  
  51.         });  
  52.   
  53.         consumer.start();  
  54.   
  55.         System.out.println("Consumer Started.");  
  56.     }  
  57. }  

 

 


 

三、消费模式

广播消费:rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费

consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费

 

[java] view plain copy
 
  1. package model;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  9. import com.alibaba.rocketmq.client.exception.MQClientException;  
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  11. import com.alibaba.rocketmq.common.message.MessageExt;  
  12. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;  
  13.   
  14. /** 
  15.  * Consumer,订阅消息 
  16.  */  
  17. public class Consumer2 {  
  18.   
  19.     public static void main(String[] args) throws InterruptedException, MQClientException {  
  20.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");  
  21.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
  22.         consumer.setConsumeMessageBatchMaxSize(10);  
  23.         consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费  
  24.       
  25.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  26.   
  27.         consumer.subscribe("TopicTest""*");  
  28.   
  29.         consumer.registerMessageListener(new MessageListenerConcurrently() {  
  30.   
  31.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  32.   
  33.                 try {  
  34.   
  35.                     for (MessageExt msg : msgs) {  
  36.                         System.out.println(" Receive New Messages: " + msg);  
  37.                     }  
  38.   
  39.                 } catch (Exception e) {  
  40.                     e.printStackTrace();  
  41.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
  42.                 }  
  43.   
  44.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
  45.             }  
  46.         });  
  47.   
  48.         consumer.start();  
  49.   
  50.         System.out.println("Consumer Started.");  
  51.     }  
  52. }  

如果我们有2台节点,Producerw往MQ上写入20条数据 其中Consumer1中拉取了12条 。Consumer2中拉取了8 条,这种情况下,加入Consumer1宕机,那么我们消费数据的时候,只能消费到Consumer2中的8条,Consumer1中的12条已经持久化到中。需要Consumer1回复之后这12条数据才能继续被消费。其实这种先启动producer往MQ上写数据,然后再启动Consumer的情况本来就是违规操作,正确的情况应该是先启动Consumer后再启动producer。

 

异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署

异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点

同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发送成功。

 

四、刷盘方式

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

commitlog:

commitlog就是来存储所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。

consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据

 

 

http://blog.csdn.net/u010634288/article/details/56049305

分享到:
评论

相关推荐

    rocketmq 开发规范 精讲 精华部分

    RocketMQ支持消费失败后的定时重试,有助于减少因暂时性错误导致的消息丢失。 ### **开发规范与最佳实践** **命名规范**: 1. 建议消息队列和消费者组的名称具有可读性和可管理性,避免使用特殊字符,如空格、换行...

    RocketMQ 开发手册 3.2.4

    - **重试机制**:对于发送失败的消息,RocketMQ会自动进行重试,降低消息丢失风险。 - **死信队列**:无法正常消费的消息会被放入死信队列,便于后续排查和处理。 通过 RocketMQ 开发手册 3.2.4,开发者可以深入...

    RocketMQ用户指南--v3.2.4--new.pdf

    12. **消息消费失败,定时重试**:对于消费失败的消息,可以设定重试策略。 13. **HA,同步双写/异步复制**:通过主备切换或复制机制保障服务的高可用性。 14. **单个JVM进程也能利用机器超大内存**:优化了内存管理...

    RocketMQ开发指南

    - **消息重试** (Message Retry): 当消息消费失败时,支持自动重试机制。 #### 三、RocketMQ系统架构概述 - **RocketMQ是什么**: - RocketMQ是一款分布式消息中间件,它提供了一种高效、可靠的方式来在分布式系统...

    RocketMQ 开发手册3.2.4.pdf

    - 消息消费失败时,通过定时重试机制来保证消息最终被成功处理。 4. RocketMQ存储特点: - 零拷贝原理,减少数据在传输过程中的复制次数,提高效率。 - 使用不同的文件系统来存储消息数据,如HDFS等。 - 数据...

    RocketMQ 4.5.2源码

    - **持久化与刷盘策略**: RocketMQ支持文件系统和MySQL等存储方式,消息持久化策略包括同步刷盘和异步刷盘。 5. **集群与高可用**: - **主备切换**: RocketMQ的HA机制基于主从复制,当主节点故障时,从节点接管...

    1.1.RocketMq课程说明.mp4

    本系列课程从rocketmq基础到高级实战,老师手把手教你如何进行rocketmq集群的搭建,课程体系完整清晰,理论结合实践,且涵盖了高频面试题。 1.MQ简介 1.1.mq简介 ...5.5.消息重试与死信队列 5.6.消息幂等性

    rocketmq-all-4.5.1-bin-release.zip

    2. **高可靠**:通过消息重试、死信队列、事务消息等功能确保消息不丢失。 3. **灵活性**:支持多种消息模型和消费策略,满足不同业务场景的需求。 4. **强大的社区支持**:由于其开源属性,RocketMQ拥有活跃的...

    RocketMQ Developer Guide

    - **消息消费负载均衡**:消息消费端能够实现负载均衡。 - **订阅消息负载均衡**:确保订阅消息能够合理分配。 - **单队列并行消费**:提高单个队列消息处理的并行度。 - **HA(高可用性)**:同步双写或异步复制的...

    rocketmq源码、

    - 如果发送失败,Producer可以设置重试策略。 4. **消息消费流程** - Consumer订阅感兴趣的Topic。 - 向NameServer查询Topic对应的Broker列表。 - Consumer根据消费策略(顺序消费、广播消费等)从 Broker Pull...

    RocketMQ用户指南

    由于文档内容存在重复,且关键知识点主要集中在部分段落中,以下是...文档中还包含了一些常见的问题处理方法,如消息队列满了如何处理、回溯消费、消息堆积、消息重试等,以帮助用户更好地理解和掌握RocketMQ的使用。

    kafka培训.pptx

    - **Kafka**:强调高吞吐量,使用发布订阅模式,消息持久化在磁盘,支持多语言客户端,但消费失败不支持重试。 3. **Kafka 架构及相关概念** - **基于Pull的消费模式**:消费者主动从服务器拉取消息,优化了处理...

Global site tag (gtag.js) - Google Analytics