sqlxx

Hands-on guide: Syncing data to Elasticsearch with Kafka Connect


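The previous post covered installing the PostgreSQL connector. With that in place, installing the Elasticsearch (ES) connector is much easier.

Installing the ES connector plugin
The Connect container installed via Docker does not include the Elasticsearch connector plugin, so download it from the Confluent website (just search for "Kafka Connect Elasticsearch").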

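Download and unzip it, then put it in the connect directory (the mount directory set up in the previous post). If you don't remember where the container directory is mounted, you can look it up with: docker inspect <container-id> | grep Mounts -A 20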
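If you would rather find the mount programmatically than grep through the output, here is a minimal Python sketch. It parses the JSON that docker inspect prints and lists each mount's host path and container path. The function names and the sample paths (/data/connect, /kafka/connect) are hypothetical placeholders, not values from the original setup.

```python
import json
import subprocess


def inspect_container(container_id: str) -> str:
    """Run `docker inspect <container_id>` and return its raw JSON output.

    Requires the docker CLI on PATH; raises CalledProcessError on failure.
    """
    result = subprocess.run(
        ["docker", "inspect", container_id],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


def container_mounts(inspect_json: str) -> list[tuple[str, str]]:
    """Return (host source, container destination) pairs for every mount.

    docker inspect prints a JSON array with one object per container; each
    object has a "Mounts" list whose entries carry "Source" and "Destination".
    """
    pairs = []
    for container in json.loads(inspect_json):
        for mount in container.get("Mounts") or []:
            pairs.append((mount.get("Source", ""), mount.get("Destination", "")))
    return pairs


if __name__ == "__main__":
    # Hypothetical sample output; in practice use inspect_container("<container-id>").
    sample = '[{"Mounts": [{"Type": "bind", "Source": "/data/connect", "Destination": "/kafka/connect"}]}]'
    for source, destination in container_mounts(sample):
        print(f"{source} -> {destination}")
```

The host-side Source path is the directory where the unzipped plugin should go.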

Once the plugin is in place, restart the Connect container and verify it with the following HTTP request: GET ip:8093/connector-plugins
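The check above can also be scripted. This is a minimal Python sketch using only the standard library. It fetches the plugin list from the Connect REST API and tests whether the ES sink class is present. The helper names are hypothetical, and "http://ip:8093" is the same placeholder address used in the text.

```python
import json
import urllib.request

ES_SINK_CLASS = "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"


def fetch_plugins(base_url: str) -> list:
    """GET <base_url>/connector-plugins; needs a reachable Connect worker.

    The response is a JSON array of objects with "class", "type" and "version".
    """
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as resp:
        return json.load(resp)


def has_plugin(plugins: list, connector_class: str = ES_SINK_CLASS) -> bool:
    """True if the worker reports the given connector class as installed."""
    return any(p.get("class") == connector_class for p in plugins)


if __name__ == "__main__":
    # Hypothetical response; in practice: plugins = fetch_plugins("http://ip:8093")
    plugins = [
        {"class": ES_SINK_CLASS, "type": "sink"},
        {"class": "io.debezium.connector.postgresql.PostgresConnector", "type": "source"},
    ]
    print("ES sink plugin installed:", has_plugin(plugins))
```

If has_plugin returns False after a restart, the plugin was most likely unzipped somewhere other than the mounted plugin directory.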

Creating the ES sink connector
POST ip:8093/connectors
{
  "name": "es-sink1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://ip:9200",
    "connection.username": "elastic",
    "connection.password": "elastic_xdeas",
    "type.name": "_doc",
    "key.ignore": "false",
    "topics": "know.knowledge.formal_new",
    "write.method": "upsert",
    "behavior.on.null.values": "delete",
    "transforms": "key,ExtractFieldObject",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldObject.field": "after"
  }
}

A closer look at the key settings in this ES connector configuration:

At first I didn't know how to configure authentication (connection.username / connection.password). I searched through official and unofficial docs and blogs, in Chinese and English, found nothing, and almost gave up. I finally found the answer on Stack Overflow: https://stackoverflow.com/questions/58381240/how-to-kafka-connect-elasticsearch-with-ssl (Seriously, could the official docs be a bit more detailed!)

key.ignore: if set to true, the connector ignores the record key and builds the ES _id from the Kafka topic, partition and offset. As a result, every change to a row adds a new document instead of updating the existing one, so this must be set to false.
topics: the topic to subscribe to, i.e. the topic generated by the PostgreSQL connector configured in the previous post.
transforms: the data transformations (single message transforms) applied to each record. Here two are chained, named key and ExtractFieldObject.
transforms.key.type and transforms.key.field: take the id field out of the record key, so the table's id column becomes the ES document ID.
transforms.ExtractFieldObject.type and transforms.ExtractFieldObject.field ("after"): field filtering. We only need the data in the "after" field. Without these two settings, irrelevant metadata also ends up in ES, and the documents in the index look like this (another gripe about the official docs: this also took a lot of trial and error to figure out, which was painful):

"payload": {
  "before": null,
  "after": {"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1,"create_date":1591025764000000,"creater":"1","update_date":1591025769000000,"updater":"1","link":"1","label":["1"],"origin":"4"},
  "source": {"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema":"knowledge","table":"knowledge_formal_new","txId":1604,"lsn":29368760,"xmin":null},
  "op": "u",
  "ts_ms": 1591006642869,
  "transaction": null
}}

Verification
List all connectors: GET ip:8093/connectors/
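The two REST calls above (creating the connector and listing connectors) can be sketched in Python with only the standard library. This is a hedged illustration, not an official client. The helper names are hypothetical, "ip" and the credentials are the same placeholders as in the config, and missing_transform_types is an extra sanity check. That check follows from how Connect reads transforms: each alias in "transforms" needs its own "transforms.<alias>.type" entry.

```python
import json
import urllib.request

# Mirrors the config above; "ip" and the credentials are placeholders to replace.
ES_SINK_CONNECTOR = {
    "name": "es-sink1",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://ip:9200",
        "connection.username": "elastic",
        "connection.password": "elastic_xdeas",
        "type.name": "_doc",
        "key.ignore": "false",
        "topics": "know.knowledge.formal_new",
        "write.method": "upsert",
        "behavior.on.null.values": "delete",
        "transforms": "key,ExtractFieldObject",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.ExtractFieldObject.field": "after",
    },
}


def create_connector_request(base_url: str, body: dict) -> urllib.request.Request:
    """Build POST <base_url>/connectors with the JSON body {"name": ..., "config": {...}}."""
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def list_connectors_request(base_url: str) -> urllib.request.Request:
    """Build GET <base_url>/connectors, which returns a JSON array of connector names."""
    return urllib.request.Request(f"{base_url}/connectors", method="GET")


def send(req: urllib.request.Request):
    """Perform the HTTP call and decode the JSON reply; needs a reachable worker."""
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def missing_transform_types(config: dict) -> list:
    """Aliases listed in "transforms" that lack a "transforms.<alias>.type" entry."""
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    return [a for a in aliases if f"transforms.{a}.type" not in config]


if __name__ == "__main__":
    req = create_connector_request("http://ip:8093", ES_SINK_CONNECTOR)
    print(req.get_method(), req.full_url)
    print("missing transform types:", missing_transform_types(ES_SINK_CONNECTOR["config"]))
    # With a real worker: send(req), then send(list_connectors_request("http://ip:8093"))
```

Building the requests separately from sending them makes the payload easy to inspect before it reaches a live worker.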
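Verifying the sync
If everything above went well, modify some rows in the table. You should see that an index has been created automatically in ES and that the latest data has been synced into it. The index name is the topic configured in the previous step: know.knowledge.formal_new.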
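This pure-Python simulation shows what the sync looks like under the configuration above, with no Kafka or Elasticsearch involved. It models how one change record becomes an Elasticsearch write: the index is named after the topic, the document ID comes from the key's id field, and the document is the "after" row image. It is a simplified model under stated assumptions, not the connector's actual code. The function names and the in-memory dict standing in for the indices are hypothetical, and key and value are treated as plain dicts (the schemaless case). Upserts are also simplified to a plain replace.

```python
from typing import Any, Dict, Optional, Tuple


def extract_field(part: Optional[dict], field: str) -> Any:
    """Simplified, schemaless stand-in for ExtractField$Key / ExtractField$Value.

    Keeps only the named field; a null key or value stays null.
    """
    if part is None:
        return None
    return part.get(field)


def to_es_write(topic: str, partition: int, offset: int,
                key: Optional[dict], value: Optional[dict],
                key_ignore: bool = False) -> Tuple[str, str, Optional[dict]]:
    """Return (index name, document id, document) for one change record.

    index    = the topic name
    id       = the "id" field of the key, or topic+partition+offset if key.ignore is true
    document = the "after" row image (None for deletes)
    """
    document = extract_field(value, "after")
    if key_ignore:
        doc_id = f"{topic}+{partition}+{offset}"
    else:
        doc_id = str(extract_field(key, "id"))
    return topic, doc_id, document


def apply_change(indices: Dict[str, Dict[str, dict]], topic: str, partition: int,
                 offset: int, key: Optional[dict], value: Optional[dict],
                 key_ignore: bool = False) -> None:
    """Apply one record to an in-memory dict standing in for the ES indices."""
    index, doc_id, document = to_es_write(topic, partition, offset, key, value, key_ignore)
    docs = indices.setdefault(index, {})
    if document is None:
        docs.pop(doc_id, None)   # behavior.on.null.values=delete
    else:
        docs[doc_id] = document  # write.method=upsert, simplified to insert-or-replace by id


if __name__ == "__main__":
    topic = "know.knowledge.formal_new"
    key = {"id": "1"}
    indices: Dict[str, Dict[str, dict]] = {}
    apply_change(indices, topic, 0, 10, key, {"before": None, "after": {"id": "1", "title": "test"}, "op": "c"})
    apply_change(indices, topic, 0, 11, key, {"before": None, "after": {"id": "1", "title": "test2"}, "op": "u"})
    print(indices)  # one document under id "1", holding the latest row image
```

With key_ignore=True, the same two updates would produce two separate documents, which is exactly the duplication the key.ignore note warns about.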

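Summary: Kafka Connect is the data-integration framework built into Kafka. The PostgreSQL connector created in the previous post (based on Debezium's PostgresConnector) is essentially equivalent to adding a source configuration file, like connect-file-source.properties, to Kafka's config directory ("source" means where the data comes from). The ES sink connector created here is likewise equivalent to adding a sink configuration file, like connect-file-sink.properties, to the config directory ("sink" means where the data goes). Managing Kafka connectors through Docker and the REST API, as done here, is much more convenient.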
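The equivalence described in the summary can be illustrated with a small Python sketch. It turns a REST-style connector config into the key=value lines of a connector .properties file, the form a standalone worker reads (for example bin/connect-standalone.sh config/connect-standalone.properties followed by one or more connector files). The helper name is hypothetical, and it does no escaping, so treat it as an illustration rather than a general-purpose properties writer.

```python
def to_properties(name: str, config: dict) -> str:
    """Render a connector config as the lines of a standalone-mode .properties file.

    Simplified: assumes no key or value needs escaping (no newlines, backslashes,
    or leading whitespace), which holds for the configs in this post.
    """
    lines = [f"name={name}"]
    lines += [f"{key}={value}" for key, value in config.items()]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # A trimmed copy of the ES sink config above (credentials omitted), for illustration.
    es_sink = {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://ip:9200",
        "topics": "know.knowledge.formal_new",
        "key.ignore": "false",
        "transforms": "key,ExtractFieldObject",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.ExtractFieldObject.field": "after",
    }
    print(to_properties("es-sink1", es_sink), end="")
```

The REST API and the properties file carry the same key/value settings; only the delivery mechanism differs.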

Reposted from http://www.forenose.com/column/content/427083577.html
