`

使用 PDI 和 Oracle CDC 来实现Oracle 数据库向其他数据库的数据同步

阅读更多

本文讲述如何利用 Oracle CDC 和 PDI 来实现 Oracle 数据库向其他数据源的数据同步。

第一节 介绍Oracle CDC,如果您熟悉 Oracle CDC 可以跳过该节。
第二节 说明如何配置 PDI 里的 Oracle CDC 输入和 Oracle CDC 输出 插件。

 

第一节 Oracle CDC 介绍(本节主要内容摘自 OWB 用户手册)

发布和订阅模型
大多数 CDC 系统都需有人来完成数据的捕获和发布,这个人就是发布者。同时也有多个应用程序或人来访问发布的数据,这些应用程序或个人就是订阅者。 CDC 提供了 PL/SQL 包来完成发布和订阅任务。

发布者:
发布者通常是数据库的DBA, 他创建和维护包括了 CDC 系统的模式对象,发布者要和两个数据库进行交互
源数据库(source database):
即生产系统数据库,里面包含了需要发布的数据,相关的表称为源表。
中间数据库(staging database)
中间数据库是实际完成数据捕获的数据。依赖于发布者使用的捕获方式,中间数据库可以是源数据库也可以不是源数据库。中间数据库中保存下面的 CDC 对象。
变更表(change table):
变更表是一个关系表,包含了源表中变化的数据。对于发布者而言,变更表就是数据发布。
变更集(change set)
变更集是一组变更的数据,它们用来保证事务的一致性,包含一个或多个变更表。
变更源(change source)
变更源是对源数据库的一种逻辑表示,它包括一个或多个变更集。
发布者需要完成的任务包括:
决定哪些源数据库和表是订阅者关心的,以及要使用的 CDC 的模式(同步或异步)
使用Oracle 提供的包 DBMS_CDC_PUBLISH 从源表中建立 CDC 系统
通过 GRANT 和 REVOKE 这样的SQL 语句来管理订阅者访问变更表的权限(不过访问者不会直接访问变更表,而是访问视图)。
订阅者:
订阅者是已经发布的并更数据的用户,订阅者需要完成下面的任务:
1. 使用Oracle 提供的包 DBMS_CDC_SUBSCRIBE 来完成
创建订阅:
通知CDC准备接收变更的数据:
通知CDC 完成一组变更数据:
2. 使用 SELECT 语句从订阅者视图中抽取变化的数据。


变更源和数据捕获模型
同步模式:
同步模式使用源数据库触发器来捕获变更的数据。这种方式是实时的没有延迟。当 DML 操作提交后,变更表中就产生了变更数据。
同步模式会给源数据库增加负载,但和数据表比较的方式相比,同步模式可以减少系统负载。
同步变更源 SYNC_SOURCE 是唯一的、预定义的,它代表了源数据库。不能更改和删除。
在同步模式下,变更表必须存在于源数据库本地。

异步模式:
异步模式使用数据库的重做日志(redo log)文件,在源数据库发生变更以后,才进行数据捕获。异步模式依赖于源数据库的补充日志(supplemental logging)的级别。补充日志会给源数据库增添了负载,所以要小心使用异步CDC 模式。
有两种方式来捕获异步变更数据,分别是HotLog和AutoLog:

HotLog:
HotLog是从源数据库的重做日志文件中获得变更的数据。这种方式会有一段时间延迟。
HotLog变更源 HOTLOG_SOURCE 也是唯一的、预定义的,它代表了源数据库的重做日志文件。不能更改和删除。
在HotLog模式下,变更表必须存在于源数据库本地。

AutoLog:
AutoLog是从日志转移服务(Log transport services)管理的一组重做日志文件中获得变更数据。日志转移服务将重做日志文件自动地从源数据库转移到中间数据库。通过使用数据库初始化参数,发 布者可以配置日志转移服务使其能够自动的从源数据库将重做日志文件转移到中间数据库。在中间数据库中,当来了新的重做日志文件,就可以获取到新的变更数 据。因此延迟程度取决于重做日志文件的转移频率。
没有预定义的AutoLog 变更源。发布者提供源数据库信息来创建一个AutoLog 数据源。
在HotLog模式下,变更表可以保存在源数据库本地或远程,一般都是保存在远程。


变更集:
一个变更集是一组变更的数据,它作为一个整体来管理,可以保证事务的一致性。一个变更集只属于一个变更源。一个变更源可以包括多个变更集。从概念上将,变更集和变更源的模式应该相同,也就是说,AutoLog模式的变更集是属于AutoLog变更源的。
当一个发布者将两个或更多的变更表放在同一个变更集中,订阅者可以连接变更集里表而且要保证事务的一致性。三种模式的变更集:
同步:
在预定义好的变更源 SYNC_SOURCE 中,发布者可以定义新的变更集,也可以使用预定义的变更集SYNC_SET。预定义的变更集SYNC_SET不能更新和删除。
异步HotLog:
发布者要在预定义的数据源 HOTLOG_SOURCE 中定义新的数据集。
异步AutoLog
发布者要在用户定义的数据源中定义新的数据集。

总结:
模式 变更源 代表的源数据库 相关的变更集
同步 预定义 SYNC_SOURCE 本地 预定义SYNC_SET 和发布者定义
异步HotLog 预定义 HOTLOG_SOURCE 本地 发布者定义
异步AutoLog Publisher-defined 远程或本地 发布者定义


变更表:
变更表中包含了源表中变更的数据。变更表中的数据分为两部分,一部分是变更数据本身,另一部分是变更表的必要的控制数据。
发布者可以指定源表中的哪些列可以包含在变更表中。一般,发布者会将源表的主键列和订阅者感兴趣的列加入到变更表中。
变更表中还包括了必要的和可选的控制列,可选的控制列由发布者在创建变更表时指定。控制列由 CDC 来管理。

列 数据类型 模式 可选 描述
----------------------------------------------------------------------
OPERATION$ CHAR(2) 所有 否 I: INSERT 操作
UO: UPDATE 操作前的源表镜像
UU: UPDATE 操作前的源表镜像
UN: UPDATE 操作后的源表镜像
D: DELETE 操作

CSCN$ NUMBER 所有 否 事务的提交 SCN 号(SYSTEM CHANGE NUMBER)
RSID$ NUMBER 所有 是 事务内的唯一操作顺序ID,不能跨事务, 必须和CSCN$一起使用
SOURCE_COLMAP$ RAW(128) 同步 是 源表中更新的列的位掩码.
TARGET_COLMAP$ RAW(128) 所有 是 变更表中更新的列的位掩码.
COMMIT_TIMESTAMP$ DATE 所有 否 事务提交时间.
TIMESTAMP$ DATE 所有 是 源数据库中操作发生的时间.
USERNAME$ VARCHAR2(30) 所有 是 源数据库中完成该操作的用户名.
ROW_ID$ ROW_ID 所有 是 源表中行的ROW ID.
XIDUSN$ NUMBER 异步 否 事务 ID undo 片段号.
XIDSLT$ NUMBER 异步 否 事务 ID 槽号.
XIDSEQ$ NUMBER 异步 否 事务 ID 顺序号.
SYS_NC_OID$ RAW(16) 异步 是 对象 ID.

 

说明 1. OPERATION$ 是 CHAR(2) 类型,在查询 I 操作和 D操作时,要注意查询条件是 OPERATION$='I ' 和 OPERATION$='D '
2. 如果要按照数据变更发生的顺序排序,则排序子语句是 ORDER BY CSCN$, RSID$


理解 TARGET_COLMAP$和 SOURCE_COLMAP$

TARGET_COLMAP$ 和 SOURCE_COLMAP$ 用于表示那一列发生了变化. TARGET_COLMAP$ 表示变更表中的哪一列发生了变化. SOURCE_COLMAP$ (只使用在同步变更表中) 表示在源表中哪一列发生了变化.
因为 TARGET_COLMAP$ 和 SOURCE_COLMAP$ 的数据类型是 RAW(128), 每一个列可以保存128字节的二进制信息.


下面详细说一下 同步CDC 的发布和订阅步骤,异步CDC 的发布和订阅步骤请参考 OWB 用户手册

同步CDC 的发布步骤:

第一步 DBA: 设置 AVA_POOL_SIZE 参数
java_pool_size = 50000000
第二步 DBA: 创建发布者并授权 (如果DBA 直接作为发布者,该步骤可以省略)
CREATE USER cdcpub IDENTIFIED BY cdcpub DEFAULT TABLESPACE ts_cdcpub QUOTA UNLIMITED ON SYSTEM QUOTA UNLIMITED ON SYSAUX;
GRANT CREATE SESSION TO cdcpub;
GRANT CREATE TABLE TO cdcpub;
GRANT CREATE TABLESPACE TO cdcpub;
GRANT UNLIMITED TABLESPACE TO cdcpub;
GRANT SELECT_CATALOG_ROLE TO cdcpub;
GRANT EXECUTE_CATALOG_ROLE TO cdcpub;
GRANT CONNECT, RESOURCE TO cdcpub;

第三步 cdcpub: 创建变更集 (如果使用缺省变更集 SYNC_SET 该步骤可以省略)
BEGIN
DBMS_CDC_PUBLISH.CREATE_CHANGE_SET(
change_set_name => 'CHICAGO_DAILY',
description => 'Change set for job history info',
change_source_name => 'SYNC_SOURCE');
END;

第四步 cdcpub: 创建变更表
BEGIN
DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE(

owner => 'cdcpub',
change_table_name => 'jobhist_ct',
change_set_name => 'CHICAGO_DAILY',
source_schema => 'HR',
source_table => 'JOB_HISTORY',
column_type_list => 'EMPLOYEE_ID NUMBER(6),START_DATE DATE,
END_DATE DATE,JOB_ID VARCHAR2(10),
DEPARTMENT_ID NUMBER(4)',
capture_values => 'both',
rs_id => 'y',
row_id => 'n',
user_id => 'n',
timestamp => 'n',
object_id => 'n',
source_colmap => 'y',
target_colmap => 'y',
options_string => 'TABLESPACE TS_CHICAGO_DAILY');
END;
/


第五步 cdcpub: 给订阅者授权
GRANT SELECT ON cdcpub.jobhist_ct TO subscriber1;


同步 CDC 的订阅步骤

第一步: 找到订阅者可以访问的已发布的源表
SELECT * FROM ALL_SOURCE_TABLES;

SOURCE_SCHEMA_NAME SOURCE_TABLE_NAME
------------------------------ ------------------------------
HR JOB_HISTORY

第二步:找到订阅者可以访问的变更集名称和列名
SELECT UNIQUE CHANGE_SET_NAME, COLUMN_NAME, PUB_ID
FROM ALL_PUBLISHED_COLUMNS
WHERE SOURCE_SCHEMA_NAME ='HR' AND SOURCE_TABLE_NAME = 'JOB_HISTORY';

CHANGE_SET_NAME COLUMN_NAME PUB_ID
---------------- ------------------ ------------
CHICAGO_DAILY DEPARTMENT_ID 34883
CHICAGO_DAILY EMPLOYEE_ID 34883
CHICAGO_DAILY END_DATE 34883
CHICAGO_DAILY JOB_ID 34883
CHICAGO_DAILY START_DATE 34883

/

第三步:创建一个订阅
BEGIN
DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(
change_set_name => 'CHICAGO_DAILY',
description => 'Change data for JOB_HISTORY',
subscription_name => 'JOBHIST_SUB');
END;
/

订阅名称是 "JOBHIST_SUB"

第四步:订阅源表中的哪些列
BEGIN
DBMS_CDC_SUBSCRIBE.SUBSCRIBE(
subscription_name => 'JOBHIST_SUB',
source_schema => 'HR',
source_table => 'JOB_HISTORY',
column_list => 'EMPLOYEE_ID, START_DATE, END_DATE, JOB_ID',
subscriber_view => 'JOBHIST_VIEW');
END;
/

订阅视图名称是 "JOBHIST_VIEW"

第五步:激活订阅
BEGIN
DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(
subscription_name => 'JOBHIST_SUB');
END;
/

第六步:获得下一组变更数据
BEGIN
DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(
subscription_name => 'JOBHIST_SUB');
END;
/

第七步:读取和查询订阅视图中的内容
SELECT EMPLOYEE_ID, START_DATE, END_DATE FROM JOBHIST_VIEW;
EMPLOYEE_ID START_DAT END_DATE
----------- --------- ---------
176 24-MAR-98 31-DEC-98
180 24-MAR-98 31-DEC-98
190 01-JAN-99 31-DEC-99
200 01-JAN-99 31-DEC-99

第八步:清除当前的变更数据
BEGIN
DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
subscription_name => 'JOBHIST_SUB');
END;
/

第九步:重复第六步到第八步
第十步:删除订阅
BEGIN
DBMS_CDC_SUBSCRIBE.DROP_SUBSCRIPTION(
subscription_name => 'JOBHIST_SUB');
END;
/

 


第二节 PDI 中 Oracle CDC 输入和 Oracle CDC 输出插件的配置

1. 前提

(1) Oracle 数据库的发布者已经完成了数据的发布,(即上述发布步骤的所有步骤)。
(2) Oracle 数据库的订阅者已经完成一个订阅,并激活了该订阅,(即上述订阅步骤的第一步到第五步)。


2. 配置
Oracle CDC 输入插件:只要配置三个选项
(1)数据库连接:注意要以 订阅者 或 DBA 身份连接。
(2)订阅名称:上述订阅步骤第三步中设置的 subscription_name。
(3)订阅视图名称: 上述订阅步骤第四步中设置的 subscriber_view。

Oracle CDC 输出插件:
(1)数据库连接:选择一个要输出的数据库连接。
(2)目标表名称:选择数据库中的一个表。
(3)关键字:目标表里的关键字。

3. 部署

将定义好的转换保存成 ktr 文件,或保存在资源库中。
使用操作系统的调度(linux:corn,windows:计划任务)或 kettle 的作业调度,来定时运行命令(如 pan /file:C:xxx.ktr)。

分享到:
评论

相关推荐

    kettle同步数据库所有的表数据到其他库.rar

    本示例中的"Kettle同步数据库所有的表数据到其他库.rar"是一个具体的应用场景,即利用Kettle将一个数据库的所有表数据同步到另一个具有相同表结构的数据库中。这里,我们不仅讨论如何实现这个功能,还将深入探讨...

    kettle 从oracle数据库导数据到hive 表

    本文旨在详细介绍如何使用 Kettle(Pentaho Data Integration,PDI)进行 Oracle 数据库到 Hive 的数据迁移过程。 #### 技术选型与环境搭建 1. **Kettle 版本**:PDI CE 6.1.0.1-196 2. **Hadoop 版本**:CDH 5.11...

    pdi数据转换及数据库操作

    例如,你可以使用"PDI的数据库连接"组件来配置与特定数据库的连接,然后使用"SQL脚本"步骤执行预定义的DML(数据操纵语言)或DDL(数据定义语言)语句。此外,PDI还支持数据库的元数据获取,可以帮助你了解数据库...

    Kettle生成1亿条数据导入oracle

    总的来说,这个案例展示了Kettle在大数据处理、数据库导入以及优化内存管理方面的强大功能,为其他类似需求提供了参考和实施思路。通过理解并应用这些知识点,我们可以更高效地处理大规模数据任务,提高数据处理的...

    Kettle实现增量抽取数据

    在本场景中,我们将探讨如何使用Kettle从Oracle数据库中的两个表之间实现增量数据抽取,且这个过程并不依赖于时间戳。 首先,理解增量抽取的概念至关重要。增量抽取是从源系统中提取自上次抽取以来发生改变的新数据...

    kettleCDC_oracle_mysql.rar

    标题 "kettleCDC_oracle_mysql.rar" 涉及到的是一个使用 Pentaho Data Integration (Kettle) 实现的数据库变更数据捕获 (CDC) 解决方案,它专注于从 Oracle 数据库迁移到 MySQL 数据库的过程。Oracle 和 MySQL 是两...

    kettle连接oracle的jar包.zip

    正确部署和配置这些JAR文件,是确保Kettle能够有效与Oracle数据库交互的基础,从而实现数据的提取、转换和加载任务。在处理大量数据和复杂数据转换的项目中,理解并掌握这个过程对IT专业人员来说至关重要。

    使用Kettle同步mysql数据,增量同步教程执行步骤

    本文将深入探讨如何使用Kettle工具实现MySQL数据库之间的增量同步。Kettle,也称为Pentaho Data Integration (PDI),是一款强大的ETL(提取、转换、加载)工具,能够高效地处理数据集成工作。 首先,我们需要理解...

    KETTLE oracle驱动

    Kettle,又称为Pentaho Data Integration(PDI),是一款强大的数据集成工具,它允许用户进行数据抽取、转换和加载(ETL)操作。Oracle驱动在Kettle中扮演着至关重要的角色,因为它使得Kettle能够连接到Oracle数据库...

    kettle实现时间戳增量同步案例

    在这个“kettle实现时间戳增量同步案例”中,我们将探讨如何利用Kettle来实现Oracle数据库中基于时间戳的数据增量同步。 1. **理解时间戳增量同步** 时间戳增量同步是指只同步自上次同步以来发生改变的数据,通常...

    etl工具kettle的 数据库jar包 oracle mysql

    通过这种方式,Kettle就可以识别并使用这些数据库驱动,从而实现对Oracle和MySQL数据库的有效操作。了解并正确配置这些驱动对于成功地执行涉及数据库操作的Kettle工作流至关重要。在实际工作中,可能还需要考虑其他...

    使用kettle转换oracle与 mysql的表数据

    在IT行业中,数据转换是一项关键任务,特别是在大数据和企业级数据管理中。Kettle,又称为Pentaho Data Integration...通过理解其工作原理和配置方法,你可以灵活地处理各种数据集成问题,实现数据的无缝迁移和整合。

    Kettle数据库同步源码

    首先,Kettle支持多种数据库系统,如MySQL、Oracle、SQL Server等,能实现不同数据库之间的数据迁移与同步。这种特性使得Kettle成为跨平台数据整合的理想选择。在批量同步数据库表时,Kettle提供了工作流(Job)和...

    Kettle连接Oracle、Mysql驱动包

    Kettle通过JDBC驱动与Oracle和MySQL通信,实现数据的读取、写入和查询。因此,"Kettle驱动包"实际上指的是Kettle所需的Oracle和MySQL的JDBC驱动程序。 安装Oracle JDBC驱动(也称为ojdbc)通常涉及以下步骤: 1. ...

    kettler连接oracle 12c数据库(目前只能用JNDI)

    在数据集成领域,Kettle 是一款广泛使用的开源ETL(Extract-Transform-Load)工具,它提供了强大的功能来帮助用户从不同的数据源抽取数据、进行数据清洗与转换,并加载到目标系统中。本文将详细介绍如何利用Kettler...

    etl实现不同数据库实时备份

    在"etl实现不同数据库实时备份"这个主题中,我们将探讨如何使用ETL工具来实现实时地备份Oracle、SQL Server和MySQL这三种主流数据库的数据。 1. 数据抽取:ETL过程的第一步是数据抽取。对于Oracle、SQL Server和...

    kettle7.0下实现数据库迁移

    Kettle,又称Pentaho Data Integration (PDI),是一款强大的ETL(提取、转换、加载)工具,它提供了图形化的界面来设计和执行数据迁移任务。在Kettle 7.0版本中,其稳定性和性能得到了进一步提升,使得数据库迁移变...

    使用Kettle同步mysql数据,增量同步

    在处理MySQL数据库的数据同步任务时,Kettle能够提供增量同步的功能,有效实现数据的实时更新。增量同步通常指的是只同步自上次同步以来发生改变的数据,而不是对整个数据库内容进行全量同步,这样能够节省网络带宽...

    异构数据的同步——kettle

    Kettle,又称为Pentaho Data Integration(PDI),由Spoon、Kitchen、Pan等组件组成,能够支持多种数据库、文件系统和其他数据存储系统,为数据整合提供了灵活且高效的方式。 在数据处理领域,异构数据同步是一个...

Global site tag (gtag.js) - Google Analytics