Consumer的使用示例代码
//创建soncumer connector ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig()); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //设置topic和监控份数的映射 topicCountMap.put(topic, new Integer(1)); //创建kafkaStream,一个topic可以对应多个kafkaStream。kafkaStream的份数和上面配置的监控份数相同 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); //循环遍历消息 while(it.hasNext()) //处理消息 System.out.println(new String(it.next().message()));
创建ConsumerConnector
/**
 * Builds the Java-API consumer connector for the given configuration.
 *
 * @param config consumer configuration; at the minimum it needs to specify the groupid of the
 *               consumer and the zookeeper connection string zookeeper.connect.
 * @return a newly constructed ZookeeperConsumerConnector for `config`
 */
def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector =
  new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
ZookeeperConsumerConnector调用consume方法
/**
 * Creates one (queue, stream) pair per consumer thread id of the requested topics, registers
 * this consumer in ZooKeeper and initializes the consumer.
 *
 * @param topicCountMap topic -> number of streams requested for that topic; must not be null
 * @param keyDecoder    decoder passed to every created KafkaStream for message keys
 * @param valueDecoder  decoder passed to every created KafkaStream for message values
 * @return the rebalance listener's topic -> streams map, cast to the requested key/value types
 * @throws RuntimeException if topicCountMap is null
 */
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null) {
    throw new RuntimeException("topicCountMap is null")
  }

  // Each call produces a fresh queue (capacity config.queuedMaxMessages) and the stream
  // constructed on top of it.
  def newQueueAndStream(): (LinkedBlockingQueue[FetchedDataChunk], KafkaStream[K,V]) = {
    val chunkQueue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
    val kafkaStream = new KafkaStream[K,V](
      chunkQueue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
    (chunkQueue, kafkaStream)
  }

  // Resolve the consumer thread ids of every topic.
  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
  val threadIdsByTopic = topicCount.getConsumerThreadIdsPerTopic

  // Make a list of (queue, stream) pairs, one pair for each thread id.
  val queuesAndStreams =
    threadIdsByTopic.values.flatMap(threadIds => threadIds.map(_ => newQueueAndStream())).toList

  // Register the consumer information in ZooKeeper.
  registerConsumerInZK(new ZKGroupDirs(config.groupId), consumerIdString, topicCount)

  // Initialize the consumer.
  reinitializeConsumer(topicCount, queuesAndStreams)

  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
/**
 * Associates the given (queue, stream) pairs with this consumer's topic threads, creates the
 * ZooKeeper listeners that do not exist yet, subscribes them and finally triggers a rebalance.
 *
 * @param topicCount       provides the topic -> consumer thread ids mapping
 * @param queuesAndStreams (queue, stream) pairs to be matched with the consumer thread ids
 * @throws IllegalArgumentException (raised by require) if the number of thread ids differs
 *                                  from the number of queues
 */
private def reinitializeConsumer[K,V](
    topicCount: TopicCount,
    queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {

  val groupDirs = new ZKGroupDirs(config.groupId)

  // Listener to consumer and partition changes, created on first use.
  if (loadBalancerListener == null) {
    val emptyStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(
      config.groupId,
      consumerIdString,
      emptyStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }

  // Listener for the session expired event, created if it does not exist yet.
  if (sessionExpirationListener == null) {
    sessionExpirationListener =
      new ZKSessionExpireListener(groupDirs, consumerIdString, topicCount, loadBalancerListener)
  }

  // Listener for the topic partition change event, created if it does not exist yet.
  if (topicPartitionChangeListener == null) {
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
  }

  val streamsByTopic = loadBalancerListener.kafkaMessageAndMetadataStreams

  // Map of {topic -> Set(thread-1, thread-2, ...)}.
  val threadIdsByTopic: Map[String, Set[ConsumerThreadId]] = topicCount.getConsumerThreadIdsPerTopic

  val allQueuesAndStreams = topicCount match {
    case _: WildcardTopicCount =>
      // Wild-card consumption streams share the same queues, so the list is repeated once per
      // topic to line up with the zip operation below.
      List.fill(threadIdsByTopic.keySet.size)(queuesAndStreams).flatten
    case _: StaticTopicCount =>
      queuesAndStreams
  }

  // map + flatten is deliberate: flatMap on a Map with pair results would build a Map again
  // and keep only one thread id per topic.
  val topicThreadIds = threadIdsByTopic.map {
    case (topic, threadIds) => threadIds.map(threadId => (topic, threadId))
  }.flatten

  // The thread ids and the queue/stream pairs must line up one to one.
  val threadIdCount = topicThreadIds.size
  val queueCount = allQueuesAndStreams.size
  require(threadIdCount == queueCount,
    "Mismatch between thread ID count (%d) and queue count (%d)"
      .format(threadIdCount, queueCount))

  val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

  // Remember the queue of every topic thread and expose its size as a gauge.
  threadQueueStreamPairs.foreach { case (topicThreadId, (queue, _)) =>
    topicThreadIdAndQueues.put(topicThreadId, queue)
    debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, queue.toString))
    newGauge(
      "FetchQueueSize",
      new Gauge[Int] {
        def value = queue.size
      },
      Map("clientId" -> config.clientId,
        "topic" -> topicThreadId._1,
        "threadId" -> topicThreadId._2.threadId.toString)
    )
  }

  // Publish the streams of every topic in the listener's topic -> streams map.
  threadQueueStreamPairs
    .groupBy { case ((topic, _), _) => topic }
    .foreach { case (topic, pairsForTopic) =>
      val streams = pairsForTopic.map { case (_, (_, stream)) => stream }.toList
      streamsByTopic += (topic -> streams)
      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
    }

  // Subscribe the listeners to session state and consumer registry changes.
  zkClient.subscribeStateChanges(sessionExpirationListener)
  zkClient.subscribeChildChanges(groupDirs.consumerRegistryDir, loadBalancerListener)

  // Register on broker partition path changes for every topic.
  streamsByTopic.foreach { case (topic, _) =>
    zkClient.subscribeDataChanges(BrokerTopicsPath + "/" + topic, topicPartitionChangeListener)
  }

  // Explicitly trigger load balancing for this consumer.
  loadBalancerListener.syncedRebalance()
}
/**
 * Runs the rebalance while holding rebalanceLock, retrying up to config.rebalanceMaxRetries
 * times with a pause of config.rebalanceBackoffMs between attempts. Returns without doing
 * anything when the consumer is shutting down.
 *
 * @throws ConsumerRebalanceFailedException if no attempt succeeded
 */
def syncedRebalance() {
  rebalanceLock synchronized {
    rebalanceTimer.time {
      if (isShuttingDown.get()) {
        return
      }

      (0 until config.rebalanceMaxRetries).foreach { attempt =>
        info("begin rebalancing consumer " + consumerIdString + " try #" + attempt)

        // Stays null when getCluster itself throws; it is passed on as-is further down.
        var cluster: Cluster = null
        val succeeded =
          try {
            cluster = getCluster(zkClient)
            rebalance(cluster)
          } catch {
            case e: Throwable =>
              /* Occasionally, we may hit a ZK exception because the ZK state is changing while
               * we are iterating. For example, a ZK node can disappear between the time we get
               * all children and the time we try to get the value of a child. Just let this go
               * since another rebalance will be triggered.
               */
              info("exception during rebalance ", e)
              false
          }

        info("end rebalancing consumer " + consumerIdString + " try #" + attempt)
        if (succeeded) {
          return
        }

        /* Here the cache is at a risk of being stale. To take future rebalancing decisions
         * correctly, we should clear the cache.
         */
        info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")

        // Stop all fetchers and clear all the queues to avoid data duplication.
        val allQueues = topicThreadIdAndQueues.map { case (_, queue) => queue }
        closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, allQueues)
        Thread.sleep(config.rebalanceBackoffMs)
      }
    }
  }

  throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}
