`
1028826685
  • 浏览: 943737 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

Kafka Consumer机制优化-保证每条消息至少消费一次

 
阅读更多

背景

Kafka中由Consumer维护消费状态,当Consumer消费消息时,支持2种模式commit消费状态,分别为立即commit和周期commit。前者会导致性能低下,做到消息投递恰好一次,但很少使用,后者性能高,通常用于实际应用,但极端条件下无法保证消息不丢失。

目标

在有效期内,保证每条消息至少可被消费一次

问题分析

这里写图片描述
请看如上图1,Consumer Thread读取一条消息,更新缓存消费状态,传入消息执行业务逻辑,同时有另外一个调度线程异步周期执行,从缓存中读取消费状态信息,持久化消费状态。假设Consumer Thread更新了缓存消费状态,Scheduler Thread在“执行业务逻辑”完成前就持久化消费状态,正在此时,Consumer失效或宕机了,这条消息就丢失了。

解决思路

这里写图片描述
等待“执行业务逻辑”成功完成后更新缓存消费状态,就可以保证消息不会丢失。 
具体做法:新增一个消费机制策略开关,此开关启动执行图2策略,关闭启动执行图1策略

实现代码

package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
import kafka.message.{MessageAndOffset, MessageAndMetadata}
import kafka.common.{KafkaException, MessageSizeTooLargeException}

class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
                             consumerTimeoutMs: Int,
                             private val keyDecoder: Decoder[K],
                             private val valueDecoder: Decoder[V],
                             val clientId: String, val consumerAtLeastOnceMessageEnabled: Boolean)
  extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
  private var currentTopicInfo: PartitionTopicInfo = null
  private var consumedOffset: Long = -1L
  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
  override def next(): MessageAndMetadata[K, V] = {
    val item = super.next()
    if(consumedOffset < 0)
      throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
    if (consumerAtLeastOnceMessageEnabled)
      currentTopicInfo.resetConsumeOffset(consumedOffset)
    val topic = currentTopicInfo.topic
    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
    item
  }
  protected def makeNext(): MessageAndMetadata[K, V] = {
    var currentDataChunk: FetchedDataChunk = null
    // if we don't have an iterator, get one
    var localCurrent = current.get()
    if(localCurrent == null || !localCurrent.hasNext) {
      if (consumerTimeoutMs < 0)
        currentDataChunk = channel.take
      else {
        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
        if (currentDataChunk == null) {
          // reset state to make the iterator re-iterable
          resetState()
          throw new ConsumerTimeoutException
        }
      }
      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
        debug("Received the shutdown command")
        return allDone
      } else {
        currentTopicInfo = currentDataChunk.topicInfo
        val cdcFetchOffset = currentDataChunk.fetchOffset
        val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
        if (ctiConsumeOffset < cdcFetchOffset) {
          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
            .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
          currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
        }
        localCurrent = currentDataChunk.messages.iterator
        current.set(localCurrent)
      }
      // if we just updated the current chunk and it is empty that means the fetch size is too small!
      if(currentDataChunk.messages.validBytes == 0)
        throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
                                               "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
                                               .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
    }
    var item = localCurrent.next()
    // reject the messages that have already been consumed
    while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
      item = localCurrent.next()
    }
    consumedOffset = item.nextOffset
    item.message.ensureValid() // validate checksum of message to ensure it is valid
    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
  }
  def clearCurrentChunk() {
    try {
      debug("Clearing the current data chunk for this consumer iterator")
      current.set(null)
    }
  }

  def resetConsumeOffset() {
    if (!consumerAtLeastOnceMessageEnabled)
      currentTopicInfo.resetConsumeOffset(consumedOffset)
  }
}
class ConsumerTimeoutException() extends RuntimeException()

 

分享到:
评论

相关推荐

    kafka深度分析

    - **At Least Once**:至少一次送达保证,即消息至少会被消费一次,但可能会多次消费同一消息。 - **Exactly Once**:恰好一次送达保证,确保消息恰好被消费一次,不重复也不遗漏。 - **At Most Once**:至多一次...

    kafka集群部署文档(部署,运维,FAQ)

    - **高吞吐量**:即便是普通硬件配置下,Kafka也能支持每秒数百万条消息的处理能力。 - **灵活的消息分发**:Kafka支持消息的分区,通过Kafka服务器和消费者集群来高效地分发消息。 #### 二、Kafka架构组件 Kafka...

    Kafka_API_文档

    Kafka提供了至少一次交付和精确一次交付两种语义。 **6.6 复制** - **不纯洁的Leader选举**: 如果所有副本都失效,则选举过程可能会出现问题。 - **可用性和持久性**: 复制机制确保即使某个节点失败,数据仍然可用...

    Kafka入门、介绍、使用及部署

    - **Consumer Group(消费组)**:多个Consumer可以组成一个Group,这样同一组内的多个Consumer可以同时消费同一个Topic的消息,但每条消息只会被分配给该组中的一个Consumer。 #### 二、Kafka的优缺点 ##### 优点...

    Kafka面试题,面试知识

    - **精确一次(Exactly once)**: 每条消息恰好被传输一次,这是Kafka的目标,但需要特定的配置和使用模式来实现。 3. **节点健康判断** - **ZooKeeper连接**: 节点通过心跳机制与ZooKeeper保持连接以确认其存活...

    rabbitmq面试题.pdf

    如何根据业务需求选择正确的消息传递语义(例如,至少一次、最多一次、恰好一次)?** - **至少一次(At Least Once):** 适用于对数据完整性要求高的场景。 - **最多一次(At Most Once):** 可能丢失部分数据...

    海量日志处理开源系统比较.docx

    (5) Consumer Group:消费者组内的消费者可以并行消费一个主题的不同分区,保证每个消息只被消费一次(至少一次)。 3. Kafka与Scribe和Chukwa的对比 - 相比于Scribe,Kafka更注重消息的高吞吐量和低延迟,更适合...

Global site tag (gtag.js) - Google Analytics