`
QING____
  • 浏览: 2250601 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Kafka Broker配置(1.1.0)

 
阅读更多

    kafka目前已升级到1.1.0版本,Broker的在线upgrade比较麻烦,所以我建议大家新项目直接基于最新版本(1.1.0),旧版本不同程度存在一些bug,比较难以解决。

 

    1、broker配置样例(未声明的配置项,建议保持默认):server.properties

 

############################# Server Basics #############################

# brokerId,同一集群id必须唯一,Integer类型
broker.id=0
auto.create.topics.enable=true
#允许客户端直接删除,admin工具可以。
delete.topic.enable=true
auto.leader.rebalance.enable=true
# 单条消息最大尺寸,10M
message.max.bytes=10000120
replica.fetch.max.bytes=10485760
# ISR,当Producer Ack模式为“all”时必须等待ISR列表中replicas全部写入成功(否则抛出异常,无法继续write);
# 此值适用于repicas 可靠性担保。
# 如果replicas为3,此值可以设置为2 + ACK="all"可以极高的确保消息可靠性。
min.insync.replicas=2

# 默认replication个数,broker=3,replication=2
default.replication.factor=2
############################# Socket Server Settings #############################
listeners=PLAINTEXT://10.0.34.121:9092

# 接收客户端请求的IO线程数,建议保持默认
num.network.threads=3

# 处理请求包括磁盘IO的线程数,建议保持默认,调优方式受制于磁盘并发能力、CPU个数等。
num.io.threads=8

# SO_SNDBUF
socket.send.buffer.bytes=102400
# SO_RCVBUF
socket.receive.buffer.bytes=102400

# 单次请求,Server允许接收的最大数据量,避免OOM。默认值为100M(104857600)
socket.request.max.bytes=104857600

# 亟待处理的请求个数,超过阈值将会阻塞网络IO线程(类似于backlog)
queued.max.requests=500

# 客户端等待broker响应最大等待时间(包括重试),Client可配置。默认30000秒。
request.timeout.ms=15000

# 单个IP允许建立的连接数,默认为Integer最大值,此处修正为256
max.connections.per.ip=256

############################# Log Basics #############################
# 底层日志文件保存的文件路径,多个磁盘路径则以“,”分割,多磁盘可以提高IO并发能力。
log.dirs=/data/kafka

# Topic默认的partition个数,此值越大理论上可以提高并发消费能力,不同的partitions将会分布在多个broker上。
# 可以通过AdminClient修改指定Topic的partition数量。
# partition个数,通常不建议大于Broker节点的个数。
num.partitions=1

# 在logs回复或者关闭刷盘时,每个dir所使用的IO线程个数。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# 目前已知kafka内部状态数据将保存在特定的topic中,
# 比如offsets(由以前的kafka迁移到内部topic中)保存在“__consumer_offsets,
# 每个producer的事务,保存在“__transaction_state”。
# 对于内部topic,其replicas个数,建议为“大多数派”,且最多不要超过3。
# 比如:
# broker<3,replicas=1;
# brokers = 3,replicas=2;
# broker>=5,replicas=3;
# 线上配置,broker=3;事务功能暂时关闭
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# 日志刷盘策略,我们有两个重要参数可以可以权衡,分别适用在“吞吐量优先”、“数据可靠性优先”。
# 需要明确表达,如果你对数据可靠性有一定的要求,那么尚需要开启replication来保障,仅仅依赖刷盘尚且无法完全意义上确保数据的安全性。
# 如果你对系统确实是“吞吐量优先”,则可以关闭replicaiton(=1)、同时启用“按消息量”刷怕策略。

# 如下两个参数满足其一,即可Flush
# 吞吐量优先,默认为关闭(Long最大值)
log.flush.interval.messages=10000

# 可靠性 + 延迟优先,每个1秒刷盘,通用高优策略。
log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# 日志保留策略,存储日志可以根据“保留时间长”、“存量数据大小”两个维度,对于超出阈值的日志将会被清理,以确保存储层面更加可控。
log.cleanup.policy=delete
log.cleaner.enable=true
# 满足retention条件之外的“文件”,保留多久则会被cleanup。默认值为12小时。
log.cleaner.delete.retention.ms=43200000

# 日志保留 7 天:保留时长,取决于每天数据量、消费者回溯需求等。
log.retention.hours=168
## 350G
log.retention.bytes=375809638400

# 底层为LSM树,每个逻辑topic、partition都会有多个segments文件序列构成,每个segments大小,默认为1G。
# 此值建议保持默认,过小、过大都有一定的影响。
log.segment.bytes=1073741824

# 检测周期,5分钟
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
# 可以包含chroot,比如:host:ip/kafka
zookeeper.connect=10.0.11.107:2181,10.0.12.107:2181,10.0.11.108:2181/kafka
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################
# 分组协调 (broker端)
# rebalance通常对于“单组 + 多个消费者”场景,此值适用于“消费者入组后,首次relance之前等待的时长”
# (消费者启动可能逐个进行,等待足够多的consumers入组后进行rebalance)
group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=300000
group.min.session.timeout.ms=6000

 

    1)broker.id:如果你手动声明,则整个集群不能重复,数字类型,不能为负数。

    2)min.insync.replicas:即ISR的个数,这个是数据可靠性的保障策略之一,配合Producer端“acks”参数。

        如果你的数据可靠性要求低于producer性能要求,可以设置为1。

        如果可靠性要求较高,建议设置为>=2的值,此值应该应该大于broker节点个数的“一半”;通常我们权衡的策略为,如果broker个数N == 3,此值设置为2;N >= 5,设置为3即可。过大的值,将极其影响producer效率,增加producer响应延迟,而且并不能获得更高的可靠性收益。

    3)日志刷盘策略:kafka的数据保存在本地一序列segement文件中(包括用于提高查询效率的index文件),消息提交给broker之后,最终写入磁盘才能确保数据不丢失;kafka提供了“基于时间间隔”、“基于累计消息条数”两种同步刷盘策略。

     基于时间间隔:每隔X毫秒,执行一次文件的fsync操作。基于累计消息条数:每X条消息后,同步执行一次fsync操作。

     如果你关注“吞吐量优先”,比如一些普通的日志采集、监控消息等业务类型,你完全关闭“基于时间间隔”策略(即注释点log.flush.interval.ms),此时将采用文件系统默认刷盘机制;此外你还需要适度调整IRS个数(包括replication的个数)。

     如果你关注“数据可靠性优先”,除了合适的IRS个数、以及producer端合理的“acks”时机,我们还应该权衡这两种刷盘策略的合理阈值,建议使用“每秒刷盘”,同时评估一个broker正常的TPS,并将其作为“刷盘条数”的阈值(此值不能太小,比如TPS为20000,你设置“log.flush.interval.messages=1000”,这以为这broker仍然在不停的刷盘,性能极大降级并最终影响SLA)。

 

    4)num.partitions:每个Topic的partition个数,默认值为1,尽管admin工具可以修改此值,但是仍然建议大家在部署时就设置合理。

   如果你的集群是存储一些时序性有严格要求的消息,建议设置为1,或者设置为其他值、但是需要producer端使用合理的Partitioner确保消息一致性,以及consumer消费时不要混合消费多个partitions。

  如果你的集群对发送、消费的效率要求很高,那么partition的个数,可以设置的较大,具体partitions的个数应该设置为多少,尚没有"完美"的参考值,通常需要考虑broker集群的个数、每个broker磁盘并发的能力、broker上Topic的个数(以及它们各自partitions的个数总和)等;一个比较有粗略的参考值为,建议初步设置为broker节点的个数。

 

    5)日志保留策略:通常我们需要开启,设定kafka保存消息(数据文件)的时长,也限定consumer允许数据回溯的最大时间。查出时间阈值的日志文件,可以“压缩备份”,也可以删除。

    6)分组协调:主要适用于消费者,通常production环境,消费者时分组的,每组多个消费者使用相同的groupId;虽然消费者处理消息的效率基本差异不大,但是仍然存在消费者失效(或者block)等问题,此时Broker则可以决策哪些是“慢消费者”、“失效消费者”,以决定适时平衡(由其他在线的消费者接管消费服务)。

 

 

    2、Broker JVM参数修改(kafka-server-start.sh)

base_dir=$(dirname $0)
## 在base_dir参数之后增加
export KAFKA_HEAP_OPTS="-Xms12G -Xmx12G -XX:NewRatio=2 -XX:SurvivorRatio=8 -XX:MaxMetaspaceSize=512M -XX:CompressedClassSpaceSize=512M"
## 开启JMX监控,默认端口为5760
export JMX_PORT=${JMX_PORT:-5760}
## 如果你希望基于jolokia + HTTP方式输出监控数据,以被采集器获取,可以增加jolokia配置
## export KAFKA_OPTS="$KAFKA_OPTS -javaagent:/opt/jolokia/jolokia-jvm-1.5-agent.jar"

    只需要增加有关内存的相关参数即可。

 

    3、kafka-server-stop.sh

#将原来PIDS的获取方式,修改为“kafka\.kafka”,否则无法获取实际进程的ID

PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

 

    主要是解决,此脚本无法正确找到kafka进程的问题。

 

    4、kafka broker与client兼容关系描述

    参考:https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix

    简述:0.9以上版本的client和broker,低版本client可以访问高版本的broker,但是高版本的client访问低版本的broker时可能存在兼容性问题。

 

    5、kafka consumer group的几种状态

    1)Empty:此group中没有任何消费者在线(曾经提交过offset),可能是新建的group。当一个“Empty”的group过期之后,其状态会迁移为“Dead”。

    2)PrepareingRebalance:准备进行rebalance,通常是此group中有消费者加入或者离开时(探测周期)触发,这意味着topic/partition在组内多个消费者之间重新分配。

    3)AwaitingSync:等待Group Leader重新分配topic/partitions。

    4)Stable:正常状态。

    5)Dead:Group内已经没有任何消费者(members),且其offset记录等meta信息即将被删除。

 

 

    

  • 大小: 347 KB
分享到:
评论

相关推荐

    kafka_2.12-1.1.0.tgz

    kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic ``` 在打开的控制台中,输入你想发送的消息,每按一次回车键,一条消息就会被发送到"my-topic"。 八、消费消息 同时,我们可以创建一...

    Kafka_2.11-1.1.0

    - **安装与配置**:讲解如何在不同操作系统上安装Kafka_2.11-1.1.0,并进行基本的配置。 - **监控与调优**:介绍监控Kafka性能的指标和工具,以及如何根据实际情况进行调优。 - **安全设置**:讨论如何启用SSL/...

    kafka-2.11-1.1.0-aarch64,ARM版本

    安装kafka_2.11-1.1.0-aarch64通常涉及解压文件,配置配置文件如server.properties,设置broker ID、端口、日志目录等参数,然后启动Kafka服务。需要注意的是,由于ARM架构的特殊性,可能需要调整某些默认配置以...

    kafka配置安装详解

    进入`/opt/software/kafka_2.12-1.1.0/config`目录,找到`server.properties`文件并进行必要的配置修改。 - **listeners**:配置Kafka监听器地址和端口。 - 示例配置:`listeners=PLAINTEXT://192.168.17.11:9092`...

    kafka java 下载的jar

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题(Topic),消费者则订阅并消费这些消息,而代理是Kafka集群中的节点,它们接收、存储和转发消息。 在Java...

    kafka_2.12-1.0.0.zip

    1. **broker配置**:如`broker.id`,每个Kafka节点的唯一标识;`num.partitions`,默认分区数;`log.retention.hours`,消息保留时间。 2. **producer配置**:如`acks`,确认模式,决定何时认为消息已成功发送;`...

    kafka1.0.0文档

    **Broker 配置** 包括了一系列用于管理单个 Kafka 代理(broker)的设置,如监听端口、日志存储位置等。 ##### 3.2 Topic 配置 **Topic 配置** 用于定义特定主题的属性,如分区数量、副本因子等。 ##### 3.3 ...

    kafka_2.12-1.1.0.tgz源码包下载.txt

    Apache Kafka是开源的分布式流处理平台,也是高吞吐量的分布式跨平台订阅消息系统,主要包含Broker服务器、Topic消息类别、Partition物理分区、Producer生产者、Consumer消费者、Consumer Group消费组部分。

    kafka_2.11-1.0.0.tgz.zip

    其中,配置文件是操作Kafka的关键,如`server.properties`定义了broker的配置,`producer.properties`和`consumer.properties`分别用于设置生产者和消费者的参数。理解这些配置对于调整Kafka的性能至关重要。 安装...

    kafka_2.11-1.1.1安装包

    3. **配置文件**:修改`config/server.properties`配置文件,设置broker的ID、端口、日志存储路径等参数。 4. **启动Zookeeper**:Kafka依赖Zookeeper进行集群管理和元数据存储,需要先启动Zookeeper服务。 5. **...

    hive+kafka安装包

    2. 对于Kafka,你需要解压`kafka_2.12-1.1.0.tgz`,配置`server.properties`文件,指定broker的配置,如端口、日志目录等。Kafka也可能需要配置Zookeeper连接信息,因为它是用作Kafka的协调服务。 3. 在配置好Hive...

    kafka 安装 部署 实用

    - `bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test` **8. 启动消费者** - **启动命令**: - `bin/windows/kafka-console-consumer.bat --zookeeper localhost:2181 --topic ...

    kafka安装包

    4. **配置环境变量**: 在系统的环境变量配置文件(如Linux的`.bashrc`或Windows的`System Environment Variables`)中添加Kafka的路径,例如: ``` export KAFKA_HOME=/path/to/kafka_2.11-1.1.0 export PATH=$...

    kafka在centos7上搭建及springboot集成kafka的小demo用例

    - 下载Kafka,例如`kafka_2.11-1.1.0.tgz`。 - 解压并重命名,配置Kafka环境变量。 - 创建日志目录,并编辑`server.properties`文件,设置`broker.id`、`port`、`log.dirs`和`zookeeper.connect`等关键参数。 - ...

    kafka-manager-1.3.3.20.zip

    Kafka Manager 1.3.3.20是专为Kafka 0.8.1.1至1.1.0版本设计的管理工具,这意味着它可以很好地兼容这些版本的Kafka集群。这个版本的Kafka Manager提供了一系列实用的功能,包括: 1. **集群管理**:用户可以通过...

    kafka-connector的使用.zip_kafka

    - **Kafka Integration**: Spark 应用程序需要配置 Kafka 的 broker 列表、主题等信息,然后创建 DStream(Discretized Stream)来读取或写入 Kafka 数据。 - **数据处理**: 一旦数据被导入 Spark,就可以利用 Spark...

    linux contos6.8下部署kafka集群的方法

    - 打开每台服务器上 `$KAFKA_HOME/config/server.properties` 文件,这是Kafka的主要配置文件。 - 设置`broker.id`,这是一个唯一的标识符,每台服务器上的值必须不同,例如:1, 2, 3。 - 配置`log.dirs`来指定...

    ELK 安装文档

    - **修改配置文件**:编辑Kafka的配置文件,配置broker ID、监听端口等信息。 - **启动Zookeeper**:启动Kafka的依赖组件Zookeeper。 - **启动Kafka**:启动Kafka服务。 - **创建主题**:使用Kafka命令创建新的消息...

Global site tag (gtag.js) - Google Analytics