精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2009-02-01
方案理念--四化理念
架构通过视图实现来自两个不同数据库的表的结构完全一致, 在结构完全相同的两个表之间进行数据同步, 问题变得相当简单. 同步代码如下.
ETL
-- Initial sync: empty the ODS table, then reload every row from the source
-- table through the view. The unfiltered DELETE is intentional (full reload).
-- SELECT * relies on the design premise that the view and the source table
-- have exactly the same column layout.
delete from ods_table;
insert into v_table
select *
  from db_table;
commit;

-- Incremental sync: copy only the rows whose id is not yet in the target.
-- NOT EXISTS is used instead of NOT IN because NOT IN returns no rows at all
-- as soon as the subquery yields a single NULL id.
-- NOTE(review): assumes id is the non-null key of both tables -- confirm.
insert into v_table
select *
  from db_table t
 where not exists (select 1
                     from v_table v
                    where v.id = t.id);
commit;

-- Change sync: refresh c_number on rows whose source value differs.
-- The comparison is NULL-safe, so a change from or to NULL is also picked up
-- (a plain "!=" evaluates to UNKNOWN and silently skips those rows).
-- Rows without a matching source row are left untouched.
update ods_table t
   set t.c_number = (select db.c_number
                       from db_table db
                      where db.id = t.id)
 where exists (select 1
                 from db_table db
                where db.id = t.id
                  and (   db.c_number != t.c_number
                       or (db.c_number is null and t.c_number is not null)
                       or (db.c_number is not null and t.c_number is null)));
commit; 实现两个表结构完全一致的方法如下---建表
-- Id generator used by the BEFORE INSERT triggers on ETLS, ETL_TABLES and
-- ETL_VIEWS. NOCACHE: no sequence values are pre-allocated.
CREATE SEQUENCE SEQ_ETL_INCREASE_ID
    START WITH 1
    INCREMENT BY 1
    NOCACHE;
/*==============================================================*/
/* Table: ETL_TABLES                                            */
/* Registry of the tables that take part in the ETL process.    */
/*==============================================================*/
CREATE TABLE ETL_TABLES (
    ID                            NUMBER         DEFAULT -1                 NOT NULL,  -- replaced by the BEFORE INSERT trigger
    TABLE_NAME                    VARCHAR2(100)                             NOT NULL,
    TABLE_TYPE                    VARCHAR2(30)                              NOT NULL,  -- e.g. DB, ODS, DW
    TABLE_ROOT_IN                 VARCHAR2(30),                                        -- originating system
    TABLE_NEED_CREATE_VIEW        NUMBER         DEFAULT 1,                            -- 1 = a view is generated for this table
    TABLE_CREATE_VIEW_NAME_PREFIX VARCHAR2(30)   DEFAULT 'v',                          -- prepended to the table name to form the view name
    DB_LINK_NAME                  VARCHAR2(100),
    CURRENT_VERSION               NUMBER         DEFAULT 1                  NOT NULL,
    VERSION_HISTORY               VARCHAR2(3000) DEFAULT 'init input'       NOT NULL,
    DEVELOP_DATE                  DATE           DEFAULT SYSDATE            NOT NULL,
    DEVELOP_BY                    VARCHAR2(100)  DEFAULT 'cyyan@isoftstone' NOT NULL,
    LAST_MAINTAIN_DATE            DATE           DEFAULT SYSDATE            NOT NULL,
    LAST_MAINTAIN_BY              VARCHAR2(100)  DEFAULT 'cyyan@isoftstone' NOT NULL,
    MEMO                          VARCHAR2(500),
    STATUS                        NUMBER         DEFAULT 1,
    CONSTRAINT PK_ETL_TABLES PRIMARY KEY (ID)
);

