Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有以下不同:
- 它被设计为一个分布式系统,易于向外扩展;
- 它同时为发布和订阅提供高吞吐量;
- 它支持多订阅者,当失败时能自动平衡消费者;
- 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
我们将通过几个case学习Kafka的功能。
这几个case是我在工作中解决一个问题而对Kafka的功能进行的调研。另外还做了性能的测试,在本文中没有提到。
- 分区
- case1: 同一个group的consumer并发处理消息
- case2: 关闭一个consumer,消息会继续由另一个consumer处理
- case3: 恢复consumer,会balance回来
- offset
- case4: auto.commit = true
- case5: offset out of range
- case6: auto.commit = false
- delay
- case7: delay功能
- case8: 容错,是否会丢失数据
启动Kafka boker,创建Topic
启动kafka:
1
|
bin/kafka-server-start.sh config/server.properties
|
查看topic list:
1
|
bin/kafka-topics.sh --list --zookeeper localhost:2181
|
查看特定的topic
1
|
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic colobu
|
创建topic:
1
|
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
|
High-level consumer实现
Producer实现
case 1
启动consumer 1:
1
|
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 grouo1 colobu 2 > 1.txt
|
启动consumer 2:
1
|
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 8 > 2.txt
|
发送 10000 个消息:
1
|
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 10000 colobu
|
查看每个consumer处理的messsage数量:
1
2
3
4
5
|
cat 1.txt|wc -l
2024
cat 2.txt|wc -l
7976
|
总数为 2024+7976=10000, 没有message丢失, 处理比大致为2024:7976 = 1:4,和我们的预想是一样的。
case 2 & case 3:
启动consumer 1:
1
|
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 grouo1 colobu 5 > 1.txt
|
启动consumer 2:
1
|
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 5 > 2.txt
|
发送 1000000 个消息:
1
|
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 500000 colobu
|
CTRL + C
停掉 consumer1, 可以看到 consumer2 继续工作, consumer1停止了工作:
1
2
3
|
tail -f 1.txt
tail -f 2.txt
|
重新启动 consumer1,将处理:
1
|
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 5 >> 1.txt
|
可以看到 consumer1 和 consumer2 继续工作:
1
|
tail -f 1.txt
|
等处理完我们检查一下consumer1和consumer2处理的消息总数,看是否有消息丢失:
1
2
|
cat *.txt|wc -l
500504
|
看起来我们处理的消息似乎多了一些,相信这是在consumer1重启的时候consumer2的offset还没来得及commit。
case 4
offset的值保存在zookeeper的如下节点中:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
可以看到不同的topic不同的分区有着自己的offset, 并且不同组分别拥有自己独立的offset。
这里有一个简单的工具可以监控offset: KafkaOffsetMonitor,
运行下面的命令就可以启动一个web界面来监控:
1
2
3
4
5
6
|
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zk-server1,zk-server2 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days
|
上面我们已经做了实验, 所以在zookeeper应该有offset的值,你可以用zkCli.sh或者上面的命令查询一下:
1
2
|
get /consumers/group1/offsets/colobu/3
39919
|
因为auto.commit.enable
默认为true, consumer会定时地将offset写入到zookeeper上(时间间隔由auto.commit.interval.ms
决定,默认1分钟).
case 5
根据Kafka的文档, 参数auto.offset.reset
的功能如下所示:
what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer
如果offset is out of range, 这个参数决定了comsumer的行为:
smallest
, 将offset设为当前所能用的最小的offset。 注意不一定是0。largest
, 将offset设为当前可用的最大的offset。也就是consumer将只处理最新写入的消息。 默认值。
其它, 校验失败,抛出kafka.common.InvalidConfigException异常。 看起来只能设置这两个值
我们可以将zookeeper的 /consumers
节点删除::
1
|
[zk: localhost:2181(CONNECTED) 15] rmr /consumers
|
然后将的代码中加入一行
1
|
props.put("auto.offset.reset", "smallest");
|
或者
..
1
2
|
props.put("auto.offset.reset", "largest");
|
如果设置为largest
,可以看到consumer不会处理以前发送的消息,只会处理新进的消息。
如果设置为smallest
,consumer从头开始所有的消息。
case 6
默认情况下auto.commit.enable
等于true
,这也就意味着consumer会定期的commit offset。 前面也介绍了, zookeeper中节点中记录这这些offset。
我们也可以手工进行commit。
首先我们先将zookeeper中把/consumers节点删除掉。
然后在ConsumerExample.java
中增加下面一行:
1
|
props.put("auto.commit.enable", "false");
|
然后启动consumer:
1
|
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 10
|
过几分钟检查zookeeper的/consumers/group1节点,发现并没有offsets节点。
这符合我们的期望,因为我们将自动提交设为false。
修改ConsumerTest.java
的run
方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
int count = 0;
while (it.hasNext()) {
//格式: 处理时间,线程编号:消息
System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + new String(it.next().message()));
count++;
if (count == 100) {
consumer.commitOffsets();
count = 0;
}
}
System.out.println("Shutting down Thread: " + threadNumber);
}
|
当处理了100个消息后就commit offset一次。
启动consumer,只运行一个线程,这样我们可以直观的感受到commit.如果多个线程, 我们不能精确确定应该发多少个消息才能使某个线程处理100个消息。
1
|
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 1
|
先用producer发送99个消息,
1
|
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 99 colobu
|
查看zookeeper的节点/consumers/group1下依然没有offsets节点。
再发送一个消息,
1
|
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 1 colobu
|
可以看到zookeeper的节点/consumers/group1下增加了offsets节点。
素以如果你有必要,你也可以手工控制offset的commit的时机。
case 7
这是我现在想利用Kafka实现的一个分布式DelayQueue的功能。
在我的文章跟着实例学习ZooKeeper的用法:讲到,利用zookeeper可以实现Distributed Delay Queue,但是Curator的作者也讲了, zookeeper真心不适合做queue,尤其在数据量很大的情况。
可以利用redis的sorted set实现: Delay queue in redis,
或者使用其它的一些框架实现。
这里我利用Kafka实现这样的一个功能。
实现方式大概和这个帖子相同: Delayed Queue
Each produced message should have a timestamp at which it was pushed to the queue. At the
consumer side, fetch a message from a partition and compare the message timestamp with system's timestamp to see if enough time has passed for you
to process the message. If enough time has passed, process the message and commit the message's offset otherwise make sure you do not commit the
offset.
我们将ConsumerTest
的run方法修改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
//格式: 处理时间,线程编号:消息
String msg = new String(it.next().message());
long t = System.currentTimeMillis() - Long.parseLong(msg.substring(0, msg.indexOf(",")));
if (t < 5 * 60 *1000) {
try {
Thread.currentThread().sleep(5 * 60 *1000 -t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg);
}
System.out.println("Shutting down Thread: " + threadNumber);
}
|
然后用producer发送100消息:
1
|
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 100 colobu
|
可以看到consumer5分钟后才处理这100个数据。
看起来方案可行。
case 8
注意offset是自动commit的。 在上面的例子中,如果sleep的过程中consumer重启,是否会有message丢失或者重复处理呢?
Kafka默认往zookeeper上写offset的频率是10秒。
查看Kafka的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
import kafka.message.{MessageAndOffset, MessageAndMetadata}
import kafka.common.{KafkaException, MessageSizeTooLargeException}
/**
* An iterator that blocks until a value can be read from the supplied queue.
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
if(consumedOffset < 0)
throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
item
}
......
}
|
只有在next获取下一个消息的情况下才会更新consumedOffset,这样的话最多只有一个消息被读取还没来得及处理就因为程序crash而丢掉了。 在我们可以接受的范围之内。
其它
InfoQ网站有几篇Kafka文章挺好的,如
转载自
http://colobu.com/2015/03/12/kafka-in-practice/
相关推荐
### CentOS 8 安装 Kafka 2.11-2.4.1 详细步骤与注意事项 #### 一、安装前的基础环境准备 在正式安装 Kafka 之前,确保已准备好以下基础环境: 1. **系统版本确认**:确认当前使用的操作系统为 CentOS 8。 2. **...
test_case_kafka 这里介绍了如何在windows下安装kakfa,并且提供了单元测试程序进行生产消费。 1.解压kafka_2.12-1.0.1 2.修改kafka_2.12-1.0.1\config\zookeeper.properties文件 dataDir=D:\zookeeperlog 3.修改...
根据提供的文件信息,本文将详细解析“Spark Streaming与Kafka的整合”这一主题,并结合代码片段探讨其在实际场景中的应用。尽管标签中提到“数学建模”,但从标题和描述来看,这部分内容与数学建模无关,因此我们将...
自定义Kafka Producer和Consumer应用程序。 文章: : 目标: 创建自己的CustomConsumer。 创建自己的CustomProducer,以使用Twitter API自动生成并向主题发送至少10条消息。 使用自定义名称空间,以避免与其他...
We also focused on making the book general and comprehensive enough so it will be useful to anyone using Kafka, no matter the use case or architecture. We cover practical matters such as how to ...
在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并确保消息可以被消费者消费。而Topic则是消息的分类单位,代表了相同类型消息的集合。为了提高...
这个库的主要目标是让 Scala 开发者能更自然地使用 Kafka Streams 功能,同时保持与原生 Java API 的紧密兼容性。 Kafka Streams 是 Apache Kafka 的一个模块,它允许开发者以流处理的方式对数据进行实时分析和处理...
`kafka-streams-scala`是一个项目,它为Apache Kafka的Kafka Streams API提供了轻量级的Scala封装,使得Scala开发者可以更自然、更便捷地利用Kafka Streams的功能。Kafka Streams是一个用于在Java和Scala应用程序中...
在分布式消息系统中,Apache Kafka 是一个至关重要的组件,它提供了高效、可扩展的消息传递功能。而序列化和反序列化是Kafka处理数据的关键环节,确保数据在生产者和消费者之间正确地转换和传输。`kafka-...
在分布式消息系统中,Apache Kafka 是一个非常关键的组件,用于构建实时数据管道和流处理应用。在Kafka中,消息通常需要进行序列化和反序列化,以便在网络中传输和存储。`kafka-serde-scala` 库提供了一个方便的解决...
【Spark大数据习题】...总的来说,这个习题集覆盖了Scala语言基础、Spark核心功能、SQL查询、实时数据处理(Kafka)、大数据存储(HBase)和数据仓库(Hive)等多个关键知识点,是学习和掌握大数据技术栈的良好资源。
- **运算符与控制流:** 学习算术运算符、关系运算符、逻辑运算符等,并熟悉if-else、switch-case、循环(for、while、do-while)等控制结构的使用。 - **数组:** 学习一维数组和多维数组的声明、初始化和访问...
【Java后端学习材料大全】是一...通过深入学习这些知识点,并结合实际项目实践,你将能够提升自己的Java后端开发技能,成为一名合格甚至出色的Java开发者。这份【Java后端学习材料大全】无疑是你学习和成长的宝贵资源。
- **参数调整**:通过调整`Spark.streaming.kafka.maxRatePerPartition`参数,可以控制每个Kafka分区的数据消费速率,从而平衡性能和资源使用。 ##### CASE-05 数据清洗 在处理收集的行为数据时,遇到了文本数据...
Java学习体系是一个全面而深入的过程,它涵盖了编程基础、面向对象概念、类库与框架、并发编程、网络编程、数据库交互等多个方面。以下是对这个学习体系的详细解析: 1. **编程基础**:首先,你需要掌握Java语言的...
Java是世界上最流行的编程语言之一,尤其在企业级应用开发中占据主导地位。"JAVA核心知识点整理.zip"这个压缩包文件包含了一系列关于Java编程的重要...通过深入学习和实践,可以显著提升个人在Java开发领域的专业技能。
它通过将实时数据流切分成一系列的小批量数据,然后对每个小批量数据进行处理,从而实现流式计算。这种方式使得 Spark Streaming 能够利用 Spark 的核心引擎来高效地处理大规模数据流。 Spark Streaming 支持多种...
国内各大互联网公司落地大规模 RabbitMQ 集群支撑自身业务的 case 较多,国内各种中小型互联网公司使用 RabbitMQ 的实践也比较多。 RabbitMQ 的缺点 RabbitMQ 的缺点是基于 Erlang 语言开发的,导致较难分析里面的...
Java编程语言是软件开发领域的重要组成部分,尤其在企业级...学习者可以通过这个资源逐步掌握Java开发的全貌,并提升到专业水平。记得理论学习与实践相结合,不断探索和解决实际问题,才能在Java开发道路上走得更远。