Kafka Study Notes
Written by: 王宇
2016-10-24
Messaging System
Kafka Architecture
Cluster Architecture
Workflow
Installation Steps
Basic Kafka Operations
Simple Producer Example
Simple Consumer Example
References
Messaging System
- Point to Point Messaging System
- Publish-Subscribe Messaging System
The key difference between the two is that the Publish-Subscribe model supports multiple receivers for the same message.
Kafka Architecture
Topics | A stream of messages belonging to a particular category is called a topic. Data is stored in topics. Topics are split into partitions. For each topic, Kafka keeps a minimum of one partition. Each partition contains messages in an immutable ordered sequence. A partition is implemented as a set of segment files of equal sizes. |
Partition | Topics may have many partitions, so a topic can handle an arbitrary amount of data. |
Partition offset | Each message within a partition has a unique sequence id called an offset (see the sketch after this table). |
Replicas of partition | Replicas are simply backups of a partition. Replicas are never read from or written to by clients; they exist to prevent data loss. |
Brokers | Brokers are simple systems responsible for maintaining the published data. Each broker may have zero or more partitions per topic. If there are N partitions in a topic and N brokers, each broker will have one partition. If there are N partitions and more than N brokers (N + M), the first N brokers will have one partition each and the remaining M brokers will have no partition for that topic. If there are N partitions and fewer than N brokers (N - M), the partitions are shared among the brokers, with some brokers holding more than one. This last scenario is not recommended due to unequal load distribution among the brokers. |
Kafka cluster | A Kafka deployment with more than one broker is called a Kafka cluster. A Kafka cluster can be expanded without downtime. Clusters are used to manage the persistence and replication of message data. |
Producers | Producers publish messages to one or more Kafka topics. Producers send data to Kafka brokers. Every time a producer publishes a message to a broker, the broker simply appends the message to the last segment file of a partition. A producer can also send messages to a partition of its choice. |
Consumers | Consumers read data from brokers. Consumers subscribe to one or more topics and consume published messages by pulling data from the brokers. |
Leader | The leader is the node responsible for all reads and writes for a given partition. Every partition has one server acting as its leader. |
Follower | A node that follows the leader's instructions is called a follower. If the leader fails, one of the followers automatically becomes the new leader. A follower acts as a normal consumer: it pulls messages and updates its own data store. |
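As a quick illustration of topics, partitions, and offsets, here is a minimal sketch (the broker address localhost:9092 and the topic name demo-topic are assumptions) that sends one message and prints the partition and offset the broker assigned to it:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class OffsetDemo {
   public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      // send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges
      RecordMetadata meta = producer.send(
         new ProducerRecord<String, String>("demo-topic", "key", "value")).get();
      // Every message appended to a partition receives the next offset in that partition
      System.out.println("partition = " + meta.partition() + ", offset = " + meta.offset());
      producer.close();
   }
}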
Cluster Architecture
Broker | A Kafka cluster typically consists of multiple brokers to maintain load balance. Kafka brokers are stateless, so they use ZooKeeper to maintain cluster state. A single Kafka broker instance can handle hundreds of thousands of reads and writes per second, and each broker can handle terabytes of messages without performance impact. Kafka broker leader election is done by ZooKeeper. |
ZooKeeper | ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service mainly notifies producers and consumers about the presence of a new broker in the Kafka system or the failure of an existing one. Based on these notifications, producers and consumers decide how to proceed and start coordinating their work with another broker. |
Producers | Producers push data to brokers. When a new broker starts, all producers discover it and automatically begin sending messages to it. A Kafka producer does not wait for acknowledgements from the broker and sends messages as fast as the broker can handle. |
Consumers | Because Kafka brokers are stateless, the consumer has to track how many messages have been consumed, using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is kept in ZooKeeper. |
Workflow
Kafka is simply a collection of topics split into one or more partitions. A Kafka partition is a linearly ordered sequence of messages, where each message is identified by its index, called the offset.
Workflow of Pub-Sub Messaging
- Producers send messages to a topic at regular intervals.
- Kafka broker stores all messages in the partitions configured for that particular topic. It ensures the messages are equally shared between partitions. If the producer sends two messages and there are two partitions, Kafka will store one message in the first partition and the second message in the second partition.
- Consumer subscribes to a specific topic.
- Once the consumer subscribes to a topic, Kafka will provide the current offset of the topic to the consumer and also saves the offset in the Zookeeper ensemble.
- The consumer polls Kafka at regular intervals (e.g., every 100 ms) for new messages.
- Once Kafka receives the messages from producers, it forwards these messages to the consumers.
- Consumer will receive the message and process it.
- Once the messages are processed, the consumer sends an acknowledgement to the Kafka broker.
- Once Kafka receives the acknowledgement, it changes the offset to the new value and updates it in ZooKeeper. Since offsets are maintained in ZooKeeper, the consumer can read the next message correctly even during server outages.
- The above flow repeats until the consumer stops the requests.
- The consumer has the option to rewind/skip to the desired offset of a topic at any time and read all subsequent messages (see the sketch after this list).
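The rewind/skip step above can be reproduced with the consumer's seek API. A minimal sketch, assuming a broker on localhost:9092 and that partition 0 of the Hello-Kafka topic already holds messages (the group id is a placeholder):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindDemo {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "rewind-demo"); // assumed group id
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      TopicPartition partition = new TopicPartition("Hello-Kafka", 0);
      // assign() takes manual control of the partition instead of using group subscription
      consumer.assign(Arrays.asList(partition));
      // Rewind to the beginning: every earlier message is delivered again
      consumer.seekToBeginning(Arrays.asList(partition));
      // Or jump to an arbitrary offset, e.g. offset 5:
      // consumer.seek(partition, 5);
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
      consumer.close();
   }
}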
Workflow of Queue Messaging / Consumer Group
In a consumer group, a group of consumers with the same Group ID subscribe to a topic.
- Producers send messages to a topic at regular intervals.
- Kafka stores all messages in the partitions configured for that particular topic similar to the earlier scenario.
- A single consumer subscribes to a specific topic, say Topic-01, with Group ID Group-1.
- Kafka interacts with the consumer in the same way as in Pub-Sub Messaging until a new consumer subscribes to the same topic, Topic-01, with the same Group ID, Group-1.
- Once the new consumer arrives, Kafka switches its operation to share mode and shares the data between the two consumers. This sharing goes on until the number of consumers reaches the number of partitions configured for that particular topic.
- Once the number of consumers exceeds the number of partitions, a new consumer will not receive any further messages until one of the existing consumers unsubscribes. This happens because each consumer in Kafka is assigned a minimum of one partition; once all partitions are assigned to the existing consumers, new consumers have to wait.
- This feature is called Consumer Group. In this way, Kafka provides the best of both messaging systems in a simple and efficient manner (see the sketch below).
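A sketch of the consumer-group behaviour just described: run two copies of this program with the same group.id (Group-1 and Topic-01 follow the example above; a topic with two partitions is assumed) and Kafka splits the partitions between the two instances:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMember {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "Group-1"); // same ID in every instance -> queue semantics
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      consumer.subscribe(Arrays.asList("Topic-01"));
      // With two instances and two partitions, each instance is assigned one partition;
      // a third instance would stay idle until an existing one leaves the group.
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            System.out.printf("partition = %d, offset = %d, value = %s%n",
               record.partition(), record.offset(), record.value());
      }
   }
}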
Role of ZooKeeper
A critical dependency of Apache Kafka is Apache ZooKeeper, which is a distributed configuration and synchronization service. ZooKeeper serves as the coordination interface between the Kafka brokers and consumers. The Kafka servers share information via a ZooKeeper cluster. Kafka stores basic metadata in ZooKeeper, such as information about topics, brokers, consumer offsets (queue readers) and so on.
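Kafka registers each live broker as an ephemeral node under /brokers/ids in ZooKeeper, so the cluster membership can be inspected directly. A minimal sketch with the ZooKeeper Java client (assuming ZooKeeper on localhost:2181 and a running broker):

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class BrokerList {
   public static void main(String[] args) throws Exception {
      // 3000 ms session timeout, no watcher
      ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);
      // Each child znode is the id of a live broker; it disappears when the broker dies
      List<String> brokerIds = zk.getChildren("/brokers/ids", false);
      System.out.println("live broker ids: " + brokerIds);
      zk.close();
   }
}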
Installation Steps
- Step 1: Install the JDK and set the JAVA_HOME and CLASSPATH environment variables.
Step 2: Install ZooKeeper
Download ZooKeeper, then unpack it:
$ tar xzvf zookeeper-3.5.2-alpha.tar.gz
$ mv ./zookeeper-3.5.2-alpha /opt/zookeeper
$ cd /opt/zookeeper
$ mkdir data
Create the configuration file:
$ cd /opt/zookeeper
$ vim conf/zoo.cfg
tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
Start the ZooKeeper server:
$ bin/zkServer.sh start
Step 3: Install SLF4J
Download SLF4J: slf4j-1.7.21.tar.gz from www.slf4j.org
Unpack it and add it to the CLASSPATH:
$ tar xvzf ./slf4j-1.7.21.tar.gz
$ mv ./slf4j-1.7.21 /opt/slf4j
$ vim ~/.bashrc
export CLASSPATH=${CLASSPATH}:/opt/slf4j/*
Step 4: Install Kafka
Download Kafka, then unpack it:
$ tar xzvf kafka_2.11-0.10.1.0.tgz
$ mv ./kafka_2.11-0.10.1.0 /opt/kafka
$ cd /opt/kafka
Start the server:
$ ./bin/kafka-server-start.sh config/server.properties
- Step 5: Stop the server
$ ./bin/kafka-server-stop.sh
Basic Kafka Operations
- Start ZooKeeper and Kafka
$ ./bin/zkServer.sh start
$ ./bin/kafka-server-start.sh config/server.properties
- Single-Node Broker Configuration
Create a Kafka Topic
Syntax:
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
Example:
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
Output:
Created topic "Hello-Kafka"
List of Topics
Syntax:
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
Output:
Hello-Kafka
Start a Producer to Send Messages
Syntax:
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
Example:
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
Enter the following messages at the prompt:
Hello
My first message
My second message
Start a Consumer to Receive Messages
Syntax:
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name --from-beginning
Example:
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka --from-beginning
Output:
Hello
My first message
My second message
- Multi-Node Broker Configuration
To be continued.
- Basic Topic Operations
Alter a Topic
Syntax:
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-name --partitions count
Example:
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --partitions 2
Output:
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
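The warning arises because the default partitioner chooses a partition by hashing the key modulo the partition count, so changing the partition count remaps keys. A toy illustration of the effect (Kafka actually murmur2-hashes the serialized key; plain hashCode is used here only to make the point):

public class PartitionRemap {
   // Illustrative stand-in for Kafka's partitioner: the mapping depends on the partition count
   static int partitionFor(String key, int numPartitions) {
      return Math.abs(key.hashCode()) % numPartitions;
   }

   public static void main(String[] args) {
      for (String key : new String[] {"user-1", "user-2", "user-3"})
         System.out.printf("%s: with 2 partitions -> %d, with 3 partitions -> %d%n",
            key, partitionFor(key, 2), partitionFor(key, 3));
   }
}

Messages for the same key may therefore land in a different partition after the change, breaking per-key ordering across the boundary.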
Delete a Topic
Syntax:
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-name
Example:
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output:
Topic Hello-kafka marked for deletion
(Deletion has no effect unless delete.topic.enable is set to true in the broker configuration.)
Delete Topic Data
Stop the Apache Kafka daemon, then delete the topic's data folder:
$ rm -rf /tmp/kafka-logs/MyTopic-0
Simple Producer Example
Kafka Producer API
The KafkaProducer class provides a send method to send messages asynchronously to a topic. The signature of send() is as follows:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
- ProducerRecord − The producer manages a buffer of records waiting to be sent.
- Callback − A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback); see the example below.
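A minimal end-to-end example of the asynchronous send with a callback (the broker address and the topic name demo-topic are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackDemo {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      // The callback runs on the producer's I/O thread once the broker acknowledges the record
      producer.send(new ProducerRecord<String, String>("demo-topic", "key1", "value1"),
         new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
               if (exception != null)
                  exception.printStackTrace(); // the send failed
               else
                  System.out.println("acked at offset " + metadata.offset());
            }
         });
      producer.close(); // close() flushes, so the callback fires before the program exits
   }
}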
The KafkaProducer class provides a flush method to ensure all previously sent messages have actually completed. The syntax of the flush method is as follows:
public void flush()
The KafkaProducer class provides a partitionsFor method, which returns the partition metadata for a given topic. This can be used for custom partitioning:
public List<PartitionInfo> partitionsFor(String topic)
It also provides a metrics method, which returns the map of internal metrics maintained by the producer:
public Map<MetricName, ? extends Metric> metrics()
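A short usage sketch of both methods (the broker address and the topic name demo-topic are assumptions):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;

public class MetadataDemo {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      // Partition metadata for a topic, usable for custom partitioning decisions
      List<PartitionInfo> parts = producer.partitionsFor("demo-topic");
      System.out.println("demo-topic has " + parts.size() + " partitions");
      // Internal metrics maintained by the producer
      Map<MetricName, ? extends Metric> metrics = producer.metrics();
      System.out.println("tracked metrics: " + metrics.size());
      producer.close();
   }
}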
public void close() − The KafkaProducer class provides a close method that blocks until all previously sent requests are completed.
Producer API (the older kafka.javaapi.producer interface)
Send
public void send(KeyedMessage<k,v> message) − sends the data to a single topic, partitioned by key, using either the sync or the async producer.
public void send(List<KeyedMessage<k,v>> messages) − sends data to multiple topics.
Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);
There are two types of producers – Sync and Async.
Close
public void close()
The Producer class provides a close method to close the producer pool connections to all Kafka brokers.
Configuration Settings
client.id | Identifies the producer application. |
producer.type | Either sync or async. |
acks | The acks config controls the criteria under which producer requests are considered complete. |
retries | If the producer request fails, automatically retry the specified number of times. |
bootstrap.servers | The bootstrapping list of brokers. |
linger.ms | To reduce the number of requests, set linger.ms to a value greater than zero so the producer waits that long to batch records before sending. |
key.serializer | Serializer class for the key. |
value.serializer | Serializer class for the value. |
batch.size | Buffer (batch) size in bytes. |
buffer.memory | Controls the total amount of memory available to the producer for buffering. |
ProducerRecord API
public ProducerRecord (string topic, int partition, k key, v value)
Creates a record with a topic, partition, key, and value:
- Topic − user-defined topic name that will be appended to the record.
- Partition − the partition number.
- Key − the key that will be included in the record.
- Value − record contents.
public ProducerRecord (string topic, k key, v value)
Creates a record with key and value, without a partition:
- Topic − the topic the record is assigned to.
- Key − key for the record.
- Value − record contents.
public ProducerRecord (string topic, v value)
Creates a record without a partition or key:
- Topic − the topic the record is assigned to.
- Value − record contents.
Class methods:
public string topic() | Topic that will be appended to the record. |
public K key() | Key that will be included in the record; returns null if there is no key. |
public V value() | Record contents. |
partition() | Partition number for the record. |
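A brief illustration of the three constructors (topic name, keys, and values are placeholders):

import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordDemo {
   public static void main(String[] args) {
      // topic, partition, key, value -- pins the record to partition 0
      ProducerRecord<String, String> r1 =
         new ProducerRecord<String, String>("demo-topic", 0, "k1", "v1");
      // topic, key, value -- the partition is chosen by hashing the key
      ProducerRecord<String, String> r2 =
         new ProducerRecord<String, String>("demo-topic", "k2", "v2");
      // topic, value -- no key, so partitions are assigned round-robin
      ProducerRecord<String, String> r3 =
         new ProducerRecord<String, String>("demo-topic", "v3");
      System.out.println(r1 + "\n" + r2 + "\n" + r3);
   }
}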
SimpleProducer Application Example
Before creating the SimpleProducer, create a topic with the kafka-topics.sh command shown earlier.
//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {

   public static void main(String[] args) throws Exception {

      // Check arguments length value
      if (args.length == 0) {
         System.out.println("Enter topic name");
         return;
      }

      //Assign topicName to string variable
      String topicName = args[0].toString();

      // create instance for properties to access producer configs
      Properties props = new Properties();

      //Assign localhost id
      props.put("bootstrap.servers", "localhost:9092");

      //Set acknowledgements for producer requests.
      props.put("acks", "all");

      //If the request fails, the producer can automatically retry
      props.put("retries", 0);

      //Specify buffer size in config
      props.put("batch.size", 16384);

      //Reduce the number of requests by waiting up to 1 ms to batch records
      props.put("linger.ms", 1);

      //The buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);

      for (int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));

      System.out.println("Message sent successfully");
      producer.close();
   }
}
Compile:
$ javac SimpleProducer.java
Run:
$ java SimpleProducer <topic-name>
Output:
Message sent successfully
Run the following command to receive the messages:
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning
Simple Consumer Example
- KafkaConsumer class
- ConsumerRecord API
- ConsumerRecords API
- Configuration Settings
SimpleConsumer Application Example
Code to receive data from the topic:
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {

   public static void main(String[] args) throws Exception {
      if (args.length == 0) {
         System.out.println("Enter topic name");
         return;
      }

      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

      //Kafka Consumer subscribes to a list of topics here.
      consumer.subscribe(Arrays.asList(topicName));

      //print the topic name
      System.out.println("Subscribed to topic " + topicName);

      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            // print the offset, key and value for the consumer records.
            System.out.printf("offset = %d, key = %s, value = %s\n",
               record.offset(), record.key(), record.value());
      }
   }
}
Compile:
$ javac SimpleConsumer.java
Run:
$ java SimpleConsumer <topic-name>
Output:
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = HelloConsumer
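The example above relies on enable.auto.commit. When offsets should be committed only after the records are actually processed, a manual-commit variant (same assumed broker and topic; the group id is a placeholder) looks like this:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "manual-commit-demo"); // assumed group id
      props.put("enable.auto.commit", "false");    // commit offsets ourselves
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      consumer.subscribe(Arrays.asList("Hello-Kafka"));
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
         // Committing after processing gives at-least-once delivery:
         // if the process crashes before the commit, the batch is delivered again.
         consumer.commitSync();
      }
   }
}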
References
Useful links on Apache Kafka:
Apache Kafka official website − http://kafka.apache.org/
Apache Kafka on Wikipedia − https://en.wikipedia.org/wiki/Apache_Kafka