Kafka Producer 客户端发送流程
1. 等待要发送的 topic 的元数据可用(即该 topic 的 partition 都在线)。
2. 序列化 key 和 value。示例配置:key 使用 org.apache.kafka.common.serialization.IntegerSerializer,value 使用 org.apache.kafka.common.serialization.StringSerializer。
3. 计算消息所要发送到的 topic 的 partition:优先使用 record 中指定的 partition;若为空,则用 partitioner 类计算,默认实现为 org.apache.kafka.clients.producer.internals.DefaultPartitioner。
4. 确保消息序列化后的大小不超过阈值。相关阈值配置:MAX_REQUEST_SIZE_CONFIG = "max.request.size",BUFFER_MEMORY_CONFIG = "buffer.memory"。
5. 构造 topic 的 partition 对象(TopicPartition),把消息追加到 accumulator 中该 topic-partition 对应的队列,得到结果对象 result。消息先写入 writable records(其中包含负责压缩的 compressor),再封装为 batch。压缩参数:COMPRESSION_TYPE_CONFIG = "compression.type"。
6. 检查 result 对应的 batch 是否已满或是新建的;若是,则唤醒 sender 发送消息。
7. 返回 result 的 future。
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { try { // @1 first make sure the metadata for the topic is available waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); byte[] serializedKey; try { // @2 序列化key serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } byte[] serializedValue; try { //序列化value serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } //@3 计算partition int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize);//@4 确保发送请求不超过阈值 TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); //@5 发送封装好的对象 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback); if (result.batchIsFull || result.newBatchCreated) {//@6 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;//@7 // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message 
send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); throw new InterruptException(e); } catch (KafkaException e) { this.errors.record(); throw e; } }
如果该 topic-partition 的队列中没有可追加的 batch(队列为空,或最后一个 batch 已满),则新建 writable records(包含负责压缩的 compressor),并封装为新的 batch 加入队列。
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); try { if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); // check if we have an in-progress batch Deque<RecordBatch> dq = dequeFor(tp); synchronized (dq) {//添加到已经存在的topic队列中 RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } // we don't have an in-progress record batch try to allocate a new batch int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); //重复判断其他producer有没有放到dp中 RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... 
free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } } //没有当前topic的数据 //封装writable records,包含compresor压缩 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());//封装为recordBatch FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); } } finally { appendsInProgress.decrementAndGet(); } }
public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { if (!this.records.hasRoomFor(key, value)) {//mem溢出 return null; } else { this.records.append(0L, key, value);//添加到record中,又进行压缩操作 this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } }
A partition's data is considered ready to send if any of the following is true:
- The record set is full.
- The record set has sat in the accumulator for at least lingerMs milliseconds.
- The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready).
- The accumulator has been closed.
3. 生成 produce request(Sender.run 中调用 createProduceRequests)。
5. client 通过 selector(NIO)发送数据。
public void run(long now) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send 获得数据的leader RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update if (result.unknownLeadersExist) this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // create produce requests Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); //only for debug test // if(batches.size()>=1){ // System.out.println(batches.size()); // } sensors.updateProduceRequestMetrics(batches); List<ClientRequest> requests = createProduceRequests(batches, now);//生成request // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. 
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } for (ClientRequest request : requests) client.send(request); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; this.client.poll(pollTimeout, now);//nio 发送数据 }
