`
1028826685
  • 浏览: 936736 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

Kafka Consumer机制优化-保证每条消息至少消费一次

 
阅读更多

背景

Kafka中由Consumer维护消费状态,当Consumer消费消息时,支持2种模式commit消费状态,分别为立即commit和周期commit。前者会导致性能低下,做到消息投递恰好一次,但很少使用,后者性能高,通常用于实际应用,但极端条件下无法保证消息不丢失。

目标

在有效期内,保证每条消息至少可被消费一次

问题分析

这里写图片描述
请看如上图1,Consumer Thread读取一条消息,更新缓存消费状态,传入消息执行业务逻辑,同时有另外一个调度线程异步周期执行,从缓存中读取消费状态信息,持久化消费状态。假设Consumer Thread更新了缓存消费状态,Scheduler Thread在“执行业务逻辑”完成前就持久化消费状态,正在此时,Consumer失效或宕机了,这条消息就丢失了。

解决思路

这里写图片描述
等待“执行业务逻辑”成功完成后更新缓存消费状态,就可以保证消息不会丢失。 
具体做法:新增一个消费机制策略开关,此开关启动执行图2策略,关闭启动执行图1策略

实现代码

package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
import kafka.message.{MessageAndOffset, MessageAndMetadata}
import kafka.common.{KafkaException, MessageSizeTooLargeException}

class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
                             consumerTimeoutMs: Int,
                             private val keyDecoder: Decoder[K],
                             private val valueDecoder: Decoder[V],
                             val clientId: String, val consumerAtLeastOnceMessageEnabled: Boolean)
  extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
  private var currentTopicInfo: PartitionTopicInfo = null
  private var consumedOffset: Long = -1L
  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
  override def next(): MessageAndMetadata[K, V] = {
    val item = super.next()
    if(consumedOffset < 0)
      throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
    if (consumerAtLeastOnceMessageEnabled)
      currentTopicInfo.resetConsumeOffset(consumedOffset)
    val topic = currentTopicInfo.topic
    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
    item
  }
  protected def makeNext(): MessageAndMetadata[K, V] = {
    var currentDataChunk: FetchedDataChunk = null
    // if we don't have an iterator, get one
    var localCurrent = current.get()
    if(localCurrent == null || !localCurrent.hasNext) {
      if (consumerTimeoutMs < 0)
        currentDataChunk = channel.take
      else {
        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
        if (currentDataChunk == null) {
          // reset state to make the iterator re-iterable
          resetState()
          throw new ConsumerTimeoutException
        }
      }
      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
        debug("Received the shutdown command")
        return allDone
      } else {
        currentTopicInfo = currentDataChunk.topicInfo
        val cdcFetchOffset = currentDataChunk.fetchOffset
        val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
        if (ctiConsumeOffset < cdcFetchOffset) {
          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
            .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
          currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
        }
        localCurrent = currentDataChunk.messages.iterator
        current.set(localCurrent)
      }
      // if we just updated the current chunk and it is empty that means the fetch size is too small!
      if(currentDataChunk.messages.validBytes == 0)
        throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
                                               "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
                                               .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
    }
    var item = localCurrent.next()
    // reject the messages that have already been consumed
    while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
      item = localCurrent.next()
    }
    consumedOffset = item.nextOffset
    item.message.ensureValid() // validate checksum of message to ensure it is valid
    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
  }
  def clearCurrentChunk() {
    try {
      debug("Clearing the current data chunk for this consumer iterator")
      current.set(null)
    }
  }

  def resetConsumeOffset() {
    if (!consumerAtLeastOnceMessageEnabled)
      currentTopicInfo.resetConsumeOffset(consumedOffset)
  }
}
class ConsumerTimeoutException() extends RuntimeException()

 

分享到:
评论

相关推荐

    Kafka Producer机制优化-提高发送消息可靠性

    ### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...

    最新版kafka kafka_2.12-2.5.1.tgz

    - 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning` 7. **最佳实践** - 为了保证高可用性,推荐至少配置 3 个 broker。 - 合理设置分区数和...

    kafka-2.12-3.6.1.tgz

    然后在命令行中输入消息,每按一次回车就发送一条消息。 八、消费消息 同时,可以在另一个终端窗口启动消费者来查看发送的消息: ```bash /usr/local/kafka_2.12-3.6.1/bin/kafka-console-consumer.sh --bootstrap...

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...

    kafka-clients-2.4.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    5、kafka监控工具Kafka-Eagle介绍及使用

    3. **性能优化**:通过实时展示各种性能指标,Kafka-Eagle 帮助管理员识别潜在的性能瓶颈,进行相应的调整和优化。 4. **报警机制**:当监控指标超过预设阈值时,Kafka-Eagle 可以触发报警,及时通知管理员处理问题...

    kafka-eagle-bin-2.1.0.tar.gz

    4. 启动:执行`bin/kafka-eagle-start.sh`启动服务,使用`bin/kafka-eagle-stop.sh`停止服务。 5. 访问:在Web浏览器中输入`http://服务器IP:端口`,按照提示进行登录和操作。 ### 使用注意事项 1. 确保运行环境已...

    kafka-java-demo 基于java的kafka生产消费者例子

    在本文中,我们将深入探讨基于Java的Kafka生产者与消费者的实现,这主要围绕着"Kafka-java-demo"项目展开。Kafka是一个分布式流处理平台,由LinkedIn开发并开源,现在是Apache软件基金会的一部分。它被广泛用于实时...

    kafka-python-2.0.2.tar.gz

    在`kafka-python`中,可以通过创建一个`KafkaProducer`实例并调用其`send()`方法来发送消息: ```python from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') ...

    配置好的kafka_2.12-2.8.0 + SCRAM-SHA-256

    在版本2.8.0中,它提供了一种安全的身份验证机制——SCRAM-SHA-256,这是用于用户认证的一种安全方法。在本文中,我们将深入探讨Kafka的配置、SCRAM-SHA-256的工作原理以及如何在Kafka集群中设置和使用它。 ### ...

    kafka_2.12-2.4.1.zip

    - 使用`./kafka-console-producer.sh --broker-list localhost:9092 --topic test`和`./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning`进行消息的生产和消费,操作...

    Apache Kafka 3.0.0 (kafka-3.0.0-src.tgz)

    这个版本的Kafka源代码(kafka-3.0.0-src.tgz)是开发人员深入了解和定制Kafka内部机制的重要资源。 Kafka的设计理念是构建一个可扩展且容错的系统,它允许数据以发布/订阅的方式在生产者和消费者之间流动。Kafka...

    kafka-clients-2.4.1-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    最新版kafka kafka_2.13-2.6.0.tgz

    2. **高吞吐量**:Kafka设计时考虑了高性能,能够处理每秒数十万条消息,这使其在大数据实时处理场景中表现出色。 3. **持久化与容错**:Kafka将消息存储在磁盘上,并且支持多副本,以确保数据的持久性和容错性。...

    pentaho-kafka-consumer.zip

    在标题"pentaho-kafka-consumer.zip"中,我们看到的是一个专门为Pentaho Kettle定制的Kafka消费者插件的压缩包。 这个压缩包的描述提到了如何在Pentaho环境中安装和使用这个插件。首先,你需要在你的Pentaho Kettle...

    kafka-clients-2.2.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...

    Kafka集群及Kafka-Manager安装部署.docx

    - 发送和消费消息,使用Kafka自带的工具如`kafka-console-producer.sh`和`kafka-console-consumer.sh`进行测试。 #### 二、Kafka-Manager的安装与配置 **1. 安装部署** - 解压Kafka-Manager安装包`tar -zvxf ...

    kafka-eagle-bin-2.0.5.tar.gz

    - **配置文件**:修改conf/kafka-eagle-site.xml,配置Kafka集群地址、Zookeeper地址以及其它相关参数。 - **启动服务**:执行bin/kafka-eagle-start.sh脚本启动服务,然后在浏览器中输入http://服务器IP:8080...

    kafka安装包-2.13-3.6.2

    Kafka支持两种消费模式:单消费者模式(每个分区只能有一个消费者)和消费者组模式(多个消费者可以组成一个组,共享主题的分区)。 5. **Partitions**: 主题被分成多个分区,分区内的消息按照顺序存储,且每个分区...

    kafka-schema-registry-client-3.2.0.jar

    kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装

Global site tag (gtag.js) - Google Analytics