最近有个项目,要求通过kafka传较大的消息,先不论这么做操蛋不操蛋,反正咱研究研究也没错。
网上搜了一些文章,据说把消息拆成多片,然后如果使用一样的key,这些消息片都会被发到同一个partition上,就可以利用kafka同一个partition里消息是有序的这个特性来重新组装文件。
于是问题就来了,为什么key相同,就会被发到同一个partition上呢,key是啥类型都行吗,会不会有坑呢。咱先翻翻kafkaTemplate的send代码
public ListenableFuture<SendResult<K, V>> send(String topic, V data) { ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);//此处没有指定partition,也没指定key return doSend(producerRecord); }
先new一个producerRecord对象,它有多个构造方法,可以同时指定topic、partiion、key、value
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { if (this.producer == null) { synchronized (this) { if (this.producer == null) { this.producer = this.producerFactory.createProducer(); } } }//double check 生成singleton 的producer,createProducer方法用了委派模式,我的理解是spring为了屏蔽kafka底层代码的修改 final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>(); this.producer.send(producerRecord, new Callback() { ........
SettableListenableFuture,一个可以被set(Object)和setException(throwable)的future。咦,producer是singleton的,先mark下。咱先来send。
// first make sure the metadata for the topic is available long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);//更新metadata,主要是同步topic对应的partition信息 long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs); serializedKey = keySerializer.serialize(record.topic(), record.key()); serializedValue = valueSerializer.serialize(record.topic(), record.value()); int partition = partition(record, serializedKey, serializedValue, metadata.fetch());//分配partition int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);//加上kafka log头,具体可以看kafka的文件存储格式 ensureValidRecordSize(serializedSize);//检查消息大小maxRequestSize、totalMemorySize TopicPartition tp = new TopicPartition(record.topic(), partition); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);//把消息append到一个内存队列,等待发送到server if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;
新的问题来了,kafka消息莫非是batch发送的?又mark一下。。。,先看partition
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
partition方法很简单,哈哈。。。,好吧,其实也没那么简单,partitioner是个接口,也就是说我可以自己实现自己的partitioner,我们先看看default的DefaultPartitioner吧,懒。。。。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);//从meta里拿topic对应的partition列表 int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement();//看到这就觉得大事不好,差点被坑 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return DefaultPartitioner.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;//看到这里心情又好了,哈哈,看来不要老是一惊一乍的 } }
结论是文章里说的是靠谱的,这篇就到此为止了吧。
额,好像有什么事情忘掉了,mark的东东还没研究好,那咱继续,那先看看发送的内存队列吧,看起来比较有意思
Deque<RecordBatch> dq = dequeFor(tp);//每个topicpartition建一个queue,queue保存RecordBatch,里面包含了数据缓冲区、输出流 synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());//消息追加到缓冲区,缓冲区不够时返回null if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); ByteBuffer buffer = free.allocate(size, maxTimeToBlock);//分配新的缓冲区 synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer);//这意思是其他消息创建了新的batch?于是释放新申请的buffer return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);//创建新的MemoryRecords,包含了buffer、输出流 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());//创建新的RecordBatch FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds())); dq.addLast(batch);//加入到dq队尾 incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
结合前面看到的代码,那么dq.size() > 1 || batch.records.isFull()或者是新建了一个RecordBatch的时候,sender会被唤醒。再去看sender,sender在前面创建KafkaProducer时创建,截取一个片段
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start();
既然是个thread,那就得跑跑跑跑跑啊,那来看看run
Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update if (result.unknownLeadersExist) this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // create produce requests Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); List<ClientRequest> requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } for (ClientRequest request : requests) client.send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; this.client.poll(pollTimeout, now);
一个词,复杂,不爱看,还是看看 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);找个吧,怎么着就ready了呢
boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); Deque<RecordBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); if (leader == null) { unknownLeadersExist = true; } else if (!readyNodes.contains(leader)) { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;//尝试次数大于0,且离上次重试的时间还没超过重试等待时间 long waitedTimeMs = nowMs - batch.lastAttemptMs;//已经等待的时间 long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;//应等待时间 long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);//离下次发送还剩余的时间 boolean full = deque.size() > 1 || batch.records.isFull();//判断是否有BatchRecord满了 boolean expired = waitedTimeMs >= timeToWaitMs;//结合前面的代码,每有batchrecord满了的时候,才唤醒sender,所以有可能已等待时间超过了应等待时间 boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { readyNodes.add(leader); } else { // Note that this results in a conservative estimate since an un-sendable partition may have // a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
boolean sendable = full || expired || exhausted || closed || flushInProgress();
就是说有batchRecord满了、时间到了、RecordAccumulator内存耗尽了、关闭了、flush了,就是ready了,就可以发送啦。
相关推荐
kafka版本:kafka_2.11-0.9.0.1 kafka jar包版本:0.9.0.1 kafka集群:192.168.1.101,192.168.1.102,192.168.1.103 ...partition分发策略主要是自定义Partitioner的实现类,通过根据key和分区数量来实现
在Kafka中,Partition与Consumer的关系是理解Kafka消费模型的关键。Partition是Kafka主题(Topic)的逻辑分片,每个Partition内部的消息是有序的,并且只能被同一个消费者组(Consumer Group)中的一个消费者实例...
在本文中,我们将深入探讨如何在Spring Boot 2.x应用程序中整合Apache Kafka,重点是实现指定分区发送、批量消费以及指定topic分区消费的功能。Apache Kafka是一个分布式流处理平台,广泛用于构建实时数据管道和流...
kafka版本:kafka_2.11-0.9.0.1 kafka jar包版本:0.9.0.1 kafka集群:192.168.1.101,192.168.1.102,192.168.1.103 ...partition分发策略主要是自定义Partitioner的实现类,通过根据key和分区数量来实现
在分布式消息系统Kafka中,Topic是数据的逻辑存储单元,它是消息的分类或主题。本文将深入探讨如何管理和操作Kafka的Topic,包括创建、查看、分区以及删除等核心概念。 **创建Topic** 在Kafka中,创建Topic通常通过...
#### 三、Kafka Topic与Partition - **Topic**:消息存储的基本单元,一个Topic可以包含多个Partition。 - **Partition**:Topic下的子单元,每个Partition内的消息是有序的,并且拥有唯一的偏移量(offset)标识每一...
1. **实时监控**:展示Kafka集群的实时状态,包括Broker性能指标、Topic和Partition的健康状况、延迟情况等。 2. **报警机制**:当监控的指标超过预设阈值时,Kafka-Eagle可以触发报警,帮助及时发现并解决问题。 3....
4. **查看与管理Offsets**:Kafkatool允许用户查看每个partition的最小和最大offset,以及consumer group的当前offset。用户还可以手动设置或重置offset,这对于调试和测试是非常有用的。 5. **创建与删除Topic**:...
12. **Kafka Partition**:Kafka中的Partition是消息的分片,Offset是每个Partition内的消息顺序号,通常用`(offset.partition.topic)`来标识一条消息。 13. **Channel和Sink**:在数据流处理中,Channel用于临时...
下面是kafka的原理文档,涵盖了kafka的架构、设计理念、消息模型、 Partition机制、日志策略、消息可靠性机制等方面。 一、kafka架构 kafka的架构主要包括以下几个部分: *Broker:kafka集群中的每个节点称为...
rd_kafka_topic_partition_list_add(topics, "my-topic", RD_KAFKA_PARTITION_UA); int err = rd_kafka_subscribe(consumer, topics); rd_kafka_topic_partition_list_destroy(topics); if (err != RD_KAFKA_RESP_...
3. **多pipeline支持**:每个数据源可以配置多个pipeline,对于Maxwell来说,每个Kafka partition对应一个pipeline;对于Debezium,则每个Kafka topic对应一个pipeline。 #### 五、搭建实例步骤 为了更好地理解...
3. 支持 Kafka Server 间的消息分区,同时保证每个 Partition 内的消息顺序传输。 4. 分布式系统,易于向外扩展。所有的 producer、broker 和 consumer 都会有多个,均为分布式的。 5. 无需停机即可扩展机器。 6. ...
主题可以被分成多个分区(Partition),每个分区可以有多个副本(Replica)以实现数据冗余和高可用性。 【多线程与Kafka】 在"Kafka-java-demo"中,你可能会看到如何利用多线程来并行处理生产或消费任务,提升数据...
* 分区(Partition):Kafka 中的分区机制,用于分布式存储和处理消息。 * 生产者(Producer):Kafka 中的生产者角色,用于发送消息。 * 消费者(Consumer):Kafka 中的消费者角色,用于消费消息。 * Broker:...
3. **Partition**: 为了提高并行处理能力,每个主题被分为多个分区,每个分区在不同的broker上。 4. **Producer**: 生产者负责将消息发送到主题的特定分区。 5. **Consumer Group**: 消费者通过加入消费组来共同消费...
每个主题可以被分成多个分区(Partition),保证了消息的顺序性和高可用性。 2. **分区与副本**:Kafka的分区是其分布式特性的基础。每个分区都有一个主副本(Leader)和零个或多个从副本(Follower)。如果主副本...
Kafka的核心概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)和分区(Partition)。生产者负责发布消息到主题,消费者则订阅并消费这些消息。主题是逻辑上的分类,可以将消息分为不同的类别。分区是...
Kafka 配置调优实践 Kafka 配置调优实践是指通过调整 Kafka 集群的参数配置来提高其吞吐性能。下面是 Kafka 配置调优实践的知识点总结: 一、存储优化 * 数据目录优先存储到 XFS 文件系统或者 EXT4,避免使用 EXT...