/**
 * Performs one rebalancing attempt: stops the fetchers, releases the currently owned
 * partitions, asks the partition assignor for a new assignment, fetches the offsets of the
 * assigned partitions, records the ownership and restarts the fetchers.
 *
 * @param cluster cluster handed to closeFetchers and updateFetcher
 * @return true if no brokers were found (a child-change subscription on the broker ids path
 *         is registered instead) or if the new partition ownership was reflected; false if
 *         the consumer is shutting down, no offset fetch response is available, or the
 *         ownership could not be reflected
 */
private def rebalance(cluster: Cluster): Boolean = {
  val myTopicThreadIdsMap = TopicCount
    .constructTopicCount(group, consumerIdString, zkClient, config.excludeInternalTopics)
    .getConsumerThreadIdsPerTopic
  val liveBrokers = getAllBrokersInCluster(zkClient)

  if (liveBrokers.size == 0) {
    // This can happen in a rare case when there are no brokers available in the cluster when
    // the consumer is started. We log a warning and register for child changes on brokers/id
    // so that rebalance can be triggered when the brokers are up.
    warn("no brokers found when trying to rebalance.")
    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
    true
  } else {
    /*
     * Fetchers must be stopped to avoid data duplication, since if the current rebalancing
     * attempt fails, the partitions that are released could be owned by another consumer.
     * But if we don't stop the fetchers first, this consumer would continue returning data
     * for released partitions in parallel. So, not stopping the fetchers leads to duplicate
     * data.
     */
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    releasePartitionOwnership(topicRegistry)

    // Assign partitions to the worker threads.
    val assignmentContext =
      new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
    val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)

    val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
      valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

    // Fetch current offsets for all topic-partitions.
    val topicPartitions = partitionOwnershipDecision.keySet.toSeq

    fetchOffsets(topicPartitions) match {
      case Some(offsetFetchResponse) if !isShuttingDown.get =>
        for (topicAndPartition <- topicPartitions) {
          val (topic, partition) = topicAndPartition.asTuple
          val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
          val threadId = partitionOwnershipDecision(topicAndPartition)
          addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
        }

        /*
         * Move the partition ownership here, since that can be used to indicate a truly
         * successful rebalancing attempt. A rebalancing attempt is completed successfully
         * only after the fetchers have been started correctly.
         */
        val ownershipReflected = reflectPartitionOwnershipDecision(partitionOwnershipDecision)
        if (ownershipReflected) {
          allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

          // One "OwnedPartitionsCount" gauge per topic.
          partitionOwnershipDecision.view
            .groupBy { case (topicPartition, consumerThreadId) => topicPartition.topic }
            .foreach { case (topic, partitionThreadPairs) =>
              newGauge(
                "OwnedPartitionsCount",
                new Gauge[Int] {
                  def value() = partitionThreadPairs.size
                },
                ownedPartitionsCountMetricTags(topic))
            }

          topicRegistry = currentTopicRegistry
          updateFetcher(cluster)
        }
        ownershipReflected

      case _ =>
        // Shutting down, or no offset fetch response available.
        false
    }
  }
}
RangeAssignor分配partition到工作线程
/**
 * Range-assigns the partitions of every topic in ctx.myTopicThreadIds to this consumer's
 * threads for that topic.
 *
 * For each topic the partitions are divided evenly among the consumer threads listed in
 * ctx.consumersForTopic(topic); the first few threads pick up one extra partition when the
 * division leaves a remainder.
 *
 * @param ctx assignment context providing myTopicThreadIds, consumersForTopic,
 *            partitionsForTopic and consumerId
 * @return the ownership decision built here: partition -> consumer thread id that should
 *         claim it (only entries for the thread ids in ctx.myTopicThreadIds)
 */
