`

python连接kafka

阅读更多

       思路的:

       往kafka的集群写,

       往zk的读

       开始肯定去找python连接kafka的标准库,kafka-pythonpykafka 前者使用的人多是比较成熟的库,后者是Samsa的升级版本,在网上到文章在python连接并使用kafka 使用samsa连接zookeeper然后使用kafka Cluster很能满足我的需求,在pykafka的例子中也看到了zk的支持,而kafka-python并没有zk的支持,所以选择了pykafka做为连接库

概念问题

kafaka和zookeeper的群集,使用samsa的时候生产者和消费者都连接了zookeeper,但是我跟峰云(大数据大牛,运维屌丝逆转)沟通,他们使用的时候是生产者直接连接kafaka服务器列表,消费者才用zookeeper。这也解决了我看pykafka文档,只有消费者才连接zookeeper的困惑,所以问题解决,直接按照文档搞起。

生产者

1
2
3
4
5
6
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092") # 可接受多个Client这是重点
>>> client.topics # 查看所有topic
>>> topic = client.topics['my.test'] # 选择一个topic
>>> producer = topic.get_producer()
>>> producer.produce(['test message ' + str(i ** 2) for i in range(4)]) # 加了个str官方的例子py2.7跑不过

消费者

1
2
3
4
5
>>> balanced_consumer = topic.get_balanced_consumer(
consumer_group='testgroup',
auto_commit_enable=True, # 设置为Flase的时候不需要添加 consumer_group
zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot' # 这里就是连接多个zk
)
分享到:
评论

相关推荐

    kafka连接池_python版本

    kafka连接池_python版本 里面包含java的jar包 由于kafka在写入时会存在并发问题,采用连接池思想,抽取一种连接池的方式,连接池是采用Apache pool作为池管理,然后将生产者的连接点放到池中,在编译时需注意kafka...

    Python库 | kafka-python-1.3.4.tar.gz

    4. **连接管理**:库自动处理网络故障和重试策略,确保与Kafka集群的连接稳定可靠。 5. **序列化和反序列化**:kafka-python支持多种数据格式的序列化和反序列化,如JSON、pickle等,方便数据的处理和传输。 三、...

    Python测试Kafka集群(pykafka)实例

    在本文中,我们将深入探讨如何使用Python库`pykafka`来测试Apache Kafka集群。`pykafka`是一个强大的Python客户端,它提供了简洁的API,用于与Kafka进行交互,包括生产消息和消费消息。Apache Kafka是一种分布式流...

    Python-kafka集群搭建PythonAPI调用Producer和Consumer

    **Python-Kafka集群搭建与Python API使用指南** Kafka是一种分布式流处理平台,常用于实时数据处理和消息传递。在本教程中,我们将探讨如何搭建一个支持SASL(Simple Authentication and Security Layer)认证的...

    kafka-python-2.0.2.tar.gz

    `kafka-python-2.0.2`为Python开发者提供了一个强大且灵活的工具,以连接和操作Apache Kafka。通过理解和熟练使用这个库,开发者可以构建高效、可靠的实时数据处理系统,实现数据的可靠传输和处理。无论你是初学者...

    python读取Kafka实例

    之后,我们通过配置文件加载了连接Kafka服务所需的信息,并创建了一个`KafkaConsumer`实例。我们通过`subscribe`方法订阅了特定的主题,然后就可以通过遍历消费者来接收和处理消息了。 以上就是一个基本的Python...

    PyPI 官网下载 | python-kafka-logger-0.4.tar.gz

    标题中的"PyPI 官网下载 | python-kafka-logger-0.4.tar.gz"指出这是一个从Python Package Index(PyPI)官方下载的压缩包,名为"python-kafka-logger-0.4.tar.gz"。PyPI是Python社区用于分发Python软件的中央仓库,...

    Python-KQ基于Kafka的简单Python的作业队列

    实际应用中,你可能需要连接到具体的Kafka集群,并根据业务需求调整配置和错误处理策略。 **总结** Python-KQ为Python开发者提供了一种简单的方法来利用Kafka构建作业队列。通过其易于使用的API,开发者可以快速地...

    kafka-python批量发送数据的实例

    from kafka import KafkaClient from kafka.producer import SimpleProducer def send_data_2_kafka(datas): ''' 向kafka解析队列发送数据 ''' client = KafkaClient(hosts=KAFKABROKER.split(,), timeout=30)...

    PyPI 官网下载 | kafka_transport-0.6.1-py3-none-any.whl

    《PyPI上的kafka_transport-0.6.1-py3-none-any.whl:Python连接Kafka的关键工具》 在Python编程中,处理分布式消息队列系统如Apache Kafka时,有一个名为`kafka_transport`的库是至关重要的。这个库在Python ...

    Python confluent kafka客户端配置kerberos认证流程详解

    其中第一种SASL/GSSAPI的认证就是kerberos认证,对于java来说有原生的支持,但是对于python来说配置稍微麻烦一些,下面说一下具体的配置过程,confluent kafka模块底层依赖于librdkafka,这是使用c编写的高性能的...

    Kafka离线安装包已经python安装工具(setuptools and pip of py2&py3;)

    Kafka离线安装包,此压缩包还提供了python的安装工具的离线安装包,可将其解压后使用python setup.py install 命令分别安装setuptools和pip后,在使用pip安装Kafka,若系统已经存在pip,则可忽略上一步。

    kafka-python

    5. **连接管理**:库自动处理与Kafka集群的连接,包括重试、超时和重连策略。 6. **记录查询和管理**:可以使用`kafka-python`查询特定分区和偏移量的消息,或者管理消费者组的偏移提交。 7. **事件驱动**:库提供...

    数据采集课设报告(网络爬虫-Python,Kafka与MySQL的组合使用-Java)

    Kafka和MySQL的操作则可能涉及到Java语言,利用Kafka的Java客户端API发送和接收消息,以及JDBC连接MySQL进行数据存取。开发环境可能包括Python IDE(如PyCharm)、Java IDE(如IntelliJ IDEA)以及数据库管理工具...

    mysql+canal+kafka配置及python实现文档.docx

    MySQL+Canal+Kafka 配置及 Python 实现文档 本文档将介绍如何使用 MySQL、Canal 和 Kafka 实现数据实时同步的配置和 Python 实现。 MySQL 配置 MySQL 需要开启日志记录功能,以便 Canal 监听日志变化。首先,...

    kafka-python 获取topic lag值方式

    它创建了一个`SimpleClient`实例,用于连接Kafka服务器(brokers),然后获取指定主题的所有分区,并为每个分区发送OffsetRequestPayload请求,请求最新的offset(-1代表最新)。最后,返回所有响应的offset值之和。...

    kafka-connect-python:Kafka Connect REST API的Python模块

    kafka连接python Kafka Connect REST API的Python模块 要求 巨蟒(3.6) 安装 使用pip安装... pip install kafka-connect-python 例子 创建KafkaConnect REST接口 from kafka_connect import KafkaConnect ...

    对python操作kafka写入json数据的简单demo分享

    ### Python 操作 Kafka 写入 JSON 数据 Demo 分析 #### 一、引言 随着大数据技术的发展,消息队列在现代软件架构中的作用越来越重要。Apache Kafka 作为一款分布式的流处理平台,因其高吞吐量、低延迟等特点,在...

    python3连接kafka模块pykafka生产者简单封装代码

    from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in range...

Global site tag (gtag.js) - Google Analytics