接着上篇安装完 postgresql connect ,我们再安装es connect就容易多了; 安装es connector plugins
因为docker 安装的connect容器里没有es的connect plugins,所以我们去 confluent 官网下载(搜索 Kafka Connect Elasticsearch下载即可)
下载解压后放至 connect目录(上篇中设置的挂载目录)中,如果不记得将容器目录挂载到哪可通过如下命令查看: docker inspect 容器id |grep Mounts -A 20
放置完成后重启connect 容器,并请求如下http验证: get ip:8093/connector-plugins
创建es sink connector post ip:8093/connectors 为何不可为大牛? { "name": "es-sink1", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "connection.url": "http://ip:9200", "connection.username": "elastic", "connection.password": "elastic_xdeas", "type.name": "_doc", "key.ignore": "false", "topics": "know.knowledge.formal_new", "write.method": "upsert", "behavior.on.null.values": "delete", "transforms": "key,ExtractFieldObject", "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.key.field": "id", "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractFieldObject.field": "after" } } (为何不可为大牛?) 这里的es connector 配置的着重解析一下: 一开始不知道怎么配认证,翻遍了国内外官方/非官方博客文档都没有找到,几乎要放弃了,最后在stackoverflow找到了 https://stackoverflow.com/questions/58381240/how-to-kafka-connect-elasticsearch-with-ssl (强烈吐槽!官方文档能不能详细点!) key.ignore 如果设置为true,ES里面_id的值会自动生成,这样的话表里某行记录只要一变化,es就会增加一条数据,所以一定要设置为false; topics:需要订阅的topic,即上篇配置完pg connector后生成的topic; transforms:数据转换有关; transforms.key.type和transforms.key.field这里配置的意思是将表中的id作为es里面的文档id; "transforms.ExtractFieldObject.field": "after" 字段筛选,我们只需要"after"字段的数据, 因为如果没有transforms.ExtractFieldObject.type 和 transforms.ExtractFieldObject.field的配置,其他的一些无关紧要的元数据也会进入es,索引里数据会是下面这样: (再次吐槽官方文档,这里也是花了很多时间才摸索这试出来,太难了) "payload":{"before":null, "after":{"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1,"create_date":1591025764000000,"creater":"1","update_date":1591025769000000,"updater":"1","link":"1","label":["1"],"origin":"4"}, "source":{"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema":"knowledge","table":"knowledge_formal_new","txId":1604,"lsn":29368760,"xmin":null}, "op":"u","ts_ms":1591006642869,"transaction":null}} 验证:获取所有的connectors:get ip:8093/connectors/ 同步验证
如上述操作没问题,修改表数据,能看到es中自动创建了索引并将最新数据同步了过来,索引名即对应上步配置的topics :know.knowledge.formal_new
总结:kafka connector 是kafka内置的数据传输工具,上文我们创建了一个postgresql connector(依赖debezium的PostgresConnector)其实就是等价于我们在kafka的config目录中添加了一个connect-file-source.properties配置文件(source代表数据来源);这里我们创建的 es sink connector 等价于在config目录添加了一个connect-file-sink.properties配置文件(sink代表数据输出);这里采用docker 和api管理kafka的connector就显得方便多了;
转发自http://www.forenose.com/column/content/427083577.html
相关推荐
在本文中,我们将深入探讨如何使用C++库RdKafka中的`KafkaConsumer`类来消费Apache Kafka消息。RdKafka是一个高效的C/C++ Kafka客户端,它提供了生产者和消费者API,使得与Kafka集群进行交互变得更加简单。在这个...
消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...
总之,将Kafka数据接入到MySQL涉及到Kafka Connect的使用、JDBC连接器的配置,以及对数据一致性、性能优化等方面的考虑。通过这样的数据管道,可以实现大数据实时流处理与关系型数据库之间的高效交互。
总体来说,文档详细地介绍了Zookeeper、Kafka的安装和配置步骤,以及如何使用Kafka Connect进行数据集成的具体实践,尤其对于如何配置和启动Oracle数据库连接器,提供了相当具体的示例和故障排除的步骤。这些内容...
kafka-connect-ui 这是Kafka Connect的网络工具,用于设置和管理多个连接集群的连接器。现场演示与Docker独立运行docker run --... 例如:CONNECT_URL = “ 另外,您可以通过在端点URL后面附加分号和群集名称来为Connec
《Kafka数据同步至Elasticsearch的深度解析》 在大数据处理领域,Kafka和Elasticsearch都是不可或缺的重要组件。Kafka作为一个强大的分布式流处理平台,主要用于实时数据管道和流处理,而Elasticsearch则是一个功能...
Kafka Connect Redis 用于Redis的Kafka源和接收器连接器... 该演示将引导您在本地计算机上设置Kubernetes,安装连接器,并使用连接器将数据写入Redis集群或将数据从Redis引入Kafka。 兼容性 需要Redis 2.6或更高版本。
Kafka Connect Elasticsearch来源:从elastic-search获取数据并将其发送到kafka。 连接器仅使用严格的增量/时间字段(例如时间戳或增量ID)来获取新数据。 它支持动态模式和嵌套对象/数组。 要求: Elasticsearch ...
Apache Kafka:KafkaConnect深入解析.docx
StreamX:Kafka Connect for S3 从很棒的 StreamX是基于kafka连接的连接器,用于将数据从Kafka复制到对象存储,例如Amazon s3,Google Cloud Storage和Azure Blob存储。 它专注于可靠和可扩展的数据复制。 它可以以...
您可以通过使用kafka9-connect-mongodb分支将此连接器用于Kafka9。 用于Kafka Connect的MongoDB接收器连接器提供了从Kafka主题或主题集到MongoDB集合或多个集合的简单,连续的链接。 连接器使用Kafka消息,重命名...
本项目涉及到一个用Go编写的程序,它实现了从Apache Kafka主题中读取数据,并将这些数据写入Elasticsearch集群。Kafka是一个分布式流处理平台,而Elasticsearch则是一种流行的全文搜索引擎和实时数据分析工具。接...
docker-kafka-连接 Dockerized (分布式模式) 支持的标签 0.10.0.0 (2.11) 0.10.1.1 (2.11) 最新的0.10.2.0 (2.12) 快速开始 使用Docker Compose 像这样编写docker-compose.yml ,然后执行docker-...
kafka-connect-elasticsearch-5.4.1.zip包含了kafka-connect-elasticsearch-5.4.1.jar包。 欢迎大家下载,大家也可关注我的博客,欢迎一起交流,如有疑问请留言!
- **Elasticsearch连接器**:将Kafka主题中的数据实时写入Elasticsearch索引,使得数据可用于实时搜索、日志分析和仪表盘展示。 ### 6. 应用挑战与最佳实践 - **监控与故障排查**:确保适当监控连接器性能和数据...
* 数据分析平台:Kafka 能够构建数据分析平台,满足实时数据分析需求。 * 实时数据处理系统:Kafka 能够构建实时数据处理系统,满足高吞吐量的数据处理需求。 * 流媒体处理系统:Kafka 能够构建流媒体处理系统,满足...
- Elasticsearch 连接器:将 Kafka 数据流实时同步到 Elasticsearch,用于实时搜索和分析。 - S3 连接器:将 Kafka 数据持久化到 Amazon S3 存储桶。 ### 5. 结论 Kafka Connect Tools 提供了一套强大的工具集,...
转换器控制将数据写入源连接器的Kafka或从Kafka接收器的连接器读取的数据格式。兼容性2.x版本系列与Kafka Connect 5.x兼容。 以及更高版本(较早的版本已经过验证,可以一直使用到Kafka Connect 3.2.0,尽管我们...
Kafka Connect Elasticsearch连接器 kafka-connect-elasticsearch是一个用于在Kafka和Elasticsearch之间复制数据。发展要构建开发版本,您需要Kafka的最新版本。 您可以使用标准生命周期阶段,使用Maven构建kafka-...