原文地址:http://www.aboutyun.com/thread-9938-1-1.html
问题导读
1.Kafka提供了Producer类作为java producer的api,此类有几种发送方式?
2.总结调用producer.send方法包含哪些流程?
3.Producer难以理解的在什么地方?
producer的发送方式剖析
Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式。
sync架构图
async架构图
调用流程如下:
代码流程如下:
Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer、DefaultEventHandler。在创建的同时,会默认new一个ProducerPool,即我们每new一个java的Producer类,就会有创建Producer、EventHandler和ProducerPool,ProducerPool为连接不同kafka broker的池,初始连接个数有broker.list参数决定。
调用producer.send方法流程:
当应用程序调用producer.send方法时,其内部其实调的是eventhandler.handle(message)方法,eventHandler会首先序列化该消息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()
调用逻辑解释:当客户端应用程序调用producer发送消息messages时(既可以发送单条消息,也可以发送List多条消息),调用eventhandler.serialize首先序列化所有消息,序列化操作用户可以自定义实现Encoder接口,下一步调用partitionAndCollate根据topics的messages进行分组操作,messages分配给dataPerBroker(多个不同的Broker的Map),根据不同Broker调用不同的SyncProducer.send批量发送消息数据,SyncProducer包装了nio网络操作信息。
Producer的sync与async发送消息处理,大家看以上架构图一目了然。
partitionAndCollate方法详细作用:获取所有partitions的leader所在leaderBrokerId(就是在该partiionid的leader分布在哪个broker上),
创建一个HashMap>>>,把messages按照brokerId分组组装数据,然后为SyncProducer分别发送消息作准备工作。
名称解释:partKey:分区关键字,当客户端应用程序实现Partitioner接口时,传入参数key为分区关键字,根据key和numPartitions,返回分区(partitions)索引。记住partitions分区索引是从0开始的。
Producer平滑扩容机制
如果开发过producer客户端代码,会知道metadata.broker.list参数,它的含义是kafak broker的ip和port列表,producer初始化时,就连接这几个broker,这时大家会有疑问,producer支持kafka cluster新增broker节点?它又没有监听zk broker节点或从zk中获取broker信息,答案是肯定的,producer可以支持平滑扩容broker,他是通过定时与现有的metadata.broker.list通信,获取新增broker信息,然后把新建的SyncProducer放入ProducerPool中。等待后续应用程序调用。
DefaultEventHandler类中初始化实例化BrokerPartitionInfo类,然后定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代码如下: def handle(events: Seq[KeyedMessage[K,V]]) { ...... while (remainingRetries > 0 && outstandingProduceRequests.size > 0) { topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) if (topicMetadataRefreshInterval >= 0 && SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) sendPartitionPerTopicCache.clear() topicMetadataToRefresh.clear lastTopicMetadataRefreshTime = SystemTime.milliseconds } outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) if (outstandingProduceRequests.size > 0) { info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1)) //休眠时间,多长时间刷新一次 Thread.sleep(config.retryBackoffMs) // 生产者定期请求刷新最新topics的broker元数据信息 Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) ..... } } }
BrokerPartitionInfo的updateInfo方法代码如下:
def updateInfo(topics: Set[String], correlationId: Int) { var topicsMetadata: Seq[TopicMetadata] = Nil //根据topics列表,meta.broker.list,其他配置参数,correlationId表示请求次数,一个计数器参数而已 //创建一个topicMetadataRequest,并随机的选取传入的broker信息中任何一个去取metadata,直到取到为止 val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId) topicsMetadata = topicMetadataResponse.topicsMetadata // throw partition specific exception topicsMetadata.foreach(tmd =>{ trace("Metadata for topic %s is %s".format(tmd.topic, tmd)) if(tmd.errorCode == ErrorMapping.NoError) { topicPartitionInfo.put(tmd.topic, tmd) } else warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) { warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, ErrorMapping.exceptionFor(pmd.errorCode).getClass)) } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata }) }) producerPool.updateProducer(topicsMetadata) }
ClientUtils.fetchTopicMetadata方法代码:
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) var topicMetadataResponse: TopicMetadataResponse = null var t: Throwable = null val shuffledBrokers = Random.shuffle(brokers) //生成随机数 while(i ProducerPool的updateProducer def updateProducer(topicMetadata: Seq[TopicMetadata]) { val newBrokers = new collection.mutable.HashSet[Broker] topicMetadata.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { if(pmd.leader.isDefined) newBrokers+=(pmd.leader.get) }) }) lock synchronized { newBrokers.foreach(b => { if(syncProducers.contains(b.id)){ syncProducers(b.id).close() syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) } else syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) }) } }
当我们启动kafka broker后,并且大量producer和consumer时,经常会报如下异常信息。
- root@lizhitao:/opt/soft$ Closing socket connection to 192.168.11.166
笔者也是经常很长时间看源码分析,才明白了为什么ProducerConfig配置信息里面并不要求使用者提供完整的kafka集群的broker信息,而是任选一个或几个即可。因为他会通过您选择的broker和topics信息而获取最新的所有的broker信息。
值得了解的是用于发送TopicMetadataRequest的SyncProducer虽然是用ProducerPool.createSyncProducer方法建出来的,但用完并不还回ProducerPool,而是直接Close.
重难点理解:
刷新metadata并不仅在第一次初始化时做。为了能适应kafka broker运行中因为各种原因挂掉、paritition改变等变化,
eventHandler会定期的再去刷新一次该metadata,刷新的间隔用参数topic.metadata.refresh.interval.ms定义,默认值是10分钟。
这里有三点需要强调:
客户端调用send, 才会新建SyncProducer,只有调用send才会去定期刷新metadata在每次取metadata时,kafka会新建一个SyncProducer去取metadata,逻辑处理完后再close。根据当前SyncProducer(一个Broker的连接)取得的最新的完整的metadata,刷新ProducerPool中到broker的连接.每10分钟的刷新会直接重新把到每个broker的socket连接重建,意味着在这之后的第一个请求会有几百毫秒的延迟。如果不想要该延迟,把topic.metadata.refresh.interval.ms值改为-1,这样只有在发送失败时,才会重新刷新。Kafka的集群中如果某个partition所在的broker挂了,可以检查错误后重启重新加入集群,手动做rebalance,producer的连接会再次断掉,直到rebalance完成,那么刷新后取到的连接着中就会有这个新加入的broker。
说明:每个SyncProducer实例化对象会建立一个socket连接
特别注意:
在ClientUtils.fetchTopicMetadata调用完成后,回到BrokerPartitionInfo.updateInfo继续执行,在其末尾,pool会根据上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)
在ProducerPool中,SyncProducer的数目是由该topic的partition数目控制的,即每一个SyncProducer对应一个broker,内部封了一个到该broker的socket连接。每次刷新时,会把已存在SyncProducer给close掉,即关闭socket连接,然后新建SyncProducer,即新建socket连接,去覆盖老的。
如果不存在,则直接创建新的。
相关推荐
5. **源码分析**: 拥有Kafka-clients的源码意味着你可以深入理解内部工作原理,例如消息序列化、分区逻辑、重试策略等。这对于优化性能、排查问题和定制功能非常有用。 6. **依赖管理**: 在Java项目中,通常...
Kafka是一种广泛应用于大数据处理和实时流数据处理的开源分布式消息中间件,由LinkedIn开发并在Apache Software Foundation下维护。在“kafka-2.12-3.4.0.tgz”这个压缩包中,包含了Kafka的源码、库文件和其他相关...
- **免编译安装**: `kafka_2.12-2.1.1.zip` 提供了预编译的二进制版本,无需用户自行编译源码。解压后,可以直接按照官方文档配置环境变量,启动服务器。 - **配置文件**: 主要包括`server.properties`(服务器配置...
标题中的"kafka_2.10-0.10.2.1.tgz"是一个Apache Kafka的特定版本压缩包,适用于Scala 2.10,并且版本号为0.10.2.1。Kafka是一款分布式流处理平台,由LinkedIn开发并捐赠给Apache软件基金会。它被广泛用于构建实时...
本文将通过分析kafka-clients-2.1.0.jar和kafka-clients-2.1.0-sources.jar这两个源码文件,深入探讨其核心知识点。 一、消费者API 1. **消费者组管理**:Kafka消费者通过消费者组来实现负载均衡和故障恢复。消费...
文件名称列表中的“apache-flume-1.9.0-bin.tar.gz”、“zookeeper-3.3.6_.tar.gz”和“kafka_2.11-0.10.1.0.tgz”分别是Flume、ZooKeeper和Kafka的不同版本的源码包。用户需要先解压缩这些文件,然后根据各自的安装...
通过深入学习和分析这些源代码,我们不仅可以了解Kafka的设计理念,还能掌握如何定制和优化Kafka以适应特定的业务场景。无论是对于提升个人技能,还是解决实际问题,这都是一份非常宝贵的学习资源。
《Apache Kafka 0.10.2.0详解:构建高效流处理平台》 Apache Kafka是一款分布式流处理平台,由LinkedIn开发并贡献给Apache软件基金会,如今已成为大数据领域的重要组件。这里的“kafka_2.10-0.10.2.0.tgz”是一个...
标题中提到的“Kafka源码解析与实战”指向了关于Apache Kafka这个开源流处理平台的深入分析和应用实践。Apache Kafka是一个分布式流处理平台,最初是由LinkedIn公司开发,并于2011年开源。它主要用于构建实时数据...
Apache Kafka是一个分布式流处理平台,由LinkedIn开发并在2011年开源,之后成为Apache软件基金会的顶级项目。Kafka以其高吞吐量、低延迟以及容错性而闻名,常用于构建实时数据管道和流应用。本文将深入探讨Kafka的...
在本文中,我们将深入探讨 Kafka 的核心概念、架构设计以及源码分析,以帮助读者更好地理解和应用 Kafka。 一、Kafka 核心概念 1. **主题(Topic)**:主题是 Kafka 中数据的分类,类似于数据库中的表。每个主题...
### Apache Kafka 知识点概览 #### 一、引言 Apache Kafka 是一款开源的流处理平台,由 LinkedIn 开发并捐赠给 Apache 软件基金会。它以一种高吞吐量、低延迟的方式处理实时数据流,并支持可扩展性、持久性和容错性...
整体而言,《深入理解Apache Kafka-初稿》为读者提供了一套全面、系统的Kafka知识体系,从基础概念到安装部署、监控优化,再到源码分析和架构剖析,内容丰富,覆盖了Kafka应用开发和维护的方方面面。对于希望深入...
《深入理解Kafka源码:2.1.0版本解析》 Kafka作为一个高效、可扩展、持久化的消息中间件,已经成为大数据领域的基石之一。它以其强大的吞吐能力、低延迟特性和分布式架构赢得了广大开发者和企业的青睐。本文将基于...
在源代码中,`kafka-producer-perf-test`提供了性能测试工具,`src/java/kafka/producer`包含生产者API的具体实现。 2. **Consumer**: 消费者模块负责从broker订阅和拉取消息。0.8.1.1版本中,消费者采用的是简单的...
通过分析`kafka-2.2.1-src`源码,我们可以更深入地理解Kafka的内部实现,例如消息的存储、网络通信、消费者心跳机制、分区分配策略等,这对于开发、运维和优化Kafka系统具有重要的实践价值。同时,源码学习也能帮助...
4. **Kafka 源码分析** - **Scala 2.12**:这个版本的 Kafka 使用 Scala 2.12 编写,提供了更现代的语法和性能改进。如果你对 Kafka 的内部工作原理感兴趣,可以深入研究源代码。 - **编译与运行**:通过 SBT ...
在下载的"**kafka-2.12-2.3.1.tgz**"压缩包中,包含了Kafka的源码和二进制文件,用户可以通过解压后配置和运行这些文件,在本地搭建Kafka集群,进行进一步的学习和开发。这个版本的Kafka支持Scala 2.12,并且是2.3.1...
总结,Kafka 2.10-0.9.0.1 的源码分析涵盖了 Mirror Maker 工具的实现,多机房消息异步处理的架构设计,以及 Kafka 内部组件的工作原理。深入研究这些源码将帮助我们更好地理解和优化 Kafka 系统,为大数据实时处理...
- **实时数据分析**:结合Hadoop等离线分析系统,Kafka能实现实时数据处理的需求,为用户提供即时反馈。 #### 二、Kafka核心概念与架构 - **核心概念**: - **主题(Topic)**:一个逻辑上的分类,用于发布消息...