原文地址
http://kafka.apache.org/documentation.html
第四章
4.4 生产者
负载均衡
生产者直接把数据发给对应分区的主代理, 为了做到这点,所有的节点都能相应生产者
关于那个服务在线以及那个话题分区的领导者是谁的请求, 并且允许生产者直接占用这
个请求。
客户端控制着把数据发送到那个分区, 可以通过随机的方式也可以通过指定的方式来控制
发送那个分区,以此来实现负载均衡。设计者暴露了一个接口,使用着可以实现这个接口
来把请求散列到不同的节点上。当然也以为着消费者对他们的消费可以指定他们消费的数
据。
异步发送
批量处理是提高效率的重要途径,为了做到这点,卡夫卡还有异步模式。在这个模式下,
在内存中计算内容的大小然后在一次请求中发送出来。批量可以配置为多少个信息或多
长时间一次,这样可以在不是那么大读写的情况下发送更多的数据。这些缓存是发生在
客户端,这样减少了在内存中而没有发送出去的的消息由于生产者崩溃而丢失的可能性。
注意,0.8.1中没有异步生产者的回调方法,来处理一些发送的异常。在下个版本0.9中
会增加这个部分的内容。
4.5 消费者
卡夫卡的消费者通过发送 查询请求到 他想要的分区的主代理上来实现。消费者在每次的
查询请求中指定偏移量,然后会得到从这个偏移量开始的一块数据。消费者有偏移量的控制
权,以便能够倒回去重新查询一些需要的数据
推和拉
在刚开始我们考虑一个问题,是消费者中代理上拉数据还是代理将数据推给消费者。在这点
上,卡夫卡和其他的消息系统一下,由生产者将数据推给代理,然后消费者从代理上拉数据
一些日志中心系统, 如scribe 和 flume 采用的是完全不同的向下推数据的方式, 两种
方式都有优缺点。基于推数据的方式对于如何对于代理如何控制对于不同消费者的数据比例
有一定的困难。这种设计是让消费者能消费任何可能的数据,但是在基于推送的消息系统则
意味者生产者生产多少就需要消费多少(拒绝式服务攻击的精髓)。基于拉的系统使消费者能
够轻松应对这些,消费者能够最大限度的消费那些数据。上面这些内容决定了我们是用的是
更为传统的拉模式。
拉模式的另外一个好处是, 能够分批处理数据。基于推送的系统是一次性发送数据或者积累
一段时间以后发送, 而不考虑客户端是否能够及时处理。如果是低延迟的,那就以为者直接
发送给客户端没有任何缓存,这是很浪费资源的。基于拉的系统涉及能够让消费者能够从某个
偏移量后面取到可哟合格的数据。即得到了理想的批量处理,又减少了不必要的延迟。
简单的拉模型的一个缺点是, 当代理没有数据时,消费者会空转,繁忙地等待者数据的到来。
为了避免这种情况,在拉的时候可以设置参数,允许消费者停止请求直到有足够的数据近来。
可以想想一下只有拉没有推的模型。 生产者将数据写入到本地的日志中,代理从生产者者拉
数据,消费者从代理那里拉数据。一个相似的类型是 “存储和转发”生产者是推荐使用的。这
个是挺吸引人的, 但是我们感觉他不太适合上千个生产者的情况。大量数据持久化的经验告
诉我们不同的引用操作大量的磁盘会很不稳定而且操作起来会是一场噩梦。实际操作中我们发
现拥有大规模强的SLAs的管道就不需要生产者持久化了。
消费者偏移量
偏移量是记录有多少数据已经被消费,是消息系统中一个关键的指标。
大部分的消息系统都把偏移量等元数据存放在代理中。当一个消息被处理,代理立即在本地标
记或者等待消费者的处理结果。这种是相对直接的做法对于一个服务器, 但是对于其他服务就
不太明显是个明智的选择。当数据结果存在大规模的服务集群中,每个代理知道那些被消费了
就可以立即删除, 保存的数据为最小的。这种方式也是很实用的。
也许还有其他不是很明显的解决方法,对于代理和消费者,偏移量也许不是一个重要的问题。
如果代理每次立即记录被发送给消费者的信息,假如此时消费者正好崩溃或由于其他原因没有
处理这个消息,那这个消息就丢掉了。为了解决这个问题,许多消息系统都通过标记消息被发送
而没有被处理,当消费者处理完成以后这个消息,就回调给代理。代理再把这个消息标记为已
消费。这个能解决刚才丢消息,但是又引来了新的问题。首先,如果确认处理没有及时返回消息
可能会被处理两次;第二个是关于性能的问题,代理需要将每个消息成功标记两次(需要加锁)。
像这种发送但没有处理的消息的棘手的问题必须得解决掉。
卡夫卡用完全不同的思路来解决这个问题。话题被分为有序的几个分区,每个分区一次只能有
一个消费者。这就意味着每个分区的偏移量就是一个整数,就是下个需要消费的消息的位移。
这样消息被消费的状态就每场精简,对于每个分区只是一个数字。这个状态会定期检查,这样
对伊消息消费确认非常简单。
这样还有一个好处,是消费者能够倒回去重新消费以前的数据。这样违反了队列的一般规则,但是
对于多个消费者确是一个非常大的有点。例如,如果消费者处理有错误,就可以从错误开始的地方
从新处理这些消息。
线下数据加载
可扩展的持久化系统支持通过消费者将数据批量导入到线下的系统,如hadoop或者其他的关系型
数据库。
对于hadoop来说,可以通过不同的任务将数据并行的加载,如果一个任务对应一个节点的一个分区
则可以全并行的加载数据。hadoop提供任务管理,如果任务失败,可以简单从原来的位置从新开始
而不必担心数据会重复。
4.6 消息的发送机制
我们现在了解一下生产者和消费者是如何工作的,探讨一下卡夫卡的一些保障机制。显然有一些可能
的保障机制:消息可能丢但绝不会重发, 消息从不丢但可能重发,消息只发一次。
最糟糕的两个事情是: 保证发送消息和保证消费消息。
许多系统声称提供保证只此一次的发送机制,但是最好读写他们的文件, 他们的大部分都是在误导。
(在下列情况下他们不做处理:生产者或者消费者失败了,有多生产者的情况,写到磁盘的数据丢失)
卡夫卡的机制是简单的。当发送消息的时候,有一个概念是消息将被提交到日志系统。一但消息发送
并且提交到消息系统,消息不会丢掉一直到其他的代理备份这个分区时这个消息被标记为“alive”。
"alive"的含义会在下个章节解释。现在我们假设一个非常完美的场景,没有任何影响的代理,生产
者和消费者之间的保证。如果生产者准备发布一个消息,正好遇到一个网络异常。 但是不能确定这个
消息是否被提交到了日志系统,就想往数据库插入了一条自动生成key的记录。
这个不是最有可能的情况对于生产者来说。 尽管我们不能确定网络错误发生了什么,但我们可以让
生产者产生一个让生产者发送请求一致保持不变的key。这个特性是非常重要的对于副本集的系统,
而且他必须在服务端失败的时候有起作用。有了这个特性,可以让生产者一直尝试发送直到收到代理
返回成功提交的确认信息,可以保证消息只被提交一次。我们希望在以后的版本中添加这点。
不是所有的系统都有如此高的要求。对于延迟要求较高的,我们允许调节生产者的稳定性。如果生产者
指定需要等待消费者,则需要额外负担10ms。当然可以指定异步发送, 或者等到主代理返回已经提交
消息的结果。
下面看一下消费者是如何运转的。所有的副本集都有一样的位移的日志文件,消费者控制着自己的偏移
量。如果消费者永远不会宕机,则可以把偏移量保存到内容中。但是如果真的宕机了我们希望新的消费
者能够从正确的位置开始处理消息。消费者读取数据, 对于处理数据和更新位置有几个选吸纳个。
1,读数据,保存偏移量,最后处理数据。如果保存偏移量后消费者崩溃了,没有处理数据。而新的消费
者则从偏移量开始处理数据,则那些没有处理的数据就会丢掉了。
2,读数据,处理数据,最后保存偏移量。如果处理数据后消费者崩溃了,没有保存偏移量。而新的消费
者则从偏移量开始处理数据,则会造成数据的重复处理。在很多情况下,消息需要有一个唯一的key来应
对更新是原子的。
3, 什么是最正确的机制呢?这里的一些限制不仅仅是消息系统的问题, 而且需要消费者合作来保证偏移
量和实际处理的一致。经典的解决方案是分两个阶段提交,保证消费者得到和数据的结果是一致的。当然
有一种更简单的解决方法,就是让偏移量和数据都有消费者来保存。好处是不需要两次提交了,不是所有的
消费者都支持两次提交。例如hadoop将文件内容写入到hdfs中,需要保证偏移量和保存的内容一致。对于
要求比较严格的系统,我们也遵循这一点。消息没有唯一的key,允许备份。
实时上,卡夫卡默认最少一次的传送, 并且允许用户禁止用户多次尝试,保存偏移量在处理消息数据之前。
只有一次的传送需要存储系统的合作, 但是卡夫卡提供了偏移量这种更为直接的方法。
4.7 备份
卡夫卡会备份每个话题的每个分区,主要依据是提供节点的数据(可以设置备份的因子)。这样的容错保证了
当一些节点宕机不会影响消息数据的使用。
其他的消息系统也有备份的机制,但在我看来,这个特性是放在后面的,没有被重用。而且还有其他的副
作用: 副的代理是不活跃的,严重影响吞吐量,需要复杂的配置。卡夫卡是默认使用副本集的,如果你不
需要副本集可以将备份的因子设置为1。
需要备份的是每个话题的每个分区。在没有错误的情况下,一个分区只有个主,有0个或多个副的。副本集
的个数有包括主节点在内的节点组成。所有的读写都在主节点进行。显然,有很比代理更多的分区,主代理
分布在左右的代理中。所有的副节点和主节点都有相同的位移,所有的消息的顺序都是一样的(当然,会有
在特定条件下副节点没有将主节点的所有数据拷贝下来)
副节点更新消息到本地的日志系统,采用的方式和其他的消费者是同样的方式。副消费者处理消息的一个优
点是能够批量处理需要同步的日志信息。
和其他的容错式分布式系统一样, kafak也严格定义了一个节点“alive”, 有两个条件:
1,节点必须和zookeeper保持会话(通过zookeeper的心跳机制)
2,如果是副节点,必须复制主节点的数据,不能落后主节点太多
我们称满足这两个条件的节点为"in sync", 这样不会混淆“alive”和“failed”两个概念。主节点有一个
“in sync”的节点的集合。如果某个“in sync”的几点崩溃、没有反应、或者落后, 主节点会将这个节点
从“in sync”中移除。但是多少算是落后呢, 这个是由 replica.lag.max.messages 和 replica.lag.time.max.ms
两个参数控制。
在分布式的系统中,节点突然停止工作,然后回复工作,这种“失败/回复”的模式会得到处理的。但是卡夫卡
不会处理这种野蛮生产或者恶意的相应等“拜占庭的”式的异常。
一个消息被提交,以为着某个分区的所有“in sync”节点都已经把这个消息写入到本地的日志中。只有提交
的消息才会提供给消费者,这就以为者消费者不用担心,主节点处理失败的消息是否会丢失。另一方面,生
产者可以选择否等待所有消息都提交, 这个可以通过一个参数来控制。这个参数是在发送请求中的required.acks
来设置的。
这就保证了提交的消息不会丢失,哪怕只有一个"in sync"的节点存在。卡夫卡在某个节点失败后仍然可以
使用, 但是网络断开后就不一定。
日志备份:法定人数, ISRs 和 集群
日志备份是卡夫卡分区的核心。日志备份是大部分分布式数据系统的基础组成部分,当然也有很多不同的实习
方式。一个日志备份可以被其他的分布式系统来使用,这种模式成为集群。
一个日志备份主要是处理一些共有的数据序列(主要是一些0,1,2的数字条目)。实现这个有很多方式,最简单
和最快的是让主节点选择数据序列的顺序。只要主节点"alive",副节点只要拷贝主节点置顶的顺序就可以了。
当然如果主节点不会宕机就不需要副节点了。当主节点宕机,我们需要从副节点中选择一个来做主节点。但是
副节点可能落后于主节点,或者崩溃了, 要选择一个离主节点最近的副节点。最基础的算法保证,如果告诉
客户端一个消息提交了, 那么当主节点宕掉,新选举出来的主节点必须有这个消息的数据。这样产生一种情
况:如果主节点等到所有副节点确认消息的提交, 然后宣布消息提交。这个过程会更多可选成为主节点的节点。
如果你选择一个确认的数目,以及和选举的主节点比有重叠不分的日志数,这个就叫做法定人数。一个常见的
做法是选择对 提交 和 主节点进行投票, 多数胜出。卡夫卡不是这样做的。假设有 2f+1个副本集, 如果有
f+1个副本集超过leader声称的commit记录, 如果我们选择新的leader, 就从这f+1个副本集进行投票选举。
如果没有超过f个副本集有失败, 那么主节点肯定会有所有的提交数据。是因为对于这个f+1个副本集, 肯定
至少有一个是有最全的提交数据。这个副本集将会是最全的,因为会被选择为新的主节点。每个算法都有一些
细节需要处理(如怎么样精确定义多超过完成,保证日志的一致性或者改变副本集的集合), 我们会在以后详细
说的。
投票的一个主要规则是:延迟,延迟是取决于最快的节点。以为着,如果副本集的因子是 3, 延迟取决于最快
节点, 而不是最慢的。
关于分布式系统有非常多的算法,包括zookeeper的zab, raft以及 Viewstamped Replication。卡夫卡使用
的是来自于微软的 PacificA算法。
多数选举的负面是它不会在没有选举主节点的时候给你带来大量的异常。为了容忍一个失败,需要三份的数据
拷贝,为了容忍两个失败需要五份的数据拷贝。但是我们发现在实际的应用中,仅仅为了容错是不够的, 将每个
数据写5次, 需要写内容5倍的存储空间,以及只有五分之一的吞推量,这点对于大的数据系统是不合适的。如
hadf的name节点的高可用是构建在majority-vote-based journal, 但是这种昂贵的代价并没有用在数据存储
上面。
卡夫卡对于又有点不同来选取自己的quorum集合。多数投票,卡夫卡动态维护者记得的 “in-sync” (ISR)的副本集
, 就是那些能跟上主节点的副本集,这里面的副本集才有可能被选举为主节点,对于写操作,只有当这个集合
里面的所有副本集都确认写数据才算提交完成了。这个 ISR 集合是由zookeeper来维护。这个是很重要的一项内容
对于卡夫卡来说,通过这个保证负载均衡。对于这个ISR集合,有f+1个副本集,可以容忍f个失败的节点,而又不
丢失数据。对于大多数我们想要处理的情况,这种处理方式是合理的。但实际上,为了容忍f个几点失败, 需要
等相等数目的副本集确认,才能认为数据已经提交(如一个失败,对于主节点,需要三个副本集,一个确认已经提交
, 对于ISR 需要两个副本集,一个确认收到)。这样就不用等到最慢的机器返回就可以提交数据了。但是我们认为
可以让客户端来选择是否需要等待数据提交的返回, 以及影响吞吐量以及硬盘占用量的副本集因子。
另外一个不同的重要设计是,卡夫卡不要求崩溃的节点能够完好的回复数据。这在分布式系统中很常见,因为数据
是依赖于不以为异常回复而存在的完好的数据,而这点也没有破坏数据的一致性。有两个最常见的问题,一个是硬盘
的异常,经常也不会造成数据的不完整。即使这样,我们也不要求在写文件时使用同步,因为这回将性能降低两个
到三个数量级。我们的原则是,当一个副本集在加入到整个集群之前,必须同步所有需要同步的信息,即使这些信息
因为没有及时写入硬盘而丢失掉。
kafka保证只要有一个副本集在同步数据,这样意味者可以允许其他节点丢失数据。如果一个分区的所有副本集节点
都挂掉了,就不能保证数据不会丢失了。实时上,当这样的事情发生了,需要一个合理的操作。当你不幸遇到这种情况
需要考虑接下来会发生什么。首先, 会等待一个节点回复,然后让他作为主节点(希望数据是完整的);或者选择第一个
返回的节点做为主节点。
这个是在可用性和一致性之间做选择。如果我们等待节点回复,则只要副本集当掉,就不可用。如果一个副本集当掉或
者他们的数据丢失,正好一个未同步的副本集复活,如果此时我们允许他成为主节点,这样他就成了所有信息的来源,
但是这个节点却不包含所有提交的信息。目前的版本我们采用的是第二种方法,就是当所有的节点都死掉的时候选择
可用性大于一致性的方案。在以后的版本中,我们可能会通过配置的方法让使用者来选择那种方法。
这个困境不仅仅存在与kafka,他存在所有基于选举的系统中。如在一个多数选举的系统中,如果多数的机器都失败了
(就是已经保存数据的 n/2+1), 你是选择丢失100%的数据,还是选择剩下没有及时保存数据的那些节点来继续服务
上面的讨论是只有一种日志log,就是一个话题的一个分区,但是kafka管理着上百个或上千个这样的分区。我们将所有
的分区通过循环的方式均匀的分布在所有节点上面,避免某些热的分区散列在一小部分节点上面。同样的我们也将主节
点散列在不同的副本集上面。同样需要优化,在某些节点不可用时对于主节点选举的流程。如果选举结束时,选举出来
的主节点,正好失败了。作为代替,我们选举一个节点作为控制器,用它来检测所有节点的失败,而且对节点失败的
时候,重新选择收失败节点影响的分区的主节点。结果是我们可以批量的处理这些选举,对于大量的分区,这样就能
更快更有效的处理选举。如果控制器失败了,其中的一个代理会成为新的控制器。
日志压缩
日志压缩保证了kafka能够保留住每个分区的最后一条记录,这个地址用来恢复系统崩溃或者在崩溃后的重新加载。让
我们进入这样的场景看看是如何工作的。到目前为止,我们描述了当数据保存了一段时间或者数据的大小到达一定的大小
时的保存策略。这些对于一条记录的单独处理会很好,但是这对于重要的可变的数据是很有意义的。例如这样一个场景,
有一个保存每个人email弟子的,每次用户都会更新他们email地址,通过发送message给这个topic,然后根据每个人
的用户id来更新。如第一次,123的email地址是 bill@microsoft.com, 第二次是 bill@gatesfoundation.org, 第
三次是bill@gmail.com。日志的压缩能够保证对于每个key只保存最后一条记录(bill@gmail.com)。这样我们能够保证
对于每个key我们只保存最后一条记录,而不是记录每次的更改。这就意味着消费者需要保存自己关于这个topic的状态
而不是由我们来保存整个的改变记录。下面三种情况说明了如何起作用:
1,数据库更改订阅。很多情况下需要一个可修改的数据库,通常是某种关系型数据库或者key-value数据库。当数据库的
更改生效时,需要通知到各种不同的系统,如缓存、查询集群、hadoop集群。这种情况下,处理实时更新需要最近的日志
但是生效cache则只需要数据集。
2,事件驱动。这个是系统涉及共同查询处理,用到一系列的更改作为应用最基础的存储。
3,高可用的日志。一些的本地计算可以通过打印本地系统更改日志,可以使其他系统能够重新加载这些更改,从而达到
一定的容错性。一些例如查询总数,平均数,以及group-by类似的操作,等流查询系统。
在每一种情况都需要实时处理每一种改变,但是如果突然的宕机的话,需要重新加载或者重新处理。日志压缩可以让这些
从上次失败的地址开始重新处理。通用的做法是,如果有无穷尽的日志记录,会记录从第一次开始以后的每次更改。用这种
方式可以恢复到任意节点,通过从开始一个的n条记录。这种完美的假设对于频繁更新一条记录的应用来说并不实用,因为
日志会无限制的增长,会导致老的更新会占据空间,而实时的最新的更新却没有更新。如果这时从日志的起点开始从新加载
则不会复制到最新的更新数据。
日志的压缩是给一个基于每条记录的细粒度的保存,而不是基于时间的粗粒度的保存。这种方法是选择删除每个key以前的
记录,来保证每个key只有最后一个状态的保存记录。这种保存策略可以是每个topic的,所以一个集群可以有些topic是基于
时间或者大小,另外一些是通过压缩保存的。这种有创意的想法被linkedin一个最老却最陈宫的项目-数据更改日志缓存Databus
来实习那个。不像其他的日志结构存储系统,kafka是构建在订阅以及线性读写的基础上的。也不像Databus,kafka像是真理
存储器,在上传的数据源不是说不是重复使用的。
日志压缩的基本原理
日志尾,日志头。日志头是传统意义的kafka日志。日志头是密集的,有线性的偏移量以及包含所有的信息。日志的压缩可以
处理日志尾,上面的图像展示了压缩后的尾部。注意尾部保存了第一次写时的初始化分配的偏移量,这个是不会改变的。同样
注意的是所有的偏移量都是合法的,及时这些偏移量的消息已经被压缩;这些偏移量和接下来其他的显示在日志中的偏移量是
一样的。对于上述图片(38是DeleteRetentionPoint),36、37、38等这些相等的偏移量,查询后的偏移量都是从38开始返回
的数据集合。压缩也对删除的数据起作用,如果一个key对应的消息是null的,将会被删除对待,这样这个key以前的所有的记
录都会被删除。就是说需要额外的空间来存储数据的时候,才会清楚掉这些有删除标记的日志信息。这个标记就是上
面的38 Delete Retention Point。压缩的操作是周期性通过拷贝原来的日志段来实现的。清理操作不会阻塞read 操作的,
也可以通过调节I/O的吞吐量来避免影响到读和写操作。
日志压缩有那些保证呢
1,任何消费者只要能够拿到日志记录的头,就能够看到所有写入的消息;这些消息有线性的偏移量
2,消息的顺序型是能保证的,压缩不会影响的顺序,只是移除一些多余的消息
3,一个消息的偏移量是永远不会改变的,这个在日志记录中是持久话的
4,任何从0开始的读进程,只要能够读到最后一个写入日志的状态。所有标记删除的需要删除数据,只要没有满足删除的策略的
情况下都能够查询到,策略包括时间策略和日志大小策略(即日志超过一定的时间或日志的文件超过一定的大小,只要有一个条件
满足,就会删除数据清理空间)。这在删除操作恰好发生在读操作的过程中非常重要,就是说如果读操作正在读这些数据,则是不
会删除这些需要删除的数据。
日志压缩的细节
日志的压缩是通过日志清理来做到的(就是清理那些有相同key的记录),通过拷贝日志文件,删除那些在头文件中出现的日志消息。
每个压缩的日志有一下几个工作:
1,选择头比尾最大的那个日志文件
2,创建一个头不所有消息的简洁记录(不是头部的所有拷贝)
3,然后拷贝从头开始拷贝那些记录,删除那些在以后出现的有相同key文件的消息。所以新的日志文件会立即附加到原来的日志
文件后面,这样就额外需要一些硬盘空间来存储这些临时的日志文件
4,日志的头部就是一个hash表的空间压缩。对于一条记录用到24个bytes,这样8GB的清理缓存可以清理366GB的日志头(一条消息1K)
日志压缩的一些限制
1,不能配置有多少日志不会被压缩(日志头部), 当前所有不被写入的日志都是有资格的。
2,只是单个分区的压缩,不能跨越话题
第五章
.1 API设计
Producer APIs
Consumer APIs
5.2 Network Layer
网络层使用的简单的NIO服务,在此不会大篇幅的来描述。发送文件通过MessageSet中的writeTo接口
方法来实现,这样允许基于文件的消息系统能够用transferTo来实现而不是通过写缓存的操作来实现。
线程的模式是一个接收线程,有能够处理链接请求的N个处理线程。这种设计已经在其他地方中被证明
是简单和高效的,这种也保证以后客户端可以用其他语言实现。
5.3 消息体
消息体是有一个固定长度的头部,一个可变长度byte数组。头部是有版本号和检测异常的CRC32校验码。
消息的载体是不透明的,这样做的原因有:现在有很多序列化的工具,但是任何一种好像都不是对所有
的场景都很合适。对于使用kafka的不同使用者,可以更具场景来使用不同的序列化工具。MessageSet
是一个简单的消息体的迭代程序,可以整块的读数据以及能够写入到NIO通道。
5.4 消息提的具体格式
/**
* A message. The format of an N byte message is the following:
*
* If magic byte is 0
*
* 1. 1 byte "magic" identifier to allow format changes
*
* 2. 4 byte CRC32 of the payload
*
* 3. N - 5 byte payload
*
* If magic byte is 1
*
* 1. 1 byte "magic" identifier to allow format changes
*
* 2. 1 byte "attributes" identifier to allow annotations on the message independent
* of the version (e.g. compression enabled, type of codec used)
*
* 3. 4 byte CRC32 of the payload
*
* 4. N - 6 byte payload
*
*/
5.5 消息日志
对于一个名字为 my_topic ,并且有两个分区组成的话题的消息日志, 是由两个文件组成(my_topic_0 my_topic_1),
这两个文件存储的是这个话题的消息。消息日志的格式是由一系列的 log entries 组成; 每个 log entry 有4个字节
的消息的长度N, 以及紧跟的N个byte的消息。每个消息是一个64-位的整数值来唯一的标识,这个整数值是来表示这个
消息距离开始位置的偏移量,包括所有已经发送到这个话题的这个分区的消息。硬盘上的消息格式下面会给出。每个日志
文件是以这个文件包含的第一个消息的偏移量来明明的,所以第一个消息日志将会是 00000000000.kafka, 每个日志文件
将会比前面文件多出S个byte, S 是在配置文件中配置日志文件的最大值。
On-disk format of a message
message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte
crc : 4 bytes
payload : n bytes
具体的消息的二进制格式是有版本的并且是一个标准的接口,这样才能够在 producer、broker和client传输,而不用重新
拷贝和转换。
消息的偏移量使用是与众不同的,我们的最初的目的是通过 producer 来产生一个GUID,在每个broker上面保存一个GUID和
偏移量的关联。但是 consumer 必须保存一个id, 这样原来的全局唯一的GUID就不起作用了。另外对于GUID和偏移量的映射
的保存是很负载的,尤其是对于硬盘的同步需要设计一个很重的数据结构,而且需要连续的随机访问这个数据结构。然而我们
简单来看,找一个数据结构,用简单的分区计数,用分区id和节点id来组合起来,这样就能唯一的识别一个消息。这个看起来
更简单的数据结构,对于一个 consumer 的多次查询也更简单。而且一旦我们使用计数,跳过直接使用偏移量就很自然了,对于
所有的分区都是单挑递增的。这样偏移量并没有在 consumer API 中体现,我们需要直接的更高效的处理。
日志允许线性的在文件后面追加。这个文件一直被追加日志直到达到配置文件的大小。日志文件有两个配置,一个是M,就是有
M个写的消息时就强制系统将消息刷写到磁盘空间,一个是S,就是每隔S秒就把写的消息刷写到磁盘空间。这样就会有丢掉M个
消息或者S秒的消息会被丢失如果系统崩溃。
日志的读是通过给一个64位的消息偏移量以及S-byte 的数据块。这样会返回一个包含消息迭代器和S-byte的buffer。S必须比
单个消息大,但是对于单个异常大的消息体,可以尝试多次读,每次尝试2*S大小的数据,直到能够读取成功。一个消息的大小
可以设定,这样服务可以拒绝超过这个大小的消息,这样也能给到客户端取到一个完整的消息需要的最大值。好像读到的数据
有可能是半个消息, 这个是通过检测数据大小来划定界限。但是在实际的读操作的过程中,需要一个偏移量,就是第一个消息
的存储地址,计算日志文件的整体的偏移量,然后从这个文件的偏移量开始读。查询操作是通过二分查找,对于每个文件的偏移
量都是在内存中存储的。
日志还提供一个读最近写入文件的消息的订阅模式。如果consumer没有在SLA-specified 指定的天数内读取消息,这种情况下
consumer 尝试读取一个不存在的偏移量, 会抛出一个 OutOfRangeException。这种情况下既不能回退的,也不能得到以前的
那种常见的异常。 下面是发送给 consumer 的信息的格式:
MessageSetSend (fetch result)
total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)
total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n
日志的数据会定期的删除。日志的管理对于删除策略是可以配置的。当前的删除策略是删除那些N天没有更改的文件,尽管这个
文件还有N GB的空间可以用。为了不阻塞读的操作,可以通过 copy-on-write 的模式来实现,这样对于二分查找来说可以
保持一致性,就是在删除操作进行的时候会有一个当前日志的一个快照。
日志提供一个参数M,控制最多会有多少个消息写入但没有立即刷入到磁盘。每次启动的时候,会检测所有的消息日志中最后
一条消息的 message entry 是否有效,一个消息的大小以及偏移量的和必须小于文件的大小,而且他的校验码和存在消息中
的CRC校验码必须一致。在出现异常的情况下,消息日志会恢复到最后一个有效的消息处。
需要注意的是两种异常的情况必须处理:未写入的数据在崩溃情况下丢失,没有意义数据的写入。原因是大部分的操作系统无法
保证数据内容写入文件句柄的顺序,这样就有可能会丢失已经写入的数据或者写入一些没有意义的数据(文件句柄的数据大小已经
更新,而内容没有写入就发生了异常)。CRC校验能够检测到这种情况,能够阻止这些情况下的污染消息日志。
5.6 分布式
zookeeper 目录
下面给出了 zookeeper的目录结构,以及保证消费者和代理人之间协作的算法。
代理人节点的注册
/brokers/ids/[0...N] --> host:port (ephemeral node) 临时节点
这个是所有表现出来的代理人节点,每个提供了一个唯一的编码,用于消费者来识别(在配置文件中这个数字编码必须配置)。一旦
启动,代理人节点就在zookeeper中注册一个节点,这个节点的地址是 /broker/ids/n,其中n是这个节点唯一编码。这个唯一节点
的另外一个用处是可以使代理人转移到另外一个物理机上面,而不用担心是否会影响到消费者。一个尝试注册一个已经存在的唯一
编码的代理人将会出错(应为两个代理人配置了同样的编码)。如果代理人把自己注册到zookeeper虚拟节点,这种注册是动态的,如果
代理人宕机的话,这个临时节点会消失的(通知消费者这个代理人不再可用了)。
代理人话题注册
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
每个代理人注册一些这个话题下他包含的分区
消费者和消费组
话题的消费者也会把他们自己注册到zookeeper中,这样就能均衡的消费数据,也能跟踪到每个代理人的每个分区的消费的偏移量。对于同
一个话题多个消费者可以组成一个组,这个组中的所有消费者拥有相同的group_id。如,在foobar的处理程序中,是在三个实体机上运行,
你可以把他们标记为 “foobar” 的消费组, 这样就能告诉消费者它属于那个组。一个组中的消费者是公平的分给不同的分区,每个分区只有
一个消费者(在同一个消费组中)
消费者Id注册
group_id对于一个组中的消费者是共享的, 每个消费者分配一个临时的唯一的消费者id(是机器名称uuid)。消费者注册的格式如下:
/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
每个消费者注册在自己的分组小面,而且有一个消费者id。这个文件内容是 <topic, #streams> 的映射集合。这个id是用来标识当前分组
有那些消费者是活跃的。这个是临时节点,消费者死掉时会消失的。
消费者偏移量跟踪
消费者保存者他关于每个分区的消费的最大偏移量的值, 这个也是在zookeeper中存储。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)
分区主节点注册
一个分区会被一个消费组中唯一的消费者消费,消费者必须建立和这个分区的主节点建立关系,才能够进行消费。为了保存这种关系,
消费者会把这个id写入到某个代理人的某个分区下面的临时节点
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
代理人节点
代理人节点是相互独立的,这些节点只是发布他们拥有的信息。当有新的节点加入时,新节点会在broker的目录下注册新的节点,
节点的内容包括机器地址和对应的端口。代理人同样注册了他包含的话题,以及话题里面注册的分区。新话题是通过代理人动态
注册的。
消费者注册算法
当一个消费者开始,他会做一下事情:
1,将自己注册在某个组下面
2,监听消费者id的变化(有新的消费者加入或者已经存在的消费者退出,每次的更改都会触发一个组内消费者的归属)
3,监听代理人id的变化(有新的代理人加入或者已经存在的代理人推出,每次的更改都会触发消费者的重新平衡)
4,如果消费者创建一个话题过滤器,也会注册一个对话题变更的监听(新话题的加入,每个更改都会触发那些话题对这个话题过滤
器可用, 一个符合条件的新的topic会触发一个消费组内消费者的平衡)
5,强制要求自己在消费组内的平衡
消费者平衡的算法
消费者平衡算法能够使一个消费组内的消费者,消费那些分区达成共识。消费者再平衡是通过新加或者删除代理人或者消费者来触发的。
对于一个给定的话题的消费组,代理人的分区是被分配到这个组内的所有的消费者。一个分区通常是被一个消费者来消费的。这种设计
很容易实现,尽管我们允许一个分区同时被几个消费者使用,这种情况下要注意,而且会用到某种形式的锁。如果一个分区有多个消费
者,可能会有一些消费者不能拿到任何数据。再平衡的过程中,给每个分区非配消费者时,尽量减少每个分区的消费者使用。
Each consumer does the following during rebalancing:
1. For each topic T that Ci subscribes to
2. let PT be all partitions producing topic T
3. let CG be all consumers in the same group as Ci that consume topic T
4. sort PT (so partitions on the same broker are clustered together)
5. sort CG
6. let i be the index position of Ci in CG and let N = size(PT)/size(CG)
7. assign partitions from i*N to (i+1)*N - 1 to consumer Ci
8. remove current entries owned by Ci from the partition owner registry
9. add newly assigned partitions to the partition owner registry
(we may need to re-try this until the original partition owner releases its ownership)
第六章
6.1 基础的操作
删除和创建topics
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
--partitions 20 --replication-factor 3 --config x=y
你可以选择自己创建话题或者有系统自动建立(当你发送第一个消息时),如果是自动创建的,你需要修改
默认的配置。上面是创建话题的操作命令。
上面的副本集因子是说对于一个消息会有多少个服务来存储。如果你原来的副本集因子是3, 然后改成2会
失败的,因为这样会造成数据无法访问。我们推荐将副本集的因子设置为2或者3,这样就能在不影响消费者
的情况下添加或者去掉某些服务。
分区数控制了一个话题的所有消息将被分成多少份。这个数字有两个影响,第一是一个分区必须在一个服务上面,
这样就决定了你至多可以使用多少个服务(不包含副本);第二是这个数也影响到有多少个消费者并行的消费。
修改topics
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
--partitions 40
需要注意的是分区只是将消息数据分为不同的分区,增加分区不会改变已经存在数据的分区信息,这样可能妨碍
到正在读某个分区数据的消费者。如果数据是根据 hash(key) % number_of_partitions 来分区的,这样分区
有在新增分区的情况下有可能重新分配,但是kafka不会自动重新分配这些数据。
Kafka does not currently support reducing the number of partitions for a topic or changing the replication factor.
当前的版本不支持减少话题的分区数以及修改副本集因子。
优雅的关闭
kafka 会自动检测那些代理人关闭了或者失败了,让后会自动选举一个主节点对于那个机器上的分区。不管是因为
服务宕掉或者应为配置修改的重新启动,都会发生这样事情。对于最新版本的kafka支持一种优雅的关闭方式就是
直接kill掉。当被优雅的关闭时有两个优点:
1,他会把所有的消息日志同步到硬盘上面,避免了重启以后日志的恢复。日志恢复占用一定的时间,这样就能加速
重启的时间。
2,如果某个分区的主节点先宕掉,这样会导致主节点的迁移,这样也能够是这个转移更快一些,减少了每个分区不能
提供服务的时间为几毫秒。
日志的同步会一直进行,不管是服务停止或者被强制杀掉。但是对于主节点的迁移需要用特殊的配置,如下:
controlled.shutdown.enable=true
需要注意的是控制关闭成功起作用的前提是,每个分区至少有一个副本集,并且这个副本集还的是正常服务。这就意味
着如果最后一个副本集的节点宕掉的话服务就会停止了。
主节点的再平衡
如果一个节点宕掉的话,如果这个节点正好是主节点,则以为着主节点要转移到其他的副本集上面。这就意味者默认
新的代理人节点重启的话,它只能是他所拥有分区的副节点,也就意味这他不会被读和写。为了避免这种的不平衡,kafka
提供了命令来从新选择主节点。
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
当然也可以通过配置让kafka来自动做这些事情,配置如下:
auto.leader.rebalance.enable=true
监控节点间的数据
kafka 希望有一个在节点间的监控程序,来避免一个副本集的混乱。这个监控检测节点之间的数据, 从一个或这个多个
节点上读数据,最终写入到目标节点,结构图如下。这种情况的用途之一提供了向其他数据中心提供了副本集,下面的章节
会对这种情况做详细的介绍。你可以运行很多这种监控进程来提高吞吐量和容错型。数据从一个话题读出来,然后写入另一个
节点中的某个话题。事实上,这个监控就是扮演了kafka消费者和生产者两个角色。
监控的数据的的来源和最终写入的是完全不同的服务,可能会有不同的分区,对于偏移量也可能是不一样的。从这点上讲,并
不是一个容错的机制(消费者的偏移量是不一样的),从这点我们推荐普通的集群复制。这种镜像的复制会使用相同的key, 因为
数据都是在key的基础上进行分区的。下面的例子是从两个输入源来做一个镜像(到一个topic: my-topic):
> bin/kafka-run-class.sh kafka.tools.MirrorMaker
--consumer.config consumer-1.properties --consumer.config consumer-2.properties
--producer.config producer.properties --whitelist my-topic
注意 whitelist 参数, 这个可以支持JAVA的正则表达式。如果是A和B两个话题,则为 -whitelist 'A|B'。如果是所有的话题
则写为 --whitelist '*',只要引号内的正则表达式,对于shell脚本不会是作为文件解析就行。为了方便,允许使用','代替'|'
来表示一个list列表。有时你是不希望某些topic来过滤到这个镜像里面, 也可以使用 blacklist 来过滤,这个也支持正则表达式。
镜像的组合可以通过 auto.create.topics.enable=true 来实现,这样可以自动备份一个集群的镜像,即使有新的话题创建也会
有记录的。
检测消费者的消费的位置
看一下消费者消费的位置距离最新的消息还有多远
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group Topic Pid Offset logSize Lag Owner
my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0
my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0
扩展你的集群
想kafka增加机器是很容易的, 只要非配给他一个唯一的代理人id,然后启动就可以了。然而这个新添加的代理人不会自动的被分配
任何话题分区,只要没有新的分区移动到他上面或者知道有新的话题创建了,这时这个新的代理人才会工作。所以一般情况下,你新增
了机器节点到你的集群,你就像移动一些存在原来机器上的数据到这个新的节点上面来。
数据的迁移是手动启动,然后自动完成的。下面的章节描述了新加的节点成为跟随者后,如何迁移数据并且成功完成对现有数据的复制。
如果新加的节点完成了复制以后,然后会加入 in-sync 的副本集,这时原来的副本集上的数据就会被删除。
分区数据的移动是通过分区重新分配的工具来实现的,分区的信息以及消息数据的分布等信息,是在所有的代理节点间共享的。在0.8.1
的版本中,分区重新分配工具没有自动学写分区信息以及转移分区信息来实现消息数据的分布式存储的能力。就这点来说,管理员需要
指出那个话题的那个分区需要移动。
分区重新分配工具有下面三种独立的模式:
-generate: 这种模式下,给出一个话题的列表和一个代理人的列表,这个工具通过一个选举的分配方式,将指定话题的所有分区分配到
新的代理人上面。这种只是提供了一个将一些话题的分区分配到一些代理人节点的简单方便的方法。
-execute: 这种模式,需要提供一个从新分配的文件才能开始执行重新分配(用 -reassignment-json-file 参数)。也就是说可以通过
这种由admin指定的方式或者通过-generate方式来执行。
-verify: 这种模式,是用来检测上面 execute 模式的执行结果。可以是 completed, failed or in progress 三种模式。
自动迁移数据到新的机器
分区重新分配工具能够将当前代理人集群上面的一些话题转移到新添加的代理节点上。这种情况下很容易将整个话题迁移到一个新的代理人
集群上面,而不是一次只转移一个分区。这样做的时候需要提供需要移动的话题,以及需要转移到的代理人的集合。这个工具还能将自动
将话题的分区分配到新的代理人节点上面。这样的转移期间, 副本因子是保持不变的。而且这种从旧代理人集合移动到新代理人集合是很
高效。
下面的例子是将 foo1 foo2 的所有分区由 1,2 的代理人集群转移到 5,6代理人集群。移动结束后, foo1 foo2 的所有分区都只会存在
5,6 两个代理上面。工具接受json格式的文件作为参数,首先就需要你确认那些话题是需要转移的。下面是如何操作的
> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
如果json文件准备好了,用工具产生如何分配
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
这个工具提供了一个可共参考的重新分配的方案,将foo1 foo2 的话题转移到5 6 的节点上面。然而这个时候,
分区的移动并没有开始, 这个只是告诉我当前的分区方案以及将要修改成的方案。当前的分区方案需要备份起来,
以免能够回滚回来。新的分配需要保存到一个json文件里面,这个文件会作为-execute命令的参数。具体如下:
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
最后, 是-verify 的命令来检测重新分配的结果。注意上面的expand-cluster-reassignment.json 参数
还需要用到。 具体如下:
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
自定义分区以及迁移
分区从新分配工具也能够选择性的将某些指定的分区移动到一些代理人节点上面。如果用这种模式,你首先需要
知道当前的分区结果,不需要用工具来产生可选的分配方案,也就是跳过第一步,直接运行 -execute 命令。
下面的例子是如何将 foo1 的 0 分区和 foo2 的 1 分区转移到代理人节点 2,3上面。
第一步是将分配的方案写入json文件
> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
然后运行-execute命令开始重新分配的过程
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
然后运行 -verify 命令检查分区重新分配的结果。具体命令如下:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
停止使用代理节点
重新分区工具不会自动产生一种将某个代理停止使用的分配方案。正是因为如此,管理员需要制定出将推管在将要停止使用的代理节点上
的数据重新分配到其他的不需要停止使用的代理节点上面。这个需要这个工具能够保证所有的副本集不会从停止使用的节点转移到其他的
代理节点上面,这个工作相对繁琐。为了更容易做到这点,计划在0.8.2的版本中实现停止使用代理。
增加副本集因子
对于一个存在的分区,增加副本集因子是很简单的。需要在json文件中指定额外的副本集信息,然后运行 -excute 命令就可以了。
下面的例子是将foo的 0 分区从1个修改为3个。在提高副本集因子之前,需要知道这个分区只存在代理5上面。我们需要把这个副本集
增加到代理6和7上面。
首先,需要手动制作一个json文件,如下:
> cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
然后将json文件作为参数运行 -execute 命令,如下:
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
最后用 -verify 命令来检测执行结果,具体如下:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
当然也可以用查看话题的命令来检测执行结果,如下:
> bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
相关推荐
Apache Kafka是一个分布式流处理平台,它具备三个...综合来看,这份Kafka官方中文文档提供了一套全面的知识体系,详细介绍了Kafka的架构、使用、配置、操作和安全等各方面知识,是理解和掌握Kafka技术的重要参考文献。
文档开始部分介绍了文档的版权信息,表明文档是Spring for Apache Kafka 2.1.9版本的官方文档,由Gary Russel、Artem Bilan和Biju Kunjummen撰写,由Pivotal Software Inc.出版。文档的使用条件允许个人和免费分发给...
### Kafka中文文档核心知识点 #### 一、Kafka概述 **Kafka** 是一款由 **Apache** 开发的开源流处理平台,它提供了一个统一、高吞吐量、低延迟的发布/订阅消息系统,特别适合处理实时数据流。Kafka 的设计灵感来源...
Kafka是一个非常复杂且功能丰富的系统,上述只是对Kafka中文文档内容的一小部分概述。在实际使用过程中,需要根据实际场景进行详细的学习和配置,才能充分发挥Kafka在消息队列和事件流处理中的巨大优势。
Apache Kafka 官方文档中文版
标签:11、apache、kafka、kafka_2、jar包、java、API文档、中文版; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明...
总的来说,Kafka文档中所涵盖的知识点非常丰富,既包括了Kafka的设计原理和架构细节,也包括了API使用和配置指导,以及大量的实际应用场景和案例分析。这些知识点为使用和管理Kafka集群提供了全面的理论基础和技术...
对于任何想要开始使用kafka-python的开发者来说,文档都是一个不可或缺的资源,无论是新手还是有经验的开发者,都可以通过阅读官方文档来获得帮助。 请注意文档的最后,提到了master分支可能包含未发布的新特性,...
《Kafka系列文档——官方帮助文档中文翻译版》是一份详尽解读Apache Kafka的重要资源,旨在帮助中文用户更好地理解和应用这一分布式流处理平台。Kafka是LinkedIn开发并贡献给Apache软件基金会的一个开源项目,它被...
Apache Kafka是一个分布式流媒体处理平台,它被设计用于构建实时数据管道和流式应用程序。在Kafka中,Producer(生产者)负责将数据流发送到Kafka集群中。生产者的配置参数非常关键,因为它们会直接影响到生产者的...
### Apache Kafka 概述 #### 1.1 Introduction Apache Kafka 是一款强大的分布式流处理平台。它具备以下三个核心功能: - **发布和订阅记录流**:与传统的消息队列或企业消息系统相似,Kafka 支持消息的发布与订阅...
Apache Kafka是一个分布式的流处理平台,主要用来处理实时数据流。它具有高吞吐量、可扩展性和耐用性等特点,常用于构建实时数据管道和流应用程序。Kafka中的Topic是一个记录消息的类别或者名称,是消息传递模型的...
标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请...
标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请...