`
kane_xie
  • 浏览: 144579 次
社区版块
存档分类
最新评论

Kafka Connect

阅读更多

Kafka Connect是Kafka0.9新增的模块。可以从名称看出,它可以和外部系统、数据集建立一个数据流的连接,实现数据的输入、输出。有以下特性:

 

  • 使用了一个通用的框架,可以在这个框架上非常方面的开发、管理Kafka Connect接口
  • 支持分布式模式或单机模式进行运行
  • 支持REST接口,可以通过REST API提交、管理 Kafka Connect集群
  • offset自动管理

 

在大部分Kafka应用场景中,我们常常需要从某一数据源导入数据到Kafka中,或者将Kafka中的数据导出(准确说是被取出)到其他系统中。为了实现这一功能,我们一般需要在上游系统中创建一个Kafka Producer,或在下游系统中创建Kafka Consumer。

 

在Kafka0.9中新增的Kafka Connect模块实现了以上功能,这样我们就可以省掉了一些通用交互代码,而只需要做简单的配置就行了。这和Logstash的思路很相似,Kafka提供了connector的接口,通过实现该接口来,我们可以创建各种各样的Input和Output插件。当然Kafka Connect才刚刚推出,插件远没有Logstash丰富,不过相信随着Kafka0.9的普及,这一功能将变得更加实用。

 

以下介绍Kafka Connect的一个简单的实现,FileInput和FileOutput,启动方式如下:

 
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

 

执行connect-standalone.sh会在本地单机启动connector(s)。我们必须声明至少两个参数,第一个是Kafka Connect的配置文件,包含一些基本配置例如kafka server地址,序列化格式等等;其他的配置文件用于配置connector(s),每个配置文件创建一个connector。我们来看一下这两个配置文件:

 

connect-file-source.properties 
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test-src.txt
topic=test

 

该配置会启动一个名叫local-file-source的connector,这个connector会从/tmp/test-src.txt中读取数据,然后写入名叫test的topic中。

 

connect-file-sink.properties 
name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test-dest.txt
topics=test

 

该配置会启动一个名叫local-file-sink的connector,这个connector会从Kafka的test topic中读取数据,然后写入/tmp/test-dest.tx文件中。

 

所以我们看到,运行以上命令实际上是启动了一个程序不断地读取/tmp/test-src.txt,并通过Kafka Producer发送到Kafka 的test这个topic中,同时启动一个Kafka Consumer从这个topic中读取数据并写入/tmp/test-dest.txt,启动之后我们可以测试一下。

 
echo "hello world" >> /tmp/test-src.txt

 

我们往/tmp/test-src.txt中写入几个字符,然后我们查看/tmp/test-dest.txt,发现“hello world”已经被写入/tmp/test-dest.txt中。

 

多说一句,数据并不是以文本的形式存入Kafka,而是Json。它的格式类似于:

 
{"schema":{"type":"string","optional":false},"payload":"hello world"}

 

所以本文中两个connector实际上还做了数据的封装和Json解析的工作。

 

 

 

1
2
分享到:
评论

相关推荐

    kafka-connect.pdf

    本文档详细介绍了Zookeeper和Kafka的安装流程,以及Kafka Connect的配置方法,并解释了相关的参数设置。此外,还涉及了通过Kafka Connect集成Oracle数据库的配置示例。 知识点一:Zookeeper安装和配置 首先,文档...

    藏经阁-BUILDING REALTIME DATA PIPELINES WITH KAFKA CONNECT AND SPAR

    在构建实时数据管道时,Kafka Connect 和 Spark Streaming 是两个重要的工具,它们在阿里云的分布式数据处理场景中发挥着关键作用。本文将深入探讨如何利用这两个技术来创建高效的实时数据流处理系统。 首先,Kafka...

    代码:kafka数据接入到mysql中

    一种常见的方法是使用Apache Kafka Connect,这是一个用于在Kafka和其它系统之间进行数据集成的开放源码框架。Kafka Connect提供了一种声明式的方式来定义数据源(Source)和数据接收器(Sink),使得数据能够在不同...

    kafka-connect-ui:用于Kafka Connect的Web工具|

    kafka-connect-ui 这是Kafka Connect的网络工具,用于设置和管理多个连接集群的连接器。现场演示与Docker独立运行docker run --rm -it -p 8000:8000 \ -e "CONNECT_URL=...

    PyPI 官网下载 | mypy-boto3-kafkaconnect-1.18.56.tar.gz

    标题中的“PyPI 官网下载 | mypy-boto3-kafkaconnect-1.18.56.tar.gz”表明这是一个在Python Package Index (PyPI) 上发布的软件包,名为 `mypy-boto3-kafkaconnect`,版本号为1.18.56,并且以tar.gz格式压缩。...

    kafka-connect-jdbc:Kafka Connect连接器,用于兼容JDBC的数据库

    Kafka Connect JDBC连接器 kafka-connect-jdbc是一个用于与任何兼容JDBC的数据库之间加载数据。 可以在找到该连接器的文档。发展要构建开发版本,您需要Kafka的最新版本以及一系列上游Confluent项目,您必须从其相应...

    PyPI 官网下载 | mypy-boto3-kafkaconnect-1.19.7.tar.gz

    《PyPI官网下载:mypy-boto3-kafkaconnect-1.19.7.tar.gz——探索Python在云原生环境中的应用》 在当今的软件开发领域,Python以其简洁的语法和强大的库支持,成为了许多开发者首选的编程语言。在云原生环境中,...

    kafka-connect-redis:f Kafka Connect Redis的源和接收器连接器

    Kafka Connect Redis 用于Redis的Kafka源和接收器连接器 连接器 来源 Kafka Connect Redis Source使用Redis发布/订阅订阅Redis通道/模式(包括),并将接收到的消息写入Kafka。 有关更多信息,请参见。 下沉 Kafka...

    kafka-connect-mongodb:用于Kafka Connect的MongoDB接收器连接器

    您可以通过使用kafka9-connect-mongodb分支将此连接器用于Kafka9。 用于Kafka Connect的MongoDB接收器连接器提供了从Kafka主题或主题集到MongoDB集合或多个集合的简单,连续的链接。 连接器使用Kafka消息,重命名...

    kafka-connect-spooldir:Kafka Connect连接器,用于将CSV文件读入Kafka

    介绍通过安装该Kafka Connect连接器提供了监视目录的文件和在将新文件写入输入目录时读取数据的功能。 输入文件中的每个记录将根据用户提供的模式进行转换。 CSVRecordProcessor支持读取CSV或TSV文件。 它可以将CSV...

    Moving data from Kafka to S3 with Kafka Connect

    Moving data from Kafka to S3 with Kafka Connect

    kafka-connect-rest:Kafka Connect REST连接器

    && \cp ../../kafka-connect-rest-plugin/target/kafka-connect-rest-plugin-*-shaded.jar jars/ && \cp ../../kafka-connect-transform-from-json/kafka-connect-transform-from-json-plugin/target/kafka-connect...

    java8看不到源码-kafka-connect-rockset:Rockset的KafkaConnect插件

    名称将是kafka-connect-rockset-[VERSION]-SNAPSHOT-jar-with-dependencies.jar 。 用法 您的 Kafka 集群并确认它正在运行。 Kafka Connect 可以在模式下运行。 在这两种模式下,有一个配置文件控制 Kafka 连接

    kafka-connect-mqtt:Kafka Connect MQTT连接器

    kafka-connect-mqtt 此仓库包含用于Apache Kafka的MQTT源和接收器连接器。 已通过Kafka 2+进行了测试。 使用源连接器,您可以订阅MQTT主题,并将这些消息写到Kafka主题。 接收器连接器以相反的方式工作。 笔记: ...

    kafka-connect-jdbc-4.1.1.zip

    《Kafka Connect JDBC 4.1.1:数据迁移与集成的桥梁》 Kafka Connect JDBC 4.1.1 是 Apache Kafka 生态系统中一个关键的组件,它提供了连接和交互能力,允许数据在 Kafka 和关系型数据库之间进行高效、可靠的传输。...

    kafkaconnect

    Kafka Connect是Apache Kafka项目的一部分,它是一个开源框架,用于在Kafka和其它系统之间高效、可靠地传输数据。Kafka Connect的主要目标是简化数据集成,使得数据可以轻松地流入或流出Kafka,如数据库、文件系统...

    kafkaconnect:kafka连接示例

    在这个“kafkaconnect:kafka连接示例”中,我们将深入探讨如何配置和使用Kafka Connect来实现不同数据源和数据接收器的连接。 **1. Kafka Connect的基本概念** - **Connector**: 连接器(Connector)是Kafka ...

    kafka-connect-storage-cloud:用于云存储的Kafka Connect连接器套件套件(当前包括Amazon S3)

    Kafka Connect套件的云存储连接器 kafka-connect-storage-cloud是一套旨在用于在Kafka和公共云存储(例如Amazon S3)之间复制数据。 下面列出了当前可用的连接器: 适用于Amazon Simple Storage Service(S3)的...

    kafka-connect-jsonschema:使用JSONSchema的Kafka Connect Converter

    Kafka Connect JsonSchema转换器使用JSONSchema将Kafka中的JSON消息转换为Kafka ConnectRecords。 模式是从期望包含在每个JSON消息中的URI解析的。 可以通过http://或java.net.URI支持的任何协议来解析这些模式。 ...

    Kafka-Connect:Kafka Connect自定义连接器

    欢迎使用新的Kafka Connect连接器! 在开发中运行 此存储库中包含基于Confluent Platform Docker映像。 查看Docker映像的。 主机名confluent的主机名必须是可解析的。 您将需要使用docker-machine ip confluent确定...

Global site tag (gtag.js) - Google Analytics