`
iaiai
  • 浏览: 2203386 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Kafka源码分析-序列5 -Producer -RecordAccumulator队列分析

    博客分类:
  • J2EE
 
阅读更多
在Kafka源码分析-序列2中,我们提到了整个Producer client的架构图,如下所示:

其它几个组件我们在前面都讲过了,今天讲述最后一个组件RecordAccumulator.

Batch发送

在以前的kafka client中,每条消息称为 “Message”,而在Java版client中,称之为”Record”,同时又因为有批量发送累积功能,所以称之为RecordAccumulator.

RecordAccumulator最大的一个特性就是batch消息,扔到队列中的多个消息,可能组成一个RecordBatch,然后由Sender一次性发送出去。

每个TopicPartition一个队列

下面是RecordAccumulator的内部结构,可以看到,每个TopicPartition对应一个消息队列,只有同一个TopicPartition的消息,才可能被batch。
public final class RecordAccumulator {
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

   ...
}

batch的策略

那什么时候,消息会被batch,什么时候不会呢?下面从KafkaProducer的send方法看起:
//KafkaProducer
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        try {
            // first make sure the metadata for the topic is available
            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);

            ...

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);   //核心函数:把消息放入队列

            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;

从上面代码可以看到,batch逻辑,都在accumulator.append函数里面:
    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            Deque<RecordBatch> dq = dequeFor(tp);  //找到该topicPartiton对应的消息队列
            synchronized (dq) {
                RecordBatch last = dq.peekLast(); //拿出队列的最后1个元素
                if (last != null) {  
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); //最后一个元素, 即RecordBatch不为空,把该Record加入该RecordBatch
                    if (future != null)
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }

            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                    if (future != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }

                //队列里面没有RecordBatch,建一个新的,然后把Record放进去
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

    private Deque<RecordBatch> dequeFor(TopicPartition tp) {
        Deque<RecordBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }

从上面代码我们可以看出Batch的策略:
1。如果是同步发送,每次去队列取,RecordBatch都会为空。这个时候,消息就不会batch,一个Record形成一个RecordBatch

2。Producer 入队速率 < Sender出队速率 && lingerMs = 0 ,消息也不会被batch

3。Producer 入队速率 > Sender出对速率, 消息会被batch

4。lingerMs > 0,这个时候Sender会等待,直到lingerMs > 0 或者 队列满了,或者超过了一个RecordBatch的最大值,就会发送。这个逻辑在RecordAccumulator的ready函数里面。
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<Node>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader)) {
                synchronized (deque) {
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();  //关键的一句话
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {

                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }

为什么是Deque?

在上面我们看到,消息队列用的是一个“双端队列“,而不是普通的队列。
一端生产,一端消费,用一个普通的队列不就可以吗,为什么要“双端“呢?

这其实是为了处理“发送失败,重试“的问题:当消息发送失败,要重发的时候,需要把消息优先放入队列头部重新发送,这就需要用到双端队列,在头部,而不是尾部加入。

当然,即使如此,该消息发出去的顺序,还是和Producer放进去的顺序不一致了。
  • 大小: 250.1 KB
分享到:
评论
1 楼 jiazimo 2017-03-09  

相关推荐

    kafka_2.12-3.3.1.tgz

    6. **灵活的数据模型**: Kafka支持多种数据模型,包括简单的消息队列、时间序列数据存储以及复杂的事件流处理。这使得它在各种场景下都能发挥作用,如日志聚合、用户行为追踪、物联网(IoT)数据处理等。 7. **广泛的...

    kafka_2.11-2.2.0.tgz

    3. **消息队列**:Kafka作为一个高可用的消息中间件,可以替代传统的消息队列系统。 4. **事件源**:在微服务架构中,Kafka作为事件源,使得服务间通信更加解耦。 5. **数据集成**:在数据湖或数据仓库建设中,...

    kafka-java-demo 基于java的kafka生产消费者例子

    Kafka是一种高吞吐量、低延迟的消息队列系统,它允许应用程序以发布/订阅模式进行通信。在Kafka中,数据以主题(Topic)的形式存储,每个主题可以分为多个分区(Partition),确保数据的分布和负载均衡。此外,Kafka...

    springboot-kafka-simple-demo

    Kafka是一个高吞吐量、低延迟的消息队列,它可以处理PB级别的数据,适用于日志收集、实时分析、流处理等多种场景。其核心组成部分包括生产者(Producer)、消费者(Consumer)以及主题(Topic)。 在SpringBoot项目...

    kafka_2.10-0.8.2.0.tgz

    1.3 主题与分区(Topics and Partitions):主题是逻辑上的消息队列,每个主题可以被分为多个分区,分区是有序且不可变的消息序列。 1.4 broker:Kafka集群由多个broker节点组成,它们存储并处理主题的分区。0.8....

    kafka_2.11-0.10.1.0及使用说明

    它最初设计为一个高吞吐量、低延迟的消息队列系统,但随着时间的发展,Kafka已经发展成为一个强大的实时数据管道和流处理工具。在版本2.11-0.10.1.0中,Kafka继续提供了高度可扩展性和容错性,适用于大数据实时处理...

    pentaho-kafka-producer.zip

    Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...

    confluent-kafka-dotnet-1.4.3.zip

    Kafka,由LinkedIn开发并开源的分布式流处理平台,已成为大数据领域中的消息队列标准。Confluent Kafka作为其官方提供的商业支持版本,提供了丰富的功能和优化,包括更稳定的性能、全面的监控以及强大的连接器。在...

    kafka_2.10-0.10.0.0

    了解并掌握这些知识点,将有助于你有效地使用和部署Kafka 0.10.0.0,实现高效的数据流处理和实时分析。在实际应用中,还需要关注Kafka与其他系统的集成,如Hadoop、Elasticsearch等,以构建完整的数据处理链路。

    消息队列kafka源码详细讲解分析

    《Kafka消息队列源码深度解析》 Kafka,由LinkedIn开源并被Apache基金会接纳,是一款高吞吐量的分布式消息系统,广泛应用于大数据实时处理、日志收集、流计算等多个领域。本文将深入剖析Kafka的核心概念,以及其...

    Kafka-Tail-Producer:此应用程序用于通过 Linux tail cmd 从指定的日志文件中收集日志数据

    综上所述,Kafka-Tail-Producer是一个利用Java和Linux `tail`命令实现的实时日志数据收集工具,它与Apache Kafka紧密集成,为企业提供了强大的实时日志分析能力。通过合理的配置和扩展,可以满足不同规模和复杂性的...

    kafka队列下载

    标题中的“kafka队列下载”指的是Apache Kafka的下载过程,Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用。描述中的“kafka_2.12-0.11.0.0”是Kafka的一个特定版本,它表明是在使用Scala 2.12编译的...

    Kafka源码解析与实战

    #### 三、Kafka源码分析 ##### 3.1 生产者发送流程 1. **消息序列化**:生产者在发送消息之前,首先需要将消息对象序列化为字节数组。 2. **消息分发**:根据用户指定的分区策略(如轮询、按key哈希等),将消息...

    kafka环境搭建

    - **Producer**:Producer 负责向 Kafka 发布消息,可以指定消息发送到哪个 Topic 和哪个 Partition。 - **Consumer**:Consumer 用于从 Kafka 中读取消息。消费者可以按照队列模式或发布-订阅模式来消费消息。 ###...

    kafka-client-go:卡夫卡客户

    Kafka 是基于发布/订阅模式的消息队列,它的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题,消费者订阅主题并消费消息,而 Broker 是 Kafka 集群中的服务器节点,...

    17道消息队列Kafka面试题!.zip

    标题"17道消息队列Kafka面试题!"表明这是一个关于Kafka面试问题的集合,可能涵盖了Kafka的核心概念、功能特性、使用场景以及常见问题等方面。描述同样强调了是17个与Kafka相关的面试问题,这暗示了我们可能需要了解...

    2、java调用kafka api

    Apache Kafka是一个高性能、可扩展的消息队列(MQ),它提供实时的数据流处理能力。在Java应用程序中集成Kafka API可以让开发者轻松地将消息生产到Kafka主题(topics)中,或者从这些主题中消费消息。 首先,让我们...

    kafka-starter-app-maven:Kafka生产者,消费者和消费者群体入门

    Apache Kafka是一个高吞吐量、低延迟的消息队列系统,最初由LinkedIn开发,后来成为Apache软件基金会的顶级项目。Kafka的设计目标是提供一个能够处理大量实时数据的平台,它支持发布订阅模型,可以用于日志聚合、流...

    kafka的安装和简单实例测试

    由于其出色的性能表现和可扩展性,Kafka已经成为大数据领域中最受欢迎的消息队列中间件之一。 #### 二、Kafka的核心概念 在深入了解Kafka的安装过程之前,我们需要先掌握一些Kafka的基本概念: 1. **Broker**:...

Global site tag (gtag.js) - Google Analytics