
Kafka's consumer

 

Example code for using the consumer

// create the consumer connector
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
        createConsumerConfig());
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
// map the topic to the number of streams (consumer threads) that should read it
topicCountMap.put(topic, new Integer(1));
// create the KafkaStreams; one topic can map to several KafkaStreams, and the number of
// streams matches the count configured above
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// loop over the messages
while (it.hasNext())
    // process each message
    System.out.println(new String(it.next().message()));
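The createConsumerConfig() helper referenced above is not shown in the original snippet. Here is a minimal sketch of what it might look like (written in Scala for consistency with the rest of the walkthrough; the ZooKeeper address, group id, and timeout values are placeholders). For the old 0.8.x high-level consumer, zookeeper.connect and group.id are the two properties that must be set.

import java.util.Properties
import kafka.consumer.ConsumerConfig

// hypothetical helper: builds a ConsumerConfig for the 0.8.x high-level consumer
def createConsumerConfig(): ConsumerConfig = {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")      // placeholder ZooKeeper address
  props.put("group.id", "example-group")                // placeholder consumer group id
  props.put("zookeeper.session.timeout.ms", "6000")
  props.put("auto.commit.interval.ms", "1000")
  new ConsumerConfig(props)
}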

Creating the ConsumerConnector

 /**
   *  Create a ConsumerConnector
   *
   *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
   *                 connection string zookeeper.connect.
   */
  def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
    val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
    consumerConnect
  }

ZookeeperConsumerConnector then calls the consume method

  def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
      : Map[String,List[KafkaStream[K,V]]] = {
    debug("entering consume ")
    if (topicCountMap == null)
      throw new RuntimeException("topicCountMap is null")
    // build the map of consumer thread ids for each topic
    val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
    val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

    // make a list of (queue,stream) pairs, one pair for each threadId
    val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
      threadIdSet.map(_ => {
        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
        val stream = new KafkaStream[K,V](
          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
        (queue, stream)
      })
    ).flatten.toList

    val dirs = new ZKGroupDirs(config.groupId)
    // register this consumer's info in ZooKeeper
    registerConsumerInZK(dirs, consumerIdString, topicCount)
    // (re)initialize the consumer
    reinitializeConsumer(topicCount, queuesAndStreams)

    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
  }
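To make the data shaping inside consume concrete, here is a standalone sketch (plain Scala, no Kafka classes; the thread-id naming is only an approximation of what getConsumerThreadIdsPerTopic produces). It shows how a topicCountMap such as Map("my-topic" -> 2) expands into per-topic thread ids and one queue/stream slot per thread.

import java.util.concurrent.LinkedBlockingQueue

object ConsumeShapeSketch extends App {
  val consumerIdString = "group1_host-1"
  val topicCountMap    = Map("my-topic" -> 2)

  // one thread id per requested stream, per topic
  val topicThreadIds: Map[String, Set[String]] =
    topicCountMap.map { case (topic, n) =>
      topic -> (0 until n).map(i => s"$consumerIdString-$i").toSet
    }

  // one (queue, stream placeholder) pair per thread id, mirroring the loop in consume()
  val queuesAndStreams = topicThreadIds.values.flatten.map { threadId =>
    (new LinkedBlockingQueue[String](100), s"stream-for-$threadId")
  }.toList

  println(topicThreadIds)        // Map(my-topic -> Set(group1_host-1-0, group1_host-1-1))
  println(queuesAndStreams.size) // 2
}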

 

 private def reinitializeConsumer[K,V](
      topicCount: TopicCount,
      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {

    val dirs = new ZKGroupDirs(config.groupId)

    // listener to consumer and partition changes
    if (loadBalancerListener == null) {
      val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
      loadBalancerListener = new ZKRebalancerListener(
        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
    }

    // create listener for session expired event if not exist yet
    if (sessionExpirationListener == null)
      sessionExpirationListener = new ZKSessionExpireListener(
        dirs, consumerIdString, topicCount, loadBalancerListener)

    // create listener for topic partition change event if not exist yet
    if (topicPartitionChangeListener == null)
      topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)

    val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams

    // map of {topic -> Set(thread-1, thread-2, ...)}
    val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
      topicCount.getConsumerThreadIdsPerTopic

    val allQueuesAndStreams = topicCount match {
      case wildTopicCount: WildcardTopicCount =>
        /*
         * Wild-card consumption streams share the same queues, so we need to
         * duplicate the list for the subsequent zip operation.
         */
        (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
      case statTopicCount: StaticTopicCount =>
        queuesAndStreams
    }

    val topicThreadIds = consumerThreadIdsPerTopic.map {
      case(topic, threadIds) =>
        threadIds.map((topic, _))
    }.flatten
    // ensure the number of thread ids matches the number of queue/stream pairs
    require(topicThreadIds.size == allQueuesAndStreams.size,
      "Mismatch between thread ID count (%d) and queue count (%d)"
      .format(topicThreadIds.size, allQueuesAndStreams.size))
    val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

    threadQueueStreamPairs.foreach(e => {
      val topicThreadId = e._1
      val q = e._2._1
      topicThreadIdAndQueues.put(topicThreadId, q)
      debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
      newGauge(
        "FetchQueueSize",
        new Gauge[Int] {
          def value = q.size
        },
        Map("clientId" -> config.clientId,
          "topic" -> topicThreadId._1,
          "threadId" -> topicThreadId._2.threadId.toString)
      )
    })

    val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
    groupedByTopic.foreach(e => {
      val topic = e._1
      val streams = e._2.map(_._2._2).toList
      topicStreamsMap += (topic -> streams)
      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
    })

    // listener to consumer and partition changes
    zkClient.subscribeStateChanges(sessionExpirationListener)

    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)

    topicStreamsMap.foreach { topicAndStreams =>
      // register on broker partition path changes
      val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
    }

    // explicitly trigger load balancing for this consumer
    loadBalancerListener.syncedRebalance()
  }
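The zip-and-group step at the end of reinitializeConsumer is easiest to see with placeholder data. A standalone sketch (plain Scala; the queue and stream values are just strings) of how (topic, threadId) pairs are zipped with queue/stream slots and then regrouped by topic, which is the map createMessageStreams ultimately hands back to the caller:

object ZipGroupSketch extends App {
  val topicThreadIds   = List(("my-topic", "c-0"), ("my-topic", "c-1"))          // (topic, threadId)
  val queuesAndStreams = List(("queue-0", "stream-0"), ("queue-1", "stream-1"))  // placeholders

  require(topicThreadIds.size == queuesAndStreams.size, "thread id count must match queue count")
  val threadQueueStreamPairs = topicThreadIds.zip(queuesAndStreams)

  // group by topic and keep only the streams, like topicStreamsMap += (topic -> streams)
  val topicStreamsMap = threadQueueStreamPairs
    .groupBy { case ((topic, _), _) => topic }
    .map { case (topic, pairs) => topic -> pairs.map { case (_, (_, stream)) => stream } }

  println(topicStreamsMap)   // Map(my-topic -> List(stream-0, stream-1))
}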

 

   

  def syncedRebalance() {
      rebalanceLock synchronized {
        rebalanceTimer.time {
          if(isShuttingDown.get())  {
            return
          } else {
            for (i <- 0 until config.rebalanceMaxRetries) {
              info("begin rebalancing consumer " + consumerIdString + " try #" + i)
              var done = false
              var cluster: Cluster = null
              try {
                cluster = getCluster(zkClient)
                done = rebalance(cluster)
              } catch {
                case e: Throwable =>
                  /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
                    * For example, a ZK node can disappear between the time we get all children and the time we try to get
                    * the value of a child. Just let this go since another rebalance will be triggered.
                    **/
                  info("exception during rebalance ", e)
              }
              info("end rebalancing consumer " + consumerIdString + " try #" + i)
              if (done) {
                return
              } else {
                /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                 * clear the cache */
                info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
              }
              // stop all fetchers and clear all the queues to avoid data duplication
              closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
              Thread.sleep(config.rebalanceBackoffMs)
            }
          }
        }
      }

      throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
    }
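syncedRebalance is essentially a bounded retry-with-backoff loop around rebalance. A standalone sketch of that control flow (plain Scala; attemptRebalance is a stand-in for the real rebalance call, and the retry and backoff values are placeholders for rebalance.max.retries and rebalance.backoff.ms):

object RetrySketch extends App {
  val maxRetries = 4
  val backoffMs  = 200L

  // stand-in for rebalance(cluster); pretend the third attempt succeeds
  def attemptRebalance(attempt: Int): Boolean = attempt >= 2

  // exists() short-circuits on the first successful attempt, like the early return above
  val succeeded = (0 until maxRetries).exists { i =>
    val done = try attemptRebalance(i) catch { case _: Throwable => false }
    if (!done) Thread.sleep(backoffMs)   // back off before the next attempt
    done
  }
  if (!succeeded)
    throw new RuntimeException(s"can't rebalance after $maxRetries retries")
  println("rebalance succeeded")
}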

 

   private def rebalance(cluster: Cluster): Boolean = {
      val myTopicThreadIdsMap = TopicCount.constructTopicCount(
        group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
      val brokers = getAllBrokersInCluster(zkClient)
      if (brokers.size == 0) {
        // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
        // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
        // are up.
        warn("no brokers found when trying to rebalance.")
        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
        true
      }
      else {
        /**
         * fetchers must be stopped to avoid data duplication, since if the current
         * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
         * But if we don't stop the fetchers first, this consumer would continue returning data for released
         * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
         */
        closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)

        releasePartitionOwnership(topicRegistry)

        val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
        // assign partitions to consumer threads
        val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
        val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
          valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

        // fetch current offsets for all topic-partitions
        val topicPartitions = partitionOwnershipDecision.keySet.toSeq

        val offsetFetchResponseOpt = fetchOffsets(topicPartitions)

        if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
          false
        else {
          val offsetFetchResponse = offsetFetchResponseOpt.get
          topicPartitions.foreach(topicAndPartition => {
            val (topic, partition) = topicAndPartition.asTuple
            val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
            val threadId = partitionOwnershipDecision(topicAndPartition)
            addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
          })

          /**
           * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
           * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
           */
          if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
            allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

            partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                      .foreach { case (topic, partitionThreadPairs) =>
              newGauge("OwnedPartitionsCount",
                new Gauge[Int] {
                  def value() = partitionThreadPairs.size
                },
                ownedPartitionsCountMetricTags(topic))
            }

            topicRegistry = currentTopicRegistry
            updateFetcher(cluster)
            true
          } else {
            false
          }
        }
      }
    }

The RangeAssignor assigns partitions to consumer threads

def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- consumerThreadIdSet) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         *   Range-partition the sorted partitions to consumers for better locality.
         *  The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }

    partitionOwnershipDecision
  }
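The range arithmetic above is easier to follow with concrete numbers. A standalone worked example (plain Scala, no Kafka classes): 5 partitions spread over 2 consumer threads, where the first thread picks up the extra partition.

object RangeAssignSketch extends App {
  val curPartitions = Seq(0, 1, 2, 3, 4)     // sorted partition ids of the topic
  val curConsumers  = Seq("c1-0", "c1-1")    // sorted consumer thread ids

  val nPartsPerConsumer       = curPartitions.size / curConsumers.size   // 2
  val nConsumersWithExtraPart = curPartitions.size % curConsumers.size   // 1

  curConsumers.zipWithIndex.foreach { case (threadId, myConsumerPosition) =>
    val startPart = nPartsPerConsumer * myConsumerPosition +
                    myConsumerPosition.min(nConsumersWithExtraPart)
    val nParts = nPartsPerConsumer +
                 (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
    val owned = (startPart until startPart + nParts).map(curPartitions)
    println(s"$threadId owns partitions ${owned.mkString(",")}")
    // prints: c1-0 owns partitions 0,1,2
    //         c1-1 owns partitions 3,4
  }
}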

Once the partitions this consumer should handle have been computed, the updateFetcher method is called to update the fetcher threads

   private def updateFetcher(cluster: Cluster) {
      // update partitions for fetcher
      var allPartitionInfos : List[PartitionTopicInfo] = Nil
      for (partitionInfos <- topicRegistry.values)
        for (partition <- partitionInfos.values)
          allPartitionInfos ::= partition
      info("Consumer " + consumerIdString + " selected partitions : " +
        allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))

      fetcher match {
        case Some(f) =>
          f.startConnections(allPartitionInfos, cluster)
        case None =>
      }
    }

Inside the startConnections method, addFetcherForPartitions is called to start the fetcher threads

 

 def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
    mapLock synchronized {
      val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
        BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
      for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
        var fetcherThread: AbstractFetcherThread = null
        fetcherThreadMap.get(brokerAndFetcherId) match {
          case Some(f) => fetcherThread = f
          case None =>
            fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
            fetcherThread.start
        }

        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
        })
      }
    }

    info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
      "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
  }

Finally, look at the processPartitionData method in ConsumerFetcherThread: it inserts the fetched data into the queue, from which the corresponding stream can then consume it.

  // process fetched data
  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
    val pti = partitionMap(topicAndPartition)
    if (pti.getFetchOffset != fetchOffset)
      throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
    pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
  }
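The hand-off that processPartitionData performs can be reduced to a BlockingQueue shared between two threads. A minimal sketch (plain Scala; the string "chunk-at-offset-42" stands in for a FetchedDataChunk, and the 1-second poll timeout plays the role of consumer.timeout.ms):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object FetchHandoffSketch extends App {
  val queue = new LinkedBlockingQueue[String](100)   // stands in for the FetchedDataChunk queue

  // fetcher side: enqueue the fetched data, like pti.enqueue(...)
  val fetcher = new Thread(new Runnable {
    def run(): Unit = queue.put("chunk-at-offset-42")
  })
  fetcher.start()

  // consumer side: the stream iterator blocks until data arrives (or the timeout expires)
  val chunk = queue.poll(1000, TimeUnit.MILLISECONDS)
  println(s"stream received: $chunk")
  fetcher.join()
}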

 