COMMENT ON TABLE ETL_TABLES IS '此表用于维护ETL涉及到所有表, 包括: 1, db---业务系统数据库 2, ods---操作数据数据库 3, dw---数据仓库';
/*==============================================================*/
/* Table: ETL_VIEWS                                             */
/* One row per view version; the clause columns are the text    */
/* fragments the view is assembled from.                        */
/*==============================================================*/
CREATE TABLE ETL_VIEWS (
    ID                 NUMBER         DEFAULT -1                 NOT NULL,  -- replaced by the BEFORE INSERT trigger
    VIEW_NAME          VARCHAR2(100)                             NOT NULL,
    VIEW_TYPE          VARCHAR2(30)                              NOT NULL,
    VIEW_ROOT_IN       VARCHAR2(30),
    VIEW_SELECT        VARCHAR2(4000)                            NOT NULL,  -- select list
    VIEW_FROM          VARCHAR2(600)                             NOT NULL,  -- FROM clause body
    VIEW_WHERE         VARCHAR2(2000),
    VIEW_ORDER_BY      VARCHAR2(600),
    VIEW_GROUP_BY      VARCHAR2(600),
    VIEW_HAVING        VARCHAR2(600),
    VIEW_DB_LINK_NAME  VARCHAR2(100),
    CURRENT_VERSION    NUMBER         DEFAULT 1                  NOT NULL,  -- highest value per VIEW_NAME is the live version
    VERSION_HISTORY    VARCHAR2(3000) DEFAULT 'init input'       NOT NULL,
    DEVELOP_DATE       DATE           DEFAULT SYSDATE            NOT NULL,
    DEVELOP_BY         VARCHAR2(100)  DEFAULT 'cyyan@isoftstone' NOT NULL,
    LAST_MAINTAIN_DATE DATE           DEFAULT SYSDATE            NOT NULL,
    LAST_MAINTAIN_BY   VARCHAR2(100)  DEFAULT 'cyyan@isoftstone' NOT NULL,
    MEMO               VARCHAR2(500),
    STATUS             NUMBER         DEFAULT 1,
    CONSTRAINT PK_ETL_VIEWS PRIMARY KEY (ID)
);

COMMENT ON TABLE ETL_VIEWS IS '此表用于维护ETL涉及到所有视图, 包括: 1, v1---db表中与ods对应到视图 2, v2---ods表中与db对应到视图 3, v3---ods表中与dw对应到视图 4, v4---dw表中与ods中对应到视图';
/*==============================================================*/
/* Table: ETLS                                                  */
/* One row per source -> destination synchronisation pair.      */
/*==============================================================*/
CREATE TABLE ETLS (
    ID                    NUMBER                                    NOT NULL,  -- assigned by the BEFORE INSERT trigger
    ETL_NAME              VARCHAR2(300)                             NOT NULL,
    ETL_TYPE              VARCHAR2(30)                              NOT NULL,
    ETL_SRC_VIEW_OR_TABLE NUMBER                                    NOT NULL,  -- source object
    ETL_DES_VIEW_OR_TABLE NUMBER                                    NOT NULL,  -- destination object
    ETL_INIT_ENABLE       NUMBER(1)      DEFAULT 1                  NOT NULL,
    ETL_ADD_ENABLE        NUMBER(1)      DEFAULT 1                  NOT NULL,
    ETL_CHARGE_ENABLE     NUMBER(1)      DEFAULT 1                  NOT NULL,
    CURRENT_VERSION       NUMBER         DEFAULT 1                  NOT NULL,
    VERSION_HISTORY       VARCHAR2(3000) DEFAULT 'init input'       NOT NULL,
    DEVELOP_DATE          DATE           DEFAULT SYSDATE            NOT NULL,
    DEVELOP_BY            VARCHAR2(100)  DEFAULT 'cyyan@isoftstone' NOT NULL,
    LAST_MAINTAIN_DATE    DATE           DEFAULT SYSDATE            NOT NULL,
    LAST_MAINTAIN_BY      VARCHAR2(100)  DEFAULT 'cyyan@isoftstone' NOT NULL,
    MEMO                  VARCHAR2(500),
    STATUS                NUMBER         DEFAULT 1,
    CONSTRAINT PK_ETLS PRIMARY KEY (ID)
);

