前言
Kafka 作为一个高吞吐量的消息队列,它的很多设计都体现了这一点。比如它的客户端,无论是 Producer 还是 Consumer ,都会内置一个缓存用来存储消息。这样类似于我们写文件时,并不会一次只写一个字节,而是先写到一个缓存里,然后等缓存满了,才会将缓存里的数据写入到磁盘。这种缓存机制可以有效的提高吞吐量,本篇文章介绍缓存在 Kafka 客户端的实现原理。
Producer 缓存
我们知道 Producer 发送消息,会先将它存到 RecordAccumulator 的缓存里,等待缓存满了之后,就会发送到服务端。这个缓存的大小,是由内部的内存池控制的。
内存池使用
我们通过观察 RecordAccumulator 的 append 接口,可以看到每次缓存消息之前,都会向内存池申请内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public final class RecordAccumulator { // 消息缓存队列,以batch格式存储 private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches; // 内存池 private final BufferPool free; public RecordAppendResult append(....) { // 计算该消息占用的内存大小 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); // 向内存池申请内存 buffer = free.allocate(size, maxTimeToBlock); ....... } } |
当消息发送后,会触发 RecordAccumulator 释放内存。
1 2 3 4 5 6 7 8 9 10 11 12 |
public final class RecordAccumulator { // 内存池 private final BufferPool free; public void deallocate(ProducerBatch batch) { incomplete.remove(batch); // 检测是否该消息batch数据太大了,如果太大了则需要切割。所以这种情况不需要释放内存 if (!batch.isSplitBatch()) // 向内存池释放内存 free.deallocate(batch.buffer(), batch.initialCapacity()); } } |
内存池结构
使用内存池有两个优点,一个是能够限制内存的使用量,另一个是减少内存的申请和回收频率。虽然 java 支持自动 gc ,但是 gc 也是有成本的。如果之前申请的内存用完之后,还可以重新复用,那么就不会触发 gc。但是内存池的实现有一个难点,那就是如何高效的重新利用。因为每次申请的内存大小都不相同,这样就没办法直接利用了。一种常见的做法是只缓存那些特定大小的内存,对于其他大小的内存则使用后直接丢弃。
我们知道 Kafka 为了提高吞吐量,都是以 batch 格式保存消息。Producer 在实现内存池时,它结合了消息 batch 的特点,试图将每个消息 batch 的大小控制在一定范围内。这样每次申请内存的大小,就可以是相同的。基于这个原因,Kafka 的内存池分为两部分。一部分是特定大小内存块的缓存池,另一个是非缓存池。
当申请的内存大小等于特定的数值,则优先从缓存池中获取。如果缓存池没有,那么需要向非缓存池部分申请内存。等到这块内存使用完后,才会被放入到缓存池等待复用。注意到缓存池的大小是可变的,一开始为零。随着用户申请和释放,才慢慢增长起来的。
如果申请的内存不等于特定的数值,则向非缓存池申请。如果内存空间不够用,那么就需要释放缓存池的内存。
缓存池的内存一般都很少回收,除非是内存空间不足。而非缓存池的内存,都是使用后丢弃,等待 gc 回收。
内存池实现
BufferPool 类负责实现内存池,它有两个重要接口:
- allocate 接口,负责申请内存
- deallocate 接口,负责释放内存
allocate 接口代码简化如下,它支持用户并发申请内存,里面包含了一个等待的用户队列,队列采用了先进先出的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
public class BufferPool { private final long totalMemory; // 整个内存池的总容量 private long nonPooledAvailableMemory; // 非缓存池的空闲大小 private final int poolableSize; // 缓存池中,特定内存的大小 private final ReentrantLock lock; // 锁用来防治并发 private final Deque<ByteBuffer> free; // 缓存池中的空闲内存块 private final Deque<Condition> waiters; // 等待申请的用户 // 参数size表示申请的内存大小,参数maxTimeToBlockMs表示等待的最长时间 public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("...") ByteBuffer buffer = null; this.lock.lock(); // 如果申请大小等于指定的值,并且缓存池中有空闲的内存块,则直接返回 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // 计算总的空闲内存大小, 等于缓存池的大小 + 非缓存池的空闲大小 int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // 如果空闲内存足够,那么需要保证非缓存池的空闲空间足够 // 因为所有的内存分配都是从非缓存池开始 freeUp(size); this.nonPooledAvailableMemory -= size; } else { // 如果空闲内存不够,那么需要等待别的用户释放内存 int accumulated = 0; // 表示已经成功申请的内存大小 Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); // 添加到等待集合中 this.waiters.addLast(moreMemory); // 循环等待 while (accumulated < size) { ...... // 等待通知 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); if (waitingTimeElapsed) { // 超过等待时间还没有分配到足够的内存,那么就抛出异常 throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } // 如果申请大小等于指定的值,而这时刚好有其他用户释放了内存,那么就直接从缓存池中获取 if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { buffer = this.free.pollFirst(); accumulated = size; } else { // 继续释放足够的内存 freeUp(size - accumulated); // 计算可以分配的内存大小 int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); // 更改非缓存池的空闲内存大小 this.nonPooledAvailableMemory -= got; accumulated += got; } } // 这里会检查是否还有剩下的空闲内存,如果有则需要通知下一个用户。 // 因为这时可能有多个用户都释放了内存,而用户释放内存只会通知第一个用户(也就是当前用户),而下个用户还在一直等待中,如果当前用户不主动通知的话,可能造成下个用户等待超时。 if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); // 运行到这里,说明已经成功了申请到了内存 if (buffer == null) // 表示buffer不是从缓存池中获取的,需要执行内存分配 return safeAllocateByteBuffer(size); else: // 已经从缓存池中获取到了,则直接返回 return buffer; } } } |
deallocate 接口的源码比较简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public class BufferPool { public void deallocate(ByteBuffer buffer) { deallocate(buffer, buffer.capacity()); } public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { // 如果释放的内存大小等于指定的值,那么就将它添加到缓存池。free列表存储这些内存块 if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { // 否则更新非缓存池的空闲大小,这个ByteBuffer实例等待jvm自动gc this.nonPooledAvailableMemory += size; } // 通知队列的第一个用户 Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } } } |
Consumer 缓存
Consumer 从服务端获取消息,也是以消息 batch 的格式获取,然后存到缓存里。KafkaConsumer 提供了 poll 方法读取消息。它的原理是从缓存里直接获取,如果缓存里没有,才会向服务端发出请求。
Fetcher 使用了一个队列,来缓存从服务端获取的响应。当用户从缓存中读取消息时,会依次从队列里解析响应,返回消息。但是用户一次不能获取过多数量的消息,这个阈值由配置项 max.poll.records 指定,默认为500。
Fetcher 类还负责与服务端的交互。这里主要关注两个接口
- fetchedRecords,负责读取缓存消息
- sendFetches,负责发送请求
KafkaConsumer 读取消息的过程如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
public class KafkaConsumer<K, V> implements Consumer<K, V> { private final Fetcher<K, V> fetcher; // 消息缓存 // poll 方法做过简化,省略了Metadata和topic subcribe的步骤 private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) { // 调用 pollForFetches 获取消息 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); if (!records.isEmpty()) { // 注意到这里,成功获取之后,还会调用sendFetches方法,发送请求 // 因为kafka发送请求都是异步的,这里提前发出请求,可以有效的减少下次缓存为空而需要等待请求的时间 if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords<>(records)); } } private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { // 首先从缓存中尝试读取 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } // 如果缓存为空,则向服务端发出请求 fetcher.sendFetches(); // 等待服务端的响应 client.poll(pollTimeout, startMs, () -> { // hasCompletedFetches 表示是否有成功的响应 return !fetcher.hasCompletedFetches(); }); // 从缓存中获取消息 return fetcher.fetchedRecords(); } } |
我们注意到 Consumer 在每次读取消息之后,都会触发一次发送请求,这样对于提高性能有好处,减少了下一次的请求等待时间。但是这样会存在一个问题,假想我们把 max.poll.records 设置为 1,这样每次从服务端返回的消息数量都比 1 大,那么缓存就会持续的增长,造成 OOM。
其实 Fetcher 每次发送请求,并不是拉取所有分区的消息。它的 fetchablePartitions 方法决定了请求的分区,它会检查分区在缓存中是否有对应的消息,如果有那么就不请求。这样就基本保证了缓存里拥有每个分区的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { // CompletedFetch代表着响应,对应着一个分区的消息 private final ConcurrentLinkedQueue<CompletedFetch> completedFetches; private List<TopicPartition> fetchablePartitions() { // 存储着哪些分区不需要请求 Set<TopicPartition> exclude = new HashSet<>(); // 获取分配的分区 List<TopicPartition> fetchable = subscriptions.fetchablePartitions(); // nextInLineRecords 表示正在解析的响应 if (nextInLineRecords != null && !nextInLineRecords.isFetched) { exclude.add(nextInLineRecords.partition); } // 如果在缓存里,该分区已经有了消息,则不需要请求 for (CompletedFetch completedFetch : completedFetches) { exclude.add(completedFetch.partition); } // 剔除掉那些不需要请求的分区 fetchable.removeAll(exclude); return fetchable; } } |
对于请求,不仅有分区的限制,还有每次请求返回的数据大小限制。不然如果一次请求的数据过大,容易造成内存溢出。我们可以观察请求的格式,发现有多个值来限制请求大小。
- max_bytes,表示响应数据的最大长度。可以通过配置 fetch.max.bytes 来指定,默认为 50MB
- min_bytes,表示响应数据的最小长度。可以通过配置 fetch.min.bytes 来指定,默认为 1B
每次请求还包含了多个分区,对于每个分区返回的数据大小,也有限制。通过配置 max.partition.fetch.bytes 来指定,默认为 1MB。这样我们能够粗略的计算出缓存的大小,分配的分区数量 * max.partition.fetch.bytes 。
rel:https://zhmin.github.io/2019/07/24/kafka-client-buffer-manage/
相关推荐
1/kafka是一个分布式的消息缓存系统 2/kafka集群中的服务器都叫做broker 3/kafka有两类客户端,一类叫producer(消息生产者),一类叫做consumer(消息消费者),客户端和broker服务器之间采用tcp协议连接 4/kafka中...
DelphiKafkaClient是Apache 的跨平台Delphi客户端/包装器。 支持Windows(i386 / x64)和Linux(x64)。 在Delphi 10.4上进行了测试,但应与所有现代Delphi版本一起使用。 免责声明 尽管它似乎可以按预期工作,但...
在本篇Kafka快速入门教程中,我们主要探讨了如何使用Python客户端库`confluent-kafka`来与Apache Kafka进行交互。`confluent-kafka`是一个轻量级的Python模块,它对librdkafka进行了封装,支持Kafka 0.8以上的版本。...
Java Kafka客户端库提供`KafkaProducer`类用于创建生产者实例。 - 生产者配置包括设置Bootstrap Servers(Kafka集群地址),键值编码器,acks配置(决定何时确认消息已被接收)等。 - 生产者负责将消息序列化,并...
深入研究这些文件,你可以了解到如何在实际项目中使用Kafka,如何进行错误处理,以及如何优化性能,例如批量发送消息、设置合适的缓存大小等。 总的来说,Kafka资源文件是理解和实践Kafka技术的重要素材。通过分析...
6. API友好:Kafka提供了Java、Scala以及Python等多语言的客户端API,方便开发者集成到各种应用中。 安装Kafka 2.4.1的步骤大致如下: 1. 解压文件:首先,需要将"kafka_2.11-2.4.1.tgz"解压缩,通常会得到一个名...
- `clients`目录:包含Kafka的Java客户端库,供开发者在应用程序中使用。 在部署和使用Kafka时,你需要根据实际需求修改配置文件,如设置broker的端口、Zookeeper连接地址、数据存储路径等。然后,可以通过启动脚本...
- **性能优化**: 调整Zookeeper和Kafka的缓存大小,以减少磁盘I/O操作。 - **监控与日志**: 配置适当的监控工具,如Prometheus + Grafana,以便及时发现和解决问题。 - **安全性**: 配置SSL/TLS以保护数据传输安全,...
上述配置中,`source1`是一个监听特定端口的网络源,`kafkaSink`是目标Kafka sink,`channel1`是用于缓存数据的内存通道。 整合Flume与Kafka的关键在于Flume的类路径中包含正确的jar包,这样才能让Flume理解如何与...
2. Kafka的API使用:如何编写Java、Python或其他语言的客户端代码,发布和消费消息。 3. Kafka的数据持久化与故障恢复:Kafka的Log Compaction、Replication Factor等机制如何保证数据安全。 4. Redis的基础操作:...
**Kafka 概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并后来成为 Apache 软件基金会...在实际应用中,还需要考虑如何合理设置主题、分区和消费组,以及如何管理和监控 Kafka 集群,以确保系统的稳定运行。
通过Zookeeper管理Kafka集群的元数据信息。 - **故障恢复**:通过副本机制和Leader选举机制,确保数据不丢失且服务持续可用。 #### 七、Kafka与生态系统集成 - **与Hadoop集成**:Kafka可以通过Flume、Sqoop等工具...
除此之外,还可以使用YammerMetrics来报告服务端和客户端的性能指标,或使用Kafka Manager来查看整个集群的性能指标。 性能测试脚本可以用于对Kafka集群的写入性能进行压测,例如使用命令行工具./kafka-producer-...
#### 四、Kafka客户端配置示例 在使用Java客户端时,还需要配置相关的依赖: ```xml <groupId>org.apache.kafka <artifactId>kafka-clients <version>3.7.0 ``` 其中,`kafka-clients`提供了与Kafka集群交互...
2. **启动 ZooKeeper 和 Kafka 服务**:Kafka 需要依赖 ZooKeeper 进行集群管理,启动 ZooKeeper 后再启动 Kafka 服务。 3. **创建主题**:使用 `kafka-topics.sh` 工具创建一个主题(topic)。 4. **生产者和消费者...
Redis是一个高性能的键值数据库,常用于缓存、会话管理以及提供实时数据服务。它的主要特点是数据结构丰富,支持字符串、哈希、列表、集合和有序集合等。Redis的所有操作都在内存中进行,因此读写速度极快。为了持久...
**管理客户端API** 提供了一系列工具和接口,用于管理和监控 Kafka 集群的状态,包括创建和删除主题、管理分区和副本等。 ##### 2.6 废弃的APIs 随着时间的发展和技术进步,某些早期版本中的API可能会被废弃。了解...
生产服务器配置(A Production Server Configs)部分提供了一些服务器端优化的配置建议,比如Java版本、硬件和操作系统、磁盘和文件系统、以及应用与操作系统缓存管理等。 监控(Monitoring)部分讲述了Kafka集群的...
2. **高吞吐量**:Kafka 被设计为处理大规模的实时数据流,能够在单个服务器上每秒处理数十万条消息,这得益于其高效的磁盘存储和内存缓存机制。 3. **持久化存储**:Kafka 将消息持久化到硬盘,这意味着即使在...
Kafka客户端(KClient) KClient是一个简单易用,有效集成,高性能,高稳定的Kafka Java客户端。 此文档包含了背景介绍、功能特性、使用指南、API简介、后台监控和管理、消息处理机模板项目、架构设计以及性能压测相关...