/**
* ProjectName:etl-canal-war<BR>
* File name: EntCanalClientController.java <BR>
* Project:etl-canal-war <BR>
* Version: v 1.0 <BR>
* Date: 2017年3月15日 下午3:04:32 <BR>
* Description: <BR>
* Function List: <BR>
*/
package com.sunlands.eagle.canal.sample.product;
import java.net.InetSocketAddress;
//import java.util.Date;
import java.util.ResourceBundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.sunlands.eagle.sms.SendSmsUtilService;
/**
* 功能描述: . <BR>
* 历史版本: <Br>
* 时间:2017年3月15日 下午3:04:32 <BR>
* 变更原因: <BR>
* 变化内容 :<BR>
* 首次开发时间:2017年3月15日 下午3:04:32 <BR>
* 描述: <BR>
* 版本:V1.0
*/
public class EntNewCanalClientProductController {
private static final Logger logger = LoggerFactory
.getLogger(EntNewCanalClientProductController.class);
@Autowired
private CanalEntryCommonEntNewProduct entry;
/**
* 方法说明:新模式调用初方法 . <BR>
* @see com.sunlands.eagle.canal.sample.EntNewCanalClientController <BR>
* @return: void
* @Datetime:2017年3月15日 下午3:05:46 <BR>
*/
public void entNew_canalInit() {
Runnable task = new Runnable() {
@Override
public void run() {
try {
logger.info("entNew_CanalClient---接口正常");
entNew_process();
sendSms("202entNew_CanalClient---接口正常");
} catch (Exception e) {
// TODO Auto-generated catch block
logger.info("202entNew_CanalClient---接口异常"+e.getMessage());
try {
sendSms("202entNew_CanalClient企业APP-新企业家-产品---接口异常");
Thread.sleep(60000);
entNew_canalInit();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
};
new Thread(task).start();
}
private void sendSms(String message) {
SendSmsUtilService service = SendSmsUtilService
.getInstance();
String defaultReceivers = ResourceBundle.getBundle(
"sms").getString("defaultReceivers");
String[] mobiles = defaultReceivers.split(",");
for (String mobile : mobiles) {
service.sendMessageSimple(mobile, message
+ System.currentTimeMillis());
}
}
/**
* 方法说明: 新模式连接Canal-Server. <BR>
* @see com.sunlands.eagle.canal.sample.EntNewCanalClientController <BR>
* @return: void
* @Datetime:2017年3月15日 下午3:07:00 <BR>
*/
public void entNew_process() {
// 创建链接 Canal-Server
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP",
11116), "dbName", "username", paseword");
int batchSize = 1000;
try {
connector.connect();
//新模式-抽取指定的表
connector.subscribe();
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
entry.printEntry(message.getEntries());
} catch (Exception e) {
// TODO Auto-generated catch block
connector.rollback(batchId); // 处理失败, 回滚数据
sendSms("202entNew_CanalClient---接口异常");
e.printStackTrace();
}
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
}
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* ProjectName:etl-canal-war<BR>
* File name: CanalEntryCommon.java <BR>
* Project:etl-canal-war <BR>
* Version: v 1.0 <BR>
* Date: 2017年3月15日 下午3:50:39 <BR>
* Description: <BR>
* Function List: <BR>
*/
package com.sunlands.eagle.canal.sample.product;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.sunlands.eagle.canal.common.CommonProductUtil;
import com.sunlands.eagle.dmo.ClientDTO;
import com.sunlands.eagle.entNew.biz.CanalSampleEntNewService;
/**
* 功能描述: 公共方法. <BR>
* 历史版本: <Br>
* 时间:2017年3月15日 下午3:50:39 <BR>
* 变更原因: <BR>
* 变化内容 :<BR>
* 首次开发时间:2017年3月15日 下午3:50:39 <BR>
* 描述: <BR>
* 版本:V1.0
*/
@Controller
public class CanalEntryCommonEntNewProduct {
private static final Logger logger = LoggerFactory
.getLogger(CanalEntryCommonEntNewProduct.class);
@Autowired
private CanalSampleEntNewService canalSampleEntNewServiceImpl;
/**
* 方法说明:数据处理 . <BR>
* @see com.sunlands.eagle.canal.sample.CanalEntNewryCommonComProduct <BR>
* @param entrys
* @return
* @return: ClientDTO
* @Datetime:2017年3月15日 下午3:51:52 <BR>
*/
public void printEntry( List<Entry> entrys) throws Exception{
List<String> ddlList = new ArrayList<String>();
List<String> updComList = new ArrayList<String>();
ClientDTO cdto = new ClientDTO();
String jsonSql = "";
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN && entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
logger.error("ERROR "+entry.toString());
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
//eventType == EventType.QUERY ||
if(rowChage.getIsDdl()){
logger.info("DDL---------->"+rowChage.getSql());
ddlList.add(rowChage.getSql().replace("`ent_portal_prod`.", "`inner_ent_portal_prod`."));
// if(rowChage.getSql().indexOf("FOREIGN KEY")!=-1){
// }else{
//
// }
// canalSampleServiceImpl.updEntCollegeProjs(rowChage.getSql());
}
cdto.setTblName(entry.getHeader().getTableName());
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
jsonSql = redisDelete(rowData.getBeforeColumnsList(),cdto.getTblName());
updComList.add(jsonSql);
} else if (eventType == EventType.INSERT) {
jsonSql = redisInsert(rowData.getAfterColumnsList(),cdto.getTblName());
updComList.add(jsonSql);
} else if(eventType == EventType.UPDATE){
jsonSql = redisUpdate(rowData.getAfterColumnsList(),cdto.getTblName());
updComList.add(jsonSql);
} else {
cdto.setOpr("OTHER");
cdto.setJsonData("");
}
}
}
if(ddlList.size()>0){
canalSampleEntNewServiceImpl.updEntNewCollegeProjs(ddlList);
}
if(updComList.size()>0){
canalSampleEntNewServiceImpl.updEntNewCollegeProjs(updComList);
}
}
/**
* 方法说明:Insert操作 . <BR>
* @see com.sunlands.eagle.canal.sample.CanalEntryCommonComProduct <BR>
* @param columns
* @param table
* @return
* @return: String
* @Datetime:2017年3月15日 下午4:44:51 <BR>
*/
private String redisInsert( List<Column> columns,String table){
StringBuffer sqlStr = new StringBuffer();
StringBuffer valuestr = new StringBuffer();
String jsonSql = "";
for (Column column : columns) {
sqlStr.append("`").append(column.getName()).append("`,");
//Null值处理
if(column.getIsNull()){
valuestr.append("NULL,");
continue;
}
//手机号
if("mobile".equals(column.getName()) || "phone".equals(column.getName()) ||"urgent_mobile".equals(column.getName()) ||"home_phone".equals(column.getName()) ||"office_phone".equals(column.getName()) || "user_mobile".equals(column.getName())){
// valuestr.append("'").append(column.getValue().substring(0, column.getValue().length()-3)).append("**',");
valuestr.append("CONCAT(left('").append(column.getValue()).append("',9),'**'),");
}else if("qq_code".equals(column.getName()) || "password".equals(column.getName()) ||"remark".equals(column.getName()) ){ //邮箱
valuestr.append("NULL,");
// }else if("email".equals(column.getName()) ){ //邮箱
// valuestr.append("'',");
}else if("cert_no".equals(column.getName()) ){//身份证号
valuestr.append("CONCAT(left('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',3),'**',substring('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',6,11),'**'),");
// valuestr.append("'").append(column.getValue().substring(0, 2)).append("**").append(column.getValue().substring(5,column.getValue().length()-3)).append("**',");
}else if("bank_account".equals(column.getName()) ||"bankSubBranch".equals(column.getName()) ){//银行卡号
valuestr.append("'-',");
}else{
//特殊符号处理
if(column.getValue().indexOf("\\")!=-1){
valuestr.append("'").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',");
}else if(column.getValue().indexOf("'")!=-1 ){
valuestr.append("\"").append(column.getValue().replace("\"", "\\\"").replace("\'", "\\\'")).append("\",");
}else{
valuestr.append("'").append(column.getValue().replace("\"", "\\\"").replace("\'", "\\\'")).append("',");
}
}
}
if(columns.size()>0){
jsonSql = CommonProductUtil.insertSQL(sqlStr.toString(), valuestr.toString(),table);
}
return jsonSql;
}
/**
* 方法说明: Update操作. <BR>
* @see com.sunlands.eagle.canal.sample.CanalEntryCEntNewonComProduct <BR>
* @param columns
* @param table
* @return
* @return: String
* @Datetime:2017年3月15日 下午4:45:00 <BR>
*/
private String redisUpdate( List<Column> columns,String table){
StringBuffer valuestr = new StringBuffer();
StringBuffer key = new StringBuffer();
boolean flag = false;
String jsonSql = "";
for (Column column : columns) {
//Null值处理
if(column.getIsNull()){
valuestr.append("`").append(column.getName()).append("`=NULL,");
continue;
}
//手机号
if("mobile".equals(column.getName()) || "phone".equals(column.getName()) ||"urgent_mobile".equals(column.getName()) ||"home_phone".equals(column.getName()) ||"office_phone".equals(column.getName()) ){
valuestr.append(column.getName()).append("=CONCAT(left('").append(column.getValue()).append("',9),'**'),");
}else if("qq_code".equals(column.getName()) || "password".equals(column.getName()) ||"remark".equals(column.getName()) ){
valuestr.append(column.getName()).append("=NULL,");
// }else if("email".equals(column.getName()) ){
// valuestr.append(column.getName()).append("='',");
}else if("cert_no".equals(column.getName()) ){//身份证号
valuestr.append(column.getName()).append("=CONCAT(left('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',3),'**',substring('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',6,11),'**'),");
}else if("bank_account".equals(column.getName()) ||"bankSubBranch".equals(column.getName()) ){//银行卡号
valuestr.append(column.getName()).append("='-',");
}else{
//特殊字符处理
if(column.getValue().indexOf("\\")!=-1){
valuestr.append("`").append(column.getName()).append("`='").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',");
}else if(column.getValue().indexOf("'")!=-1){
valuestr.append("`").append(column.getName()).append("`=\"").append(column.getValue().replace("\"", "\\\"").replace("\'", "\\\'")).append("\",");
}else{
valuestr.append("`").append(column.getName()).append("`='").append(column.getValue().replace("\'", "\\\'").replace("\'", "\\\'")).append("',");
}
}
//主键或Id
if("id".equals(column.getName())){
flag = true;
key.append("id='").append(column.getValue()).append("'");
}else if(column.getIsKey()){
flag = true;
if(key.length()>0){
key.append(" and ").append("`").append(column.getName()).append("`='").append(column.getValue()).append("'");
}else{
key.append("`").append(column.getName()).append("`='").append(column.getValue()).append("'");
}
}else if("name".equals(column.getName()) && !flag){
if(key.length()>0){
key.append(" and `name`='").append(column.getValue()).append("'");
}else{
key.append("`name`='").append(column.getValue()).append("'");
}
}
}
if(columns.size()>0){
if(key.length()==0){
key.append("1=-1");
}
jsonSql = CommonProductUtil.updateSQL(valuestr.toString(), key.toString(),table);
}
return jsonSql;
}
/**
* 方法说明: delete删除操作. <BR>
* @see com.sunlands.eagle.canal.sample.CanalEntryCommonComProduct <BR>
* @param columns
* @param table
* @return
* @return: String
* @Datetime:2017年3月15日 下午4:45:10 <BR>
*/
private String redisDelete( List<Column> columns,String table){
StringBuffer valuestr = new StringBuffer();
StringBuffer key = new StringBuffer();
boolean flag = false;
String jsonSql = "";
for (Column column : columns) {
//主键或Id
if("id".equals(column.getName())){
flag = true;
key.append("id=").append(column.getValue()).append("");
}else if(column.getIsKey()){
flag = true;
if(key.length()>0){
key.append(" and ").append(column.getName()).append("='").append(column.getValue()).append("'");
}else{
key.append(column.getName()).append("='").append(column.getValue()).append("'");
}
}else if("name".equals(column.getName()) && !flag){
if(key.length()>0){
key.append(" and `name`='").append(column.getValue()).append("'");
}else{
key.append("`name`='").append(column.getValue()).append("'");
}
}
}
if(columns.size()>0){
if(key.length()==0){
key.append("1=-1");
}
jsonSql = CommonProductUtil.deleteSQL(valuestr.toString(), key.toString(),table);
}
return jsonSql;
}
public CanalSampleEntNewService getCanalSampleEntNewServiceImpl() {
return canalSampleEntNewServiceImpl;
}
public void setCanalSampleEntNewServiceImpl(
CanalSampleEntNewService canalSampleEntNewServiceImpl) {
this.canalSampleEntNewServiceImpl = canalSampleEntNewServiceImpl;
}
}
相关推荐
Canal采用生产者-消费者模式设计,主要由三部分组成:Server端(canal-server)、Client端(canal-client)和适配器(canal-adapter)。Server端负责监听和捕获MySQL的binlog事件,Client端用于订阅和获取数据变更...
client 介绍 canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 使用该客户端前请先了解canal, canal 自身提供了简单的客户端,如果要转换为数据库的实体对象,处理消费数据要每次进行对象转换。 该客户端直接...
它主要由三大部分组成:canal-server、canal-client以及canal-adapter。 1. **canal-server**:这是Canal的核心组件,负责连接数据库,读取binlog,解析binlog事件,并将事件推送给客户端。Canal-server采用了基于...
Canal的架构主要由三部分组成:Server、Client以及中间的消息传输层。Server端负责连接MySQL服务器,读取binlog并解析;Client端则作为消费者,订阅Server推送的binlog事件。消息传输层可以是简单的TCP连接,也可以...
canal-client 项目介绍 canal客户端、解析binlog日志将数据从mysql过渡到rabbitmq和mongodb中 软件架构 软件架构说明 安装教程 下载最新版本的canal服务端, 解压缩 mkdir /tmp/canal tar zxvf canal.deployer-$...
【canal-canal-1.0.22_源码】是阿里巴巴开源的数据库实时增量数据订阅与同步工具,主要用于实现MySQL到其他数据库或者数据存储系统的实时数据迁移、同步,常用于构建数据仓库、日志收集等场景。Canal的核心功能是...
canal-python 一.canal-python 简介 canal-python 是阿里巴巴开源项目 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql...
- Client:Canal Client作为消费者,订阅并消费Canal Server发布的binlog事件。 - Adapter:Canal Adapter是连接Canal Server和目标系统的桥梁,它将Canal Server解析出的binlog事件转换为目标系统能够理解的数据...
阿里巴巴的canal的1.1.4版本的安装包。里面包含了canal.admin-1.1.4.tar.gz、canal.deployer-1.1.4.tar.gz、canal.example-1.1.4.tar.gz、canal-canal-1.1.4.tar.gz、canal-canal-1.1.4.zip
Canal主要由三部分组成:canal-server、canal-client以及canal-admin。canal-server负责数据的抓取、解析和分发;canal-client是消费者,可以订阅canal-server发布出来的数据;canal-admin则提供了图形化管理界面,...
2. Canal Client:Canal客户端,如`canal-spring-boot-starter`,连接Canal Server,订阅并消费binlog事件。 3. Canal Destination:Canal实例,代表一个数据源,通常对应一个MySQL实例。 4. Canal Instance:Canal...
3. **Canal Client**:客户端连接到Canal Server,订阅感兴趣的数据库和表,接收到变更事件后进行相应的处理,如写入消息队列、触发业务逻辑等。 4. **Spring Boot集成**:Spring Boot Starter Canal Master使得在...
client-adapter.es7x-1.1.5-jar-with-dependencies、canal-glue-core
binlog开源同步组件canal部署包 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行...
3. **canal.deployer-1.1.5.tar.gz**:这是Canal的核心部署模块,包含Canal Server和Canal Client。Canal Server用于捕获MySQL的binlog事件,并将其分发给Canal Client。Canal Client可以订阅并消费这些事件,实现...
2. Canal Client:客户端,也称为消费者,可以订阅Canal Server推送的数据库变更事件,通常用于数据同步或数据分析。 3. Deployer:部署器,Canal的管理组件,负责管理和调度Canal Server实例,以及相关的配置管理...
Canal安装与使用教程 Canal 是一个基于 MySQL 数据库增量日志解析的工具,提供增量数据订阅和...首先,关闭 canal-server(canal.deployer),canal-client(canal.adapter)服务,然后修改 example instance 的配置。
- **Canal Client**:客户端,用于连接Canal Server,订阅并消费数据库变更事件。 - **Adaptor**:适配器,将Canal抓取到的数据库变更事件转换为特定格式,便于下游系统消费,如RocketMQ、Kafka等消息中间件。 3....
Canal 是mysql数据库binlog的增量订阅&消费组件。 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 关于 ...
1. **bin**:这是存放可执行脚本的目录,包括启动、停止Canal服务的命令,如`canal-server.sh`、`canal-client.sh`等,方便用户管理和操作Canal实例。 2. **conf**:配置文件目录,内含Canal实例的配置文件,如`...