def assign(ctx: AssignmentContext) = {
  val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

  for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
    val curConsumers = ctx.consumersForTopic(topic)
    val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

    val nPartsPerConsumer = curPartitions.size / curConsumers.size
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

    info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
      " for topic " + topic + " with consumers: " + curConsumers)

    for (consumerThreadId <- consumerThreadIdSet) {
      // Position of this thread among all consumer threads of the topic.
      val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
      assert(myConsumerPosition >= 0)
      val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

      /**
       * Range-partition the sorted partitions to consumers for better locality.
       * The first few consumers pick up an extra partition, if any.
       */
      if (nParts <= 0)
        warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
      else {
        for (i <- startPart until startPart + nParts) {
          val partition = curPartitions(i)
          info(consumerThreadId + " attempting to claim partition " + partition)
          // record the partition ownership decision
          partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
        }
      }
    }
  }

  // Hand the collected decisions back; callers read the result as a map
  // (keySet / apply / size).
  partitionOwnershipDecision
}
当计算好当前consumer需要处理的partition之后,调用updateFetcher函数更新fetcher线程
/**
 * Collects every PartitionTopicInfo held in topicRegistry, logs the selection and, when a
 * fetcher is present, starts its connections for those partitions.
 *
 * @param cluster cluster passed through to the fetcher's startConnections
 */
