`
gaojingsong
  • 浏览: 1201230 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【Kafka配置参数--Consumer详解】

阅读更多

#############################Consumer #############################

# Consumer端核心的配置是group.id、zookeeper.connect

# 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group.

group.id

# 消费者的ID,若是没有设置的话,会自增

consumer.id

# 一个用于跟踪调查的ID ,最好同group.id相同

client.id = <group_id>

 

# 对于zookeeper集群的指定,必须和broker使用同样的zk配置

zookeeper.connect=debugo01:2182,debugo02:2182,debugo03:2182

# zookeeper的心跳超时时间,查过这个时间就认为是无效的消费者

zookeeper.session.timeout.ms = 6000

# zookeeper的等待连接时间

zookeeper.connection.timeout.ms = 6000

# zookeeper的follower同leader的同步时间

zookeeper.sync.time.ms = 2000

# 当zookeeper中没有初始的offset时,或者超出offset上限时的处理方式 。

# smallest :重置为最小值 

# largest:重置为最大值 

# anything else:抛出异常给consumer

auto.offset.reset = largest

 

# socket的超时时间,实际的超时时间为max.fetch.wait + socket.timeout.ms.

socket.timeout.ms= 30 * 1000

# socket的接收缓存空间大小

socket.receive.buffer.bytes=64 * 1024

#从每个分区fetch的消息大小限制

fetch.message.max.bytes = 1024 * 1024

 

# true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset

auto.commit.enable = true

# 自动提交的时间间隔

auto.commit.interval.ms = 60 * 1000

 

# 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetch.message.max.bytes中数值

queued.max.message.chunks = 10

 

# 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数

rebalance.max.retries = 4

# 每次reblance的时间间隔

rebalance.backoff.ms = 2000

# 每次重新选举leader的时间

refresh.leader.backoff.ms

 

# server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。

fetch.min.bytes = 1

# 若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间

fetch.wait.max.ms = 100

# 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限

consumer.timeout.ms = -1

 

 

 

------------------------------------Kafka配置参数--Consumer详解-----------------------------

 

 

group.id 默认值:无

唯一的指明了consumer的group的名字,group名一样的进程属于同一个consumer group。

 

zookeeper.connect 默认值:无

指定了ZooKeeper的connect string,以hostname:port的形式,hostname和port就是ZooKeeper集群各个节点的hostname和port。 ZooKeeper集群中的某个节点可能会挂掉,所以可以指定多个节点的connect string。如下所式:

hostname1:port1,hostname2:port2,hostname3:port3.

 

ZooKeeper也可以允许你指定一个"chroot"的路径,可以让Kafka集群将需要存储在ZooKeeper的数据存储到指定的路径下这可以让多个Kafka集群或其他应用程序公用同一个ZooKeeper集群。可以使用如下的connect string:

hostname1:port1,hostname2:port2,hostname3:port3/chroot/path

 

consumer.id 默认值:null

如果没有设置的话则自动生成。

 

socket.timeout.ms 默认值:30 * 1000

socket请求的超时时间。实际的超时时间为max.fetch.wait + socket.timeout.ms。

 

socket.receive.buffer.bytes 默认值:64 * 1024

socket的receiver buffer的字节大小。

 

fetch.message.max.bytes 默认值:1024 * 1024

每一个获取某个topic的某个partition的请求,得到最大的字节数,每一个partition的要被读取的数据会加载入内存,所以这可以帮助控制consumer使用的内存。这个值的设置不能小于在server端设置的最大消息的字节数,否则producer可能会发送大于consumer可以获取的字节数限制的消息。

 

auto.commit.enable 默认值:true

如果设为true,consumer会定时向ZooKeeper发送已经获取到的消息的offset。当consumer进程挂掉时,已经提交的offset可以继续使用,让新的consumer继续工作。

 

auto.commit.interval.ms 默认值:60 * 1000

consumer向ZooKeeper发送offset的时间间隔。

 

queued.max.message.chunks 默认值:10

缓存用来消费的消息的chunk的最大数量,每一个chunk最大可以达到fetch.message.max.bytes。

 

rebalance.max.retries 默认值:4

当一个新的consumer加入一个consumer group时,会有一个rebalance的操作,导致每一个consumer和partition的关系重新分配。如果这个重分配失败的话,会进行重试,此配置就代表最大的重试次数。

 

fetch.min.bytes 默认值:1

一个fetch请求最少要返回多少字节的数据,如果数据量比这个配置少,则会等待,知道有足够的数据为止。

 

fetch.wait.max.ms 默认值:100

在server回应fetch请求前,如果消息不足,就是说小于fetch.min.bytes时,server最多阻塞的时间。如果超时,消息将立即发送给consumer.。

 

rebalance.backoff.ms 默认值:2000

在rebalance重试时的backoff时间。

 

refresh.leader.backoff.ms 默认值:200

在consumer发现失去某个partition的leader后,在leader选出来前的等待的backoff时间。

 

auto.offset.reset 默认值:largest

在Consumer在ZooKeeper中发现没有初始的offset时或者发现offset不在范围呢,该怎么做:

* smallest : 自动把offset设为最小的offset。

* largest : 自动把offset设为最大的offset。

* anything else: 抛出异常。

 

consumer.timeout.ms 默认值:-1

如果在指定的时间间隔后,没有发现可用的消息可消费,则抛出一个timeout异常。

 

client.id 默认值: group id value

每一个请求中用户自定义的client id,可帮助追踪调用情况。

 

zookeeper.session.timeout.ms 默认值:6000

ZooKeeper的session的超时时间,如果在这段时间内没有收到ZK的心跳,则会被认为该Kafka server挂掉了。如果把这个值设置得过低可能被误认为挂掉,如果设置得过高,如果真的挂了,则需要很长时间才能被server得知。

 

zookeeper.connection.timeout.ms 默认值:6000

client连接到ZK server的超时时间。

 

zookeeper.sync.time.ms 默认值:2000

一个ZK follower能落后leader多久。

0
2
分享到:
评论

相关推荐

    kafka_2.11-2.2.2.tgz

    《Kafka在Linux环境下的安装与使用详解》 Apache Kafka是一款高性能、分布式的消息中间件,广泛应用于大数据实时处理、日志收集、流式数据处理等领域。本文将详细讲解如何在Linux环境下安装并使用Kafka 2.2.2版本。...

    kafka_2.12-1.0.0.zip

    3. 启动Kafka服务器,设置相关配置参数。 4. 创建主题,如`bin/kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --if-not-exists`. 5. 使用producer和consumer工具进行消息生产和消费...

    最新版linux kafka_2.12-2.6.1.tgz

    **Linux上的Kafka 2.12-2.6.1:高级消息队列系统详解** Kafka是一款广泛应用于大数据处理、实时流处理和微服务架构中的分布式消息中间件。这款由LinkedIn开发并贡献给Apache基金会的开源项目,以其高效、可扩展性和...

    kafka_2.11-2.2.0.tgz

    在实际使用中,用户需要配置Kafka的broker、producer和consumer参数,例如设置broker的端口、数据存储路径、消费者的offset管理策略等。对于Kafka_2.11-2.2.0版本,用户可以参考官方文档,了解详细的配置选项和最佳...

    1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试

    Kafka 的部署涉及配置服务器参数,启动服务,以及设置安全性和网络通信。对于初学者,理解 Kafka 的基本概念,如主题(Topics)、分区(Partitions)和副本(Replicas),以及如何通过 Shell 命令进行管理和监控,是...

    kafka_2.13-3.2.1.zip

    4. **配置Kafka**:修改`config/server.properties`配置文件,设置broker的ID、端口、日志路径等参数。 5. **启动Kafka**:使用`bin/kafka-server-start.sh config/server.properties`命令启动Kafka服务器。 6. **...

    kafka_2.12-1.1.0.tgz

    《Kafka 2.12-1.1.0在Windows上的安装与使用详解》 Apache Kafka是一款分布式流处理平台,被广泛应用于大数据实时处理、日志聚合、消息队列等多个场景。本文将详细介绍如何在Windows系统上安装并使用Kafka 2.12-...

    kafka_2.11-0.11.0.3.zip

    《Kafka 0.11.0.3在CentOS 7.0系统中的安装与使用详解》 Apache Kafka是一款高性能、分布式的消息中间件,它主要用于处理实时数据流。Kafka_2.11-0.11.0.3是针对Java 2.11版本的一个发行版,适用于Linux环境,特别...

    kafka-manager-2.0.0.2.zip 已经编译

    5. 配置管理:动态修改Kafka的配置参数。 二、解压与配置 首先,你需要将下载的kafka-manager-2.0.0.2.zip解压到一个合适的目录,例如 `/opt/apps/kafka-manager`。解压后,你会看到一个名为`kafka-manager-2.0....

    kafka-manager-1.3.3.16编译好的zip包

    4. **配置管理**:允许修改Kafka的配置参数,方便集群的动态调整。 5. **安全控制**:支持Kerberos认证,确保数据传输的安全性。 在编译完成后,压缩包中的文件包括了Kafka-Manager的所有源代码、配置文件、静态...

    Python库 | kafka-python-1.3.4.tar.gz

    2. 配置参数的设置对于性能和稳定性至关重要,如设置适当的批处理大小、超时时间等。 3. 考虑到版本兼容性,使用kafka-python时需确保其与运行的Kafka集群版本相匹配。 总结,kafka-python-1.3.4作为Python与Kafka...

    编译后kafka-manager-2.0.0.0.rar

    在实际使用时,解压缩"编译后kafka-manager-2.0.0.0"文件后,你需要配置`conf/application.conf`文件,其中主要涉及Zookeeper的连接信息、Kafka-Manager自身的运行参数等。部署完成后,启动服务,即可通过浏览器访问...

    kafka_2.10-0.10.1.0

    《Kafka 2.10-0.10.1.0在Windows环境下的应用与配置详解》 Kafka是一款高性能、分布式的消息中间件,它主要用于处理实时数据流。在这个版本,即Kafka 2.10-0.10.1.0,主要面向的是Java 2.10 SDK,提供了稳定且高效...

    Kafka Manager cmak-3.0.0.4 最新版本

    - 集群配置:允许用户查看和修改Kafka集群的配置参数,以适应不同的业务需求。 - Topic管理:创建、删除、修改Topic,以及查看Topic的详细信息,如Replication Factor、Partition数等。 - Group管理:监控...

    kafka_2.10-0.10.2.0.tgz

    - **配置参数**:包括broker设置、日志管理、网络连接等,可以根据实际需求进行调整。 5. **使用场景**: - **实时数据管道**:作为数据流转的桥梁,将数据从源头传输到处理系统。 - **流处理**:通过Kafka ...

    kafka_2.11-1.0.0.tgz

    3. **配置Kafka**:修改`config/server.properties`文件,配置broker id、端口、日志存储路径等关键参数。 4. **启动Zookeeper**:Kafka依赖Zookeeper,所以首先需要启动Zookeeper服务。 5. **启动Kafka**:运行`bin...

    kafka_2.11-0.10.1.0及使用说明

    **Kafka 2.11-0.10.1.0详解及使用指南** Apache Kafka 是一个开源的分布式流处理平台,由LinkedIn开发并捐赠给Apache软件基金会。它最初设计为一个高吞吐量、低延迟的消息队列系统,但随着时间的发展,Kafka已经...

    kafka-manager-2.0.0.2.7z

    6. **配置调整**:允许在线调整Kafka的配置参数,无需停机即可完成更新。 三、安装与使用 “kafka-manager-2.0.0.2”这个版本提供了稳定性和性能的提升。安装步骤通常包括以下几步: 1. **下载与解压**:首先,从...

    kafka配置安装详解

    ### Kafka配置安装详解 #### 一、环境搭建与配置 Kafka是一款开源的消息队列中间件,被广泛应用于大数据处理领域。本篇文章将详细介绍如何在本地环境中安装并配置Kafka,以及进行基本的操作演示。 ##### 环境要求...

    Python库 | confluent_kafka-1.5.0-cp36-cp36m-win_amd64.whl

    通过`ConfluentKafka.Producer`类初始化对象,配置相关参数,如Bootstrap Servers(Kafka集群地址)和错误处理回调函数。然后,可以使用`produce()`方法将消息发送到指定的主题。例如: ```python from confluent_...

Global site tag (gtag.js) - Google Analytics