1 bootstrap.severs
该参数指定一组host:post对,用于创建向Kafka broker服务器的连接,比如k1:9092,k2:9092,k3:9092。如果Kafka集群中机器数很多,那么只需要指定部分broker即可,不需要列出所有的机器。因为不管指定几台机器,producer都会通过该参数找到并发现集群中所有的broker。为该参数指定多台机器只是为了故障转移使用。这样即使某一台broker挂掉了,producer重启后依然可以通过该参数指定的其他broker连入Kafka集群。
另外,如果broker端没有显示配置listeners使用的IP地址,那么最好将该参数也配置成主机名,而不是IP地址。因为Kafka内部使用的就是FQDN(Fully Qualified Domain Name)。
2 key.serializer
被发送到broker端的任何消息的格式都必须是字节数组,因此消息的各个组件必须首先做序列化,然后才能发送到broker。该参数就是为消息的key做序列化用的。这个参数指定的是实现了org.apache.kafka.common.serialization.Serializer接口的全限定名称。Kafka为大部分的初始类型(primitive type)默认提供了现成的序列化器。用户可以自定义序列化器,只要实现Serializer接口即可。
需要注意的是,即使producer程序在发送消息时不指定key,这个参数也是必须要设置的,否则程序会抛出ConfigException异常,提示“key.serializer”参数无默认值,必须要配置。
3 value.serializer
和key.serializer类似,只是它被用来对消息体(即消息的value)部分做序列化,将消息value部分转换成字节数组。
注意,这两个参数都必须是全限定名,只使用单独的类名是不行的。
4 acks
acks参数用于控制producer生产消息的持久性(durability)。对于producer而言,Kafka在乎的是“已提交”消息的持久性。一旦消息被成功提交,那么只要有任何一个保存了该消息的副本“存活”,这条消息就会被视为“不会丢失的”。
具体来说,当producer发送一条消息给Kafka集群时,这条消息会被发送到指定topic分区leader所在的broker上,producer等待从该leader broker返回消息的写入结果(当然并不是无限等待,是有超时时间的)以确定消息被成功提交。这一切完成后producer可以继续发送新的消息。Kafka能够保证的是consumer永远不会读取到尚未提交完成的消息。
acks指定了在给producer发送响应前,leader broker必须要确保已成功写入该消息的副本数。当前acks有3个取值:0、1和all。
acks=0:设置成0表示producer完全不理睬leader broker端的处理结果。此时,producer发送消息后立即开启下一条消息的发送,根本不等待leader broker端返回结果。由于不接收发送结果,因此在这种情况下producer.send的回调也就完全失去了作用,即用户无法通过回调机制感知任何发送过程中的失败,所以acks=0时producer并不保证消息会被发送成功。但凡是有利有弊,由于不需要等待响应结果,通常这种设置下producer的吞吐量是最高的。
acks=all或者-1:表示当发送消息时,leader broker不仅会将消息写入本地日志,同时还会等待ISR中所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给producer。显然当设置acks=all时,只要ISR中至少有一个副本是处于“存活”状态的,那么这条消息就肯定不会丢失,因而可以达到很高的消息持久性,但通常这种设置下producer的吞吐量也是最低的。
acks=1:是0和all折中的方案,也是默认的参数值。producer发送消息后leader broker仅将该消息写入本地日志,然后便发送响应结果给producer,而无须等待ISR中其他副本写入该消息。那么此时只要该leader broker一直存活,Kafka就能保证这条消息不丢失。这实际上是一种折中方案,既可以达到适当的消息持久性,同时也保证了producer端的吞吐量。
acks参数取值说明:
acks |
producer吞吐量 |
消息持久性 |
使用场景 |
0 |
最高 |
最差 |
<!--[if !supportLists]-->1. <!--[endif]-->完全不关心消息是否发送成功 <!--[if !supportLists]-->2. <!--[endif]-->允许消息丢失(比如统计服务器日志等) |
1 |
适中 |
适中 |
一般场景即可 |
all或-1 |
最差 |
最高 |
不能容忍消息丢失 |
在程序中设置方式如下:
props.put("acks", "-1"); // 或者 props.put(ProducerConfig.ACKS_CONFIG, "-1"); |
5 buffer.memory
该参数指定了producer端用于缓存消息的缓冲区大小,单位是字节,默认值是33554432,即32MB。由于采用了异步发送消息的设计架构,Java版本producer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由另一个专属线程负责从缓冲区中读取消息执行真正的发送。这部分内存空间的大小即是由buffer.memory参数指定的。若producer向缓冲区写消息的速度超过了专属I/O线程发送消息的速度,那么必然造成该缓冲区空间的不断增大。此时producer会停止手头的工作等待I/O线程追上来,若一段时间之后I/O线程还是无法追上producer的进度,那么producer就会抛出异常并期望用户介入进行处理。
虽说producer在工作过程中会用到很多部分的内存,但我们几乎可以认为该参数指定的内存大小就是producer程序使用的内存大小。若producer程序要给很多分区发送消息,那么就需要仔细地设置这个参数以防止过小的内存缓冲区降低了producer程序整体的吞吐量。
在程序中设置方式如下:
props.put("buffer.memory", 33554432); // 或者 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); |
6 compression.type
compression.type参数设置producer端是否压缩消息,默认值是none,即不压缩消息。Kafka的producer端引入压缩后可以显著地降低网络I/O传输开销从而提升整体的吞吐量,但也会增加producer端机器的CPU开销。另外,如果broker端的压缩参数设置的与producer不同,broker端在写入消息时也会额外使用CPU资源对消息进行对应的解压缩-重压缩操作。
目前Kafka支持3中压缩算法:GZIP、Snappy和LZ4。根据实际使用经验(1.0.0版本)来看producer结合LZ4的性能是最好的。
在程序中设置方式如下:
props.put("compression.type","lz4"); // 或者 props.put(ProducerConfig.COMPRESSION_TYPE _CONFIG, "lz4"); |
7 retries
该参数表示进行重试的次数,默认值为0,表示不进行重试。在实际使用过程中,设置重试可以很好地应对哪些瞬时错误,因此推荐用户设置该参数为一个大于0的值。只不过在考虑retries的设置时,有两点需要着重注意。
- 重试可能造成消息的重复发送
比如由于瞬时的网络抖动使得broker端已成功写入消息但没有成功发送响应给producer,因此producer会认为消息发送失败,从而开启重试机制。为了应对这一风险,Kafka要求用户在consumer端必须执行去重处理。另0.11.0.0版本开始支持“精确一次”处理语义。
- 重试可能造成消息乱序
当前producer会将多个消息发送请求(默认是5个)缓存再内存中,如果由于某种原因发生了消息发送重试,就可能造成消息流的乱序。为了避免乱序发生,Java版本producer提供了max.in.flight.request.per.connection参数。一旦用户将此参数设置成1,producer将确保某一时刻只能发送一个请求。
另外,producer两次重试之间会停顿一段时间,以防止频繁地重试对系统带来冲击。这段时间是可以配置的,由参数retry.backoff.ms指定,默认是100毫秒。由于leader“换届选举”是最常见的瞬时错误,笔者推荐用户通过测试来计算平均leader选举时间并根据该时间来设定retries和retry.backoff.ms的值
在程序中设置方式如下:
props.put("retries",100); // 或者 props.put(ProducerConfig.RETRIES _CONFIG, 100); |
8 batch.size
batch.size是producer最重要的参数之一,它对于调优producer吞吐量和延时性能指标都有着非常重要的作用。producer会将发往同一分区的多条消息封装进一个batch中。当batch满了的时候,producer会发送batch中的所有消息。不过,producer并不总是等待batch满了才发送消息,很有可能当batch还有很多空闲空间时producer就发送该batch。
通常来说,一个小的batch中包含的消息数很少,因而一次发送请求能够写入的消息数也很少,所以producer的吞吐量会很低;但若一个batch非常之巨大,那么会给内存使用带来极大的压力,因为不管是否能够填满,producer都会为该batch分配固定大小的内存。因此batch.size参数的设置其实是一种时间与空间权衡的体现。
batch.size参数默认值是16384,即16KB。这其实是一个非常保守的数字。在实际使用过程中合理地增加该参数值,通常都会发现producer的吞吐量得到了相应的增加。
在程序中设置方式如下:
props.put("batch.size",16384); // 或者 props.put(ProducerConfig.BATCH_SIZE _CONFIG, 16384); |
9 inger.ms
linger.ms参数就是控制消息发送延时行为的。该参数默认值是0,表示消息需要被立即发送,无须关心batch是否被填满,大多数情况下这是合理的,毕竟我们总是希望消息被尽可能快地发送。不过这样做会拉低producer吞吐量,毕竟producer发送的每次请求中包含的消息数越多,producer就越能将发送请求的开销摊薄到更多的消息上,从而提升吞吐量。
在程序中设置方式如下:
props.put("linger.ms",100); // 或者 props.put(ProducerConfig.LINGER_MS _CONFIG, 100); |
10 max.request.size
该参数用于控制producer发送请求的大小。实际上该参数控制的是producer端能够发送的最大消息的大小。由于请求有一些头部数据结构,因此包含一条消息的请求大小要比消息本身大。不过姑且把它当做请求的最大尺寸是安全的。如果producer要发送尺寸很大的消息,那么这个参数就是要被设置的。默认的1048576字节大小了,通常无法满足企业级消息的大小要求。
在程序中设置方式如下:
props.put("max.request.size",10485760); // 或者 props.put(ProducerConfig.MAX_REQUEST_SIZE _CONFIG, 10485760); |
11 request.timeout.ms
当producer发送请求给broker后,broker需要在规定的时间范围内将处理结果返还给producer。这段时间便由该参数控制的,默认是30秒。这就是说,如果broker在30秒内都没有给producer发送响应,那么producer就会认为该请求超时了,并在回调函数中显示地抛出TimeoutException异常交由用户处理。
默认的30秒对于一般的情况而言是足够的,但如果producer发送的负载很大,超时的情况就很容易碰到,此时就应该适当调整该参数值。
在程序中设置方式如下:
props.put("request.timeout.ms",60000); // 或者 props.put(ProducerConfig.MAX_REQUEST_TIMEOUT_MS_CONFIG, 60000); |
相关推荐
在Kafka中,Producer(生产者)负责将数据流发送到Kafka集群中。生产者的配置参数非常关键,因为它们会直接影响到生产者的行为和性能。以下是根据提供的文件内容整理的Kafka生产者配置参数的知识点: 1. bootstrap....
`config1.json`可能是用于配置连接器到Kafka集群的参数,如bootstrap servers、主题名、序列化方式等。确保正确配置这些参数是连接成功的关键。 在实际测试过程中,你可能会使用`src`目录下的源代码来定制特定的...
总结来说,`kafka_producer.zip`提供的资源展示了如何使用Python和`kafka-python`库创建一个简单的Kafka生产者,该生产者能够从配置文件读取参数,向指定的Kafka主题发送数据。理解这个过程对于任何需要在Python环境...
Kafka 配置调优实践是指通过调整 Kafka 集群的参数配置来提高其吞吐性能。下面是 Kafka 配置调优实践的知识点总结: 一、存储优化 * 数据目录优先存储到 XFS 文件系统或者 EXT4,避免使用 EXT3。 * 在挂载块设备时...
在项目“kafka_producer.sln”中,我们可以看到一个Visual Studio解决方案,包含了Kafka生产者的实现。通常,这个解决方案会包含生产者类的定义,以及与protobuf和avro相关的代码。例如,生产者类可能包含一个方法,...
- 使用`@EnableKafka`注解开启Kafka支持,并通过配置文件(如application.properties或application.yml)设定Kafka服务器地址、端口、topic等参数。 2. **Kafka生产者(Producer)**: - 生产者是向Kafka主题...
在分布式消息系统中,Kafka是一个广泛使用的高吞吐量、低延迟的开源消息队列。为了优化性能和提高效率,开发人员常常会利用连接池技术来管理Kafka生产者的连接。本文将深入探讨"Kafka生产者连接池"的概念、实现原理...
title: Kafka6# Producer重试参数retries设置取舍retries参数说明参数的设置通常是一种取舍,看下retries参数在版本0.11
配置参数对于优化 Kafka 生产者和消费者的性能至关重要。以下是一些关键的 Kafka 生产者配置参数的详细说明: 1. **acks**: 这个参数决定了生产者等待多少个副本确认消息接收。`acks=0` 表示不等待任何确认,`acks=...
`simple-kafka-producer-pool` 是一个基于 Java 开发的库,用于管理 Kafka 生产者的集合,以提高效率和性能。这个库的设计目标是为应用程序提供一个简单的方法来创建和管理多个 Kafka 生产者的实例,形成一个池,...
Kafka 主要用于构建实时数据管道和流应用,其特点包括高吞吐量、容错性、持久化以及支持大规模的数据处理。 首先,让我们了解 Kafka 的核心概念: 1. **生产者(Producer)**: 生产者是向 Kafka 集群发布消息的应用...
kafka的3554版本中的kafka-producer-perf-test.sh增添了 --num-thread --value-bound 两个参数,并且可以打印print-metrics 下载本资源后,读introduction.txt文件。 没有积分的同学,可以访问 ...
使用方式kafka-producer使用者,需要通过KafkaProducerBuilder去构建一个KafkaProducer实力,使用方式和Guava的Cache非常相似,指定好参数进行构建(关于参数,请参考"KafkaProducerBuilder参数说明")。为了保证...
本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache Kafka是一款高吞吐量的...
8. **Kafka配置参数**: 有许多配置参数可以调整以适应不同的应用场景,如`batch.size`、`linger.ms`、`fetch.min.bytes`等,理解这些参数的含义并根据需求调整至关重要。 9. **测试和监控**: 开发过程中,使用...
Kafka Producer 配置中的一些重要参数包括: - **metadata.broker.list**: 用于指定 Broker 和分区的静态信息。可以只提供部分 Broker 的信息,Kafka 会自动发现其他 Broker。 - **topic.metadata.refresh.interval...
然后,根据`application.conf`或`Config.scala`中的配置,调整连接参数以指向你的阿里云Kafka实例。最后,通过命令行执行主程序,启动Producer和Consumer,观察其运行效果。 通过这个Demo,开发者不仅可以学习到...
2. **性能优化**:根据实际需求调整`KafkaSpout`的批处理大小、重试间隔和消费者组大小等参数,以优化性能。 3. **数据一致性**:理解并正确处理Kafka的分区和offset管理,确保数据处理的准确性和顺序性。 4. **监控...
在Kafka中,你可以使用命令行工具`kafka-topics.sh`来创建Topic,指定其分区数量、副本数量等参数。例如,`--create --topic my-topic --partitions 3 --replication-factor 2`将创建一个名为“my-topic”的Topic,...