private def updateFetcher(cluster: Cluster) {
  // Update partitions for fetcher. Prepending while folding leaves the list in the reverse
  // of the registry's iteration order.
  val allPartitionInfos: List[PartitionTopicInfo] =
    topicRegistry.values.foldLeft(List.empty[PartitionTopicInfo]) { (collected, partitionInfos) =>
      partitionInfos.values.foldLeft(collected) { (acc, partitionInfo) => partitionInfo :: acc }
    }

  val selectedPartitions =
    allPartitionInfos.sortWith(_.partitionId < _.partitionId).map(_.toString).mkString(",")
  info("Consumer " + consumerIdString + " selected partitions : " + selectedPartitions)

  // Nothing happens when no fetcher is set.
  fetcher.foreach(_.startConnections(allPartitionInfos, cluster))
}
在startConnections方法中会调用addFetcherForPartitions方法,用于启动fetcher线程
/**
 * Groups the given partitions by (broker, fetcher id), makes sure a started fetcher thread
 * exists for every group and adds each group's partitions, with their initial offsets, to
 * the corresponding thread. The thread map is accessed while holding mapLock.
 *
 * @param partitionAndOffsets partition -> broker to fetch from and initial offset
 */
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy {
      case (topicAndPartition, brokerAndInitialOffset) =>
        BrokerAndFetcherId(
          brokerAndInitialOffset.broker,
          getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
    }

    partitionsPerFetcher.foreach { case (brokerAndFetcherId, assignedPartitions) =>
      // Create, register and then start a fetcher thread when none exists for this key yet.
      if (!fetcherThreadMap.contains(brokerAndFetcherId)) {
        val newThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
        fetcherThreadMap.put(brokerAndFetcherId, newThread)
        newThread.start
      }

      val initialOffsets = assignedPartitions.map {
        case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
      }
      fetcherThreadMap(brokerAndFetcherId).addPartitions(initialOffsets)
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
    "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}
然后我们看ConsumerFetcherThread中的processPartitionData方法,这个方法将获取到的数据插入到queue里面。之后stream就可以处理相应的数据了。
/**
 * Processes fetched data: checks that the fetch offset tracked for the partition equals the
 * offset the data was fetched at, then enqueues the fetched messages on the partition's
 * PartitionTopicInfo.
 *
 * @param topicAndPartition partition the data belongs to; looked up in partitionMap
 * @param fetchOffset       offset the data was fetched at
 * @param partitionData     fetch response data whose messages are enqueued
 * @throws RuntimeException if the tracked fetch offset differs from fetchOffset
 */
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
  val partitionInfo = partitionMap(topicAndPartition)

  val offsetsAgree = partitionInfo.getFetchOffset == fetchOffset
  if (!offsetsAgree) {
    val detail = "Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
      .format(topicAndPartition.topic, topicAndPartition.partition, partitionInfo.getFetchOffset, fetchOffset)
    throw new RuntimeException(detail)
  }

  val fetchedMessages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
  partitionInfo.enqueue(fetchedMessages)
}
相关推荐
在本文中,我们将深入探讨如何使用C++库RdKafka中的`KafkaConsumer`类来消费Apache Kafka消息。RdKafka是一个高效的C/C++ Kafka客户端,它提供了生产者和消费者API,使得与Kafka集群进行交互变得更加简单。在这个...
1. **创建Consumer实例**:首先,我们需要配置一个`Properties`对象,设置必要的参数如bootstrap servers、group id等,然后使用`KafkaConsumer`类的构造函数创建消费者实例。 2. **订阅主题**:消费者通过调用`...
《Go-consumergroup:构建基于Golang的Kafka消费者库》 在现代软件开发中,消息队列系统如Apache Kafka扮演着至关重要的角色,它提供了高效、可靠的异步通信能力。而Go语言以其简洁的语法和高性能特性,成为了编写...
KafkaConsumerDemo.java
Kafka是一个发布/订阅模型的消息队列,它包含生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责发布消息到主题,而消费者则订阅这些主题并消费消息。消费者通过消费者组(Consumer Group)进行...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); ...
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} import java.util.Properties val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...
项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试15.项目1-地区销售额-Bolt业务逻辑处理一16.项目1-地区销售额-优化Bolt支持重启及结果数据核查17.项目1-地区销售额-HighCharts图表开发一及Web端架构设计18....
- KafkaConsumer:是创建消费者实例的静态接口。 - Consumer:是消费者结构,处理消费相关的逻辑。 - KafkaConsumerImpl:是消费者实现的具体封装。 librdkafka还提供了消费者回调机制,如ConsumeCb和EventCb,它们...
很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播)。因此,KafkaHightL
【标题】"phpkafkaconsumer"是一个专门为PHP设计的Kafka消费者库,它不仅提供了基本的Kafka消息消费功能,还特别支持了消费者组(Consumer Group)和再平衡(Rebalance)机制。Kafka是一种分布式流处理平台,常用于...
5. **运行与测试**:启动Spring Boot应用,当`kafka.consumer.enabled`设置为`true`时,消费者将开始监听指定的Kafka主题。你可以通过发送消息到该主题来测试消费者的运行情况。 以上就是基于Spring Boot和Spring ...
kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看...
【标题】"kafka_hdfs_consumer"涉及到的关键技术是将数据从Kafka消费并存储到HDFS(Hadoop Distributed File System)中。这个过程通常在大数据处理和流处理场景下非常常见,它允许实时或近实时的数据从消息队列流向...
在本文中,我们将深入探讨如何在Spring Boot应用中使用KafkaConsumer来消费Apache Kafka主题中的消息。Apache Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用程序。Spring Boot简化了Kafka消费者的...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("myTopic")); // 订阅主题 while (true) { ConsumerRecords<String, String> records = consumer.poll...
**Kafka插件详解** Kafka插件是Apache Kafka与各种工具集成的重要组成部分,它使得开发者和运维人员能够更方便地在不同的系统中利用Kafka的功能。Kafka是一款分布式流处理平台,常用于构建实时数据管道和流应用,...
Kafka Consumer将avro记录插入mysql git clone https://github.com/cahuja1992/Kafka-MySQL-Avro.git python kafka-mysql-avro/setup.py install #!/usr/bin/env python from divoltemysql.kafkamysql import ...
在IT行业中,网络通信和大数据处理是两个至关重要的领域,Netty和Kafka分别是这两个领域的佼佼者。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka...