1.前期间准备工作
- mysql 开启binlog写入功能,配置binlog-format为ROW模式
log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=8 # 不要和 canal 的 slaveId 重复
- 创建mysql账号
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
- 安装canal
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz mkdir /usr/local/canal tar -zxvf canal.deployer-$version.tar.gz -C /usr/local/canal
- 修改配置文件example/instance.properties
canal.instance.mysql.slaveId=8 canal.instance.master.address=192.168.207.200:3306
- 启动canal服务 ./startup.sh
- 编写java程序 添加依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
连接canal@Bean public CanalConnector getCanalConnector() { canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(canalHost, Integer.valueOf(canalPort))), canalDestination, canalUsername, canalPassword); canalConnector.connect(); // 指定filter,格式 {database}.{table},这里不做过滤,过滤操作留给用户 canalConnector.subscribe(); // 回滚寻找上次中断的位置 canalConnector.rollback(); logger.info("canal客户端启动成功"); return canalConnector; }
调度任务拉取数据package com.demo.es.scheduling; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.Message; import com.demo.es.event.DeleteAbstractCanalEvent; import com.demo.es.event.InsertAbstractCanalEvent; import com.demo.es.event.UpdateAbstractCanalEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; /** * @version 1.0 * @since 2017-08-26 22:44:00 */ @Component public class CanalScheduling implements Runnable, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(CanalScheduling.class); private ApplicationContext applicationContext; @Resource private CanalConnector canalConnector; @Scheduled(fixedDelay = 100) @Override public void run() { try { int batchSize = 1000; // Message message = connector.get(batchSize); Message message = canalConnector.getWithoutAck(batchSize); long batchId = message.getId(); logger.debug("scheduled_batchId=" + batchId); try { List<Entry> entries = message.getEntries(); if (batchId != -1 && entries.size() > 0) { entries.forEach(entry -> { if (entry.getEntryType() == EntryType.ROWDATA) { publishCanalEvent(entry); } }); } canalConnector.ack(batchId); } catch (Exception e) { logger.error("发送监听事件失败!batchId回滚,batchId=" + batchId, e); canalConnector.rollback(batchId); } } catch (Exception e) { logger.error("canal_scheduled异常!", e); } } private void publishCanalEvent(Entry entry) { EventType eventType = entry.getHeader().getEventType(); switch (eventType) { case INSERT: applicationContext.publishEvent(new InsertAbstractCanalEvent(entry)); break; case UPDATE: applicationContext.publishEvent(new UpdateAbstractCanalEvent(entry)); break; case DELETE: applicationContext.publishEvent(new DeleteAbstractCanalEvent(entry)); break; default: break; } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
解析数据package com.demo.es.listener; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.demo.es.event.AbstractCanalEvent; import com.demo.es.model.DatabaseTableModel; import com.demo.es.model.IndexTypeModel; import com.demo.es.service.MappingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationListener; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @version 1.0 * @since 2017-08-28 14:40:00 */ public abstract class AbstractCanalListener<EVENT extends AbstractCanalEvent> implements ApplicationListener<EVENT> { private static final Logger logger = LoggerFactory.getLogger(AbstractCanalListener.class); @Resource private MappingService mappingService; @Override public void onApplicationEvent(EVENT event) { Entry entry = event.getEntry(); String database = entry.getHeader().getSchemaName(); String table = entry.getHeader().getTableName(); IndexTypeModel indexTypeModel = mappingService.getIndexType(new DatabaseTableModel(database, table)); if (indexTypeModel == null) { return; } String index = indexTypeModel.getIndex(); String type = indexTypeModel.getType(); RowChange change; try { change = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { logger.error("canalEntry_parser_error,根据CanalEntry获取RowChange失败!", e); return; } change.getRowDatasList().forEach(rowData -> doSync(database, table, index, type, rowData)); } Map<String, Object> parseColumnsToMap(List<Column> columns) { Map<String, Object> jsonMap = new HashMap<>(); columns.forEach(column -> { if (column == null) { return; } jsonMap.put(column.getName(), column.getIsNull() ? null : mappingService.getElasticsearchTypeObject(column.getMysqlType(), column.getValue())); }); return jsonMap; } protected abstract void doSync(String database, String table, String index, String type, RowData rowData); }
package com.demo.es.listener; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.demo.es.event.InsertAbstractCanalEvent; import com.demo.es.service.ElasticsearchService; import com.demo.es.service.MappingService; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; import java.util.Map; import java.util.Optional; /** * @version 1.0 * @since 2017-08-26 22:32:00 */ @Component public class InsertCanalListener extends AbstractCanalListener<InsertAbstractCanalEvent> { private static final Logger logger = LoggerFactory.getLogger(InsertCanalListener.class); @Resource private MappingService mappingService; @Resource private ElasticsearchService elasticsearchService; @Override protected void doSync(String database, String table, String index, String type, RowData rowData) { List<Column> columns = rowData.getAfterColumnsList(); String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id"); Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null); if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) { logger.warn("insert_column_find_null_warn insert从column中找不到主键,database=" + database + ",table=" + table); return; } logger.debug("insert_column_id_info insert主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue()); Map<String, Object> dataMap = parseColumnsToMap(columns); elasticsearchService.insertById(index, type, idColumn.getValue(), dataMap); logger.debug("insert_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",data=" + JSONObject.toJSONString(dataMap)); } }
package com.demo.es.service.impl; import com.demo.es.model.DatabaseTableModel; import com.demo.es.model.IndexTypeModel; import com.demo.es.service.MappingService; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; /** * @version 1.0 * @since 2017-08-27 13:14:00 */ @Service @PropertySource("classpath:mapping.properties") @ConfigurationProperties public class MappingServiceImpl implements MappingService, InitializingBean { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private Map<String, String> dbEsMapping; private BiMap<DatabaseTableModel, IndexTypeModel> dbEsBiMapping; private Map<String, String> tablePrimaryKeyMap; private Map<String, Converter> mysqlTypeElasticsearchTypeMapping; @Override public Map<String, String> getTablePrimaryKeyMap() { return tablePrimaryKeyMap; } @Override public void setTablePrimaryKeyMap(Map<String, String> tablePrimaryKeyMap) { this.tablePrimaryKeyMap = tablePrimaryKeyMap; } @Override public IndexTypeModel getIndexType(DatabaseTableModel databaseTableModel) { return dbEsBiMapping.get(databaseTableModel); } @Override public DatabaseTableModel getDatabaseTableModel(IndexTypeModel indexTypeModel) { return dbEsBiMapping.inverse().get(indexTypeModel); } @Override public Object getElasticsearchTypeObject(String mysqlType, String data) { Optional<Entry<String, Converter>> result = mysqlTypeElasticsearchTypeMapping.entrySet().parallelStream().filter(entry -> mysqlType.toLowerCase().contains(entry.getKey())).findFirst(); return (result.isPresent() ? result.get().getValue() : (Converter) data1 -> data1).convert(data); } @Override public void afterPropertiesSet() throws Exception { dbEsBiMapping = HashBiMap.create(); dbEsMapping.forEach((key, value) -> { String[] keyStrings = StringUtils.split(key, "."); String[] valueStrings = StringUtils.split(value, "."); dbEsBiMapping.put(new DatabaseTableModel(keyStrings[0], keyStrings[1]), new IndexTypeModel(valueStrings[0], valueStrings[1])); }); mysqlTypeElasticsearchTypeMapping = Maps.newHashMap(); mysqlTypeElasticsearchTypeMapping.put("char", data -> data); mysqlTypeElasticsearchTypeMapping.put("text", data -> data); mysqlTypeElasticsearchTypeMapping.put("blob", data -> data); mysqlTypeElasticsearchTypeMapping.put("int", Long::valueOf); mysqlTypeElasticsearchTypeMapping.put("date", data -> LocalDateTime.parse(data, FORMATTER)); mysqlTypeElasticsearchTypeMapping.put("time", data -> LocalDateTime.parse(data, FORMATTER)); mysqlTypeElasticsearchTypeMapping.put("float", Double::valueOf); mysqlTypeElasticsearchTypeMapping.put("double", Double::valueOf); mysqlTypeElasticsearchTypeMapping.put("decimal", Double::valueOf); } public Map<String, String> getDbEsMapping() { return dbEsMapping; } public void setDbEsMapping(Map<String, String> dbEsMapping) { this.dbEsMapping = dbEsMapping; } private interface Converter { Object convert(String data); } }
相关推荐
名称:canal [kə'næl] 译意: 水道/管道/沟渠 语言: 纯java开发 ...定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql 关键词: mysql binlog parser / real-time / queue&topic;
Canal 是mysql数据库binlog的增量订阅&消费组件。 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 关于 ...
Canal 是mysql数据库binlog的增量订阅&消费组件。 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 关于 ...
MySQL是世界上最流行的关系型数据库管理系统之一,而MySQL的binlog(二进制日志)是其核心特性之一,用于记录所有对数据库进行的更改操作。`mysql-binlog-connector-java` 是一个开源库,它允许Java应用程序实时读取...
canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费 从2010年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由2010年1月1日起,早期阿里巴巴因为...
翼二进制日志-go是一个MySQL(MariaDB的)主从复制客户端中,使用复制协议以读取的顶部从MySQL或MariaDB的的二进制日志生成事件 ,并通过HTTP,TCP,Redis的队列,卡夫卡发送JSON数据或订阅。 它允许您接收事件,...
基于日志增量订阅和消费的业务包括 数据库镜像 数据库实时备份 索引构建和实时维护(拆分异构索引、倒排索引等) 业务 cache 刷新 带业务逻辑的增量数据处理 MySQL主备复制原理 MySQL master 将数据变更写入二进制...
阿里巴巴mysql数据库binlog的增量订阅&消费组件 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息
canal-python 是阿里巴巴开源项目 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。 基于...
canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件。 ## 工作原理 ### 全量 暴露Http接口 > 读取数据库会加**读锁** > 主键必须为数字类型 #### 过程 1. 首先会根据所给的数据库主键字段,拿到最大的主键数字...
对一次学习阿里巴巴mysql数据库binlog的增量订阅&消费组件 的朋友了解入门这个技术
CanalSharp是阿里巴巴开源项目mysql数据库binlog的增量订阅和消费组件Canal的.NET客户端。在数据库中,更改数据捕获( CDC )是一组软件设计模式,用于确定和跟踪已更改的数据,骑士可以使用已更改的数据来采取措施...
canal基于MySQL数据库增量日志解析,提供增量数据订阅和消费,是阿里开源CDC工具,它可以获取MySQL binlog数据并解析,然后将数据变动传输给下游。基于canal,可以实现从MySQL到其他数据库的实时同步 MySQL主备复制...
"canal数据binlog同步demo" 是一个关于使用Canal工具实现MySQL数据库binlog(二进制日志)同步的实例展示。Canal是阿里巴巴开源的一个数据库实时增量数据订阅与推送组件,它能监听MySQL的binlog事件,并将其转化为...
canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件。 工作原理 全量 暴露Http接口(接口定义见),待调用后开启后台线程,通过主键分批同步指定数据库中数据到Elasticsearch 读取数据库会加读锁 主键必须为数字...
3. 支持通过不同方式解析和订阅 MySQL binlog,例如通过 GTID。 4. 支持高性能,实时数据同步。 5. Canal Server 和 Canal Client 都支持 HA / Scalability,由 Apache ZooKeeper 提供支持。 6. Docker 支持。 ...
1. 增量数据订阅与消费Canal能够解析MySQL的binlog,实时获取数据库的增量变更数据,并将其同步到其他系统中。 2. 多种数据库操作支持支持插入、更新、删除等多种数据库操作的同步。 3. 高可用性通过ZooKeeper实现高...
Binlog(Binary Log)是MySQL数据库的一种日志系统,记录了所有对数据库进行的更改操作。通过监听和解析MySQL的Binlog事件,我们可以获取到数据库的实时变更信息,进而实现增量数据的捕获。常见的开源工具如Canal、...
Canal是一款由阿里巴巴开源的、基于MySQL binlog的增量日志订阅与消费组件,它能够监听MySQL数据库的增删改查操作,并将这些变更事件转发到各种目标系统,如RabbitMQ消息队列。本教程将详细介绍如何配置Canal监听...
从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。 基于日志增量订阅和消费的业务包括 数据库镜像 数据库实时备份 索引构建和实时维护(拆分...