`

Kafka 0.9+Zookeeper3.4.6集群搭建、配置,新版Java Client的使用要点,高可用性测试,以及各种坑(二)

阅读更多

上一节中(点此传送),我们完成了Kafka集群的搭建,本节中我们将介绍0.9版本中的新API,以及Kafka集群高可用性的测试

 

1. 使用Kafka的Producer API来完成消息的推送

 

1) Kafka 0.9.0.1的java client依赖:

	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.9.0.1</version>
	</dependency>

 

2) 写一个KafkaUtil工具类,用于构造Kafka Client

public class KafkaUtil {
	private static KafkaProducer<String, String> kp;

	public static KafkaProducer<String, String> getProducer() {
		if (kp == null) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
			props.put("acks", "1");
			props.put("retries", 0);
			props.put("batch.size", 16384);
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			kp = new KafkaProducer<String, String>(props);
		}
		return kp;
	}
}

  KafkaProducer<K,V>的K代表每条消息的key类型,V代表消息类型。消息的key用于决定此条消息由哪一个partition接收,所以我们需要保证每条消息的key是不同的。

  Producer端的常用配置

  • bootstrap.servers:Kafka集群连接串,可以由多个host:port组成
  • acks:broker消息确认的模式,有三种:
    0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
    1:由Leader确认,Leader接收到消息后会立即返回确认信息
    all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
    我们可以根据消息的重要程度,设置不同的确认模式。默认为1
  • retries:发送失败时Producer端的重试次数,默认为0
  • batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节
  • linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
  • key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定
  • buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)

  更多的Producer配置见官网:http://kafka.apache.org/documentation.html#producerconfigs

 

  3) 写一个简单的Producer端,每隔1秒向Kafka集群发送一条消息:

public class KafkaTest {
	public static void main(String[] args) throws Exception{
		Producer<String, String> producer = KafkaUtil.getProducer();
		int i = 0;
		while(true) {
			ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.valueOf(i), "this is message"+i);
			producer.send(record, new Callback() {
				public void onCompletion(RecordMetadata metadata, Exception e) {
					if (e != null)
						e.printStackTrace();
					System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
				}
			});
			i++;
			Thread.sleep(1000);
		}
	}
}

 

  在调用KafkaProducer的send方法时,可以注册一个回调方法,在Producer端完成发送后会触发回调逻辑,在回调方法的metadata对象中,我们能够获取到已发送消息的offset和落在的分区等信息。注意,如果acks配置为0,依然会触发回调逻辑,只是拿不到offset和消息落地的分区信息。

    跑一下,输出是这样的:

message send to partition 0, offset: 28
message send to partition 1, offset: 26
message send to partition 0, offset: 29
message send to partition 1, offset: 27
message send to partition 1, offset: 28
message send to partition 0, offset: 30
message send to partition 0, offset: 31
message send to partition 1, offset: 29
message send to partition 1, offset: 30
message send to partition 1, offset: 31
message send to partition 0, offset: 32
message send to partition 0, offset: 33
message send to partition 0, offset: 34
message send to partition 1, offset: 32

  乍一看似乎offset乱掉了,但其实这是因为消息分布在了两个分区上,每个分区上的offset其实是正确递增的。

 

4) 编写Consumer端来消费消息

  首先改造一下KafkaUtil类,加入Consumer client的构造。

public class KafkaUtil {
	private static KafkaProducer<String, String> kp;
	private static KafkaConsumer<String, String> kc;

	public static KafkaProducer<String, String> getProducer() {
		if (kp == null) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
			props.put("acks", "1");
			props.put("retries", 0);
			props.put("batch.size", 16384);
			props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
			kp = new KafkaProducer<String, String>(props);
		}
		return kp;
	}
	
	public static KafkaConsumer<String, String> getConsumer() {
		if(kc == null) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "10.0.0.100:9092,10.0.0.101:9092");
			props.put("group.id", "1");
			props.put("enable.auto.commit", "true");
			props.put("auto.commit.interval.ms", "1000");
			props.put("session.timeout.ms", "30000");
			props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
			props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
			kc = new KafkaConsumer<String, String>(props);
		}
		return kc;
	}
}

  同样,我们介绍一下Consumer常用配置

  • bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含义一样,不再赘述
  • fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer会等待消息积累到一定尺寸后进行批量拉取。默认为1,代表有一条就拉一条
  • max.partition.fetch.bytes:每次从单个分区中拉取的消息最大尺寸(byte),默认为1M
  • group.id:Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息,不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。
  • enable.auto.commit:是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true
  • auto.commit.interval.ms:自动提交offset的间隔毫秒数,默认5000。

  全部的Consumer配置见官方文档:http://kafka.apache.org/documentation.html#newconsumerconfigs

 

  接下来编写Consumer端:

public class KafkaTest {
	public static void main(String[] args) throws Exception{
		KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
		consumer.subscribe(Arrays.asList("test"));
		while(true) {
			ConsumerRecords<String, String> records = consumer.poll(1000);
			for(ConsumerRecord<String, String> record : records) {
				System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
			}
		}
	}
}

 

  运行输出:

fetched from partition 0, offset: 28, message: this is message0
fetched from partition 0, offset: 29, message: this is message2
fetched from partition 0, offset: 30, message: this is message5
fetched from partition 0, offset: 31, message: this is message6
fetched from partition 0, offset: 32, message: this is message10
fetched from partition 0, offset: 33, message: this is message11
fetched from partition 0, offset: 34, message: this is message12
fetched from partition 1, offset: 26, message: this is message1
fetched from partition 1, offset: 27, message: this is message3
fetched from partition 1, offset: 28, message: this is message4
fetched from partition 1, offset: 29, message: this is message7
fetched from partition 1, offset: 30, message: this is message8
fetched from partition 1, offset: 31, message: this is message9
fetched from partition 1, offset: 32, message: this is message13

 

说明:

  • KafkaConsumer的poll方法即是从Broker拉取消息,在poll之前首先要用subscribe方法订阅一个Topic。
  • poll方法的入参是拉取超时毫秒数,如果没有新的消息可供拉取,consumer会等待指定的毫秒数,到达超时时间后会直接返回一个空的结果集。
  • 如果Topic有多个partition,KafkaConsumer会在多个partition间以轮询方式实现负载均衡。如果启动了多个Consumer线程,Kafka也能够通过zookeeper实现多个Consumer间的调度,保证同一组下的Consumer不会重复消费消息。注意,Consumer数量不能超过partition数,超出部分的Consumer无法拉取到任何数据。
  • 可以看出,拉取到的消息并不是完全顺序化的,kafka只能保证一个partition内的消息先进先出,所以在跨partition的情况下,消息的顺序是没有保证的。
  • 本例中采用的是自动提交offset,Kafka client会启动一个线程定期将offset提交至broker。假设在自动提交的间隔内发生故障(比如整个JVM进程死掉),那么有一部分消息是会被重复消费的。要避免这一问题,可使用手动提交offset的方式。构造consumer时将enable.auto.commit设为false,并在代码中用consumer.commitSync()来手动提交。

如果不想让kafka控制consumer拉取数据时在partition间的负载均衡,也可以手工控制:

	public static void main(String[] args) throws Exception{
		KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer();
	    String topic = "test";
	    TopicPartition partition0 = new TopicPartition(topic, 0);
	    TopicPartition partition1 = new TopicPartition(topic, 1);
	    consumer.assign(Arrays.asList(partition0, partition1));
		while(true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for(ConsumerRecord<String, String> record : records) {
				System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());
			}
			consumer.commitSync();
		}
	}

 使用consumer.assign()方法为consumer线程指定1个或多个partition。

 

  此处的坑:

在测试中我发现,如果用手工指定partition的方法拉取消息,不知为何kafka的自动提交offset机制会失效,必须使用手动方式才能正确提交已消费的消息offset。

 

  题外话:

在真正的应用环境中,Consumer端将消息拉取下来后要做的肯定不止是输出出来这么简单,在消费消息时很有可能需要花掉更多的时间。1个Consumer线程消费消息的速度很有可能是赶不上Producer产生消息的速度,所以我们不得不考虑Consumer端采用多线程模型来消费消息。
然而KafkaConsumer并不是线程安全的,多个线程操作同一个KafkaConsumer实例会出现各种问题,Kafka官方对于Consumer端的多线程处理给出的指导建议如下:

1. 每个线程都持有一个KafkaConsumer对象
好处:
  • 实现简单
  • 不需要线程间的协作,效率最高
  • 最容易实现每个Partition内消息的顺序处理

弊端:

  • 每个KafkaConsumer都要与集群保持一个TCP连接
  • 线程数不能超过Partition数
  • 每一batch拉取的数据量会变小,对吞吐量有一定影响

2. 解耦,1个Consumer线程负责拉取消息,数个Worker线程负责消费消息
好处:

  • 可自由控制Worker线程的数量,不受Partition数量限制

弊端:

  • 消息消费的顺序无法保证
  • 难以控制手动提交offset的时机

个人认为第二种方式更加可取,consumer数不能超过partition数这个限制是很要命的,不可能为了提高Consumer消费消息的效率而把Topic分成更多的partition,partition越多,集群的高可用性就越低。

 

 

2. Kafka集群高可用性测试

 

1) 查看当前Topic的状态:

/kafka-topics.sh --describe --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181 --topic test

  输出:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
   Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1

  可以看到,partition0的leader是broker1,parition1的leader是broker0

 

2) 启动Producer向Kafka集群发送消息

  输出:

message send to partition 0, offset: 35
message send to partition 1, offset: 33
message send to partition 0, offset: 36
message send to partition 1, offset: 34
message send to partition 1, offset: 35
message send to partition 0, offset: 37
message send to partition 0, offset: 38
message send to partition 1, offset: 36
message send to partition 1, offset: 37

 

3) 登录SSH将broker0,也就是partition 1的leader kill掉

 

  再次查看Topic状态:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
  Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
  Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1

  可以看到,当前parition0和parition1的leader都是broker1了

 

  此时再去看Producer的输出:

message send to partition 1, offset: 38
message send to partition 0, offset: 39
message send to partition 0, offset: 40
message send to partition 0, offset: 41
message send to partition 1, offset: 39
message send to partition 1, offset: 40
message send to partition 0, offset: 42
message send to partition 0, offset: 43

 

  Producer端非常平稳的继续运行,完全没有任何异常产生(但实际上broker0挂掉后下一条消息的发送延迟了几秒),能够看出Kafka集群的故障切换机制还是很厉害的

 

4) 我们再把broker0启动起来

bin/kafka-server-start.sh -daemon config/server.properties 

  然后再次检查Topic状态:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
   Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0

  我们看到,broker0启动起来了,并且已经是in-sync状态(注意Isr从1变成了1,0),但此时两个partition的leader还都是broker1,也就是说当前broker1会承载所有的发送和拉取请求。这显然是不行的,我们要让集群恢复到负载均衡的状态。

  这时候,需要使用Kafka的选举工具触发一次选举:

bin/kafka-preferred-replica-election.sh --zookeeper 10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181

  选举完成后,再次查看Topic状态:

Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
   Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
   Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0

  可以看到,集群重新回到了broker0挂掉之前的状态

  但此时,Producer端产生了异常:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

  原因是Producer端在尝试向broker1的parition0发送消息时,partition0的leader已经切换成了broker0,所以消息发送失败。

  此时用Consumer去消费消息,会发现消息的编号不连续了,确实漏发了一条消息。这是因为我们在构造Producer时设定了retries=0,所以在发送失败时Producer端不会尝试重发。

  将retries改为3后再次尝试,会发现leader切换时再次发生了同样的问题,但Producer的重发机制起了作用,消息重发成功,启动Consumer端检查也证实了所有消息都发送成功了。

 

每次集群单点发生故障恢复后,都需要进行重新选举才能彻底恢复集群的leader分配,如果嫌每次这样做很麻烦,可以在broker的配置文件(即server.properties)中配置auto.leader.rebalance.enable=true,这样broker在启动后就会自动进行重新选举

 

 

至此,我们通过测试证实了集群出现单点故障和恢复的过程中,Producer端能够保持正确运转。接下来我们看一下Consumer端的表现:

 

5) 同时启动Producer进程和Consumer进程

  此时Producer一边在生产消息,Consumer一边在消费消息

 

6) 把broker0干掉,观察Consumer端的输出:

 

能看到,在broker0挂掉后,consumer端产生了一系列INFO和WARN输出,但若干秒后自动恢复,消息仍然是连续的,并未出现断点。

 

7) 再次把broker0启动,并触发重新选举,然后观察输出:

fetched from partition 0, offset: 418, message: this is message48
fetched from partition 0, offset: 419, message: this is message49
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead.
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group.
fetched from partition 1, offset: 392, message: this is message50
fetched from partition 0, offset: 420, message: this is message51

  能看到,重选举后Consumer端也输出了一些日志,意思是在提交offset时发现当前的调度器已经失效了,但很快就重新获取了新的有效调度器,恢复了offset的自动提交,验证已提交offset的值也证明了offset提交并未因leader切换而发生错误。

 

  如上,我们也通过测试证实了Kafka集群出现单点故障时,Consumer端的功能正确性。

 

  通过测试,我们完整验证了Kafka集群的高可用性。本文至此结束。

分享到:
评论

相关推荐

    kafka+zookeeper高可用集群搭建shell使用脚本

    【标题】"kafka+zookeeper高可用集群搭建shell使用脚本"所涉及的知识点主要集中在构建高可用的Kafka和Zookeeper集群上,以及如何利用Shell脚本来自动化这个过程。Kafka是一个分布式流处理平台,而Zookeeper是Apache...

    ELK+Filebeat+Kafka+ZooKeeper构建日志分析平台

    ELK+Filebeat+Kafka+ZooKeeper构建日志分析平台,架构图解

    集群环境搭建-Centos+kafka+zookeeper+hadoop+Spark

    通过VirtualBox安装多台虚拟机,实现集群环境搭建。 优势:一台电脑即可。 应用场景:测试,学习。...内附百度网盘下载地址,有hadoop+zookeeper+spark+kafka等等·····需要的安装包和配置文件

    zookeeper linux集群搭建流程

    ZooKeeper 的高可用性和高性能使其成为分布式系统的优选解决方案。 ZooKeeper Linux 集群搭建流程可以分为四步:下载和解压 ZooKeeper、创建目录和 myid 文件、修改 ZooKeeper 启动配置文件、启动 ZooKeeper 节点。...

    centos7下Redis哨兵集群和kafka集群和zookeeper集群搭建

    centos7下Redis哨兵集群和kafka集群和zookeeper集群搭建 http://blog.csdn.net/gaowenhui2008/article/details/71516901 https://cwiki.apache.org/confluence/display/KAFKA/Clients

    Hadoop+Hive+Spark+Kafka+Zookeeper+Flume+Sqoop+Azkaban+Scala

    基于 Zookeeper 搭建 Hadoop 高可用集群 二、Hive 简介及核心概念 Linux 环境下 Hive 的安装部署 CLI 和 Beeline 命令行的基本使用 常用 DDL 操作 分区表和分桶表 视图和索引 常用 DML 操作 数据查询详解 三、Spark ...

    徐老师大数据培训Hadoop+HBase+ZooKeeper+Spark+Kafka+Scala+Ambari

    根据提供的标题、描述、标签及部分内容链接,我们可以推断出这是一个关于大数据技术栈的培训课程,涉及的技术包括Hadoop、HBase、Zookeeper、Spark、Kafka、Scala以及Ambari。下面将针对这些技术进行详细的介绍和...

    spark环境安装(Hadoop HA+Hbase+phoneix+kafka+flume+zookeeper+spark+scala)

    本项目旨在搭建一套完整的Spark集群环境,包括Hadoop HA(高可用)、HBase、Phoenix、Kafka、Flume、Zookeeper以及Scala等多个组件的集成。这样的环境适用于大规模的数据处理与分析任务,能够有效地支持实时数据流...

    kafka2.4.0+zookeeper+kafka-connect集成环境包

    标题中的“kafka2.4.0+zookeeper+kafka-connect集成环境包”指的是一个包含了Apache Kafka 2.4.0版本、ZooKeeper以及Kafka Connect的完整集成环境。这个安装包是为了方便用户一次性安装和配置这三个关键组件,用于...

    kafka+zookeeper.zip

    标题中的"kafka+zookeeper.zip"表明这是一个关于Apache Kafka和Zookeeper的组合包,通常用于构建高效的消息队列系统,尤其在大数据处理场景中。Apache Kafka是一个分布式流处理平台,而Zookeeper是一个分布式协调...

    spring+zookeeper+kafka

    Zookeeper和Kafka则是大数据领域的两个重要组件,分别用于分布式协调和服务管理以及高吞吐量的消息传递。本文将深入探讨如何整合Spring、Zookeeper和Kafka,构建一个高效、可靠的分布式系统。 首先,让我们关注...

    zookeeper、kafka集群部署

    zookeeper配置、集群部署 kafka配置、集群部署 Window平台下

    kafka+zookeeper

    具体来说,Zookeeper存储了Kafka的Broker信息、Topic的分区信息和副本分配等,确保了Kafka的高可用性和一致性。 zookeeper-3.3.6.tar.gz这个文件是Zookeeper的3.3.6版本源码包,包含了Zookeeper服务器端和客户端的...

    centos7下kafka和zookeeper和Redis集群搭建.zip

    本教程将详述如何在CentOS7操作系统中搭建Kafka、Zookeeper和Redis的集群环境,以实现高效、稳定的分布式数据处理和存储。 **Zookeeper简介** Apache ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,...

    zookeeper-3.4.6.zip

    - **一致性**:Zookeeper 提供了一致性模型,所有客户端看到的数据视图是一致的,确保了数据的高可用性。 - **原子性**:所有的操作(如读取和写入)都是原子的,即要么成功,要么失败,不存在部分执行的情况。 ...

    Linux下的Kafka+Zookeeper使用以及配置

    总的来说,掌握Linux下Kafka和Zookeeper的安装、配置和使用是构建高可用、高性能的数据流平台的基础。这需要对分布式系统、Java编程以及相关的数据处理概念有一定的了解。通过实践和不断学习,你可以熟练地利用Kafka...

    kafka0.8.1+zookeeper3.4.9+storm0.9.1

    搭建storm+kafka+zookeeper环境所需要的三个环境组件,里面的参数已经配置完成,你只需要把对应的集群的ipHost换成你搭建的ip即可,已经成功搭建并测试,如果有疑问可以访问本人的博客,里面有详细的配置可供参考

    kafka+zookeeper+简单demo

    Kafka是一种高吞吐量、低延迟的分布式消息系统,而ZooKeeper则是一个集中式服务,用于分布式应用程序的配置管理、命名服务、集群同步等。在这个"Kafka + ZooKeeper + 简单demo"中,我们将深入理解这两个工具的结合...

    spark+hadoop+kafka+zookeeper 大数据平台搭建脚本

    spark+hadoop+zookeeper 大数据平台搭建脚本,亲测通过,适用于大数据初学者在虚拟机玩

Global site tag (gtag.js) - Google Analytics