`
农村外出务工男JAVA
  • 浏览: 105797 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

了解kafka

阅读更多

一:kafka简介
   Kafka 是一个分布式的,可划分的,多订阅者,冗余备份的持久性的日志服务,可以简单理解为一个消息中间件。
二:kafka的特点
  1:分布式
    kafka的producer,consumer,broker都是分布式的,可水平扩展,无需停机。
  2:持久化
    kafka将日志持久化到磁盘,通过将消息持久化到磁盘(磁盘线性读写)以及它的replication机制,保证数据的不丢失,通常一个topic会有多个分区,不同的分区分布在不同的server上,而每个分区又可以有多个副本,这样既提高了并行处理的能力,又保证了消息的可靠,因为越多的partition意味着可以接受更多的consumer的pull请求。
 3:高吞吐
   kafka采用磁盘直接进行线性读写而不是随机读写,大大提高了起出来读写请求的速度。
 4、kafka消息保障:
   1.最多一次 --- 消息可能丢失,但永远不会重发。
     我们知道所有的副本都有相同的日志相同的偏移量,当某个消费者在日志中保存了其消费消息的offest但是在消费之前崩溃了,那么其它进程来接管消费的时候,就会获取offest位置之后的消息开始消费,而前面未真正消费的消息就会丢失,这就是‘最多一次’语义
   2.至少一次---消息绝不会丢失,但有可能重复
     当某个消费者消费了某条消息,但是在日志中保存了其消费消息的offest之前崩溃了,那么其它进程来接管消费的时候,就会消费到已经被消费的消息。
而前面未真正消费的消息就会丢失,这就是‘最多一次’语义
   3.恰好一次---每条消息保障只会被传递一次
     实现恰好一次语义可以考虑二阶段提交或着让消费者的存储和输出的偏移量用同一个位置
三:kafka的核心概念
  Producer 特指消息的生产者
  Consumer 特指消息的消费者
  Consumer Group 消费者组,可以并行消费Topic中partition的消息
  Broker:Kafka 集群中的一台服务器称为一个 broker。
  Topic:kafka处理的消息分类,一个消息分类就是一种topic。
  Partition:Topic的分区,一个 topic可以分为多个
  partition,每个partition是一个有序的队列,分区里面的消息都是按接收的顺序追加的且partition中的每条消息都会被分配一个有序的id(offset)。
  Producers:消息和数据生产者,向 Kafka 的一个 topic发布消息的过程叫做 producers。
  Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。
 3.1 kafka的Producers
    在kafka中,生产的消息像kafka的topic发送的过程叫做producers,Producer能够决定将此消息发送到topic的哪个分区下面,可以通过配置文件配置也可在api里显示指定,其支持异步批量发送,批量发送可以有效的提高发送效率,先将消息缓存,然后批量发送给topic,减少网络IO.
 3.2 kafka的Consumers
    在kafka中,一个分区的消息可以被一个消费者组中一个consumer消费,但是一个consumer可以消费多个分区的消息。如果多个消费者同时在一个消费者组中,那么kafka会以轮询的方式,让消息在消费者之间负载均衡
   如果不同的消费者存在不同的消费者组中,这就有点像zookeeper里面的发布-订阅模式,不同组的消费者可以同时消费某个分区的消息。
   需要注意的是,kafka只能给我们保证某个分区的消息是按顺序被消费的,但它不能保证不同分区的消费按一定顺序。
 3.3 kafka的broker
    我们可以理解为一台机器就是一个broker,我们发送的消息(message)日志在broker中是以append的方式追加,并且broker会将我们的消息暂时的buffer起来,根据我们的配置,当消息的大小或者是个数达到了配置的值,broker会将消息一次性的刷新到磁盘,有效降低了每次消息磁盘调用的IO次数。kafka中的broker没有状态,如果一个broker挂掉,这里面的消息也会丢,由于broker的无状态,所以消息的消费都记录在消费者那,并不记录在broker。已经被消费了的消息会根据配置在保存一定时间后自动删除,默认是7天。
3.4 kafka的message
   在kafka中,一条message消息由几部分组成,offest代表消息的偏移量,MessageSize表示消息的大小,data代表了消息的具体内容,kafka在记录message的时候,还会每隔一定的字节建立一个索引,当消费者需要消费指定某条消息的时候,kafka采用二分法查找索引的位置从而找到你需要消费的消息.
四:kafka常用命令
  创建主题

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions
4 --topic test

  查询集群描述

bin/kafka-topics.sh --describe --zookeeper localhost:2181

  生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

  消费者
  

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

  显示某个组的消费者详情
  

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

  平衡leader

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port

    查看某个分区的log和index

  

 bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /bigdata/kafka/kafka-logs/test_find_2-0/00000000000000000000.log  --print-data-log

 
五:kafka在zookeeper中的节点存储结构
    zookeeper在kafka中扮演了举足轻重的作用(0.9版本后offest不放在zk上,由kafka内部topic自己保存),kakfa的 broker,消费者等相关的信息都存在zk的节点上,zk还提供了对kafka的动态负载均衡等机制,下面我们一一介绍
  5.1:broker注册
   在kafka中,每当一个broker启动后,会在zk的节点下存储相关的信息,这是一个临时节点(如果不清楚zk的节点类型,可以参考其官网介绍),所以当某个broker宕掉后,其对应的临时节点会消失.
 

[zk: ip:2181(CONNECTED) 0] ls /brokers
[seqid, topics, ids]
[zk:
ip:2181(CONNECTED) 1] ls /brokers/topics
[testKafka, __consumer_offsets,
test, consumer_offsets, myfirstTopic, first_topic, kafka_first_topic]
[zk: ip:2181(CONNECTED) 2] ls /brokers/ids
[2, 1, 0]

   可以看到,brokers节点下存储目前有多少台broker,该broker下有哪些topic,每个broker可以通过get来获取详细信息
   

[zk: 192.168.8.88:2181(CONNECTED) 0] get /brokers/ids/2
{"jmx_port":-1,"timestamp":"1461828196728","endpoints":["PLAINTEXT://192.168.8.88:9094"],"host":"192.168.8.88","version":2,"port":9094}
cZxid = 0x490000006b
ctime = Thu Apr 28 19:23:16 GMT+12:00 2016
mZxid =
0x490000006b
mtime = Thu Apr 28 19:23:16 GMT+12:00 2016
pZxid =
0x490000006b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x3545bbe83450003
dataLength = 135
numChildren = 0
[zk:
192.168.8.88:2181(CONNECTED) 1]

   这里面有每个broker的ip,port,版本等信息,每个topic的分区消息分布在不同的broker上,
  

[zk: 192.168.8.88:2181(CONNECTED) 1] get
/brokers/topics/kafka_first_topic
{"version":1,"partitions":{"2":[0,1,2],"1":[2,0,1],"0":[1,2,0]}}
cZxid =
0x4100000236
ctime = Sat Apr 16 04:48:10 GMT+12:00 2016
mZxid =
0x4100000236
mtime = Sat Apr 16 04:48:10 GMT+12:00 2016
pZxid =
0x410000023a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x0
dataLength = 64
numChildren = 1
[zk: 192.168.8.88:2181(CONNECTED) 2]

   意思是说,kafka_first_topic这个topic有3个分区,每个不同的分区有3个备份,分别备份在broker id为0,1,2的机器上
 5.2 消费者注册
    前面已经说到,消费者有消费者组的概念,kafka会为每个消费者组分配一个唯一的ID,也为会每个消费者分配一个ID,

[zk: 192.168.8.88:2181(CONNECTED) 40] ls
/consumers/console-consumer-85351/ids
[console-consumer-85351_hadoop-1461831147033-3265fdb3]

    意思是在消费者组console-consumer-85351下有一个id为如上的消费者,而一个消费者组里面的某个消费者消费某个分区的消息,在zk中是这样记录的
   

[zk: 192.168.8.88:2181(CONNECTED) 4]
get /consumers/console-consumer-85351/owners/kafka_first_topic/2
console-consumer-85351_hadoop-1461831147033-3265fdb3-0
cZxid =
0x490000010f
ctime = Thu Apr 28 20:12:28 GMT+12:00 2016
mZxid =
0x490000010f
mtime = Thu Apr 28 20:12:28 GMT+12:00 2016
pZxid =
0x490000010f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x1545bbe6fb80015
dataLength = 54
numChildren = 0

   表示在消费者组console-consumer-85351的topic为kafka_first_topic的第二个分区下,有一个消费者id为 console-consumer-85351_hadoop-1461831147033-3265fdb3-0正在消费
每个topic有不同的分区,每个分区存储了消息的offest,当消费者重启后能够从该节点记录的值后开始继续消费

[zk:
192.168.8.88:2181(CONNECTED) 37] get
/consumers/console-consumer-85351/offsets/kafka_first_topic/0
38
cZxid =
0x4900000117
ctime = Thu Apr 28 20:13:27 GMT+12:00 2016
mZxid =
0x4900000117
mtime = Thu Apr 28 20:13:27 GMT+12:00 2016
pZxid =
0x4900000117
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x0
dataLength = 2
numChildren = 0

   表示分区0的消息的消费偏移到了38这个位置,需要注意的是,这是一个临时节点,当我关了消费者经常,你会发现在consumers里面没有刚才那个消费者组了.
 5.3 controller
    zookeeper节点contorller主要存储的是中央控制器(可以理解为leader)所在机器的信息,下面表示此集群leader是broker id为0这台机器
  

[zk: 192.168.8.88:2181(CONNECTED) 33] get /controller
{"version":1,"brokerid":0,"timestamp":"1461828122648"}
cZxid =
0x4900000007
ctime = Thu Apr 28 19:22:02 GMT+12:00 2016
mZxid =
0x4900000007
mtime = Thu Apr 28 19:22:02 GMT+12:00 2016
pZxid =
0x4900000007
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner =
0x2545bbe6fda0000
dataLength = 54
numChildren = 0
[zk:
192.168.8.88:2181(CONNECTED) 34]

 
 5.4:controller_epoch
   zookeeper里面的controller_epoch存储的是leader选举的次数,比如一开始只有broker0这台机器,后面加入了broker1,那么就会重新进行leader选择,次数就会+1,这样依次类推
   

[zk: 192.168.8.88:2181(CONNECTED) 2] get /controller_epoch
72
cZxid =
0x3700000014
ctime = Tue Apr 05 19:09:47 GMT+12:00 2016
mZxid =
0x4900000008
mtime = Thu Apr 28 19:22:02 GMT+12:00 2016
pZxid =
0x3700000014
cversion = 0
dataVersion = 71
aclVersion = 0
ephemeralOwner =
0x0
dataLength = 2
numChildren = 0

5.5 动态负载均衡
  5.5.1消费者负载均衡
    消费者在注册的时候,会使用zookeeper的watcher监听机制来实现对消费者分组里的各个消费者,以及broker节点注册监听,
一旦某个消费者组里的某个消费者宕掉,或者某个broker宕掉,消费者会收到了事件监听回复,就会根据需要触发消费者负载均衡。
  5.5.2生产这负载均衡
    生产者在将消息发送给broker的时候,会注册watcher监听,监听broker节点的变化,每个topic的新增和减少,以便合理的发送消息到broker.生产者可以将将消息随机或者以hash(key)或者指定某个分区发送,在客户端控制消息的负载均衡。

2
1
分享到:
评论

相关推荐

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    同时,附带的源码可以让读者更加直观地了解Kafka的实际运作。 Kafka是一个分布式流处理平台,由LinkedIn开发并捐赠给了Apache软件基金会。它主要用作消息队列,用于构建实时数据管道和流应用。以下将详细介绍Kafka...

    springboot 基于spring-kafka动态创建kafka消费者

    首先,了解Kafka基本概念:Kafka是一种分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、低延迟的消息传递,支持发布/订阅模式,适用于大数据实时处理场景。 Spring Kafka是Spring ...

    Kafka详细课程讲义

    **Kafka详细课程讲义** 本课程主要涵盖了Apache Kafka的核心概念、安装配置、架构解析、API使用以及监控...通过学习以上章节,你可以深入了解Kafka的原理、配置、使用和优化,为实际项目中的数据流处理打下坚实基础。

    Kafka尚硅谷.rar

    **Kafka概述** Kafka是由LinkedIn开发并贡献给Apache软件基金会的一个开源消息系统,它是一个高性能、可扩展的分布式消息中间件。...通过深入学习Kafka尚硅谷.pdf,你可以更全面地了解Kafka的实践应用和技术细节。

    Apache Kafka实战.pdf

    《Apache Kafka实战》这本书深入浅出地介绍了...通过阅读《Apache Kafka实战》这本书,你可以深入了解Kafka的工作原理,学习如何设计、部署和维护一个高效、可靠的Kafka集群,并利用其强大的功能构建实时数据处理系统。

    kafka部署和使用详尽PDF

    同时,了解Kafka的集群配置和扩展性是掌握大数据实时处理的关键。Kafka的高吞吐量、低延迟特性和强大的消息处理能力使其成为大数据领域的重要组件。在实际应用中,还需要结合Zookeeper进行集群管理,确保服务的稳定...

    Kafka 配置用户名密码例子

    首先,我们需要了解Kafka支持的三种认证机制:SSL(Secure Sockets Layer)、SASL/Kerberos和SASL/PLAIN。SSL主要用于加密传输,防止数据在传输过程中被窃取;SASL/Kerberos基于票证的认证方式,适合大型企业环境,...

    Kafka官方中文文档.pdf

    这些信息对于了解Kafka的演进和版本兼容性非常有用。 整个文档还贯穿了Kafka的部署和运行指导,包括在不同数据中心的部署、客户端配置、Java版本选择、硬件和操作系统要求、监控以及ZooKeeper集群的配置和操作。 ...

    kafka实战pdf

    6. **Kafka Streams**:了解Kafka提供的流处理库Kafka Streams,用于构建实时数据管道和复杂的应用程序。掌握其核心概念,如状态存储和窗口操作。 7. **连接器(Connectors)**:学习使用Kafka Connect框架来简化...

    C#kafka开发实例

    首先,让我们了解Kafka的基本概念。Kafka是一个分布式流处理平台,它允许我们创建发布(Producer)和订阅(Consumer)主题(Topic)。主题是逻辑上的分类,类似于数据库中的表。发布者向主题发布消息,而消费者则从...

    Kafka可视化工具offsetExplore2

    offsetExplore2 实际上是 Kafka 的一个工具,用于管理和监控 Apache Kafka 中...总的来说,offsetExplore2 是一款专门用于管理和监控 Kafka 中偏移量的工具,它可以帮助用户更好地了解和控制 Kafka 中消息的消费情况。

    jemter-kafka连接器

    《JMeter与Kafka连接器:构建高并发数据流测试》 ...通过理解和熟练运用这个工具,你可以更深入地了解Kafka的性能边界,为你的分布式系统优化提供有力的支持。记住,持续的测试和优化是保障大数据系统高效运行的关键。

    kafka监控工具kafka-eagle

    **Kafka Eagle** 是一款专为Apache Kafka设计的开源监控和管理工具,旨在提供更为直观、高效的监控解决方案。...尽管2018年的版本可能无法满足最新的需求,但它仍然是了解Kafka监控工具的一个良好起点。

    实测可用kafka监控工具(2)--kafka-manager

    这个仪表盘是管理员了解Kafka集群健康状况的首要窗口,通过它可以快速查看到整个集群的总体情况,比如节点数量、分区数量等关键指标。 Brokers列表是Kafka-Manager的一大亮点,它列出了所有参与集群的服务器节点,...

    Maven搭建Kafka Java开发环境需要的jar包

    在Java开发环境中,Apache Kafka是一个广泛使用的分布式流处理平台,用于构建实时数据管道和流应用程序。为了使用Kafka进行开发,我们需要...同时,了解Kafka的基本概念和使用方式,有助于更好地开发基于Kafka的应用。

    kafka_2.11-2.0.0.tgz

    《深入理解Kafka:以kafka_2.11-...总结,Kafka_2.11-2.0.0.tgz是一个包含了完整Kafka二进制文件的压缩包,通过学习和实践,我们可以深入了解Kafka的工作原理和使用方法,从而在大数据实时处理领域发挥其强大的功能。

    kafka大文件的代码

    首先,我们要了解Kafka的核心组件:生产者(Producer)和消费者(Consumer)。生产者负责将数据写入Kafka的主题(Topic),而消费者则从主题中读取并处理这些数据。对于处理大文件,我们需要关注的配置主要集中在...

    kafka权威指南中文版

    《Kafka权威指南》中文版是一本深入解析Apache ...通过阅读《Kafka权威指南》中文版,读者可以全面了解Kafka的原理、配置、使用方法以及实战技巧,从而在实际工作中高效地利用Kafka构建高性能、高可靠的数据处理系统。

    springboot-kafka-simple-demo

    首先,我们需要了解Kafka的基本概念。Kafka是一个高吞吐量、低延迟的消息队列,它可以处理PB级别的数据,适用于日志收集、实时分析、流处理等多种场景。其核心组成部分包括生产者(Producer)、消费者(Consumer)...

Global site tag (gtag.js) - Google Analytics