`
fish_no7
  • 浏览: 27966 次
文章分类
社区版块
存档分类
最新评论

Kafka producer client 源码包核心api学习

 
阅读更多

    上一篇博客,对producer的设计中的我所关注的点,比如如何进行partition,如何保证可靠发送(ack),消息在producer端的内存数据结构等,根据消息发送的流程,对代码进行了一个稍微的梳理和学习,但是还有一些疑问,暂时不打算关注,比如如何确保有序,协议层和网络层等。 看完了之前的流程,头脑有点晕晕的,今天打开client的源码包,挑出上次梳理流程中出现过的api和之前没有关注的api, 出来梳理和学习下

先看下package的结构。

1. BufferPool

BufferPool 能够提供一块特定大小的内存区域的循环使用,避免频繁的开辟和释放的开销;同时,保证对producer的公平,即优先给等待时间最长的producer提供可以使用的内存。

 

接下来看构造函数

 

/**
     * Create a new buffer pool
     * 
     * @param memory The maximum amount of memory that this buffer pool can allocate
     * @param poolableSize The buffer size to cache in the free list rather than deallocating
     * @param metrics instance of Metrics
     * @param time time instance
     * @param metricGrpName logical group name for metrics
     */
    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }

 其中关注memory和poolableSize参数,memory表示当前缓冲池能申请的最大内存,poolableSize表示buffer list中cached的buffer的size。

 

接下来看allocate方法

/**
     * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
     * is configured with blocking mode.
     * 
     * @param size The buffer size to allocate in bytes
     * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
     * @return The buffer
     * @throws InterruptedException If the thread is interrupted while blocked
     * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
     *         forever)
     */
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        this.lock.lock();
        try {
            // check if we have a free buffer of the right size pooled
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = this.free.size() * this.poolableSize;
            if (this.availableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request
                freeUp(size);
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            } else {
                // we are out of memory and will have to block
                int accumulated = 0;
                ByteBuffer buffer = null;
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        this.waiters.remove(moreMemory);
                        throw e;
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }

                    if (waitingTimeElapsed) {
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;
                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // we'll need to allocate memory, but we may only get
                        // part of what we need on this iteration
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }

                // remove the condition for this thread to let the next thread
                // in line start getting memory
                Condition removed = this.waiters.removeFirst();
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

                // signal any additional waiters if there is more memory left
                // over for them
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                }

                // unlock and return the buffer
                lock.unlock();
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }

 

注释足够清晰,这里我来给自己梳理下:申请给定大小的内存,如果没有足够的内存,线程会被阻塞,这个问题会在producer端消息的发送速度和消息的生产速度不匹配时被放大,因为线程的阻塞会影响应用的可用性,所以在设计和配置缓存大小的时候需要注意这个点。

进入代码, 

if (size == poolableSize && !this.free.isEmpty())

                return this.free.pollFirst();

如果缓存中刚好有一块内存,且大小刚好和申请的一样,则直接将队列中的byteBuffer返回就可以了;不然,则需要从物理内存中申请,申请代码如下:

if (this.availableMemory + freeListSize >= size) {

                // we have enough unallocated or pooled memory to immediately

                // satisfy the request

                freeUp(size);

                this.availableMemory -= size;

                lock.unlock();

                return ByteBuffer.allocate(size);

            }

如果内存不够,则需要阻塞当前线程。

 Condition moreMemory = this.lock.newCondition();

  long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);

  this.waiters.addLast(moreMemory);

waites是一个deque,BufferPool就是依靠这个双端队列维护对producer的“公平”。

阻塞方法:                       

waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);

还支持部分内存申请

 // we'll need to allocate memory, but we may only get

 // part of what we need on this iteration

freeUp(size - accumulated);

int got = (int) Math.min(size - accumulated, this.availableMemory);

this.availableMemory -= got;

accumulated += got;

自己申请完毕后,将自己从waiters中移除,并通知下个排队的人进入,很“友好”,

// remove the condition for this thread to let the next thread

                // in line start getting memory

                Condition removed = this.waiters.removeFirst();

                if (removed != moreMemory)

                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

 

                // signal any additional waiters if there is more memory left

                // over for them

                if (this.availableMemory > 0 || !this.free.isEmpty()) {

                    if (!this.waiters.isEmpty())

                        this.waiters.peekFirst().signal();

                }

最后释放锁,至此整个申请过程完毕。

 

接下来我们看释放内存的方法deallocate

 

/**
     * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
     * memory as free.
     * 
     * @param buffer The buffer to return
     * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than buffer.capacity
     *             since the buffer may re-allocate itself during in-place compression
     */
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                this.availableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

 代码很简单,如果是缓存池缓存队列规格大小的缓存,则直接入队列,否则,对availableMemory扩容,最后唤醒队列的第一个“同学”,去 拿内存吧,这就是整个释放的逻辑。

 

    整个BufferPool到此学习完毕,BufferPool的核心作用就是管理kafka producer端的异步发送缓存区的内存申请和释放,设计的优劣直接决定了Kafka producer端的可用性。其中对poolableSize的Bytebuffer进行cached和利用deque保证公平性的设计,都值得我们借鉴和学习。

 

2. ProducerInterceptors &&  ProducerInterceptor

    

public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

 

 

     其实ProducerInterceptors 可以看作是ProducerInterceptor的包装,构造函数就是传入一个ProducerInterceptor的列表,然后依次调用。这里我们重点关注ProducerInterceptor接口

先看代码

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    public void close();

 

 

总共的api就是三个,onSend在KafkaProducer#send方法调用时调用,并且在key和value进行序列化以及parition指定之前调用,需要注意onSend方法的返回是ProducerRecord,通过这点可以判断ProducerInterceptor应该是支持拦截链的,并且可以对ProducerRecord进行拦截修改哦吐舌头;onAcknowledgement在服务器接收到结果的时候(This method is called when the record sent to the server has been acknowledged, or when sending the record fails before

     * it gets sent to the server.);close是在interceptor关闭的时候。

可以通过在配置KafkaProducer的时候使用如下代码启用

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
				"org.zh.kafka_study.producerpart.KafkaProducerSendInterceptor");

 这个功能我很喜欢,尤其是在二次开发后和spring在集成的时候很有用,具体的好处大家可以在实践中去体会。

 

3. Partitioner && DefaultPartitioner

partitioner是producer实现消息分区分发的核心接口,kafkaProducer在每次调用send方法的时候,会首先调用partition方法,如果在发送的时候指定了分区,则使用指定的分区,否则,调用partitioner接口获取当前消息要发放的分区,这就是producer的分区策略,具体可以看代码

 /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        if (partition != null) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
            int lastPartition = partitions.size() - 1;
            // they have given us a partition, use it
            if (partition < 0 || partition > lastPartition) {
                throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
            }
            return partition;
        }
        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
            cluster);
    }

 这个在上一篇博客中已经讲到了,这里就不讲了。这里我们还是关注partitioner接口和kafkaProducer提供的默认实现。

先看partition接口

/**
 * Partitioner Interface
 */

public interface Partitioner extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

 

 核心就是partition方法,根据topic和key,value以及集群的相关信息,返回计算出来的分区number。

接下来我们再关注默认的实现,先看代码

/**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

代码很简单,获取当前消息的分区大小,然后看消息是否指定了key。如果指定了key,则对key的byte数组进行hash然后对hash值取余;如果没有指定的key,则看集群中是否有availablePartitions,如果有,则从availablePartitions轮询,否则,从所有的分区列表中轮询。

 

 暂时先写这么多,后期再接着写

 

  • 大小: 21.7 KB
分享到:
评论

相关推荐

    kafka-clients源码.zip

    《深入理解Kafka-clients源码》 Kafka-clients是Apache Kafka的重要组成部分,它提供了与Kafka集群交互的API,使得开发者能够...在实际开发中,结合源码调试和学习,将使我们对Kafka-clients有更深入的理解和运用。

    pentaho-kafka-producer.zip

    Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...

    Kafka Producer机制优化-提高发送消息可靠性

    ### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...

    kafka-clients-2.0.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka需要的源码包

    通过深入学习 Kafka 的源码,你可以更好地理解其内部工作原理,从而更有效地利用 Kafka 构建实时数据处理系统,解决实际问题。同时,这也会为你提供一个良好的基础,以便于参与到 Kafka 的开发和维护工作中去。

    kafka-clients-2.2.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...

    kafka-clients-2.0.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.1.jar; 赠送原API文档:kafka-clients-2.0.1-javadoc.jar; 赠送源代码:kafka-clients-2.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.1.pom; 包含翻译后的API文档...

    java开发kafka-clients所需要的所有jar包以及源码

    Java开发Kafka客户端是构建基于Apache Kafka的应用程序的关键步骤,Kafka-clients库提供了与Kafka服务器进行交互的API,支持生产者和消费者的实现。在Java中使用Kafka-clients,你需要包含相应的jar包,这些包包含了...

    jemter-kafka连接器

    《JMeter与Kafka连接器:构建高并发数据流测试》 在现代大数据处理系统中,Apache Kafka作为一款分布式消息中间件,广泛应用于实时数据流处理。为了验证和优化Kafka系统的性能,开发者通常需要进行大规模并发数据...

    第12单元 Kafka producer拦截器与Kafka Streams1

    "Kafka Producer拦截器与Kafka Streams" Kafka Producer拦截器是Kafka 0.10版本引入的主要用于实现clients端的定制化控制逻辑。 Producer拦截器使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制...

    基于Kafka的管理系统源码.zip

    基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...

    kafka client依赖包和示例代码

    版本:kafka_2.12-0.10.2.1,生产者消费者示例代码,以及client相关包。 部分代码: public static KafkaProducer, String&gt; getProducer() { if (kp == null) { Properties props = new Properties(); props....

    Python-kafka集群搭建PythonAPI调用Producer和Consumer

    在本教程中,我们将探讨如何搭建一个支持SASL(Simple Authentication and Security Layer)认证的Kafka集群,并使用Python API来创建Producer和Consumer。 ### 1. 安装Kafka 首先,我们需要在服务器上安装...

    kafka的java依赖包

    首先,我们需要理解Kafka的核心概念。Kafka是一种发布订阅模型的消息中间件,它主要由生产者、消费者、主题和分区组成。生产者负责创建和发送消息到主题,消费者则从主题中消费消息。主题是逻辑上的分类,而分区是...

    kafka-clients-2.2.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...

    spring cloud kafka数据中间件源码

    "dm-kafka-client"可能是一个自定义的Kafka客户端库,用于封装Spring Cloud Kafka的相关操作,提供更方便的API供应用调用。这个客户端可能包含了生产者和消费者的实现,以及一些定制的配置和工具类,以便于在多平台...

    kafka需要的jar包集合

    1. **kafka-clients.jar**:这是Kafka Java客户端的核心库,包含了生产者、消费者以及其他客户端API,用于连接Kafka服务器,发送和接收消息。 2. **slf4j-api.jar**:简单日志门面(SLF4J)是一个用于各种日志框架...

    kafka-clients-2.4.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    kettle kafka 消息生产插件

    在Kettle的工作流程中,此插件作为一个步骤,允许数据从各种源(如数据库、文件、API等)抽取后,无缝地推送到Kafka主题,进一步实现数据的实时流动和处理。 标签“kettle kafka produce”强调了这个插件的主要功能...

Global site tag (gtag.js) - Google Analytics