名称解释:
Broker:负责消息的存储和转发,也可以叫消息中介节点
Topic:每种消息的分类叫做主题(Topic)。
Partition:每一个Topic被切分为多个Partitions。
背景
Producer构造Message对象时,传入key参数,当Producer发送Message,会根据key确定目标Partition,当Kafka集群中某个Partition所有存活的节点都失效或挂掉。会造成Producer尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,每次尝试都会休眠一定时间(默认值为100ms)。用户捕捉到异常其结果,停止发送会阻塞,继续发送消息会丢失。
解决思路
Kafka中默认发送消息方式不变,给用户提供一种可选择方式,增加一个消息发送失效转移的开关,当Producer发送到目标Partition的(所有副本本来存活的)节点都失效或挂掉,就转发到其他Partition上。
M:表示Partition的主分区。
S:表示Partition的从分区。下面图2所示,消息轮询发送到Partition的0-3上。
图1
上图本来一条消息是发送到Partition-0所在Broker的,当Partition-0所在Broker全部失效或挂掉后,如下图所示。
图2
消息(轮询均衡)发送到其他Partition上,待失效的Brokers恢复,发送消息恢复到图1状态。
Producer代码修改
代码修改如下:
ProducerConfig.Scala类代码中增加一行:
val sendSwitchBrokerEnabled = props.getBoolean(“send.switch.broker.enable”, false)
DefaultEventHandler.scala类增加一行
val isSendSwitchBrokerEnabled = config.sendSwitchBrokerEnabled
DefaultEventHandler.scala类中增加如下方法:
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader], isSendSwitchBrokerEnabled: Boolean = false): Int = { val numPartitions = topicPartitionList.size if(numPartitions <= 0) throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist") var partition = if (key == null) { // If the key is null, we don't really need a partitioner // So we look up in the send partition cache for the topic to decide the target partition val id = sendPartitionPerTopicCache.get(topic) id match { case Some(partitionId) => // directly return the partitionId without checking availability of the leader, // since we want to postpone the failure until the send operation anyways partitionId case None => val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) partitionId } } else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic + "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]") trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition)) if (isSendSwitchBrokerEnabled) { if(partitionsLeaderInTopicsCache.isEmpty()) { val availablePartitions = topicPartitionList.map { partitionAndLeader => if(partitionAndLeader.leaderBrokerIdOpt.isDefined) { partitionsLeaderInTopicsCache.put(TopicAndPartition(topic, partitionAndLeader.partitionId), partitionAndLeader.leaderBrokerIdOpt.get) } } } if(!partitionsLeaderInTopicsCache.containsKey(TopicAndPartition(topic, partition))) { val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size partition = availablePartitions(index).partitionId } } partition }
相关推荐
2. **消费者位移管理**:消费者位移现在默认存储在Kafka主题中,提高了消费者恢复速度和故障转移的可靠性。 3. **连接器(Connectors)**:Kafka Connect允许快速地建立到外部系统的连接,如数据库或文件系统,简化...
Kafka的高性能得益于其优化的设计,例如零拷贝(Zero-Copy)技术,减少了数据在内存和磁盘之间的传输次数,提高了消息处理速度。另外,Kafka还支持批处理(Batching),将多条消息打包发送,进一步提升了网络传输...
生产者使用 `kafka-console-producer.sh` 发送消息,消费者使用 `kafka-console-consumer.sh` 接收消息。例如,启动一个生产者向 `my-topic` 发送消息: ```bash bin/kafka-console-producer.sh --broker-list ...
1. **性能优化**:每次版本迭代,Kafka都会进行性能调优,提高消息处理速度和吞吐量。 2. **错误修复**:官方会修复已知的bug,增强系统的稳定性和可靠性。 3. **新功能**:可能引入新的API或配置选项,以满足更多...
1. **运维监控**:对于大型企业来说,Kafka Eagle 2.0.3可以帮助运维人员实时监控Kafka集群的健康状况,确保数据传输的稳定性和可靠性。 2. **开发调试**:开发者可以借助其丰富的监控数据,进行问题定位和性能优化...
3. 发送消息:使用`Kafka\Producer::send()`方法将消息发送到指定主题和分区。 4. 订阅主题:`Kafka\Consumer::subscribe()`方法订阅所需的主题,开始监听。 5. 消费消息:通过`Kafka\Consumer::consume()`方法获取...
Partition是Kafka保证消息顺序和并行处理的关键机制,每个分区内的消息会按照发送顺序被消费。 至于"apache-storm-1.2.2",它可能包含了稳定性增强、性能提升以及与其他组件(如Kafka)集成的改进。在部署Storm集群...
Kafka通过使用Zookeeper来协调和同步不同broker之间的日志状态,保证了消息的持久性和可靠性。Zookeeper也负责存储和管理consumer的offset信息,以便消费者可以跟踪它们的消费进度。 总的来说,Kafka自学文档为学习...
3. **消费者API改进**:在这个版本中,Kafka 引入了新的消费者 API,即 Simple Consumer 被 High Level Consumer 所取代,后者引入了消费组的概念,提高了消息处理的可靠性和效率。 4. **故障恢复与复制**:Kafka ...
2. **调优**:根据实际负载调整配置,例如增加分区数量、优化网络设置、调整内存分配等,以提高 Kafka 的性能和稳定性。 总结,Kafka 2.4.0 作为一款强大的消息中间件,提供了高效、可靠的数据流处理能力,其新特性...
- **Producer(生产者)**:向Kafka集群发送消息的应用程序。 - **Consumer(消费者)**:从Kafka集群读取消息的应用程序。 - **Topic(主题)**:逻辑上分类的发布-订阅系统,类似于邮件列表或新闻组。 - **Broker...
为了提高系统的可用性和可靠性,Kafka 实现了数据的多副本存储。 #### 4.8 Log Compaction 日志压缩技术帮助减少磁盘空间占用,优化数据存储。 #### 4.9 Quotas 配额机制限制了客户端的资源使用,确保集群的稳定性...
Kafka集群通常包括多个Broker节点,这些节点之间通过复制机制实现数据冗余,提高系统的可用性和可靠性。 **5.2 Kafka集群扩展** 随着数据量的增长,可以通过增加Broker节点的数量来水平扩展Kafka集群。同时,还...
- **可靠性**:消息一旦被写入磁盘就不会丢失。 - **可扩展性**:支持在线水平扩展。 ### Kafka关键知识点详解 #### 1. Kafka的基本概念 - **Broker**:Kafka集群的核心组件,负责消息的接收、存储和分发。 - *...
《Kafka:深入理解消息队列的核心机制》 Apache Kafka是一款高效、可扩展且持久化的分布式消息队列系统,广泛应用于大数据实时处理、日志收集、流式计算等多个领域。Kafka 2.13-2.8.1是其在Java 11环境下稳定版本的...
- **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持高性能。 - **高吞吐量**:即便是普通硬件配置下,Kafka也能支持每秒数百万条消息的处理能力。 - **灵活的消息分发...
在Kafka中,消息被持久化到磁盘,以实现高可靠性。每个主题被划分为多个Partition(分区),每个Partition是一个有序的消息队列,保证了消息的顺序性。Partition在多个Brokers之间分布,提供水平扩展能力。 3. **...
**Kafka 2.10-0.9.0.1 源码解析与镜像制作** 在深入探讨 Kafka 的源码之前,先来理解 Kafka 是什么。...深入研究这些源码将帮助我们更好地理解和优化 Kafka 系统,为大数据实时处理和消息传递提供可靠的基础。
- **容错性**:通过副本和ISR(In-Sync Replica)机制确保数据的可靠性。 - **实时性**:Kafka支持实时数据流处理,消息一旦发布即可被消费。 - **API支持**:提供Java、Python、C++等多种语言的SDK,方便集成。 ...