`
agapple
  • 浏览: 1596139 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Canal Client API

 
阅读更多

 

快速体验

1.  首先需要先启动canal server,可参见:Canal Server的QuickStart

2.  运行canal client,可参见:canal client的ClientExample

 

如何下载

1.  如果是maven用户,可配置mvn dependency

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>x.y.z</version>
</dependency>

 对应的version版本,可见https://github.com/alibaba/canal/releases

 

2.  其他用户,可通过mvn仓库直接下载jar包

mvn仓库下载url :  http://central.maven.org/maven2/com/alibaba/otter/canal.client/

选择对应的version,下载jar/source/javadoc文件即可. 

 

类设计

在了解具体API之前,需要提前了解下canal client的类设计,这样才可以正确的使用好canal. 

 

大致分为几部分:

  • ClientIdentity
    canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)
  • CanalConnector
    SimpleCanalConnector/ClusterCanalConnector :   两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制
  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server. 
  • ClientRunningMonitor/ClientRunningListener/ClientRunningData
    client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点.  保证整个系统的高可用性.

javadoc查看:

server/client交互协议


  

具体的网络协议格式,可参见:CanalProtocol.proto

get/ack/rollback协议介绍:

  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
    a. batch id 唯一标识
    b. entries 具体的数据对象,可参见下面的数据介绍
  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
    a. 拿够batchSize条记录或者超过timeout时间
    b. timeout=0,阻塞等到足够的batchSize
  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api. 

流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化.  (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

流式api设计:

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

流式api带来的异步响应模型:

 

数据对象格式简单介绍:EntryProtocol.proto

Entry  
    Header  
        logfileName [binlog文件名]  
        logfileOffset [binlog position]  
        executeTime [binlog里记录变更发生的时间戳,精确到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete类型]  
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
    storeValue  [byte数据,可展开,对应的类型为RowChange]  
      
RowChange  
    isDdl       [是否是ddl变更操作,比如create table/drop table]  
    sql         [具体的ddl sql]  
    rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]  
        beforeColumns [Column类型的数组,变更前的数据字段]  
        afterColumns [Column类型的数组,变更后的数据字段]  
          
Column   
    index         
    sqlType     [jdbc type]  
    name        [column name]  
    isKey       [是否为主键]  
    updated     [是否发生过变更]  
    isNull      [值是否为null]  
    value       [具体的内容,注意为string文本]  

说明:

  • 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
  • 可以提供ddl的变更语句
  • insert只有after columns,  delete只有before columns,而update则会有before / after columns数据.

Client使用例子

 

1. 创建Connector

a.  创建SimpleCanalConnector (直连ip,不支持server/client的failover机制)

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
b.  创建ClusterCanalConnector (基于zookeeper获取canal server ip,支持server/client的failover机制)
CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");
c.  创建ClusterCanalConnector (基于固定canal server的地址,支持固定的server ip的failover机制,不支持client的failover机制
CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");

 

2.  get/ack/rollback使用 

 

3.   RowData数据处理


 

如果需要更详细的exmpale例子,请下载canal当前最新源码包,里面有个example工程,谢谢.

  • 大小: 69.8 KB
  • 大小: 45.9 KB
  • 大小: 74 KB
  • 大小: 57.6 KB
  • 大小: 75.5 KB
分享到:
评论

相关推荐

    canal技术调研2.0.docx

    启动Canal server后,通过Canal client API与Canal交互,实现数据同步。 **五、业务使用** 业务上,启动Canal server后,可以使用Canal client API监听数据库变化,根据binlog更新Redis数据或异构数据库数据。对于...

    canal-1.1.5.zip

    客户端可以是任何能够消费Canal发布数据的应用,例如基于Java的API,或者通过Canal的HTTP API来获取数据变更。 5. **Canal适配器** - **canal.adapter-1.1.5.tar.gz**:这是Canal的适配器模块,提供了与各种系统...

    canal-1.1.5(deployer和adapter)

    - 监控Canal:通过HTTP API或Web界面查看Canal的状态和日志。 3. **使用Canal Adapter** Canal Adapter是数据同步的关键组件,它支持多种目标系统,如Elasticsearch、HBase、RabbitMQ等。配置Adapter需要: - ...

    增量数据同步组件-canal,服务器端源码

    2. **Client端**:也称为Canal Client,用于连接Canal Server并订阅感兴趣的数据表,当数据发生变化时,Canal Server会将变更事件推送给Client。 3. **Adapter**:适配器层,Canal提供多种适配器,用于将捕获的...

    canal-canal-1.1.7.tar.gz

    Canal提供了一套完整的API,开发者可以根据需要编写插件或扩展,实现更复杂的数据处理逻辑。例如,自定义过滤规则、支持更多数据库类型等。 7. **最佳实践**: - 使用Canal时,需合理选择binlog的解析模式,如ROW...

    Canal安装包、安装文档

    服务端(canal-server)负责从MySQL实例中获取binlog并进行解析,客户端(canal-client)则订阅并消费这些数据。Canal通过模拟MySQL的binlog slave来监听数据库的变化,采用基于Row Image的binlog解析方式,确保数据...

    canal.deployer_1.1.2.zip

    6. **配置Canal Client(Canal Deployer)**: Canal Deployer作为客户端,负责管理和监控Canal Server。同样需要配置对应的yaml文件,如`conf/canal.conf`,指定Canal Server的地址、端口和实例名等。 7. **数据...

    canal-1.1.4.rar

    2. **架构组成**:Canal由三部分构成:Server(服务端)、Client(客户端)和Admin(管理端)。Server负责接收数据库的Binlog事件,Client用于订阅和消费这些事件,而Admin则提供了管理和监控的功能。 3. **安装...

    canal.admin-1.1.4.rar

    2. **Client**:Canal客户端,用于订阅Canal服务端的数据变更,可以是应用程序或者其他数据存储系统。 3. **Adapter**:适配器模块,用于将Canal服务端推送的事件转换为不同目标存储系统能理解的格式,例如转换为...

    canal.server-1.1.2.zip

    - 创建实例:在Canal管理界面或者通过API创建一个数据库实例,配置要监听的数据库和表。 - 配置消费者:编写消费者程序,通过Canal的SDK连接Canal Server,订阅并处理数据变更事件。 - 测试数据同步:在数据库中...

    canal-master.zip

    2. **canal-client**:客户端主要用于订阅Canal-server发布的数据变更事件,它提供了一套API供开发者调用。在源码中,`com.alibaba.otter.canal.client.CanalConnector`接口定义了客户端的基本操作,包括连接、订阅...

    canal数据binlog同步demo

    8. **canalclient**:这个文件可能是一个Canal的客户端示例,包含了连接Canal服务器、订阅binlog事件、解析和处理事件的相关代码。 在实际使用中,开发者需要根据自己的业务需求,结合上述知识点,配置Canal并编写...

    canal简介.ppt

    【Canal简介】 Canal是阿里巴巴开源的一款针对...订阅者可以通过Canal的API或MQ来接收这些增量数据,实现数据的实时流动。在实际使用过程中,还需关注性能优化、错误处理和监控等方面,确保数据同步的稳定性和高效性。

    canal数据同步工具

    4. **订阅与消费**:客户端通过Canal的API订阅指定实例,接收到数据变更事件后进行处理,如写入其他数据库或存储系统。 5. **监控与运维**:定期查看logs目录下的日志,确保服务正常运行,及时处理异常。 总结,...

    product-center-canal.zip

    这通常涉及到Canal的Client端配置,如通过Canal Client SDK或者使用Spring Boot集成Canal,实现数据库变更事件的订阅和接收。 四、数据处理 在获取到数据库变更事件后,Canal提供了数据处理的接口,开发者可以根据...

    Generator-Canal.rar

    4. 应用集成:commerce_canal项目提供了与业务应用集成的API,使得业务系统能够方便地接入Canal,实现数据的实时同步。 使用这个JAR包,开发者可以轻松地在自己的应用中集成Canal,实时监听数据库的变化,比如将...

    canal.adapter-1.1.6.tar.gz

    在Canal adapter 1.1.6中,你会找到启动和停止Canal服务的相关脚本,如`canal-server.sh`用于启动Canal服务器,`canal-client.sh`则用于运行客户端命令。此外,还有用于配置环境变量的`setenv.sh`,以及监控和管理...

    data-subscribe:数据订阅

    Canal Client通常使用Java API来编写,使得在Java环境中集成和使用变得简单。 **Kafka Consumer**: Kafka是一个分布式流处理平台,它主要用于构建实时数据管道和流应用。在Kafka中,Consumer是读取数据的一方,...

    Downloads.rar

    3. **canal-client-master.zip** - 这是Canal的客户端项目,提供了与Canal Server通信的API和工具。开发者可以使用这些客户端库来订阅Canal发布的数据变更事件,进而处理或传递这些变更,实现数据的实时同步到其他...

    mysql-replication:基于canal实现MySQL实时复制工具,把MySQL变化的数据build到Redis Queue

    1. **配置Canal**:在MySQL服务器上创建一个用户,如`canal`,并赋予必要的权限,如`SELECT`, `REPLICATION SLAVE`, `REPLICATION CLIENT`,以便Canal可以读取Binlog并跟踪数据库的变化。 2. **启动Canal**:配置...

Global site tag (gtag.js) - Google Analytics