`
iwinit
  • 浏览: 454784 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

[metaq]Consumer

阅读更多

Metaq是一个类是kafka的消息系统,开源地址https://github.com/killme2008/Metamorphosis。

基于Pull的消息系统,consumer端保持了很多逻辑,比如当前拉取消息的offset,loadbalance等,使用zookeeper作为coordination。

简单类图

核心类ZKLoadRebalanceListener,负责集群感知,当有broker退出或consumer退出时,重新balance。

FetchManager是具体的worker线程,负责从处理FetchRequest,从某一个partition拉去数据,并回掉业务方的接口实现

 

其实现流程

1.注册consumer id

/meta/consumers/meta-example/ids新建临时节点,data为topicString,就是这个consumer订阅了那些topic

2.loadBalanceListener监听/meta/consumers/meta-example/ids children consumer数目变化

3.loadBalanceListener监听/meta/brokers/topics-sub/meta-test children分区变化

4.主动balance,分区和同组consumer之间均分

 

4.1 /meta/consumers/meta-example/ids下读取自己刚才写入的data,topics列表,构造一个myConsumerPerTopicMap,topic-->consumer

4.2 /meta/brokers/topics-sub/ids下读取所有brokers信息,成为Cluster

4.3 /meta/consumers/meta-example/ids读取所有consumer

4.4 /meta/consumers/meta-example/ids读取consumer的data,topics列表,构造一个consumersPerTopicMap,topic-->consumer list

4.5 拿自己订阅的topics的所有分区,partitionsPerTopicMap,topic-->partition list

4.5.1 /meta/brokers/topics-sub/meta-test读取所有children

4.5.2 每个child代表一个broker,其data为‘{"broker":"0-m","numParts":2}’,保存broker的分区信息

4.5.3 构造一个partList代表这个topic的所有分区信息,单个分区类是‘0-0’,‘0-1’

4.6 看看自己订阅的topic里,有哪些topic的broker或者分区有变化,初始化时old为空,都不一样,返回relevantTopicConsumerIdMap,topic-->consumer

4.7 暂停Fetcher

4.8 提交offset

4.9 对于每个变化的topic,根据负载均衡策略获取这个consumer对应的partition列表

4.10 查看当前这个topic的分区列表,查看是否有变更,topicRegistry中

4.11 新分配的分区添加,/meta/consumers/meta-example/owners/meta-test下创建临时节点,名称为分区名‘0-0’,data为consumerId

4.12 拿offset信息,/meta/consumers/meta-example/offsets/meta-test/0-0的data,内容如‘251537821439885312-545’,前为msgId,后为offset

4.13 初始化时,认为offset为0

4.14 分配给自己的分区随机取master或slave的一个读,master和slave的brokerId一样

4.15 添加FetchRequest到FetchManager

4.16 给需要请求的broker创立连接

4.17 关闭之前创建的无用的连接

4.18 启动fetch线程

 

5.FetchRequestRunner启动,开始拉数据

5.1 组装GetCommand,发送,戴上offset信息,每次取1M 

5.2 返回的数据弄一个MessageIterator包装下

6 数据处理

6.1 判断FetchRequest的重试次数是否超过限制,默认5次,超过则存本地,跳过这跳消息,继续消费

6.2 decodeMessage成message对象

6.3 调用业务接口

6.4 消息处理完后,ack request,修改内存中的TopicPartitionRegInfo的offset信息,后续Timer线程会commit

6.5 重新添加请求,继续拉取数据

 

Timer线程扫描topicRegistry中的所有topic的offset内存数据,修改对应zk中的offset节点中数据,比如‘/meta/consumers/meta-example/offsets/meta-test/0-0’

 

小节

1.partition由broker指定,同一个broker可以制定多个partition

2.每个partion只能分配给同一个group下的的一个consumer

3.每个consumer可以分配多个partition,订阅多个topic

4.consumer集群变化时执行load balance,重新分配partition

5.提交offset使用异步批量提交 

  • 大小: 222.3 KB
分享到:
评论

相关推荐

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaq-server-1.4.6.2.tar.gz

    MetaQ Server的架构主要包括Producer、Consumer、Broker和Controller四个主要部分: 1. Producer:生产者负责将消息发送到MetaQ Server,通过API接口与Server进行通信。 2. Consumer:消费者从MetaQ Server订阅并...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    Client则分为Producer和Consumer,Producer负责发送消息,Consumer负责接收和消费消息。 在性能优化方面,MetaQ 1.4.6.2版本可能包括了对网络IO、多线程并行处理和内存管理等方面的改进。例如,可能会采用零拷贝...

    MetaQ 分布式消息服务中间件.pdf

    在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者主动从MetaQ拉取数据,解析成消息并进行消费。MetaQ的架构设计中,...

    阿里rocketMQ

    RocketMQ的核心组件包括Producer、Consumer、NameServer和Broker四部分: 1. **Producer**:生产者是消息的发送方,负责创建并发送消息到RocketMQ Broker。RocketMQ提供了同步和异步两种发送模式,同步模式确保...

    kafka入门介绍

    对于卡夫卡的对比,与其他消息系统相比,如RabbitMQ、Redis、ZeroMQ、ActiveMQ、MetaQ/RocketMQ等,卡夫卡具有高性能跨语言的分布式发布/订阅消息系统的特性,支持数据持久化,具有完全分布式的特性,并能够同时支持...

    阿里云ons指南

    阿里云ONS(消息队列服务)是基于阿里云提供的分布式的、高可靠性的消息服务产品,它来源于阿里内部广泛使用的消息中间件MetaQ,亦即后来的开源项目RocketMQ。ONS支持海量消息的生产与消费,以无单点故障、高可扩展...

    rocketmq-note.pdf

    RocketMQ 的发展历程可以追溯到Metaq 1.x,它是基于Kafka的Java重写版。随后,Metaq经历了2.x版本的存储优化,以适应阿里集团的大规模交易需求。2012年,RocketMQ 3.0版本发布,成为独立的产品,并在2017年成为...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    【系统架构】最全最强解析:支付宝钱包系统架构内部剖析(架构图).docx

    支付宝钱包系统架构内部剖析 支付宝钱包系统架构概况: 支付宝钱包系统架构是...4. 分布式环境下(broker,producer,consumer都为集群)的消息路由,对顺序和可靠性有极高要求的场景 5. 作为一般MQ来使用的其他功能

    消息中间件 rocketmq原理解析

    Consumer还可以通过Pull模式拉取消息,这为消费者提供了更大的灵活性。 #### 7. shutdown Consumer的关闭流程需要妥善处理,保证消息不丢失。 ### 三、Broker #### 1. Broker的启动 Broker是RocketMQ的核心组件,...

    RocketMQ原理简介

    最后,文档中还提供了产品发展历史的概述,从Metaq的迭代到RocketMQ的正式上线,以及其后续版本的发展。介绍了如何在不同的业务系统中深度定制RocketMQ来满足特定的需求,如淘宝、支付宝和B2B等场景下的应用。并且,...

    kafka学习文档

    数据消费者,consumer 的用法:《consumer 的用法》 迓有些零碎的,关亍通信段的源码览读:《net 包源码览读》、《broker 配置》 扩展的阅读迓有下面返些: 关亍 kafka 和 jafka 的相关博客,特删好,有徆多问题也都...

    RocketMQ的使用、原理

    1. **Push(推送)**:类似于Broker将消息推送给Consumer的方式,但实际上仍是Consumer主动从Broker拉取消息。 - **优点**:采用长轮询方式,能有效减少Broker和Consumer之间的交互频率,提高系统的整体性能。 2. *...

    Window搭建部署RocketMQ步骤详解

    RocketMQ是一个由阿里巴巴开源的消息中间件,脱胎自阿里巴巴的MetaQ,在设计上借鉴了Kafka。下面将详细介绍Window搭建部署RocketMQ的步骤。 一、安装RocketMQ 首先需要下载RocketMQ的发行版本,解压缩后得到bin、...

    Service Mesh在超大规模场景下的最佳实践.pptx

    阿里巴巴作为拥有庞大微服务架构的企业,其电商业务涉及众多子品牌如天猫、淘宝等,依赖Dubbo RPC进行服务间的通信,MetaQ则用于异步解耦。然而,这种架构存在服务可见性无隔离、接口级服务发现等问题,以及Java技术...

    kafka课堂讲义.docx

    MetaQ 和 RocketMQ 是阿里巴巴开源的分布式消息系统,支持分布式事务。 总之,Kafka 以其高效、可靠和分布式的特点,成为了大数据时代实时数据处理的关键组件。无论是实时监控、日志收集还是数据集成,Kafka 都能...

    RocketMQ运维指令

    - **设置消费进度**:利用`setConsumeProgress`命令设置,需先关闭订阅组中的所有Consumer,设置后再重启。 - **清除特定Broker权限**:`clearBrokerPerm`命令。 - **获取Consumer消费进度**:`getConsumerProgress`...

Global site tag (gtag.js) - Google Analytics