Using SimpleConsumer
Why use SimpleConsumer?
The main reason to use a SimpleConsumer implementation is that you want greater control over partition consumption than Consumer Groups give you.
For example, you may want to:
- Read a message multiple times
- Consume only a subset of the partitions in a topic in a process
- Manage transactions to make sure a message is processed once and only once
Downsides of using SimpleConsumer
The SimpleConsumer requires a significant amount of work that Consumer Groups do for you:
- You must keep track of the offsets in your application to know where you left off consuming.
- You must figure out which Broker is the lead Broker for a topic and partition
- You must handle Broker leader changes
Steps for using a SimpleConsumer
- Find an active Broker and find out which Broker is the leader for your topic and partition
- Determine who the replica Brokers are for your topic and partition
- Build the request defining what data you are interested in
- Fetch the data
- Identify and recover from leader changes
Finding the Lead Broker for a Topic and Partition
The easiest way to do this is to pass in a set of known Brokers to your logic, either via a properties file or the command line. These don’t have to be all the Brokers in the cluster, rather just a set where you can start looking for a live Broker to query for Leader information.
The call to topicsMetadata() asks the Broker you are connected to for all the details about the topic we are interested in.
The loop on partitionsMetadata iterates through all the partitions until we find the one we want. Once we find it, we can break out of all the loops.
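The lookup described above can be sketched without the Kafka client on the classpath. The sketch below is a stdlib-only model, not the Kafka API: PartitionInfo and MetadataClient are hypothetical stand-ins for the partition metadata and for the "connect to a broker and request topic metadata" step, and the broker and topic names are made up. It shows only the control flow: try each seed broker in turn, scan the partitions it reports, and stop as soon as the wanted partition is found.

```java
import java.util.Arrays;
import java.util.List;

final class LeaderLookup {
    // Tries each seed broker in turn and returns the metadata of the wanted
    // partition, or null if no seed broker could supply it.
    static PartitionInfo findLeader(MetadataClient client, List<String> seedBrokers,
                                    String topic, int partition) {
        for (String seed : seedBrokers) {
            try {
                for (PartitionInfo part : client.fetchPartitions(seed, topic)) {
                    if (part.partitionId == partition) {
                        return part; // found it: leave both loops at once
                    }
                }
            } catch (Exception e) {
                // An unreachable seed broker is expected; report it and try the next one.
                System.err.println("Error communicating with broker " + seed + ": " + e);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Fake cluster (assumption): "broker1" is down, "broker2" reports two partitions.
        MetadataClient fake = (broker, topic) -> {
            if (broker.equals("broker1")) {
                throw new java.io.IOException("connection refused");
            }
            return Arrays.asList(
                new PartitionInfo(0, "broker2", Arrays.asList("broker2", "broker3")),
                new PartitionInfo(1, "broker3", Arrays.asList("broker3", "broker2")));
        };
        PartitionInfo found =
            findLeader(fake, Arrays.asList("broker1", "broker2"), "page_visits", 1);
        System.out.println(found == null ? "no leader found" : "leader: " + found.leaderHost);
    }
}

// Hypothetical stand-in for the per-partition metadata a broker returns.
final class PartitionInfo {
    final int partitionId;
    final String leaderHost;
    final List<String> replicaHosts;

    PartitionInfo(int partitionId, String leaderHost, List<String> replicaHosts) {
        this.partitionId = partitionId;
        this.leaderHost = leaderHost;
        this.replicaHosts = replicaHosts;
    }
}

// Hypothetical stand-in for "connect to a broker and ask for topic metadata".
interface MetadataClient {
    // Returns the partitions of the topic, or throws if the broker is unreachable.
    List<PartitionInfo> fetchPartitions(String broker, String topic) throws Exception;
}
```

Keeping the replica list alongside the leader matters later: when the leader fails, those replicas are the brokers to ask for the new leader.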
Finding Starting Offset for Reads
Now define where to start reading data. Kafka includes two constants to help: kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, while kafka.api.OffsetRequest.LatestTime() streams only new messages. Don’t assume that offset 0 is the beginning offset, since messages age out of the log over time.
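To make the warning about offset 0 concrete, here is a toy in-memory model of a partition log with retention. It is not Kafka code: PartitionLogModel and its methods are hypothetical, and a real broker resolves these requests against its log segments. The only detail taken from Kafka is the pair of sentinel values, -2 for EarliestTime() and -1 for LatestTime().

```java
import java.util.TreeMap;

final class PartitionLogModel {
    // Same sentinel values as kafka.api.OffsetRequest.EarliestTime() / LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    private final TreeMap<Long, String> messages = new TreeMap<>();
    private long nextOffset = 0L; // offset the next appended message will get

    void append(String payload) {
        messages.put(nextOffset++, payload);
    }

    // Simulates retention: drops every message whose offset is below newStart.
    void expireBefore(long newStart) {
        messages.headMap(newStart).clear();
    }

    // Resolves a sentinel to a concrete starting offset for reads.
    long startingOffset(long whichTime) {
        if (whichTime == EARLIEST_TIME) {
            return messages.isEmpty() ? nextOffset : messages.firstKey();
        }
        if (whichTime == LATEST_TIME) {
            return nextOffset; // only messages appended from now on
        }
        throw new IllegalArgumentException("unsupported time sentinel: " + whichTime);
    }

    public static void main(String[] args) {
        PartitionLogModel log = new PartitionLogModel();
        for (int i = 0; i < 5; i++) {
            log.append("message-" + i);
        }
        log.expireBefore(3L); // offsets 0..2 have aged out
        System.out.println("earliest = " + log.startingOffset(EARLIEST_TIME));
        System.out.println("latest   = " + log.startingOffset(LATEST_TIME));
    }
}
```

After retention has removed the first three messages, the earliest readable offset in this model is 3, so a consumer that hard-codes 0 would be asking for data that no longer exists.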
Error Handling
Since the SimpleConsumer doesn't handle lead Broker failures, you have to write a bit of code to handle them yourself.
Here, once the fetch returns an error, we log the reason, close the consumer then try to figure out who the new leader is.
This method uses the findLeader() logic we defined earlier to find the new leader, except here we only try to connect to one of the replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested in we give up and exit hard.
Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we don’t get an answer. In practice ZooKeeper often completes the failover quickly enough that the sleep is never reached.
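The retry-and-sleep behaviour described above can be sketched in isolation. This is a stdlib-only model with assumed names: the Supplier stands in for "run the leader lookup against the replica Brokers and return the leader's host, or null if none is known", and the attempt count and sleep time are parameters chosen for illustration. On the first attempt an unchanged leader is treated as "failover not finished yet"; on later attempts it is accepted, on the assumption that the Broker recovered.

```java
import java.util.function.Supplier;

final class LeaderFailover {
    // Returns the host of the leader to use next, or throws if none is found
    // within maxAttempts lookups.
    static String findNewLeader(String oldLeader, Supplier<String> lookup,
                                int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String leader = lookup.get();
            // First time through, an unchanged leader probably means the
            // failover has not completed yet, so wait and ask again.
            boolean unchangedOnFirstTry =
                attempt == 0 && leader != null && leader.equalsIgnoreCase(oldLeader);
            if (leader != null && !unchangedOnFirstTry) {
                return leader;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a new leader", e);
            }
        }
        throw new IllegalStateException("Unable to find new leader after Broker failure");
    }

    public static void main(String[] args) {
        // Fake lookup (assumption): the old leader is reported once, then the new one.
        int[] calls = {0};
        Supplier<String> lookup = () -> calls[0]++ == 0 ? "broker2" : "broker3";
        String leader = findNewLeader("broker2", lookup, 3, 1000L);
        System.out.println("new leader: " + leader + " after " + calls[0] + " lookups");
    }
}
```

Throwing when every attempt fails matches the "give up and exit hard" behaviour: if none of the replicas can name a leader, there is nothing left to read from.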
Reading the Data
Finally we read the data being streamed back and write it out.
Note that ‘readOffset’ is set by asking the last message read what the next offset would be. This way, once the block of messages is processed, we know which offset to ask Kafka for in the next fetch.
Also note that we explicitly check that the offset being read is not less than the offset we requested. This is needed because, if Kafka is compressing the messages, the fetch request returns an entire compressed block even if the requested offset isn't the beginning of that block. Thus a message we saw previously may be returned again.
Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and the fetch might return an empty message set. In that case, increase the fetchSize until a non-empty set is returned.
Finally, we keep track of the number of messages read. If we didn't read anything on the last request, we sleep for a second so we aren't hammering Kafka when there is no data.
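The offset bookkeeping can be modelled on its own. In the sketch below, FetchedMessage is a hypothetical stand-in for a message paired with its offset, and its nextOffset() is assumed to be offset + 1; the Kafka fetch call itself is left out. The method skips anything older than the requested offset (the compressed-block case) and returns the offset to request next.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReadLoop {
    // Processes one fetched batch, appending accepted messages to out,
    // and returns the offset to use for the next fetch.
    static long processBatch(List<FetchedMessage> batch, long readOffset, List<String> out) {
        for (FetchedMessage message : batch) {
            if (message.offset < readOffset) {
                // Part of a compressed block that begins before the offset
                // we asked for; we have already seen it, so skip it.
                continue;
            }
            out.add(message.offset + ": " + message.payload);
            readOffset = message.nextOffset();
        }
        return readOffset;
    }

    public static void main(String[] args) {
        // The batch starts at offset 3 although we asked for offset 5.
        List<FetchedMessage> batch = Arrays.asList(
            new FetchedMessage(3, "a"), new FetchedMessage(4, "b"),
            new FetchedMessage(5, "c"), new FetchedMessage(6, "d"));
        List<String> out = new ArrayList<>();
        long next = processBatch(batch, 5L, out);
        System.out.println(out + " next fetch at " + next);
    }
}

// Hypothetical stand-in for a fetched message and its offset.
final class FetchedMessage {
    final long offset;
    final String payload;

    FetchedMessage(long offset, String payload) {
        this.offset = offset;
        this.payload = payload;
    }

    // Assumption for this model: the next message sits at offset + 1.
    long nextOffset() {
        return offset + 1;
    }
}
```

A caller can compare the size of out before and after the call: if nothing was added, that is the cue to sleep before fetching again.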
Running the example
The example expects the following parameters:
- Maximum number of messages to read (so we don’t loop forever)
- Topic to read from
- Partition to read from
- One broker to use for Metadata lookup
- Port the brokers listen on
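A minimal sketch of reading those five parameters, in the order listed above. ExampleArgs and its field names are assumptions made for illustration, not part of Kafka; the values in the usage line are placeholders.

```java
final class ExampleArgs {
    final long maxReads;     // maximum number of messages to read
    final String topic;      // topic to read from
    final int partition;     // partition to read from
    final String seedBroker; // one broker to use for metadata lookup
    final int port;          // port the brokers listen on

    private ExampleArgs(long maxReads, String topic, int partition,
                        String seedBroker, int port) {
        this.maxReads = maxReads;
        this.topic = topic;
        this.partition = partition;
        this.seedBroker = seedBroker;
        this.port = port;
    }

    // Parses the positional arguments; throws IllegalArgumentException
    // (NumberFormatException is a subclass) on bad input.
    static ExampleArgs parse(String[] args) {
        if (args.length != 5) {
            throw new IllegalArgumentException(
                "expected 5 arguments: <maxReads> <topic> <partition> <broker> <port>");
        }
        return new ExampleArgs(
            Long.parseLong(args[0]),
            args[1],
            Integer.parseInt(args[2]),
            args[3],
            Integer.parseInt(args[4]));
    }

    public static void main(String[] args) {
        try {
            ExampleArgs parsed = parse(args);
            System.out.println("Reading up to " + parsed.maxReads + " messages from "
                + parsed.topic + "/" + parsed.partition
                + " via " + parsed.seedBroker + ":" + parsed.port);
        } catch (IllegalArgumentException e) {
            System.err.println("Bad arguments: " + e.getMessage());
        }
    }
}
```

For example, the arguments `100 page_visits 0 broker1 9092` would ask for at most 100 messages from partition 0 of a topic named page_visits, using broker1 on port 9092 for the metadata lookup.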