`
angelmm1234567
  • 浏览: 1261 次
  • 来自: 北京
最近访客 更多访客>>
社区版块
存档分类
最新评论

Canal-client

 
阅读更多
/**
* ProjectName:etl-canal-war<BR>
* File name:  EntCanalClientController.java     <BR>
* Project:etl-canal-war    <BR>
* Version: v 1.0      <BR>
* Date: 2017年3月15日 下午3:04:32 <BR>
* Description:     <BR>
* Function List:  <BR>
*/
package com.sunlands.eagle.canal.sample.product;
import java.net.InetSocketAddress;
//import java.util.Date;
import java.util.ResourceBundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.sunlands.eagle.sms.SendSmsUtilService;
/**
* 功能描述: .  <BR>
* 历史版本: <Br>
* 时间:2017年3月15日 下午3:04:32  <BR>
* 变更原因:    <BR>
* 变化内容 :<BR>
* 首次开发时间:2017年3月15日 下午3:04:32 <BR>
* 描述:   <BR>
* 版本:V1.0
*/
public class EntNewCanalClientProductController {
    
    private static final Logger logger = LoggerFactory
            .getLogger(EntNewCanalClientProductController.class);
    
    @Autowired
    private CanalEntryCommonEntNewProduct entry;
    /**
     * 方法说明:新模式调用初方法 . <BR>
     * @see com.sunlands.eagle.canal.sample.EntNewCanalClientController <BR>
     * @return: void
     * @Datetime:2017年3月15日 下午3:05:46 <BR>
     */
    public void entNew_canalInit() {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    logger.info("entNew_CanalClient---接口正常");
                    entNew_process();
                    sendSms("202entNew_CanalClient---接口正常");
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    logger.info("202entNew_CanalClient---接口异常"+e.getMessage());
                     try {    
                         sendSms("202entNew_CanalClient企业APP-新企业家-产品---接口异常");
                           Thread.sleep(60000);   
                           entNew_canalInit();
                           
                       } catch (InterruptedException e1) {    
                           e1.printStackTrace();   
                       }    
                    
                }
            }
            
        };
        new Thread(task).start();
    }
    
    private void sendSms(String message) {
        SendSmsUtilService service = SendSmsUtilService
                .getInstance();
        String defaultReceivers = ResourceBundle.getBundle(
                "sms").getString("defaultReceivers");
        String[] mobiles = defaultReceivers.split(",");
        for (String mobile : mobiles) {
            service.sendMessageSimple(mobile, message
                    + System.currentTimeMillis());
        }
    }
    /**
     * 方法说明: 新模式连接Canal-Server. <BR>
     * @see com.sunlands.eagle.canal.sample.EntNewCanalClientController <BR>
     * @return: void
     * @Datetime:2017年3月15日 下午3:07:00 <BR>
     */
    public void entNew_process() {
           // 创建链接   Canal-Server
                 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP",  
                         11116), "dbName", "username", paseword");  
                   int batchSize = 1000;    
                   try {    
                       connector.connect();  
                       //新模式-抽取指定的表
                       connector.subscribe();
                       connector.rollback();      
                       while (true) {    
                           Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据  
                           long batchId = message.getId();    
                           int size = message.getEntries().size();    
                           if (batchId == -1 || size == 0) {    
                               try {    
                                   Thread.sleep(1000);    
                               } catch (InterruptedException e) {    
                                   e.printStackTrace();    
                               }    
                           } else {    
                               try {
                                entry.printEntry(message.getEntries());
                            } catch (Exception e) {
                                // TODO Auto-generated catch block
                                connector.rollback(batchId); // 处理失败, 回滚数据  
                                 sendSms("202entNew_CanalClient---接口异常");
                                e.printStackTrace();
                            }    
                           }    
               
                           connector.ack(batchId); // 提交确认    
                           // connector.rollback(batchId); // 处理失败, 回滚数据    
                       }    
               
                   } finally {    
                       connector.disconnect();    
                   }   
        }
    
}
 
 
 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
/**
* ProjectName:etl-canal-war<BR>
* File name:  CanalEntryCommon.java     <BR>
* Project:etl-canal-war    <BR>
* Version: v 1.0      <BR>
* Date: 2017年3月15日 下午3:50:39 <BR>
* Description:     <BR>
* Function List:  <BR>
*/
package com.sunlands.eagle.canal.sample.product;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.sunlands.eagle.canal.common.CommonProductUtil;
import com.sunlands.eagle.dmo.ClientDTO;
import com.sunlands.eagle.entNew.biz.CanalSampleEntNewService;
/**
* 功能描述:  公共方法.  <BR>
* 历史版本: <Br>
* 时间:2017年3月15日 下午3:50:39  <BR>
* 变更原因:    <BR>
* 变化内容 :<BR>
* 首次开发时间:2017年3月15日 下午3:50:39 <BR>
* 描述:   <BR>
* 版本:V1.0
*/
@Controller
public class CanalEntryCommonEntNewProduct {
    
    private static final Logger logger = LoggerFactory
            .getLogger(CanalEntryCommonEntNewProduct.class);
    
    @Autowired
    private CanalSampleEntNewService canalSampleEntNewServiceImpl;
    
    /**
     * 方法说明:数据处理 . <BR>
     * @see com.sunlands.eagle.canal.sample.CanalEntNewryCommonComProduct <BR>
     * @param entrys
     * @return
     * @return: ClientDTO
     * @Datetime:2017年3月15日 下午3:51:52 <BR>
     */
    public  void printEntry( List<Entry> entrys) throws Exception{  
        List<String> ddlList =  new ArrayList<String>();
          List<String> updComList =  new ArrayList<String>();
           ClientDTO cdto = new ClientDTO();
           String jsonSql = "";
           for (Entry entry : entrys) {    
               if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN && entry.getEntryType() == EntryType.TRANSACTIONEND) {    
                   continue;    
               }    
       
               RowChange rowChage = null;    
               try {    
                   rowChage = RowChange.parseFrom(entry.getStoreValue());    
               } catch (Exception e) {    
                   logger.error("ERROR "+entry.toString());
                   throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),    
                           e);
               }    
               EventType eventType = rowChage.getEventType();    
              //eventType == EventType.QUERY ||
               if(rowChage.getIsDdl()){
                   logger.info("DDL---------->"+rowChage.getSql());
                   ddlList.add(rowChage.getSql().replace("`ent_portal_prod`.", "`inner_ent_portal_prod`."));
//                   if(rowChage.getSql().indexOf("FOREIGN KEY")!=-1){
//                   }else{
//                       
//                   }
//                   canalSampleServiceImpl.updEntCollegeProjs(rowChage.getSql());
               }
               cdto.setTblName(entry.getHeader().getTableName());
               for (RowData rowData : rowChage.getRowDatasList()) {
                   if (eventType == EventType.DELETE) {    
                       jsonSql = redisDelete(rowData.getBeforeColumnsList(),cdto.getTblName());
                       updComList.add(jsonSql);
                   } else if (eventType == EventType.INSERT) {   
                       jsonSql = redisInsert(rowData.getAfterColumnsList(),cdto.getTblName());   
                       updComList.add(jsonSql);
                   } else if(eventType == EventType.UPDATE){
                       jsonSql = redisUpdate(rowData.getAfterColumnsList(),cdto.getTblName());
                       updComList.add(jsonSql);
                   } else {    
                       cdto.setOpr("OTHER");
                       cdto.setJsonData("");
                   }  
                   
               }    
               
           }  
           if(ddlList.size()>0){
               canalSampleEntNewServiceImpl.updEntNewCollegeProjs(ddlList);
           }
           if(updComList.size()>0){
               canalSampleEntNewServiceImpl.updEntNewCollegeProjs(updComList);
           }
       }
    
    
    /**
     * 方法说明:Insert操作 . <BR>
     * @see com.sunlands.eagle.canal.sample.CanalEntryCommonComProduct <BR>
     * @param columns
     * @param table
     * @return
     * @return: String
     * @Datetime:2017年3月15日 下午4:44:51 <BR>
     */
    private  String redisInsert( List<Column> columns,String table){
        StringBuffer sqlStr = new StringBuffer();
        StringBuffer valuestr = new StringBuffer();
        String jsonSql = "";
        for (Column column : columns) {
          
            sqlStr.append("`").append(column.getName()).append("`,");
          //Null值处理
            if(column.getIsNull()){
                valuestr.append("NULL,");
                continue;
            }
          //手机号
            if("mobile".equals(column.getName()) || "phone".equals(column.getName()) ||"urgent_mobile".equals(column.getName()) ||"home_phone".equals(column.getName()) ||"office_phone".equals(column.getName()) || "user_mobile".equals(column.getName())){
//                valuestr.append("'").append(column.getValue().substring(0, column.getValue().length()-3)).append("**',");
                valuestr.append("CONCAT(left('").append(column.getValue()).append("',9),'**'),");
            }else if("qq_code".equals(column.getName()) || "password".equals(column.getName()) ||"remark".equals(column.getName()) ){ //邮箱
              valuestr.append("NULL,");
//            }else if("email".equals(column.getName()) ){ //邮箱
//                valuestr.append("'',");
           }else if("cert_no".equals(column.getName()) ){//身份证号
              valuestr.append("CONCAT(left('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',3),'**',substring('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',6,11),'**'),");
//              valuestr.append("'").append(column.getValue().substring(0, 2)).append("**").append(column.getValue().substring(5,column.getValue().length()-3)).append("**',");
           }else if("bank_account".equals(column.getName()) ||"bankSubBranch".equals(column.getName()) ){//银行卡号
              valuestr.append("'-',");
           }else{
            //特殊符号处理
            if(column.getValue().indexOf("\\")!=-1){
                valuestr.append("'").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',");
            }else  if(column.getValue().indexOf("'")!=-1 ){
                valuestr.append("\"").append(column.getValue().replace("\"", "\\\"").replace("\'", "\\\'")).append("\",");
            }else{
                valuestr.append("'").append(column.getValue().replace("\"", "\\\"").replace("\'", "\\\'")).append("',");
            }
         }
        }
       
        if(columns.size()>0){  
            jsonSql =  CommonProductUtil.insertSQL(sqlStr.toString(), valuestr.toString(),table);
            
        }  
        return jsonSql;
     }  
    
      
    /**
     * 方法说明: Update操作. <BR>
     * @see com.sunlands.eagle.canal.sample.CanalEntryCEntNewonComProduct <BR>
     * @param columns
     * @param table
     * @return
     * @return: String
     * @Datetime:2017年3月15日 下午4:45:00 <BR>
     */
    private   String redisUpdate( List<Column> columns,String table){  
        StringBuffer valuestr = new StringBuffer();
        StringBuffer key = new StringBuffer();
        boolean flag = false;
        String jsonSql = "";
        for (Column column : columns) {  
          //Null值处理
            if(column.getIsNull()){
                valuestr.append("`").append(column.getName()).append("`=NULL,");
                continue;
            }
            //手机号
            if("mobile".equals(column.getName()) || "phone".equals(column.getName()) ||"urgent_mobile".equals(column.getName()) ||"home_phone".equals(column.getName()) ||"office_phone".equals(column.getName()) ){
              valuestr.append(column.getName()).append("=CONCAT(left('").append(column.getValue()).append("',9),'**'),");
           }else if("qq_code".equals(column.getName()) || "password".equals(column.getName()) ||"remark".equals(column.getName()) ){
              valuestr.append(column.getName()).append("=NULL,");
//           }else if("email".equals(column.getName()) ){
//               valuestr.append(column.getName()).append("='',");
           }else if("cert_no".equals(column.getName()) ){//身份证号
              valuestr.append(column.getName()).append("=CONCAT(left('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',3),'**',substring('").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',6,11),'**'),");
           }else if("bank_account".equals(column.getName()) ||"bankSubBranch".equals(column.getName()) ){//银行卡号
               valuestr.append(column.getName()).append("='-',");
            }else{
            //特殊字符处理
            if(column.getValue().indexOf("\\")!=-1){
                valuestr.append("`").append(column.getName()).append("`='").append(column.getValue().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"")).append("',");
            }else if(column.getValue().indexOf("'")!=-1){
                valuestr.append("`").append(column.getName()).append("`=\"").append(column.getValue().replace("\"", "\\\"").replace("\'", "\\\'")).append("\",");
            }else{
                valuestr.append("`").append(column.getName()).append("`='").append(column.getValue().replace("\'", "\\\'").replace("\'", "\\\'")).append("',");
            }
           }
           //主键或Id
            if("id".equals(column.getName())){
                flag = true;
                key.append("id='").append(column.getValue()).append("'");
            }else  if(column.getIsKey()){
                flag = true;
                if(key.length()>0){
                    key.append(" and ").append("`").append(column.getName()).append("`='").append(column.getValue()).append("'");
                }else{
                    key.append("`").append(column.getName()).append("`='").append(column.getValue()).append("'");
                }
           }else  if("name".equals(column.getName()) && !flag){
              if(key.length()>0){
                 key.append(" and `name`='").append(column.getValue()).append("'");
               }else{
                  key.append("`name`='").append(column.getValue()).append("'");
               }
           }
         }   
        if(columns.size()>0){  
            if(key.length()==0){
                key.append("1=-1");
            }
            jsonSql = CommonProductUtil.updateSQL(valuestr.toString(), key.toString(),table);
             
        }  
        return jsonSql;
    }  
  
     /**
     * 方法说明: delete删除操作. <BR>
     * @see com.sunlands.eagle.canal.sample.CanalEntryCommonComProduct <BR>
     * @param columns
     * @param table
     * @return
     * @return: String
     * @Datetime:2017年3月15日 下午4:45:10 <BR>
     */
    private  String redisDelete( List<Column> columns,String table){
               StringBuffer valuestr = new StringBuffer();
              StringBuffer key = new StringBuffer();
              boolean flag = false;
              String jsonSql = "";
            for (Column column : columns) {
                //主键或Id
            if("id".equals(column.getName())){
                flag = true;
                key.append("id=").append(column.getValue()).append("");
            }else if(column.getIsKey()){
                flag = true;
                if(key.length()>0){
                    key.append(" and ").append(column.getName()).append("='").append(column.getValue()).append("'");
                }else{
                    key.append(column.getName()).append("='").append(column.getValue()).append("'");
                }
            }else  if("name".equals(column.getName()) && !flag){
                  if(key.length()>0){
                     key.append(" and `name`='").append(column.getValue()).append("'");
                   }else{
                      key.append("`name`='").append(column.getValue()).append("'");
                   }
               }
             }    
            if(columns.size()>0){  
                if(key.length()==0){
                    key.append("1=-1");
                }
                jsonSql = CommonProductUtil.deleteSQL(valuestr.toString(), key.toString(),table);
                
            }  
            return jsonSql;
     }
    public CanalSampleEntNewService getCanalSampleEntNewServiceImpl() {
        return canalSampleEntNewServiceImpl;
    }
    public void setCanalSampleEntNewServiceImpl(
            CanalSampleEntNewService canalSampleEntNewServiceImpl) {
        this.canalSampleEntNewServiceImpl = canalSampleEntNewServiceImpl;
    }
    
    
}
 
 
 
 
 
 
分享到:
评论

相关推荐

    canal-1.1.5.zip

    Canal采用生产者-消费者模式设计,主要由三部分组成:Server端(canal-server)、Client端(canal-client)和适配器(canal-adapter)。Server端负责监听和捕获MySQL的binlog事件,Client端用于订阅和获取数据变更...

    java看源码看不到-canal-client:springbootcanalstarter易用的canal客户端canalclient

    client 介绍 canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 使用该客户端前请先了解canal, canal 自身提供了简单的客户端,如果要转换为数据库的实体对象,处理消费数据要每次进行对象转换。 该客户端直接...

    canal-master.zip

    它主要由三大部分组成:canal-server、canal-client以及canal-adapter。 1. **canal-server**:这是Canal的核心组件,负责连接数据库,读取binlog,解析binlog事件,并将事件推送给客户端。Canal-server采用了基于...

    canal-canal-1.1.7.tar.gz

    Canal的架构主要由三部分组成:Server、Client以及中间的消息传输层。Server端负责连接MySQL服务器,读取binlog并解析;Client端则作为消费者,订阅Server推送的binlog事件。消息传输层可以是简单的TCP连接,也可以...

    canal-client:canal客户端、解析binlog日志将数据从mysql过渡到rabbitmq和mongodb中

    canal-client 项目介绍 canal客户端、解析binlog日志将数据从mysql过渡到rabbitmq和mongodb中 软件架构 软件架构说明 安装教程 下载最新版本的canal服务端, 解压缩 mkdir /tmp/canal tar zxvf canal.deployer-$...

    canal-canal-1.0.22_源码

    【canal-canal-1.0.22_源码】是阿里巴巴开源的数据库实时增量数据订阅与同步工具,主要用于实现MySQL到其他数据库或者数据存储系统的实时数据迁移、同步,常用于构建数据仓库、日志收集等场景。Canal的核心功能是...

    canal-python:alibaba canal 客户端(Python3 版本)

    canal-python 一.canal-python 简介 canal-python 是阿里巴巴开源项目 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 python 客户端。为 python 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql...

    canal-1.1.5(deployer和adapter)

    - Client:Canal Client作为消费者,订阅并消费Canal Server发布的binlog事件。 - Adapter:Canal Adapter是连接Canal Server和目标系统的桥梁,它将Canal Server解析出的binlog事件转换为目标系统能够理解的数据...

    canal1.1.4版本的所有安装包资源

    阿里巴巴的canal的1.1.4版本的安装包。里面包含了canal.admin-1.1.4.tar.gz、canal.deployer-1.1.4.tar.gz、canal.example-1.1.4.tar.gz、canal-canal-1.1.4.tar.gz、canal-canal-1.1.4.zip

    canal.deployer-1.1.4.zip

    Canal主要由三部分组成:canal-server、canal-client以及canal-admin。canal-server负责数据的抓取、解析和分发;canal-client是消费者,可以订阅canal-server发布出来的数据;canal-admin则提供了图形化管理界面,...

    canal-spring-boot-starter:springboot运河支持

    2. Canal Client:Canal客户端,如`canal-spring-boot-starter`,连接Canal Server,订阅并消费binlog事件。 3. Canal Destination:Canal实例,代表一个数据源,通常对应一个MySQL实例。 4. Canal Instance:Canal...

    spring-boot-starter-canal-master.zip

    3. **Canal Client**:客户端连接到Canal Server,订阅感兴趣的数据库和表,接收到变更事件后进行相应的处理,如写入消息队列、触发业务逻辑等。 4. **Spring Boot集成**:Spring Boot Starter Canal Master使得在...

    client-adapter.es7x-1.1.5-jar-with-dependencies、canal-glue-core

    client-adapter.es7x-1.1.5-jar-with-dependencies、canal-glue-core

    binlog开源同步组件canal部署包,版本1.1.4

    binlog开源同步组件canal部署包 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行...

    canal-1.1.5全家桶

    3. **canal.deployer-1.1.5.tar.gz**:这是Canal的核心部署模块,包含Canal Server和Canal Client。Canal Server用于捕获MySQL的binlog事件,并将其分发给Canal Client。Canal Client可以订阅并消费这些事件,实现...

    canal-deployer-1.1.8

    2. Canal Client:客户端,也称为消费者,可以订阅Canal Server推送的数据库变更事件,通常用于数据同步或数据分析。 3. Deployer:部署器,Canal的管理组件,负责管理和调度Canal Server实例,以及相关的配置管理...

    Canal安装与使用教程

    Canal安装与使用教程 Canal 是一个基于 MySQL 数据库增量日志解析的工具,提供增量数据订阅和...首先,关闭 canal-server(canal.deployer),canal-client(canal.adapter)服务,然后修改 example instance 的配置。

    mall-canal-service.zip

    - **Canal Client**:客户端,用于连接Canal Server,订阅并消费数据库变更事件。 - **Adaptor**:适配器,将Canal抓取到的数据库变更事件转换为特定格式,便于下游系统消费,如RocketMQ、Kafka等消息中间件。 3....

    canal-php:Alibaba mysql database binlog incremental subscription & consumer components Canal's php client[阿里巴巴mysql数据库binlog的增量订阅&消费组件 Canal 的 php 客户端 ] https

    Canal 是mysql数据库binlog的增量订阅&消费组件。 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 关于 ...

    canal.deployer-1.1.3.tar.gz

    1. **bin**:这是存放可执行脚本的目录,包括启动、停止Canal服务的命令,如`canal-server.sh`、`canal-client.sh`等,方便用户管理和操作Canal实例。 2. **conf**:配置文件目录,内含Canal实例的配置文件,如`...

Global site tag (gtag.js) - Google Analytics