`

Cannal实现数据异构

 
阅读更多

问题:
在大型网站架构中,DB会采用分库分表来解决容量和性能的问题。但这带来个新的问题:比如不同维度的查询或者聚合查询
方案:
一般会通过数据异构机制来解决问题。

具体示例:
为提升系统的接单能力,需要对订单表进行分库分表,随之而来的问题:用户如何查询自己的订单列表?
方法1:扫描所有订单表,然后内存聚合,在大流量的架构中肯定是不行的;
方法2:双写,但是双写无法保证一致性;
方法3:订阅数据库变更日志,比如订阅mysql的binlog日志模拟数据库的主从同步机制,然后解析变更日志写到订单列表,从而实现数据异构,
这种机制也能保证数据的一致性。
比如,订单中心按照订单号分库分表,然后异构出:订单列表按照用户分库分表,商家订单,订单缓存,ES搜索

MYSQL主从复制
原理
--canal类似该原理



1.准备:
github:https://github.com/alibaba/canal
包括
1-canal的文档
2-server端
3-client端的
4-例子
5-源码包等等

2.canal概述
canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
说明:目前内部使用的同步,已经支持mysql5.x和Oracle部分版本的日志解析

canal server通过slave机制订阅数据库的binlong日志

基于日志增量订阅&消费支持的业务:
(1)数据库镜像
(2)数据库实时备份
(3)多级索引 (卖家和买家各自分库索引)
(4)search build
(5)业务cache刷新
(6)价格变化等重要业务消息

keyword:数据库同步,增量订阅&消费。


3.canal工作原理:
从上层来看,复制分成三步:
(1)  master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
(2) slave将master的binary log events拷贝到它的中继日志(relay log);
(3) slave重做中继日志中的事件,将改变反映它自己的数据。


4.部署canal:
4.0 前提:
Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH.
需要安装JDK


4.1 部署canal-server:
4.1.1数据库配置:
开启MySQL的binlog功能,并配置binlog模式为row。

在my.cnf 加入如下:
vi /etc/my.cnf
[mysqld] 
log-bin=mysql-bin #开启二进制日志 
binlog-format=ROW #选择row模式  ,不要使用statement或者mix模式
server_id=1  #配置主数据库ID,不能和从数据库重复,即不能和canal的slaveId重复,配置mysql replaction需要定义
binlog提供的三种记录模式:
见书,在使用Canal时建议使用row模式

另外在MYSQL中执行"show binary logs",将看到当前有哪些二进制文件及其大小

4.1.2在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)
CREATE USER canal IDENTIFIED BY 'canal';   
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; 
FLUSH PRIVILEGES;
说明:一定要重启,否则不生效,避免类似这样的错误

4.1.3下载canal https://github.com/alibaba/canal/releases
并解压到相应文件夹,比如我下载的是canal.deployer-1.0.24.tar.gz
mkdir /usr/server/canal
cd /usr/server
tar -zxvf canal.tar.gz -C  canal

canal 文件目录结构
drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin 
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf 
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib 
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs 

4.1.4修改配置-canal的数据库实例 instance.properties
说明:这里可以使用已经存在的 example,也可以新起实例,这个名字和客户端java类中写的名字需要一致
这里我们新配置一个canal Server实例

// vi /usr/server/canal/conf/example/instance.properties 
mkdir -p /usr/server/canal/conf/product
cp /usr/server/canal/conf/example/instance.properties  /usr/server/canal/conf/product/
vi  /usr/server/canal/conf/product/instance.properties

################################################# 
## mysql serverId  必须和master的SQL的ID不一致
canal.instance.mysql.slaveId = 101 
 
# position info:连接的数据库地址,从哪个二进制文件,哪个位置开始 
canal.instance.master.address = 127.0.0.1:3306
# MYSQL主库连接时,起始的binlog文件
canal.instance.master.journal.name =
# MYSQL主库连接时,起始的binlog文件 偏移量  
canal.instance.master.position =
# MYSQL主库连接时,起始的binlog文件 时间戳
canal.instance.master.timestamp =  
 
#从库
#canal.instance.standby.address =  
#canal.instance.standby.journal.name = 
#canal.instance.standby.position =  
#canal.instance.standby.timestamp =  
 
# username/password,需要改成自己的数据库信息 
canal.instance.dbUsername = canal   
canal.instance.dbPassword = canal 
canal.instance.defaultDatabaseName = canal_test 
canal.instance.connectionCharset = UTF-8

# 通过如下配置过滤订阅的是 哪个库中的哪些表,减少不必要的订阅;
# 比如只关注产品数据库,通过如下模式可只订阅产品数据库
# table regex 
# canal.instance.filter.regex = .*\\..* 
  canal.instance.filter.regex = product_\d+\\.*
 
################################################# 

说明:若多个库订阅,则需要配置多个实例,为每个数据库配置一个配置文件


4.1.5 进行canal Server 的配置,修改conf/canal.properties
vi /usr/server/canal/conf/canal.properties

canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=


# 当前canalserver上部署的实例,配置多个时用逗号分隔,此处配置product
canal.destinations= product

# 使用zk持久化模式,这样可以保证集群数据共享,共享HA
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

4.1.5然后cd到bin目录  启动和停止canal-server
启动 
/usr/server/canal/bin/startup.sh & tail -f /usr/server/canal/logs/canal/canal.log

停止
/usr/server/canal/bin/stop.sh 

验证启动状态,查看log文件
tail -f /usr/server/canal/logs/canal/canal.log

2014-07-18 10:21:08.525 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 
2014-07-18 10:21:08.609 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111] 
2014-07-18 10:21:09.037 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ...... 
---> 上述日志信息显示启动canal成功


*********canal server 集群**************
canal server可以部署一台,也可以部署多台,但是只有一台是活跃的,其它的作为备机;canalserver的高可用是通过zk维护的。

需要安装:zookeeper

配置文件如下修改:
vi /usr/server/canal/conf/canal.properties

canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=127.0.0.1:2181


4.2运行canal-client实例:
4.2.1 建立实例maven工程
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample 
[实践:手动创建Maven工程]

4.2.2 添加pom依赖:
<dependency> 
    <groupId>com.alibaba.otter</groupId> 
    <artifactId>canal.client</artifactId> 
    <version>1.0.12</version> 
</dependency> 

4.2.3 更新依赖 mvn install

4.2.4 ClientSamplet.Java 
实例代码
package canal.sample;
/**
 * Created by hp on 14-7-17.
 */ 
import java.net.InetSocketAddress; 
import java.util.List;
 
import com.alibaba.otter.canal.client.CanalConnector; 
import com.alibaba.otter.canal.common.utils.AddressUtils; 
import com.alibaba.otter.canal.protocol.Message; 
import com.alibaba.otter.canal.protocol.CanalEntry.Column; 
import com.alibaba.otter.canal.protocol.CanalEntry.Entry; 
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; 
import com.alibaba.otter.canal.protocol.CanalEntry.EventType; 
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; 
import com.alibaba.otter.canal.protocol.CanalEntry.RowData; 
import com.alibaba.otter.canal.client.*; 
import com.google.protobuf.InvalidProtocolBufferException;
//import org.jetbrains.annotations.NotNull;  
 
public class ClientSample { 
 
    public static void main(String args[]) throws Exception { 
        // 连接 canal Server 
        //CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.121",
                11111), "example", "", ""); 
                //11111), "chapter6", "", "");
       
        /**通过zk连接canal Server
        String zkServers = "192.168.1.121:2181";
        //目标实例:可以自定义一个,例如product
        String destination = "product";
        //连接 canal Server
        CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, "", "");
        **/
       
        int emptyCount = 0;  //空跑的次数
        int totalEmtryCount = 1200;//循环多少次为空时退出
        try { 
            connector.connect(); //连接
            connector.subscribe(".*\\..*"); //订阅所有,和不写此行一个效果
            //connector.subscribe("product_.*\\.product_.*");//订阅product数据库下的product表
            connector.rollback(); 
            while (emptyCount < totalEmtryCount) {
            //while(true){//一直循环
                //批量获取1000个日志(不确认模式)
                Message message = connector.getWithoutAck(1000);//这个值根据实际情况修改 
                long batchId = message.getId();
               
                //以下为空跑计数
                int size = message.getEntries().size(); 
                if (batchId == -1 || size == 0) { 
                    emptyCount++; 
                    System.out.println("empty count : " + emptyCount); 
                    try { 
                        Thread.sleep(1000); 
                    } catch (InterruptedException e) { 
                        e.printStackTrace(); 
                    } 
                } else { 
                    emptyCount = 0;
                    //做数据处理
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); 
                    printEntry(message.getEntries());
                } 
 
                connector.ack(batchId); // 提交确认 
                // connector.rollback(batchId); // 处理失败, 回滚数据 
            } 
 
            System.out.println("empty too many times, exit"); 
        } finally { 
            connector.disconnect(); 
        } 
    } 
 
    //private static void printEntry(@NotNull List<Entry> entrys) {
    private static void printEntry(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { 
                continue; 
            } 
 
            //如果是行数据
            if(entry.getEntryType() == EntryType.ROWDATA){
                //则解析行变更
                RowChange rowChage = null; 
                try { 
                    rowChage = RowChange.parseFrom(entry.getStoreValue()); 
                } catch (Exception e) { 
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); 
                } 
               
                EventType eventType = rowChage.getEventType();
                //这里捕获binlog变更信息
//                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", 
//                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), 
//                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), 
//                        eventType));
               
                for (RowData rowData : rowChage.getRowDatasList()) {
                    //如果是删除,则获取删除的数据进行业务处理
                    if (eventType == EventType.DELETE) { 
                        printColumn(rowData.getBeforeColumnsList());
                        //List<Column> columns = rowData.getBeforeColumnsList();
                        //delete(columns);
                       
                    //如果是新增修改则获取新增修改数据进行处理
                    } else if (eventType == EventType.INSERT || eventType == eventType.UPDATE) { 
                        //printColumn(rowData.getAfterColumnsList()); 
                        List<Column> columns = rowData.getAfterColumnsList();
                        save(columns);
                       
                    } else { 
                        System.out.println("-------> before"); 
                        printColumn(rowData.getBeforeColumnsList()); 
                        System.out.println("-------> after"); 
                        printColumn(rowData.getAfterColumnsList()); 
                    } 
                }                
               
            }
           
        } 
    } 
   
    //新增的异构操作
    private static void save(List<Column> columns) {
        for (Column col:columns) {
            String name = col.getName();
            String value = col.getValue();
            System.out.println("name: "+ name + ",value:" + value);
           
            //name: uid,value:4
            //name: name,value:10
        }
       
    }

    //private static void printColumn(@NotNull List<Column> columns) { 
    private static void printColumn(List<Column> columns) { 
        for (Column column : columns) { 
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated()); 
        } 
    } 

报错[有个jar包没找到注释掉了]
import org.jetbrains.annotations.NotNull;

通过以上代码,捕获数据库日志变更,然后进行相关业务的处理。无论是数据异构还是缓存更新


4.2.5 运行java实例

启动后看到控制端信息:
empty count : 1 
empty count : 2 
empty count : 3 
empty count : 4 


4.2.6触发数据库变更
create table test ( 
    uid int (4) primary key not null auto_increment, 
    name varchar(10) not null
); 
 
insert into test (name) values('10'); 


4.2.7 client 抓取mysql信息:
================> binlog[mysql-bin.000016:3281] , name[canal_test,test] , eventType : INSERT 
uid : 7    update=false 
name : 10    update=false 
empty count : 1 
empty count : 2 

[发现没有捕获到信息]
tail -f /usr/server/canal/logs/example/example.log

2017-06-03 13:11:28.802 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
2017-06-03 13:11:28.817 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /127.0.0.1:3306 has an error, retrying. caused by
com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
2017-06-03 13:11:28.820 [destination = chapter6 , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:chapter6[com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation
]

原因很简单:因为Mysql需要开启binlog,我设置了,但是没重启。


这样可可以捕获到mysql信息了:
================> binlog[mysql-bin.000001:557] , name[chapter6,test] , eventType : INSERT
uid : 1    update=true
name : 10    update=true
empty count : 1
empty count : 2


5、部署过程中产生问题:

(1)启动失败,log日志中地址正在使用
1、11111端口正在被占用 可以用 ls -i:11111 查看监听进程谁占用端口 或者 用 ps -ef | grep 11111 查看哪个进程占用端口号  然后 kill -9 进程号  杀掉占用进程
2、可以编辑 canal/conf/canal.properties 中的端口号 ,改为不占用的端口

(2)canal无法抓取mysql触发数据库改变的信息
1、检查mysql是否打开binlog写入功能  检查binlog 是否为行模式。
    show variables like "binlog_format" 

2、检查my.cnf 和 instance.properties 等配置文件填写信息是否正确。

3、检查client 代码 调试实例代码

4、版本兼容问题,canal 1.8 换成 canal 1.7 继续测试

5、查看所有日志文件 分析日志

分享到:
评论

相关推荐

    canal数据异构组件包

    【标题】:“Canal数据异构组件包” 【描述】:“Canal数据异构组件包”是一个专为数据库变更捕获和同步而设计的开源工具,由阿里巴巴开发并维护。这个组件包主要用于实现数据库之间的实时数据复制,尤其适用于...

    canal之-缓存一致性和跨服务器查询的数据异构解决方案.docx

    为解决这两个问题,可以使用 canal 来实现缓存一致性和跨服务器查询的数据异构解决方案。 一、使用 Cache 实现缓存一致性 在项目中,会针对商品、订单等维度为某一个商家店铺自动化建立大约 400 个数据模型,然后...

    开涛高可用高并发-亿级流量核心技术

    15.11 基于Canal实现数据异构 314 15.11.1 Mysql主从复制 315 15.11.2 Canal简介 316 15.11.3 Canal示例 318 第4部分案例 323 16 构建需求响应式亿级商品详情页 324 16.1 商品详情页是什么 324 16.2 商品详情页前端...

    canal技术调研2.0.docx

    启动Canal server后,通过Canal client API与Canal交互,实现数据同步。 **五、业务使用** 业务上,启动Canal server后,可以使用Canal client API监听数据库变化,根据binlog更新Redis数据或异构数据库数据。对于...

    canal安装包免费下载.mysql同步ES工具

    Canal 在这里被当作一个软件或者数据库同步的插件来使用,它提供了简单易用的接口和配置,方便用户集成到自己的系统中,实现数据的实时迁移和备份。 5. **Windows 使用**: 虽然 Canal 常用于服务器环境,但描述...

    canal产品介绍.ppt

    这款纯Java开发的工具,主要功能是基于数据库增量日志的实时解析,实现数据的订阅与消费,尤其对MySQL的支持尤为突出。 在数据库同步领域,传统的MySQL主从复制依赖于I/O线程接收binlog(二进制日志)和SQL线程执行...

    深入浅出Otter与Canal.pdf

    Otter通过解析数据库的增量日志来实现数据的迁移与同步。 2. Canal开源产品: Otter系统基于Canal开源产品,Canal主要用于获取MySQL数据库的增量日志数据。Canal的设计初衷是为了满足阿里巴巴在杭州和美国双机房...

    canal.deployer-1.1.4.tar.zip

    canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger ...

    binlog开源同步组件canal部署包,版本1.1.4

    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库...

    wzq深入浅出Otter与Canal.pdf

    1. **异构库同步**:Otter支持MySQL到MySQL或Oracle的同步,依赖Canal的功能。 2. **单机房同步**:用于数据库升级、数据表迁移和异步二级索引创建,具有低延迟(如1ms以内)的特点。 3. **异地机房同步**:适用于跨...

    爷青回,canal 1.1.6来了,几个重要特性和bug修复.doc

    canal 是一个由阿里云开发的开源项目,主要用于实现 MySQL 数据库增量日志解析,它提供了订阅和消费的功能,使得用户能够轻松地获取数据库的实时变更数据。在数据库同步、数据迁移以及实时数据处理场景中,canal ...

    数据中台的深度思考与总结.docx

    数据中台是现代企业信息化建设的关键组成部分,它旨在整合企业内的多源异构数据,提供高效的数据处理和分析能力,支持业务创新和决策优化。本文将深入探讨数据中台的核心功能、工具和技术方案。 首先,数据汇聚是...

    海量数据的实时读写和查询实践.pptx

    - 按商家ID、结算单号、日期进行数据异构,便于不同业务场景检索。 - 使用Canal进行MySQL数据同步的异构方案,存储方式可选ES、REDIS等。 - 数据聚合在客户端合并请求,服务端做数据聚合。 6. **TiDB使用与优化*...

    ElasticSearch数据迁移与容灾实践.docx

    为实现实时同步,需要基于 MySQL 的 Binlog,使用开源组件 like canal/Mypipe,实时获取到日志进行解析,或者直接吐出到 ES 中。也可以加一层消息队列,把解析到的 Binlog 中的操作数据做持久化,保证实时同步数据的...

    Flink CDC 如何简化实时数据入湖入仓-Flink Forward Asia 2021.pdf

    2.0版本在一定程度上改进了1.0版本的问题,通过使用Canal进行增量数据同步到Kafka,并定时回流到HDFS,实现了小时级别的数据产出。但这种方式的链路仍然很长,涉及的组件众多,导致系统复杂且难以维护。 Flink CDC...

    深入理解otter(数据同步)

    标题:“深入理解otter(数据同步)” 描述:“阿里开源数据同步otter方面的好书,otter是数据同步利器” ...对于希望在自己的项目中实现高效数据同步的开发者或技术团队来说,otter无疑是一个值得学习和探索的强大工具。

    阿里巴巴开源项目:分布式数据库同步系统otter(解决中美异地机房).docx

    这些阶段是为了解决数据来源的差异性,并实现数据同步的高可用性和扩展性。 实现介绍 Otter 的实现基于 Java,使用 Canal 获取数据库增量日志数据,并使用 ZooKeeper 实现分布式状态调度。Otter 的系统架构和工作...

    datax-es:从mysql导入es 重写代码

    DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步...

    阿里巴巴开源项目:分布式数据库同步系统otter(解决中美异地机房).pdf

    它基于Canal开源项目,Canal负责捕获数据库的增量日志数据。Otter 采用典型的管理架构,包括Web管理组件(Manager)和工作节点(Node)。Manager负责配置推送和状态监控,而Node节点则处理实际的同步任务,并将状态...

    otter4使用介绍

    在实现数据同步时,需要考虑数据的同步模式、冲突解决策略以及数据一致性问题。 ### 同步需求 在具体使用Otter4之前,需要明确同步需求,包括同步的业务性、隔离性、关联数据同步、双A写入(避免回环同步和数据冲突...

Global site tag (gtag.js) - Google Analytics