- 浏览: 565214 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (267)
- 随笔 (4)
- Spring (13)
- Java (61)
- HTTP (3)
- Windows (1)
- CI(Continuous Integration) (3)
- Dozer (1)
- Apache (11)
- DB (7)
- Architecture (41)
- Design Patterns (11)
- Test (5)
- Agile (1)
- ORM (3)
- PMP (2)
- ESB (2)
- Maven (5)
- IDE (1)
- Camel (1)
- Webservice (3)
- MySQL (6)
- CentOS (14)
- Linux (19)
- BI (3)
- RPC (2)
- Cluster (9)
- NoSQL (7)
- Oracle (25)
- Loadbalance (7)
- Web (5)
- tomcat (1)
- freemarker (1)
- 制造 (0)
最新评论
-
panamera:
如果设置了连接需要密码,Dynamic Broker-Clus ...
ActiveMQ 集群配置 -
panamera:
请问你的最后一种模式Broker-C节点是不是应该也要修改持久 ...
ActiveMQ 集群配置 -
maosheng:
longshao_feng 写道楼主使用 文件共享 模式的ma ...
ActiveMQ 集群配置 -
longshao_feng:
楼主使用 文件共享 模式的master-slave,produ ...
ActiveMQ 集群配置 -
tanglanwen:
感触很深,必定谨记!
少走弯路的十条忠告
消息队列应用场景:异步处理、应用解耦、流量削锋、日志处理、消息通讯
架构:
kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的运行依赖于ZooKeeper,Producer推送消息给kafka,Consumer从kafka拉消息。
ZooKeeper 是一个分布式的、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现。分布式应用程序可以基于它实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。
1.broker 在 ZooKeeper 中的注册:
为了记录 broker 的注册信息,在 ZooKeeper 上,专门创建了属于 Kafka 的一个节点,其路径为 /brokers;
Kafka 的每个 broker 启动时,都会到 ZooKeeper 中进行注册,告诉 ZooKeeper 其broker.id,在整个集群中,broker.id 应该全局唯一,并在 ZooKeeper 上创建其属于自己的节点,其节点路径为 /brokers/ids/{broker.id};
创建完节点后,Kafka 会将该 broker 的 broker.name 及端口号记录到该节点;
另外,该 broker 节点属性为临时节点,当 broker 会话失效时,ZooKeeper 会删除该节点,这样,我们就可以很方便的监控到broker 节点的变化,及时调整负载均衡等。
2.Topic 在 ZooKeeper 中的注册:
在 Kafka 中,所有 topic 与 broker 的对应关系都由 ZooKeeper 进行维护,在 ZooKeeper 中,建立专门的节点来记录这些信息,其节点路径为 /brokers/topics/{topic_name}。
3. consumer 在 ZooKeeper 中的注册:
a)注册新的消费者分组
当新的消费者组注册到 ZooKeeper 中时,ZooKeeper 会创建专用的节点来保存相关信息,其节点路径为 ls/consumers/{group_id},其节点下有三个子节点,分别为 [ids, owners, offsets]。
ids 节点:记录该消费组中当前正在消费的消费者;
owners 节点:记录该消费组消费的 topic 信息;
offsets 节点:记录每个 topic 的每个分区的 offset。
b)注册新的消费者
当新的消费者注册到 Kafka 中时,会在 /consumers/{group_id}/ids 节点下创建临时子节点,并记录相关信息。
c)监听消费者分组中消费者的变化
每个消费者都要关注其所属消费者组中消费者数目的变化,即监听/consumers/{group_id}/ids 下子节点的变化。一单发现消费者新增或减少,就会触发消费者的负载均衡。
4 Producers 负载均衡:
对于同一个 topic 的不同 partition,Kafka会尽力将这些 partition 分布到不同的broker 服务器上,这种均衡策略实际上是基于 ZooKeeper 实现的。在一个 broker 启动时,会首先完成 broker 的注册过程,并注册一些诸如 “有哪些可订阅的 topic” 之类的元数据信息。producers 启动后也要到 ZooKeeper 下注册,创建一个临时节点来监听broker 服务器列表的变化。由于在 ZooKeeper 下 broker 创建的也是临时节点,当 brokers 发生变化时,producers 可以得到相关的通知,从改变自己的 broker list。其它的诸如 topic 的变化以及broker 和 topic 的关系变化,也是通过 ZooKeeper 的这种 Watcher 监听实现的。
在生产中,必须指定 topic;但是对于 partition,有两种指定方式:
明确指定 partition(0-N),则数据被发送到指定 partition;
设置为 RD_KAFKA_PARTITION_UA,则 Kafka 会回调 partitioner 进行均衡选取,partitioner 方法需要自己实现。可以轮询或者传入 key 进行 hash。未实现则采用默认的随机方法 rd_kafka_msg_partitioner_random 随机选择。
5 Consumer 负载均衡:
Kafka 保证同一 consumer group 中只有一个 consumer 可消费某条消息,实际上,Kafka 保证的是稳定状态下每一个 consumer 实例只会消费某一个或多个特定的数据,而某个 partition 的数据只会被某一个特定的 consumer 实例所消费。这样设计的劣势是无法让同一个 consumer group 里的 consumer 均匀消费数据,优势是每个 consumer 不用都跟大量的 broker 通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个 partition 里的数据是有序的,这种设计可以保证每个 partition 里的数据也是有序被消费。
consumer 数量不等于 partition 数量
如果某 consumer group 中 consumer 数量少于 partition 数量,则至少有一个 consumer 会消费多个 partition 的数据;如果 consumer 的数量与 partition 数量相同,则正好一个 consumer 消费一个 partition 的数据,而如果 consumer 的数量多于 partition 的数量时,会有部分 consumer 无法消费该 topic 下任何一条消息。
借助 ZooKeeper 实现负载均衡
关于负载均衡,对于某些低级别的 API,consumer 消费时必须指定 topic 和 partition,这显然不是一种友好的均衡策略。基于高级别的 API,consumer 消费时只需制定 topic,借助 ZooKeeper 可以根据 partition 的数量和 consumer 的数量做到均衡的动态配置。
consumers 在启动时会到 ZooKeeper 下以自己的 conusmer-id 创建临时节点 /consumer/[group-id]/ids/[conusmer-id],并对 /consumer/[group-id]/ids 注册监听事件,当消费者发生变化时,同一 group 的其余消费者会得到通知。当然,消费者还要监听 broker 列表的变化。librdkafka 通常会将 partition 进行排序后,根据消费者列表,进行轮流的分配。
6. 记录消费进度 Offset:
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的消费进度 Offset 记录到 ZooKeeper上,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:
#节点内容就是Offset的值。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groups.sh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groups.sh –bootstrap-server * –describe –group *)。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offset.store.method 的配置,默认是存储在 broker 端。
7. 记录 Partition 与 Consumer 的关系:
consumer group 下有多个 consumer(消费者),对于每个消费者组(consumer group),Kafka都会为其分配一个全局唯一的 group ID,group 内部的所有消费者共享该 ID。订阅的 topic 下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其它 group)。同时,Kafka 为每个消费者分配一个 consumer ID,通常采用 hostname:UUID 形式表示。
在Kafka中,规定了每个 partition 只能被同组的一个消费者进行消费,因此,需要在 ZooKeeper 上记录下 partition 与 consumer 之间的关系,每个 consumer 一旦确定了对一个 partition 的消费权力,需要将其 consumer ID 写入到 ZooKeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id] 就是一个消息分区的标识,节点内容就是该消息分区 消费者的 consumer ID。
关键技术:
1、直接使用linux 文件系统的cache,来高效缓存数据。
2、采用linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次。根据测试结果,可以提高60%的数据发送性能。
3、数据在磁盘上存取代价为O(1)。kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一个逻辑log,有多个segment组成。每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。发布者发到某个topic的消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
4、显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。Producer和broker之间没有负载均衡机制。broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
(1)zero-copy
“零拷贝”是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式。
Non-Zero Copy方式:
Zero Copy方式:
Zero Copy的模式中,避免了数据在用户空间和内存空间之间的拷贝,从而提高了系统的整体性能。Linux中的sendfile()以及Java NIO中的FileChannel.transferTo()方法都实现了零拷贝的功能,而在Netty中也通过在FileRegion中包装了NIO的FileChannel.transferTo()方法实现了零拷贝。
在Kafka上,有两个原因可能导致低效:
1)太多的网络请求
2)过多的字节拷贝。
为了提高效率,Kafka把message分成一组一组的,每次请求会把一组message发给相应的consumer。 此外, 为了减少字节拷贝,采用了sendfile系统调用。为了理解sendfile原理,先说一下传统的利用socket发送文件要进行拷贝:
Sendfile系统调用:
(2)Exactly once message transfer
怎样记录每个consumer处理的信息的状态?在Kafka中仅保存了每个consumer已经处理数据的offset。这样有两个好处:
1)保存的数据量少
2)当consumer出错时,重新启动consumer处理数据时,只需从最近的offset开始处理数据即可。
(3)Push/pull
Producer 向Kafka(push)推数据,consumer 从kafka 拉(pull)数据。
Kafka采用了基于拉取模型的消费状态处理,它将主题分成多个有序的分区,任何时刻每个分区都只被一个消费者使用。并且消费者会记录每个分区的消费进度( 即偏移量)。每个消费者只需要为每个分区记录一个整数值,而不需要像其他消息系统那样记录每条消息的状态。假设有10000条消息,传统方式需要记录10000条消息的状态;如果用Kafka的分区机制,假设有10个分区,每个分区1000条消息,总共只需要记录1 0个分区的消费状态。
和传统方式需要跟踪每条消息的应答不同, Kafka的消费者会定时地将分区的消费进度保存成检查点文件,表示“这个位置之前的消息都已经被消费过了”。传统方式需要消费者发送每条消息的应答,服务端再对应答做出不同的处理;而Kafka只需要让消费者记录消费进度,服务端不需要记录消息的任何状态。除此之外,让消费者记录分区的消费进度还有一个好处:消费者可以“故意”回退到某个旧的偏移量位置,然后重新处理数据。虽然这种处理方式看起来违反了队列模型的规定( 一条消息发送给队列的一个消费者之后,就不会被其他消费者再次处理),但在实际运用中,很多消费者都需要这种功能。比如,消费者的处理逻辑代码出现了问题,在部署并启动消费者后,需要处理之前的消息并重新计算。
(4)负载均衡和容错
Producer和broker之间没有负载均衡机制。
broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
主题(Topic)和分区(Partition)
一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存。
kafka的消息通过主题进行分类,主题可以被分为若干个分区,消息以追加的方式写入分区,然后以先入先出的顺序读取。由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。
Topic 只是逻辑概念,面向的是 producer 和 consumer;而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 broker,那么关于该 Topic 的所有读写请求都将由这一个 broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。有了 Partition 概念以后,假设一个 Topic 被分为 10 个Partitions,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 broker(服务器)上,当 producer 发布消息时,producer 客户端可以采用 random、key-hash 及 轮询 等算法选定目标 partition,若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。
kafka通过分区来实现数据冗余和伸缩性,分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能。
消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用。
一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的
请求做出响应,返回已经提交到磁盘上的消息。
broker是集群的组成部分。每个集群都有一个broker同时充当了集群控制器的角色。控制器负责管理工作,包括将分区分配给broker和监控broker。在集群中,一个分区从属于一个broker,该broker被称为分区的首领。一个分区可以分配给多个broker,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。
消费者和消费者群组
往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS ,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上, Kafka 设计的主要目标之一,就是要让Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向伸缩Kafka 消费者和消费者群组并不会对性能造成负面影响。
如果新增一个只包含一个消费者的群组 G2 ,那么这个消费者将从主题 Tl 上接收所有的消息,与群组 Gl 之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 Gl 那样。总的来说,群组 G2 还是会接收到所有消息,不管有没有其他群组存在。
简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组,然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。
群组里的消费者共同读取主题的分区。一个新的悄费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生奔溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时, 比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要, 它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派为群组协调器的broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生奔溃,井停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
当消费者要加入群组时,它会向群组协调器发送一个Join Group 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。
Kafka 内置了两种分配策略,分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则, 一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用Java的ExecutorService 启动多个线程,使每个消费者运行在自己的线程上。
全程解析(Producer-kafka-consumer)
1. producer 发布消息
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。
其路由机制为:
指定了 patition,则直接使用;
未指定 patition 但指定 key,通过对 key 进行 hash 选出一个 patition;
patition 和 key 都未指定,使用轮询选出一个 patition。
写入流程:
producer 先从 ZooKeeper 的 "/brokers/.../state" 节点找到该 partition 的leader;
producer 将消息发送给该 leader;
leader 将消息写入本地 log;
followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK;
leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK;
2. Broker 存储消息
物理上把 topic 分成一个或多个 patition,每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)。
3. Consumer 消费消息
high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由ZooKeeper 保存(下次消费时,该group 中的consumer将从offset记录的位置开始消费)。
注意:
如果消费线程大于 patition 数量,则有些线程将收不到消息;
如果 patition 数量大于消费线程数,则有些线程多收到多个 patition 的消息;
如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的。
consumer 采用 pull 模式从 broker 中读取数据。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
复制原理和同步方式
Kafka的副本机制会在多个服务端节点(简称节点即消息代理节点)上对每个主题分区的日志
进行复制。当集群中的某个节点出现故障时,访问故障节点的请求会被转移到其他正常节点的副本上。副本的单位是主题的分区, Kafka每个主题的每个分区都有一个主副本以及0个或多个备份副本。备份副本会保持和主副本的数据同步,用来在主副本失效时替换为主副本
备份副本始终尽量保持与主副本的数据同步。备份副本的日志文件和主副本的日志总是相同的,它们都有相同的偏移量和相同顺序的消息。
为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。
如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。
Kafka 文件存储机制
Kafka 中消息是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送消息,消费者通过 topic 读取数据。然而 topic 在物理层面又能以 partition 为分组,一个 topic 可以分成若干个 partition。事实上,partition 并不是最终的存储粒度,partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成。
在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,partition 的名称规则为:topic 名称 + 有序序号,第一个序号从 0 开始计,最大的序号为 partition 数量减 1,partition 是实际物理上的概念,而 topic 是逻辑上的概念。
partition中segment文件存储结构:
segment 文件由两部分组成,分别为 “.index” 文件和 “.log” 文件,分别表示segment 索引文件和数据文件,这两个文件一一对应,成对出现。这两个文件的命令规则为:partition 全局的第一个segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值,数值大小为 64 位,20 位数字字符长度,没有数字用 0 填充,如下:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
数据文件的分段:
kafka解决查询效率的手段之一是将数据文件分段。
一句话:kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到高效性。
日志删除:
kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间顺序),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取曹组的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteAtrayList.
kafka消费日志删除思想:kafka把topic中一个partition大小文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
vi config/server.properties
log.cleanup.plicy=delete ##启用删除策略
##直接删除,删除后的消息不可恢复,可配置以下两个策略
log.retention.hours=16 ##清理超过指定时间message
log.retention.bytes=1073741824 ##清理指定大小后,删除旧的消息
磁盘存储优势:
kafka在设计的时候,采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已经写入的消息,这种方式属于典型的顺序写入的操作,所以就算是kafka使用磁盘作为存储介质,所能实现的吞吐量也非常可观。
kafka大量使用页面缓存,这也是kafka实现高吞吐的重要因素之一。
kafka还使用了零拷贝技术来进一步提升性能。
架构:
kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的运行依赖于ZooKeeper,Producer推送消息给kafka,Consumer从kafka拉消息。
ZooKeeper 是一个分布式的、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现。分布式应用程序可以基于它实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。
1.broker 在 ZooKeeper 中的注册:
为了记录 broker 的注册信息,在 ZooKeeper 上,专门创建了属于 Kafka 的一个节点,其路径为 /brokers;
Kafka 的每个 broker 启动时,都会到 ZooKeeper 中进行注册,告诉 ZooKeeper 其broker.id,在整个集群中,broker.id 应该全局唯一,并在 ZooKeeper 上创建其属于自己的节点,其节点路径为 /brokers/ids/{broker.id};
创建完节点后,Kafka 会将该 broker 的 broker.name 及端口号记录到该节点;
另外,该 broker 节点属性为临时节点,当 broker 会话失效时,ZooKeeper 会删除该节点,这样,我们就可以很方便的监控到broker 节点的变化,及时调整负载均衡等。
2.Topic 在 ZooKeeper 中的注册:
在 Kafka 中,所有 topic 与 broker 的对应关系都由 ZooKeeper 进行维护,在 ZooKeeper 中,建立专门的节点来记录这些信息,其节点路径为 /brokers/topics/{topic_name}。
3. consumer 在 ZooKeeper 中的注册:
a)注册新的消费者分组
当新的消费者组注册到 ZooKeeper 中时,ZooKeeper 会创建专用的节点来保存相关信息,其节点路径为 ls/consumers/{group_id},其节点下有三个子节点,分别为 [ids, owners, offsets]。
ids 节点:记录该消费组中当前正在消费的消费者;
owners 节点:记录该消费组消费的 topic 信息;
offsets 节点:记录每个 topic 的每个分区的 offset。
b)注册新的消费者
当新的消费者注册到 Kafka 中时,会在 /consumers/{group_id}/ids 节点下创建临时子节点,并记录相关信息。
c)监听消费者分组中消费者的变化
每个消费者都要关注其所属消费者组中消费者数目的变化,即监听/consumers/{group_id}/ids 下子节点的变化。一单发现消费者新增或减少,就会触发消费者的负载均衡。
4 Producers 负载均衡:
对于同一个 topic 的不同 partition,Kafka会尽力将这些 partition 分布到不同的broker 服务器上,这种均衡策略实际上是基于 ZooKeeper 实现的。在一个 broker 启动时,会首先完成 broker 的注册过程,并注册一些诸如 “有哪些可订阅的 topic” 之类的元数据信息。producers 启动后也要到 ZooKeeper 下注册,创建一个临时节点来监听broker 服务器列表的变化。由于在 ZooKeeper 下 broker 创建的也是临时节点,当 brokers 发生变化时,producers 可以得到相关的通知,从改变自己的 broker list。其它的诸如 topic 的变化以及broker 和 topic 的关系变化,也是通过 ZooKeeper 的这种 Watcher 监听实现的。
在生产中,必须指定 topic;但是对于 partition,有两种指定方式:
明确指定 partition(0-N),则数据被发送到指定 partition;
设置为 RD_KAFKA_PARTITION_UA,则 Kafka 会回调 partitioner 进行均衡选取,partitioner 方法需要自己实现。可以轮询或者传入 key 进行 hash。未实现则采用默认的随机方法 rd_kafka_msg_partitioner_random 随机选择。
5 Consumer 负载均衡:
Kafka 保证同一 consumer group 中只有一个 consumer 可消费某条消息,实际上,Kafka 保证的是稳定状态下每一个 consumer 实例只会消费某一个或多个特定的数据,而某个 partition 的数据只会被某一个特定的 consumer 实例所消费。这样设计的劣势是无法让同一个 consumer group 里的 consumer 均匀消费数据,优势是每个 consumer 不用都跟大量的 broker 通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个 partition 里的数据是有序的,这种设计可以保证每个 partition 里的数据也是有序被消费。
consumer 数量不等于 partition 数量
如果某 consumer group 中 consumer 数量少于 partition 数量,则至少有一个 consumer 会消费多个 partition 的数据;如果 consumer 的数量与 partition 数量相同,则正好一个 consumer 消费一个 partition 的数据,而如果 consumer 的数量多于 partition 的数量时,会有部分 consumer 无法消费该 topic 下任何一条消息。
借助 ZooKeeper 实现负载均衡
关于负载均衡,对于某些低级别的 API,consumer 消费时必须指定 topic 和 partition,这显然不是一种友好的均衡策略。基于高级别的 API,consumer 消费时只需制定 topic,借助 ZooKeeper 可以根据 partition 的数量和 consumer 的数量做到均衡的动态配置。
consumers 在启动时会到 ZooKeeper 下以自己的 conusmer-id 创建临时节点 /consumer/[group-id]/ids/[conusmer-id],并对 /consumer/[group-id]/ids 注册监听事件,当消费者发生变化时,同一 group 的其余消费者会得到通知。当然,消费者还要监听 broker 列表的变化。librdkafka 通常会将 partition 进行排序后,根据消费者列表,进行轮流的分配。
6. 记录消费进度 Offset:
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的消费进度 Offset 记录到 ZooKeeper上,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:
#节点内容就是Offset的值。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groups.sh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groups.sh –bootstrap-server * –describe –group *)。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offset.store.method 的配置,默认是存储在 broker 端。
7. 记录 Partition 与 Consumer 的关系:
consumer group 下有多个 consumer(消费者),对于每个消费者组(consumer group),Kafka都会为其分配一个全局唯一的 group ID,group 内部的所有消费者共享该 ID。订阅的 topic 下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其它 group)。同时,Kafka 为每个消费者分配一个 consumer ID,通常采用 hostname:UUID 形式表示。
在Kafka中,规定了每个 partition 只能被同组的一个消费者进行消费,因此,需要在 ZooKeeper 上记录下 partition 与 consumer 之间的关系,每个 consumer 一旦确定了对一个 partition 的消费权力,需要将其 consumer ID 写入到 ZooKeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id] 就是一个消息分区的标识,节点内容就是该消息分区 消费者的 consumer ID。
关键技术:
1、直接使用linux 文件系统的cache,来高效缓存数据。
2、采用linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次。根据测试结果,可以提高60%的数据发送性能。
3、数据在磁盘上存取代价为O(1)。kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一个逻辑log,有多个segment组成。每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。发布者发到某个topic的消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
4、显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。Producer和broker之间没有负载均衡机制。broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
(1)zero-copy
“零拷贝”是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式。
Non-Zero Copy方式:
Zero Copy方式:
Zero Copy的模式中,避免了数据在用户空间和内存空间之间的拷贝,从而提高了系统的整体性能。Linux中的sendfile()以及Java NIO中的FileChannel.transferTo()方法都实现了零拷贝的功能,而在Netty中也通过在FileRegion中包装了NIO的FileChannel.transferTo()方法实现了零拷贝。
在Kafka上,有两个原因可能导致低效:
1)太多的网络请求
2)过多的字节拷贝。
为了提高效率,Kafka把message分成一组一组的,每次请求会把一组message发给相应的consumer。 此外, 为了减少字节拷贝,采用了sendfile系统调用。为了理解sendfile原理,先说一下传统的利用socket发送文件要进行拷贝:
Sendfile系统调用:
(2)Exactly once message transfer
怎样记录每个consumer处理的信息的状态?在Kafka中仅保存了每个consumer已经处理数据的offset。这样有两个好处:
1)保存的数据量少
2)当consumer出错时,重新启动consumer处理数据时,只需从最近的offset开始处理数据即可。
(3)Push/pull
Producer 向Kafka(push)推数据,consumer 从kafka 拉(pull)数据。
Kafka采用了基于拉取模型的消费状态处理,它将主题分成多个有序的分区,任何时刻每个分区都只被一个消费者使用。并且消费者会记录每个分区的消费进度( 即偏移量)。每个消费者只需要为每个分区记录一个整数值,而不需要像其他消息系统那样记录每条消息的状态。假设有10000条消息,传统方式需要记录10000条消息的状态;如果用Kafka的分区机制,假设有10个分区,每个分区1000条消息,总共只需要记录1 0个分区的消费状态。
和传统方式需要跟踪每条消息的应答不同, Kafka的消费者会定时地将分区的消费进度保存成检查点文件,表示“这个位置之前的消息都已经被消费过了”。传统方式需要消费者发送每条消息的应答,服务端再对应答做出不同的处理;而Kafka只需要让消费者记录消费进度,服务端不需要记录消息的任何状态。除此之外,让消费者记录分区的消费进度还有一个好处:消费者可以“故意”回退到某个旧的偏移量位置,然后重新处理数据。虽然这种处理方式看起来违反了队列模型的规定( 一条消息发送给队列的一个消费者之后,就不会被其他消费者再次处理),但在实际运用中,很多消费者都需要这种功能。比如,消费者的处理逻辑代码出现了问题,在部署并启动消费者后,需要处理之前的消息并重新计算。
(4)负载均衡和容错
Producer和broker之间没有负载均衡机制。
broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
主题(Topic)和分区(Partition)
一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存。
kafka的消息通过主题进行分类,主题可以被分为若干个分区,消息以追加的方式写入分区,然后以先入先出的顺序读取。由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。
Topic 只是逻辑概念,面向的是 producer 和 consumer;而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 broker,那么关于该 Topic 的所有读写请求都将由这一个 broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。有了 Partition 概念以后,假设一个 Topic 被分为 10 个Partitions,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 broker(服务器)上,当 producer 发布消息时,producer 客户端可以采用 random、key-hash 及 轮询 等算法选定目标 partition,若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。
kafka通过分区来实现数据冗余和伸缩性,分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能。
消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用。
一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的
请求做出响应,返回已经提交到磁盘上的消息。
broker是集群的组成部分。每个集群都有一个broker同时充当了集群控制器的角色。控制器负责管理工作,包括将分区分配给broker和监控broker。在集群中,一个分区从属于一个broker,该broker被称为分区的首领。一个分区可以分配给多个broker,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。
消费者和消费者群组
往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS ,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上, Kafka 设计的主要目标之一,就是要让Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向伸缩Kafka 消费者和消费者群组并不会对性能造成负面影响。
如果新增一个只包含一个消费者的群组 G2 ,那么这个消费者将从主题 Tl 上接收所有的消息,与群组 Gl 之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 Gl 那样。总的来说,群组 G2 还是会接收到所有消息,不管有没有其他群组存在。
简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组,然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。
群组里的消费者共同读取主题的分区。一个新的悄费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生奔溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时, 比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要, 它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派为群组协调器的broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生奔溃,井停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
当消费者要加入群组时,它会向群组协调器发送一个Join Group 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。
Kafka 内置了两种分配策略,分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则, 一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用Java的ExecutorService 启动多个线程,使每个消费者运行在自己的线程上。
全程解析(Producer-kafka-consumer)
1. producer 发布消息
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。
其路由机制为:
指定了 patition,则直接使用;
未指定 patition 但指定 key,通过对 key 进行 hash 选出一个 patition;
patition 和 key 都未指定,使用轮询选出一个 patition。
写入流程:
producer 先从 ZooKeeper 的 "/brokers/.../state" 节点找到该 partition 的leader;
producer 将消息发送给该 leader;
leader 将消息写入本地 log;
followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK;
leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK;
2. Broker 存储消息
物理上把 topic 分成一个或多个 patition,每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)。
3. Consumer 消费消息
high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由ZooKeeper 保存(下次消费时,该group 中的consumer将从offset记录的位置开始消费)。
注意:
如果消费线程大于 patition 数量,则有些线程将收不到消息;
如果 patition 数量大于消费线程数,则有些线程多收到多个 patition 的消息;
如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的。
consumer 采用 pull 模式从 broker 中读取数据。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
复制原理和同步方式
Kafka的副本机制会在多个服务端节点(简称节点即消息代理节点)上对每个主题分区的日志
进行复制。当集群中的某个节点出现故障时,访问故障节点的请求会被转移到其他正常节点的副本上。副本的单位是主题的分区, Kafka每个主题的每个分区都有一个主副本以及0个或多个备份副本。备份副本会保持和主副本的数据同步,用来在主副本失效时替换为主副本
备份副本始终尽量保持与主副本的数据同步。备份副本的日志文件和主副本的日志总是相同的,它们都有相同的偏移量和相同顺序的消息。
为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。
如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。
Kafka 文件存储机制
Kafka 中消息是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送消息,消费者通过 topic 读取数据。然而 topic 在物理层面又能以 partition 为分组,一个 topic 可以分成若干个 partition。事实上,partition 并不是最终的存储粒度,partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成。
在 Kafka 文件存储中,同一个 topic 下有多个不同的 partition,每个 partiton 为一个目录,partition 的名称规则为:topic 名称 + 有序序号,第一个序号从 0 开始计,最大的序号为 partition 数量减 1,partition 是实际物理上的概念,而 topic 是逻辑上的概念。
partition中segment文件存储结构:
segment 文件由两部分组成,分别为 “.index” 文件和 “.log” 文件,分别表示segment 索引文件和数据文件,这两个文件一一对应,成对出现。这两个文件的命令规则为:partition 全局的第一个segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值,数值大小为 64 位,20 位数字字符长度,没有数字用 0 填充,如下:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
数据文件的分段:
kafka解决查询效率的手段之一是将数据文件分段。
一句话:kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到高效性。
日志删除:
kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间顺序),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取曹组的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteAtrayList.
kafka消费日志删除思想:kafka把topic中一个partition大小文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
vi config/server.properties
log.cleanup.plicy=delete ##启用删除策略
##直接删除,删除后的消息不可恢复,可配置以下两个策略
log.retention.hours=16 ##清理超过指定时间message
log.retention.bytes=1073741824 ##清理指定大小后,删除旧的消息
磁盘存储优势:
kafka在设计的时候,采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已经写入的消息,这种方式属于典型的顺序写入的操作,所以就算是kafka使用磁盘作为存储介质,所能实现的吞吐量也非常可观。
kafka大量使用页面缓存,这也是kafka实现高吞吐的重要因素之一。
kafka还使用了零拷贝技术来进一步提升性能。
发表评论
-
Zookeeper 总结
2020-01-10 14:53 293zookeeper是一个开源的分 ... -
消息队列之 RabbitMQ
2018-10-16 11:53 819RabbitMQ 特点 RabbitMQ 是一个由 Erla ... -
Disruptor 介绍
2016-07-04 15:53 3106并发的复杂性: 在计算机科学中,并发的意思是两个或两 ... -
Apache kafka 介绍
2015-12-18 11:24 879kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性 ... -
ActiveMQ 集群配置
2015-01-09 10:55 18994构建高可用的ActiveMQ系 ... -
JXPath使用介绍
2014-03-11 15:13 1334一、JXPath简介 JXPath是apache公司提供 ... -
注册Tomcat服务为系统服务
2013-12-12 11:40 5721Tomcat服务注册为系统服务之后,就不用每次启动机器之 ... -
Apache ActiveMQ 介绍
2013-08-22 15:18 1207一、下载部署 1、下载 ... -
Apache PDFBox 介绍
2012-02-03 10:22 1695PDFBox是Java实现的PDF文档协作类库,提供PDF文档 ... -
Apache POI 介绍
2012-02-03 10:13 1711Apache POI是Apache软件基金会的开放源码函式库, ...
相关推荐
此外,书籍还包含了对消息队列、流处理以及Kafka在不同应用场景下的使用案例的详细解读,这有助于读者更好地理解Kafka的工作原理以及如何在自己的项目中有效利用Kafka。 总而言之,Apache Kafka作为目前大数据处理...
### Kafka系列解读知识点总结 #### 一、Apache Kafka简介与设计理念 **Apache Kafka** 是一款高性能、分布式的发布-订阅消息系统,适用于处理大规模实时数据流。它最初由LinkedIn开发,后捐赠给Apache软件基金会,...
在当今大数据处理领域,Apache Kafka以其高吞吐量、低延迟和分布式架构的特点,成为实时数据流处理的重要工具。而随着物联网(IoT)的发展,ARM架构的设备在边缘计算中扮演着越来越重要的角色,因此,Kafka针对ARM...
2. **安装与配置**:掌握如何在本地或集群环境中设置Kafka和ZooKeeper,包括配置文件的解读和优化。 3. **数据生产和消费**:学习如何使用Java API创建数据生产者和消费者,以及如何使用命令行工具进行消息发送和...
### Kafka 数据可靠性深度解读 #### 一、概述 Kafka是一种高性能、分布式的消息系统,最初由LinkedIn开发,并随后成为Apache项目的一部分。Kafka以其水平扩展能力和高吞吐率著称,广泛应用于各种场景中,包括大...
三、Kafka 源码解读 在 Kafka 2.1.1 源码中,我们可以深入理解其内部机制。例如: 1. **MessageAndOffset** 类:表示消息及其在分区中的位置,包含消息体和偏移量。 2. **Partitioner** 接口:定义了如何根据键...
在大数据处理领域,Apache Kafka是一种广泛使用的分布式流处理平台,它允许实时地处理和存储大量数据。本项目专注于“Kafka生产数据工程”,通过Java编程语言实现数据的生成与发送到Kafka集群的过程。以下是对这个...
在大数据领域,Apache Kafka作为一个分布式流处理平台,因其高效、可扩展和高吞吐量的特点,被广泛应用于实时数据处理和消息传递。本资料深入探讨了Kafka如何保证数据的可靠性,这对于理解Kafka的核心机制以及在实际...
《Kafka系列文档——官方帮助文档中文翻译版》是一份详尽解读Apache Kafka的重要资源,旨在帮助中文用户更好地理解和应用这一分布式流处理平台。Kafka是LinkedIn开发并贡献给Apache软件基金会的一个开源项目,它被...
#### 五、Kafka的网络包源码解读 **Kafka.network 包** 包含了 Kafka 网络通信的核心代码,如客户端与 Broker 之间的通信协议等。了解这部分源码有助于深入理解 Kafka 如何高效地进行数据传输。 #### 六、Kafka ...
Kafka是一款高吞吐量的分布式消息系统,由LinkedIn开发并贡献给Apache软件基金会。它的设计目标是成为一个实时、可扩展、持久化的流处理平台,能够处理大量的实时数据。本资料"04、Kafka核心源码剖析"将深入到Kafka...
Kafka是由Apache开发的一个分布式流处理平台,它最初由LinkedIn设计并开源,现在已经成为大数据领域的重要组成部分。Kafka主要用于构建实时数据管道和流应用,能够在高吞吐量下处理海量数据,并提供消息队列、数据...
Apache Kafka 是一个分布式流处理平台,被广泛应用于大数据实时处理、消息传递和日志聚合等场景。Kafka-Manager 是一个开源工具,由 XebiaLabs 开发,用于方便地管理和监控 Kafka 集群,提供了丰富的可视化界面,...
在大数据处理领域,Apache Kafka是一款广泛使用的分布式流处理平台,尤其在实时数据传输和消息队列方面表现出色。Cloudera作为Hadoop生态的重要组成部分,提供了对Kafka的深度集成和支持。以下是对“cloudera对kafka...
在IT行业中,消息队列(Message Queue,简称MQ)和Apache Kafka是两个非常重要的分布式系统组件,用于处理大规模数据传输和解耦应用间通信。本文将深入探讨MQ、特别是Kafka的学习资料,以及相关的知识点。 首先,让...
Kafka是一款由LinkedIn开发并开源的消息系统,后来成为Apache软件基金会的顶级项目。它最初设计为高吞吐量、低延迟的实时流处理平台,现在广泛应用于大数据领域的消息中间件。本篇将深入探讨Windows环境下Kafka 2.11...
Apache Kafka,作为一款高效、可扩展且分布式的流处理平台,已经在诸多企业中得到了广泛应用。本资料集旨在深入解析Kafka的内部工作机制,帮助读者从源码层面理解Kafka的设计思想与实现原理。 首先,让我们从“1....
#### 四、Kafka.Network包源码解读 `Kafka.Network` 包包含了 Kafka 的网络通信组件,主要包括以下核心类: - **SocketServer**:负责监听来自客户端的连接请求。 - **RequestHandler**:处理每个客户端请求的具体...