`
m635674608
  • 浏览: 5004278 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ学习:顺序消息

 
阅读更多

rocketmq的顺序消息需要满足2点:

1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。

先看个例子,代码版本跟前面的一样。
Producer类:


import Java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.messageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;


/**
 * Producer,发送顺序消息
 */
public class Producer {
    public static void main(String[] args) throws IOException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

            producer.setNamesrvAddr("192.168.0.104:9876");

            producer.start();

            String[] tags = new String[] { "TagA", "TagC", "TagD" };

            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            for (int i = 0; i < 10; i++) {
                // 加个时间后缀
                String body = dateStr + " Hello RocketMQ " + i;
                Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        return mqs.get(id);
                    }
                }, 0);//0是队列的下标

                system.out.println(sendResult + ", body:" + body);
            }

            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.in.read();
    }
}

Consumer端:


import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;


/**
 * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
 */
public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("192.168.0.104:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                for (MessageExt msg: msgs) {
                    System.out.println(msg + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }

}

NameServer和BrokerServer起来后,运行打印,把前面的不重要的去掉了,只看后面的几列:
content:2015-12-06 17:03:21 Hello RocketMQ 0
content:2015-12-06 17:03:21 Hello RocketMQ 1
content:2015-12-06 17:03:21 Hello RocketMQ 2
content:2015-12-06 17:03:21 Hello RocketMQ 3
content:2015-12-06 17:03:21 Hello RocketMQ 4
content:2015-12-06 17:03:21 Hello RocketMQ 5
content:2015-12-06 17:03:21 Hello RocketMQ 6
content:2015-12-06 17:03:21 Hello RocketMQ 7
content:2015-12-06 17:03:21 Hello RocketMQ 8
content:2015-12-06 17:03:21 Hello RocketMQ 9

可以看到,消息有序的。

如何在集群消费时保证消费的有序呢?

1.ConsumeMessageOrderlyService类的start()方法,如果是集群消费,则启动定时任务,定时向broker发送批量锁 住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队 列集合。
consumer收到后,设置是否锁住标志位。
这里注意2个变量:
consumer端的RebalanceImpl里的ConcurrentHashMap processQueueTable,是否锁住设置在ProcessQueue里。
broker端的RebalanceLockManager里的ConcurrentHashMap* group *, ConcurrentHashMap> mqLockTable,这里维护着全局队列锁。

2.ConsumeMessageOrderlyService.ConsumeRequest的run方法是消费消息,这里还有个 MessageQueueLock messageQueueLock,维护当前consumer端的本地队列锁。保证当前只有一个线程能够进行消费。

3.拉到消息存入ProcessQueue,然后判断,本地是否获得锁,全局队列是否被锁住,然后从ProcessQueue里取出消息,用MessageListenerOrderly进行消费。
拉到消息后调用ProcessQueue.putMessage(final List msgs) 存入,具体是存入TreeMap msgTreeMap。
然后是调用ProcessQueue.takeMessags(final int batchSize)消费,具体是把msgTreeMap里消费过的消息,转移到TreeMap msgTreeMapTemp。

4.本地消费的事务控制,ConsumeOrderlyStatus.SUCCESS(提 交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起一会再消费),在此之前还有一个 变量ConsumeOrderlyContext context的setAutoCommit()是否自动提交。
当SUSPEND_CURRENT_QUEUE_A_MOMENT时,autoCommit设置为true或者false没有区别,本质跟消费相反,把消息从msgTreeMapTemp转移回msgTreeMap,等待下次消费。

当SUCCESS时,autoCommit设置为true时比设置为false多做了2个动 作,consumeRequest.getProcessQueue().commit()和 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
ProcessQueue.commit() :本质是删除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消费时从msgTreeMap转移过来的。
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。

那么少了这2个动作会怎么样呢,随着消息的消费进行,msgTreeMapTemp里的消息堆积越来越多,消费消息的偏移量一直没有更新到broker导致consumer每次重新启动后都要从头开始重复消费。
就算更新了offset到broker,那么msgTreeMapTemp里的消息堆积呢?不知道这算不算bug。
所以,还是把autoCommit设置为true吧。

 

http://www.07net01.com/2015/12/1003523.html

分享到:
评论

相关推荐

    RocketMQ:发送消息与接收实例

    RocketMQ 支持基于 Topic 或 Tag 的顺序消息,通过特定策略确保消息按照预期的顺序到达消费者。 为了理解并使用这些功能,你需要了解 RocketMQ 的基本架构,包括 NameServer(路由注册与发现)、Broker(消息存储与...

    Rocketmq学习指南

    Rocketmq学习指南是一份专为初学者设计的文档集合,旨在深入浅出地介绍Apache RocketMQ这一高效的消息队列系统。RocketMQ最初由阿里巴巴开发,现已成为Apache顶级项目,广泛应用于分布式系统、微服务架构以及大数据...

    RocketMQ学习文档

    RocketMQ学习文档主要涵盖以下几个核心知识点: 1. **RocketMQ简介**:RocketMQ是由阿里巴巴开源的一款高可用、高性能的消息中间件,它最初是基于Metaq发展而来,经过不断的优化和迭代,现已成为Apache顶级项目。...

    RocketMQ学习笔记1

    RocketMQ学习笔记1 RocketMQ是Apache旗下的一个开源的消息队列系统,具有分布式、可靠、可扩展、高性能等特点。下面是对RocketMQ的学习笔记的总结。 分布式架构 RocketMQ原生支持分布式,解决了单点故障问题,...

    RocketMQ高级原理:深入剖析消息系统的核心机制(超详细整理讲解,值得收藏)

    消息分片(Message ID)确保了消息的唯一性和顺序性,每个分片都有唯一的物理存储位置。 四、事务消息 RocketMQ支持分布式事务,通过半消息和回查机制实现事务一致性。Producer发送半消息,待本地事务执行完成后,...

    尚硅谷完整的关于rocketmq的学习视频整理笔记

    深入学习 RocketMQ 还包括了解其高可用策略(如主从复制、集群部署)、消息的顺序性保证、事务消息、延迟消息、死信消息等特性。通过实践操作和理解这些概念,你可以熟练地运用 RocketMQ 解决实际项目中的问题。

    rocketMQ学习代码及文档资料

    - **消息队列**:消息在Broker中存储,通过消息队列进行分发,确保消息的有序性。 2. ** RocketMQ架构** - **NameServer**:作为轻量级的注册中心,负责存储Broker的元数据信息,Producer和Consumer通过...

    rocketmq消息中间件.zip

    - **Broker**:消息服务器,存储和转发消息,是 RocketMQ 的核心组件。 - **Message Queue**:消息队列,消息实际存储的地方,每个队列对应 Broker 的一部分存储空间。 3. **RocketMQ 功能特性** - **高可用**:...

    RocketMQ学习及V3.2.4最新开发指南

    6. 重试策略:消息发送失败时,RocketMQ会自动进行重试。 7. 分布式事务:支持分布式事务消息,确保数据一致性。 三、开发实践 1. 创建生产者:首先需要创建一个生产者实例,并设置相关的属性,如Group ID,然后...

    Rocketmq学习自用2

    - **消息顺序性**:通过特定的Topic和队列配置,RocketMQ可以保证消息的顺序消费。 - **消息过滤**:RocketMQ提供基于消息内容和Tag的过滤规则,允许消费者订阅特定类型的消息。 - **消息跟踪与监控**:RocketMQ...

    RocketMQ消息队列资料

    3. **稳定性**:RocketMQ提供事务消息和顺序消息两种模式,以满足不同业务需求。事务消息可以保证消息的最终一致性,而顺序消息则能保证消息的顺序消费,这对于许多业务逻辑至关重要。 4. **轻量级框架**:RocketMQ...

    rocketmq-all-4.9.3-source

    - 消息顺序:RocketMQ支持消息的顺序传输,尤其在同一个Topic的同一个分区内的消息,可以保持严格的顺序。 - 消息回溯:Consumer可以设置消费位点,实现消息的回溯,方便排查问题。 - 容错机制:RocketMQ提供了消息...

    RocketMQ_design_.pdf.zip

    RocketMQ是中国阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统...通过对RocketMQ设计文档的深入学习,我们可以更好地理解和利用这个强大的消息中间件,优化我们的分布式系统架构,提升系统的可靠性和性能。

    消息中间件rocketmq原理解析

    - **Broker**:消息代理服务器,负责存储消息并提供给消费者消费。 - **Namesrv**:注册中心,管理broker的信息以及路由信息,提供给producer和consumer查询。 ### Producer启动流程 - **获取路由信息**:Producer...

    rocketmq_4.2.0 最新编译版

    6. **顺序消息**:对于对消息顺序有严格要求的应用,RocketMQ提供了顺序消息功能。在同一消息队列内,消息按照发送顺序被消费。 7. **消息回溯**:RocketMQ支持消息回溯,即在一定范围内重读历史消息,这对于故障...

    消息中间件rocketmq原理解析.pdf

    标题中提到的"消息中间件rocketmq原理解析"揭示了本文档的核心内容,即对消息中间件RocketMQ的原理进行解析和探讨。RocketMQ是阿里巴巴开源的一款分布式消息中间件,主要用于企业级的分布式系统中,用以实现系统之间...

    rocketmq安装包.rar

    - 高可靠性:消息持久化,即使在Broker宕机后,也能恢复未消费的消息。 - 分布式事务:支持分布式事务,保证消息的最终一致性。 - 消费策略:支持顺序消费、广播消费、集群消费等多种消费策略。 5. **RocketMQ...

    Apache RocketMQ是一个分布式消息和流媒体平台

    5. **顺序消息**:RocketMQ支持全局顺序消息和分区顺序消息,保证了在大规模并发下的消息顺序性,这对于金融交易、日志记录等场景尤为重要。 6. **定时与延时消息**:RocketMQ允许生产者设置消息的发送时间,可以...

    RocketMq学习视频

    011-011_RocketMQ_Producer_顺序消费机制详解 012-012_RocketMQ_Producer_事务消息机制详解 013-013_RocketMQ_Consumer_Push和Pull模式及使用详解 014-014_RocketMQ_Consumer_配置参数详解 015-015_RocketMQ_...

    RocketMQ 手册3.2.4.pdf

    2. 主题(Topic):消息的分类,生产者和消费者通过主题进行交互。 3. 分区(Queue):主题下的逻辑分片,每个分区对应一个队列,用于并行处理。 4. 生产者(Producer):负责发送消息到RocketMQ服务器。 5. 消费者...

Global site tag (gtag.js) - Google Analytics