快速体验
1. 首先需要先启动canal server,可参见:Canal Server的QuickStart
2. 运行canal client,可参见:canal client的ClientExample
如何下载
1. 如果是maven用户,可配置mvn dependency
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>x.y.z</version>
</dependency>
对应的version版本,可见https://github.com/alibaba/canal/releases
2. 其他用户,可通过mvn仓库直接下载jar包
mvn仓库下载url : http://central.maven.org/maven2/com/alibaba/otter/canal.client/
选择对应的version,下载jar/source/javadoc文件即可.
类设计
在了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal.
大致分为几部分:
- ClientIdentity
canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)
- CanalConnector
SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制
- CanalNodeAccessStrategy
SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.
- ClientRunningMonitor/ClientRunningListener/ClientRunningData
client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.
javadoc查看:
server/client交互协议
get/ack/rollback协议介绍:
- Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
a. batch id 唯一标识
b. entries 具体的数据对象,可参见下面的数据介绍
- getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
a. 拿够batchSize条记录或者超过timeout时间
b. timeout=0,阻塞等到足够的batchSize
- void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
流式api设计的好处:
- get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
- get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
流式api设计:
- 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
- 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
- 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
- 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取
流式api带来的异步响应模型:
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为string文本]
说明:
- 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
- 可以提供ddl的变更语句
- insert只有after columns, delete只有before columns,而update则会有before / after columns数据.
Client使用例子
1. 创建Connector
a. 创建SimpleCanalConnector (直连ip,不支持server/client的failover机制)
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
b. 创建ClusterCanalConnector (基于zookeeper获取canal server ip,支持server/client的failover机制)
CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");
c. 创建ClusterCanalConnector (基于固定canal server的地址,支持固定的server ip的failover机制,不支持client的failover机制
CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");
2. get/ack/rollback使用
3. RowData数据处理
如果需要更详细的exmpale例子,请下载canal当前最新源码包,里面有个example工程,谢谢.
- 大小: 69.8 KB
- 大小: 45.9 KB
- 大小: 74 KB
- 大小: 57.6 KB
- 大小: 75.5 KB
分享到:
相关推荐
启动Canal server后,通过Canal client API与Canal交互,实现数据同步。 **五、业务使用** 业务上,启动Canal server后,可以使用Canal client API监听数据库变化,根据binlog更新Redis数据或异构数据库数据。对于...
客户端可以是任何能够消费Canal发布数据的应用,例如基于Java的API,或者通过Canal的HTTP API来获取数据变更。 5. **Canal适配器** - **canal.adapter-1.1.5.tar.gz**:这是Canal的适配器模块,提供了与各种系统...
- 监控Canal:通过HTTP API或Web界面查看Canal的状态和日志。 3. **使用Canal Adapter** Canal Adapter是数据同步的关键组件,它支持多种目标系统,如Elasticsearch、HBase、RabbitMQ等。配置Adapter需要: - ...
2. **Client端**:也称为Canal Client,用于连接Canal Server并订阅感兴趣的数据表,当数据发生变化时,Canal Server会将变更事件推送给Client。 3. **Adapter**:适配器层,Canal提供多种适配器,用于将捕获的...
Canal提供了一套完整的API,开发者可以根据需要编写插件或扩展,实现更复杂的数据处理逻辑。例如,自定义过滤规则、支持更多数据库类型等。 7. **最佳实践**: - 使用Canal时,需合理选择binlog的解析模式,如ROW...
服务端(canal-server)负责从MySQL实例中获取binlog并进行解析,客户端(canal-client)则订阅并消费这些数据。Canal通过模拟MySQL的binlog slave来监听数据库的变化,采用基于Row Image的binlog解析方式,确保数据...
6. **配置Canal Client(Canal Deployer)**: Canal Deployer作为客户端,负责管理和监控Canal Server。同样需要配置对应的yaml文件,如`conf/canal.conf`,指定Canal Server的地址、端口和实例名等。 7. **数据...
2. **架构组成**:Canal由三部分构成:Server(服务端)、Client(客户端)和Admin(管理端)。Server负责接收数据库的Binlog事件,Client用于订阅和消费这些事件,而Admin则提供了管理和监控的功能。 3. **安装...
2. **Client**:Canal客户端,用于订阅Canal服务端的数据变更,可以是应用程序或者其他数据存储系统。 3. **Adapter**:适配器模块,用于将Canal服务端推送的事件转换为不同目标存储系统能理解的格式,例如转换为...
- 创建实例:在Canal管理界面或者通过API创建一个数据库实例,配置要监听的数据库和表。 - 配置消费者:编写消费者程序,通过Canal的SDK连接Canal Server,订阅并处理数据变更事件。 - 测试数据同步:在数据库中...
2. **canal-client**:客户端主要用于订阅Canal-server发布的数据变更事件,它提供了一套API供开发者调用。在源码中,`com.alibaba.otter.canal.client.CanalConnector`接口定义了客户端的基本操作,包括连接、订阅...
8. **canalclient**:这个文件可能是一个Canal的客户端示例,包含了连接Canal服务器、订阅binlog事件、解析和处理事件的相关代码。 在实际使用中,开发者需要根据自己的业务需求,结合上述知识点,配置Canal并编写...
【Canal简介】 Canal是阿里巴巴开源的一款针对...订阅者可以通过Canal的API或MQ来接收这些增量数据,实现数据的实时流动。在实际使用过程中,还需关注性能优化、错误处理和监控等方面,确保数据同步的稳定性和高效性。
4. **订阅与消费**:客户端通过Canal的API订阅指定实例,接收到数据变更事件后进行处理,如写入其他数据库或存储系统。 5. **监控与运维**:定期查看logs目录下的日志,确保服务正常运行,及时处理异常。 总结,...
这通常涉及到Canal的Client端配置,如通过Canal Client SDK或者使用Spring Boot集成Canal,实现数据库变更事件的订阅和接收。 四、数据处理 在获取到数据库变更事件后,Canal提供了数据处理的接口,开发者可以根据...
4. 应用集成:commerce_canal项目提供了与业务应用集成的API,使得业务系统能够方便地接入Canal,实现数据的实时同步。 使用这个JAR包,开发者可以轻松地在自己的应用中集成Canal,实时监听数据库的变化,比如将...
在Canal adapter 1.1.6中,你会找到启动和停止Canal服务的相关脚本,如`canal-server.sh`用于启动Canal服务器,`canal-client.sh`则用于运行客户端命令。此外,还有用于配置环境变量的`setenv.sh`,以及监控和管理...
Canal Client通常使用Java API来编写,使得在Java环境中集成和使用变得简单。 **Kafka Consumer**: Kafka是一个分布式流处理平台,它主要用于构建实时数据管道和流应用。在Kafka中,Consumer是读取数据的一方,...
3. **canal-client-master.zip** - 这是Canal的客户端项目,提供了与Canal Server通信的API和工具。开发者可以使用这些客户端库来订阅Canal发布的数据变更事件,进而处理或传递这些变更,实现数据的实时同步到其他...
1. **配置Canal**:在MySQL服务器上创建一个用户,如`canal`,并赋予必要的权限,如`SELECT`, `REPLICATION SLAVE`, `REPLICATION CLIENT`,以便Canal可以读取Binlog并跟踪数据库的变化。 2. **启动Canal**:配置...