Metaq是一个类是kafka的消息系统,开源地址https://github.com/killme2008/Metamorphosis。
基于Pull的消息系统,consumer端保持了很多逻辑,比如当前拉取消息的offset,loadbalance等,使用zookeeper作为coordination。
简单类图
核心类ZKLoadRebalanceListener,负责集群感知,当有broker退出或consumer退出时,重新balance。
FetchManager是具体的worker线程,负责从处理FetchRequest,从某一个partition拉去数据,并回掉业务方的接口实现
其实现流程
1.注册consumer id
/meta/consumers/meta-example/ids新建临时节点,data为topicString,就是这个consumer订阅了那些topic
2.loadBalanceListener监听/meta/consumers/meta-example/ids children consumer数目变化
3.loadBalanceListener监听/meta/brokers/topics-sub/meta-test children分区变化
4.主动balance,分区和同组consumer之间均分
4.1 /meta/consumers/meta-example/ids下读取自己刚才写入的data,topics列表,构造一个myConsumerPerTopicMap,topic-->consumer
4.2 /meta/brokers/topics-sub/ids下读取所有brokers信息,成为Cluster
4.3 /meta/consumers/meta-example/ids读取所有consumer
4.4 /meta/consumers/meta-example/ids读取consumer的data,topics列表,构造一个consumersPerTopicMap,topic-->consumer list
4.5 拿自己订阅的topics的所有分区,partitionsPerTopicMap,topic-->partition list
4.5.1 /meta/brokers/topics-sub/meta-test读取所有children
4.5.2 每个child代表一个broker,其data为‘{"broker":"0-m","numParts":2}’,保存broker的分区信息
4.5.3 构造一个partList代表这个topic的所有分区信息,单个分区类是‘0-0’,‘0-1’
4.6 看看自己订阅的topic里,有哪些topic的broker或者分区有变化,初始化时old为空,都不一样,返回relevantTopicConsumerIdMap,topic-->consumer
4.7 暂停Fetcher
4.8 提交offset
4.9 对于每个变化的topic,根据负载均衡策略获取这个consumer对应的partition列表
4.10 查看当前这个topic的分区列表,查看是否有变更,topicRegistry中
4.11 新分配的分区添加,/meta/consumers/meta-example/owners/meta-test下创建临时节点,名称为分区名‘0-0’,data为consumerId
4.12 拿offset信息,/meta/consumers/meta-example/offsets/meta-test/0-0的data,内容如‘251537821439885312-545’,前为msgId,后为offset
4.13 初始化时,认为offset为0
4.14 分配给自己的分区随机取master或slave的一个读,master和slave的brokerId一样
4.15 添加FetchRequest到FetchManager
4.16 给需要请求的broker创立连接
4.17 关闭之前创建的无用的连接
4.18 启动fetch线程
5.FetchRequestRunner启动,开始拉数据
5.1 组装GetCommand,发送,戴上offset信息,每次取1M
5.2 返回的数据弄一个MessageIterator包装下
6 数据处理
6.1 判断FetchRequest的重试次数是否超过限制,默认5次,超过则存本地,跳过这跳消息,继续消费
6.2 decodeMessage成message对象
6.3 调用业务接口
6.4 消息处理完后,ack request,修改内存中的TopicPartitionRegInfo的offset信息,后续Timer线程会commit
6.5 重新添加请求,继续拉取数据
Timer线程扫描topicRegistry中的所有topic的offset内存数据,修改对应zk中的offset节点中数据,比如‘/meta/consumers/meta-example/offsets/meta-test/0-0’
小节
1.partition由broker指定,同一个broker可以制定多个partition
2.每个partion只能分配给同一个group下的的一个consumer
3.每个consumer可以分配多个partition,订阅多个topic
4.consumer集群变化时执行load balance,重新分配partition
5.提交offset使用异步批量提交
相关推荐
Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...
MetaQ Server的架构主要包括Producer、Consumer、Broker和Controller四个主要部分: 1. Producer:生产者负责将消息发送到MetaQ Server,通过API接口与Server进行通信。 2. Consumer:消费者从MetaQ Server订阅并...
Client则分为Producer和Consumer,Producer负责发送消息,Consumer负责接收和消费消息。 在性能优化方面,MetaQ 1.4.6.2版本可能包括了对网络IO、多线程并行处理和内存管理等方面的改进。例如,可能会采用零拷贝...
在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者主动从MetaQ拉取数据,解析成消息并进行消费。MetaQ的架构设计中,...
RocketMQ的核心组件包括Producer、Consumer、NameServer和Broker四部分: 1. **Producer**:生产者是消息的发送方,负责创建并发送消息到RocketMQ Broker。RocketMQ提供了同步和异步两种发送模式,同步模式确保...
对于卡夫卡的对比,与其他消息系统相比,如RabbitMQ、Redis、ZeroMQ、ActiveMQ、MetaQ/RocketMQ等,卡夫卡具有高性能跨语言的分布式发布/订阅消息系统的特性,支持数据持久化,具有完全分布式的特性,并能够同时支持...
阿里云ONS(消息队列服务)是基于阿里云提供的分布式的、高可靠性的消息服务产品,它来源于阿里内部广泛使用的消息中间件MetaQ,亦即后来的开源项目RocketMQ。ONS支持海量消息的生产与消费,以无单点故障、高可扩展...
RocketMQ 的发展历程可以追溯到Metaq 1.x,它是基于Kafka的Java重写版。随后,Metaq经历了2.x版本的存储优化,以适应阿里集团的大规模交易需求。2012年,RocketMQ 3.0版本发布,成为独立的产品,并在2017年成为...
RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...
支付宝钱包系统架构内部剖析 支付宝钱包系统架构概况: 支付宝钱包系统架构是...4. 分布式环境下(broker,producer,consumer都为集群)的消息路由,对顺序和可靠性有极高要求的场景 5. 作为一般MQ来使用的其他功能
Consumer还可以通过Pull模式拉取消息,这为消费者提供了更大的灵活性。 #### 7. shutdown Consumer的关闭流程需要妥善处理,保证消息不丢失。 ### 三、Broker #### 1. Broker的启动 Broker是RocketMQ的核心组件,...
最后,文档中还提供了产品发展历史的概述,从Metaq的迭代到RocketMQ的正式上线,以及其后续版本的发展。介绍了如何在不同的业务系统中深度定制RocketMQ来满足特定的需求,如淘宝、支付宝和B2B等场景下的应用。并且,...
数据消费者,consumer 的用法:《consumer 的用法》 迓有些零碎的,关亍通信段的源码览读:《net 包源码览读》、《broker 配置》 扩展的阅读迓有下面返些: 关亍 kafka 和 jafka 的相关博客,特删好,有徆多问题也都...
1. **Push(推送)**:类似于Broker将消息推送给Consumer的方式,但实际上仍是Consumer主动从Broker拉取消息。 - **优点**:采用长轮询方式,能有效减少Broker和Consumer之间的交互频率,提高系统的整体性能。 2. *...
RocketMQ是一个由阿里巴巴开源的消息中间件,脱胎自阿里巴巴的MetaQ,在设计上借鉴了Kafka。下面将详细介绍Window搭建部署RocketMQ的步骤。 一、安装RocketMQ 首先需要下载RocketMQ的发行版本,解压缩后得到bin、...
阿里巴巴作为拥有庞大微服务架构的企业,其电商业务涉及众多子品牌如天猫、淘宝等,依赖Dubbo RPC进行服务间的通信,MetaQ则用于异步解耦。然而,这种架构存在服务可见性无隔离、接口级服务发现等问题,以及Java技术...
MetaQ 和 RocketMQ 是阿里巴巴开源的分布式消息系统,支持分布式事务。 总之,Kafka 以其高效、可靠和分布式的特点,成为了大数据时代实时数据处理的关键组件。无论是实时监控、日志收集还是数据集成,Kafka 都能...
- **设置消费进度**:利用`setConsumeProgress`命令设置,需先关闭订阅组中的所有Consumer,设置后再重启。 - **清除特定Broker权限**:`clearBrokerPerm`命令。 - **获取Consumer消费进度**:`getConsumerProgress`...