- 浏览: 565210 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (267)
- 随笔 (4)
- Spring (13)
- Java (61)
- HTTP (3)
- Windows (1)
- CI(Continuous Integration) (3)
- Dozer (1)
- Apache (11)
- DB (7)
- Architecture (41)
- Design Patterns (11)
- Test (5)
- Agile (1)
- ORM (3)
- PMP (2)
- ESB (2)
- Maven (5)
- IDE (1)
- Camel (1)
- Webservice (3)
- MySQL (6)
- CentOS (14)
- Linux (19)
- BI (3)
- RPC (2)
- Cluster (9)
- NoSQL (7)
- Oracle (25)
- Loadbalance (7)
- Web (5)
- tomcat (1)
- freemarker (1)
- 制造 (0)
最新评论
-
panamera:
如果设置了连接需要密码,Dynamic Broker-Clus ...
ActiveMQ 集群配置 -
panamera:
请问你的最后一种模式Broker-C节点是不是应该也要修改持久 ...
ActiveMQ 集群配置 -
maosheng:
longshao_feng 写道楼主使用 文件共享 模式的ma ...
ActiveMQ 集群配置 -
longshao_feng:
楼主使用 文件共享 模式的master-slave,produ ...
ActiveMQ 集群配置 -
tanglanwen:
感触很深,必定谨记!
少走弯路的十条忠告
kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
Kafka的架构:
Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。
一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干broker(作为 Kafka 节点的服务器),若干Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。
在 Kafka 架构中,有以下术语:
•Producer:生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;
•Broker:Kafka节点,一个Kafka节点就是一个broker,Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;
•Topic:producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;
•Partition:每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;
•Consumer:消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;
•Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;
•replicas:partition 的副本,保障 partition 的高可用;
•leader:replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;
•follower:replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;
•controller:Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;
•ZooKeeper:Kafka 通过 ZooKeeper 来存储集群的 meta 信息等
Topic 只是逻辑概念,面向的是 producer 和 consumer;而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 broker,那么关于该 Topic 的所有读写请求都将由这一个 broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。有了 Partition 概念以后,假设一个 Topic 被分为 10 个 Partitions,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 broker(服务器)上,当 producer 发布消息时,producer 客户端可以采用 random、key-hash 及 轮询 等算法选定目标 partition,若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。
消息发送的流程:
1.Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
2.kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长,而不关注消息是否被消费。
3.Consumer从kafka集群pull数据,并控制获取消息的offset。
Kafka的设计:
吞吐量:
高吞吐是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:
1.数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
2.zero-copy:减少IO操作步骤
3.数据批量发送
4.数据压缩
5.Topic划分为多个partition,提高parallelism
负载均衡:
1.producer根据用户指定的算法,将消息发送到指定的partition
2.存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
3.多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
4.通过zookeeper管理broker与consumer的动态加入与离开
拉取系统:
由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:
1.简化kafka设计
2.consumer根据消费能力自主控制消息拉取速度
3.consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等
可扩展性:
当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。
Kafka的应用场景:
1.消息队列
比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ。
2.行为跟踪
Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。
3.元信息监控
作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。
4.日志收集
日志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。
5.流处理
这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom和Samza是非常著名的实现这种类型数据转换的框架。
6.事件源
事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。
7.持久性日志(commit log)
Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。
发送消息的主要步骤:
首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。
接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。
当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。
生产者配置属性
发送数据到Kafka的第一步是创建一个生产者,必须指定以下三个属性:
bootstrap.servers:生产者用于与Kafka集群建立初始连接的主机和端口的列表。该列表不需要包括所有的brokers信息,因为生产者在建立连接后能够获取所有brokers的信息。但建议至少包含两个,防止一个broker宕机,生产者仍然能够通过另外一个broker连接到群集。
key.serializer:用于序列化keys的类名。Kafka brokers期待key和value的类型为byte数组,但是也允许使用参数化的Java对象作为key和value。这使得代码非常易读,但也意味着生产者必须知道如何把这些对象转换为byte数组。key.serializer应设为实现了org.apache.kafka.common.serialization.Serializer接口的类名,生产者将会使用这个类来把key对象序列化为byte数组。Kafka内置实现了ByteArraySerializer、StringSerializer和IntegerSerializer。注意,即使生产者发送的数据没有指定key,也必须设置key.serializer这个属性。
value.serializer:用于序列化value的类名。类似于key.serializer,生产者将会使用指定的类来把value对象序列化为byte数组。
生产者有很多配置属性,除了上述的三个之外,下面是一些比较重要的属性:
1 acks
此配置设置在生产者可以认为发送请求完成之前,有多少分区副本必须接收到数据。此选项对消息可能丢失的可能性有重大影响,此配置有三个允许的值,默认为1:
acks=0,生产者不会等待broker的任何确认,消息会被立即添加到缓冲区并被认为已经发送。在这种情况下,不能保证服务器已经收到消息,并且重试配置不会生效(因为客户端通常不会知道任何异常),每条消息返回的偏移量始终设置为-1。由于生产者不等待broker的任何确认,因此它可以以网络支持的最快速度发送消息,所以这个配置适用于实现非常高的吞吐量。
acks=1,在leader服务器的副本收到消息的同一时间,生产者会接收到broker的确认。如果消息不能写入leader的副本(例如,如果leader宕机并且还没有选出新的leader),生产者将接收到异常响应,然后可以重新发送消息,避免丢失数据。如果leader宕机并且消息没有被写入到新的leader(通过不确定的leader选举),该消息仍然会丢失。在这种情况下,吞吐量取决于消息是同步还是异步发送。如果我们的客户端等待服务器的回复(通过上述的调用发送消息时返回的Future对象的get()方法),它明显会显著地增加延迟(至少通过网络往返)。如果客户端使用callback,则延迟不会那么明显,但吞吐量将受到正在发送消息数量的限制(例如,在接收到响应之前生产者将会发送多少消息)。
acks=all(或-1),一旦所有的同步副本接收到消息,生产者才会接收到broker的确认。这是最安全的模式,因为可以确保多于一个的broker接收到该消息,即使在宕机的情况下,该消息也能被保存。然而,延迟性会比acks=1的时候更高,因为需要等待所有broker接收到消息。
2 buffer.memory
此配置设置生产者可用于缓冲等待发送给brokers消息的总内存字节数,默认为33554432=32MB。如果消息发送到缓存区的速度比发送到broker的速度快,那么生产者会被阻塞(根据max.block.ms配置的时间,默认为60000ms=1分钟,在0.9.0.0版本之前使用block.on.buffer.full配置),之后会抛出异常。
3 compression.type
生产者对生成的所有数据使用的压缩类型,默认值是none(即不压缩),有效值为none,gzip,snappy或lz4。Snappy压缩技术是Google开发的,它可以在提供较好的压缩比的同时,减少对CPU的使用率并保证好的性能,所以建议在同时考虑性能和带宽的情况下使用。Gzip压缩技术通常会使用更多的CPU和时间,但会产生更好的压缩比,所以建议在网络带宽更受限制的情况下使用。通过启用压缩功能,可以减少网络利用率和存储空间,这往往是向Kafka发送消息的瓶颈。
4 retries
默认值为0,当设置为大于零的值,客户端会重新发送任何发送失败的消息。注意,此重试与客户端收到错误时重新发送消息是没有区别的。在配置max.in.flight.requests.per.connection不等于1的情况下,允许重试可能会改变消息的顺序,因为如果两个批次的消息被发送到同一个分区,第一批消息发送失败但第二批成功,而第一批消息会被重新发送,则第二批消息会先被写入。
5 batch.size
当多个消息被发送到同一个分区时,生产者会把它们一起处理。此配置设置用于每批处理使用的内存字节数,默认为16384=16KB。当使用的内存满的时候,生产者会发送当前批次的所有消息。但是,这并不意味着生产者会一直等待使用的内存变满,根据下面linger.ms配置的时间也会触发消息发送。设置较小的值会增加发送的频率,从而可能会减少吞吐量;设置较大的值会使用较多的内存,设置为0会关闭批处理的功能。
6 linger.ms
此配置设置在发送当前批次消息之前等待新消息的时间量,默认值为0。KafkaProducer会在当前批次使用的内存已满或等待时间到达linger.ms配置时间的时候发送消息。当linger.ms>0时,延时性会增加,但会提高吞吐量,因为会减少消息发送频率。
7 client.id
用于标识发送消息的客户端,通常用于日志和性能指标以及配额。
8 max.in.flight.requests.per.connection
此配置设置客户端在单个连接上能够发送的未确认请求的最大数量,默认为5,超过此数量会造成阻塞。设置大的值可以提高吞吐量但会增加内存使用,但是需要注意的是,当设置值大于1而且发送失败时,如果启用了重试配置,有可能会改变消息的顺序。设置为1时,即使重新发送消息,也可以保证发送的顺序和写入的顺序一致。
9 request.timeout.ms
此配置设置客户端等待请求响应的最长时间,默认为30000ms=30秒,如果在这个时间内没有收到响应,客户端将重发请求,如果超过重试次数将抛异常。此配置应该比replica.lag.time.max.ms(broker配置,默认10秒)大,以减少由于生产者不必要的重试造成消息重复的可能性。
10 max.block.ms
当发送缓冲区已满或者元数据不可用时,生产者调用send()和partitionsFor()方法会被阻塞,默认阻塞时间为60000ms=1分钟。由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。
11 max.request.size
此配置设置生产者在单个请求中能够发送的最大字节数,默认为1048576字节=1MB。例如,你可以发送单个大小为1MB的消息或者1000个大小为1KB的消息。注意,broker也有接收消息的大小限制,使用的配置是message.max.bytes=1000012字节(好奇怪的数字,约等于1MB)。
12 receive.buffer.bytes和send.buffer.bytes
receive.buffer.bytes:读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认值为32768字节=32KB。如果设置为-1,则将使用操作系统的默认值。
send.buffer.bytes:发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认值为131072字节=128KB。如果设置为-1,则将使用操作系统的默认值。
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
Kafka的架构:
Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。
一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干broker(作为 Kafka 节点的服务器),若干Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。
在 Kafka 架构中,有以下术语:
•Producer:生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;
•Broker:Kafka节点,一个Kafka节点就是一个broker,Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;
•Topic:producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;
•Partition:每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;
•Consumer:消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;
•Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;
•replicas:partition 的副本,保障 partition 的高可用;
•leader:replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;
•follower:replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;
•controller:Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;
•ZooKeeper:Kafka 通过 ZooKeeper 来存储集群的 meta 信息等
Topic 只是逻辑概念,面向的是 producer 和 consumer;而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 broker,那么关于该 Topic 的所有读写请求都将由这一个 broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。有了 Partition 概念以后,假设一个 Topic 被分为 10 个 Partitions,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 broker(服务器)上,当 producer 发布消息时,producer 客户端可以采用 random、key-hash 及 轮询 等算法选定目标 partition,若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。
消息发送的流程:
1.Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
2.kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长,而不关注消息是否被消费。
3.Consumer从kafka集群pull数据,并控制获取消息的offset。
Kafka的设计:
吞吐量:
高吞吐是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:
1.数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
2.zero-copy:减少IO操作步骤
3.数据批量发送
4.数据压缩
5.Topic划分为多个partition,提高parallelism
负载均衡:
1.producer根据用户指定的算法,将消息发送到指定的partition
2.存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
3.多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
4.通过zookeeper管理broker与consumer的动态加入与离开
拉取系统:
由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:
1.简化kafka设计
2.consumer根据消费能力自主控制消息拉取速度
3.consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等
可扩展性:
当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。
Kafka的应用场景:
1.消息队列
比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ。
2.行为跟踪
Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。
3.元信息监控
作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。
4.日志收集
日志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。
5.流处理
这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom和Samza是非常著名的实现这种类型数据转换的框架。
6.事件源
事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。
7.持久性日志(commit log)
Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。
发送消息的主要步骤:
首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。
接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。
当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。
生产者配置属性
发送数据到Kafka的第一步是创建一个生产者,必须指定以下三个属性:
bootstrap.servers:生产者用于与Kafka集群建立初始连接的主机和端口的列表。该列表不需要包括所有的brokers信息,因为生产者在建立连接后能够获取所有brokers的信息。但建议至少包含两个,防止一个broker宕机,生产者仍然能够通过另外一个broker连接到群集。
key.serializer:用于序列化keys的类名。Kafka brokers期待key和value的类型为byte数组,但是也允许使用参数化的Java对象作为key和value。这使得代码非常易读,但也意味着生产者必须知道如何把这些对象转换为byte数组。key.serializer应设为实现了org.apache.kafka.common.serialization.Serializer接口的类名,生产者将会使用这个类来把key对象序列化为byte数组。Kafka内置实现了ByteArraySerializer、StringSerializer和IntegerSerializer。注意,即使生产者发送的数据没有指定key,也必须设置key.serializer这个属性。
value.serializer:用于序列化value的类名。类似于key.serializer,生产者将会使用指定的类来把value对象序列化为byte数组。
生产者有很多配置属性,除了上述的三个之外,下面是一些比较重要的属性:
1 acks
此配置设置在生产者可以认为发送请求完成之前,有多少分区副本必须接收到数据。此选项对消息可能丢失的可能性有重大影响,此配置有三个允许的值,默认为1:
acks=0,生产者不会等待broker的任何确认,消息会被立即添加到缓冲区并被认为已经发送。在这种情况下,不能保证服务器已经收到消息,并且重试配置不会生效(因为客户端通常不会知道任何异常),每条消息返回的偏移量始终设置为-1。由于生产者不等待broker的任何确认,因此它可以以网络支持的最快速度发送消息,所以这个配置适用于实现非常高的吞吐量。
acks=1,在leader服务器的副本收到消息的同一时间,生产者会接收到broker的确认。如果消息不能写入leader的副本(例如,如果leader宕机并且还没有选出新的leader),生产者将接收到异常响应,然后可以重新发送消息,避免丢失数据。如果leader宕机并且消息没有被写入到新的leader(通过不确定的leader选举),该消息仍然会丢失。在这种情况下,吞吐量取决于消息是同步还是异步发送。如果我们的客户端等待服务器的回复(通过上述的调用发送消息时返回的Future对象的get()方法),它明显会显著地增加延迟(至少通过网络往返)。如果客户端使用callback,则延迟不会那么明显,但吞吐量将受到正在发送消息数量的限制(例如,在接收到响应之前生产者将会发送多少消息)。
acks=all(或-1),一旦所有的同步副本接收到消息,生产者才会接收到broker的确认。这是最安全的模式,因为可以确保多于一个的broker接收到该消息,即使在宕机的情况下,该消息也能被保存。然而,延迟性会比acks=1的时候更高,因为需要等待所有broker接收到消息。
2 buffer.memory
此配置设置生产者可用于缓冲等待发送给brokers消息的总内存字节数,默认为33554432=32MB。如果消息发送到缓存区的速度比发送到broker的速度快,那么生产者会被阻塞(根据max.block.ms配置的时间,默认为60000ms=1分钟,在0.9.0.0版本之前使用block.on.buffer.full配置),之后会抛出异常。
3 compression.type
生产者对生成的所有数据使用的压缩类型,默认值是none(即不压缩),有效值为none,gzip,snappy或lz4。Snappy压缩技术是Google开发的,它可以在提供较好的压缩比的同时,减少对CPU的使用率并保证好的性能,所以建议在同时考虑性能和带宽的情况下使用。Gzip压缩技术通常会使用更多的CPU和时间,但会产生更好的压缩比,所以建议在网络带宽更受限制的情况下使用。通过启用压缩功能,可以减少网络利用率和存储空间,这往往是向Kafka发送消息的瓶颈。
4 retries
默认值为0,当设置为大于零的值,客户端会重新发送任何发送失败的消息。注意,此重试与客户端收到错误时重新发送消息是没有区别的。在配置max.in.flight.requests.per.connection不等于1的情况下,允许重试可能会改变消息的顺序,因为如果两个批次的消息被发送到同一个分区,第一批消息发送失败但第二批成功,而第一批消息会被重新发送,则第二批消息会先被写入。
5 batch.size
当多个消息被发送到同一个分区时,生产者会把它们一起处理。此配置设置用于每批处理使用的内存字节数,默认为16384=16KB。当使用的内存满的时候,生产者会发送当前批次的所有消息。但是,这并不意味着生产者会一直等待使用的内存变满,根据下面linger.ms配置的时间也会触发消息发送。设置较小的值会增加发送的频率,从而可能会减少吞吐量;设置较大的值会使用较多的内存,设置为0会关闭批处理的功能。
6 linger.ms
此配置设置在发送当前批次消息之前等待新消息的时间量,默认值为0。KafkaProducer会在当前批次使用的内存已满或等待时间到达linger.ms配置时间的时候发送消息。当linger.ms>0时,延时性会增加,但会提高吞吐量,因为会减少消息发送频率。
7 client.id
用于标识发送消息的客户端,通常用于日志和性能指标以及配额。
8 max.in.flight.requests.per.connection
此配置设置客户端在单个连接上能够发送的未确认请求的最大数量,默认为5,超过此数量会造成阻塞。设置大的值可以提高吞吐量但会增加内存使用,但是需要注意的是,当设置值大于1而且发送失败时,如果启用了重试配置,有可能会改变消息的顺序。设置为1时,即使重新发送消息,也可以保证发送的顺序和写入的顺序一致。
9 request.timeout.ms
此配置设置客户端等待请求响应的最长时间,默认为30000ms=30秒,如果在这个时间内没有收到响应,客户端将重发请求,如果超过重试次数将抛异常。此配置应该比replica.lag.time.max.ms(broker配置,默认10秒)大,以减少由于生产者不必要的重试造成消息重复的可能性。
10 max.block.ms
当发送缓冲区已满或者元数据不可用时,生产者调用send()和partitionsFor()方法会被阻塞,默认阻塞时间为60000ms=1分钟。由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。
11 max.request.size
此配置设置生产者在单个请求中能够发送的最大字节数,默认为1048576字节=1MB。例如,你可以发送单个大小为1MB的消息或者1000个大小为1KB的消息。注意,broker也有接收消息的大小限制,使用的配置是message.max.bytes=1000012字节(好奇怪的数字,约等于1MB)。
12 receive.buffer.bytes和send.buffer.bytes
receive.buffer.bytes:读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认值为32768字节=32KB。如果设置为-1,则将使用操作系统的默认值。
send.buffer.bytes:发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认值为131072字节=128KB。如果设置为-1,则将使用操作系统的默认值。
发表评论
-
Zookeeper 总结
2020-01-10 14:53 293zookeeper是一个开源的分 ... -
消息队列之 RabbitMQ
2018-10-16 11:53 819RabbitMQ 特点 RabbitMQ 是一个由 Erla ... -
Disruptor 介绍
2016-07-04 15:53 3106并发的复杂性: 在计算机科学中,并发的意思是两个或两 ... -
Apache kafka 解读
2016-06-29 12:42 868消息队列应用场景:异步处理、应用解耦、流量削锋、日志处理、消息 ... -
ActiveMQ 集群配置
2015-01-09 10:55 18994构建高可用的ActiveMQ系 ... -
JXPath使用介绍
2014-03-11 15:13 1334一、JXPath简介 JXPath是apache公司提供 ... -
注册Tomcat服务为系统服务
2013-12-12 11:40 5721Tomcat服务注册为系统服务之后,就不用每次启动机器之 ... -
Apache ActiveMQ 介绍
2013-08-22 15:18 1207一、下载部署 1、下载 ... -
Apache PDFBox 介绍
2012-02-03 10:22 1695PDFBox是Java实现的PDF文档协作类库,提供PDF文档 ... -
Apache POI 介绍
2012-02-03 10:13 1711Apache POI是Apache软件基金会的开放源码函式库, ...
相关推荐
Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 用于构建实时数据流和流式应用程序。它以高吞吐量、可扩展性和容错性著称,适用于处理大规模数据流。 ...
《Apache Kafka实战》这本书深入浅出地介绍了Apache Kafka这一分布式流处理平台的各个方面,旨在帮助读者掌握Kafka的实际应用和核心概念。Kafka是一个高吞吐量、低延迟的消息发布订阅系统,常用于构建实时数据管道和...
Spring for Apache Kafka API。 Spring for Apache Kafka 开发文档。
This book is here to help you get familiar with Apache Kafka and to solve your challenges related to the consumption of millions of messages in publisher-subscriber architectures. It is aimed at ...
Apache Kafka是一款高性能分布式消息队列系统,它的特点包括可分区、可备份以及基于Zookeeper进行协调。消息队列系统是现代企业级系统架构中的核心组件之一,它允许不同的系统组件之间通过消息进行异步通信,具有...
Apache Kafka is a popular distributed streaming platform that acts as a messaging queue or an enterprise messaging system. It lets you publish and subscribe to a stream of records and process them in ...
《Apache Kafka Cookbook》是由PACKT在2015年出版的一本专著,专注于介绍Apache Kafka这一分布式流处理平台的实战技巧和最佳实践。Apache Kafka是一个高性能、可扩展且容错性强的消息中间件,它被广泛应用于大数据...
本文将深入探讨"Streaming Architecture New Designs Using Apache Kafka and MapR Streams"这一主题,阐述如何利用这两种强大的工具构建高效、可扩展的流处理系统。 Apache Kafka是一种分布式流处理平台,由...
Building Data Streaming Applications with Apache Kafka 英文azw3 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除
### Apache Kafka介绍与核心知识点详解 #### 一、Apache Kafka概览 **Apache Kafka**是一种分布式提交日志服务,广泛应用于数据摄入场景。它提供了一系列关键特性,包括可扩展性、高性能、高可靠性和灵活性。从...
从提供的内容来看,这本书由Nishant Garg编写,Packt Publishing出版,详细介绍了如何搭建Apache Kafka集群,并通过实用、动手的例子来开发自定义的消息生产者和消费者。Nishant Garg是一位拥有超过13年工作经验的...
1. **Apache Kafka 介绍**:Kafka是一个分布式流处理平台,它允许创建实时的数据流,这些数据流可以被多个消费者同时消费。Kafka的核心概念包括生产者(Producer)、主题(Topic)、分区(Partition)和消费者...
Kafka是一个对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。 1.Kafka集群包含一个或多个服务器,这种服务器被称为broker 2.Partition是物理上的概念,每个Topic...
### Apache Kafka:设置集群与开发消息生产者及消费者 #### Apache Kafka概述 Apache Kafka是一种开源流处理平台,由LinkedIn公司创建并捐赠给Apache软件基金会。Kafka被设计为一种高吞吐量、分布式的消息系统,它...
1. **安全概述**:介绍 Kafka 安全框架的整体架构。 2. **加密与认证**:通过 SSL 协议实现数据加密传输和双向认证。 3. **授权与 ACLs**:定义了访问控制列表,控制用户对资源的操作权限。 4. **安全特性集成**:...
Learning Apache Kafka Second Edition provides you with step-by-step, practical examples that help you take advantage of the real power of Kafka and handle hundreds of megabytes of messages per second ...
本书可能详细介绍了Kafka的安装与配置,包括设置集群、配置broker、创建主题以及调整参数以优化性能。还可能涉及如何编写生产者和消费者程序,以及如何使用Java、Python等不同语言进行API调用。 另外,书中可能探讨...