In the previous post I walked through the producer code along the message send path, focusing on the parts of the producer's design that interest me: how partitioning works, how reliable delivery is guaranteed (acks), the producer-side in-memory data structures, and so on. Some questions I am deliberately leaving aside for now, such as how ordering is guaranteed, and the protocol and network layers. After that walkthrough my head was spinning a bit, so today I opened the clients source package and picked out the APIs that appeared in the last walkthrough, plus a few I had not looked at yet, to study them one by one.
First, a look at the structure of the package.
1. BufferPool
BufferPool recycles memory regions of a fixed size, avoiding the overhead of repeatedly allocating and freeing them. At the same time it is fair to producer threads: the thread that has waited longest is the first to be granted memory.
Let's start with the constructor:
/**
 * Create a new buffer pool
 *
 * @param memory The maximum amount of memory that this buffer pool can allocate
 * @param poolableSize The buffer size to cache in the free list rather than deallocating
 * @param metrics instance of Metrics
 * @param time time instance
 * @param metricGrpName logical group name for metrics
 */
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    this.poolableSize = poolableSize;
    this.lock = new ReentrantLock();
    this.free = new ArrayDeque<ByteBuffer>();
    this.waiters = new ArrayDeque<Condition>();
    this.totalMemory = memory;
    this.availableMemory = memory;
    this.metrics = metrics;
    this.time = time;
    this.waitTime = this.metrics.sensor("bufferpool-wait-time");
    MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                               metricGrpName,
                                               "The fraction of time an appender waits for space allocation.");
    this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}
The parameters to note are memory and poolableSize: memory is the maximum amount of memory this pool may allocate, and poolableSize is the size of the buffers cached on the free list instead of being deallocated.
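As a side note, these two numbers are not invented by BufferPool itself: when KafkaProducer builds its RecordAccumulator, buffer.memory becomes the pool's total memory and batch.size its poolableSize. A minimal sketch of tuning them (the values below are just the defaults):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// buffer.memory -> BufferPool's memory (the total the pool may hand out)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L);
// batch.size -> BufferPool's poolableSize (the size kept on the free list)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);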
Next, the allocate method:
/**
 * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
 * is configured with blocking mode.
 *
 * @param size The buffer size to allocate in bytes
 * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
 * @return The buffer
 * @throws InterruptedException If the thread is interrupted while blocked
 * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool
 *         (and hence we would block forever)
 */
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            // loop over and over until we have a buffer or have reserved
            // enough memory to allocate one
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }

                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // check if we can satisfy this request from the free list,
                // otherwise allocate memory
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // we'll need to allocate memory, but we may only get
                    // part of what we need on this iteration
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            // remove the condition for this thread to let the next thread
            // in line start getting memory
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // signal any additional waiters if there is more memory left
            // over for them
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }

            // unlock and return the buffer
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
The comments are clear enough; here is my own summary. allocate requests a block of the given size, and if there is not enough memory the calling thread blocks. This effect is amplified when the application produces messages faster than the producer can send them, and a blocked thread hurts the availability of the application, so keep this point in mind when designing around and sizing the buffer.
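To make that concrete, here is a hedged sketch of bounding the blocking time: the producer's max.block.ms config is (after the metadata wait) what arrives here as maxTimeToBlockMs, and if the pool stays full past that deadline the send fails with a TimeoutException, which surfaces through the callback/future. The broker address and topic name below are assumptions for illustration.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
// upper bound on blocking; what remains after the metadata wait reaches allocate()
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100L);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null)
        // a pool that stayed full for 100 ms shows up here as a TimeoutException
        System.err.println("send failed: " + exception);
});
producer.close();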
Stepping into the code:
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
If the free list happens to hold a buffer of exactly the requested size, that ByteBuffer is simply polled from the deque and returned. Otherwise a new buffer has to be carved out of the pool's remaining unpooled memory:
if (this.availableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request
freeUp(size);
this.availableMemory -= size;
lock.unlock();
return ByteBuffer.allocate(size);
}
If there is not enough memory, the current thread has to block:
Condition moreMemory = this.lock.newCondition();
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
waiters is a deque; this double-ended queue is how BufferPool maintains "fairness" among producer threads.
The blocking call:
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
Partial allocation is also supported, so memory can be accumulated over several wake-ups:
// we'll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.availableMemory);
this.availableMemory -= got;
accumulated += got;
Once its own request is satisfied, the thread removes itself from waiters and signals the next one in line, which is quite "polite":
// remove the condition for this thread to let the next thread
// in line start getting memory
Condition removed = this.waiters.removeFirst();
if (removed != moreMemory)
throw new IllegalStateException("Wrong condition: this shouldn't happen.");
// signal any additional waiters if there is more memory left
// over for them
if (this.availableMemory > 0 || !this.free.isEmpty()) {
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
}
Finally the lock is released, and with that the whole allocation path is complete.
Next, the method that returns memory to the pool, deallocate:
/**
 * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
 * memory as free.
 *
 * @param buffer The buffer to return
 * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
 *             since the buffer may re-allocate itself during in-place compression
 */
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            this.availableMemory += size;
        }
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
The code is straightforward: if the buffer is exactly the pool's cached size (poolableSize), it is cleared and pushed back onto the free list; otherwise availableMemory is simply increased by its size. Finally the first "classmate" in the waiters queue is signalled to come and take the memory. That is the entire release path.
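Putting allocate and deallocate together, here is a test-style round trip. This is a sketch only: BufferPool lives in the internals package, its constructor may differ across versions, and the metric group name is an arbitrary choice of mine.

import java.nio.ByteBuffer;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.SystemTime;

// a 32 MB pool that recycles 16 KB buffers, matching the constructor shown above
BufferPool pool = new BufferPool(32 * 1024 * 1024L, 16 * 1024, new Metrics(), new SystemTime(), "producer-metrics");
ByteBuffer buffer = pool.allocate(16 * 1024, 1000); // free-list hit, or a fresh ByteBuffer
// ... write the batch into the buffer ...
pool.deallocate(buffer, buffer.capacity());         // size == poolableSize, so it goes back on the free list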
That wraps up BufferPool. Its core job is managing allocation and release of memory for the Kafka producer's async send buffer, and the quality of this design directly affects the producer's availability. Caching poolableSize-sized ByteBuffers and using a deque to guarantee fairness are both designs worth borrowing.
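To isolate the fairness trick, here is a minimal sketch of the same idea outside Kafka: one Condition per blocked thread, queued FIFO in a deque, with only the head of the queue allowed to take memory. This is my own simplification, not Kafka's class.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class FairMemoryPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available;

    public FairMemoryPool(long totalMemory) {
        this.available = totalMemory;
    }

    public void acquire(long size) throws InterruptedException {
        lock.lock();
        try {
            // fast path: no one is queued ahead of us and memory is on hand
            if (waiters.isEmpty() && available >= size) {
                available -= size;
                return;
            }
            Condition me = lock.newCondition();
            waiters.addLast(me);
            try {
                // only the head of the queue may take memory: FIFO fairness
                while (waiters.peekFirst() != me || available < size)
                    me.await();
                available -= size;
            } finally {
                waiters.remove(me);
                // pass the baton to the next waiter in line
                if (!waiters.isEmpty())
                    waiters.peekFirst().signal();
            }
        } finally {
            lock.unlock();
        }
    }

    public void release(long size) {
        lock.lock();
        try {
            available += size;
            // wake the longest-waiting thread, never an arbitrary one
            Condition first = waiters.peekFirst();
            if (first != null)
                first.signal();
        } finally {
            lock.unlock();
        }
    }
}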
2. ProducerInterceptors && ProducerInterceptor
public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
    this.interceptors = interceptors;
}
ProducerInterceptors can be seen as a wrapper around ProducerInterceptor: its constructor takes a list of interceptors, and each callback invokes them in order (a sketch of that chaining follows).
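The wrapping is roughly this shape, a paraphrase of the idea rather than verbatim source (log is assumed to be the class's logger): the record is threaded through every interceptor in turn, and an exception from one interceptor is logged and swallowed so the rest of the chain still runs.

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            // each interceptor sees the record as rewritten by the previous one
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not let one broken interceptor break the send path
            log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}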
The part worth studying, though, is the ProducerInterceptor interface itself. The code first:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

public void onAcknowledgement(RecordMetadata metadata, Exception exception);

public void close();
There are just three APIs. onSend is invoked inside KafkaProducer#send, before the key and value are serialized and before the partition is assigned; note that it returns a ProducerRecord, which is what makes interceptor chaining possible and lets an interceptor rewrite the record. onAcknowledgement is called when the record sent to the server has been acknowledged, or when sending the record fails before it gets sent to the server. close is called when the interceptor is shut down.
An interceptor is enabled when configuring the KafkaProducer, for example:
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.zh.kafka_study.producerpart.KafkaProducerSendInterceptor");
I really like this feature; it is especially handy after customizing the client and when integrating with Spring. The concrete benefits are best discovered in practice.
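For illustration, a hypothetical implementation behind the class name configured above might look like this; the "traced-" prefixing rule and the counters are inventions of mine.

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerSendInterceptor implements ProducerInterceptor<String, String> {
    private long success = 0;
    private long failure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // runs before key/value serialization and partition assignment;
        // returning a new record is how an interceptor rewrites the message
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                                    "traced-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // called on broker ack or on failure; keep it lightweight, it runs on the I/O thread
        if (exception == null) success++; else failure++;
    }

    @Override
    public void close() {
        System.out.printf("sent ok=%d, failed=%d%n", success, failure);
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}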
3. Partitioner && DefaultPartitioner
Partitioner is the core interface through which the producer dispatches messages to partitions. On every call to send, KafkaProducer first calls its private partition method: if a partition was specified on the record, that partition is used; otherwise the configured Partitioner computes which partition the message should go to. That is the producer's partitioning strategy; see the code:
/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    if (partition != null) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int lastPartition = partitions.size() - 1;
        // they have given us a partition, use it
        if (partition < 0 || partition > lastPartition) {
            throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
        }
        return partition;
    }
    return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
This was covered in the previous post, so I will not repeat it; here we focus on the Partitioner interface and the default implementation the client provides.
First, the Partitioner interface:
/**
 * Partitioner Interface
 */
public interface Partitioner extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();
}
The core is the partition method: given the topic, the key and value, and the current cluster metadata, it returns the computed partition number.
Now the default implementation, DefaultPartitioner. The code first:
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
The code is simple: fetch the topic's partition list, then check whether the record has a key. If a key was given, its serialized bytes are hashed with murmur2 and the hash is taken modulo the number of partitions; if there is no key, an incrementing counter is round-robined over availablePartitions when the cluster has any, and over the full partition list otherwise.
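As a closing example, a hedged sketch of a custom Partitioner: keys with an invented "vip-" prefix are pinned to partition 0, keyless records are spread randomly, and everything else is hashed. Arrays.hashCode stands in for murmur2 here, and the & 0x7fffffff mask mirrors the sign-bit trick that toPositive performs.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class VipAwarePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // invented routing rule: reserve partition 0 for "vip" traffic
        if (key instanceof String && ((String) key).startsWith("vip-"))
            return 0;
        // no key: spread randomly instead of round-robin, just to keep the sketch short
        if (keyBytes == null)
            return ThreadLocalRandom.current().nextInt(numPartitions);
        // mask off the sign bit (the toPositive trick), then take the modulo
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would be plugged in the same way as the interceptor above:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VipAwarePartitioner.class.getName());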
That's all for now; I will continue in a later post.