3.2.2.1 bootstrap.servers
同生产者bootstrap.servers参数。
3.2.2.2 group.id
该参数指定的是consumer group的名字,它能够唯一标识一个consumer group。通常设置一个有业务意义的名字就可以了。
3.2.2.3 key.deserializer
consumer代码从broker端获取的任何消息都是字节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式。这个参数就是为消息的key做解序列化的。该参数必须是实现org.apache.kafka.common.serialization.Deserializer接口的Java类的全限定名称。Kafka默认为绝大部分的初始类型(primitive type)提供现成的解序列化器。StringDeserializer会将接收到的字节数组转换成UTF-8编码的字符串。consumer支持自定义的deserializer。不论consumer消费的消息是否指定了key,consumer都必须要设置该参数,否则程序会抛出ConfigException。
3.2.2.4 value.deserializer
与value.deserializer类似,该参数用来对消息体(即消息value)进行解序列化,从而把消息“还原”会原来的对象类型。
3.2.2.5 session.timeout.ms
session.timeout.ms是consumer group检测组内成员发送崩溃的时间。这个参数还有另外一重含义:consumer消息处理逻辑的 最大时间,倘若consumer两次poll之间的间隔超过了该参数所设置的阀值,那么coordinator(消息组协调者)就会认为这个consumer已经追不上组内其他成员的消费进度了,因此会将该consumer踢出组,该consumer负责的分区也会被分配给其他consumer。在最好的情况下,这会导致不必要的rebalance,因为consumer需要重新加入group。更糟的是,对于那些在被踢出group后处理的消息,consumer都无法提交位移,这就意味着这些消息在rebalance之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么consumer甚至无法执行任何消费,除非用户重新调整参数。
0.10.1.0版本对该参数的含义进行了拆分。在该版本及以后的版本中,session.timeout.ms参数被明确为“coordinator检测失败的时间”。实际使用中可以为该参数设置一个较小的值使coordinator能够更快第检查consumer崩溃的情况,从而更快地开启rebalance,避免造成更大的消息滞后(consumer lag),目前该参数的默认值是10秒。
3.2.2.6 max.poll.interval.ms
这个参数就是用来设置消息处理逻辑的最大时间的。通过将该参数设置成稍大于实际的逻辑处理时间再结合较低的session.timeout.ms参数值,consumer group既实现了快速的consumer崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的rebalance。
3.2.2.7 auto.offset.reset
指定了无位移信息或位移越界(即consumer要消费的消息的位移不在当前消息日志的合理区间范围)时Kafka的应对策略。
目前该参数有如下3个可能的取值:
earliest: 指定从最早的位移开始消费。注意这里最早的位移不一定就是0。
latest: 指定从最新处位移开始消费。
none:指定如果未发现位移信息或位移越界,则抛出异常。该值在真实业务场景中使用甚少。
3.2.2.8 enable.auto.commit
该参数指定consumer是否自动提交位移。若设置为true,则consumer在后台自动提交位移。否则,用户需要手动提交位移。对于有较强“精确处理一次”语义需求的用户来说,最好将该参数设置为false,由用户自行处理位移提交问题。
3.2.2.9 fetch.max.bytes
它指定了consumer端单次获取数据的最大字节数。若实际业务消息很大,则必须要设置该参数为一个较大的值,否则consumer将无法消费这些消息。
3.2.2.10 max.poll.records
该参数控制单次poll调用返回的最大消息数。比较极端的做法是设置该参数为1,那么每次poll只会返回1条消息。如果用户发现consumer端的瓶颈在poll速度太慢,可以适当地增加该参数的值。如果用户的消息处理逻辑很清理,默认的500条消息通常不能满足实际的消息处理速度。
3.2.2.11 heartbeat.interval.ms
当coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROGRESS异常的形式“塞进”consumer心跳请求的response中,这样其他成员拿到response后才能知道它需要重新加入group。显然这个过程越快越好,而heartbeat.interval.ms就是用来做这件事情的。
比较推荐的做法是设置一个比较低的值,让group下的其他consumer成员能够更快地感知新一轮rebalance开启了。注意,该值必须小于session.timeout.ms!毕竟如果consumer在session.timeout.ms这段时间内都不发送心跳,coordinator就会认为它已经dead,因此也就没有必要让它知晓coordinator的决定了。
3.2.2.12 connections.max.idle.ms
Kafka会定期地关闭空闲Socket连接导致下次consumer处理请求时需要重新创建连向broker的Socket连接。当前默认值是9分钟,如果用户实际环境中不在乎这些Socket资源开销,比较推荐设置该参数值为-1,既不要关闭这些空闲连接。
相关推荐
1. **创建Consumer实例**:首先,我们需要配置一个`Properties`对象,设置必要的参数如bootstrap servers、group id等,然后使用`KafkaConsumer`类的构造函数创建消费者实例。 2. **订阅主题**:消费者通过调用`...
创建`KafkaConsumer`实例时,我们需要配置各种参数,例如bootstrap服务器列表、组ID等。这些可以通过`rd_kafka_conf_t`对象来设置: ```cpp rd_kafka_conf_t *conf = rd_kafka_conf_new(); rd_kafka_conf_set(conf,...
《Go-consumergroup:构建基于Golang的Kafka消费者库》 在现代软件开发中,消息队列系统如Apache Kafka扮演着至关重要的角色,它提供了高效、可靠的异步通信能力。而Go语言以其简洁的语法和高性能特性,成为了编写...
【标题】"kafka_hdfs_consumer"涉及到的关键技术是将数据从Kafka消费并存储到HDFS(Hadoop Distributed File System)中。这个过程通常在大数据处理和流处理场景下非常常见,它允许实时或近实时的数据从消息队列流向...
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} import java.util.Properties val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...
KafkaConsumer, String> consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList("myTopic")); // 订阅主题 while (true) { ConsumerRecords, String> records = consumer.poll...
使用场景:生产环境海量数据,用kafka-console-consumer 消费kafka某时间段消息用于分析问题,生产环境海量数据,用kafka-console-consumer.sh只能消费全量,文件巨大,无法grep。 代码来源于博主:BillowX_ ,感谢...
- KafkaConsumer:是创建消费者实例的静态接口。 - Consumer:是消费者结构,处理消费相关的逻辑。 - KafkaConsumerImpl:是消费者实现的具体封装。 librdkafka还提供了消费者回调机制,如ConsumeCb和EventCb,它们...
开发者可以通过配置KafkaProducer和KafkaConsumer的相关参数来启用这些功能。 在实际开发中,还需要注意一些最佳实践,例如合理设置分区数量以平衡负载,优化消费者组的大小和配置,以及定期保存和提交offset以防止...
在本文中,我们将深入探讨如何在Spring Boot应用中使用KafkaConsumer来消费Apache Kafka主题中的消息。Apache Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用程序。Spring Boot简化了Kafka消费者的...
- 使用`@EnableKafka`注解开启Kafka支持,并通过配置文件(如application.properties或application.yml)设定Kafka服务器地址、端口、topic等参数。 2. **Kafka生产者(Producer)**: - 生产者是向Kafka主题...
4. 运行循环,调用`KafkaConsumer.poll()`获取新消息。 **6. 深入理解Kafka特性** 在"Kafka-java-demo"项目中,你可能会遇到一些高级特性,如: - 分区分配策略:决定消息如何在消费者组内的消费者之间分配。 - ...
这里,我们创建了`KafkaConsumer`类,配置了消费组ID和自动重置偏移量的策略。`StartConsuming`方法启动消费过程,当接收到消息时,会调用传入的`handleMessage`回调函数。 现在,我们可以在主程序中创建生产者和...
运行Spring Boot应用,你可以通过调用`KafkaProducer`的`sendMessage`方法发送消息,而`KafkaConsumer`会自动监听并打印接收到的消息。 在实际应用中,你可能需要根据业务需求进行更复杂的配置,如设置消费者组、...
`KafkaConsumer`是Apache Kafka提供的原生消费者API,用于从Kafka主题中消费数据。而`@KafkaListener`是Spring Kafka提供的一个注解,它可以简化消费者的配置,并允许我们以声明式的方式定义监听器方法,使得消费者...
安装完成后,配置 `kafkatool.properties` 文件,设置 Kafka 集群的地址、端口、安全协议等相关参数。 **3. 主题管理** - **创建主题**: 使用 `create-topic` 命令可以快速创建一个新的 Kafka 主题,支持设置分区...
`KafkaProducer`用于创建和发送消息,而`KafkaConsumer`则负责订阅主题并处理消息。开发者还需要熟悉`Properties`配置文件,其中包含了连接Kafka集群的必要参数,如bootstrap servers、acks设置等。 总结来说,这个...
1. **配置初始化**:首先,我们需要创建一个配置对象,设置Kafka集群的地址、消费者组ID、以及其他相关参数。例如: ```rust use rdkafka::config::RDKafkaConfig; let config = RDKafkaConfig::new() .set(...
Spring Kafka 中关于 Kafka 的配置参数详解 在 Spring Kafka 中,配置参数是非常重要的,它直接影响着 Kafka 的性能和稳定性。本文将详细介绍 Spring Kafka 中关于 Kafka 的配置参数,帮助读者更好地理解和使用 ...
本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控与面试知识点,旨在帮助学习者全面理解并掌握这一强大的分布式流处理平台。 **第 1 章 Kafka 概述** Apache Kafka是一款高吞吐量的...