一、配置并运行kafka服务器
1.在运行kafka服务器之前先搭建zookeeper环境
此步省略,可参考http://code727.iteye.com/blog/2360944
2.在server.properties中配置broker
# 当前机器在集群中的唯一标识,和zookeeper的myid性质一样 broker.id=0 # 当前kafka对外提供服务的端口,默认是9092 port=9092 # 这个参数默认是关闭的,在0.8.1有bug,DNS解析问题,失败率的问题。 # host.name=192.168.1.100 # borker节点进行网络处理的线程数 num.network.threads=3 # borker进行I/O处理的线程数 num.io.threads=8 # 排队等候IO线程执行的请求数,默认为500 queued.max.requests=128 # 消息存放的目录,这个目录可以配置为逗号分割的表达式,num.io.threads要大于这个目录的个数 # 如果配置多个目录,新创建的topic会把消息持久化到这些若干目录中分区数最少的那一个中 log.dirs=/kafka/9092/logs/ # 用于发送消息的缓冲区大小,数据不是立即就发送的,会先存储到缓冲区,当到达一定的大小后再发送,能提高性能 socket.send.buffer.bytes=102400 # 用于接收消息的缓冲区大小,当数据到达一定大小后再序列化到磁盘 socket.receive.buffer.bytes=102400 # 向kafka请求消息或者向kafka发送消息的最大数,不能超过java的堆栈大小 socket.request.max.bytes=104857600 # 是否让程序自动创建Topic,默认true,建议false auto.create.topics.enable=true # 默认的分区数,每个topic默认有1个分区数,单个topic的分区建议不要超过64个 # 例如test topic的分区在log.dirs目录下依次有test-0、test-1...、test7共8个文件夹 num.partitions=8 # 单条消息的最大长度 message.max.bytes=5242880 # 消息备份数目默认为1将不做复制,建议修改 # 当N>1时,如果一个副本失效了,N-1个还可以继续提供服务 default.replication.factor=2 # 抓取消息的最大字节数 replica.fetch.max.bytes=5242880 # 因为kafka的消息是以追加的形式写入文件,当单个文件大小超过这个值的时候,将创建一个新文件来存储 log.segment.bytes=1073741824 # 消息的最大持久化时间,168小时,7天 log.retention.hours=168 # 每隔多少毫秒去检查log.retention.hours配置的log失效时间,如有过期消息,则删除掉 log.retention.check.interval.ms=300000 # 是否启用log压缩,一般不用启用,启用的话可以提高性能 log.cleaner.enable=false # 为保证一致性服务,需配置zookeeper集群环境下,对外服务的地址和端口号,多个以逗号分隔 zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
3.启动broker
# linux命令行,daemon方式启动,加载指定的配置文件 ./kafka-server-start.sh -daemon ../config/server.properties # Windows命令行 kafka-server-start ../config/server.properties
4.运行producer测试
# linux命令行,连接9092 borker上的test topic bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # Windows命令行 bin/windows/kafka-console-producer --broker-list localhost:9092 --topic test
5.运行consumer测试
# Linux命令行,指向producer所连接上的zookeeper节点,并监听test topic bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning # Windows命令行 bin/windows/kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning
6.在producer端输入字符串并回车,consumer端显示则表示成功。
7.手动创建Topic分区
当server.properties中的配置项auto.create.topics.enable为false时,表示需要我们自行为某个topic创建分区
# Linux命令行,创建备份数和分区数分别为3和8,且名称为test的topic bin/kafka-topics --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 8 --topic test # Windows命令行 bin/windows/kafka-topics --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 8 --topic test
二、关于Producer和Consumer配置
1.producer配置
可以在org.apache.kafka.clients.producer.ProducerConfig中找到相关的说明
# kafka对外服务的host和port,在集群环境中多个服务地址以逗号分隔 bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093 #生产一条消息后要求broker对producer进行ACK确认回复的策略 #0:无需等待任何broker节点进行ACK回复即认为生产成功,在这种情况retries配置将不再生效,并且producer得到的记录偏移量(offset)始终为-1 #1:默认值,只需等待leader节点进行ACK回复后即认为生产成功 #all:等待所有节点进行ACK回复后才认为生产成功 acks=1 # 批处理延迟时间上限,单位:毫秒,默认为0(无延迟) # 通常发生在负载下,当记录到达的速度比它们可以发送的速度快时,通过添加这个配置来人为增加延迟,例如:linger.ms=5将具有减少发送请求数量的效果,但发送的记录总计将多达5ms的延迟 linger.ms=5 # producer的唯一标识 # 目的在于通过允许在服务器端请求记录中包括一个逻辑应用程序名称,能够跟踪ip/port的请求源。 client.id=producer-0 # 发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,将使用操作系统默认值 send.buffer.bytes=1024 # 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,将使用操作系统默认值。个人认为producer接收数据主要是一些broker的ACK回复 receive.buffer.bytes=1024 # 单次请求的最大字节数,主要为了避免发送巨大的请求 max.request.size=1024 # 尝试重新连接到制定主机之前等待的时间,单位:毫秒 # 主要是为了避免在并发环境中主机失效后,连接瞬间堆积的问题 reconnect.backoff.ms=3000 # 由于send和partitionsFor方法可能会因为缓冲区已满或元数据不可用而被阻塞,因此这个配置将用于控制阻塞多长时间(毫秒),当超过这个值还未返回时将被释放,抛出TimeoutException # 可用来替换被废弃的metadata.fetch.timeout.ms和block.on.buffer.full配置 max.block.ms=3000 # 尝试向指定topic重试失败的请求之前等待的时间,单位:毫秒 # 这避免了在一些故障情况下在密集的重复发送请求。 retry.backoff.ms=3000 # producer用来缓冲等待发送到服务器的记录的内存总字节数 # 当发送速度比传递到服务器的速度还快时,将会产生max.block.ms个单位时间内的阻塞,并抛出异常,因此需要通过此配置来控制发送的速度,降低异常的发生 # 通常情况下,此设置应大致对应于producer将使用的总内存 buffer.memory=33554432(32768MB=32G) # producer生成的所有数据的压缩类型 # none:不压缩,默认值 # 其余还有gzip、gzip和lz4,压缩是完整的数据批次,因此批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩) compression.type=none # 计算度量样本的时间窗口,单位:毫秒 metrics.sample.window.ms=3000 # 维持计算度量的样本数 metrics.num.samples=3 # 用作度量报告器的类的列表,实现org.apache.kafka.common.metrics.MetricsReporter接口,JmxReporter是实现之一,注册JMX统计信息。 metric.reporters=org.apache.kafka.common.metrics.JmxReporter # 客户端被阻止之前在单个连接上发送的未确认请求的最大数量 # 如果设置为大于1并且发送失败,则存在消息由于重试(如果重试retries被启用)而重新排序。 max.in.flight.requests.per.connection=1 # 发送消息失败后的重试次数 # 如果设置了重试次数,而将max.in.flight.requests.per.connection设置为1,将潜在地更改记录排序,因为如果两个批次的消息发送到单个分区,并且第一个失败并重试,但第二个成功,则第二批中的记录可能会先入队列。 retries=3 # 制定消息键序列化实现类 key.serializer=org.apache.kafka.common.serialization.StringSerializer # 指定消息值序列化实现类 value.serializer=org.apache.kafka.common.serialization.StringSerializer # 指定的毫秒数后关闭空闲连接,即空闲连接的最大保持时间 connections.max.idle.ms=60000 # 分区接口(org.apache.kafka.clients.producer.Partitioner)的实现类 partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner # producer发出请求后等待响应的时间,单位:毫秒 # 如果超时后未得到响应,producer将在retries次数内,每过一个timeout周期发出一次重试请求,直到得到响应或重试用尽为止 # 可用来替换废弃的timeout.ms配置 request.timeout.ms=50002.Consumer配置
可以在org.apache.kafka.clients.consumer.ConsumerConfig中找到相关说明
# 指定consumer在哪些kafka服务器上产生消费,集群环境下多个地址以逗号分隔 bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093 # 消费者所属组的唯一标识 # 如果消费者通过使用subscribe或基于Kafka的offset偏移管理策略使用组管理功能,则需要此属性 # 当有多个相同消费者(监听)时,如果它们的group.id都相同,则消息只能由组内某一个成员消费,在消费者集群环境下,将避免重复消费的问题,要特别注意这点 group.id=consumer_group_0 # 对poll方法的每次调用中返回的最大记录数 max.poll.records=10 # 连续调用poll方法之间的时间间隔,单位毫秒 max.poll.interval.ms=1500 # 检测消费者故障的超时超时,单位毫秒 # 消费者会定期发出心跳来表示其活力,如果代理在此会话超时之前没有收到心跳,则broker会将该消费者从组中删除,并重新计算负载均衡 # group.min.session.timeout.ms >= session.timeout.ms >= group.max.session.timeout.ms session.timeout.ms=60000 # 发出心跳的间隔时间,单位毫秒 # 用于确保消费者的会话保持活跃并且当新消费者加入或离开组时促发重新计算负载平衡 # 该值必须小于session.timeout.ms,但通常应设置不高于此值的1/3,可以调整得更低,以控制重新计算均衡的预期时间 heartbeat.interval.ms=6000 # 如果为true,则将在后台定期提交消费者的偏移量(offset) enable.auto.commit=true # 如果enable.auto.commit设置为true,则消费者将以这个时间为间隔将自动提交一次offset给broker auto.commit.interval.ms=5000 # 分区分配策略的类名 # 客户端将在分组管理使用时用于在消费者实例之间分配分区所有权 partition.assignment.strategy= # 当Kafka没有初始偏移或如果当前偏移在服务器上不再存在时的处理方式 # earliest:自动将偏移重置为最早偏移,可能会导致重复消费 # latest:自动将偏移重置为最新偏移,可能会导致未被消费的消息丢失 # none:如果没有为消费者组找到以前的偏移,则向消费者抛出异常 auto.offset.reset=none # 服务器针对抓取请求返回的最小数据量(字节数) # 如果数据不足,请求将在应答请求之前等待多少数据累积 # 默认设置为1个字节表示只要单个字节的数据可用或者读取请求超时等待数据到达,就会应答读取请求# 将此值设置为大于1将导致服务器等待大量数据累积,这可能以一些额外延迟为代价提高服务器吞吐量 fetch.min.bytes=1 # 服务器针对抓取请求返回的最大数据量(字节数),默认为52428800(500G) # 这不是绝对最大值,如果提取的第一个非空分区中的第一个消息大于此值,则消息仍然会被返回,以确保消费者可以取得结果 fetch.max.bytes=52428800 # 如果没有足够的数据来立即抓取到fetch.min.bytes指定的字节数,则服务器在应答提取请求之前将阻止的最长时间 fetch.max.wait.ms=3000 # 元数据的最大生命周期 # 超出这个时间后,即使没有任何分区leader比昂更以主动发现任何新的broker或分区,也强制刷新元数据 metadata.max.age.ms=60000 # 服务器返回每个分区的最大数据量,默认值为1048576(1G) # 如果抓取的第一个非空分区中的第一个消息大于此值,消息仍然会返回,以确保消费者可以取得结果 # 此值不能超过broker的message.max.bytes,topic的max.message.bytes,以及fetch.max.bytes max.partition.fetch.bytes=1048576 # 和producer的同名配置一样 send.buffer.bytes=1024 # 和producer的同名配置一样 receive.buffer.bytes=1024 # 和producer的同名配置一样 client.id=consumer-0 # 和producer的同名配置一样 reconnect.backoff.ms # 和producer的同名配置一样 retry.backoff.ms # 和producer的同名配置一样 metrics.sample.window.ms # 和producer的同名配置一样 metrics.num.samples # 和producer的同名配置一样 metric.reporters # 自动检查所消耗记录的CRC32(检错算法),这确保没有发生消息的磁盘损坏 # 此检查会增加一些开销,因此在寻求极高性能的情况下可能会禁用此检查 check.crcs=false # 消息键的反序列化器实现类 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer # 消息值得反序列化器实现类 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer # 和producer的同名配置一样 connections.max.idle.ms=600000 # 和producer的同名配置一样 request.timeout.ms=3000 # 拦截消费的实现类 # 此实现类需实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口 interceptor.classes=org.apache.kafka.clients.consumer.ConsumerInterceptor # 来自内部topic的信息(例如offset)是否应向消费者展示 # 如果设置为true(默认),则从内部topic接收记录的唯一方法是订阅 exclude.internal.topics=true
相关推荐
### Kafka集群部署与运维知识点详解 #### 一、Kafka概览 Kafka是一种高性能的分布式消息系统,具有以下特点: - **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持...
Kafka集群搭建和使用过程涉及多个技术要点和配置项,包括SASL安全机制、ACL权限设置、Kafka基础概念以及安装配置步骤等。下面将详细介绍这些知识点。 首先,SASL(Simple Authentication and Security Layer)是为C...
**Kafka集群配置详解——基于3节点实例** 在大数据处理领域,Apache Kafka作为一个高吞吐量、分布式的发布订阅消息系统,被广泛应用于实时数据流处理。本篇将详细解析如何在Linux环境下配置一个3节点的Kafka集群,...
**Kafka集群安装部署全量指南** Apache Kafka是一款开源流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它设计为一个高吞吐量、分布式的消息队列系统,用于处理实时数据流。Kafka通常与ZooKeeper一起使用...
zookeeper配置、集群部署 kafka配置、集群部署 Window平台下
(3)配置Kafka集群:配置Kafka集群的参数,例如副本因子、分区数量等。 (4)启动Kafka集群:启动Kafka集群,确保所有节点正常运行。 (5)测试Kafka集群:测试Kafka集群的可用性和性能。 3. 遇到的主要问题及...
本文档详细介绍了Kafka集群的部署过程,涵盖了环境信息、JDK安装、Kafka安装、配置文件修改、集群启动等步骤。通过阅读本文档,读者可以了解Kafka集群的部署过程,并掌握相关的技术要点。 一、环境信息 在部署...
总的来说,搭建Kafka集群涉及到虚拟机安装、JDK安装配置、Zookeeper集群配置和启动等关键步骤。这些步骤的目的是确保Kafka集群能够稳定运行,提供高性能的消息队列服务。在实际操作过程中,还应注意检查防火墙设置、...
**Kafka集群配置** 1. **节点设置**:首先,你需要至少三个Kafka节点来构建一个基础的集群。每个节点都需要有自己的配置文件`server.properties`,其中包含了节点的ID(`broker.id`),这是集群中的唯一标识。例如...
### Kafka集群及Kafka-Manager安装部署 #### 一、Kafka集群的安装与配置 **1. 工作环境准备** - **JDK**:确保安装了JDK 1.8.0_60版本。这一步骤至关重要,因为Kafka依赖于Java运行时环境。 - **Zookeeper**...
### Kafka集群部署步骤详解 #### 一、安装Java环境(JDK 1.8) Kafka作为基于Java语言开发的消息中间件,其运行环境需要Java支持。为了确保Kafka能够正常运行,首先需要在每台服务器上安装Java环境。推荐使用JDK ...
【Kafka集群安装部署-自带zookeeper】 Apache Kafka是一个分布式流...总之,部署Kafka集群需要考虑多方面的配置,包括Zookeeper、Kafka本身以及网络和硬件环境。理解这些概念和步骤是构建稳定、高效Kafka集群的关键。
本篇文档将详细介绍如何在Linux环境中搭建Kafka集群,同时结合Hadoop和Spark的分布式安装,构建一个完整的数据处理平台。 首先,搭建Kafka集群的基础条件包括: 1. 至少一台Linux服务器,但为了高可用性,推荐多台...
以上只是Kafka集群配置的部分要点,实际部署时还需考虑监控、故障恢复、网络拓扑等因素。理解并熟练掌握这些配置,能帮助你构建稳定、高效、可靠的Kafka集群。在实践中不断优化,可以提升Kafka在大数据处理、实时流...
现在,我们来深入探讨《Kafka集群部署》配置文件中的关键知识点。 1. **Kafka集群**: Kafka集群由一个或多个服务器(称为Brokers)组成,它们负责存储和转发消息。为了实现高可用性和容错性,通常会设置多个副本。...
本手册主要涵盖了Kafka集群的部署步骤,包括Zookeeper和Kafka组件的配置。Kafka是一种高吞吐量的分布式发布订阅消息系统,而Zookeeper是Apache的一个分布式协调服务,它在Kafka集群中扮演着重要的角色。 一、...
本文详细介绍了如何在Linux环境下安装Kafka及搭建Kafka集群的过程,并对关键配置进行了说明。此外,还简要介绍了Kafka管理工具的安装方法。通过这些步骤,用户可以顺利地部署并使用Kafka,为实际应用场景提供支持。
总之,搭建Kafka集群涉及多个步骤,包括Zookeeper集群的配置、Kafka的安装与配置、主题创建以及服务启动。正确配置和管理Kafka集群对于实现高效、稳定的数据流处理至关重要。随着对Kafka的深入理解和实践,你可以...
总的来说,Kafka集群的部署涉及了云基础设施的规划、软件的安装配置以及服务的启动与测试。理解这些步骤对于构建稳定、可靠的分布式消息系统至关重要。在云计算环境中,Kafka的高效数据处理能力和弹性扩展性使其成为...
通过本文档的学习,我们不仅深入了解了 Kafka 的架构原理和使用方法,还掌握了 Kafka 集群的安装部署过程。此外,我们还学习了 Kafka 生产者和消费者的 Java API 使用方法,以及 JMS 规范的相关概念。这些知识对于...