流程
Canal的原理是模拟Slave向Master发送请求,Canal解析binlog,但不将解析结果持久化,而是保存在内存中,每次有客户端读取一次消息,就删除该消息。这里所说的客户端,就需要我们写一个连接Canal的程序,持续从Canal获取数据。
步骤
一、配置Canal
参考https://github.com/alibaba/canal
【mysql配置】
1,配置参数
[mysqld] log-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
2,在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
【canal下载和配置】
1,下载canal https://github.com/alibaba/canal/releases
2,解压
mkdir /tmp/canal tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
3,修改配置文件
vi conf/example/instance.properties
################################################# ## mysql serverId canal.instance.mysql.slaveId = 1234 # position info,需要改成自己的数据库信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 # table regex canal.instance.filter.regex = .*\\..* #################################################
【canal启动和关闭】
1,启动
sh bin/startup.sh
2,查看日志
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. <pre name="user-content-code">2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111] 2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
具体instance的日志:
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
3,关闭
sh bin/stop.sh
注意:
1,这里只需要配置好参数后,就可以直接运行
2,Canal没有解析后的文件,不会持久化
二、创建客户端
参考https://github.com/alibaba/canal/wiki/ClientExample
其中一个是连接canal并操作的类,一个是redis的工具类,使用maven主要是依赖包的下载很方便。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.alibaba.otter</groupId> <artifactId>canal.sample</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>3.1.2.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.4.2</version> </dependency> </dependencies> <build/> </project>
2,ClientSample代码
这里主要做两个工作,一个是循环从Canal上取数据,一个是将数据更新至Redis
package canal.sample; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.client.*; public class ClientSample { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } } finally { connector.disconnect(); } } private static void printEntry( List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); redisUpdate(rowData.getAfterColumnsList()); } } } } private static void printColumn( List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } private static void redisInsert( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisUpdate( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisDelete( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.delKey("user:"+ columns.get(0).getValue()); } } }
3,RedisUtil代码
package canal.sample; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisUtil { // Redis服务器IP private static String ADDR = "10.1.2.190"; // Redis的端口号 private static int PORT = 6379; // 访问密码 private static String AUTH = "admin"; // 可用连接实例的最大数目,默认值为8; // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 private static int MAX_ACTIVE = 1024; // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。 private static int MAX_IDLE = 200; // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException; private static int MAX_WAIT = 10000; // 过期时间 protected static int expireTime = 60 * 60 *24; // 连接池 protected static JedisPool pool; /** * 静态代码,只在初次调用一次 */ static { JedisPoolConfig config = new JedisPoolConfig(); //最大连接数 config.setMaxTotal(MAX_ACTIVE); //最多空闲实例 config.setMaxIdle(MAX_IDLE); //超时时间 config.setMaxWaitMillis(MAX_WAIT); // config.setTestOnBorrow(false); pool = new JedisPool(config, ADDR, PORT, 1000); } /** * 获取jedis实例 */ protected static synchronized Jedis getJedis() { Jedis jedis = null; try { jedis = pool.getResource(); } catch (Exception e) { e.printStackTrace(); if (jedis != null) { pool.returnBrokenResource(jedis); } } return jedis; } /** * 释放jedis资源 * * @param jedis * @param isBroken */ protected static void closeResource(Jedis jedis, boolean isBroken) { try { if (isBroken) { pool.returnBrokenResource(jedis); } else { pool.returnResource(jedis); } } catch (Exception e) { } } /** * 是否存在key * * @param key */ public static boolean existKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); return jedis.exists(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return false; } /** * 删除key * * @param key */ public static void delKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); jedis.del(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } /** * 取得key的值 * * @param key */ public static String stringGet(String key) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.get(key); jedis.expire(key, expireTime); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加string数据 * * @param key * @param value */ public static String stringSet(String key, String value) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.set(key, value); jedis.expire(key, expireTime); } catch (Exception e) { e.printStackTrace(); isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加hash数据 * * @param key * @param field * @param value */ public static void hashSet(String key, String field, String value) { boolean isBroken = false; Jedis jedis = null; try { jedis = getJedis(); if (jedis != null) { jedis.select(0); jedis.hset(key, field, value); jedis.expire(key, expireTime); } } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } }
注意:
1,客户端的Jedis连接不同于项目里的Jedis连接需要Spring注解,直接使用静态方法就可以。
运行
1,运行canal服务端startup.bat / startup.sh
2,运行客户端程序
注意
1,虽然canal服务端解析binlog后不会把数据持久化,但canal服务端会记录每次客户端消费的位置(客户端每次ack时服务端会记录pos 点)。如果数据正在更新时,canal服务端挂掉,客户端也会跟着挂掉,mysql依然在插入数据,而redis则因为客户端的关闭而停止更新,造成 mysql和redis的数据不一致。解决办法是,只要重启canal服务端和客户端就可以了,虽然canal服务端因为重启之前解析数据清空,但因为 canal服务端记录的是客户端最后一次获取的pos点,canal服务端再从这个pos点开始解析,客户端更新至redis,以达到数据的一致。
2,如果只有一个canal服务端和一个客 户端,肯定存在可用性低的问题,一种做法是用程序来监控canal服务端和客户端,如果挂掉,再重启;一种做法是多个canal服务端+zk,将 canal服务端的配置文件放在zk,任何一个canal服务端挂掉后,切换到其他canal服务端,读到的配置文件的内容就是一致的(还有记录的消费 pos点),保证业务的高可用,客户端可使用相同的做法。
http://m.blog.csdn.net/article/details?id=50371405
相关推荐
与MySQL同步,可以满足对非结构化数据或半结构化数据的处理需求。 6. **数据同步原理**: Canal通过解析MySQL的binlog,获取到数据库的增删改查操作,然后将这些操作以事件的形式推送到目标NoSQL数据库。这种方式...
本教程将详细介绍如何配置Canal监听MySQL的binlog,以及如何通过RabbitMQ实现数据同步。 1. **MySQL Binlog介绍** MySQL的Binary Log(binlog)是一种记录所有对数据库进行更改操作的二进制日志,用于数据恢复、...
mysql需开启binlog 查看是否开启binlog SHOW VARIABLES LIKE '%log_bin%' 如果log_bin的值为OFF是未开启,为ON是已开启。 未开启的话可以修改/etc/my.cnf 开启binlog [mysqld] log-bin=mysql-bin binlog-format=...
基于canal的mysql与redismemcachedmongodb的nosql数据实时同步方案案例canalclient”指的是一个使用C++实现的项目,它利用了阿里巴巴开源的Canal工具来实现实时同步MySQL数据库的数据到NoSQL数据库,如Redis、...
而“spring-cloud Task”标签则暗示了我们可能要使用Spring Cloud Task来创建一次性或周期性的后台任务,完成数据同步工作。 在具体实现过程中,我们可以使用诸如JDBC River(已被废弃)或Logstash,甚至是自定义的...
如果数据丢失或更新慢了,可能会导致Redis中的数据与MySQL中的数据不同步。因此,我们需要采取降级或补偿方案来解决这种问题。 在实现MySQL到Redis的同步时,我们还需要关心缓存更新的策略。例如,我们可以使用消息...
2. `Canal`: 类似于Maxwell,用于MySQL到其他系统的实时数据同步。 3. `Redlock`: Redis的分布式锁方案,可解决多节点操作时的并发问题,保证数据一致性。 五、挑战与解决方案 1. 数据延迟:由于网络延迟,可能会...
在这种场景下,我们讨论的是基于Canal实现的MySQL实时复制工具,它能够将MySQL中的数据变化实时地构建到Redis Queue中,以实现更高效的数据处理和分发。 Canal是阿里巴巴开源的一款针对MySQL数据库的数据变更捕获和...
【标题】"o-eye:Mysql+Canal+RabbitMQ+Redis打造订单实时大屏" 涉及的关键技术是数据库实时监控与数据同步,这是一个常见的大数据实时处理场景,用于构建能够实时展示订单状态的可视化大屏。在这个系统中,MySQL作为...
使用Canal,开发者可以轻松地将MySQL数据同步到Redis、Memcached、Solr、Elasticsearch等服务,确保数据的一致性。此外,Canal还提供了丰富的适配器和插件,便于与其他系统集成,比如Kafka、RabbitMQ等消息中间件,...
在这里,你可以配置Canal连接MySQL的方式,设置数据同步的目标,定义过滤规则等。 Canal的工作原理大致如下: - 它监听MySQL的binlog(二进制日志),当数据库发生更改时,Canal会捕获这些变更事件。 - 使用Canal...
Bifrost ----面向生产环境MySQL同步到Redis,ClickHouse等服务的多个中间件 漫威里的彩虹桥可以将雷神放置到阿斯加德和地球 而这个Bifrost可以将你MySQL里的数据全量,实时的同步到: 雷迪斯 MongoDB ClickHouse...
基于Canal的MySql RabbitMQ Redis / memcached / mongodb的nosql同步(多读,nosql延迟不严格需求) 1.mysql主从配置 2.对mysql binlog(row) parser 这一步交给canal 3.MQ对解析后binlog增量数据的推送 4.对MQ数据...
Bifrost 可以将数据全部、实时的同步到多个目标端目标端支持目标端 数据链接库 纪要Redis 不 MongoDB 不 MySQL 是的 ClickHouse 是的 内存缓存 不 RabbitMQ 是的 卡夫卡 是的 支持canal,debezium格式,以及kafka ...
【描述】"canal保持mysql与redis同步之demo" 描述了该压缩包的主要功能,即利用Canal这个工具来实现实时地从MySQL数据库中捕获数据变更,并将这些变更同步到Redis缓存系统中。这种同步机制在很多场景下非常有用,...
Canal 是阿里巴巴开源的一款高效的数据同步工具,它主要专注于MySQL数据库的变更数据捕获(CDC, Change Data Capture)并提供数据同步的能力。在IT领域,Canal 被广泛应用于异构数据库同步、多级索引更新以及业务...
Canal,由阿里巴巴开源的一款高性能、轻量级的MySQL数据库实时增量数据订阅和消费组件,主要用于实现数据库的数据变更实时同步到其他系统,如大数据处理、缓存更新等场景。在Java开发领域,Canal以其高效稳定的特点...
Canal是阿里巴巴开源的一款高效率、高性能的数据库变更数据捕获工具,主要用于实现MySQL的数据实时同步到其他系统,如Elasticsearch、Hadoop、Redis等。而Canal Deployer则是Canal项目中的部署组件,用于管理和调度...
Canal是阿里巴巴的一款数据库增量日志订阅与消费组件,主要用于实现MySQL到其他数据存储系统的实时数据复制,例如MySQL到Elasticsearch、MySQL到Hadoop、MySQL到Redis等。其设计灵感来源于Facebook的Binlog ...
Canal Deployer 的核心功能是实现MySQL到其他数据存储的实时数据同步,如MySQL到Redis、MySQL到Elasticsearch、MySQL到Kafka等。这个过程通常涉及到以下几个关键知识点: 1. **Canal Server**:Canal Server 是...