public ListenableFuture<SendResult<K, V>> send(String topic, V data) { ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);//此处没有指定partition,也没指定key return doSend(producerRecord); }
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { if (this.producer == null) { synchronized (this) { if (this.producer == null) { this.producer = this.producerFactory.createProducer(); } } }//double check 生成singleton 的producer,createProducer方法用了委派模式,我的理解是spring为了屏蔽kafka底层代码的修改 final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>(); this.producer.send(producerRecord, new Callback() { ........
// first make sure the metadata for the topic is available long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);//更新metadata,主要是同步topic对应的partition信息 long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs); serializedKey = keySerializer.serialize(record.topic(), record.key()); serializedValue = valueSerializer.serialize(record.topic(), record.value()); int partition = partition(record, serializedKey, serializedValue, metadata.fetch());//分配partition int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);//加上kafka log头,具体可以看kafka的文件存储格式 ensureValidRecordSize(serializedSize);//检查消息大小maxRequestSize、totalMemorySize TopicPartition tp = new TopicPartition(record.topic(), partition); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);//把消息append到一个内存队列,等待发送到server if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);//从meta里拿topic对应的partition列表 int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement();//看到这就觉得大事不好,差点被坑 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return DefaultPartitioner.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;//看到这里心情又好了,哈哈,看来不要老是一惊一乍的 } }
Deque<RecordBatch> dq = dequeFor(tp);//每个topicpartition建一个queue,queue保存RecordBatch,里面包含了数据缓冲区、输出流 synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());//消息追加到缓冲区,缓冲区不够时返回null if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); ByteBuffer buffer = free.allocate(size, maxTimeToBlock);//分配新的缓冲区 synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer);//这意思是其他消息创建了新的batch?于是释放新申请的buffer return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);//创建新的MemoryRecords,包含了buffer、输出流 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());//创建新的RecordBatch FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds())); dq.addLast(batch);//加入到dq队尾 incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
结合前面看到的代码:当 dq.size() > 1 || batch.records.isFull() 成立,或者新建了一个 RecordBatch 时,sender 会被唤醒。再去看 sender:sender 是在前面创建 KafkaProducer 时创建的,下面截取一个片段:
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start();
Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update if (result.unknownLeadersExist) this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // create produce requests Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); List<ClientRequest> requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. 
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } for (ClientRequest request : requests) client.send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; this.client.poll(pollTimeout, now);
一个词:复杂,不爱看。还是看看 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); 这个吧,怎么着就 ready 了呢?
boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); Deque<RecordBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); if (leader == null) { unknownLeadersExist = true; } else if (!readyNodes.contains(leader)) { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;//尝试次数大于0,且离上次重试的时间还没超过重试等待时间 long waitedTimeMs = nowMs - batch.lastAttemptMs;//已经等待的时间 long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;//应等待时间 long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);//离下次发送还剩余的时间 boolean full = deque.size() > 1 || batch.records.isFull();//判断是否有BatchRecord满了 boolean expired = waitedTimeMs >= timeToWaitMs;//结合前面的代码,每有batchrecord满了的时候,才唤醒sender,所以有可能已等待时间超过了应等待时间 boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { readyNodes.add(leader); } else { // Note that this results in a conservative estimate since an un-sendable partition may have // a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
boolean sendable = full || expired || exhausted || closed || flushInProgress();
