一、Kafka的基本概述
1、Kafka是什么?
kafka官网上对kafka的定义叫:A distributed publish-subscribe messaging system。publish-subscribe是发布和订阅的意思,所以准确的说kafka是一个消息订阅和发布的系统。最初,Kafka实际上是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)。
2、Kafka能做什么?
现今,Kafa主要用于处理活跃的流式数据,如分析用户的行为,包括用户的pageview(页面浏览),以便能够设计出更好的广告位,对用户搜索关键词进行统计以便分析出当前的流行趋势,比如经济学上著名的长裙理论:如果长裙的销量高了,说明经济不景气了,因为姑娘们没钱买各种丝袜了。当然还有些业务数据,如果存数据库浪费,而直接用传统的存硬盘方式效率又低下,这个时候,也可以使用Kafka的分布式进行存储。
3、Kafka中的相关概念
<!--[if !supportLists]-->· <!--[endif]-->Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
<!--[if !supportLists]-->· <!--[endif]-->Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,Kafka中Topic可以理解为一个存储消息的队列。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存在一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。
<!--[if !supportLists]-->· <!--[endif]-->Partition
Partition是物理上的概念,Kafka物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。如创建topic1和topic2两个topic,且分别有13个和19个Partition分区,则整个集群上相应会生成32个文件夹。为了实现扩展性,一个非常大的topic可以分布到多个broker上,但kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
<!--[if !supportLists]-->· <!--[endif]-->Producer
负责发布消息到Kafka broker。
<!--[if !supportLists]-->· <!--[endif]-->Consumer
消息消费者,向Kafka broker读取消息的客户端。
<!--[if !supportLists]-->· <!--[endif]-->Consumer Group(CG)
这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个 topic可以属于多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。每个consumer属于一个特定的Consumer Group, Kafka允许为每个consumer指定group name,若不指定group name则属于默认的group。
4、Kafka的特性:
1)数据在磁盘上存取代价为O(1),而一般数据在磁盘上是使用BTree存储的,存取代价为O(lgn)。
2)高吞吐率:即使在普通的节点(非常普通的硬件)上每秒钟也能处理成百上千的message。
3)显式分布式:即所有的producer、broker和consumer都会有多个,均匀分布并支持通过kafka服务器和消费机集群来分区消息。
4)支持数据并行加载到Hadoop中。
5) 支持Broker间的消息分区及分布式消费,同时保证每个partition内的消息顺序传输。
6)同时支持离线数据处理和实时数据处理:当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线),而Kafka通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理。
<!--[if !supportLists]-->7)<!--[endif]-->Scale out:支持在线水平扩展。
二、Kafka的架构设计
1、 最简单的Kafka部署图
如果将消息的发布(publish)称作producer,将消息的订阅(subscribe)表述为consumer,将中间的存储阵列称作broker,这样可以得到一个最简单的消息发布与订阅模型:
2、kafka的拓扑图:
Kafka是显示的分布式消息发布和订阅系统,除了有多个producer, broker,consumer外,还有一个zookeeper集群用于管理producer,broker和consumer之间的协同调用。
从图中可以看出,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
图上有个细节需要注意,Producer到Broker的过程是push,也就是有数据就推送到Broker,而Consumer到Broker的过程是pull,是通过Consumer主动去拉数据的,而不是Broker把数据主动发送到Consumer端的。
3、Zookeeper和Procuder、Broker、Consumer的协同工作
为了便于理解,假定此时Kafka集群中有两台Producer,但只有一台Kafka的Broker、Zookeeper和Consumer。如下图所示的部署集群。
1、 Kafka Broker其实就是Kafka的server,Broker主要做存储用,每个Broker启动后会在Zookeeper上注册一个临时的broker registry,包含Broker的IP地址和端口号,所存储的topics和partitions信息。
2、Zookeeper,可以把Zookeeper想象成它维持了一张表,记录了各个节点的IP、端口等配置信息。
3、Producer1,Producer2,Consumer的共同之处就是都配置了zkClient,更明确的说,就是运行前必须配置Zookeeper的地址,道理也很简单,因为他们之间的连接都是需要Zookeeper来进行分发的。
4、Kafka Broker和Zookeeper可以放在一台机器上,也可以分开放,此外Zookeeper也可以配集群,这样就不会出现单点故障。
5、 每个Consumer启动后会在Zookeeper上注册一个临时的consumer registry:包含Consumer所属的Consumer Group以及订阅的topics。每个Consumer Group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id,同时包含一个offset registry,内容为上一次订阅的offset。
整个系统运行的顺序可简单归纳为:
1、启动Zookeeper的server。
2、启动Kafka的server。
3、Producer如果生产了数据, 会先通过Zookeeper找到Broker,然后将数据存放进Broker。
4、Consumer如果要消费数据,会先通过Zookeeper找对应的Broker,然后消费。
Producer代码实例:
producer = new Producer(...); message = new Message("Hello Ebay".getBytes()); set = new MessageSet(message); producer.send("topic1", set); |
发布消息时,Producer先构造一条消息,将消息加入到消息集set中(Kafka支持批量发布,可以往消息集合中添加多条消息,然后一次性发布), send消息时,client需指定消息所属的topic。
Consumer代码实例:
streams[] = Consumer.createMessageStreams("topic1", 1); for (message : streams) { bytes = message.payload(); // do something with the bytes } |
订阅消息时,Consumer需指定topic以及partition num(每个partition对应一个逻辑日志流,如topic代表某个产品线,partition代表产品线的日志按天切分的结果),client订阅后,就可迭代读取消息,如果没有消息,client会阻塞直到有新的消息发布。Consumer可以累积确认接收到的消息,当其确认了某个offset的消息,意味着之前的消息也都已成功接收到,此时Broker会更新Zookeeper上的offset registry。
那么怎样记录每个Consumer处理的信息的状态呢?其实在Kafka中仅保存了每个Consumer已经处理数据的offset。这样有两个好处:一是保存的数据量少,二是当Consumer出错时,重新启动Consumer处理数据时,只需从最近的offset开始处理数据即可。
4、Kafka的存储策略
Kafka broker主要是用于做存储使用,每个broker上可存储很多的topic信息。
1、Kafka以topic来进行消息管理,每个topic包含多个partition,一个partition对应一个逻辑log,由多个segment组成。
2、每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
3、每个partition在内存中对应一个index,记录每个segment中的第一条消息偏移。
4、发布者发到某个topic的消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
5、Kafka的delivery guarantee
在Kafka上,有两个原因可能导致低效:太多的网络请求和过多的字节拷贝。为了提高网络利用率,Kafka把message分成一组一组的,每次请求会把一组message发给相应的consumer。 此外,为了减少字节拷贝,采用sendfile系统调用, sendfile比传统的利用socket发送文件进行拷贝高效得多。
Kafka支持有这几种delivery guarantee:
<!--[if !supportLists]-->· <!--[endif]-->At most once 消息可能会丢,但绝不会重复传输
<!--[if !supportLists]-->· <!--[endif]-->At least one 消息绝不会丢,但可能会重复传输
<!--[if !supportLists]-->· <!--[endif]-->Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
6、Kafka多数据中心的数据流拓扑结构
实际设计中,有时由于安全性问题并不想让一个单个的Kafka集群系统跨越多个数据中心(数据中心之间可以进行通信),而是想让Kafka支持多数据中心的数据流拓扑结构。这可通过在集群之间进行镜像或“同步”实现。这个功能非常简单,镜像集群只是作为源集群的数据使用者(Consumer)的角色运行。这意味着,一个单个的集群就能够将来自多个数据中心的数据集中到一个位置。下面所示是可用于支持批量装载(batch loads)的多数据中心拓扑结构的一个例子:
请注意,在图中上方的两个集群之间不存在通信连接,两者可能大小不同,具有不同数量的节点,而下面部分中的这个单个的集群可以镜像任意数量的源集群。
从以上可以的分析可以看出:Kafka的主要设计理念就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可,因为分属不同Consumer Group的Consumer都将收到Producer发布到Kafka broker的信息。
三、Kafka的High Availability机制
1、概述:
Kafka在0.8版本以后,开始提供High Availability机制。在此之前一旦一个或多个Broker宕机,则宕机期间其上所有Partition的数据将不可被消费,Producer也不能再讲数据存储于这些Partition中。若该Broker永远不能再恢复,或者产生磁盘故障,则其上数据将丢失。无论Producer使用同步模式还是异步模式发送数据,都会造成整个系统的可用性降低,因为如果Producer使用同步模式,则Producer会在尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,用户可以选择停止发送后续数据或者选择继续选择发送。者、选择继续发送会造成数据的阻塞,选择停止发送会造成本应发往该Broker的数据的丢失;而如果Producer使用异步模式,Producer会尝试重新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题,更为悲惨的是当前Kafka的Producer并未对异步模式提供callback接口。
Kafka的HA机制主要是通过Data Replication和Leader Election来保证的,Data Replication指的是每一个Partition都可能会有1个或者多个Replica,其中有一个Replica会被推选为Leader节点,其余落选的为Follower节点,其中Leader将会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。
2、Data Replication
2.1、Kafka分配Replica的算法:
为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的 Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下(假设总共有n个broker):
- 将所有Broker和待分配的Partition排序
- 将第i个Partition分配到第(i mod n)个Broker上
- 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
2.2、Kafka传播(Propagate)消息与ACK的过程
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader,Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送 ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW(高水位:表示最近一次提交消息的偏移位置)并且向 Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被 Consumer消费。但考虑到这种场景非常少见,可以认为这种方式是在性能和数据持久化上做了一个比较好的平衡。Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
2.3、Partition Follower和Leader间的ISR复制机制
和大部分分布式系统一样,Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现);二是Follower必须能够及时将 Leader的消息复制过来,不能“落后太多”。Leader会跟踪与其保持同步的Replica列表,即上面说到的ISR,如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages
配置,其默认值是4000)或者Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms
来配置,其默认值是10000)未向Leader发送fetch请求。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率,而异步复制方式下,Follower异步的从 Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从 Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
需要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有 Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks
来设置,这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
2.4、Follower从Leader Fetch数据
Follower通过向Leader发送FetchRequest获取消息,FetchRequest结构如下
从FetchRequest的结构可以看出,每个Fetch请求都要指定最大等待时间和最小获取字节数,以及由TopicAndPartition 和PartitionFetchInfo构成的Map。实际上,Follower从Leader Fetch数据和Consumer从Broker Fetch数据,都是通过FetchRequest请求完成,所以在FetchRequest结构中,其中一个字段是clientID,并且其默认值是 ConsumerConfig.DefaultClientId。Leader收到Fetch请求后,Kafka通过KafkaApis.handleFetchRequest响应该请求,响应过程如下:
- replicaManager根据请求读出数据存入dataRead中。
- 如果该请求来自Follower则更新其相应的LEO(log end offset)以及相应Partition的High Watermark
- 根据dataRead算出可读消息长度(单位为字节)并存入bytesReadable中。
- 满足下面4个条件中的1个,则立即将相应的数据返回
- Fetch请求不希望等待,即fetchRequest.macWait <= 0
- Fetch请求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo为空
- 有足够的数据可供返回,即bytesReadable >= fetchRequest.minBytes
- 读取数据时发生异常
- 若不满足以上4个条件,FetchRequest将不会立即返回,并将该请求封装成DelayedFetch。检查该DeplayedFetch是否满足,若满足则返回请求,否则将该请求加入Watch列表
Leader通过以FetchResponse的形式将消息返回给Follower,FetchResponse结构如下
管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions
上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成Partition的重新分配,主要过程是将ZooKeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
2.6、Replication工具
1、Topic Tool
在$KAFKA_HOME/bin/kafka-topics.sh
,该工具可用于创建、删除、修改、查看某个Topic的相关配置信息,也可用于列出所有Topic。
2、Replica Verification Tool
在$KAFKA_HOME/bin/kafka-replica-verification.sh
,该工具用来验证所指定的一个或多个Topic下每个Partition对应的所有Replica是否都同步。可通过topic-white-list
这一参数指定所需要验证的所有Topic,支持正则表达式。
3、Kafka Reassign Partitions Tool
该工具的设计目标与后面即将提到的Preferred Replica Leader Election Tool有些类似,都旨在促进Kafka集群的负载均衡。不同的是,Preferred Replica Leader Election只能在Partition的AR范围内调整其Leader,使Leader分布均匀,而该工具还可以调整Partition的AR,Follower需要从Leader Fetch数据以保持与Leader同步,所以仅仅保持Leader分布的平衡对整个集群的负载均衡来说是不够的。另外,生产环境下,随着负载的增大,可能需要给Kafka集群扩容,向Kafka集群中增加Broker非常简单方便,但是对于已有的Topic,并不会自动将其Partition迁移到新加入的Broker上,此时可用该工具达到此目的。
某些场景下,实际负载可能远小于最初预期负载,此时可用该工具将分布在整个集群上的Partition重装分配到某些机器上,然后可以停止不需要的Broker从而实现节约资源的目的。需要说明的是,该工具不仅可以调整Partition的AR位置,还可调整其AR数量,即改变该Topic的replication factor。该工具有三种使用模式:generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行);execute模式,根据指定的reassign plan重新分配Partition;verify模式,验证重新分配。
Kafka Replication的数据流如下图所示:
3、Leader Election
3.1、概述
引入Replication之后,同一个Partition可能会有多个Replica,这时实际上需要从这些Replica中推选出一个Leader,如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率,而引入Leader后,只有Leader负责数据读写(即Leader负责与Producer和Consumer交互),而Follower负责向Leader顺序Fetch数据(N条通路),这样的设计使得系统更加简单且高效。
3.2、Kafka借助Zookeeper和ISR选择Leader
Kafka中,一个简单直观的选择Leader的方案是所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower,但由于Follower可能落后Leader许多或者crash了,所以必须确保选择“最新”的Follower作为新的Leader。一个基本的原则就是,如果Leader不在了,新的Leader必须拥有原来的Leader commit过的所有消息。这就需要作一个折衷,如果Leader在标明一条消息被commit前等待更多的Follower确认,那在它宕机之后就有更多的Follower可以作为新的Leader,但这也会造成吞吐率的下降。
一种非常常用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个(即只要最快的n+1个有数据了,就能选出一个Leader来)。但Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍2个follower挂掉,必须要有5个以上的Replica,如果要容忍3个Follower挂掉,必须要有7个以上的Replica。换句话说,在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种方式。
Kafka0.8以后增加了一个Controller概念,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定,controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。前面提到Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka官方认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。
3.3、Preferred Replica Leader Election Tool
有了Replication机制后,每个Partition可能有多个备份。某个Partition的Replica列表叫作 AR(Assigned Replicas),AR中的第一个Replica即为“Preferred Replica“。创建一个新的Topic或者给已有Topic增加Partition时,Kafka保证Preferred Replica被均匀分布到集群中的所有Broker上。理想情况下,Preferred Replica会被选为Leader。以上两点保证了所有Partition的Leader被均匀分布到了集群当中,这一点非常重要,因为所有的读写操作都由Leader完成,若Leader分布过于集中,会造成集群负载不均衡。但是,随着集群的运行,该平衡可能会因为Broker的宕机而被打破,该工具就是用来帮助恢复Leader分配的平衡。
事实上,每个Topic从失败中恢复过来后,它默认会被设置为Follower角色,除非某个Partition的Replica全部宕机,而当前 Broker是该Partition的AR中第一个恢复回来的Replica。因此,某个Partition的Leader(Preferred Replica)宕机并恢复后,它很可能不再是该Partition的Leader,但仍然是Preferred Replica。
其用法为:
$KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
在包含8个Broker的Kafka集群上,创建1个名为topic1,replication-factor为3,Partition数为8的Topic,使用如下命令查看其Partition/Replica分布。
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181
查询结果如下图所示,从图中可以看到,Kafka将所有Replica均匀分布到了整个集群,并且Leader也均匀分布。
手动停止部分Broker,topic1的Partition/Replica分布如下图所示。从图中可以看到,由于Broker 1/2/4都被停止,Partition 0的Leader由原来的1变为3,Partition 1的Leader由原来的2变为5,Partition 2的Leader由原来的3变为6,Partition 3的Leader由原来的4变为7。
再重新启动ID为1的Broker,topic1的Partition/Replica分布如下。可以看到,虽然Broker 1已经启动(Partition 0和Partition5的ISR中有1),但是1并不是任何一个Parititon的Leader,而Broker 5/6/7都是2个Partition的Leader,即Leader的分布不均衡——一个Broker最多是2个Partition的Leader,而 最少是0个Partition的Leader。
运行该工具后,topic1的Partition/Replica分布如下图所示。由图可见,除了Partition 1和Partition 3由于Broker 2和Broker 4还未启动,所以其Leader不是其Preferred Repliac外,其它所有Partition的Leader都是其Preferred Replica。同时,与运行该工具前相比,Leader的分配更均匀——一个Broker最多是2个Parittion的Leader,最少是1个 Partition的Leader。
WS:所以Partition的Replica其实就是指某个Broker的id。
启动Broker 2和Broker 4,Leader分布与上一步相比并未变化,如下图所示。
再次运行该工具,所有Partition的Leader都由其Preferred Replica承担,Leader分布更均匀——每个Broker承担1个Partition的Leader角色。
除了手动运行该工具使Leader分配均匀外,Kafka还提供了自动平衡Leader分配的功能,该功能可通过将auto.leader.rebalance.enable
设置为true开启,它将周期性检查Leader分配是否平衡,若不平衡度超过一定阈值则自动由Controller尝试将各Partition的Leader设置为其Preferred Replica。检查周期由leader.imbalance.check.interval.seconds
指定,不平衡度阈值由leader.imbalance.per.broker.percentage
指定。
相关推荐
高可用(High Availability, HA)架构设计通常涉及以下几个关键知识点: 1. **负载均衡**:通过分配网络流量到多个服务器,避免单一节点过载,提高服务的响应速度和整体系统的可用性。常见的负载均衡器有Nginx和...
九、故障恢复与高可用性(Fault Tolerance and High Availability) Kafka通过副本和选举机制实现快速故障恢复。同时,Kafka Cluster可以通过增加brokers数量提高可用性,减少单点故障的影响。 总结,Kafka的数据...
大型分布式电商系统架构的构建和演进是一个复杂且系统性的过程,它涉及到多个技术领域和设计原则。在从0开始构建这样一个系统时,我们需要考虑的关键点包括:高可用性(High Availability)、可扩展性(Scalability...
另外,分布式服务框架(如Dubbo、Spring Cloud)提供了服务注册与发现、熔断机制(如Hystrix)、重试策略等,确保服务在部分故障时仍能提供基本功能。 3. **分布式** 分布式系统是由多个独立计算机节点通过网络...
Flume系统设计了High Availability(HA)机制,通过两个服务器同时作为NameNode节点来解决单点故障问题,确保了在节点故障时系统依然能够正常运行。 Kafka是一种分布式流处理平台。它的核心是kafka集群,由多个kafka ...
高可用性(High Availability, HA)和高性能(High Performance, HP)是架构设计的关键目标。为了实现这些,架构师可能需要考虑负载均衡、故障转移、冗余系统和性能调优等策略。例如,使用Nginx进行负载均衡,通过...
总结,MySQL的MHA、Redis的哨兵集群以及Kafka的副本机制都是为了提供高可用性,它们在设计上各有优缺点,如MHA的成熟度、Redis的灵活持久化和Kafka的分布式特性。在实际应用中,应根据业务需求和资源限制选择适合的...
在IT行业中,构建高可用(High Availability,HA)系统架构是一项至关重要的任务,尤其是在Java开发领域。高可用性意味着系统能够在出现硬件故障、网络中断或其他异常情况时,仍能持续提供服务,确保业务的连续性。...
包括MHA(Master High Availability)、CDS(Clustered Database System)、HDB(Highly Distributed Base)、R2M(Read to Master)、DBRep(Database Replication)和HCenter等多个组件,这些架构设计旨在提高系统...
4. **高可用性实现**:淘宝在DAL层面实现了多种HA(High Availability)策略。其中一种是在出现故障时仅提供读服务,这种方式能够在保证系统基本可用性的前提下,避免因写操作可能导致的数据不一致问题。 5. **DAL...
4. 高可用与容错性:通过Hadoop的HA(High Availability)和故障切换机制,确保系统的稳定运行。 四、系统设计挑战与应对策略 1. 数据质量:确保数据的准确性和完整性是关键,需要建立有效的数据清洗和验证机制。 ...
常见的架构模式包括单体架构、微服务架构、分布式架构等,每种都有其适用场景和优缺点。 3. **高可用性(High Availability, HA)**:确保系统在硬件或软件故障时仍能提供服务。这通常通过冗余和负载均衡来实现,例如...
- Hadoop HA(High Availability)是指在Hadoop集群中实现高可用性的机制,主要是通过设置多个活跃的NameNode来实现。 - **上课用图** - 通常会涉及到Hadoop HA的架构图,包括Active NameNode、Standby NameNode、...
首先,高可用性(High Availability, HA)主要通过冗余和故障转移来实现。冗余是指在系统设计中引入备份组件,当主组件出现问题时,备份组件可以无缝接管工作。这包括硬件冗余、软件冗余和服务冗余。例如,使用负载...
接着,高可用性(High Availability,HA)架构是确保系统即使在部分故障的情况下也能正常运行的关键设计。通常通过冗余和负载均衡来实现。例如,使用多台服务器构成的集群,当一台服务器出现故障时,其他服务器可以...
高可用性是通过ZKFC(Zookeeper-based Keeper for High Availability)实现的,采用主从热备模式。 - 为了解决NameNode的扩展性问题,字节跳动采用了联邦机制(Federation),部署多组NameNode,每组独立维护元数据...
- **HA机制**:为了提高Hadoop集群的可用性,引入了HA(High Availability)机制。该机制通过配置两个NameNode节点(一个Active,一个Standby)来实现热备份,当Active NameNode发生故障时,Standby NameNode可以...
5. **AHAS (Application High Availability Service)**:应用高可用服务,提供流量控制、熔断和降级等功能,以增强应用的容错性和弹性。 6. **ACM (Application Configuration Management)**:应用配置管理服务,...
在Java编程中,这种概念同样存在,特别是在分布式服务和高可用性(High Availability, HA)系统设计中。 Java作为一种广泛应用于企业级开发的语言,提供了多种实现主从切换的框架和工具,如Zookeeper、Apache Kafka、...