1.问题
batch.size和linger.ms是对kafka producer性能影响比较大的两个参数。batch.size是producer批量发送的基本单位,默认是16384Bytes,即16kB;lingger.ms是sender线程在检查batch是否ready时候,判断有没有过期的参数,默认大小是0ms。
那么producer是按照batch.size大小批量发送消息呢,还是按照linger.ms的时间间隔批量发送消息呢?这里先说结论:其实满足batch.size和ling.ms之一,producer便开始发送消息。
2.源码分析
首先sender线程主要代码如下,我们主要关心sender线程阻塞的情况:
void run(long now) { Cluster cluster = metadata.fetch(); // result.nextReadyCheckDelayMs表示下次检查是否ready的时间,也是//selecotr会阻塞的时间 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); if (result.unknownLeadersExist) this.metadata.requestUpdate(); Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { for (List<RecordBatch> batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); List<ClientRequest> requests = createProduceRequests(batches, now); // 暂且只关心result.nextReadyCheckDelayMs long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } for (ClientRequest request : requests) client.send(request, now); // poll最终会调用selector,pollTimeout也就是selector阻塞的时间 this.client.poll(pollTimeout, now); }
selector
private int select(long ms) throws IOException { if (ms < 0L) throw new IllegalArgumentException("timeout should be >= 0"); if (ms == 0L) return this.nioSelector.selectNow(); else return this.nioSelector.select(ms); }
我们可以从实例化一个新的KafkaProducer开始分析(还没有调用send方法),在sender线程调用accumulator#ready(..)时候,会返回result,其中包含selector可能要阻塞的时间。由于还没有调用send方法,所以Deque<RecordBatch>为空,所以result中包含的nextReadyCheckDelayMs也是最大值,这个时候selector会一直阻塞。
public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<Node>(); // 初始化为最大值 long nextReadyCheckDelayMs = Long.MAX_VALUE; boolean unknownLeadersExist = false; boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); Deque<RecordBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); if (leader == null) { unknownLeadersExist = true; } else if (!readyNodes.contains(leader) && !muted.contains(part)) { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; long waitedTimeMs = nowMs - batch.lastAttemptMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; // 和linger.ms有关 long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { readyNodes.add(leader); } else { nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); }
然后我们调用send方法往内存中放入了一条数据,由于是新建的一个RecordBatch,所以会唤醒sender线程
KafkaProducer#doSend(...)
if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); }
这个时候会唤醒阻塞在selector#select(..)的sender线程,sender线程又运行到accumulator#ready(..),由于Deque<RecordBatch>有值,所以返回的result包含的nextReadyCheckDelayMs不再是最大值,而是和linger.ms有关的值。也就是时候selector会z最多阻塞lingger.ms后就返回,然后再次轮询。
也就是说当Deque<RecordBatch>不为空的时候,sender线程会最多阻塞linger.ms时间;Deque<RecordBatch>为空的时候,sender线程会阻塞Long.MAX_VALUE时间;一旦调用了KafkaProduer#send(..)将消息放到内存中,新建了个RecordBatch,则会将sender线wakeup。
另外从上面的代码,即KafkaProducer#doSend(...)中也可以看到,如果有一个RecordBatch满了,也会调用Sender#wakeup(..),所以综上所述:只要满足linger.ms和batch.size满了就会激活sender线程来发送消息。
rel:https://www.cnblogs.com/set-cookie/p/8902340.html
相关推荐
- 参数配置涉及acks、batch.size、linger.ms等,以调整性能和可靠性。 5. **Kafka高级专题**: - **Zookeeper**:Kafka依赖Zookeeper进行集群协调,管理元数据。 - **Replication**:Partition的副本策略确保...
kafka生产者代码-producer的一些配置的介绍(包括bootstrap.servers,acks,batch.size,linger.ms,buffer.memory,key.serialization,value.serialization。。。。。。)
5. **参数解析**:Kafka的配置参数繁多,例如`bootstrap.servers`用于指定连接的broker列表,`linger.ms`决定等待多久批量发送消息。理解并合理设置这些参数,对于优化Kafka集群性能至关重要。 通过"Kafka_Learn....
有许多配置参数可以调整以适应不同的应用场景,如`batch.size`、`linger.ms`、`fetch.min.bytes`等,理解这些参数的含义并根据需求调整至关重要。 9. **测试和监控**: 开发过程中,使用单元测试验证生产者和消费...
7. **性能优化**:介绍如何通过调整配置参数(如batch.size和linger.ms)来提高Kafka的性能。 8. **Kafka Streams**:讲解Kafka的流处理库Kafka Streams,它是如何处理实时数据流的。 9. **Kafka Connect**:解释...
agent1.sinks.kafka-sink1.kafka.producer.linger.ms=1 agent1.sinks.kafka-sink1.kafka.producer.batch.size=1000 agent1.sinks.kafka-sink1.kafka.producer.retries=10 ``` 这些配置指定了Kafka Sink的相关参数,...
- `kafka.producer.linger.ms` 设置延迟时间以优化批量发送。 - `kafka.producer.compression.type` 选择压缩类型,如 snappy。 - `useFlumeEventFormat` 控制是否使用 Flume 的事件格式。 **Storm** 是一个开源的...
4. `linger.ms`: 如果没有达到 batch-size,则等待此时间以合并批次。 5. `buffer-memory`: 生产者缓冲区大小。 **Kafka 消费者配置(Consumer Configuration)**: 1. `bootstrap-servers`: 同生产者配置,指定 ...
生产者可以设置不同的配置,如acks(确认机制)、batch.size(批量发送大小)和linger.ms(等待时间以合并批次)等,这些参数对性能和可靠性有直接影响。 2. **消费者**:消费者则是从Kafka主题中订阅并消费消息的...
6. **性能调优**:探讨如何调整Kafka的配置参数以提高性能,如batch.size、linger.ms等,以及如何监控Kafka的运行状态。 7. **Kafka与Storm集成**:在Storm中,Kafka通常作为数据源,storm-kafka模块提供与Kafka的...
3. **batch.size** 和 **linger.ms**:批量发送消息的大小和延迟时间,用于优化网络传输和提高吞吐量。 4. **key.serializer** 和 **value.serializer**:指定键和值的序列化方式,确保数据能够正确地发送到 Kafka。...
props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache....
props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache....
第 4 章 Kafka API 4.1 Producer API 4.1.1 消息发送流程 Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享...linger.ms:如
6. **linger.ms**: 生产者等待新消息加入批次的时间,以毫秒为单位。增加此值可以提高批处理效率,减少网络传输次数。 7. **client.id**: 用于标识生产者的唯一字符串,有助于监控和故障排查。 8. **max.in.flight...
2. **消息生产端配置 (Producer Configs)**:生产者配置包括acks设置(决定何时确认消息已写入)、batch.size(批量发送消息的大小)和linger.ms(等待新消息以优化批处理的时间)等。 3. **消费端配置 (Consumer ...
- `linger.ms`:设置生产者等待新消息加入批次的时间,默认为0,表示立即发送。增加此值可提高批次利用率,减少网络请求。 - `buffer.memory`:定义生产者用于缓存未发送消息的内存大小,应根据系统资源和预期吞吐...
- **Producer配置**:包括batch.size、linger.ms、acks等,影响生产和发送效率。 - **Consumer配置**:group.id、auto.offset.reset、enable.auto.commit等,影响消费行为。 4. **消息模型**: - **At-Least-...
- **`linger.ms`**:等待额外消息的时间,单位是毫秒。 - **`buffer.memory`**:生产者使用的总内存缓冲区大小。 - **`key.serializer` 和 `value.serializer`**:指定键和值的序列化方式。 #### 生产者生产 完成...