COMMENT ON TABLE ETLS IS '此表用于维护ETL转换时设计到源表和目的表
源表(或视图)--->目的表(或视图)- (推荐全部使用视图, 视图具有更过到灵活性, 而且更统一)
整体架构是在完全相同两张表(或视图)之间进行同步处理
规范: 1, 源表(或视图)-和目的表(或视图)-完全相同 2, 目的视图必须是单表';
-- Stored procedures
/*==============================================================*/
/* Database name: %DATABASE% */
/* DBMS name: ORACLE Version 10g */
/* Created on: 2009-2-1 23:29:27 */
/*==============================================================*/
-- INTEGRITY PACKAGE DECLARATION
CREATE OR REPLACE PACKAGE INTEGRITYPACKAGE AS
    -- Keeps a trigger nesting-depth counter in package state.

    -- Reset the counter to 0.
    PROCEDURE INITNESTLEVEL;

    -- Return the current counter value (0 if it was never set).
    FUNCTION GETNESTLEVEL RETURN NUMBER;

    -- Increment the counter by one.
    PROCEDURE NEXTNESTLEVEL;

    -- Decrement the counter by one.
    PROCEDURE PREVIOUSNESTLEVEL;
END INTEGRITYPACKAGE;
/
-- INTEGRITY PACKAGE DEFINITION
CREATE OR REPLACE PACKAGE BODY INTEGRITYPACKAGE AS
    -- Current trigger nesting depth; NULL until first initialised.
    NESTLEVEL NUMBER;

    -- Reset the nesting depth to zero.
    PROCEDURE INITNESTLEVEL IS
    BEGIN
        NESTLEVEL := 0;
    END INITNESTLEVEL;

    -- Return the nesting depth, treating a never-set counter as zero
    -- (and storing that zero back into the package state).
    FUNCTION GETNESTLEVEL RETURN NUMBER IS
    BEGIN
        NESTLEVEL := NVL(NESTLEVEL, 0);
        RETURN NESTLEVEL;
    END GETNESTLEVEL;

    -- Go one level deeper; a never-set counter starts from zero.
    PROCEDURE NEXTNESTLEVEL IS
    BEGIN
        NESTLEVEL := NVL(NESTLEVEL, 0) + 1;
    END NEXTNESTLEVEL;

    -- Go one level up. A never-set (NULL) counter stays NULL here, because
    -- NULL - 1 is NULL; no zero default is applied in this procedure.
    PROCEDURE PREVIOUSNESTLEVEL IS
    BEGIN
        NESTLEVEL := NESTLEVEL - 1;
    END PREVIOUSNESTLEVEL;
END INTEGRITYPACKAGE;
/
CREATE OR REPLACE PROCEDURE PRO_CREATE_VIEW_BY_ETL_VIEWS
AS
    --------------PRO_CREATE_VIEW_BY_ETL_VIEWS------------------------
    -- CREATED ON 2009-2-1 BY CYYAN@ISOFTSTONE
    -- Purpose : (re)create one database view for every view registered in
    --           ETL_VIEWS, using the row with the highest CURRENT_VERSION
    --           for each VIEW_NAME.
    -- Notes   : * Each view is created with CREATE OR REPLACE VIEW through
    --             EXECUTE IMMEDIATE; this is DDL, so every statement commits
    --             implicitly.
    --           * Any error raised while creating a view propagates to the
    --             caller; views created before the failure stay in place.
    --           * The clause texts are taken verbatim from ETL_VIEWS, so that
    --             table must only be writable by trusted maintainers.
    ------------------------------------------------------------------------------
    L_VIEW_DDL VARCHAR2(32767); -- the CREATE OR REPLACE VIEW statement being assembled

    -- Latest version of every registered view.
    CURSOR C_LATEST_VIEWS IS
        SELECT T.VIEW_NAME,
               T.VIEW_SELECT,
               T.VIEW_FROM,
               T.VIEW_WHERE,
               T.VIEW_GROUP_BY,
               T.VIEW_HAVING,
               T.VIEW_ORDER_BY,
               T.VIEW_DB_LINK_NAME
          FROM ETL_VIEWS T
         WHERE T.CURRENT_VERSION = (SELECT MAX(T2.CURRENT_VERSION)
                                      FROM ETL_VIEWS T2
                                     WHERE T2.VIEW_NAME = T.VIEW_NAME);
