`
m635674608
  • 浏览: 5042424 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMq offset 消费

    博客分类:
  • MQ
 
阅读更多
          List<MessageVo> msgList = new ArrayList<MessageVo>();
            try {
                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName);
                for (MessageQueue mq : mqs) {
                    System.out.printf("Consume from the queue: " + mq + "%n");
                    try {
                        PullResult pullResult =
                                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        if (pullResult != null) {
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) {
                                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                                            msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody()));
                                        }
                                    }
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                    break;
                                case OFFSET_ILLEGAL:
                                    break;
                                default:
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

   

  

            private Queue<List<MessageVo>> messageQueue = new LinkedBlockingQueue<List<MessageVo>>();
            
           DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(groupName);
            consumer.setNamesrvAddr(url);
            scheduleService = new MQPullConsumerScheduleService(groupName);
            scheduleService.setMessageModel(MessageModel.CLUSTERING);
            scheduleService.setDefaultMQPullConsumer(consumer);
            
          List<MessageVo> msgList = new ArrayList<MessageVo>();
            try {
                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName);
                for (MessageQueue mq : mqs) {
                    System.out.printf("Consume from the queue: " + mq + "%n");
                    try {
                        PullResult pullResult =
                                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        if (pullResult != null) {
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) {
                                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                                            msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody()));
                                        }
                                    }
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                    break;
                                case OFFSET_ILLEGAL:
                                    break;
                                default:
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
          List<MessageVo> msgList = new ArrayList<MessageVo>();
            try {
                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName);
                for (MessageQueue mq : mqs) {
                    System.out.printf("Consume from the queue: " + mq + "%n");
                    try {
                        PullResult pullResult =
                                consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                        if (pullResult != null) {
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) {
                                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                                            msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody()));
                                        }
                                    }
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                    break;
                                case OFFSET_ILLEGAL:
                                    break;
                                default:
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

 https://segmentfault.com/a/1190000009007514

分享到:
评论

相关推荐

    java面试题_消息中间件--RocketMq(14题).zip

    - Consumer保存消费位置(Offset),每次消费从上次的位置开始。 11. **RocketMQ的消费进度管理**: - Consumer可以手动调整Offset,实现消息回溯或跳过消费。 12. **RocketMQ的监控与调优**: - 提供丰富的...

    rocketmq 开发规范 精讲 精华部分

    RocketMQ允许按照时间精确到毫秒级别回溯消息,比Kafka的Offset回溯更为灵活。 **消费失败重试机制**: RocketMQ支持消费失败后的定时重试,有助于减少因暂时性错误导致的消息丢失。 ### **开发规范与最佳实践** ...

    rocketmq中间件

    - RocketMQ通过MessageId、Offset和CommitLog确保消息的可靠传输。 - 为了保证消息不丢失,RocketMQ支持刷盘策略,如同步刷盘和异步刷盘。 9. **高可用与容错**: - Master-Slave架构实现故障切换,当Master挂掉...

    RocketMQ源码

    - **消息模型**:RocketMQ支持发布/订阅模型和点对点模型,前者允许多个消费者订阅同一主题,后者则确保每条消息仅被一个消费者消费。 - **NameServer**:作为轻量级的注册中心,NameServer存储了Topic与Broker的...

    rocketmq-all-4.0.0-incubating-bin-release

    7. **Offset Management**: Consumer在消费消息时,会记录每个Queue的消费进度(Offset),确保消息的幂等性。Offset存储在Consumer端,可以手动或自动提交。 8. **Transaction Support**: RocketMQ支持分布式事务...

    RocketMQ学习文档

    同时,通过Offset管理,消费者可以任意定位并消费历史消息。 7. **消费策略**:RocketMQ支持广播消费和集群消费两种模式。广播消费模式下,每个消费者组内的所有消费者都会收到每条消息;集群消费模式下,消息仅由...

    RocketMQ源码包.zip

    9. **Offset管理**:每个消费者有自己的消费进度,即Offset,保存在RocketMQ的Broker上,确保消费者可以持续从上次消费的位置开始读取。 10. **性能优化**:RocketMQ在协议设计、网络通信、I/O操作等方面做了大量...

    RocketMQ文档

    `offset.store.type`定义了消费偏移量的存储方式,可以是内存或远程存储等。理解并正确配置这些参数能帮助你更好地管理RocketMQ集群,避免不必要的性能瓶颈。 接着,《rocketMQ使用手册.pdf》提供了RocketMQ的全...

    阿里巴巴 rocketmq

    消费进度由消费者自行管理,每个消费者在消费过程中记录自己的消费位点(Offset),确保消息的有序消费和避免消息重复。 13. **Fault Tolerance**: RocketMQ 设计了多种机制保证故障容错,如Master-Slave复制、...

    RocketMQ群问题整理

    13. **时间点消费**:RocketMQ 支持从特定时间点回溯消费,精确到毫秒级别,可通过 MQHelper 类实现。 14. **设置Nameserver地址**:可以通过 `-D` 参数设置系统属性或环境变量来指定 Nameserver 地址,系统属性...

    Rocketmq核心源码剖析-图灵杨过老师1

    ConsumeQueue存储了消息在CommitLog中的偏移量(offset)和其他消费相关的元数据,如消息Tag。消费者在消费时,先根据主题和队列号找到对应的ConsumeQueue,然后通过ConsumeQueue中的索引定位到CommitLog中的具体...

    rocketMQ介绍ppt

    commitlog存储所有消息,consumeQueue用于消费队列,index存储消息key与offset的对应关系,方便快速查找。 ### 高可用设计 - **消息重试机制**:当发送失败时,RocketMQ会自动重试,以保证消息最终能够送达。 - **...

    rocketmq-client.zip

    RocketMQ支持多种消费模式,如集群消费(Cluster)、广播消费(Broadcasting)、顺序消费(Orderly)等。在Python客户端中,消费者同样需要实例化,并订阅感兴趣的主题,然后通过拉取或推送给消费者消息。 3. **...

    RocketMQ讲义-03.pdf

    * **Offset**:消费者在Queue中的读取位置。 **5. 集群搭建** 包括集群的不同搭建方式和具体步骤,以及双主双从集群的工作流程。 **6. 管理工具** * **mqadmin**:命令行工具,用于管理MQ实例。 * **RocketMQ-...

    kafka开发和rocketmq消息技术文档

    4. **分布式事务协调**:RocketMQ 使用 GroupOffset 和 BrokeOffset 实现消费者组的事务协调,确保消息的正确消费。 5. **云原生设计**:RocketMQ 针对云环境进行了优化,支持大规模集群管理和监控。 6. **多语言...

    Apache RocketMQ 从入门到实战1

    RocketMQ的基础概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)、队列(Queue)、Message ID和Offset。生产者负责发送消息,消费者负责接收和处理消息,主题是消息的分类,队列则是主题下的逻辑分区。...

    rocketMq实战(3)-console和运维

    - **Offset调整**:手动调整消费组的消费位点,以应对消息丢失或重复的情况。 - **性能测试**:提供压力测试工具,评估RocketMQ在特定场景下的性能表现。 4. **源码分析** - **Console源码**:深入了解RocketMQ ...

    rocketmq-release-4.8.0.zip

    10. **Offset Management**:消费者需要管理自己的消费进度(Offset),RocketMQ提供了自动和手动两种方式来管理Offset,以保证消息的正确消费。 11. **Cluster Deployment**:RocketMQ支持集群部署,通过多台...

    最新版windows rocketmq-all-4.9.1-bin-release.zip

    10. **消息回溯**:RocketMQ允许用户配置消息保留策略,当需要追溯历史消息时,可以通过时间戳或Offset来检索。 总之,RocketMQ 4.9.1在Windows上的部署和使用涉及多个环节,包括环境配置、服务启动、客户端集成、...

    RocketMQ原理详解

    - **准备阶段**:生产者向Broker发送一条带有预提交标记的事务消息,Broker返回消息的偏移量(offset)。此时消息状态为“准备”状态,不会被消费者消费。随后生产者执行本地事务操作。 - **提交阶段**:根据本地...

Global site tag (gtag.js) - Google Analytics