Kafka Study Notes

Written by: Wang Yu
2016-10-24

Messaging Systems

  • Point-to-Point Messaging System

  • Publish-Subscribe Messaging System

The main difference between the two is that the Publish-Subscribe model supports multiple receivers.

Kafka Architecture

Component descriptions:

Topics: A stream of messages belonging to a particular category is called a topic. Data is stored in topics. Topics are split into partitions, and for each topic Kafka keeps a minimum of one partition. Each partition contains messages in an immutable ordered sequence and is implemented as a set of segment files of equal size.

Partition: Topics may have many partitions, so a topic can handle an arbitrary amount of data.

Partition offset: Each partitioned message has a unique sequence id called an offset.

Replicas of partition: Replicas are nothing but backups of a partition. Replicas never read or write data; they are used to prevent data loss.

Brokers: Brokers are simple systems responsible for maintaining the published data. Each broker may have zero or more partitions per topic. If there are N partitions in a topic and N brokers, each broker will have one partition. If there are N partitions and more than N brokers (N + M), the first N brokers will have one partition each and the next M brokers will have none for that particular topic. If there are N partitions and fewer than N brokers (N - M), each broker will hold one or more partitions. This last scenario is not recommended due to unequal load distribution among the brokers.

Kafka cluster: A Kafka deployment with more than one broker is called a Kafka cluster. A Kafka cluster can be expanded without downtime. Clusters are used to manage the persistence and replication of message data.

Producers: Producers publish messages to one or more Kafka topics and send data to Kafka brokers. Every time a producer publishes a message to a broker, the broker simply appends the message to the last segment file; that is, the message is appended to a partition. Producers can also send messages to a partition of their choice.

Consumers: Consumers read data from brokers. Consumers subscribe to one or more topics and consume published messages by pulling data from the brokers.

Leader: The leader is the node responsible for all reads and writes for a given partition. Every partition has one server acting as the leader.

Follower: A node that follows the leader's instructions is called a follower. If the leader fails, one of the followers automatically becomes the new leader. A follower acts as a normal consumer: it pulls messages and updates its own data store.

Cluster Architecture

Component descriptions:

Broker: A Kafka cluster typically consists of multiple brokers to maintain load balance. Kafka brokers are stateless, so they use ZooKeeper to maintain cluster state. One Kafka broker instance can handle hundreds of thousands of reads and writes per second, and each broker can handle terabytes of messages without performance impact. Kafka broker leader election is done through ZooKeeper.

ZooKeeper: ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service mainly notifies producers and consumers about the presence of a new broker in the Kafka system, or the failure of an existing one. Based on these notifications, producers and consumers decide how to coordinate their work with other brokers.

Producers: Producers push data to brokers. When a new broker starts, all producers search for it and automatically send messages to it. A Kafka producer does not wait for acknowledgements from the broker and sends messages as fast as the broker can handle.

Consumers: Since Kafka brokers are stateless, the consumer has to track how many messages have been consumed, using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is tracked via ZooKeeper.

Workflow

Kafka is a collection of topics split into one or more partitions. A Kafka partition is a linearly ordered sequence of messages, where each message is identified by its index, the offset.

  • Workflow of Pub-Sub Messaging

    • Producers send messages to a topic at regular intervals.
    • Kafka broker stores all messages in the partitions configured for that particular topic. It ensures the messages are equally shared between partitions. If the producer sends two messages and there are two partitions, Kafka will store one message in the first partition and the second message in the second partition.
    • Consumer subscribes to a specific topic.
    • Once the consumer subscribes to a topic, Kafka will provide the current offset of the topic to the consumer and also saves the offset in the Zookeeper ensemble.
    • Consumers request new messages from Kafka at a regular interval (such as every 100 ms).
    • Once Kafka receives the messages from producers, it forwards these messages to the consumers.
    • Consumer will receive the message and process it.
    • Once the messages are processed, the consumer sends an acknowledgement to the Kafka broker.
    • Once Kafka receives an acknowledgement, it changes the offset to the new value and updates it in the Zookeeper. Since offsets are maintained in the Zookeeper, the consumer can read the next message correctly even during server outages.
    • The above flow repeats until the consumer stops making requests.
    • The consumer has the option to rewind/skip to the desired offset of a topic at any time and read all the subsequent messages (see the seek() sketch below).
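
    The rewind/skip in the last step can be done with the consumer's assign() and seek() methods. Below is a minimal Java sketch, assuming a broker on localhost:9092 and the Hello-Kafka topic used elsewhere in these notes; the class name RewindConsumer is illustrative.

      import java.util.Arrays;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;

      public class RewindConsumer {
         public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Assign the partition directly (no group management) so we control the offset.
            TopicPartition partition = new TopicPartition("Hello-Kafka", 0);
            consumer.assign(Arrays.asList(partition));

            // Rewind to the beginning of the partition and re-read every message.
            consumer.seek(partition, 0);
            System.out.println("Next offset to read: " + consumer.position(partition));
            consumer.close();
         }
      }
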
  • Workflow of Queue Messaging / Consumer Group 
    A group of consumers sharing the same Group ID subscribes to a topic.

    • Producers send messages to a topic at regular intervals.
    • Kafka stores all messages in the partitions configured for that particular topic similar to the earlier scenario.
    • A single consumer subscribes to a specific topic, assume Topic-01 with Group ID as Group-1.
    • Kafka interacts with the consumer in the same way as in Pub-Sub Messaging until a new consumer subscribes to the same topic, Topic-01, with the same Group ID, Group-1.
    • Once the new consumer arrives, Kafka switches its operation to share mode and shares the data between the two consumers. This sharing goes on until the number of consumers reaches the number of partitions configured for that particular topic.
    • Once the number of consumers exceeds the number of partitions, a new consumer will not receive any further messages until one of the existing consumers unsubscribes. This happens because each consumer in Kafka is assigned a minimum of one partition; once all the partitions are assigned to the existing consumers, new consumers have to wait.
    • This feature is also called Consumer Group. In this way, Kafka provides the best of both systems in a very simple and efficient manner (see the consumer-group sketch below).
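
    A minimal Java sketch of a consumer-group member, assuming a broker on localhost:9092; the names Group-1 and Topic-01 follow the scenario above, and the class name GroupMember is illustrative. Start the class twice: with one partition the second instance idles, while with two partitions each instance receives one partition's messages.

      import java.util.Arrays;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      public class GroupMember {
         public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Every instance started with this group.id joins Group-1, and Kafka
            // assigns each member a disjoint subset of Topic-01's partitions.
            props.put("group.id", "Group-1");
            props.put("key.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("Topic-01"));

            while (true) {
               // Each instance only sees records from its own assigned partitions.
               for (ConsumerRecord<String, String> record : consumer.poll(100))
                  System.out.printf("partition = %d, offset = %d, value = %s%n",
                     record.partition(), record.offset(), record.value());
            }
         }
      }
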
  • Role of ZooKeeper 
    A critical dependency of Apache Kafka is Apache Zookeeper, which is a distributed configuration and synchronization service. Zookeeper serves as the coordination interface between the Kafka brokers and consumers. The Kafka servers share information via a Zookeeper cluster. Kafka stores basic metadata in Zookeeper such as information about topics, brokers, consumer offsets (queue readers) and so on.

Installation Steps

  • Step 1: Install the JDK and configure the JAVA_HOME and CLASSPATH environment variables.
  • Step 2: Install ZooKeeper

    Download ZooKeeper.
    Unpack it:

    $ tar xzvf zookeeper-3.5.2-alpha.tar.gz
    $ mv ./zookeeper-3.5.2-alpha /opt/zookeeper
    $ cd /opt/zookeeper
    $ mkdir data

    Create the configuration file:

    $ cd /opt/zookeeper
    $ vim conf/zoo.cfg

    tickTime=2000
    dataDir=/opt/zookeeper/data
    clientPort=2181
    initLimit=5
    syncLimit=2

    Start the ZooKeeper server:

    $ bin/zkServer.sh start
  • Step 3: Install SLF4J

    Download SLF4J: slf4j-1.7.21.tar.gz from www.slf4j.org
    Unpack it and configure CLASSPATH:

    $ tar xvzf ./slf4j-1.7.21.tar.gz
    $ mv ./slf4j-1.7.21 /opt/slf4j
    $ vim ~/.bashrc
    export CLASSPATH=${CLASSPATH}:/opt/slf4j/*
  • Step 4: Install Kafka

    Download Kafka.
    Unpack it:

    $ tar xzvf kafka_2.11-0.10.1.0.tgz
    $ mv ./kafka_2.11-0.10.1.0 /opt/kafka
    $ cd /opt/kafka

    Start the service:

    $ ./bin/kafka-server-start.sh config/server.properties
  • Step 5: Stop the service
    $ ./bin/kafka-server-stop.sh config/server.properties

Basic Kafka Operations

  • Start ZooKeeper and Kafka

    $ ./bin/zkServer.sh start
    $ ./bin/kafka-server-start.sh config/server.properties
  • Single-Node Broker Configuration
    • Create a Kafka topic

      Syntax:
      $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
      Example:
      $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
      Output:
      Created topic "Hello-Kafka"
    • List of Topics

      Syntax:
      $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
      Output:
      Hello-Kafka
    • Start a producer to send messages

      Syntax:
      $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
      Example:
      $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
      Enter the following messages manually at the prompt:
      Hello
      My first message
      My second message
    • Start a consumer to receive messages

      Syntax:
      $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name --from-beginning
      Example:
      $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka --from-beginning
      Output:
      Hello
      My first message
      My second message
  • Multi-Node Broker Configuration
    To be continued.
  • Basic Topic Operations
    • Modify a topic

      Syntax:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-name --partitions count
      Example:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-Kafka --partitions 2
      Output:
      WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
      Adding partitions succeeded!
    • Delete a topic

      Syntax:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-name
      Example:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-Kafka
      Output:
      Topic Hello-Kafka marked for deletion
    • Delete topic data

      1. Stop the Apache Kafka daemon.
      2. Delete the topic data folder: rm -rf /tmp/kafka-logs/MyTopic-0

Simple Producer Example

  • Kafka Producer API

    • KafkaProducer class provides a send method to send messages asynchronously to a topic. The signature of send() is as follows:

      producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
    • ProducerRecord − The record to be sent. The producer manages a buffer of records waiting to be sent.

    • Callback − A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback). A callback sketch follows this list.
    • KafkaProducer class provides a flush method to ensure all previously sent messages have actually completed. The syntax of the flush method is as follows:

      public void flush()
    • KafkaProducer class provides the partitionsFor method, which helps in getting the partition metadata for a given topic. This can be used for custom partitioning. Its signature is as follows:

      public List<PartitionInfo> partitionsFor(String topic)
    • KafkaProducer class also provides a metrics method, which returns the map of internal metrics maintained by the producer:

      public Map metrics()
    • public void close() − KafkaProducer class provides a close method that blocks until all previously sent requests are completed.
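
    A minimal sketch of send() with a callback plus flush(), assuming a broker on localhost:9092 and the Hello-Kafka topic; the class name CallbackProducer is illustrative.

      import java.util.Properties;
      import org.apache.kafka.clients.producer.Callback;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;

      public class CallbackProducer {
         public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
               "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
               "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<>(props);

            // The callback runs once the server has acknowledged (or rejected) the record.
            producer.send(new ProducerRecord<>("Hello-Kafka", "key1", "value1"),
               new Callback() {
                  public void onCompletion(RecordMetadata metadata, Exception e) {
                     if (e != null)
                        e.printStackTrace();   // the send failed
                     else
                        System.out.println("Acknowledged at offset " + metadata.offset());
                  }
               });

            producer.flush();   // block until all buffered sends have completed
            producer.close();   // also waits for in-flight requests to finish
         }
      }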

  • Producer API

    • Send

      public void send(KeyedMessage<k,v> message) - sends the data to a single topic, partitioned by key, using either the sync or async producer.
      public void send(List<KeyedMessage<k,v>> messages) - sends data to multiple topics.

      Properties prop = new Properties();
      prop.put("producer.type", "async");
      ProducerConfig config = new ProducerConfig(prop);

      There are two types of producers: Sync and Async.
    • Close

      public void close()

      The Producer class provides a close method that closes the producer pool connections to all Kafka brokers.

  • Configuration Settings

client.id: identifies the producer application.
producer.type: either sync or async.
acks: controls the criteria under which producer requests are considered complete.
retries: if a producer request fails, automatically retry up to the configured number of times.
bootstrap.servers: bootstrapping list of brokers.
linger.ms: to reduce the number of requests, set linger.ms to a value greater than zero so that sends are batched.
key.serializer: serializer class for the key.
value.serializer: serializer class for the value.
batch.size: batch buffer size.
buffer.memory: controls the total amount of memory available to the producer for buffering.
  • ProducerRecord API 
    public ProducerRecord(String topic, int partition, K key, V value)

    • Topic − user-defined topic name that will be appended to the record.
    • Partition − the partition the record should be sent to.
    • Key − the key that will be included in the record.
    • Value − record contents.

    public ProducerRecord(String topic, K key, V value)
    This ProducerRecord constructor is used to create a record with a key and value but without a partition.

    • Topic − the topic the record is assigned to.
    • Key − key for the record.
    • Value − record contents.

    public ProducerRecord(String topic, V value)
    This ProducerRecord constructor creates a record without a partition or key.

    • Topic − the topic the record is assigned to.
    • Value − record contents.
    Class methods:

    public String topic(): the topic the record will be appended to.
    public K key(): the key included in the record; if there is no key, null is returned.
    public V value(): the record contents.
    partition(): the partition for the record.

    The sketch below shows the three constructors in use.
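
    A short sketch of the three constructors, assuming the Hello-Kafka topic; the class and variable names are illustrative.

      import org.apache.kafka.clients.producer.ProducerRecord;

      public class RecordExamples {
         public static void main(String[] args) {
            // Topic, partition, key, value: pins the record to partition 0.
            ProducerRecord<String, String> r1 =
               new ProducerRecord<>("Hello-Kafka", 0, "key1", "value1");

            // Topic, key, value: the partition is chosen from a hash of the key.
            ProducerRecord<String, String> r2 =
               new ProducerRecord<>("Hello-Kafka", "key1", "value1");

            // Topic, value: no key and no partition.
            ProducerRecord<String, String> r3 =
               new ProducerRecord<>("Hello-Kafka", "value1");

            System.out.println(r1.partition() + ", " + r2.key() + ", " + r3.value());
         }
      }
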
  • SimpleProducer Example

    • Before creating SimpleProducer, create the topic with the Kafka command-line tool.

      //import util.properties packages
      import java.util.Properties;

      //import simple producer packages
      import org.apache.kafka.clients.producer.Producer;

      //import KafkaProducer packages
      import org.apache.kafka.clients.producer.KafkaProducer;

      //import ProducerRecord packages
      import org.apache.kafka.clients.producer.ProducerRecord;

      //Create java class named "SimpleProducer"
      public class SimpleProducer {

         public static void main(String[] args) throws Exception {

            // Check arguments length value
            if (args.length == 0) {
               System.out.println("Enter topic name");
               return;
            }

            //Assign topicName to string variable
            String topicName = args[0].toString();

            // create instance for properties to access producer configs
            Properties props = new Properties();

            //Assign localhost id
            props.put("bootstrap.servers", "localhost:9092");

            //Set acknowledgements for producer requests.
            props.put("acks", "all");

            //If the request fails, the producer can automatically retry
            props.put("retries", 0);

            //Specify buffer size in config
            props.put("batch.size", 16384);

            //Batch sends by lingering up to 1 ms to reduce the number of requests
            props.put("linger.ms", 1);

            //buffer.memory controls the total amount of memory available to the producer for buffering
            props.put("buffer.memory", 33554432);

            props.put("key.serializer",
               "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
               "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<String, String>(props);

            for (int i = 0; i < 10; i++)
               producer.send(new ProducerRecord<String, String>(topicName,
                  Integer.toString(i), Integer.toString(i)));
            System.out.println("Message sent successfully");
            producer.close();
         }
      }
    • Compile

      $ javac SimpleProducer.java
    • Run

      $ java SimpleProducer <topic-name>
    • Output

      Message sent successfully

      To verify, receive the messages with the console consumer:
      $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning

Simple Consumer Example

  • KafkaConsumer class
  • ConsumerRecord API
  • ConsumerRecords API
  • Configuration Settings
  • SimpleConsumer Example

    • Code to receive data from the topic

      import java.util.Properties;
      import java.util.Arrays;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.ConsumerRecord;

      public class SimpleConsumer {

         public static void main(String[] args) throws Exception {
            if (args.length == 0) {
               System.out.println("Enter topic name");
               return;
            }

            //Kafka consumer configuration settings
            String topicName = args[0].toString();
            Properties props = new Properties();

            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
               "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            //Kafka Consumer subscribes to a list of topics here
            consumer.subscribe(Arrays.asList(topicName));

            //print the topic name
            System.out.println("Subscribed to topic " + topicName);

            while (true) {
               ConsumerRecords<String, String> records = consumer.poll(100);
               for (ConsumerRecord<String, String> record : records)
                  //print the offset, key and value for the consumer records
                  System.out.printf("offset = %d, key = %s, value = %s\n",
                     record.offset(), record.key(), record.value());
            }
         }
      }
    • Compile

      $ javac SimpleConsumer.java
    • Run

      $ java SimpleConsumer <topic-name>
    • Output

      Subscribed to topic Hello-Kafka
      offset = 3, key = null, value = HelloConsumer

References

Useful links on Apache Kafka:

Apache Kafka official website: http://kafka.apache.org/
Apache Kafka on Wikipedia: https://en.wikipedia.org/wiki/Apache_Kafka
