`
1028826685
  • 浏览: 938715 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

Kafka Producer机制优化-提高发送消息可靠性

 
阅读更多

名称解释:

Broker:负责消息的存储和转发,也可以叫消息中介节点 
Topic:每种消息的分类叫做主题(Topic)。 
Partition:每一个Topic被切分为多个Partitions。

背景

Producer构造Message对象时,传入key参数,当Producer发送Message,会根据key确定目标Partition,当Kafka集群中某个Partition所有存活的节点都失效或挂掉。会造成Producer尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,每次尝试都会休眠一定时间(默认值为100ms)。用户捕捉到异常其结果,停止发送会阻塞,继续发送消息会丢失。

解决思路

Kafka中默认发送消息方式不变,给用户提供一种可选择方式,增加一个消息发送失效转移的开关,当Producer发送到目标Partition的(所有副本本来存活的)节点都失效或挂掉,就转发到其他Partition上。 
M:表示Partition的主分区。 
S:表示Partition的从分区。下面图2所示,消息轮询发送到Partition的0-3上。 
这里写图片描述
图1 
上图本来一条消息是发送到Partition-0所在Broker的,当Partition-0所在Broker全部失效或挂掉后,如下图所示。 
这里写图片描述 
图2 
消息(轮询均衡)发送到其他Partition上,待失效的Brokers恢复,发送消息恢复到图1状态。

Producer代码修改

代码修改如下: 
ProducerConfig.Scala类代码中增加一行: 
val sendSwitchBrokerEnabled = props.getBoolean(“send.switch.broker.enable”, false) 
DefaultEventHandler.scala类增加一行 
val isSendSwitchBrokerEnabled = config.sendSwitchBrokerEnabled

DefaultEventHandler.scala类中增加如下方法:

private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader], isSendSwitchBrokerEnabled: Boolean = false): Int = {
  val numPartitions = topicPartitionList.size
  if(numPartitions <= 0)
    throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
  var partition =
    if (key == null) {
      // If the key is null, we don't really need a partitioner
      // So we look up in the send partition cache for the topic to decide the target partition
      val id = sendPartitionPerTopicCache.get(topic)
      id match {
        case Some(partitionId) =>
          // directly return the partitionId without checking availability of the leader,
          // since we want to postpone the failure until the send operation anyways
          partitionId
        case None =>
          val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
          if (availablePartitions.isEmpty)
            throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
          val index = Utils.abs(Random.nextInt) % availablePartitions.size
          val partitionId = availablePartitions(index).partitionId
          sendPartitionPerTopicCache.put(topic, partitionId)
          partitionId
      }
    } else
    partitioner.partition(key, numPartitions)

  if(partition < 0 || partition >= numPartitions)
    throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
      "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
  trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))

  if (isSendSwitchBrokerEnabled) {
        if(partitionsLeaderInTopicsCache.isEmpty()) {
          val availablePartitions = topicPartitionList.map { partitionAndLeader =>
              if(partitionAndLeader.leaderBrokerIdOpt.isDefined) {
                partitionsLeaderInTopicsCache.put(TopicAndPartition(topic, partitionAndLeader.partitionId),
                    partitionAndLeader.leaderBrokerIdOpt.get)
              }
          }
        }

        if(!partitionsLeaderInTopicsCache.containsKey(TopicAndPartition(topic, partition))) {
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            partition = availablePartitions(index).partitionId
        }
  }
  partition
}

 

分享到:
评论

相关推荐

    kafka-2.12-3.3.2.tgz

    2. **消费者位移管理**:消费者位移现在默认存储在Kafka主题中,提高了消费者恢复速度和故障转移的可靠性。 3. **连接器(Connectors)**:Kafka Connect允许快速地建立到外部系统的连接,如数据库或文件系统,简化...

    kafka_2.13-3.2.1.tgz

    Kafka的高性能得益于其优化的设计,例如零拷贝(Zero-Copy)技术,减少了数据在内存和磁盘之间的传输次数,提高了消息处理速度。另外,Kafka还支持批处理(Batching),将多条消息打包发送,进一步提升了网络传输...

    最新版linux kafka_2.13-2.5.0.tgz

    生产者使用 `kafka-console-producer.sh` 发送消息,消费者使用 `kafka-console-consumer.sh` 接收消息。例如,启动一个生产者向 `my-topic` 发送消息: ```bash bin/kafka-console-producer.sh --broker-list ...

    kafka_2.13-3.2.1.zip

    1. **性能优化**:每次版本迭代,Kafka都会进行性能调优,提高消息处理速度和吞吐量。 2. **错误修复**:官方会修复已知的bug,增强系统的稳定性和可靠性。 3. **新功能**:可能引入新的API或配置选项,以满足更多...

    kafka-eagle-web-2.0.3-bin.tar.gz

    1. **运维监控**:对于大型企业来说,Kafka Eagle 2.0.3可以帮助运维人员实时监控Kafka集群的健康状况,确保数据传输的稳定性和可靠性。 2. **开发调试**:开发者可以借助其丰富的监控数据,进行问题定位和性能优化...

    kafka-php-master.zip

    3. 发送消息:使用`Kafka\Producer::send()`方法将消息发送到指定主题和分区。 4. 订阅主题:`Kafka\Consumer::subscribe()`方法订阅所需的主题,开始监听。 5. 消费消息:通过`Kafka\Consumer::consume()`方法获取...

    kafka_2.10-0.10.2.2

    Partition是Kafka保证消息顺序和并行处理的关键机制,每个分区内的消息会按照发送顺序被消费。 至于"apache-storm-1.2.2",它可能包含了稳定性增强、性能提升以及与其他组件(如Kafka)集成的改进。在部署Storm集群...

    Kafka自学文档 - 带书签

    Kafka通过使用Zookeeper来协调和同步不同broker之间的日志状态,保证了消息的持久性和可靠性。Zookeeper也负责存储和管理consumer的offset信息,以便消费者可以跟踪它们的消费进度。 总的来说,Kafka自学文档为学习...

    kafka_2.9.2-0.8.1

    3. **消费者API改进**:在这个版本中,Kafka 引入了新的消费者 API,即 Simple Consumer 被 High Level Consumer 所取代,后者引入了消费组的概念,提高了消息处理的可靠性和效率。 4. **故障恢复与复制**:Kafka ...

    kafka_2.11-2.4.0.tgz

    2. **调优**:根据实际负载调整配置,例如增加分区数量、优化网络设置、调整内存分配等,以提高 Kafka 的性能和稳定性。 总结,Kafka 2.4.0 作为一款强大的消息中间件,提供了高效、可靠的数据流处理能力,其新特性...

    Kafka从入门到源码分析原理课-视频教程网盘链接提取码下载 .txt

    - **Producer(生产者)**:向Kafka集群发送消息的应用程序。 - **Consumer(消费者)**:从Kafka集群读取消息的应用程序。 - **Topic(主题)**:逻辑上分类的发布-订阅系统,类似于邮件列表或新闻组。 - **Broker...

    kafka官方文档-中文

    为了提高系统的可用性和可靠性,Kafka 实现了数据的多副本存储。 #### 4.8 Log Compaction 日志压缩技术帮助减少磁盘空间占用,优化数据存储。 #### 4.9 Quotas 配额机制限制了客户端的资源使用,确保集群的稳定性...

    Kafka视频教程-从入门到实战轻松学Kafka系统教程(13讲)

    Kafka集群通常包括多个Broker节点,这些节点之间通过复制机制实现数据冗余,提高系统的可用性和可靠性。 **5.2 Kafka集群扩展** 随着数据量的增长,可以通过增加Broker节点的数量来水平扩展Kafka集群。同时,还...

    【BAT必备】kafka面试题

    - **可靠性**:消息一旦被写入磁盘就不会丢失。 - **可扩展性**:支持在线水平扩展。 ### Kafka关键知识点详解 #### 1. Kafka的基本概念 - **Broker**:Kafka集群的核心组件,负责消息的接收、存储和分发。 - *...

    kafka_2.13-2.8.1.tgz

    《Kafka:深入理解消息队列的核心机制》 Apache Kafka是一款高效、可扩展且持久化的分布式消息队列系统,广泛应用于大数据实时处理、日志收集、流式计算等多个领域。Kafka 2.13-2.8.1是其在Java 11环境下稳定版本的...

    kafka集群部署文档(部署,运维,FAQ)

    - **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持高性能。 - **高吞吐量**:即便是普通硬件配置下,Kafka也能支持每秒数百万条消息的处理能力。 - **灵活的消息分发...

    kafka-0.10.1.1-src

    在Kafka中,消息被持久化到磁盘,以实现高可靠性。每个主题被划分为多个Partition(分区),每个Partition是一个有序的消息队列,保证了消息的顺序性。Partition在多个Brokers之间分布,提供水平扩展能力。 3. **...

    kafka 2.10-0.9.0.1 源码

    **Kafka 2.10-0.9.0.1 源码解析与镜像制作** 在深入探讨 Kafka 的源码之前,先来理解 Kafka 是什么。...深入研究这些源码将帮助我们更好地理解和优化 Kafka 系统,为大数据实时处理和消息传递提供可靠的基础。

    kafka_2.10-0.10.0.0

    - **容错性**:通过副本和ISR(In-Sync Replica)机制确保数据的可靠性。 - **实时性**:Kafka支持实时数据流处理,消息一旦发布即可被消费。 - **API支持**:提供Java、Python、C++等多种语言的SDK,方便集成。 ...

Global site tag (gtag.js) - Google Analytics