`
sqlxx
  • 浏览: 17643 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

干货:使用Kafka connect 同步数据至Elasticsearch

 
阅读更多

接着上篇安装完 postgresql connect ,我们再安装es connect就容易多了; 安装es connector plugins
因为docker 安装的connect容器里没有es的connect plugins,所以我们去 confluent 官网下载(搜索 Kafka Connect Elasticsearch下载即可)

下载解压后放至 connect目录(上篇中设置的挂载目录)中,如果不记得将容器目录挂载到哪可通过如下命令查看: docker inspect 容器id |grep Mounts -A 20

放置完成后重启connect 容器,并请求如下http验证: get ip:8093/connector-plugins
创建es sink connector post ip:8093/connectors 为何不可为大牛? { "name": "es-sink1", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "connection.url": "http://ip:9200", "connection.username": "elastic", "connection.password": "elastic_xdeas", "type.name": "_doc", "key.ignore": "false", "topics": "know.knowledge.formal_new", "write.method": "upsert", "behavior.on.null.values": "delete", "transforms": "key,ExtractFieldObject", "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.key.field": "id", "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractFieldObject.field": "after" } } (为何不可为大牛?) 这里的es connector 配置的着重解析一下: 一开始不知道怎么配认证,翻遍了国内外官方/非官方博客文档都没有找到,几乎要放弃了,最后在stackoverflow找到了 https://stackoverflow.com/questions/58381240/how-to-kafka-connect-elasticsearch-with-ssl (强烈吐槽!官方文档能不能详细点!) key.ignore 如果设置为true,ES里面_id的值会自动生成,这样的话表里某行记录只要一变化,es就会增加一条数据,所以一定要设置为false; topics:需要订阅的topic,即上篇配置完pg connector后生成的topic; transforms:数据转换有关; transforms.key.type和transforms.key.field这里配置的意思是将表中的id作为es里面的文档id; "transforms.ExtractFieldObject.field": "after" 字段筛选,我们只需要"after"字段的数据, 因为如果没有transforms.ExtractFieldObject.type 和 transforms.ExtractFieldObject.field的配置,其他的一些无关紧要的元数据也会进入es,索引里数据会是下面这样: (再次吐槽官方文档,这里也是花了很多时间才摸索这试出来,太难了) "payload":{"before":null, "after":{"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1,"create_date":1591025764000000,"creater":"1","update_date":1591025769000000,"updater":"1","link":"1","label":["1"],"origin":"4"}, "source":{"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema":"knowledge","table":"knowledge_formal_new","txId":1604,"lsn":29368760,"xmin":null}, "op":"u","ts_ms":1591006642869,"transaction":null}} 验证:获取所有的connectors:get ip:8093/connectors/ 同步验证
如上述操作没问题,修改表数据,能看到es中自动创建了索引并将最新数据同步了过来,索引名即对应上步配置的topics :know.knowledge.formal_new

总结:kafka connector 是kafka内置的数据传输工具,上文我们创建了一个postgresql connector(依赖debezium的PostgresConnector)其实就是等价于我们在kafka的config目录中添加了一个connect-file-source.properties配置文件(source代表数据来源);这里我们创建的 es sink connector 等价于在config目录添加了一个connect-file-sink.properties配置文件(sink代表数据输出);这里采用docker 和api管理kafka的connector就显得方便多了;

 

转发自http://www.forenose.com/column/content/427083577.html

分享到:
评论

相关推荐

    RdKafka::KafkaConsumer使用实例

    在本文中,我们将深入探讨如何使用C++库RdKafka中的`KafkaConsumer`类来消费Apache Kafka消息。RdKafka是一个高效的C/C++ Kafka客户端,它提供了生产者和消费者API,使得与Kafka集群进行交互变得更加简单。在这个...

    java语言kafka数据批量导入到Elasticsearch实例

    消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...

    代码:kafka数据接入到mysql中

    总之,将Kafka数据接入到MySQL涉及到Kafka Connect的使用、JDBC连接器的配置,以及对数据一致性、性能优化等方面的考虑。通过这样的数据管道,可以实现大数据实时流处理与关系型数据库之间的高效交互。

    kafka-connect.pdf

    总体来说,文档详细地介绍了Zookeeper、Kafka的安装和配置步骤,以及如何使用Kafka Connect进行数据集成的具体实践,尤其对于如何配置和启动Oracle数据库连接器,提供了相当具体的示例和故障排除的步骤。这些内容...

    kafka-connect-ui:用于Kafka Connect的Web工具|

    kafka-connect-ui 这是Kafka Connect的网络工具,用于设置和管理多个连接集群的连接器。现场演示与Docker独立运行docker run --... 例如:CONNECT_URL = “ 另外,您可以通过在端点URL后面附加分号和群集名称来为Connec

    kafka数据同步工具kafka2x-elasticsearch-master.zip

    《Kafka数据同步至Elasticsearch的深度解析》 在大数据处理领域,Kafka和Elasticsearch都是不可或缺的重要组件。Kafka作为一个强大的分布式流处理平台,主要用于实时数据管道和流处理,而Elasticsearch则是一个功能...

    kafka-connect-redis:f Kafka Connect Redis的源和接收器连接器

    Kafka Connect Redis 用于Redis的Kafka源和接收器连接器... 该演示将引导您在本地计算机上设置Kubernetes,安装连接器,并使用连接器将数据写入Redis集群或将数据从Redis引入Kafka。 兼容性 需要Redis 2.6或更高版本。

    kafka-connect-elasticsearch-source:Kafka Connect Elasticsearch源

    Kafka Connect Elasticsearch来源:从elastic-search获取数据并将其发送到kafka。 连接器仅使用严格的增量/时间字段(例如时间戳或增量ID)来获取新数据。 它支持动态模式和嵌套对象/数组。 要求: Elasticsearch ...

    Apache Kafka:KafkaConnect深入解析.docx

    Apache Kafka:KafkaConnect深入解析.docx

    streamx:kafka-connect-s3:从Kafka到对象存储(s3)提取数据

    StreamX:Kafka Connect for S3 从很棒的 StreamX是基于kafka连接的连接器,用于将数据从Kafka复制到对象存储,例如Amazon s3,Google Cloud Storage和Azure Blob存储。 它专注于可靠和可扩展的数据复制。 它可以以...

    kafka-connect-mongodb:用于Kafka Connect的MongoDB接收器连接器

    您可以通过使用kafka9-connect-mongodb分支将此连接器用于Kafka9。 用于Kafka Connect的MongoDB接收器连接器提供了从Kafka主题或主题集到MongoDB集合或多个集合的简单,连续的链接。 连接器使用Kafka消息,重命名...

    Go-Golang程序从一组kafka主题中读取记录并将它们写入elasticsearch集群

    本项目涉及到一个用Go编写的程序,它实现了从Apache Kafka主题中读取数据,并将这些数据写入Elasticsearch集群。Kafka是一个分布式流处理平台,而Elasticsearch则是一种流行的全文搜索引擎和实时数据分析工具。接...

    docker-kafka-connect:用于kafka-connect的Docker映像

    docker-kafka-连接 Dockerized (分布式模式) 支持的标签 0.10.0.0 (2.11) 0.10.1.1 (2.11) 最新的0.10.2.0 (2.12) 快速开始 使用Docker Compose 像这样编写docker-compose.yml ,然后执行docker-...

    kafka-connect-elasticsearch-5.4.1.zip

    kafka-connect-elasticsearch-5.4.1.zip包含了kafka-connect-elasticsearch-5.4.1.jar包。 欢迎大家下载,大家也可关注我的博客,欢迎一起交流,如有疑问请留言!

    kafka-connect:Real-time Data Integration.pptx

    - **Elasticsearch连接器**:将Kafka主题中的数据实时写入Elasticsearch索引,使得数据可用于实时搜索、日志分析和仪表盘展示。 ### 6. 应用挑战与最佳实践 - **监控与故障排查**:确保适当监控连接器性能和数据...

    Kafka the Definitive Guide 2nd Edition

    * 数据分析平台:Kafka 能够构建数据分析平台,满足实时数据分析需求。 * 实时数据处理系统:Kafka 能够构建实时数据处理系统,满足高吞吐量的数据处理需求。 * 流媒体处理系统:Kafka 能够构建流媒体处理系统,满足...

    kafka-connect-tools

    - Elasticsearch 连接器:将 Kafka 数据流实时同步到 Elasticsearch,用于实时搜索和分析。 - S3 连接器:将 Kafka 数据持久化到 Amazon S3 存储桶。 ### 5. 结论 Kafka Connect Tools 提供了一套强大的工具集,...

    kafka-connect-protobuf-converter:用于Kafka Connect的Protobuf转换器插件

    转换器控制将数据写入源连接器的Kafka或从Kafka接收器的连接器读取的数据格式。兼容性2.x版本系列与Kafka Connect 5.x兼容。 以及更高版本(较早的版本已经过验证,可以一直使用到Kafka Connect 3.2.0,尽管我们...

    kafka-connect-elasticsearch:Kafka Connect Elasticsearch连接器

    Kafka Connect Elasticsearch连接器 kafka-connect-elasticsearch是一个用于在Kafka和Elasticsearch之间复制数据。发展要构建开发版本,您需要Kafka的最新版本。 您可以使用标准生命周期阶段,使用Maven构建kafka-...

Global site tag (gtag.js) - Google Analytics