rocketmq的顺序消息需要满足2点:
1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。
先看个例子,代码版本跟前面的一样。
Producer类:
import Java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.messageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
/**
* Producer,发送顺序消息
*/
public class Producer {
public static void main(String[] args) throws IOException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.104:9876");
producer.start();
String[] tags = new String[] { "TagA", "TagC", "TagD" };
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间后缀
String body = dateStr + " Hello RocketMQ " + i;
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
return mqs.get(id);
}
}, 0);//0是队列的下标
system.out.println(sendResult + ", body:" + body);
}
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.in.read();
}
}
Consumer端:
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.0.104:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println(msg + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
NameServer和BrokerServer起来后,运行打印,把前面的不重要的去掉了,只看后面的几列:
content:2015-12-06 17:03:21 Hello RocketMQ 0
content:2015-12-06 17:03:21 Hello RocketMQ 1
content:2015-12-06 17:03:21 Hello RocketMQ 2
content:2015-12-06 17:03:21 Hello RocketMQ 3
content:2015-12-06 17:03:21 Hello RocketMQ 4
content:2015-12-06 17:03:21 Hello RocketMQ 5
content:2015-12-06 17:03:21 Hello RocketMQ 6
content:2015-12-06 17:03:21 Hello RocketMQ 7
content:2015-12-06 17:03:21 Hello RocketMQ 8
content:2015-12-06 17:03:21 Hello RocketMQ 9
可以看到,消息有序的。
如何在集群消费时保证消费的有序呢?
1.ConsumeMessageOrderlyService类的start()方法,如果是集群消费,则启动定时任务,定时向broker发送批量锁 住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队 列集合。
consumer收到后,设置是否锁住标志位。
这里注意2个变量:
consumer端的RebalanceImpl里的ConcurrentHashMap processQueueTable,是否锁住设置在ProcessQueue里。
broker端的RebalanceLockManager里的ConcurrentHashMap* group *, ConcurrentHashMap> mqLockTable,这里维护着全局队列锁。
2.ConsumeMessageOrderlyService.ConsumeRequest的run方法是消费消息,这里还有个 MessageQueueLock messageQueueLock,维护当前consumer端的本地队列锁。保证当前只有一个线程能够进行消费。
3.拉到消息存入ProcessQueue,然后判断,本地是否获得锁,全局队列是否被锁住,然后从ProcessQueue里取出消息,用MessageListenerOrderly进行消费。
拉到消息后调用ProcessQueue.putMessage(final List msgs) 存入,具体是存入TreeMap msgTreeMap。
然后是调用ProcessQueue.takeMessags(final int batchSize)消费,具体是把msgTreeMap里消费过的消息,转移到TreeMap msgTreeMapTemp。
4.本地消费的事务控制,ConsumeOrderlyStatus.SUCCESS(提 交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起一会再消费),在此之前还有一个 变量ConsumeOrderlyContext context的setAutoCommit()是否自动提交。
当SUSPEND_CURRENT_QUEUE_A_MOMENT时,autoCommit设置为true或者false没有区别,本质跟消费相反,把消息从msgTreeMapTemp转移回msgTreeMap,等待下次消费。
当SUCCESS时,autoCommit设置为true时比设置为false多做了2个动 作,consumeRequest.getProcessQueue().commit()和 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
ProcessQueue.commit() :本质是删除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消费时从msgTreeMap转移过来的。
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。
那么少了这2个动作会怎么样呢,随着消息的消费进行,msgTreeMapTemp里的消息堆积越来越多,消费消息的偏移量一直没有更新到broker导致consumer每次重新启动后都要从头开始重复消费。
就算更新了offset到broker,那么msgTreeMapTemp里的消息堆积呢?不知道这算不算bug。
所以,还是把autoCommit设置为true吧。
http://www.07net01.com/2015/12/1003523.html
相关推荐
RocketMQ 支持基于 Topic 或 Tag 的顺序消息,通过特定策略确保消息按照预期的顺序到达消费者。 为了理解并使用这些功能,你需要了解 RocketMQ 的基本架构,包括 NameServer(路由注册与发现)、Broker(消息存储与...
Rocketmq学习指南是一份专为初学者设计的文档集合,旨在深入浅出地介绍Apache RocketMQ这一高效的消息队列系统。RocketMQ最初由阿里巴巴开发,现已成为Apache顶级项目,广泛应用于分布式系统、微服务架构以及大数据...
RocketMQ学习文档主要涵盖以下几个核心知识点: 1. **RocketMQ简介**:RocketMQ是由阿里巴巴开源的一款高可用、高性能的消息中间件,它最初是基于Metaq发展而来,经过不断的优化和迭代,现已成为Apache顶级项目。...
RocketMQ学习笔记1 RocketMQ是Apache旗下的一个开源的消息队列系统,具有分布式、可靠、可扩展、高性能等特点。下面是对RocketMQ的学习笔记的总结。 分布式架构 RocketMQ原生支持分布式,解决了单点故障问题,...
消息分片(Message ID)确保了消息的唯一性和顺序性,每个分片都有唯一的物理存储位置。 四、事务消息 RocketMQ支持分布式事务,通过半消息和回查机制实现事务一致性。Producer发送半消息,待本地事务执行完成后,...
深入学习 RocketMQ 还包括了解其高可用策略(如主从复制、集群部署)、消息的顺序性保证、事务消息、延迟消息、死信消息等特性。通过实践操作和理解这些概念,你可以熟练地运用 RocketMQ 解决实际项目中的问题。
- **消息队列**:消息在Broker中存储,通过消息队列进行分发,确保消息的有序性。 2. ** RocketMQ架构** - **NameServer**:作为轻量级的注册中心,负责存储Broker的元数据信息,Producer和Consumer通过...
- **Broker**:消息服务器,存储和转发消息,是 RocketMQ 的核心组件。 - **Message Queue**:消息队列,消息实际存储的地方,每个队列对应 Broker 的一部分存储空间。 3. **RocketMQ 功能特性** - **高可用**:...
6. 重试策略:消息发送失败时,RocketMQ会自动进行重试。 7. 分布式事务:支持分布式事务消息,确保数据一致性。 三、开发实践 1. 创建生产者:首先需要创建一个生产者实例,并设置相关的属性,如Group ID,然后...
- **消息顺序性**:通过特定的Topic和队列配置,RocketMQ可以保证消息的顺序消费。 - **消息过滤**:RocketMQ提供基于消息内容和Tag的过滤规则,允许消费者订阅特定类型的消息。 - **消息跟踪与监控**:RocketMQ...
3. **稳定性**:RocketMQ提供事务消息和顺序消息两种模式,以满足不同业务需求。事务消息可以保证消息的最终一致性,而顺序消息则能保证消息的顺序消费,这对于许多业务逻辑至关重要。 4. **轻量级框架**:RocketMQ...
- 消息顺序:RocketMQ支持消息的顺序传输,尤其在同一个Topic的同一个分区内的消息,可以保持严格的顺序。 - 消息回溯:Consumer可以设置消费位点,实现消息的回溯,方便排查问题。 - 容错机制:RocketMQ提供了消息...
RocketMQ是中国阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统...通过对RocketMQ设计文档的深入学习,我们可以更好地理解和利用这个强大的消息中间件,优化我们的分布式系统架构,提升系统的可靠性和性能。
- **Broker**:消息代理服务器,负责存储消息并提供给消费者消费。 - **Namesrv**:注册中心,管理broker的信息以及路由信息,提供给producer和consumer查询。 ### Producer启动流程 - **获取路由信息**:Producer...
6. **顺序消息**:对于对消息顺序有严格要求的应用,RocketMQ提供了顺序消息功能。在同一消息队列内,消息按照发送顺序被消费。 7. **消息回溯**:RocketMQ支持消息回溯,即在一定范围内重读历史消息,这对于故障...
标题中提到的"消息中间件rocketmq原理解析"揭示了本文档的核心内容,即对消息中间件RocketMQ的原理进行解析和探讨。RocketMQ是阿里巴巴开源的一款分布式消息中间件,主要用于企业级的分布式系统中,用以实现系统之间...
- Queue:消息队列,每个Topic下可有多个Queue,Queue是实际消息存储的地方。 - Producer Group:生产者组,一组Producer共享相同的Group ID,消息会被均匀分发到不同的Queue。 - Consumer Group:消费者组,一组...
- 高可靠性:消息持久化,即使在Broker宕机后,也能恢复未消费的消息。 - 分布式事务:支持分布式事务,保证消息的最终一致性。 - 消费策略:支持顺序消费、广播消费、集群消费等多种消费策略。 5. **RocketMQ...
5. **顺序消息**:RocketMQ支持全局顺序消息和分区顺序消息,保证了在大规模并发下的消息顺序性,这对于金融交易、日志记录等场景尤为重要。 6. **定时与延时消息**:RocketMQ允许生产者设置消息的发送时间,可以...
011-011_RocketMQ_Producer_顺序消费机制详解 012-012_RocketMQ_Producer_事务消息机制详解 013-013_RocketMQ_Consumer_Push和Pull模式及使用详解 014-014_RocketMQ_Consumer_配置参数详解 015-015_RocketMQ_...