登录zookeeper
zkCli.sh -server centos1:2181
创建topics mytopic
kafka-topics.sh --create --zookeeper centos1:2181,centos2:2181,centos3:2181 --replication-factor 3 --partitions 2 --topic mytopic
zookeeper节点结构
/controller data={"version":1,"brokerid":2,"timestamp":"1495002238024"} //id=2的broker是leader
/controller_epoch data=1
/brokers
/brokers/ids //实时维护active的brokers
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/brokers/topics
/brokers/topics/mytopic/partitions/0/state data={"controller_epoch":7,"leader":1,"version":1,"leader_epoch":0,"isr":[1,0,2]} //其中leader指的是该partition的leader,每个partition都有一个leader。"isr":[1,0,2]表示该partition有三个replication,分别位于1,0,2三个broker上。leader维护了其它副本的同步信息。
/brokers/topics/mytopic/partitions/1/state data={"controller_epoch":7,"leader":2,"version":1,"leader_epoch":0,"isr":[2,1,0]}
/brokers/seqid
/admin/delete_topics
/isr_change_notification
/consumers/
/consumers/console-consumer-24372 data=
/consumers/console-consumer-24372/ids data=
/consumers/console-consumer-24372/ids/console-consumer-24372_centos1-1495075711403-999aec1a data={"version":1,"subscription":{"mytopic":1},"pattern":"white_list","timestamp":"1495075711460"}
/consumers/console-consumer-24372/owners data=null
/consumers/console-consumer-24372/owners/mytopic data=null
/consumers/console-consumer-24372/owners/mytopic/0 data=console-consumer-24372_centos1-1495075711403-999aec1a-0
/consumers/console-consumer-24372/owners/mytopic/1 data=console-consumer-24372_centos1-1495075711403-999aec1a-0
/consumers/console-consumer-24372/offsets data=null
/consumers/console-consumer-24372/offsets/mytopic data=null
/consumers/console-consumer-24372/offsets/mytopic/0 data=153
/consumers/console-consumer-24372/offsets/mytopic/1 data=582 //console-consumer-24372 是某个控制台consumer的group name,582是该consumer目前消费的mytopic中分区1中的消息的偏移量。可以直接在zookeeper中修改这个值,从而让该consumer从这个值(偏移量)开始读消息。
/config
/config/changes
/config/clients
/config/topics
kafka目录结构
./.lock
./meta.properties
./cleaner-offset-checkpoint
./replication-offset-checkpoint
./recovery-point-offset-checkpoint
./mytopic-0 //命名方式:topic+分区ID
./mytopic-0/00000000000000000000.index
./mytopic-0/00000000000000000000.timeindex
./mytopic-0/00000000000000000000.log //存放消息的地方
./mytopic-1
./mytopic-1/00000000000000000000.index
./mytopic-1/00000000000000000000.timeindex
./mytopic-1/00000000000000000000.log //存放消息的地方
Kafka 副本机制:
1. 每个分区存放n个副本,可承受n-1个节点失效。
2. 这n个副本中有一个是leader,它同时维护者所有副本的同步状态。
3. 如果leader失效,会通知producer,然后producer将消息重新发送给新的leader。
4. 选择新leader的方法是:所有follower在zookeeper中注册自己,最先注册的是leader,其它是follower。
5. Kafka支持的副本机制有:
同步机制: producer从zookeeper中找到leader,向leader发送消息,消息写入leader本地log。follower从leader中pull消息,每个follower将消息写入本地log,向leader发送确认回执。leader收到follower的确认回执后再想producer发送确认回执。 在consumer端,所有的消息是从leader中pull的。
异步机制:与同步机制不同的是一旦leader向log写入message完成就会向producer发送确认回执。所以这种机制不保证向失效的follower写入成功。
consumer group 与 partition
1. 监听同一个topic的多个consumer,可以属于一个group。同属一个group的多个consumer不会重复接收消息。如果要重复接收所有消息需要配不同的group。
2. 假设partition的数量是m, 同属一个group的consumer数量是n:
a. m=n, 平均为每个consumer分配一个partition
b. m>n, 每个consumer都能分配一个partition,有些consumer会分配到多个partition
c. m<n, 只有m个consumer都能分配一个partition,n-m个consumer接收不到消息。此时如果开启新的consumer,某个旧的consumer将读不到消息。
consumer的数量可以随时调整,不会漏掉消息。
命令:bin/kafka-console-consumer.sh --bootstrap-server centos1:9092 --topic mytopic --consumer-property group.id=group1
3. 可以指定consumer只接收某个partition的消息
命令:bin/kafka-console-consumer.sh --bootstrap-server centos1:9092 --topic mytopic --consumer-property group.id=group1 --partition 0
4.下面的zookeeper节点信息是被group共享的(新版Kafka可能没有把offset存到zookeeper):
/consumers/mygroup/offsets/mytopic/0 data=153
/consumers/mygroup/offsets/mytopic/1 data=582
153、582记录的是mygroup在mytopic中的分区0和分区1分别读取到的偏移量
参考:http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
相关推荐
- **启动Zookeeper集群**:在每个节点上,配置`zoo.cfg`文件,设置dataDir指向存储快照和事务日志的目录,server.x配置项表示节点ID(x为1,2,3...),并指定其他节点的IP和端口。例如: ``` dataDir=/path/to/...
在Kafka中,Zookeeper被用来进行集群管理、选举主节点、存储元数据信息以及协调客户端和服务器之间的通信。具体来说,Zookeeper存储了Kafka的Broker信息、Topic的分区信息和副本分配等,确保了Kafka的高可用性和一致...
6. 启动Kafka broker:在每个节点上启动Kafka服务,集群中的每个broker都会注册到Zookeeper,并与其他broker保持通信。 7. 配置生产者和消费者:编写像`RequestServiceServlet.java`这样的程序,作为Kafka的生产者...
本教程将详述如何在CentOS7操作系统中搭建Kafka、Zookeeper和Redis的集群环境,以实现高效、稳定的分布式数据处理和存储。 **Zookeeper简介** Apache ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,...
首先,我们需要了解Zookeeper在Kafka集群中的角色。Zookeeper作为Kafka的元数据存储中心,负责管理Kafka的分区(Partition)和副本(Replica)的分布,以及集群节点的状态信息。在高可用集群中,Zookeeper自身也需要...
- **命令行检查**:在`bin`目录下执行`./zkServer.sh status`,可以看到Zookeeper节点的运行状态,包括是否正在运行以及其当前的角色等信息。 - **端口监听**:使用`lsof -i:2181`命令可以检查Zookeeper服务端口...
在分布式系统中,Kafka和Zookeeper是两个非常重要的组件,它们经常被一起使用来构建高效、可扩展的消息传递和协调平台。Kafka是一个高吞吐量、低延迟的分布式消息队列,而Zookeeper则是一个分布式服务协调框架,提供...
在本文中,我们将深入探讨如何在Kubernetes集群中部署单机版的Apache Kafka和ZooKeeper,这两个组件是大数据处理和消息传递领域的关键组件。Kubernetes(简称K8s)作为一个自动化容器管理系统,使得在云环境中部署、...
在实际应用中,Kafka常与Zookeeper配合使用,Zookeeper负责管理Kafka的元数据,如主题、分区和副本的分布信息,确保Kafka集群的稳定运行。同时,Zookeeper也可以用于其他分布式应用的协调,如Hadoop、Spark等。 总...
在IT行业中,Kafka和ZooKeeper是两个非常重要的组件,尤其在大数据处理和分布式系统中扮演着核心角色。ZooKeeper是由Apache开发的一个开源分布式协调服务,而Kafka是一款高吞吐量的分布式消息系统。当我们谈论"Kafka...
每个Zookeeper节点都需要一个配置文件`zoo.cfg`,主要包括以下关键设置: - `dataDir`: 指定存储Zookeeper数据的目录,包括事务日志和快照。 - `clientPort`: 客户端连接Zookeeper的端口号,默认为2181。 - `server...
在本文中,我们将深入探讨如何在Windows环境下安装和配置Apache ZooKeeper和Kafka。Apache ZooKeeper是一个分布式的、开放源代码的服务,用于管理配置信息、命名服务、集群同步以及分布式应用程序协调。而Apache ...
在Kafka中,Zookeeper扮演了关键角色,用于管理Kafka的元数据,例如主题分区信息、集群配置以及选举控制器等。 **Kafka的核心概念** 1. **生产者(Producer)**: Kafka中的数据生产者负责生成消息并将其发送到Kafka...
在Spring与Zookeeper的整合中,我们可以利用Zookeeper的节点树结构来存储和管理配置信息,实现服务的注册与发现。Spring Cloud Zookeeper就是这样一个框架,它提供了Spring Boot应用与Zookeeper的集成,使得应用能够...
在Kafka中,Zookeeper主要扮演了元数据存储和集群协调的角色,例如,它存储了Kafka的broker列表、topic的分区信息以及消费者的位置。 **Canal** 是阿里巴巴开源的一个数据库增量日志提取工具,它能够实时捕获MySQL...
这个Docker Compose 文件定义了一个包含Zookeeper和三个Kafka节点的服务集群。通过指定镜像、端口映射、环境变量和依赖关系等配置,实现了Zookeeper和Kafka的快速部署和集成。同时,在定义了一个名为"mynetwork"的...
- **Zookeeper的角色**:作为分布式协调服务,Zookeeper在Kafka中主要负责元数据管理、集群配置同步、选举主节点等任务。 - **会话和 watches**:Zookeeper提供短暂的会话和持久化的watch机制,当节点状态发生变化...
Java、Zookeeper和Kafka是大数据处理领域中的关键组件,它们在日志处理和存储方面发挥着重要作用。这里我们将深入探讨这些技术如何协同工作,以及如何将日志数据存储到MySQL数据库。 首先,Java作为广泛使用的编程...
Zookeeper在Kafka中的主要作用有: 1. 集群管理:存储Kafka集群的节点信息。 2. 分区分配:决定哪个生产者向哪个broker发送消息,哪个消费者消费哪些分区。 3. 领导者选举:在节点故障时进行新的领导者选举。 4. ...