BEGIN
    -- A cursor FOR loop opens, fetches and closes the cursor itself (also when
    -- an exception escapes), and the record fields take their types from the
    -- table columns, so no hand-sized local buffers can overflow.
    FOR REC IN C_LATEST_VIEWS LOOP
        L_VIEW_DDL := 'create or replace view ' || REC.VIEW_NAME
                   || ' as select ' || REC.VIEW_SELECT
                   || ' from ' || REC.VIEW_FROM;

        -- The database link is appended directly after the FROM text, i.e. it
        -- qualifies the last object named there.
        IF REC.VIEW_DB_LINK_NAME IS NOT NULL THEN
            L_VIEW_DDL := L_VIEW_DDL || '@' || REC.VIEW_DB_LINK_NAME;
        END IF;

        IF REC.VIEW_WHERE IS NOT NULL THEN
            L_VIEW_DDL := L_VIEW_DDL || ' where ' || REC.VIEW_WHERE;
        END IF;

        -- GROUP BY and HAVING must come before ORDER BY; emitting ORDER BY
        -- first produces an invalid statement when both are present.
        IF REC.VIEW_GROUP_BY IS NOT NULL THEN
            L_VIEW_DDL := L_VIEW_DDL || ' group by ' || REC.VIEW_GROUP_BY;
        END IF;

        IF REC.VIEW_HAVING IS NOT NULL THEN
            L_VIEW_DDL := L_VIEW_DDL || ' having ' || REC.VIEW_HAVING;
        END IF;

        IF REC.VIEW_ORDER_BY IS NOT NULL THEN
            L_VIEW_DDL := L_VIEW_DDL || ' order by ' || REC.VIEW_ORDER_BY;
        END IF;

        -- Create (or replace) the view.
        EXECUTE IMMEDIATE L_VIEW_DDL;
    END LOOP;
END;
/
CREATE OR REPLACE PROCEDURE PRO_INSERT_INTO_ETL_VIEWS
AS
    -- ADD BY CYYAN@ISOFTSTONE, 2009-02-01 21:33:37
    --
    -- Purpose : for every ETL_TABLES row of type DB or DW that is flagged
    --           TABLE_NEED_CREATE_VIEW = 1, read the table's column names
    --           from the SYS.COL dictionary view (through the row's database
    --           link when one is set) and register a new version of the
    --           matching view definition in ETL_VIEWS.
    -- Output  : the table name and each column name are written with
    --           DBMS_OUTPUT for tracing.
    -- Commit  : the procedure commits once after all tables were processed.
    -- Errors  : any exception propagates to the caller after the dynamic
    --           column cursor has been closed; no rollback is issued here.
    -- Notes   : * A table for which SYS.COL returns no columns is skipped
    --             (with a DBMS_OUTPUT notice) instead of registering a view
    --             with an empty or stale column list.
    --           * DB_LINK_NAME cannot be a bind variable and is concatenated
    --             into the dictionary query, so ETL_TABLES must only be
    --             writable by trusted maintainers.

    -- Tables that need a generated view.
    CURSOR C_SOURCE_TABLES IS
        SELECT UPPER(T.TABLE_NAME)             AS TABLE_NAME,
               T.TABLE_TYPE,
               T.TABLE_ROOT_IN,
               T.TABLE_CREATE_VIEW_NAME_PREFIX AS VIEW_NAME_PREFIX,
               T.DB_LINK_NAME
          FROM ETL_TABLES T
         WHERE UPPER(T.TABLE_TYPE) IN ('DB', 'DW')
           AND T.TABLE_NEED_CREATE_VIEW = 1;

    L_COL_CURSOR   SYS_REFCURSOR;                      -- dynamic cursor over the table's columns
    L_COL_SQL      VARCHAR2(1000);                     -- dictionary query text
    L_COL_NAME     VARCHAR2(100);                      -- current column name
    L_COL_LIST     VARCHAR2(10000);                    -- comma-separated select list
    L_VIEW_NAME    VARCHAR2(200);                      -- prefix || table name
    L_NEXT_VERSION ETL_VIEWS.CURRENT_VERSION%TYPE;     -- version number for the new row
