`
m635674608
  • 浏览: 5028954 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

利用Canal完成Mysql数据同步Redis

 
阅读更多

流程
Canal的原理是模拟Slave向Master发送请求,Canal解析binlog,但不将解析结果持久化,而是保存在内存中,每次有客户端读取一次消息,就删除该消息。这里所说的客户端,就需要我们写一个连接Canal的程序,持续从Canal获取数据。

 

 

 

步骤
一、配置Canal
参考https://github.com/alibaba/canal

【mysql配置】
1,配置参数

 

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

2,在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)

    CREATE USER canal IDENTIFIED BY 'canal';    
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;  
    FLUSH PRIVILEGES;  

 

【canal下载和配置】
1,下载canal https://github.com/alibaba/canal/releases 
2,解压

 

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

3,修改配置文件

vi conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*

#################################################


【canal启动和关闭】
1,启动

sh bin/startup.sh

2,查看日志

vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
<pre name="user-content-code">2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

具体instance的日志:

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

3,关闭

sh bin/stop.sh


注意:
1,这里只需要配置好参数后,就可以直接运行
2,Canal没有解析后的文件,不会持久化

 

 

二、创建客户端
参考https://github.com/alibaba/canal/wiki/ClientExample

 


其中一个是连接canal并操作的类,一个是redis的工具类,使用maven主要是依赖包的下载很方便。

 

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>  
        <groupId>com.alibaba.otter</groupId>  
        <artifactId>canal.client</artifactId>  
        <version>1.0.12</version>  
    </dependency>  
    
    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-test</artifactId>  
        <version>3.1.2.RELEASE</version>  
        <scope>test</scope>  
    </dependency>  
      
    <dependency>  
        <groupId>redis.clients</groupId>  
        <artifactId>jedis</artifactId>  
        <version>2.4.2</version>  
    </dependency>  
    
    </dependencies>
  <build/>
</project>




2,ClientSample代码
这里主要做两个工作,一个是循环从Canal上取数据,一个是将数据更新至Redis

package canal.sample;

import java.net.InetSocketAddress;  
import java.util.List;  

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;  
import com.alibaba.otter.canal.common.utils.AddressUtils;  
import com.alibaba.otter.canal.protocol.Message;  
import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
import com.alibaba.otter.canal.client.*;  
 
public class ClientSample {  

   public static void main(String args[]) {  
	   
       // 创建链接  
       CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),  
               11111), "example", "", "");  
       int batchSize = 1000;  
       try {  
           connector.connect();  
           connector.subscribe(".*\\..*");  
           connector.rollback();    
           while (true) {  
               Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据  
               long batchId = message.getId();  
               int size = message.getEntries().size();  
               if (batchId == -1 || size == 0) {  
                   try {  
                       Thread.sleep(1000);  
                   } catch (InterruptedException e) {  
                       e.printStackTrace();  
                   }  
               } else {  
                   printEntry(message.getEntries());  
               }  
 
               connector.ack(batchId); // 提交确认  
               // connector.rollback(batchId); // 处理失败, 回滚数据  
           }  
 
       } finally {  
           connector.disconnect();  
       }  
   }  
 
   private static void printEntry( List<Entry> entrys) {  
       for (Entry entry : entrys) {  
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
               continue;  
           }  
 
           RowChange rowChage = null;  
           try {  
               rowChage = RowChange.parseFrom(entry.getStoreValue());  
           } catch (Exception e) {  
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                       e);  
           }  
 
           EventType eventType = rowChage.getEventType();  
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                   eventType));  
 
           for (RowData rowData : rowChage.getRowDatasList()) {  
               if (eventType == EventType.DELETE) {  
            	   redisDelete(rowData.getBeforeColumnsList());  
               } else if (eventType == EventType.INSERT) {  
            	   redisInsert(rowData.getAfterColumnsList());  
               } else {  
                   System.out.println("-------> before");  
                   printColumn(rowData.getBeforeColumnsList());  
                   System.out.println("-------> after");  
                   redisUpdate(rowData.getAfterColumnsList());  
               }  
           }  
       }  
   }  
 
   private static void printColumn( List<Column> columns) {  
       for (Column column : columns) {  
           System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
       }  
   }  
   
	  private static void redisInsert( List<Column> columns){
		  JSONObject json=new JSONObject();
		  for (Column column : columns) {  
			  json.put(column.getName(), column.getValue());  
	       }  
		  if(columns.size()>0){
			  RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
		  }
	   }
	  
	  private static  void redisUpdate( List<Column> columns){
		  JSONObject json=new JSONObject();
		  for (Column column : columns) {  
			  json.put(column.getName(), column.getValue());  
	       }  
		  if(columns.size()>0){
			  RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
		  }
	  }
  
	   private static  void redisDelete( List<Column> columns){
		   JSONObject json=new JSONObject();
			  for (Column column : columns) {  
				  json.put(column.getName(), column.getValue());  
		       }  
			  if(columns.size()>0){
				  RedisUtil.delKey("user:"+ columns.get(0).getValue());
			  }
	   }

   
}  


3,RedisUtil代码

package canal.sample;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

	// Redis服务器IP
	private static String ADDR = "10.1.2.190";

	// Redis的端口号
	private static int PORT = 6379;

	// 访问密码
	private static String AUTH = "admin";

	// 可用连接实例的最大数目,默认值为8;
	// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
	private static int MAX_ACTIVE = 1024;

	// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
	private static int MAX_IDLE = 200;

	// 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
	private static int MAX_WAIT = 10000;

	// 过期时间
	protected static int  expireTime = 60 * 60 *24;
	
	// 连接池
	protected static JedisPool pool;

	/**
	 * 静态代码,只在初次调用一次
	 */
	static {
		JedisPoolConfig config = new JedisPoolConfig();
		//最大连接数
		config.setMaxTotal(MAX_ACTIVE);
		//最多空闲实例
		config.setMaxIdle(MAX_IDLE);
		//超时时间
		config.setMaxWaitMillis(MAX_WAIT);
		//
		config.setTestOnBorrow(false);
		pool = new JedisPool(config, ADDR, PORT, 1000);
	}

	/**
	 * 获取jedis实例
	 */
	protected static synchronized Jedis getJedis() {
		Jedis jedis = null;
		try {
			jedis = pool.getResource();
		} catch (Exception e) {
			e.printStackTrace();
			if (jedis != null) {
				pool.returnBrokenResource(jedis);
			}
		}
		return jedis;
	}

	/**
	 * 释放jedis资源
	 * 
	 * @param jedis
	 * @param isBroken
	 */
	protected static void closeResource(Jedis jedis, boolean isBroken) {
		try {
			if (isBroken) {
				pool.returnBrokenResource(jedis);
			} else {
				pool.returnResource(jedis);
			}
		} catch (Exception e) {

		}
	}

	/**
	 *  是否存在key
	 * 
	 * @param key
	 */
	public static boolean existKey(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		try {
			jedis = getJedis();
			jedis.select(0);
			return jedis.exists(key);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return false;
	}

	/**
	 *  删除key
	 * 
	 * @param key
	 */
	public static void delKey(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		try {
			jedis = getJedis();
			jedis.select(0);
			jedis.del(key);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
	}

	/**
	 *  取得key的值
	 * 
	 * @param key
	 */
	public static String stringGet(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		String lastVal = null;
		try {
			jedis = getJedis();
			jedis.select(0);
			lastVal = jedis.get(key);
			jedis.expire(key, expireTime);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return lastVal;
	}

	/**
	 *  添加string数据
	 * 
	 * @param key
	 * @param value
	 */
	public static String stringSet(String key, String value) {
		Jedis jedis = null;
		boolean isBroken = false;
		String lastVal = null;
		try {
			jedis = getJedis();
			jedis.select(0);
			lastVal = jedis.set(key, value);
			jedis.expire(key, expireTime);
		} catch (Exception e) {
			e.printStackTrace();
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return lastVal;
	}

	/**
	 *  添加hash数据
	 * 
	 * @param key
	 * @param field
	 * @param value
	 */
	public static void hashSet(String key, String field, String value) {
		boolean isBroken = false;
		Jedis jedis = null;
		try {
			jedis = getJedis();
			if (jedis != null) {
				jedis.select(0);
				jedis.hset(key, field, value);
				jedis.expire(key, expireTime);
			}
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
	}

}

注意:

1,客户端的Jedis连接不同于项目里的Jedis连接需要Spring注解,直接使用静态方法就可以。

 

 

 

运行
1,运行canal服务端startup.bat / startup.sh
2,运行客户端程序

 


注意
1,虽然canal服务端解析binlog后不会把数据持久化,但canal服务端会记录每次客户端消费的位置(客户端每次ack时服务端会记录pos 点)。如果数据正在更新时,canal服务端挂掉,客户端也会跟着挂掉,mysql依然在插入数据,而redis则因为客户端的关闭而停止更新,造成 mysql和redis的数据不一致。解决办法是,只要重启canal服务端和客户端就可以了,虽然canal服务端因为重启之前解析数据清空,但因为 canal服务端记录的是客户端最后一次获取的pos点,canal服务端再从这个pos点开始解析,客户端更新至redis,以达到数据的一致。
2,如果只有一个canal服务端一个客 户端,肯定存在可用性低的问题,一种做法是用程序来监控canal服务端和客户端,如果挂掉,再重启;一种做法是多个canal服务端+zk,将 canal服务端的配置文件放在zk,任何一个canal服务端挂掉后,切换到其他canal服务端,读到的配置文件的内容就是一致的(还有记录的消费 pos点),保证业务的高可用,客户端可使用相同的做法。

 

 

http://m.blog.csdn.net/article/details?id=50371405

分享到:
评论

相关推荐

    canal 的 mysql 与 redis/memcached/mongodb 的 nosql 数据实时同步方案

    与MySQL同步,可以满足对非结构化数据或半结构化数据的处理需求。 6. **数据同步原理**: Canal通过解析MySQL的binlog,获取到数据库的增删改查操作,然后将这些操作以事件的形式推送到目标NoSQL数据库。这种方式...

    canal + mysql + rabbitmq步骤(CanalListener)

    本教程将详细介绍如何配置Canal监听MySQL的binlog,以及如何通过RabbitMQ实现数据同步。 1. **MySQL Binlog介绍** MySQL的Binary Log(binlog)是一种记录所有对数据库进行更改操作的二进制日志,用于数据恢复、...

    springbootj集成canal+mysql+rabbitmq

    mysql需开启binlog 查看是否开启binlog SHOW VARIABLES LIKE '%log_bin%' 如果log_bin的值为OFF是未开启,为ON是已开启。 未开启的话可以修改/etc/my.cnf 开启binlog [mysqld] log-bin=mysql-bin binlog-format=...

    cpp-基于canal的mysql与redismemcachedmongodb的nosql数据实时同步方案案例canalclient

    基于canal的mysql与redismemcachedmongodb的nosql数据实时同步方案案例canalclient”指的是一个使用C++实现的项目,它利用了阿里巴巴开源的Canal工具来实现实时同步MySQL数据库的数据到NoSQL数据库,如Redis、...

    spring-cloud模块spring-boot微服务 mysql数据同步到elasticsearch 实时同步

    而“spring-cloud Task”标签则暗示了我们可能要使用Spring Cloud Task来创建一次性或周期性的后台任务,完成数据同步工作。 在具体实现过程中,我们可以使用诸如JDBC River(已被废弃)或Logstash,甚至是自定义的...

    大型企业如何实现MySQL到Redis的同步.docx

    如果数据丢失或更新慢了,可能会导致Redis中的数据与MySQL中的数据不同步。因此,我们需要采取降级或补偿方案来解决这种问题。 在实现MySQL到Redis的同步时,我们还需要关心缓存更新的策略。例如,我们可以使用消息...

    在SQL中操作Redis或者Redis集群,达到Redis与MySQL的数据一致性.zip

    2. `Canal`: 类似于Maxwell,用于MySQL到其他系统的实时数据同步。 3. `Redlock`: Redis的分布式锁方案,可解决多节点操作时的并发问题,保证数据一致性。 五、挑战与解决方案 1. 数据延迟:由于网络延迟,可能会...

    mysql-replication:基于canal实现MySQL实时复制工具,把MySQL变化的数据build到Redis Queue

    在这种场景下,我们讨论的是基于Canal实现的MySQL实时复制工具,它能够将MySQL中的数据变化实时地构建到Redis Queue中,以实现更高效的数据处理和分发。 Canal是阿里巴巴开源的一款针对MySQL数据库的数据变更捕获和...

    o-eye:Mysql+Canal+RabbitMQ+Redis打造订单实时大屏

    【标题】"o-eye:Mysql+Canal+RabbitMQ+Redis打造订单实时大屏" 涉及的关键技术是数据库实时监控与数据同步,这是一个常见的大数据实时处理场景,用于构建能够实时展示订单状态的可视化大屏。在这个系统中,MySQL作为...

    数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧.docx

    使用Canal,开发者可以轻松地将MySQL数据同步到Redis、Memcached、Solr、Elasticsearch等服务,确保数据的一致性。此外,Canal还提供了丰富的适配器和插件,便于与其他系统集成,比如Kafka、RabbitMQ等消息中间件,...

    阿里canal组件:canal.deployer-1.1.7-SNAPSHOT.tar.gz

    在这里,你可以配置Canal连接MySQL的方式,设置数据同步的目标,定义过滤规则等。 Canal的工作原理大致如下: - 它监听MySQL的binlog(二进制日志),当数据库发生更改时,Canal会捕获这些变更事件。 - 使用Canal...

    Bifrost:Bifrost ----面向生产环境MySQL同步到Redis,MongoDB,ClickHouse,MySQL等服务的多个中间件

    Bifrost ----面向生产环境MySQL同步到Redis,ClickHouse等服务的多个中间件 漫威里的彩虹桥可以将雷神放置到阿斯加德和地球 而这个Bifrost可以将你MySQL里的数据全量,实时的同步到: 雷迪斯 MongoDB ClickHouse...

    canal_mysql_nosql_sync:基于canal的mysql与redismemcachedmongodb的nosql数据实时同步方案案例demo canal client

    基于Canal的MySql RabbitMQ Redis / memcached / mongodb的nosql同步(多读,nosql延迟不严格需求) 1.mysql主从配置 2.对mysql binlog(row) parser 这一步交给canal 3.MQ对解析后binlog增量数据的推送 4.对MQ数据...

    canalDemo.zip

    【描述】"canal保持mysql与redis同步之demo" 描述了该压缩包的主要功能,即利用Canal这个工具来实现实时地从MySQL数据库中捕获数据变更,并将这些变更同步到Redis缓存系统中。这种同步机制在很多场景下非常有用,...

    canal技术调研2.0.docx

    Canal 是阿里巴巴开源的一款高效的数据同步工具,它主要专注于MySQL数据库的变更数据捕获(CDC, Change Data Capture)并提供数据同步的能力。在IT领域,Canal 被广泛应用于异构数据库同步、多级索引更新以及业务...

    canal-1.1.5.zip

    Canal,由阿里巴巴开源的一款高性能、轻量级的MySQL数据库实时增量数据订阅和消费组件,主要用于实现数据库的数据变更实时同步到其他系统,如大数据处理、缓存更新等场景。在Java开发领域,Canal以其高效稳定的特点...

    canal.deployer-1.1.6.tar.gz

    Canal是阿里巴巴开源的一款高效率、高性能的数据库变更数据捕获工具,主要用于实现MySQL的数据实时同步到其他系统,如Elasticsearch、Hadoop、Redis等。而Canal Deployer则是Canal项目中的部署组件,用于管理和调度...

    canal.tar资源

    Canal是阿里巴巴的一款数据库增量日志订阅与消费组件,主要用于实现MySQL到其他数据存储系统的实时数据复制,例如MySQL到Elasticsearch、MySQL到Hadoop、MySQL到Redis等。其设计灵感来源于Facebook的Binlog ...

    canal.deployer-1.1.4 下载,抱歉,不知道在哪儿取消积分设置,canal.deployer-1.1.4.tar.gz

    Canal Deployer 的核心功能是实现MySQL到其他数据存储的实时数据同步,如MySQL到Redis、MySQL到Elasticsearch、MySQL到Kafka等。这个过程通常涉及到以下几个关键知识点: 1. **Canal Server**:Canal Server 是...

    springBoot整合RabbitMQ案例

    SpringBoot整合RabbitMQ是一个常见的后端开发任务,它涉及到分布式消息传递和Java应用程序的集成。RabbitMQ是一个开源的消息代理和队列服务器,而SpringBoot是基于Spring框架的简化版,提供了快速构建应用程序的方式...

Global site tag (gtag.js) - Google Analytics