`

Canal相关理解

阅读更多
转载:http://www.importnew.com/25189.html
概述
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

起源:早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

基于日志增量订阅&消费支持的业务:

数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
工作原理
mysql主备复制实现:



从上层来看,复制分成三步:

master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理


原理相对比较简单:

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
架构设计
个人理解,数据增量订阅与消费应当有如下几个点:

增量订阅和消费模块应当包括binlog日志抓取,binlog日志解析,事件分发过滤(EventSink),存储(EventStore)等主要模块。
如果需要确保HA可以采用Zookeeper保存各个子模块的状态,让整个增量订阅和消费模块实现无状态化,当然作为consumer(客户端)的状态也可以保存在zk之中。
整体上通过一个Manager System进行集中管理,分配资源。
可以参考下图:



canal架构设计


说明:

server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:

eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
EventParser


整个parser过程大致可分为几部:

Connection获取上一次解析成功的位置(如果第一次启动,则获取初始制定的位置或者是当前数据库的binlog位点)
Connection建立连接,发生BINLOG_DUMP命令
Mysql开始推送Binary Log
接收到的Binary Log通过Binlog parser进行协议解析,补充一些特定信息
传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
存储成功后,定时记录Binary Log位置
EventSink设计


说明:

数据过滤:支持通配符的过滤模式,表名,字段内容等
数据路由/分发:解决1:n (1个parser对应多个store的模式)
数据归并:解决n:1 (多个parser对应1个store)
数据加工:在进入store之前进行额外的处理,比如join
1 数据1:n业务 :

为了合理的利用数据库资源, 一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。 所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注。

2 数据n:1业务:

同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。 所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并.

EventStore设计
目前实现了Memory内存、本地file存储以及持久化到zookeeper以保障数据集群共享。
Memory内存的RingBuffer设计:



定义了3个cursor

Put : Sink模块进行数据存储的最后一次写入位置
Get : 数据订阅获取的最后一次提取位置
Ack : 数据消费成功的最后一次消费位置
借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:



实现说明:

Put/Get/Ack cursor用于递增,采用long型存储
buffer的get操作,通过取余或者与操作。(与操作: cusor & (size – 1) , size需要为2的指数,效率比较高)
Instance设计


instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。
抽象了CanalInstanceGenerator,主要是考虑配置的管理方式:

1. manager方式: 和你自己的内部web console/manager系统进行对接。(alibaba内部使用方式)

2. spring方式:基于spring xml + properties进行定义,构建spring配置.

spring/memory-instance.xml 所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析。特点:速度最快,依赖最少
spring/file-instance.xml 所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.支持单机持久化
spring/default-instance.xml 所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享. 支持HA
spring/group-instance.xml 主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.
Server设计


server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现:

Embeded : 对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)
Netty : 基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。
增量订阅/消费设计


具体的协议格式,可参见:CanalProtocol.proto
get/ack/rollback协议介绍:

Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
a. batch id 唯一标识
b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto
void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
流式api设计的好处:
get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
流式api设计:



每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取
数据格式
canal采用protobuff:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Entry
    Header
        logfileName [binlog文件名]
        logfileOffset [binlog position]
        executeTime [发生的变更]
        schemaName
        tableName
        eventType [insert/update/delete类型]
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]
    storeValue  [byte数据,可展开,对应的类型为RowChange]   
RowChange
    isDdl       [是否是ddl变更操作,比如create table/drop table]
    sql     [具体的ddl sql]
    rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
        beforeColumns [Column类型的数组]
        afterColumns [Column类型的数组]     
Column
    index      
    sqlType     [jdbc type]
    name        [column name]
    isKey       [是否为主键]
    updated     [是否发生过变更]
    isNull      [值是否为null]
    value       [具体的内容,注意为文本]
canal-message example:

比如数据库中的表:

1
2
3
4
5
6
7
8
9
mysql> select * from person;
+----+------+------+------+
| id | name | age  | sex  |
+----+------+------+------+
|  1 | zzh  |   10 | m    |
|  3 | zzh3 |   12 | f    |
|  4 | zzh4 |    5 | m    |
+----+------+------+------+
3 rows in set (0.00 sec)
更新一条数据(update person set age=15 where id=4):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
****************************************************
* Batch Id: [2] ,count : [3] , memsize : [165] , Time : 2016-09-07 15:54:18
* Start : [mysql-bin.000003:6354:1473234846000(2016-09-07 15:54:06)]
* End : [mysql-bin.000003:6550:1473234846000(2016-09-07 15:54:06)]
****************************************************

================> binlog[mysql-bin.000003:6354] , executeTime : 1473234846000 , delay : 12225ms
BEGIN ----> Thread id: 67
----------------> binlog[mysql-bin.000003:6486] , name[canal_test,person] , eventType : UPDATE , executeTime : 1473234846000 , delay : 12225ms
id : 4    type=int(11)
name : zzh4    type=varchar(100)
age : 15    type=int(11)    update=true
sex : m    type=char(1)
----------------
END ----> transaction id: 308
================> binlog[mysql-bin.000003:6550] , executeTime : 1473234846000 , delay : 12240ms
HA机制设计
canal的HA分为两部分,canal server和canal client分别有对应的ha实现:

canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),可以看下我之前zookeeper的相关文章。

Canal Server:



大致步骤:

canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.
Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制.
HA配置架构图(举例)如下所示:



canal其他链接方式
canal还有几种连接方式:

1. 单连



2. 两个client+两个instance+1个mysql

当mysql变动时,两个client都能获取到变动



3. 一个server+两个instance+两个mysql+两个client



4. instance的standby配置



整体架构
从整体架构上来说canal是这种架构的(canal中没有包含一个运维的console web来对接,但要运用于分布式环境中肯定需要一个Manager来管理):



一个总体的manager system对应于n个Canal Server(物理上来说是一台服务器), 那么一个Canal Server对应于n个Canal Instance(destinations). 大体上是三层结构,第二层也需要Manager统筹运维管理。

那么随着Docker技术的兴起,是否可以试一下下面的架构呢?



一个docker中跑一个instance服务,相当于略去server这一层的概念。
Manager System中配置一个instance,直接调取一个docker发布这个instance,其中包括向这个instance发送配置信息,启动instance服务.
instance在运行过程中,定时刷新binlog filename+ binlog position的信息至zk。
如果一个instance出现故障,instance本身报错或者zk感知此node消失,则根据相应的信息,比如上一步保存的binlog filename+binlog position重新开启一个docker服务,当然这里可以适当的加一些重试机制。
当要更新时,类似AB test, 先关闭一个docker,然后开启新的已更新的替换,循序渐进的进行。
当涉及到分表分库时,多个物理表对应于一个逻辑表,可以将结果存于一个公共的模块(比如MQ),或者单独存取也可以,具体情况具体分析
存储可以参考canal的多样化:内存,文件,zk,或者加入至MQ中
docker由此之外的工具管理,比如kubernetes
也可以进一步添加HA的功能,两个docker对应一个mysql,互为主备,类似Canal的HA架构。如果时效性不是贴别强的场景,考虑到成本,此功能可以不采用。
总结
这里总结了一下Canal的一些点,仅供参考:

原理:模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;mysql master收到dump请求,开始推送binary log给slave(也就是canal);解析binary log对象(原始为byte流)
重复消费问题:在消费端解决。
采用开源的open-replicator来解析binlog
canal需要维护EventStore,可以存取在Memory, File, zk
canal需要维护客户端的状态,同一时刻一个instance只能有一个消费端消费
数据传输格式:protobuff
支持binlog format 类型:statement, row, mixed. 多次附加功能只能在row下使用,比如otter
binlog position可以支持保存在内存,文件,zk中
instance启动方式:rpc/http; 内嵌
有ACK机制
无告警,无监控,这两个功能都需要对接外部系统
方便快速部署。
分享到:
评论

相关推荐

    canal客户端-canal.deployer-1.1.7-SNAPSHOT.tar.gz

    《深入理解Canal客户端:canal.deployer-1.1.7-SNAPSHOT.tar.gz剖析》 Canal,由阿里开源的一款高效、稳定的数据库增量日志订阅与消费组件,广泛应用于数据同步、实时数据接入等场景。在本篇文章中,我们将深入探讨...

    canal.1.1.5包

    3. canal.example-1.1.5.tar:这是一个示例包,包含了一些示例配置和应用,帮助用户更好地理解和使用Canal,快速入门数据同步功能。 【标签】"综合资源 canal"表明这些文件是关于Canal的综合资源,涵盖了部署、管理...

    canal.deployer-1.1.6

    1. **bin**:包含了启动和停止Canal服务的脚本,如`canal.sh`用于启动Canal实例,`stop.sh`用于停止服务,以及相关的配置检查和启动参数设置命令。 2. **conf**:存放Canal实例的配置文件,如`canal.properties`定义...

    canal-1.1.6

    【标签】"canal" 标签明确了这个压缩包与Canal项目有关,Canal是一个Java实现的开源项目,其核心功能包括: 1. 数据库binlog解析:Canal能够解析MySQL的binlog(二进制日志),这是MySQL记录所有事务操作的地方,...

    阿里canal组件:canal.deployer-1.1.7-SNAPSHOT.tar.gz

    5. **conf**:配置文件目录,包含Canal实例、服务器、目标数据源等相关配置。在这里,你可以配置Canal连接MySQL的方式,设置数据同步的目标,定义过滤规则等。 Canal的工作原理大致如下: - 它监听MySQL的binlog...

    canal-1.1.6-SNAPSHOT.rar

    对于开发者来说,理解和掌握 Canal 的工作原理、配置选项以及如何与其他组件配合是至关重要的。 总的来说,"canal-1.1.6-SNAPSHOT.rar" 是一个便捷的工具,帮助开发者在 JDK 17 环境下快速搭建起数据同步的基础设施...

    canal-canal-1.0.22_源码

    通过深入学习和理解Canal的源码,开发者不仅可以掌握数据库同步的基本原理,还可以了解到如何设计和实现一个高效、稳定的数据订阅系统。这对于从事大数据、云计算、分布式系统等相关领域的工程师来说,是非常有价值...

    canal-1.1.5.zip

    Server端负责监听和捕获MySQL的binlog事件,Client端用于订阅和获取数据变更,适配器则将数据变更转换为各种目标系统可以理解的格式。 2. **Canal工作原理** - **Binlog监听**:Canal通过模拟MySQL的binlog slave...

    canal1.14.zip

    本文将深入探讨Canal的核心特性和应用场景,帮助读者理解并掌握这一利器。 Canal的设计理念是基于数据库的增量日志解析,通过监听和分析数据库的日志,实现实时地将数据库的变更事件推送给订阅者。这一特性使得...

    最新版 canal.deployer-1.1.4.tar.gz

    1. 安装准备:首先,下载Canal Deployer 1.1.4的压缩包(如canal.deployer-1.1.4.tar.gz),解压后配置相关环境变量,如CANAL_HOME。 2. 启动Canal Server:通过启动脚本启动Canal服务器,确保Canal服务正常运行。 3...

    canal.deployer-1.1.5-SNAPSHOT.tar.gz

    《Canal工具详解:深入理解canal.deployer-1.1.5-SNAPSHOT.tar.gz》 在IT行业中,数据库的实时同步与监控是至关重要的任务,尤其在大数据处理、分布式系统以及微服务架构中。Canal是一款由阿里巴巴开源的数据库变更...

    canal.admin-1.1.6-SNAPSHOT.tar.gz

    本文将深入探讨Canal Admin 1.1.6-SNAPSHOT版本的相关知识点。 Canal Admin的核心功能包括: 1. **实例管理**:用户可以通过Canal Admin创建、删除、启动和停止Canal实例,便于集中管理和监控各个实例的状态。 2....

    canal-canal-1.1.7.tar.gz

    在“canal-canal-1.1.7.tar.gz”这个压缩包中,包含了Canal的1.1.7版本的源码及相关文件,为开发者提供了便利的数据库同步服务。 1. **Canal概述**: Canal的主要功能是捕获MySQL的binlog事件,并将这些变化实时地...

    Canal安装包、安装文档

    【正文】 《Canal软件安装指南》 ...正确理解和使用Canal,能有效提升数据处理的实时性,为大数据实时分析、实时监控等场景提供强大支持。通过阅读“canal搭建.txt”文档,可以更深入地了解具体的部署和使用细节。

    深入浅出Otter与Canal.pdf

    根据提供的文件信息,我们可以提取以下IT知识点: 1. Otter系统介绍: ...此外,zookeeper在分布式系统中的重要性,以及灵活的系统架构设计原则,都是我们在构建和维护此类系统时需要深入理解的关键知识点。

    starter-canal.zip

    本篇文章将详细讲解如何在SpringBoot工程中集成Canal,以及相关的配置和使用方法。 首先,让我们理解Canal的基本概念。Canal主要负责监听MySQL的数据变更事件,当数据库中的表发生INSERT、UPDATE、DELETE操作时,...

    canal技术调研2.0.docx

    2. **多级索引**:当数据库中的数据发生变化时,可以通过Canal自动更新相关的二级索引或其他复杂索引结构,确保索引与主数据的一致性。 3. **业务cache刷新**:对于依赖数据库数据的缓存服务(如Redis),Canal可以...

    canal1.1.4(1.1.5).rar

    5. **源码分析**:canal-canal-1.1.5-alpha-2.zip和canal-canal-1.1.4.zip分别是1.1.5和1.1.4版本的源码包,这对于开发者深入理解Canal的内部工作原理、进行二次开发或者定制化需求非常有帮助。 6. **版本对比**:...

    canal-1.1.4.rar

    理解并正确配置这些参数是保证Canal正常运行的关键。 5. **数据订阅与消费**:Canal支持多种订阅模式,如全量同步和增量同步,以及基于SQL语句的过滤规则。消费者可以通过API或者消息队列(如RabbitMQ、Kafka)来...

    Canal的介绍以及使用Canal的介绍以及使用

    2. **部署Canal**: 下载Canal服务端,配置Canal的conf目录下的相关配置文件,如canal.properties、meta.dat等。 3. **创建Canal实例**: 根据需要同步的数据库创建Canal实例,并配置数据库连接信息。 4. **启动...

Global site tag (gtag.js) - Google Analytics