BEGIN
    FOR REC IN C_SOURCE_TABLES LOOP
        DBMS_OUTPUT.PUT_LINE(REC.TABLE_NAME);

        -- Build the dictionary query. The '@link' suffix is only added when a
        -- link is configured; the table name is passed as a bind variable.
        -- Ordering by COLNO keeps the select list in table-column order.
        L_COL_SQL := 'select t.cname from sys.col';
        IF REC.DB_LINK_NAME IS NOT NULL THEN
            L_COL_SQL := L_COL_SQL || '@' || REC.DB_LINK_NAME;
        END IF;
        L_COL_SQL := L_COL_SQL || ' t where t.tname = :tname order by t.colno';

        -- Join all column names with ', '.
        L_COL_LIST := NULL;
        OPEN L_COL_CURSOR FOR L_COL_SQL USING REC.TABLE_NAME;
        LOOP
            FETCH L_COL_CURSOR INTO L_COL_NAME;
            EXIT WHEN L_COL_CURSOR%NOTFOUND;
            DBMS_OUTPUT.PUT_LINE(' ' || L_COL_NAME);
            IF L_COL_LIST IS NULL THEN
                L_COL_LIST := L_COL_NAME;
            ELSE
                L_COL_LIST := L_COL_LIST || ', ' || L_COL_NAME;
            END IF;
        END LOOP;
        CLOSE L_COL_CURSOR;

        IF L_COL_LIST IS NULL THEN
            DBMS_OUTPUT.PUT_LINE(' no columns found, table skipped');
        ELSE
            L_VIEW_NAME := REC.VIEW_NAME_PREFIX || REC.TABLE_NAME;

            -- Next version number for this view name (1 for a new view).
            SELECT NVL(MAX(V.CURRENT_VERSION), 0) + 1
              INTO L_NEXT_VERSION
              FROM ETL_VIEWS V
             WHERE V.VIEW_NAME = L_VIEW_NAME;

            -- Static INSERT: values are bound, so quotes inside them cannot
            -- break the statement.
            INSERT INTO ETL_VIEWS
                (VIEW_NAME, VIEW_TYPE, VIEW_ROOT_IN, VIEW_SELECT,
                 VIEW_FROM, CURRENT_VERSION, VIEW_DB_LINK_NAME)
            VALUES
                (L_VIEW_NAME, REC.TABLE_TYPE, REC.TABLE_ROOT_IN, L_COL_LIST,
                 REC.TABLE_NAME, L_NEXT_VERSION, REC.DB_LINK_NAME);
        END IF;
    END LOOP;

    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        -- Release the dynamic cursor before handing the error to the caller.
        IF L_COL_CURSOR%ISOPEN THEN
            CLOSE L_COL_CURSOR;
        END IF;
        RAISE;
END;
/
CREATE OR REPLACE TRIGGER TRG_ID_ON_ETLS
    BEFORE INSERT ON ETLS
    FOR EACH ROW
BEGIN
    -- Assign the next SEQ_ETL_INCREASE_ID value to the new row's ID; any ID
    -- supplied by the INSERT statement is overwritten.
    -- The END label is omitted on purpose: the former label
    -- (TRIGGER_ID_INCREASE) did not match this trigger's name.
    SELECT SEQ_ETL_INCREASE_ID.NEXTVAL INTO :NEW.ID FROM DUAL;
END;
/
CREATE OR REPLACE TRIGGER TRG_ID_ON_ETL_TABLES
    BEFORE INSERT ON ETL_TABLES
    FOR EACH ROW
BEGIN
    -- Assign the next SEQ_ETL_INCREASE_ID value to the new row's ID; any ID
    -- supplied by the INSERT statement (or the column default) is overwritten.
    -- The END label is omitted on purpose: the former label
    -- (TRIGGER_ID_INCREASE) did not match this trigger's name.
    SELECT SEQ_ETL_INCREASE_ID.NEXTVAL INTO :NEW.ID FROM DUAL;
END;
/
CREATE OR REPLACE TRIGGER TRG_ID_ON_ETL_VIEWS
    BEFORE INSERT ON ETL_VIEWS
    FOR EACH ROW
BEGIN
    -- Assign the next SEQ_ETL_INCREASE_ID value to the new row's ID; any ID
    -- supplied by the INSERT statement (or the column default) is overwritten.
    -- The END label is omitted on purpose: the former label
    -- (TRIGGER_ID_INCREASE) did not match this trigger's name.
    SELECT SEQ_ETL_INCREASE_ID.NEXTVAL INTO :NEW.ID FROM DUAL;
END;
/
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
浏览 5126 次