`
code727
  • 浏览: 67077 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Kafka集群配置部署

 
阅读更多

一、配置并运行kafka服务器

1.在运行kafka服务器之前先搭建zookeeper环境

此步省略,可参考http://code727.iteye.com/blog/2360944

2.在server.properties中配置broker

# 当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=0

# 当前kafka对外提供服务的端口,默认是9092
port=9092

# 这个参数默认是关闭的,在0.8.1有bug,DNS解析问题,失败率的问题。
# host.name=192.168.1.100 

# borker节点进行网络处理的线程数
num.network.threads=3 

# borker进行I/O处理的线程数
num.io.threads=8 

# 排队等候IO线程执行的请求数,默认为500
queued.max.requests=128

# 消息存放的目录,这个目录可以配置为逗号分割的表达式,num.io.threads要大于这个目录的个数
# 如果配置多个目录,新创建的topic会把消息持久化到这些若干目录中分区数最少的那一个中
log.dirs=/kafka/9092/logs/ 

# 用于发送消息的缓冲区大小,数据不是立即就发送的,会先存储到缓冲区,当到达一定的大小后再发送,能提高性能
socket.send.buffer.bytes=102400 

# 用于接收消息的缓冲区大小,当数据到达一定大小后再序列化到磁盘
socket.receive.buffer.bytes=102400

# 向kafka请求消息或者向kafka发送消息的最大数,不能超过java的堆栈大小
socket.request.max.bytes=104857600 

# 是否让程序自动创建Topic,默认true,建议false
auto.create.topics.enable=true

# 默认的分区数,每个topic默认有1个分区数,单个topic的分区建议不要超过64个
# 例如test topic的分区在log.dirs目录下依次有test-0、test-1...、test7共8个文件夹
num.partitions=8

# 单条消息的最大长度
message.max.bytes=5242880

# 消息备份数目默认为1将不做复制,建议修改
# 当N>1时,如果一个副本失效了,N-1个还可以继续提供服务
default.replication.factor=2 

# 抓取消息的最大字节数
replica.fetch.max.bytes=5242880

# 因为kafka的消息是以追加的形式写入文件,当单个文件大小超过这个值的时候,将创建一个新文件来存储
log.segment.bytes=1073741824 

# 消息的最大持久化时间,168小时,7天
log.retention.hours=168

# 每隔多少毫秒去检查log.retention.hours配置的log失效时间,如有过期消息,则删除掉
log.retention.check.interval.ms=300000 

# 是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false 

# 为保证一致性服务,需配置zookeeper集群环境下,对外服务的地址和端口号,多个以逗号分隔
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

 3.启动broker

# linux命令行,daemon方式启动,加载指定的配置文件
./kafka-server-start.sh -daemon ../config/server.properties

# Windows命令行
kafka-server-start ../config/server.properties

 4.运行producer测试

# linux命令行,连接9092 borker上的test topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  

# Windows命令行
bin/windows/kafka-console-producer --broker-list localhost:9092 --topic test 

 5.运行consumer测试

# Linux命令行,指向producer所连接上的zookeeper节点,并监听test topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 

# Windows命令行
bin/windows/kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning

 6.在producer端输入字符串并回车,consumer端显示则表示成功。

7.手动创建Topic分区

当server.properties中的配置项auto.create.topics.enable为false时,表示需要我们自行为某个topic创建分区

# Linux命令行,创建备份数和分区数分别为3和8,且名称为test的topic
bin/kafka-topics --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 8 --topic test

# Windows命令行
bin/windows/kafka-topics --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 8 --topic test

 

二、关于Producer和Consumer配置

1.producer配置

可以在org.apache.kafka.clients.producer.ProducerConfig中找到相关的说明

 

# kafka对外服务的host和port,在集群环境中多个服务地址以逗号分隔
bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093

#生产一条消息后要求broker对producer进行ACK确认回复的策略
#0:无需等待任何broker节点进行ACK回复即认为生产成功,在这种情况retries配置将不再生效,并且producer得到的记录偏移量(offset)始终为-1
#1:默认值,只需等待leader节点进行ACK回复后即认为生产成功
#all:等待所有节点进行ACK回复后才认为生产成功
acks=1 

# 批处理延迟时间上限,单位:毫秒,默认为0(无延迟)
# 通常发生在负载下,当记录到达的速度比它们可以发送的速度快时,通过添加这个配置来人为增加延迟,例如:linger.ms=5将具有减少发送请求数量的效果,但发送的记录总计将多达5ms的延迟
linger.ms=5

# producer的唯一标识
# 目的在于通过允许在服务器端请求记录中包括一个逻辑应用程序名称,能够跟踪ip/port的请求源。
client.id=producer-0

# 发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,将使用操作系统默认值
send.buffer.bytes=1024

# 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,将使用操作系统默认值。个人认为producer接收数据主要是一些broker的ACK回复
receive.buffer.bytes=1024

# 单次请求的最大字节数,主要为了避免发送巨大的请求
max.request.size=1024

# 尝试重新连接到制定主机之前等待的时间,单位:毫秒
# 主要是为了避免在并发环境中主机失效后,连接瞬间堆积的问题
reconnect.backoff.ms=3000

# 由于send和partitionsFor方法可能会因为缓冲区已满或元数据不可用而被阻塞,因此这个配置将用于控制阻塞多长时间(毫秒),当超过这个值还未返回时将被释放,抛出TimeoutException
# 可用来替换被废弃的metadata.fetch.timeout.ms和block.on.buffer.full配置
max.block.ms=3000

# 尝试向指定topic重试失败的请求之前等待的时间,单位:毫秒
# 这避免了在一些故障情况下在密集的重复发送请求。
retry.backoff.ms=3000

# producer用来缓冲等待发送到服务器的记录的内存总字节数
# 当发送速度比传递到服务器的速度还快时,将会产生max.block.ms个单位时间内的阻塞,并抛出异常,因此需要通过此配置来控制发送的速度,降低异常的发生
# 通常情况下,此设置应大致对应于producer将使用的总内存
buffer.memory=33554432(32768MB=32G)

# producer生成的所有数据的压缩类型
# none:不压缩,默认值
# 其余还有gzip、gzip和lz4,压缩是完整的数据批次,因此批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩) 
compression.type=none

# 计算度量样本的时间窗口,单位:毫秒
metrics.sample.window.ms=3000

# 维持计算度量的样本数
metrics.num.samples=3

# 用作度量报告器的类的列表,实现org.apache.kafka.common.metrics.MetricsReporter接口,JmxReporter是实现之一,注册JMX统计信息。
metric.reporters=org.apache.kafka.common.metrics.JmxReporter

# 客户端被阻止之前在单个连接上发送的未确认请求的最大数量
# 如果设置为大于1并且发送失败,则存在消息由于重试(如果重试retries被启用)而重新排序。
max.in.flight.requests.per.connection=1

# 发送消息失败后的重试次数
# 如果设置了重试次数,而将max.in.flight.requests.per.connection设置为1,将潜在地更改记录排序,因为如果两个批次的消息发送到单个分区,并且第一个失败并重试,但第二个成功,则第二批中的记录可能会先入队列。
retries=3

# 制定消息键序列化实现类
key.serializer=org.apache.kafka.common.serialization.StringSerializer

# 指定消息值序列化实现类
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# 指定的毫秒数后关闭空闲连接,即空闲连接的最大保持时间
connections.max.idle.ms=60000

# 分区接口(org.apache.kafka.clients.producer.Partitioner)的实现类
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner

# producer发出请求后等待响应的时间,单位:毫秒
# 如果超时后未得到响应,producer将在retries次数内,每过一个timeout周期发出一次重试请求,直到得到响应或重试用尽为止
# 可用来替换废弃的timeout.ms配置
request.timeout.ms=5000
 2.Consumer配置

 可以在org.apache.kafka.clients.consumer.ConsumerConfig中找到相关说明

# 指定consumer在哪些kafka服务器上产生消费,集群环境下多个地址以逗号分隔
bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093

# 消费者所属组的唯一标识
# 如果消费者通过使用subscribe或基于Kafka的offset偏移管理策略使用组管理功能,则需要此属性
# 当有多个相同消费者(监听)时,如果它们的group.id都相同,则消息只能由组内某一个成员消费,在消费者集群环境下,将避免重复消费的问题,要特别注意这点
group.id=consumer_group_0

# 对poll方法的每次调用中返回的最大记录数
max.poll.records=10

# 连续调用poll方法之间的时间间隔,单位毫秒
max.poll.interval.ms=1500

# 检测消费者故障的超时超时,单位毫秒
# 消费者会定期发出心跳来表示其活力,如果代理在此会话超时之前没有收到心跳,则broker会将该消费者从组中删除,并重新计算负载均衡
# group.min.session.timeout.ms >= session.timeout.ms >= group.max.session.timeout.ms
session.timeout.ms=60000

# 发出心跳的间隔时间,单位毫秒
# 用于确保消费者的会话保持活跃并且当新消费者加入或离开组时促发重新计算负载平衡
# 该值必须小于session.timeout.ms,但通常应设置不高于此值的1/3,可以调整得更低,以控制重新计算均衡的预期时间
heartbeat.interval.ms=6000

# 如果为true,则将在后台定期提交消费者的偏移量(offset)
enable.auto.commit=true

# 如果enable.auto.commit设置为true,则消费者将以这个时间为间隔将自动提交一次offset给broker
auto.commit.interval.ms=5000

# 分区分配策略的类名
# 客户端将在分组管理使用时用于在消费者实例之间分配分区所有权
partition.assignment.strategy=

# 当Kafka没有初始偏移或如果当前偏移在服务器上不再存在时的处理方式
# earliest:自动将偏移重置为最早偏移,可能会导致重复消费
# latest:自动将偏移重置为最新偏移,可能会导致未被消费的消息丢失
# none:如果没有为消费者组找到以前的偏移,则向消费者抛出异常 
auto.offset.reset=none

# 服务器针对抓取请求返回的最小数据量(字节数) 
# 如果数据不足,请求将在应答请求之前等待多少数据累积
# 默认设置为1个字节表示只要单个字节的数据可用或者读取请求超时等待数据到达,就会应答读取请求# 将此值设置为大于1将导致服务器等待大量数据累积,这可能以一些额外延迟为代价提高服务器吞吐量
fetch.min.bytes=1

# 服务器针对抓取请求返回的最大数据量(字节数),默认为52428800(500G)
# 这不是绝对最大值,如果提取的第一个非空分区中的第一个消息大于此值,则消息仍然会被返回,以确保消费者可以取得结果
fetch.max.bytes=52428800

# 如果没有足够的数据来立即抓取到fetch.min.bytes指定的字节数,则服务器在应答提取请求之前将阻止的最长时间
fetch.max.wait.ms=3000

# 元数据的最大生命周期
# 超出这个时间后,即使没有任何分区leader比昂更以主动发现任何新的broker或分区,也强制刷新元数据
metadata.max.age.ms=60000

# 服务器返回每个分区的最大数据量,默认值为1048576(1G)
# 如果抓取的第一个非空分区中的第一个消息大于此值,消息仍然会返回,以确保消费者可以取得结果
# 此值不能超过broker的message.max.bytes,topic的max.message.bytes,以及fetch.max.bytes
max.partition.fetch.bytes=1048576

# 和producer的同名配置一样
send.buffer.bytes=1024

# 和producer的同名配置一样
receive.buffer.bytes=1024

# 和producer的同名配置一样
client.id=consumer-0
 
# 和producer的同名配置一样
reconnect.backoff.ms
 
# 和producer的同名配置一样
retry.backoff.ms
 
# 和producer的同名配置一样
metrics.sample.window.ms
 
# 和producer的同名配置一样
metrics.num.samples
 
# 和producer的同名配置一样
metric.reporters
 
# 自动检查所消耗记录的CRC32(检错算法),这确保没有发生消息的磁盘损坏
# 此检查会增加一些开销,因此在寻求极高性能的情况下可能会禁用此检查
check.crcs=false
 
# 消息键的反序列化器实现类
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
# 消息值得反序列化器实现类
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
# 和producer的同名配置一样
connections.max.idle.ms=600000
 
# 和producer的同名配置一样
request.timeout.ms=3000
 
# 拦截消费的实现类
# 此实现类需实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口
interceptor.classes=org.apache.kafka.clients.consumer.ConsumerInterceptor
 
# 来自内部topic的信息(例如offset)是否应向消费者展示
# 如果设置为true(默认),则从内部topic接收记录的唯一方法是订阅
exclude.internal.topics=true

 

分享到:
评论

相关推荐

    sblim-gather-provider-2.2.8-9.el7.x64-86.rpm.tar.gz

    1、文件内容:sblim-gather-provider-2.2.8-9.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/sblim-gather-provider-2.2.8-9.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊

    基于pringboot框架的图书进销存管理系统的设计与实现(Java项目编程实战+完整源码+毕设文档+sql文件+学习练手好项目).zip

    本图书进销存管理系统管理员功能有个人中心,用户管理,图书类型管理,进货订单管理,商品退货管理,批销订单管理,图书信息管理,客户信息管理,供应商管理,库存分析管理,收入金额管理,应收金额管理,我的收藏管理。 用户功能有个人中心,图书类型管理,进货订单管理,商品退货管理,批销订单管理,图书信息管理,客户信息管理,供应商管理,库存分析管理,收入金额管理,应收金额管理。因而具有一定的实用性。 本站是一个B/S模式系统,采用Spring Boot框架,MYSQL数据库设计开发,充分保证系统的稳定性。系统具有界面清晰、操作简单,功能齐全的特点,使得图书进销存管理系统管理工作系统化、规范化。本系统的使用使管理人员从繁重的工作中解脱出来,实现无纸化办公,能够有效的提高图书进销存管理系统管理效率。 关键词:图书进销存管理系统;Spring Boot框架;MYSQL数据库

    2024中国在人工智能领域的创新能力如何研究报告.pdf

    2024中国在人工智能领域的创新能力如何研究报告.pdf

    安全生产_人脸识别_移动目标跟踪_智能管控平台技术实现与应用_1741777778.zip

    人脸识别项目实战

    人脸识别_TF2_Facenet_训练预测应用仓库_1741778670.zip

    人脸识别项目实战

    安全人脸识别_对抗攻击_多模型集成_减少扰动_竞赛方案_Ne_1741779504.zip

    人脸识别项目实战

    Python实现基于CEEMDAN完全自适应噪声集合经验模态分解时间序列信号分解的详细项目实例(含完整的程序,GUI设计和代码详解)

    内容概要:本文档详细介绍了基于CEEMDAN(完全自适应噪声集合经验模态分解)的方法实现时间序列信号分解的具体项目。文中涵盖项目背景介绍、主要目标、面临的挑战及解决方案、技术创新点、应用领域等多方面内容。项目通过多阶段流程(数据准备、模型设计与构建、性能评估、UI设计),并融入多项关键技术手段(自适应噪声引入、并行计算、机器学习优化等)以提高非线性非平稳信号的分析质量。同时,该文档包含详细的模型架构描述和丰富的代码样例(Python代码),有助于开发者直接参考与复用。 适合人群:具有时间序列分析基础的科研工作者、高校教师与研究生,从事信号处理工作的工程技术人员,或致力于数据科学研究的从业人员。 使用场景及目标:此项目可供那些面临时间序列数据中噪声问题的人群使用,尤其适用于需从含有随机噪音的真实世界信号里提取有意义成分的研究者。具体场景包括但不限于金融市场趋势预测、设备故障预警、医疗健康监控以及环境质量变动跟踪等,旨在提供一种高效的信号分离和分析工具,辅助专业人士进行精准判断和支持决策。 其他说明:本文档不仅限于理论讲解和技术演示,更着眼于实际工程项目落地应用,强调软硬件资源配置、系统稳定性测试等方面的细节考量。通过完善的代码实现说明以及GUI界面设计指南,使读者能够全面理解整个项目的开发流程,同时也鼓励后续研究者基于已有成果继续创新拓展,探索更多的改进空间与发展机遇。此外,针对未来可能遇到的各种情况,提出了诸如模型自我调整、多模态数据融合等发展方向,为长期发展提供了思路指导。

    监护人,小孩和玩具数据集 4647张原始图片 监护人 食物 孩子 玩具 精确率可达85.4% pasical voc xml格式

    监护人,小孩和玩具数据集 4647张原始图片 监护人 食物 孩子 玩具 精确率可达85.4% pasical voc xml格式

    根据提供的内容可以构建以下_1741777949.zip

    人脸识别项目实战

    `计算机视觉_人脸识别_Python_OpenCV_树莓派毕业设计`.zip

    人脸识别项目实战

    智慧生产企业园区解决方案PPT(54页).pptx

    在智慧园区建设的浪潮中,一个集高效、安全、便捷于一体的综合解决方案正逐步成为现代园区管理的标配。这一方案旨在解决传统园区面临的智能化水平低、信息孤岛、管理手段落后等痛点,通过信息化平台与智能硬件的深度融合,为园区带来前所未有的变革。 首先,智慧园区综合解决方案以提升园区整体智能化水平为核心,打破了信息孤岛现象。通过构建统一的智能运营中心(IOC),采用1+N模式,即一个智能运营中心集成多个应用系统,实现了园区内各系统的互联互通与数据共享。IOC运营中心如同园区的“智慧大脑”,利用大数据可视化技术,将园区安防、机电设备运行、车辆通行、人员流动、能源能耗等关键信息实时呈现在拼接巨屏上,管理者可直观掌握园区运行状态,实现科学决策。这种“万物互联”的能力不仅消除了系统间的壁垒,还大幅提升了管理效率,让园区管理更加精细化、智能化。 更令人兴奋的是,该方案融入了诸多前沿科技,让智慧园区充满了未来感。例如,利用AI视频分析技术,智慧园区实现了对人脸、车辆、行为的智能识别与追踪,不仅极大提升了安防水平,还能为园区提供精准的人流分析、车辆管理等增值服务。同时,无人机巡查、巡逻机器人等智能设备的加入,让园区安全无死角,管理更轻松。特别是巡逻机器人,不仅能进行360度地面全天候巡检,还能自主绕障、充电,甚至具备火灾预警、空气质量检测等环境感知能力,成为了园区管理的得力助手。此外,通过构建高精度数字孪生系统,将园区现实场景与数字世界完美融合,管理者可借助VR/AR技术进行远程巡检、设备维护等操作,仿佛置身于一个虚拟与现实交织的智慧世界。 最值得关注的是,智慧园区综合解决方案还带来了显著的经济与社会效益。通过优化园区管理流程,实现降本增效。例如,智能库存管理、及时响应采购需求等举措,大幅减少了库存积压与浪费;而设备自动化与远程监控则降低了维修与人力成本。同时,借助大数据分析技术,园区可精准把握产业趋势,优化招商策略,提高入驻企业满意度与营收水平。此外,智慧园区的低碳节能设计,通过能源分析与精细化管理,实现了能耗的显著降低,为园区可持续发展奠定了坚实基础。总之,这一综合解决方案不仅让园区管理变得更加智慧、高效,更为入驻企业与员工带来了更加舒适、便捷的工作与生活环境,是未来园区建设的必然趋势。

    第八届全国大学生创新创业年会-创新创业展示项目集

    本届年会的主题是“青春梦想创新创业”。通过学术论文报告、创新创业项目展示、创业项目推介、工作研讨、联谊活动、大会报告等活动,全面展示大学生最新的创新创业成果。年会共收到491所高校推荐的学术论文756篇、创新创业展示项目721项、创业推介项目156项,合计1633项,为历届年会数量最高。经过36所“985”高校相关学科专家的初评以及国家级大学生创新创业训练计划专家组的复选,最终遴选出可参加本次年会的学术论文180篇,创新创业展示项目150个,创业推介项目45项,共计375项,涉及30个省市的236所高校。年会还收到了来自澳门特别行政区、俄罗斯的13项学术论文及参展项目。这些材料集中反映了各高校最新的创新创业教育成果,也直接体现了当代大学生的创新思维和实践能力。

    人脸识别_实时_ArcFace_多路识别技术_JavaScr_1741771263.zip

    人脸识别项目实战

    6ES7215-1AG40-0XB0-V04.04.01固件4.5

    6ES7215-1AG40-0XB0_V04.04.01固件4.5

    在无人机上部署SchurVins的yaml配置文件

    在无人机上部署SchurVins的yaml配置文件

    uniapp实战商城类app和小程序源码​​​​​​.rar

    uniapp实战商城类app和小程序源码,包含后端API源码和交互完整源码。

    基于MobileNet轻量级网络实现的常见30多种食物分类

    基于MobileNet轻量级网络实现的常见30多种食物分类,包含数据集、训练脚本、验证脚本、推理脚本等等。 数据集总共20k左右,推理的形式是本地的网页推理

    2024年央国企RPA市场研究报.pdf

    2024年央国企RPA市场研究报.pdf

    VSCodeSetup-x64-1.98.0.rar

    VSCodeSetup-x64-1.98.0.rar vscode是一种简化且高效的代码编辑器,同时支持诸如调试,任务执行和版本管理之类的开发操作。它的目标是提供一种快速的编码编译调试工具。然后将其余部分留给IDE。vscode集成了所有一款现代编辑器所应该具备的特性,包括语法高亮、可定制的热键绑定、括号匹配、以及代码片段收集等。 Visual Studio Code(简称VSCode)是Microsoft开发的代码编辑器,它支持Windows,Linux和macOS等操作系统以及开源代码。它支持测试,并具有内置的Git版本控制功能以及开发环境功能,例如代码完成(类似于IntelliSense),代码段和代码重构等。编辑器支持用户定制的配置,例如仍在编辑器中时,可以更改各种属性和参数,例如主题颜色,键盘快捷键等,内置的扩展程序管理功能。

    日用品玻璃行业数字化转型:生产管理痛点与工业互联网平台解决方案

    内容概要:本文介绍了日用品玻璃行业的数字化解决方案,针对玻璃制品从原料制备、熔融到成型及深加工等一系列生产过程进行了详细的梳理。文中指出玻璃日用品制造业面临设备不停止运转造成的成本居高不下、频繁的小批量多款式订单切换带来的转产效率低下、以及在成型阶段的质量控制难度较大等严峻的问题,即'一高两低'的问题,并提出构建工业互联网平台,通过采用工业大数据平台等手段来克服现有挑战,达成生产全流程的数据贯通与优化。 适用人群:日用品玻璃企业的高级管理层和技术团队,负责生产流程改进、IT基础设施建设以及智能制造转型的专业人士。 使用场景及目标:该方案旨在帮助企业提升生产效率,增强产品品质,降低成本;具体应用场景涵盖生产设备状态的实时监测、故障预警、预防性维护、生产过程自动化调节等,进而实现企业数字化转型,提高市场响应速度和服务质量。 其他说明:本文提到的具体技术和方法包括物联网(IoT)技术、边缘计算、云计算平台建设和利用,还有通过机器学习和大数据分析技术对生产工艺进行深度理解和优化等。

Global site tag (gtag.js) - Google Analytics