本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和负载逻辑都在Producer中维护。
一、Kafka的总体结构图
(图片转发)
二、Producer源码分析
class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // only for unit testing extends Logging { private val hasShutdown = new AtomicBoolean(false) //异步发送队列 private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages) private var sync: Boolean = true //异步处理线程 private var producerSendThread: ProducerSendThread[K,V] = null private val lock = new Object() //根据从配置文件中载入的信息封装成ProducerConfig类 //判断发送类型是同步,还是异步,如果是异步则启动一个异步处理线程 config.producerType match { case "sync" => case "async" => sync = false producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId, queue, ventHandler, config.queueBufferingMaxMs, config.batchNumMessages, config.clientId) producerSendThread.start() } private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId) KafkaMetricsReporter.startReporters(config.props) AppInfo.registerInfo() def this(config: ProducerConfig) = this(config, new DefaultEventHandler[K,V](config, Utils.createObject[Partitioner](config.partitionerClass, config.props), Utils.createObject[Encoder[V]](config.serializerClass, config.props), Utils.createObject[Encoder[K]](config.keySerializerClass, config.props), new ProducerPool(config))) /** * Sends the data, partitioned by key to the topic using either the * synchronous or the asynchronous producer * @param messages the producer data object that encapsulates the topic, key and message data */ def send(messages: KeyedMessage[K,V]*) { lock synchronized { if (hasShutdown.get) throw new ProducerClosedException recordStats(messages) sync match { case true => eventHandler.handle(messages) case false => asyncSend(messages) } } } private def recordStats(messages: Seq[KeyedMessage[K,V]]) { for (message <- messages) { producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark() producerTopicStats.getProducerAllTopicsStats.messageRate.mark() } } //异步发送流程 //将messages异步放到queue里面,等待异步线程获取 private def asyncSend(messages: Seq[KeyedMessage[K,V]]) { for (message <- messages) { val added = config.queueEnqueueTimeoutMs match { case 0 => queue.offer(message) case _ => try { config.queueEnqueueTimeoutMs < 0 match { case true => queue.put(message) true case _ => queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS) } } catch { case e: InterruptedException => false } } if(!added) { producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark() producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark() throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString) }else { trace("Added to send queue an event: " + message.toString) trace("Remaining queue size: " + queue.remainingCapacity) } } } /** * Close API to close the producer pool connections to all Kafka brokers. Also closes * the zookeeper client connection if one exists */ def close() = { lock synchronized { val canShutdown = hasShutdown.compareAndSet(false, true) if(canShutdown) { info("Shutting down producer") val startTime = System.nanoTime() KafkaMetricsGroup.removeAllProducerMetrics(config.clientId) if (producerSendThread != null) producerSendThread.shutdown eventHandler.close info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms") } } } }
说明:
上面这段代码很多方法我加了中文注释,首先要初始化一系列参数,比如异步消息队列queue,是否是同步sync,异步同步数据线程ProducerSendThread,其实重点就是ProducerSendThread这个类,从队列中取出数据并让kafka.producer.EventHandler
将消息发送到broker。这个代码量不多,但是包含了很多内容,通过config.producerType判断是同步发送还是异步发送,每一种发送方式都有相关类支持,下面我们将重点介绍这二种类型。
我们发送消息的类是如下格式:
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V)
说明:
当使用三个参数的构造函数时, partKey会等于key。partKey是用来做partition的,但它不会最当成消息的一部分被存储。
1、同步发送
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = { //分区并且整理方法 val partitionedDataOpt = partitionAndCollate(messages) partitionedDataOpt match { case Some(partitionedData) => val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]] try { for ((brokerid, messagesPerBrokerMap) <- partitionedData) { if (logger.isTraceEnabled) messagesPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2))) val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap) val failedTopicPartitions = send(brokerid, messageSetPerBroker) failedTopicPartitions.foreach(topicPartition => { messagesPerBrokerMap.get(topicPartition) match { case Some(data) => failedProduceRequests.appendAll(data) case None => // nothing } }) } } catch { case t: Throwable => error("Failed to send messages", t) } failedProduceRequests case None => // all produce requests failed messages } }
说明:
这个方法主要说了二个重要信息,一个是partitionAndCollate,这个方法主要获取topic、partition和broker的,这个方法很重要,下面会进行分析。另一个重要的方法是groupMessageToSet是要对所发送数据进行压缩设置,如果没有设置压缩,就所有topic对应的消息集都不压缩。如果设置了压缩,并且没有设置对个别topic启用压缩,就对所有topic都使用压缩;否则就只对设置了压缩的topic压缩。
在这个gruopMessageToSet中,并不有具体的压缩逻辑。而是返回一个ByteBufferMessageSet对象。
在我们了解的partitionAndCollate方法之前先来了解一下如下类结构:
TopicMetadata -->PartitionMetadata case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty, errorCode: Short = ErrorMapping.NoError)也就是说,Topic元数据包括了partition元数据,partition元数据中包括了partitionId,leader(leader partition在哪个broker中,备份partition在哪些broker中,以及isr有哪些等等。
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = { val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] try { for (message <- messages) { //获取Topic的partition列表 val topicPartitionsList = getPartitionListForTopic(message) //根据hash算法得到消息应该发往哪个分区(partition) val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList) val brokerPartition = topicPartitionsList(partitionIndex) // postpone the failure until the send operation, so that requests for other brokers are handled correctly val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1) var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null ret.get(leaderBrokerId) match { case Some(element) => dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] case None => dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] ret.put(leaderBrokerId, dataPerBroker) } val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId) var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null dataPerBroker.get(topicAndPartition) match { case Some(element) => dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]] case None => dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]] dataPerBroker.put(topicAndPartition, dataPerTopicPartition) } dataPerTopicPartition.append(message) } Some(ret) }catch { // Swallow recoverable exceptions and return None so that they can be retried. case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None } }
说明:
调用partitionAndCollate根据topics的messages进行分组操作,messages分配给dataPerBroker(多个不同的Broker的Map),根据不同Broker调用不同的SyncProducer.send批量发送消息数据,SyncProducer包装了nio网络操作信息。
partitionAndCollate这个方法的主要作用是:获取所有partitions的leader所在leaderBrokerId(就是在该partiionid的leader分布在哪个broker上),创建一个HashMap>>>,把messages按照brokerId分组组装数据,然后为SyncProducer分别发送消息作准备工作,在确定一个消息应该发给哪个broker之前,要先确定它发给哪个partition,这样才能根据paritionId去找到对应的leader所在的broker。
我们进入getPartitionListForTopic这个方法看一下,这个方法主要是干什么的。
private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = { val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement) debug("Broker partitions registered for topic: %s are %s" .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(","))) val totalNumPartitions = topicPartitionsList.length if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition key = " + m.key) topicPartitionsList }说明:这个方法看上去没什么,主要是getBrokerPartitionInfo这个方法,其中KeyedMessage这个就是我们要发送的消息,返回值是Seq[PartitionAndLeader]。
相关推荐
Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...
kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala
《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...
基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...
Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载
#### 三、Kafka源码分析 ##### 3.1 生产者发送流程 1. **消息序列化**:生产者在发送消息之前,首先需要将消息对象序列化为字节数组。 2. **消息分发**:根据用户指定的分区策略(如轮询、按key哈希等),将消息...
apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社,分为part1和part2,一起下载解压
在这个项目中,`kafka_producer.zip`包含了实现这一功能的所有必要文件:一个配置文件`config.conf`和一个Python脚本`producer.py`。 首先,让我们了解`config.conf`文件的作用。这个配置文件通常包含Kafka生产者...
### Kafka源码解析新手版本(修正版)知识点详解 #### 一、Kafka诞生背景及其在LinkedIn的应用 **1.1 Apache Kafka项目简介** - **诞生背景:** Apache Kafka最初由LinkedIn开发,随后于2011年初开源,并在2012年...
标题中提到的“Kafka源码解析与实战”指向了关于Apache Kafka这个开源流处理平台的深入分析和应用实践。Apache Kafka是一个分布式流处理平台,最初是由LinkedIn公司开发,并于2011年开源。它主要用于构建实时数据...
Kafka源码解析与实战.zip
apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社
《Kafka源码解析及实战》是一本专为深度学习Apache Kafka的读者设计的教材,旨在帮助读者深入了解Kafka的工作原理及其内部机制。通过源码级别的解析,读者可以更好地掌握Kafka在分布式消息系统中的核心功能和设计...
apache kafka技术内幕 和 apacke kafka源码分析2本PDF 电子书 网盘下载
### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...
《Kafka消息队列源码深度解析》 Kafka,由LinkedIn开源并被Apache基金会接纳,是一款高吞吐量的分布式消息系统,广泛应用于大数据实时处理、日志收集、流计算等多个领域。本文将深入剖析Kafka的核心概念,以及其...
**Apache Kafka 源码分析** Apache Kafka 是一个分布式流处理平台,被广泛用于构建实时数据管道和流应用。它的核心设计目标是提供高吞吐量、低延迟的消息传递,同时保证消息的可靠性和顺序性。Kafka 的源码分析有助...
**Kafka 源码分析概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给 Apache 软件基金会。它被设计为高吞吐量、低延迟的消息传递系统,支持实时数据流处理。Kafka 主要用于构建实时数据管道和流应用,...
Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。
这本书详细介绍了Kafka的核心概念、工作原理以及源码分析,旨在帮助读者理解并掌握这个分布式流处理平台。 Kafka是一个高吞吐量、可扩展的开源消息系统,最初由LinkedIn开发,后来成为Apache软件基金会的顶级项目。...