`
wiselyman
  • 浏览: 2093388 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
博客专栏
Group-logo
点睛Spring4.1
浏览量:82285
74ae1471-94c5-3ae2-b227-779326b57435
点睛Spring MVC4...
浏览量:130766
社区版块
存档分类
最新评论

基于K8s、Strimzi的Kafka Connect实战

阅读更多

0. 源码地址

https://github.com/wiselyman/kafka-in-battle

1. Operator Framework

Operator Framework是一个用来管理k8s原生应用(Operator)的开源工具。

Operator Framework支持的Operator分享地址:https://operatorhub.io

如安装Kafka使用Strimzi Apache Kafka Operator,地址为:https://operatorhub.io/operator/strimzi-kafka-operator 。

打开Strimzi Apache Kafka Operator页面,右侧有install按钮,按照页面提示进行Operator安装。

2. 安装Operator Lifecycle Manager

Operator Lifecycle Manager是Operator Framework的一部分,OLM扩展了k8s提供声明式方法安装、管理、更新Operator以及他们的依赖。

点击页面上的install显示如何安装Strimzi Apache Kafka Operator,我们首先第一步要安装Operator Lifecycle Manager(不要执行下句命令):

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/0.12.0/install.sh | bash -s 0.12.0

该命令需要使用quay.io的镜像,我们需采取从源码安装,并修改源码中的镜像地址加速。

源码地址:https://github.com/operator-framework/operator-lifecycle-manager/releases,当前最新版本为0.12.0

olm.yml中:

quay.io ->  quay.azk8s.cn

执行安装:

kubectl apply -f crds.yaml
kubectl apply -f olm.yaml

3. 安装Strimzi Apache Kafka Operator

kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml

使用下面命令观察Operator启动情况

kubectl get csv -n operators

显示如下则安装成功

wangyunfeis-MacBook-Pro:olm wangyunfei$ kubectl get csv -n operators
NAME                               DISPLAY                         VERSION   REPLACES                           PHASE
strimzi-cluster-operator.v0.14.0   Strimzi Apache Kafka Operator   0.14.0    strimzi-cluster-operator.v0.13.0   Succeeded

4. 安装Kafka集群

下载https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.14.0/examples/kafka/kafka-persistent.yaml,主要修改的是所需存储空间为5Gi作为测试条件,这里的存储需要K8s集群中有默认的StorageClass

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      delete.topic.enable: "true"
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
kubectl apply -f kafka-persistent.yml -n kafka 
  • 发送消息测试
kubectl exec -i -n kafka my-cluster-kafka-0 -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic strimizi-my-topic
  • 接受消息测试
kubectl exec -i -n kafka  my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic strimizi-my-topic --from-beginning
  • 显示集群Topic
kubectl exec -n kafka my-cluster-kafka-0   -- bin/kafka-topics.sh --list --zookeeper localhost:2181

5. Kafka Connect

本节将外部的SQL Server中的表person(字段只有idname)通过Kafka Connect同步至K8s集群里的PostgreSQL中。

5.1 开启SQL Server数据库的CDC(Change Data Capture)功能

5.1.1 启用数据库CDC

USE bs_portal
EXEC sys.sp_cdc_enable_db;

bs_portal为数据库名,此时会自动给我们创建cdc的schema和相关表:

  • captured_columns
  • change_tables
  • dbo_person_CT
  • ddl_history
  • index_columns
  • lsn_time_mapping

可使用下面sql语句查询已开启CDC的数据库:

select * from sys.databases where is_cdc_enabled = 1 

5.1.2 启用表的CDC

USE bs_portal 
EXEC sys.sp_cdc_enable_table  
    @source_schema = 'dbo',  
    @source_name = 'person',  
    @role_name = 'cdc_admin',
    @supports_net_changes = 1;

@source_name为表名,查询表开启CDS的sql语句:

select name, is_tracked_by_cdc from sys.tables where object_id = OBJECT_ID('dbo.person')  

查看新增的job

SELECT job_id,name,enabled,date_created,date_modified FROM msdb.dbo.sysjobs ORDER BY date_created

确定用户有权限访问CDC表

EXEC sys.sp_cdc_help_change_data_capture;

5.1.3 开启“SQL Server 代理”

检查安装了SQL Server的操作系统中“服务”中是否开启了“SQL Server 代理”。

5.1.4 关闭CDC

关闭数据库的CDC

USE bs_portal
EXEC sys.sp_cdc_disable_db;

关闭表的CDC

USE bs_portal
EXEC sys.sp_cdc_disable_table   
    @source_schema = 'dbo',  
    @source_name = 'person',  
    @capture_instance = 'all';

5.2 SQL Server To PosgreSQL

5.2.1 准备Kafka Connect镜像

输入插件(source):下载SQL Server Connector plugin:http://central.maven.org/maven2/io/debezium/debezium-connector-sqlserver/;输出插件(sink):下载Kafka Connect JDBC:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

新建Dockerfile文件,将debezium-connector-sqlserver-0.10.0.Final-plugin.zip解压放置到Dockerfile相同目录下的plugins目录;在plugins目录下新建目录kafka-connect-jdbc,解压confluentinc-kafka-connect-jdbc-5.3.1.zip,将lib下的kafka-connect-jdbc-5.3.1.jarpostgresql-9.4.1212.jar放置在kafka-connect-jdbc

编写Dockerfile

FROM strimzi/kafka:0.14.0-kafka-2.3.0
USER root:root
COPY ./plugins/ /opt/kafka/plugins/
USER 1001
MAINTAINER 285414629@qq.com

使用阿里云“容器镜像服务”(https://cr.console.aliyun.com/)编译镜像,目前我们的源码地址位于:https://github.com/wiselyman/kafka-in-battle

  • “镜像仓库”->“创建镜像仓库”:

    1. 仓库名称:kafka-connect-form-sql-to-jdbc

    2. 仓库类型:公开

  • 下一步后,选择“Github”标签页,使用自己的GitHub库,“构建设置”只勾选“海外机器构建”,然后点击“创建镜像仓库”。

  • 点击镜像仓库列表中的“kafka-connect-mysql-postgres”->“构建”->“添加规则”:

    1. 类型:Branch

    2. Branch/Tag:master

    3. Dockerfile目录:/sqlserver-to-jdbc/

    4. Dockfile文件名:Dockerfile

    5. 镜像版本:0.1

  • 确认后,“构建规则设置”->“立即构建”,“构建日志”显示“构建状态”为“成功”即可。

5.2.2 安装Kafka Connect

编写Kafka Connect集群部署文件kafka-connect-sql-postgres.yml

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  version: 2.3.0
  replicas: 1
  bootstrapServers: 'my-cluster-kafka-bootstrap:9093'
  image: registry.cn-hangzhou.aliyuncs.com/wiselyman/kafka-connect-from-sql-to-jdbc:0.1
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt

执行安装

kubectl apply -f kafka-connect-sql-postgres.yml -n kafka

查询已安装的插件

kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connector-plugins

结果如:

[{
	"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
	"type": "sink",
	"version": "5.3.1"
}, {
	"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
	"type": "source",
	"version": "5.3.1"
}, {
	"class": "io.debezium.connector.sqlserver.SqlServerConnector",
	"type": "source",
	"version": "0.10.0.Final"
}, {
	"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
	"type": "sink",
	"version": "2.3.0"
}, {
	"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
	"type": "source",
	"version": "2.3.0"
}]

5.2.3 使用Helm安装PostgreSQL

使用helm安装PostgreSQL,这里的PostgreSQL库来自于https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts/,可在Helm中配置。

对PostgreSQL的账号、密码、初始化数据库、服务类型进行定制后安装:

helm install --name my-pg --set global.storageClass=standard,postgresUser=wisely,postgresPassword=zzzzzz,postgresDatabase=center,service.type=NodePort,service.nodePort=5432 stable/postgresql

5.2.4 Kafka Connect Source配置

编写source配置:sql-server-source.json

{
  "name": "sql-server-connector",
  "config": {
    "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max" : "1",
    "database.server.name" : "exam",
    "database.hostname" : "172.16.8.221",
    "database.port" : "1433",
    "database.user" : "sa",
    "database.password" : "sa",
    "database.dbname" : "bs_portal",
    "database.history.kafka.bootstrap.servers" : "my-cluster-kafka-bootstrap:9092",
    "database.history.kafka.topic": "schema-changes.person",
    "table.whitelist": "dbo.person"
  }
}

编写sink配置:postgres-sink.json

{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "exam.dbo.MH_YCZM",
    "connection.url": "jdbc:postgresql://my-pg-postgresql.default.svc.cluster.local:5432/center?user=wisely&password=zzzzzz",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "IPDZ",
    "pk.mode": "record_key"
  }
}

5.2.5 使用

将配置文件提交到Kafka Connect

cat sql-server-source.json | kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-
cat postgres-sink.json| kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-

查看所有的Connector

kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connectors

删除Connect

kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X DELETE http://my-connect-cluster-connect-api:8083/connectors/postgres-sink

查看所有的topic

kubectl exec -n kafka my-cluster-kafka-0   -- bin/kafka-topics.sh --list --zookeeper localhost:2181

查看SQL Server Connector中的数据

kubectl exec -i -n kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic exam.dbo.person --from-beginning

我们此时查看PostgreSQL数据库已经有了person表和数据,当对SQL Server新增、修改、删除数据时,PostgreSQL中也会同步更新。

0
0
分享到:
评论

相关推荐

    strimzi-kafka-group-authorizer:Strimzi Kafka Operator的简单授权者,可以基于模式为用户组配置ACL

    Strimzi Kafka Group Authorizer的工作原理是,它将用户组与ACL规则关联起来,这些规则基于特定的模式匹配资源名。例如,一个规则可能允许“developers”组的所有成员读取以“sales-”开头的主题。这种模式匹配使得...

    kafka-connect.pdf

    本文档详细介绍了Zookeeper和Kafka的安装流程,以及Kafka Connect的配置方法,并解释了相关的参数设置。此外,还涉及了通过Kafka Connect集成Oracle数据库的配置示例。 知识点一:Zookeeper安装和配置 首先,文档...

    图解 Kafka 之实战指南

    ### 图解Kafka之实战指南知识点详述 #### 一、Kafka简介 **Kafka** 起初由LinkedIn采用Scala语言开发,后捐赠给Apache基金会,现已成为一款广泛应用于分布式流处理平台的成熟软件。它凭借高吞吐量、可持久化存储、...

    图解 Kafka 之实战指南.7z

    《图解 Kafka 之实战指南》是一本深入解析Apache Kafka的实用书籍,旨在帮助读者理解和掌握这个分布式消息系统的精髓。Kafka是一个高吞吐、低延迟的开源流处理平台,常用于实时数据管道和流应用的构建。在这个实战...

    Apache Kafka实战.pdf

    7. **Kafka Connect**:Kafka Connect是一个用于简化数据集成的框架,可以方便地将数据源(如数据库)连接到Kafka,或从Kafka导出数据到其他系统(如Hadoop)。 8. **Kafka Streams**:Kafka Streams是Kafka内置的...

    PyPI 官网下载 | strimzi-kafka-cli-0.1.0a3.tar.gz

    此外,该工具还支持与Kafka Connect的集成,可以方便地管理和配置各种连接器,将Kafka与其他系统如数据库、文件系统等无缝连接。 总之,strimzi-kafka-cli-0.1.0a3.tar.gz是一个强大的工具,它简化了Kafka的运维...

    kafka实战pdf

    《Apache Kafka实战》这本书是关于分布式流处理平台Kafka的深度实践指南,旨在帮助读者深入理解和熟练运用Kafka。Kafka是由LinkedIn开发并开源的一种高吞吐量、分布式的发布订阅消息系统,现在已经成为大数据领域不...

    kafka-connect-jdbc-4.1.1.zip

    《Kafka Connect JDBC 4.1.1:数据迁移与集成的桥梁》 Kafka Connect JDBC 4.1.1 是 Apache Kafka 生态系统中一个关键的组件,它提供了连接和交互能力,允许数据在 Kafka 和关系型数据库之间进行高效、可靠的传输。...

    k8s-kafka:Kafka for Kubernetes

    k8s-kafka Kubernetes上的Kafka容器编辑controller.yaml并更改ZOOKEEPER_CONNECT环境以指向您已经拥有的Zookeeper实例。 Zookeeper的配置不在本文档范围内。 Zookeeper列表是逗号分隔的列表,例如host:port,host:...

    Python库 | strimzi_kafka_cli-0.1.0a34-py3-none-any.whl

    `strimzi_kafka_cli`是Python开发者管理和操控Apache Kafka集群的利器,它简化了与Kafka交互的过程,使得在Python环境中构建基于Kafka的应用变得更加高效。通过了解和掌握这个库的使用,开发者可以更好地融入Kafka的...

    strimzi-kafka-operator:在Kubernetes上运行的Apache Kafka

    在Kubernetes和OpenShift上运行Apache Kafka Strimzi提供了一种在各种部署配置中的或上运行集群的方法。 有关该项目的更多详细信息,请参见我们的。快速入门要并运行,请查看我们。文献资料当前master分支以及所有...

    高级-项目实战-日志收集系统kafka库实战

    1. 高级-项目实战-日志收集系统kafka库实战 2. 高级-etcd、contex、kafka消费实例、logagent 3. 实战-商品秒杀架构设计与开发 4. 实战-商品秒杀开发与接入层实现 总共18课时,网上收集的资料,只共用于学习,不...

    kafka-connect-oracle-1.0.71.jar

    2021年6.29号打包,最新版本:kafka-connect-oracle-1.0.71.jar,用于根据oracle binglog日志同步数据到kafka

    kafka-connect-ui:用于Kafka Connect的Web工具|

    kafka-connect-ui 这是Kafka Connect的网络工具,用于设置和管理多个连接集群的连接器。现场演示与Docker独立运行docker run --rm -it -p 8000:8000 \ -e "CONNECT_URL=...

    kafka-connect-jdbc:Kafka Connect连接器,用于兼容JDBC的数据库

    Kafka Connect JDBC连接器 kafka-connect-jdbc是一个用于与任何兼容JDBC的数据库之间加载数据。 可以在找到该连接器的文档。发展要构建开发版本,您需要Kafka的最新版本以及一系列上游Confluent项目,您必须从其相应...

    kafka-connect-mqtt:Kafka Connect MQTT连接器

    kafka-connect-mqtt 此仓库包含用于Apache Kafka的MQTT源和接收器连接器。 已通过Kafka 2+进行了测试。 使用源连接器,您可以订阅MQTT主题,并将这些消息写到Kafka主题。 接收器连接器以相反的方式工作。 笔记: ...

    Kafka Streams 实战 - 源码.zip

    Kafka Streams是Kafka提供的一个用于构建流式处理程序的Java库,它与Storm、Spark等流式处理框架不同,是一个仅依赖于Kafka的Java库,而不是一个流式处理框架。除Kafka之外,Kafka Streams不需要额外的流式处理集群...

    代码:kafka数据接入到mysql中

    一种常见的方法是使用Apache Kafka Connect,这是一个用于在Kafka和其它系统之间进行数据集成的开放源码框架。Kafka Connect提供了一种声明式的方式来定义数据源(Source)和数据接收器(Sink),使得数据能够在不同...

    藏经阁-BUILDING REALTIME DATA PIPELINES WITH KAFKA CONNECT AND SPAR

    在构建实时数据管道时,Kafka Connect 和 Spark Streaming 是两个重要的工具,它们在阿里云的分布式数据处理场景中发挥着关键作用。本文将深入探讨如何利用这两个技术来创建高效的实时数据流处理系统。 首先,Kafka...

Global site tag (gtag.js) - Google Analytics