上一节中(点此传送),我们完成了Kafka集群的搭建,本节中我们将介绍0.9版本中的新API,以及Kafka集群高可用性的测试
1. 使用Kafka的Producer API来完成消息的推送
1) Kafka 0.9.0.1的java client依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
2) 写一个KafkaUtil工具类,用于构造Kafka Client
public class KafkaUtil { private static KafkaProducer<String, String> kp; public static KafkaProducer<String, String> getProducer() { if (kp == null) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kp = new KafkaProducer<String, String>(props); } return kp; } }
KafkaProducer<K,V>的K代表每条消息的key类型,V代表消息类型。消息的key用于决定此条消息由哪一个partition接收,所以我们需要保证每条消息的key是不同的。
Producer端的常用配置
- bootstrap.servers:Kafka集群连接串,可以由多个host:port组成
- acks:broker消息确认的模式,有三种:
0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
1:由Leader确认,Leader接收到消息后会立即返回确认信息
all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
我们可以根据消息的重要程度,设置不同的确认模式。默认为1 - retries:发送失败时Producer端的重试次数,默认为0
- batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节
- linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
- key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定
- buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)
更多的Producer配置见官网:http://kafka.apache.org/documentation.html#producerconfigs
3) 写一个简单的Producer端,每隔1秒向Kafka集群发送一条消息:
public class KafkaTest { public static void main(String[] args) throws Exception{ Producer<String, String> producer = KafkaUtil.getProducer(); int i = 0; while(true) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) e.printStackTrace(); System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset()); } }); i++; Thread.sleep(1000); } } }
在调用KafkaProducer的send方法时,可以注册一个回调方法,在Producer端完成发送后会触发回调逻辑,在回调方法的metadata对象中,我们能够获取到已发送消息的offset和落在的分区等信息。注意,如果acks配置为0,依然会触发回调逻辑,只是拿不到offset和消息落地的分区信息。
跑一下,输出是这样的:
message send to partition 1, offset: 26
message send to partition 0, offset: 29
message send to partition 1, offset: 27
message send to partition 1, offset: 28
message send to partition 0, offset: 30
message send to partition 0, offset: 31
message send to partition 1, offset: 29
message send to partition 1, offset: 30
message send to partition 1, offset: 31
message send to partition 0, offset: 32
message send to partition 0, offset: 33
message send to partition 0, offset: 34
message send to partition 1, offset: 32
乍一看似乎offset乱掉了,但其实这是因为消息分布在了两个分区上,每个分区上的offset其实是正确递增的。
4) 编写Consumer端来消费消息
首先改造一下KafkaUtil类,加入Consumer client的构造。
public class KafkaUtil { private static KafkaProducer<String, String> kp; private static KafkaConsumer<String, String> kc; public static KafkaProducer<String, String> getProducer() { if (kp == null) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kp = new KafkaProducer<String, String>(props); } return kp; } public static KafkaConsumer<String, String> getConsumer() { if(kc == null) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092"); props.put("group.id", "1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kc = new KafkaConsumer<String, String>(props); } return kc; } }
同样,我们介绍一下Consumer常用配置
- bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含义一样,不再赘述
- fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer会等待消息积累到一定尺寸后进行批量拉取。默认为1,代表有一条就拉一条
- max.partition.fetch.bytes:每次从单个分区中拉取的消息最大尺寸(byte),默认为1M
- group.id:Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息,不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。
- enable.auto.commit:是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true
- auto.commit.interval.ms:自动提交offset的间隔毫秒数,默认5000。
全部的Consumer配置见官方文档:http://kafka.apache.org/documentation.html#newconsumerconfigs
接下来编写Consumer端:
public class KafkaTest { public static void main(String[] args) throws Exception{ KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(); consumer.subscribe(Arrays.asList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(1000); for(ConsumerRecord<String, String> record : records) { System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value()); } } } }
运行输出:
fetched from partition 0, offset: 29, message: this is message2
fetched from partition 0, offset: 30, message: this is message5
fetched from partition 0, offset: 31, message: this is message6
fetched from partition 0, offset: 32, message: this is message10
fetched from partition 0, offset: 33, message: this is message11
fetched from partition 0, offset: 34, message: this is message12
fetched from partition 1, offset: 26, message: this is message1
fetched from partition 1, offset: 27, message: this is message3
fetched from partition 1, offset: 28, message: this is message4
fetched from partition 1, offset: 29, message: this is message7
fetched from partition 1, offset: 30, message: this is message8
fetched from partition 1, offset: 31, message: this is message9
fetched from partition 1, offset: 32, message: this is message13
说明:
- KafkaConsumer的poll方法即是从Broker拉取消息,在poll之前首先要用subscribe方法订阅一个Topic。
- poll方法的入参是拉取超时毫秒数,如果没有新的消息可供拉取,consumer会等待指定的毫秒数,到达超时时间后会直接返回一个空的结果集。
- 如果Topic有多个partition,KafkaConsumer会在多个partition间以轮询方式实现负载均衡。如果启动了多个Consumer线程,Kafka也能够通过zookeeper实现多个Consumer间的调度,保证同一组下的Consumer不会重复消费消息。注意,Consumer数量不能超过partition数,超出部分的Consumer无法拉取到任何数据。
- 可以看出,拉取到的消息并不是完全顺序化的,kafka只能保证一个partition内的消息先进先出,所以在跨partition的情况下,消息的顺序是没有保证的。
- 本例中采用的是自动提交offset,Kafka client会启动一个线程定期将offset提交至broker。假设在自动提交的间隔内发生故障(比如整个JVM进程死掉),那么有一部分消息是会被重复消费的。要避免这一问题,可使用手动提交offset的方式。构造consumer时将enable.auto.commit设为false,并在代码中用consumer.commitSync()来手动提交。
如果不想让kafka控制consumer拉取数据时在partition间的负载均衡,也可以手工控制:
public static void main(String[] args) throws Exception{ KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(); String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecord<String, String> record : records) { System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value()); } consumer.commitSync(); } }
使用consumer.assign()方法为consumer线程指定1个或多个partition。
此处的坑:
题外话:
然而KafkaConsumer并不是线程安全的,多个线程操作同一个KafkaConsumer实例会出现各种问题,Kafka官方对于Consumer端的多线程处理给出的指导建议如下:
1. 每个线程都持有一个KafkaConsumer对象
好处:
- 实现简单
- 不需要线程间的协作,效率最高
- 最容易实现每个Partition内消息的顺序处理
弊端:
- 每个KafkaConsumer都要与集群保持一个TCP连接
- 线程数不能超过Partition数
- 每一batch拉取的数据量会变小,对吞吐量有一定影响
2. 解耦,1个Consumer线程负责拉取消息,数个Worker线程负责消费消息
好处:
- 可自由控制Worker线程的数量,不受Partition数量限制
弊端:
- 消息消费的顺序无法保证
- 难以控制手动提交offset的时机
个人认为第二种方式更加可取,consumer数不能超过partition数这个限制是很要命的,不可能为了提高Consumer消费消息的效率而把Topic分成更多的partition,partition越多,集群的高可用性就越低。
2. Kafka集群高可用性测试
1) 查看当前Topic的状态:
/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test
输出:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
可以看到,partition0的leader是broker1,parition1的leader是broker0
2) 启动Producer向Kafka集群发送消息
输出:
message send to partition 1, offset: 33
message send to partition 0, offset: 36
message send to partition 1, offset: 34
message send to partition 1, offset: 35
message send to partition 0, offset: 37
message send to partition 0, offset: 38
message send to partition 1, offset: 36
message send to partition 1, offset: 37
3) 登录SSH将broker0,也就是partition 1的leader kill掉
再次查看Topic状态:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1
可以看到,当前parition0和parition1的leader都是broker1了
此时再去看Producer的输出:
message send to partition 0, offset: 39
message send to partition 0, offset: 40
message send to partition 0, offset: 41
message send to partition 1, offset: 39
message send to partition 1, offset: 40
message send to partition 0, offset: 42
message send to partition 0, offset: 43
Producer端非常平稳的继续运行,完全没有任何异常产生(但实际上broker0挂掉后下一条消息的发送延迟了几秒),能够看出Kafka集群的故障切换机制还是很厉害的
4) 我们再把broker0启动起来
bin/kafka-server-start.sh -daemon config/server.properties
然后再次检查Topic状态:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
我们看到,broker0启动起来了,并且已经是in-sync状态(注意Isr从1变成了1,0),但此时两个partition的leader还都是broker1,也就是说当前broker1会承载所有的发送和拉取请求。这显然是不行的,我们要让集群恢复到负载均衡的状态。
这时候,需要使用Kafka的选举工具触发一次选举:
bin/kafka-preferred-replica-election.sh --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
选举完成后,再次查看Topic状态:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0
可以看到,集群重新回到了broker0挂掉之前的状态
但此时,Producer端产生了异常:
原因是Producer端在尝试向broker1的parition0发送消息时,partition0的leader已经切换成了broker0,所以消息发送失败。
此时用Consumer去消费消息,会发现消息的编号不连续了,确实漏发了一条消息。这是因为我们在构造Producer时设定了retries=0,所以在发送失败时Producer端不会尝试重发。
将retries改为3后再次尝试,会发现leader切换时再次发生了同样的问题,但Producer的重发机制起了作用,消息重发成功,启动Consumer端检查也证实了所有消息都发送成功了。
至此,我们通过测试证实了集群出现单点故障和恢复的过程中,Producer端能够保持正确运转。接下来我们看一下Consumer端的表现:
5) 同时启动Producer进程和Consumer进程
此时Producer一边在生产消息,Consumer一边在消费消息
6) 把broker0干掉,观察Consumer端的输出:
能看到,在broker0挂掉后,consumer端产生了一系列INFO和WARN输出,但若干秒后自动恢复,消息仍然是连续的,并未出现断点。
7) 再次把broker0启动,并触发重新选举,然后观察输出:
fetched from partition 0, offset: 419, message: this is message49
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead.
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group.
fetched from partition 1, offset: 392, message: this is message50
fetched from partition 0, offset: 420, message: this is message51
能看到,重选举后Consumer端也输出了一些日志,意思是在提交offset时发现当前的调度器已经失效了,但很快就重新获取了新的有效调度器,恢复了offset的自动提交,验证已提交offset的值也证明了offset提交并未因leader切换而发生错误。
如上,我们也通过测试证实了Kafka集群出现单点故障时,Consumer端的功能正确性。
通过测试,我们完整验证了Kafka集群的高可用性。本文至此结束。
相关推荐
模具状态监测市场:6.8%的年复合增长率引领制造业智能化升级 在快速发展的制造业中,模具作为生产过程中的核心部件,其状态直接影响到产品的质量和生产效率。然而,模具的损耗和故障往往难以预测,给企业带来不小的损失。如今,随着模具状态监测技术的兴起,这一切正在发生改变。这项创新技术不仅能够帮助企业提前发现模具的潜在问题,还能显著延长模具的使用寿命,提升生产效率。但你真的了解这个市场的潜力和现状吗?让我们一同揭开模具状态监测市场的神秘面纱。 市场概况: 根据QYR(恒州博智)的统计,2023年全球模具状态监测市场的销售额已经达到了3.2亿美元,预计到2030年,这一数字将攀升至5.06亿美元,年复合增长率高达6.8%。这一显著的增长背后,是制造业对智能化、自动化生产需求的不断提升,以及模具状态监测技术在提高生产效率、降低维护成本方面的显著优势。 技术创新与趋势: 模具状态监测技术主要依赖于传感器、数据分析和处理等技术手段,能够实时采集模具的温度、振动、压力等指标,并通过与预设参数的比对,及时识别模具的异常情况。随着物联网、大数据和人工智能等技术的不断发展,模具状态监测技术将更加智能化,能够提供
Kubernetes DevOps实践工作坊-从理论到实战操作脚本集(含源码).zip [资源说明] 1、该项目是团队成员近期最新开发,代码完整,资料齐全,含设计文档等 2、上传的项目源码经过严格测试,功能完善且能正常运行,请放心下载使用! 3、本项目适合计算机相关专业(人工智能、通信工程、自动化、电子信息、物联网等)的高校学生、教师、科研工作者、行业从业者下载使用,可借鉴学习,也可直接作为毕业设计、课程设计、作业、项目初期立项演示等,也适合小白学习进阶,遇到问题不懂就问,欢迎交流。 4、如果基础还行,可以在此代码基础上进行修改,以实现其他功能,也可直接用于毕设、课设、作业等。 5、不懂配置和运行,可远程教学 欢迎下载,学习使用!
基于springboot+vue3+uniapp的点餐小程序源代码+数据库+文档说明(高分毕设),个人经导师指导并认可通过的毕业设计项目,评审分98分,项目中的源码都是经过本地编译过可运行的,都经过严格调试,确保可以运行!主要针对计算机相关专业的正在做毕业设计的学生和需要项目实战练习的学习者,资源项目的难度比较适中,内容都是经过助教老师审定过的能够满足学习、使用需求,如果有需要的话可以放心下载使用。 基于springboot+vue3+uniapp的点餐小程序源代码+数据库+文档说明(高分毕设)基于springboot+vue3+uniapp的点餐小程序源代码+数据库+文档说明(高分毕设)基于springboot+vue3+uniapp的点餐小程序源代码+数据库+文档说明(高分毕设)基于springboot+vue3+uniapp的点餐小程序源代码+数据库+文档说明(高分毕设)基于springboot+vue3+uniapp的点餐小程序源代码+数据库+文档说明(高分毕设)基于springboot+vue3+uniapp的点餐小程序源代码+数据库+文档说明(高分毕设)基于springb
欧姆龙NX1P2系列总线plc程序 自动检测机,plc程序,无触摸屏程序 1.多工位DDR马达转盘控制,多工位同时加工。 2.多产品配方功能程序。 3.各种实用型自制功能块程序,可重复调用,成熟设备
企业微信最全养号、防封、加人机制.pdf
这是一款用 Python 开发的异步爬虫框架,能够将网站上的数据转化成 Markdown、JSON 等 LLM 友好的输出格式。它完全开源且免费,极大地简化了异步爬虫的编写。相比于付费的 Firecrawl,它具有更快的爬取速度,支持同时抓取多个 URL、页面截图、关键字优化提取(基于 LLM)和复杂的多页面会话管理等功能。
毕设Python春节电影信息爬取与可视化分析源码+项目说明+全部资料.zip [资源说明] 1、该项目是团队成员近期最新开发,代码完整,资料齐全,含设计文档等 2、上传的项目源码经过严格测试,功能完善且能正常运行,请放心下载使用! 3、本项目适合计算机相关专业(人工智能、通信工程、自动化、电子信息、物联网等)的高校学生、教师、科研工作者、行业从业者下载使用,可借鉴学习,也可直接作为毕业设计、课程设计、作业、项目初期立项演示等,也适合小白学习进阶,遇到问题不懂就问,欢迎交流。 4、如果基础还行,可以在此代码基础上进行修改,以实现其他功能,也可直接用于毕设、课设、作业等。 5、不懂配置和运行,可远程教学 欢迎下载,学习使用!
2019厦门国际银行数创金融杯源码+竞赛策略报告文档.zip [资源说明] 1、该项目是团队成员近期最新开发,代码完整,资料齐全,含设计文档等 2、上传的项目源码经过严格测试,功能完善且能正常运行,请放心下载使用! 3、本项目适合计算机相关专业(人工智能、通信工程、自动化、电子信息、物联网等)的高校学生、教师、科研工作者、行业从业者下载使用,可借鉴学习,也可直接作为毕业设计、课程设计、作业、项目初期立项演示等,也适合小白学习进阶,遇到问题不懂就问,欢迎交流。 4、如果基础还行,可以在此代码基础上进行修改,以实现其他功能,也可直接用于毕设、课设、作业等。 5、不懂配置和运行,可远程教学 欢迎下载,学习使用!
基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业),个人经导师指导并认可通过的毕业设计项目,评审分98分,项目中的源码都是经过本地编译过可运行的,都经过严格调试,确保可以运行!主要针对计算机相关专业的正在做毕业设计的学生和需要项目实战练习的学习者,资源项目的难度比较适中,内容都是经过助教老师审定过的能够满足学习、使用需求,如果有需要的话可以放心下载使用。 基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业)基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业)基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业)基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业)基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业)基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业)基于Android Studio开发的安卓的记事本app项目源码(高分期末大作业)基于Android Studio开
基于java的小区智能卡管理系统设计与实现.docx
NLP中文垃圾短信分类系统源码+设计全部资料+文档报告(自然语言处理课设).zip [资源说明] 1、该项目是团队成员近期最新开发,代码完整,资料齐全,含设计文档等 2、上传的项目源码经过严格测试,功能完善且能正常运行,请放心下载使用! 3、本项目适合计算机相关专业(人工智能、通信工程、自动化、电子信息、物联网等)的高校学生、教师、科研工作者、行业从业者下载使用,可借鉴学习,也可直接作为毕业设计、课程设计、作业、项目初期立项演示等,也适合小白学习进阶,遇到问题不懂就问,欢迎交流。 4、如果基础还行,可以在此代码基础上进行修改,以实现其他功能,也可直接用于毕设、课设、作业等。 5、不懂配置和运行,可远程教学 欢迎下载,学习使用!
电源滤波器车辆状态估计,扩展卡尔曼滤波EKF,无迹卡尔曼滤波UKF车辆状态估计,扩展卡尔曼滤波EKF,无迹卡尔曼滤波UKF 角阶跃输入+整车7自由度模型+UKF状态估计模型+附送EKF状态估计模型,针对于轮毂电机分布式驱动车辆,进行车速,质心侧偏角,横摆角速度估计。 模型输入:方向盘转角delta,车辆纵向加速度ax 模型输出:横摆角速度wz,纵向车速vx,质心侧偏角β