`
BucketLi
  • 浏览: 194265 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
博客专栏
5a76a659-f8e6-3bf3-b39a-8ae8f7a0f9d9
Percolator与分布...
浏览量:5651
社区版块
存档分类
最新评论

ORACLE CDC增量同步初始化

 
阅读更多
  // Step 1 Find the source tables for which the subscriber has access
        // privileges.


rs = stmt.executeQuery("SELECT * FROM ALL_SOURCE_TABLES");
            sources = new ArrayList<OracleCDCSource>();
            while (rs.next())
            {
                String srcSchema = rs.getString("SOURCE_SCHEMA_NAME");
                String srcTable = rs.getString("SOURCE_TABLE_NAME");
                if (logger.isDebugEnabled())
                    logger.debug("Subscribing to " + srcSchema + "." + srcTable);
                sources.add(new OracleCDCSource(srcSchema, srcTable));
            }



             // Step 2 Find the change set names and columns for which the subscriber
        // has access privileges.


for (Iterator<OracleCDCSource> iterator = sources.iterator(); iterator
                .hasNext();)
        {
            OracleCDCSource src = iterator.next();
            rs = stmt
                        .executeQuery("SELECT UNIQUE CHANGE_SET_NAME, PUB.COLUMN_NAME,"
                                + " PUB_ID, COL.COLUMN_ID "
                                + " FROM ALL_PUBLISHED_COLUMNS PUB, ALL_TAB_COLUMNS COL "
                                + " WHERE SOURCE_SCHEMA_NAME = '"
                                + src.getSchema()
                                + "'"
                                + " AND SOURCE_TABLE_NAME = '"
                                + src.getTable()
                                + "'"
                                + " AND SOURCE_SCHEMA_NAME = COL.OWNER "
                                + " AND SOURCE_TABLE_NAME = COL.TABLE_NAME"
                                + " AND PUB.COLUMN_NAME = COL.COLUMN_NAME"
                                + " ORDER BY COL.COLUMN_ID");

                while (rs.next())
                {
                    String changeSetName = rs.getString("CHANGE_SET_NAME");
                    String columnName = rs.getString("COLUMN_NAME");
                    long pubId = rs.getLong("PUB_ID");
                    src.addPublication(changeSetName, columnName, pubId);

                    changeSets.add(changeSetName);
                    if (logger.isDebugEnabled())
                        logger.debug("Found column " + changeSetName + "\t"
                                + columnName + "\t" + pubId);
                }
            }

           
// Step 3 Create subscriptions.

   subscriberViews = new HashMap<String, OracleCDCSource>();

        for (Iterator<OracleCDCSource> iterator = sources.iterator(); iterator
                .hasNext();)
        {
            OracleCDCSource src = iterator.next();
            Map<Long, OracleCDCPublication> publications = src
                    .getPublications();

            StringBuffer subscribeStmt = new StringBuffer();
            for (OracleCDCPublication pub : publications.values())
            {
                if (changeSets.remove(pub.getPublicationName()))
                {
                    if (logger.isDebugEnabled())
                        logger.debug("Creating subscription to "
                                + pub.getPublicationName());

                    /*
                     * Dropping subscription if it already exists : this can
                     * happen if release code was not called
                     */
                    executeQuery("BEGIN DBMS_CDC_SUBSCRIBE.DROP_SUBSCRIPTION("
                            + "subscription_name => 'TUNGSTEN_PUB');END;", true);

                    executeQuery(
                            "BEGIN DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION("
                                    + "change_set_name => '"
                                    + pub.getPublicationName()
                                    + "', description => 'Change data used by Tungsten', "
                                    + "subscription_name => 'TUNGSTEN_PUB"
                                    + "');end;", false);
                }


// Step 4 Subscribe to a source table and the columns in the
                // source table.


 String viewName = "VW_TUNGSTEN_CDC" + i;
                subscribeStmt
                        .append("DBMS_CDC_SUBSCRIBE.SUBSCRIBE(subscription_name => 'TUNGSTEN_PUB"
                                + "', "
                                + "publication_id    => "
                                + pub.getPublicationId()
                                + ","
                                + "column_list => '"
                                + pub.getColumnList()
                                + "',"
                                + "subscriber_view => '"
                                + viewName
                                + "');");

                subscriberViews.put(viewName, src);
                src.setSubscriptionView(viewName, pub.getPublicationId());

                if (logger.isDebugEnabled())
                    logger.debug("Creating change view " + viewName
                            + " - Now handling "
                            + subscriberViews.keySet().size() + " views");

                i++;

            }

            executeQuery("BEGIN " + subscribeStmt.toString() + " END;", false);
        }


// Step 5 Activate the subscription.
           
   executeQuery("BEGIN DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION("
                + "subscription_name => 'TUNGSTEN_PUB'" + ");END;", false);
分享到:
评论

相关推荐

    Oracle CDC 数据同步9i/10G

    Oracle Change Data Capture (CDC) 是 Oracle 数据库提供的一项高级特性,用于捕捉数据库中的更改数据,以便在异步环境中实现高效的数据同步。Oracle CDC 特别适用于大数据集成、实时数据分析和企业级应用程序之间的...

    Oracle的增量数据处理方案

    Oracle CDC 增量数据处理方案 Oracle CDC(Change Data Capture)是一种数据增量处理技术,在构建数据仓库系统的 ETL 过程中,增量数据的抽取是一个非常关键的环节。CDC 能够帮助识别从上次提取之后发生变化的数据...

    oracle cdc同步模式step by step

    ### Oracle CDC 同步模式详解 #### 一、概述 Oracle CDC (Change Data Capture) 是一种用于捕获数据库中表的变化并记录这些变化的技术。它主要用于实现数据的实时同步、审计跟踪以及支持业务智能等场景。本文将...

    流批一体Streamsets Data Collector(SDC)管道Oracle CDC实时同步配置文档

    ### 流批一体Streamsets Data Collector(SDC)管道Oracle CDC实时同步配置详解 #### 一、概述 在大数据处理领域,Streamsets Data Collector (SDC)作为一种强大的数据集成工具,被广泛应用于各种复杂的数据流场景中...

    使用 PDI 和 Oracle CDC 来实现Oracle 数据库向其他数据库的数据同步

    "使用 PDI 和 Oracle CDC 来实现 Oracle 数据库向其他数据库的数据同步" PDI(Pentaho Data Integration)是一种商业智能解决方案,旨在帮助用户快速、可靠地集成、转换和处理大量数据。Oracle CDC(Change Data ...

    kettle实现时间戳增量同步案例

    在这个“kettle实现时间戳增量同步案例”中,我们将探讨如何利用Kettle来实现Oracle数据库中基于时间戳的数据增量同步。 1. **理解时间戳增量同步** 时间戳增量同步是指只同步自上次同步以来发生改变的数据,通常...

    oracle-nc数据库初始化脚本

    NC6系列的数据库oracle 11g初始化语句。 Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上...

    ORACLE CDC介绍

    Oracle CDC(Change Data Capture)是 Oracle 在数据库级别实现的增量抽取解决方案。CDC 可以在数据库层面上直接实现增量抽取功能,具有很高的性能和实时性。CDC 有两个模式:同步模式和异步模式。同步模式主要是...

    CDC(Change Data Capture)增量抽取

    - **ChangeSource**: 指增量数据的捕获来源,如同步CDC模式是从源数据库直接获取的,而异步CDC模式则是从日志文件中获取的。 - **ChangeSet**: 表示一组逻辑上相关的增量数据集合,需要保证数据的一致性。 - **...

    Kettle增量同步.rar

    Kettle增量同步,里边包含项目文件...初次更新 需要初始化 edp_etl_record表 里面 data_range_fm、data_range_to 字段为 需要抽取数据表中的 min(唯一自动或时间戳),此时表示同步全量,执行完毕后第二次开始同步增量

    java使用datax增量同步代码

    1. **初始化配置**:创建 DataX 作业配置文件,设置源数据库和目标数据库的信息,包括连接地址、用户名、密码等,以及同步的表名和增量字段。 2. **选择增量策略**:根据数据表实际情况,选择时间戳或自增 ID 作为...

    ODI工具CDC增量数据抽取

    ### ODI工具CDC增量数据抽取知识点 #### 一、ODI与CDC概念解析 - **ODI (Oracle Data Integrator)**:是一种企业级的数据集成工具,由Oracle公司提供,主要用于ETL(Extract, Transform, Load)操作,即数据的提取、...

    oracle cdc

    Oracle CDC(Change Data Capture)是一种高效的数据同步技术,主要用于在异构环境中实现实时数据捕捉、转换及传递。它通过监控数据库的日志文件来检测数据变化,并将这些变化数据进行捕获、记录和传输。此技术对于...

    解决ORA-01033 ORACLE 正在初始化或关闭

    ### 解决ORA-01033:Oracle正在初始化或关闭 #### 背景与问题解析 在Oracle数据库管理中,遇到ORA-01033错误是较为常见的问题之一,该错误通常出现在数据库启动或关闭的过程中。ORA-01033错误的全称是“ORA-01033:...

    oracle_cdc

    ### Oracle CDC 技术详解:实现数据库级别的增量数据同步 #### 概述 Oracle CDC(Change Data Capture)技术,作为Oracle数据库的一项核心功能,旨在提供一种高效的数据同步方案,尤其适用于处理增量数据的抽取与...

    深入解析 ORACLE 数据库的初始化pdf

    《深入解析 ORACLE 数据库的初始化》是由知名数据库专家盖国强编著的一本专业书籍,专注于探讨 Oracle 数据库在启动、配置和管理过程中的各项技术细节。这本书以丰富的实例和深入的理论相结合,旨在帮助读者全面理解...

    Oracle数据库逻辑增量备份之exp/imp

    逻辑增量备份的主要目标是在最小化系统停机时间和资源消耗的同时,提供对最新数据的保护。这包括备份新插入、更新或删除的数据,以便在需要时能够快速恢复到特定的时间点。 1二、逻辑备份恢复工具exp/imp exp...

    oracle 增量恢复详细记录

    ### Oracle 增量恢复详解 #### 一、Oracle RMAN 增量备份概述 在Oracle数据库管理系统中,RMAN(Recovery Manager)是一种强大的工具,用于执行数据库的备份与恢复操作。其中,增量备份作为一种高效的方式,在实际...

    深入解析Oracle数据库的初始化

    深入解析Oracle-数据库的初始化,适用于kindle的mobi版本

Global site tag (gtag.js) - Google Analytics