`

【canal】canal hello world

阅读更多
具体参考:
https://github.com/alibaba/canal/wiki/QuickStart

准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
启动

官方指导的配置是有问题的,启动canal-server会报以下错误:
^@2020-06-29 08:27:52.320 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /127.0.0.1:3306 has an error, retrying. caused by
java.io.IOException: connect /127.0.0.1:3306 failure:java.io.IOException: Unexpected End Stream


log-bin=【有权限的绝对路径】 # 开启 binlog
重启mysql后生效




下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
解压缩

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal
解压完成后,进入 /tmp/canal 目录,可以看到如下结构
drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs
配置修改

vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal 
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
启动

sh bin/startup.sh
查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
关闭

sh bin/stop.sh


客户端:
package com.alibaba.test.steed.utils;

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
//        11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}


客户端消费:
empty count : 113
empty count : 114
================&gt; binlog[mysql-bin.000001:382] , name[example,tb_user] , eventType : INSERT
id : 1    update=false
aid : 1    update=false
name : 1111    update=false
passwd :     update=false
telephone :     update=false
icon :     update=false
email :     update=false
title :     update=false
status : 0    update=false
create_time : 2000-01-01 00:00:00    update=false
update_time : 2020-06-29 08:32:52    update=false
empty count : 1
empty count : 2
分享到:
评论

相关推荐

    课程设计——回文判别

    - 非回文:`"hello world"` 在实际编写代码时,可以考虑使用Python、Java、C++等常见编程语言。每种语言都有其特定的语法和库函数,但基本的逻辑和数据结构概念是相通的。完成程序后,记得进行充分的测试和调试,以...

    Alibaba_Java_Coding_Guidelines-2.2.3.0x.zip

    Alibaba_Java_Coding_Guidelines-2.2.3.0x

    【ABB机器人】-IRB460机器人维护信息V1.pdf

    【ABB机器人】-IRB460机器人维护信息V1.pdf

    新能源汽车VCU控制器全开源:从代码到硬件设计的全面解析

    内容概要:本文详细介绍了新能源汽车VCU(车辆控制单元)控制器的开源项目,涵盖从应用层代码到底层代码、原理图、PCB设计、通信协议及控制策略等多个方面。应用层代码展示了如何根据电池电量调整车辆行驶模式,底层代码涉及硬件驱动如GPIO控制和ADC采样配置。硬件设计部分包括详细的原理图和PCB布局,确保系统的稳定性和可靠性。通信协议采用CAN网络,确保数据可靠传输,控制策略则涵盖了能量回收、扭矩控制等关键技术。丰富的文档资料和测试用例为开发人员提供了宝贵的学习和开发资源。 适合人群:新能源汽车开发人员、硬件工程师、嵌入式软件工程师、学生及研究人员。 使用场景及目标:帮助开发人员深入了解新能源汽车VCU控制器的工作原理和技术细节,加速项目开发进程,降低开发难度。无论是初学者还是有经验的专业人士,都可以从中受益。 其他说明:该项目不仅提供了完整的源代码和硬件设计文件,还包括详细的测试用例和故障处理方案,使得VCU开发变得更加透明和可复现。

    详解DeepSeek的十个安全问题.pdf

    详解DeepSeek的十个安全问题.pdf

    《网络传播技术与实务》第10章-握在手中的网络——移动通信与无线网络技术.ppt

    《网络传播技术与实务》第10章-握在手中的网络——移动通信与无线网络技术.ppt

    《计算机专业英语》chapter9-Communication-by-Avatars.ppt

    《计算机专业英语》chapter9-Communication-by-Avatars.ppt

    Xrunner的使用手册

    性能测试工具Xrunner的使用手册

    基于自抗扰控制(ADRC)的永磁同步电机(PMSM)矢量控制调速系统仿真研究与实现

    内容概要:本文深入探讨了基于自抗扰控制(ADRC)的永磁同步电机(PMSM)矢量控制调速系统的仿真方法及其优势。首先介绍了模型搭建,包括DC直流电压源、三相逆变器、永磁同步电机、采样模块、Clark、Park、Ipark以及SVPWM等关键组件。接着详细解析了ADRC在电流环和转速环中的应用,展示了其通过扩张状态观测器(ESO)实现的高精度扰动观测与补偿机制。文中还提供了部分MATLAB代码示例,如SVPWM模块和ADRC控制器的具体实现。仿真结果显示,ADRC相比传统PI控制器,在突加负载时表现出更好的稳定性和更快的响应速度,且不存在积分饱和问题。此外,文章讨论了一些实际应用中的注意事项和技术挑战。 适合人群:从事电机控制领域的研究人员、工程师及高校相关专业师生。 使用场景及目标:适用于希望深入了解和掌握现代先进电机控制技术的研究人员和工程师。目标是通过仿真平台验证ADRC的有效性,并为实际工程项目提供理论支持和技术指导。 其他说明:尽管ADRC具有诸多优点,但在实际应用中仍需注意参数选择和硬件条件限制等问题。

    《网络设备安装与调试(锐捷版)》项目1-配置交换机设备-优化网络传输.pptx

    《网络设备安装与调试(锐捷版)》项目1-配置交换机设备-优化网络传输.pptx

    ABAQUS UMAT/VUMAT子程序二次开发:基于Fortran实现材料损伤断裂弹塑性建模

    内容概要:本文详细介绍了如何使用Fortran语言在ABAQUS中开发UMAT(用户材料子程序)和VUMAT(显式用户材料子程序),以实现材料损伤断裂弹塑性的自定义建模。文章首先阐述了材料损伤断裂弹塑性的重要性和应用场景,强调了自定义材料子程序在处理复杂材料行为方面的优势。接着,分别展示了UMAT和VUMAT的基本代码结构及其核心计算步骤,如材料参数读取、弹性刚度矩阵初始化、塑性应变增量计算以及应力更新等。此外,还讨论了DISP模型的应用,提供了具体的损伤演化和应力折减方法,并分享了一些实用的调试技巧和注意事项。 适合人群:具备一定ABAQUS使用经验和Fortran编程基础的研究人员和技术人员,尤其是从事材料力学、结构工程等领域的工作人士。 使用场景及目标:适用于需要对特定材料进行精确建模的工程项目,如航空航天、土木建筑等。通过自定义UMAT和VUMAT子程序,能够更好地模拟材料在复杂载荷条件下的损伤演化与断裂过程,提高结构安全性和可靠性评估的准确性。 其他说明:文中不仅提供了详细的代码示例,还分享了许多实践经验,帮助开发者避免常见错误并优化性能。同时提醒读者关注材料参数的正确配置、雅可比矩阵的对称性等问题,确保计算稳定可靠。

    V1_3_example.ipynb

    V1_3_example.ipynb

    安川机器人DX100操作要领书 通用-搬运用途-E.0.pdf

    安川机器人DX100操作要领书 通用-搬运用途-E.0.pdf

    【java毕业设计】SpringBoot+Vue图书馆(图书借阅)管理系统 源码+sql脚本+论文 完整版

    这个是完整源码 SpringBoot + vue 实现 【java毕业设计】SpringBoot+Vue图书馆(图书借阅)管理系统 源码+sql脚本+论文 完整版 数据库是mysql 随着社会的发展,计算机的优势和普及使得阿博图书馆管理系统的开发成为必需。阿博图书馆管理系统主要是借助计算机,通过对图书借阅等信息进行管理。减少管理员的工作,作,同时也方便广大用户对所需图书借阅信息的及时查询以及管理。 阿博图书馆管理系统的开发过程中,采用B / S架构,主要使用Java技术进行开发,结合最新流行的springboot框架。使用Mysql数据库和Eclipse开发环境。该阿博图书馆馆管理系统的开发过程中,采用B / S架构,主要使用Java技术进行开发,结合最新流行的spri管理系统包括用户和管理员。其主要功能包括管理员:首页、个人中心、用户管理、图书分类管理、图书信息管理、图书借阅管理、图书归还管理、缴纳罚金管理、留言板管理、系同时也方便广大用户对所需图书借阅信息的及时查询以及管理。 阿博图书馆管理系统的开发过程中,采用B / S架构,主要使用Java技术进行开发,结合最新流行的springboot框架。使用Mysql数据库和Eclipse开发环境。该阿博图书馆管理系统包括用户和管理员。其主要功能包括管理员:首页、个人中心、用户管理、图书分类管理、图书信息管理、图书借阅管理、图书归还管理、缴纳罚金管理、留言板管理、系统管理,用户:首页、个人中心、图书借阅管理、图书归还管理、缴纳罚金管理、我的收藏管理,前台首页;首页、图书信息、公告信息、留言反馈、个人中心、后台管理等功能。 本论文对阿博图书馆管理系统的发展背景进行详细的介绍,并且对系统开发技术进行介绍,然后对系统进行需求分析,对阿博图书馆管理系统业务流程、系统结构以及数据都进行详细说明。用户可根据关键字进行查找自己想要的信息等。

    基于YALMIP与MATLAB的微电网优化调度模型:新手友好型学习教程

    内容概要:本文详细介绍了一个基于YALMIP和MATLAB的微电网优化调度模型,旨在帮助新手理解和应用微电网优化调度的基本概念和技术。模型综合考虑了蓄电池管理、市场购电售电约束以及功率平衡等因素,以实现系统总费用最低为目标。文中提供了详细的MATLAB代码示例,涵盖变量定义、约束条件建立、目标函数设定及优化求解过程,并附带了调试建议和可视化方法。此外,还讨论了一些常见的错误及其解决办法,如充放电互斥约束、功率平衡约束等。 适合人群:对微电网优化调度感兴趣的初学者,尤其是有一定MATLAB基础的学生或研究人员。 使用场景及目标:适用于希望快速掌握微电网优化调度基本原理的学习者,通过动手实践加深对相关理论的理解。具体应用场景包括但不限于:学术研究、课程作业、个人兴趣项目等。 其他说明:该模型不仅有助于理解微电网的工作机制,还可以为进一步探索复杂的微电网优化问题奠定坚实的基础。

    基于MATLAB的CNN多输入多输出预测模型构建与应用

    内容概要:本文详细介绍了如何利用MATLAB搭建卷积神经网络(CNN),用于处理具有10个输入特征和3个输出变量的数据预测任务。首先进行数据预处理,包括数据读取、归一化以及训练集和测试集的划分。接着设计了一个包含多个卷积层、批量归一化层、ReLU激活函数层和全连接层的网络架构,确保能够有效提取特征并完成多输出预测。训练过程中采用Adam优化算法,并设置了合理的超参数如最大迭代次数、批次大小和初始学习率等。最终通过预测和反归一化步骤得到模型性能评价指标MAE和R²,展示了良好的预测效果。 适合人群:具有一定MATLAB编程基础和技术背景的研究人员或工程师,尤其是那些从事数据分析、机器学习领域的专业人士。 使用场景及目标:适用于需要解决多输入多输出预测问题的实际项目中,比如工业生产过程监控、设备故障诊断等领域。目的是帮助用户掌握使用MATLAB实现CNN的方法论,从而提高工作效率和解决问题的能力。 其他说明:文中提供了完整的代码片段供读者参考实践,同时针对可能出现的问题给出了实用性的建议,如调整批量大小、降低学习率等方法来应对训练不稳定的情况。此外还提到了一些改进方向,例如改变卷积核尺寸或者引入空洞卷积以增强模型表现。

    机器人概要(外形图、目录的阅读方法)20120428.ppt

    机器人概要(外形图、目录的阅读方法)20120428.ppt

    《计算机程序设计(C语言)》第7章-第2节-函数的定义.ppt

    《计算机程序设计(C语言)》第7章-第2节-函数的定义.ppt

    《网络工程设计与项目实训》02-交换机及其基本配置.ppt

    《网络工程设计与项目实训》02-交换机及其基本配置.ppt

    【微服务架构】Nacos Client服务注册与配置管理:Spring Cloud应用服务发现与配置中心集成指南

    内容概要:本文档详细介绍了将服务迁移到Nacos注册与配置中心的具体步骤,包括pom文件中依赖包的更新、启动类注解的添加以及详细的nacos客户端和服务配置文件设置。在pom文件中,需要移除旧的服务发现工具(如Eureka)相关依赖并引入特定版本的nacos-client及相关starter组件,确保springboot版本不低于2.2.3。启动类需添加`@EnableDiscoveryClient`注解以启用服务发现功能。配置文件中,明确指定了服务的基本信息(如端口、应用名称)、nacos服务器地址、命名空间、分组等关键参数,并强调了配置文件格式为YAML的重要性。对于已存在的服务,仅需完成前三个步骤,而对于新的服务,则还需进行配置文件的导入工作。 适合人群:对微服务架构有一定了解,特别是正在考虑或已经决定从其他服务发现工具迁移至Nacos的企业级开发者或运维人员。 使用场景及目标:①帮助团队将现有基于其他服务发现机制的应用程序平滑迁移到Nacos平台;②确保新开发的服务能够正确地注册到Nacos并使用其提供的配置管理功能;③通过合理的配置减少服务间的耦合度,提高系统的可维护性和扩展性。 阅读建议:由于涉及到具体的版本号和配置细节,在实际操作过程中应严格按照文档指导执行,同时关注官方最新动态,确保所使用的版本是最稳定且符合项目需求的。此外,建议在非生产环境中先行测试,验证配置无误后再推广到生产环境。

Global site tag (gtag.js) - Google Analytics