
kafka partition

 

Recently a project came up that needs to push fairly large messages through Kafka. Setting aside how awful an idea that may or may not be, it can't hurt to dig into how it would work.

Searching around, several articles suggest splitting the message into multiple chunks and sending every chunk with the same key: chunks sharing a key all land on the same partition, and since Kafka guarantees ordering within a partition, the consumer can reassemble the file in order — roughly the idea sketched below.
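A minimal sketch of that chunking idea, assuming spring-kafka's KafkaTemplate; the topic name, chunk size, and key format here are my own assumptions, not from those articles:

import java.util.Arrays;
import org.springframework.kafka.core.KafkaTemplate;

public class ChunkedSender {

    private final KafkaTemplate<String, byte[]> template;

    public ChunkedSender(KafkaTemplate<String, byte[]> template) {
        this.template = template;
    }

    // Split the payload into fixed-size chunks and send every chunk with the same key,
    // so all chunks are routed to the same partition and arrive in order.
    public void sendInChunks(String fileId, byte[] payload) {
        int chunkSize = 512 * 1024; // assumed chunk size; must stay below max.request.size
        for (int offset = 0; offset < payload.length; offset += chunkSize) {
            byte[] chunk = Arrays.copyOfRange(payload, offset, Math.min(offset + chunkSize, payload.length));
            template.send("large-file", fileId, chunk); // send(topic, key, value) -- same key for every chunk
        }
    }
}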

So the questions: why does the same key end up on the same partition? Does the key type matter? Are there any pitfalls lurking? Let's start by flipping through KafkaTemplate's send code.

public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data); // neither partition nor key is specified here
    return doSend(producerRecord);
}

It first news up a ProducerRecord; that class has several constructors which can also take the topic, partition, key, and value together, for example:
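A small usage sketch of those constructors; the topic name, key, and values are illustrative assumptions:

ProducerRecord<String, String> noKey  = new ProducerRecord<>("demo-topic", "value-only");             // no key: partitions get rotated
ProducerRecord<String, String> keyed  = new ProducerRecord<>("demo-topic", "file-42", "chunk-0");     // with key: hashed to a fixed partition
ProducerRecord<String, String> pinned = new ProducerRecord<>("demo-topic", 3, "file-42", "chunk-0");  // partition 3 forced explicitly

Back to doSend: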

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    if (this.producer == null) {
        synchronized (this) {
            if (this.producer == null) {
                this.producer = this.producerFactory.createProducer();
            }
        }
    } // double-checked locking to lazily create a singleton producer; createProducer uses delegation,
      // my reading being that Spring wants to shield callers from changes in the underlying Kafka client code

    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    this.producer.send(producerRecord, new Callback() {
........
				

SettableListenableFuture is a future whose outcome can be completed from outside via set(Object) and setException(Throwable). Hm, and the producer is a singleton; let's mark that and come back to it.
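As a quick aside, the ListenableFuture coming back from doSend can be consumed asynchronously; a minimal sketch, assuming a KafkaTemplate<String, String> and a hypothetical topic name:

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

ListenableFuture<SendResult<String, String>> f = kafkaTemplate.send("demo-topic", "file-42", "chunk-0");
f.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        // the broker accepted the record; result carries the RecordMetadata
    }
    @Override
    public void onFailure(Throwable ex) {
        // the send failed; the future was completed via setException(ex)
    }
});

Now, into the producer's send itself: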

// first make sure the metadata for the topic is available
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); // refresh metadata, mainly to sync the topic's partition info
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);

serializedKey = keySerializer.serialize(record.topic(), record.key());

serializedValue = valueSerializer.serialize(record.topic(), record.value());

int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); // pick the partition
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); // add the Kafka log overhead; see Kafka's on-disk storage format
ensureValidRecordSize(serializedSize); // check the record size against maxRequestSize and totalMemorySize
TopicPartition tp = new TopicPartition(record.topic(), partition);

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs); // append the record to an in-memory queue, where it waits to be sent to the server
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;
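A side note for the original goal of pushing large messages through Kafka: ensureValidRecordSize is where an oversized record gets rejected, checked against max.request.size and the producer's total buffer.memory, so any chunk has to stay under those limits (the broker's message.max.bytes and the consumer's fetch size also have to agree, but that is outside this code path). A minimal producer config sketch; the numbers are illustrative assumptions, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("max.request.size", 2 * 1024 * 1024);     // the maxRequestSize checked by ensureValidRecordSize
props.put("buffer.memory", 64 * 1024 * 1024);       // the totalMemorySize checked by ensureValidRecordSize
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);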

 

A new question pops up: are Kafka messages actually sent in batches? Mark that one too... First, let's look at partition.

 

 

return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
            cluster);

 

The partition method is simple, haha... well, OK, not quite that simple: partitioner is an interface, which means I could plug in my own implementation. Being lazy, let's just look at the default, DefaultPartitioner...

 

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); // fetch the topic's partition list from the metadata
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = counter.getAndIncrement(); // at this point I feared the worst -- almost walked into a trap
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; // and here my mood recovered -- no need to be so jumpy
    }
}
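Since partitioner is an interface, as noted above, a custom strategy can also be dropped in via the producer's partitioner.class property. A minimal sketch against the same partition(...) signature shown above; the class name and the routing rule are purely my own assumptions:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class VipFirstPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // hypothetical rule: keys starting with "vip-" are pinned to partition 0
        if (key instanceof String && ((String) key).startsWith("vip-")) {
            return 0;
        }
        if (keyBytes == null) {
            return 0; // keyless records also land on partition 0 in this toy example
        }
        // otherwise hash like the default: murmur2 masked to a positive int, modulo the partition count
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

It would be enabled by setting partitioner.class to the fully qualified class name in the producer config.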

 

The conclusion: what those articles said checks out. With a non-null key, the partition is simply murmur2(serializedKey) % numPartitions, so any key type works as long as its serializer produces the same bytes for the same key, and identical keys always map to the same partition (as long as the partition count doesn't change). This post could just about end here.

 

Er, it feels like something got forgotten: the things marked earlier still haven't been looked at. So let's carry on, starting with the in-memory send queue, which looks rather interesting.

 

Deque<RecordBatch> dq = dequeFor(tp); // one deque per TopicPartition; it holds RecordBatch objects, each wrapping a data buffer and an output stream
synchronized (dq) {
    RecordBatch last = dq.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); // append the record into the batch buffer; returns null when the buffer has no room left
        if (future != null)
            return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
    }
}

// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));

ByteBuffer buffer = free.allocate(size, maxTimeToBlock); // allocate a new buffer
synchronized (dq) {
    // Need to check if producer is closed again after grabbing the dequeue lock.
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordBatch last = dq.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
        if (future != null) {
            // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
            free.deallocate(buffer); // meaning some other record already created a new batch, so release the buffer we just allocated
            return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
        }
    }
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); // create a fresh MemoryRecords wrapping the buffer and an output stream
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); // create a fresh RecordBatch
    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));

    dq.addLast(batch); // append to the tail of the deque
    incomplete.add(batch);
    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);

Putting this together with the code above: the sender is woken up whenever dq.size() > 1 || batch.records.isFull() holds, or whenever a brand-new RecordBatch was just created. Time to look at the sender itself. It is created when the KafkaProducer is constructed; here is the relevant snippet.

 String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

Since it's a thread, it has to run, run, run, so let's look at run.

Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);

 

One word: complicated. Not keen to read it all. Let's instead zoom in on RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now) and figure out what it takes to count as ready.

 

boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
    TopicPartition part = entry.getKey();
    Deque<RecordBatch> deque = entry.getValue();

    Node leader = cluster.leaderFor(part);
    if (leader == null) {
        unknownLeadersExist = true;
    } else if (!readyNodes.contains(leader)) {
        synchronized (deque) {
            RecordBatch batch = deque.peekFirst();
            if (batch != null) {
                boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; // at least one attempt was made and the retry backoff since the last attempt hasn't elapsed yet
                long waitedTimeMs = nowMs - batch.lastAttemptMs; // time already waited
                long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; // time it is supposed to wait
                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); // time remaining before the next send
                boolean full = deque.size() > 1 || batch.records.isFull(); // is some RecordBatch full
                boolean expired = waitedTimeMs >= timeToWaitMs; // recall from earlier that the sender is only woken when a batch fills up, so the waited time may well exceed the intended wait
                boolean sendable = full || expired || exhausted || closed || flushInProgress();
                if (sendable && !backingOff) {
                    readyNodes.add(leader);
                } else {
                    // Note that this results in a conservative estimate since an un-sendable partition may have
                    // a leader that will later be found to have sendable data. However, this is good enough
                    // since we'll just wake up and then sleep again for the remaining time.
                    nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                }
            }
        }
    }
}

return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);

  

boolean sendable = full || expired || exhausted || closed || flushInProgress();

In other words, once a RecordBatch is full, the wait time has elapsed, the RecordAccumulator has run out of memory, the producer is closing, or a flush is in progress, the partition is ready and its data can be sent.
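Those conditions line up with a handful of producer configs; in the same Properties style as the earlier sketch, with values that are illustrative assumptions:

props.put("batch.size", 16384);        // buffer size of a RecordBatch -> drives the "full" condition
props.put("linger.ms", 5);             // how long a non-full batch may linger -> drives the "expired" condition
props.put("buffer.memory", 33554432);  // total accumulator memory -> drives the "exhausted" condition
props.put("retry.backoff.ms", 100);    // wait between retries -> drives the "backingOff" check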

 
