`

flexible pipelined functions(原创)

 
阅读更多

flexible pipelined functions
Pipelined functions, type polymorphism (type hierarchies and substitution) and multi-table inserts are three features from the Oracle 9i timeframe. Pipelined functions are used primarily in ETL applications (extract, transform and load) for efficient processing of complex transformations. Type substitution is used for modelling complex data types and relationships in more object-oriented applications. Multi-table inserts (another ETL feature of Oracle) are used to load one or more tables from a single rowsource.
In this article we will combine these features by using type polymorphism to return multiple record structures from a single pipelined function. We will load the resulting dataset into multiple tables using multi-table insert.
It is assumed that readers are familiar with the technologies being described in this article.
why is this technique significant
This technique is significant on two counts.

First, it is usually the case that pipelined functions will return a single, known record structure (unless using complex ANYDATASET structures made available by Oracle's Data Cartridge). The Oracle documentation and other articles often show pipelined functions transforming a single input record into two output records, but always of the same structure. This article will show how single input records can be transformed into multiple output rows of different structures.

--可以返回不同结构的type
Second, this technique solves a very particular, but common, performance problem. It is quite common in traditional batch environments (such as data warehouses) to receive source data in flat-files. It is equally common that some of these files contain data for more than one target table in the data warehouse. Assuming that the data transformations that must take place are complex (i.e. too complex for SQL), a typical loading approach is as follows (in pseudo-code).

--解决了数据源进行多次转换和导入到性能问题
FOR rec IN (SELECT * FROM source_data) LOOP
   ...prepare table_A variables...
   ...prepare table_B variables...
   ...prepare table_C variables...
   INSERT INTO table_A VALUES (...);
   INSERT INTO table_B VALUES (...);
   INSERT INTO table_C VALUES (...);
END LOOP;
This is a simple process to code and understand, yet it is inefficient and slow. Using the techniques we will describe in this article, we will combine the efficiency of bulk SQL, parallel pipelined functions and substitutable types to change this load to something that resembles the following pseudo-code.
INSERT FIRST
   --
   WHEN (record is of table_A format)
   INTO table_A VALUES (...)
   --
   WHEN (record is of table_B format)
   INTO table_B VALUES (...)
   --
   WHEN (record is of table_C format)
   INTO table_C VALUES (...)
   --
SELECT ...
FROM   TABLE(parallel_pipelined_function(CURSOR(...)));
setup
We will begin by setting up our sample application. We are going use a simplified investment trading model for our demonstrations. This will include:
    TRADES: this will store information on a trade that takes place between two counterparties;
    TRADE_LEGS: child of TRADES. A single trade might comprise multiple legs (e.g. a swap trade). In our application, we are going to ensure that every trade has two legs; and
    LEG_VALUATIONS: child of TRADE_LEGS. Each leg of a trade will be valued every day that it is active (i.e. not matured).
target tables
Given this simple model, we can now create our target tables. Note that all tables are defined with PARALLEL because we will exploit this later when we come to using our pipelined function.
SQL> CREATE TABLE trades
  2  ( trade_id           NUMBER
  3  , product_type       VARCHAR2(10)
  4  , counterparty       VARCHAR2(30)
  5  , trade_timestamp    TIMESTAMP
  6  , trading_book       VARCHAR2(30)
  7  , maturity_date      DATE
  8  , CONSTRAINT trades_pk
  9       PRIMARY KEY (trade_id)
 10  )
 11  PARALLEL;
Table created.
SQL> CREATE TABLE trade_legs
  2  ( trade_id           NUMBER
  3  , leg_no             NUMBER
  4  , trade_amount       NUMBER
  5  , currency           VARCHAR2(3)
  6  , trade_price        NUMBER
  7  , CONSTRAINT trade_legs_pk
  8       PRIMARY KEY (trade_id, leg_no)
  9  , CONSTRAINT trade_legs_fk01
 10       FOREIGN KEY (trade_id)
 11       REFERENCES trades (trade_id)
 12  )
 13  PARALLEL;
Table created.
SQL> CREATE TABLE leg_valuations
  2  ( trade_id           NUMBER
  3  , leg_no             NUMBER
  4  , valuation_date     DATE
  5  , market_value       NUMBER
  6  , CONSTRAINT leg_valuations_pk
  7       PRIMARY KEY (trade_id, leg_no, valuation_date)
  8  , CONSTRAINT leg_valuations_fk01
  9       FOREIGN KEY (trade_id, leg_no)
 10       REFERENCES trade_legs (trade_id, leg_no)
 11  )
 12  PARALLEL;
Table created.
source data
For our demonstration, we require some source data. This is typically provided in flat-files and loaded into a staging table (or made available directly via an external table). For simplicity, we will create a staging table and populate it with data as follows. Again, this table is defined with PARALLEL.
SQL> CREATE TABLE trades_staging
  2  ( trade_id           NUMBER
  3  , leg_no             NUMBER
  4  , valuation_date     DATE
  5  , product_type       VARCHAR2(10)
  6  , counterparty       VARCHAR2(30)
  7  , trade_timestamp    TIMESTAMP
  8  , trading_book       VARCHAR2(30)
  9  , maturity_date      DATE
 10  , trade_amount       NUMBER
 11  , currency           VARCHAR2(3)
 12  , trade_price        NUMBER
 13  , market_value       NUMBER
 14  )
 15  PARALLEL;
Table created.
SQL> INSERT INTO trades_staging
  2  SELECT object_id                                       AS trade_id
  3  ,      ROW_NUMBER() OVER
  4            (PARTITION BY object_id ORDER BY 1)          AS leg_no
  5  ,      TRUNC(SYSDATE)-1                                AS valuation_date
  6  ,      SUBSTR(object_type,1,10)                        AS product_type
  7  ,      owner                                           AS counterparty
  8  ,      TO_TIMESTAMP(timestamp,'YYYY-MM-DD:HH24:MI:SS') AS trade_timestamp
  9  ,      object_type                                     AS trading_book
 10  ,      created + MOD(object_id,500)                    AS maturity_date
 11  ,      ABS(DBMS_RANDOM.RANDOM)                         AS trade_amount
 12  ,      'GBP'                                           AS currency
 13  ,      DBMS_RANDOM.VALUE                               AS trade_price
 14  ,      ABS(DBMS_RANDOM.RANDOM)                         AS market_value
 15  FROM   all_objects
 16  ,     (SELECT NULL FROM all_objects WHERE ROWNUM <= 2)
 17  WHERE  object_id IS NOT NULL;
99716 rows created.
The source data is manufactured in such a way that there are two legs and valuations for every trade. Because of the denormalised nature of the source data, we will have duplicated TRADES data denormalised on each record-pairing. As with the scenario described in the introduction, we will need to transform and load this source data into each of our three trading tables.
type hierarchy
Pipelined functions require two types: one to define a output record structure and one to buffer a collection of this structure. Usually, we create a single object type and corresponding collection type for this purpose. As described earlier, however, we are going to use type polymorphism to enable us to pass around record types of different structures under the guise of a single object type. To do this, we need to create a type hierarchy. Our hierarchy will comprise the following:
a single supertype, containing a single attribute that each subtype will inherit. This will be a "generic" type, used to define parameters and variables in our PL/SQL APIs; and
three subtypes, each matching one of the structures of our sample trading tables (minus the supertype attribute). Because these are defined as subtypes, they can be used in place of the supertype (i.e. substitute for the supertype).
We will begin by creating our "generic" supertype as follows. This will be the object type that the pipelined function will stream from the PIPE ROW statements.
SQL> CREATE TYPE transaction_ot AS OBJECT
  2  ( transaction_id NUMBER
  3  )
  4  NOT FINAL;
  5  /
Type created.
Note that we have declared this as NOT FINAL to indicate that we are creating a hierarchy that will be implemented through subtypes. We have included a single attribute of TRANSACTION_ID to represent a generic primary key attribute that may or may not be extended in the subtypes (in our trading model, we have a TRADE_ID as the primary transaction key). We could also define this as NOT INSTANTIABLE if we wished to ensure that no direct instances of the supertype were coded into our programs.
This object type defines the structure of a single record returned by our pipelined function. As with all pipelined function implementations, we must create a corresponding collection type, as follows.
SQL> CREATE OR REPLACE TYPE transaction_ntt
  2     AS TABLE OF transaction_ot;
  3  /
Type created.
In "regular" pipelined function implementations, we would be ready to code the function at this point. However, such a function would only pipe out arrays of a single record structure containing a single attribute (TRANSACTION_ID). We want to be able to pipe back multiple record structures from the function, hence we need a type hierarchy. To complete this hierarchy, we will create three further object types: one for each of our target trading tables described earlier. We will begin by creating a subtype for TRADES records as follows. Note that the TRADE_ID column will be accounted for by the supertype's generic TRANSACTION_ID attribute.
SQL> CREATE TYPE trade_ot UNDER transaction_ot
  2  ( product_type     VARCHAR2(10)
  3  , counterparty     VARCHAR2(30)
  4  , trade_timestamp  TIMESTAMP
  5  , trading_book     VARCHAR2(30)
  6  , maturity_date    DATE
  7  , CONSTRUCTOR FUNCTION trade_ot
  8       RETURN SELF AS RESULT
  9  )
 10  FINAL;
 11  /
Type created.
Note that we have defined this as a subtype of our generic supertype using the UNDER syntax. Note also that we have declared a non-default constructor function. This is purely for convenience later, when we will be able to initialise an instance of this type without having to pass a value for each attribute in the type.

--关于constructor function可以参考如下解释

Constructors and Type Evolution
The attribute value constructor function saves you the trouble of defining your own constructors for a type. However, with an attribute-value constructor, you must supply a value for every attribute declared in the type. Otherwise the constructor call will fail to compile.
This requirement of an attribute-value constructor can create a problem if you evolve the type later on—by adding an attribute, for example. When you change the attributes of a type, the type's attribute-value constructor changes, too. If you add an attribute, the updated attribute-value constructor expects a value for the new attribute as well as the old ones. As a result, all the attribute-value constructor calls in your existing code, where values for only the old number of attributes are supplied, will fail to compile.
Advantages of User-Defined Constructors
User-defined constructors avoid the problem with the attribute-value constructor because user-defined constructors do not need to explicitly set a value for every attribute of a type. A user-defined constructor can have any number of arguments, of any type, and these do not need to map directly to type attributes. In your definition of the constructor, you can initialize the attributes to any appropriate values. Any attributes for which you do not supply values are initialized by the system to NULL.
If you evolve a type—for example, by adding an attribute—calls to user-defined constructors for the type do not need to be changed. User-defined constructors, like ordinary methods, are not automatically modified when the type evolves, so the call signature of a user-defined constructor remains the same. You may, however, need to change the definition of the constructor if you do not want the new attribute to be initialized to NULL.
http://docs.oracle.com/cd/B28359_01/appdev.111/b28371/adobjadv.htm#CHDJCFAG

We therefore need a type body, as follows.
SQL> CREATE TYPE BODY trade_ot AS
  2     CONSTRUCTOR FUNCTION trade_ot
  3        RETURN SELF AS RESULT IS
  4     BEGIN
  5        RETURN;
  6     END;
  7  END;
  8  /
Type body created.
We will now continue the pattern and complete our type hierarchy by creating the remaining types.
SQL> CREATE TYPE trade_leg_ot UNDER transaction_ot
  2  ( leg_no             NUMBER
  3  , trade_amount       NUMBER
  4  , currency           VARCHAR2(3)
  5  , trade_price        NUMBER
  6  , CONSTRUCTOR FUNCTION trade_leg_ot
  7       RETURN SELF AS RESULT
  8  )
  9  FINAL;
 10  /
Type created.
SQL> CREATE TYPE BODY trade_leg_ot AS
  2     CONSTRUCTOR FUNCTION trade_leg_ot
  3        RETURN SELF AS RESULT IS
  4     BEGIN
  5        RETURN;
  6     END;
  7  END;
  8  /
Type body created.
SQL> CREATE TYPE leg_valuation_ot UNDER transaction_ot
  2  ( leg_no           NUMBER
  3  , valuation_date   DATE
  4  , market_value     NUMBER
  5  , CONSTRUCTOR FUNCTION leg_valuation_ot
  6       RETURN SELF AS RESULT
  7  )
  8  FINAL;
  9  /
Type created.
SQL> CREATE TYPE BODY leg_valuation_ot AS
  2     CONSTRUCTOR FUNCTION leg_valuation_ot
  3        RETURN SELF AS RESULT IS
  4     BEGIN
  5        RETURN;
  6     END;
  7  END;
  8  /
Type body created.
creating the pipelined function
To recap, therefore, we have a single set of source data and three target tables to be loaded from this source data. We have described this data via a type hierarchy that we will now use in our pipelined function implementation. We will begin with a package specification, as follows.
SQL> CREATE PACKAGE trades_load AS
  2     FUNCTION trades_transform( p_source_data IN SYS_REFCURSOR )
  3        RETURN transaction_ntt
  4        PIPELINED
  5        PARALLEL_ENABLE (PARTITION p_source_data BY ANY);
  6     PROCEDURE load_trades;
  7   END trades_load;
  8 /
Package created.
Note that our package has two public programs: our pipelined function (defined as parallel-enabled) and a procedure to invoke the load itself. Note how our pipelined function returns the generic collection type as described earlier. That is, it will return multiple instances of the TRANSACTION_OT type or any other type that is allowed to substitute for it (i.e. any of our three subtypes). This is the critical point and it highlights the flexibility we achieve by using type polymorphism in this way.
We will now implement our pipelined function by creating the package body. Note that the loading procedure is stubbed at this stage to minimise the code listing.
SQL> CREATE PACKAGE BODY trades_load AS
  2
  3     ------------------------------------------------------------
  4
  5     FUNCTION trades_transform( p_source_data IN SYS_REFCURSOR )
  6        RETURN transaction_ntt
  7        PIPELINED
  8        PARALLEL_ENABLE (PARTITION p_source_data BY ANY) IS
  9
 10        /* Array of input record type... */
 11        TYPE aat_source_data IS TABLE OF trades_staging%ROWTYPE
 12           INDEX BY PLS_INTEGER;
 13        aa_source_data aat_source_data;
 14        r_source_data  trades_staging%ROWTYPE;
 15
 16        /* Output record types... */
 17        r_trade         trade_ot         := trade_ot();
 18        r_trade_leg     trade_leg_ot     := trade_leg_ot();
 19        r_leg_valuation leg_valuation_ot := leg_valuation_ot();
 20
 21     BEGIN
 22
 23        LOOP
 24
 25           FETCH p_source_data BULK COLLECT INTO aa_source_data LIMIT 100;
 26
 27           FOR i IN 1 .. aa_source_data.COUNT LOOP
 28
 29              /* Work with a single record... */
 30              r_source_data := aa_source_data(i);
 31
 32              /* Reset the variables... */
 33              r_trade         := trade_ot();
 34              r_trade_leg     := trade_leg_ot();
 35              r_leg_valuation := leg_valuation_ot();
 36
 37              /* Prepare and pipe the trade record... */
 38              IF r_source_data.leg_no = 1 THEN
 39
 40                 r_trade.transaction_id  := r_source_data.trade_id;
 41                 r_trade.product_type    := r_source_data.product_type;
 42                 r_trade.counterparty    := r_source_data.counterparty;
 43                 r_trade.trading_book    := r_source_data.trading_book;
 44                 r_trade.trade_timestamp := r_source_data.trade_timestamp;
 45                 r_trade.maturity_date   := r_source_data.maturity_date;
 46
 47                 PIPE ROW (r_trade);
 48
 49              END IF;
 50
 51              /* Prepare and pipe the trade_leg record... */
 52              r_trade_leg.transaction_id := r_source_data.trade_id;
 53              r_trade_leg.leg_no         := r_source_data.leg_no;
 54              r_trade_leg.trade_amount   := r_source_data.trade_amount;
 55              r_trade_leg.currency       := r_source_data.currency;
 56              r_trade_leg.trade_price    := r_source_data.trade_price;
 57
 58              PIPE ROW (r_trade_leg);
 59
 60              /* Prepare and pipe the leg_valuation record... */
 61              r_leg_valuation.transaction_id := r_source_data.trade_id;
 62              r_leg_valuation.leg_no         := r_source_data.leg_no;
 63              r_leg_valuation.valuation_date := r_source_data.valuation_date;
 64              r_leg_valuation.market_value   := r_source_data.market_value;
 65
 66              PIPE ROW (r_leg_valuation);
 67
 68           END LOOP;
 69
 70           EXIT WHEN p_source_data%NOTFOUND;
 71
 72        END LOOP;
 73        CLOSE p_source_data;
 74
 75        RETURN;
 76
 77     END trades_transform;
 78
 79     ------------------------------------------------------------
 80
 81     PROCEDURE load_trades IS
 82     BEGIN
 83        NULL;
 84     END load_trades;
 85
 86  END trades_load;
 87  /
Package body created.
We now have a pipelined function that returns three different record structures as substitutes for the supertype. Note in particular the following:
    Lines 17-19: we have three different record variables to be populated and returned from each staging row;
    Lines 33-35: prior to processing each source row, we reset each record by invoking the non-default type constructors we created earlier;
    Lines 38-50: we only want to pipe one trade record per trade leg pairing, hence the LEG_NO test. We assign the TRADES "record" and pipe it;
    Lines 52-58: we prepare and pipe a TRADE_LEGS record for each source record;
    Lines 61-66: we prepare and pipe a LEG_VALUATIONS record for each source record.
For performance reasons, we have defined the pipelined function as parallel-enabled and are using bulk fetches from the cursor in small array sizes. A critical point to note is that for clarity this example deliberately excludes any of the complex transformations that would necessitate a PL/SQL solution. Given the format above, it would of course be much more simple and efficient to use bulk SQL loading. The need for a PL/SQL approach is therefore assumed as a necessity.
loading from the pipelined function
We will now use our pipelined function. We will begin by demonstrating how we query the function, starting with a simple SQL statement as follows.
SQL> SELECT *
  2  FROM   TABLE(
  3            trades_load.trades_transform(
  4               CURSOR( SELECT * FROM trades_staging ) ) )
  5  WHERE  ROWNUM <= 5;
TRANSACTION_ID
--------------
         14636
         14636
         14637
         14637
         14637
5 rows selected.
First, note the syntax. We are passing in a ref cursor parameter to the pipelined function because this is a pre-requisite to enable parallel execution. Second, note how we use "SELECT *" but only receive a single column back. Remember that the pipelined function is based on our TRANSACTION_OT type (and TRANSACTION_NTT collection type). This type only contains a single attribute, so what we see above is semantically correct, even though we have piped rows of a different structure.
The reason (and solution) is simple. When using type substitution, Oracle does not downcast a supertype into its correct subtype unless we tell it to.

--oracle不会自动寻找子类的值

We do this in two stages. First, we must retrieve the actual object instances from the function and not the individual attributes. In the first example, we tried to access the attributes using "SELECT *". We retrieve the actual object instances by using the VALUE function, as follows.
SQL> SELECT VALUE(nt)
  2  FROM   TABLE(
  3            trades_load.trades_transform(
  4               CURSOR( SELECT * FROM trades_staging ) ) ) nt
  5  WHERE  ROWNUM <= 5;
VALUE(NT)(TRANSACTION_ID)
-----------------------------------------------------------------------------------------
TRADE_LEG_OT(14636, 2, 386190879, 'GBP', .724850851)
LEG_VALUATION_OT(14636, 2, '12-AUG-07', 2096427733)
TRADE_OT(14637, 'SYNONYM', 'PUBLIC', '30-AUG-05 14.16.38.000000', 'SYNONYM', '14-JAN-06')
TRADE_LEG_OT(14637, 1, 292552620, 'GBP', .555342993)
LEG_VALUATION_OT(14637, 1, '12-AUG-07', 670904643)
5 rows selected.
We can now see for the first time that we have a truly flexible pipelined function! In the output above, we have three different structures being returned from the function. We can take this a stage further and decode the type of each object instance using IS OF conditions, as follows.
SQL> SELECT VALUE(nt) AS record_value
  2  ,      CASE
  3            WHEN VALUE(nt) IS OF TYPE (trade_ot)
  4            THEN 'TRADES'
  5            WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
  6            THEN 'TRADE_LEGS'
  7            ELSE 'LEG_VALUATIONS'
  8         END AS record_type
  9  FROM   TABLE(
 10            trades_load.trades_transform(
 11               CURSOR( SELECT * FROM trades_staging ) ) ) nt
 12  WHERE  ROWNUM <= 5;
RECORD_VALUE(TRANSACTION_ID)                                           RECORD_TYPE
---------------------------------------------------------------------- --------------------
TRADE_LEG_OT(14636, 2, 386190879, 'GBP', .724850851)                   TRADE_LEGS
LEG_VALUATION_OT(14636, 2, '12-AUG-07', 2096427733)                    LEG_VALUATIONS
TRADE_OT(14637, 'SYNONYM', 'PUBLIC', '30-AUG-05 14.16.38.000000',      TRADES
'SYNONYM', '14-JAN-06')
TRADE_LEG_OT(14637, 1, 292552620, 'GBP', .555342993)                   TRADE_LEGS
LEG_VALUATION_OT(14637, 1, '12-AUG-07', 670904643)                     LEG_VALUATIONS
5 rows selected.
We have now "labelled" each object instance with the table it is eventually going to be loaded into. When we build our multi-table insert statement later, it will be obvious why we have included this CASE expression. To complete the downcasting of the object instances to their correct subtypes, we require a final step. Using the TREAT function, we can attempt to cast each object instance to each of the subtypes, as follows.
SQL> SELECT CASE
  2            WHEN VALUE(nt) IS OF TYPE (trade_ot)
  3            THEN 'TRADES'
  4            WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
  5            THEN 'TRADE_LEGS'
  6            ELSE 'LEG_VALUATIONS'
  7         END AS record_type
  8  ,      TREAT(VALUE(nt) AS trade_ot)         AS trade_rec
  9  ,      TREAT(VALUE(nt) AS trade_leg_ot)     AS trade_leg_rec
 10  ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
 11  FROM   TABLE(
 12            trades_load.trades_transform(
 13               CURSOR( SELECT * FROM trades_staging ) ) ) nt
 14  WHERE  ROWNUM <= 5;
RECORD_TYPE    TRADE_REC                      TRADE_LEG_REC                  LEG_VALUATION_REC
-------------- ------------------------------ ------------------------------ --------------------------
TRADE_LEGS                                    TRADE_LEG_OT(14636, 2, 3861908
                                              79, 'GBP', .724850851)
LEG_VALUATIONS                                                               LEG_VALUATION_OT(14636, 2,
                                                                             '12-AUG-07', 2096427733)
TRADES         TRADE_OT(14637, 'SYNONYM', 'PU
               BLIC', '30-AUG-05 14.16.38.000
               000', 'SYNONYM', '14-JAN-06')
TRADE_LEGS                                    TRADE_LEG_OT(14637, 1, 2925526
                                              20, 'GBP', .555342993)
LEG_VALUATIONS                                                               LEG_VALUATION_OT(14637, 1,
                                                                             '12-AUG-07', 670904643)
5 rows selected.
Of course, each record returning from the pipelined function is of one subtype only. On each record, therefore, two of the TREAT functions will return NULL and only one will yield the correct subtype. At this stage, however, we have successfully returned multiple record types from a single pipelined function and are now ready to access their respective attributes. We do this as follows.
SQL> SELECT ilv.record_type
  2  ,      ilv.trade_rec.transaction_id         AS trade_id
  3  ,      ilv.trade_rec.product_type           AS product_type
  4  ,      ilv.trade_leg_rec.leg_no             AS leg_no
  5  ,      ilv.leg_valuation_rec.valuation_date AS valuation_date
  6  FROM (
  7        SELECT CASE
  8                  WHEN VALUE(nt) IS OF TYPE (trade_ot)
  9                  THEN 'TRADES'
 10                  WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
 11                  THEN 'TRADE_LEGS'
 12                  ELSE 'LEG_VALUATIONS'
 13               END AS record_type
 14        ,      TREAT(VALUE(nt) AS trade_ot)         AS trade_rec
 15        ,      TREAT(VALUE(nt) AS trade_leg_ot)     AS trade_leg_rec
 16        ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
 17        FROM   TABLE(
 18                  trades_load.trades_transform(
 19                     CURSOR( SELECT * FROM trades_staging ) ) ) nt
 20        WHERE  ROWNUM <= 5
 21       ) ilv;
RECORD_TYPE            TRADE_ID PRODUCT_TY     LEG_NO VALUATION
-------------------- ---------- ---------- ---------- ---------
TRADE_LEGS                                          2
LEG_VALUATIONS                                        12-AUG-07
TRADES                    14637 SYNONYM
TRADE_LEGS                                          1
LEG_VALUATIONS                                        12-AUG-07
5 rows selected.
This shows a small sample of the available attributes, but we now have everything we need for our multi-table load. A restriction of multi-table insert is that we cannot carry object instances up to the VALUES clauses on the inserts. Hence we must decompose the objects in the SELECT section, as we see above. Given this, we will now add the LOAD_TRADES procedure to our package body. Note that the pipelined function code is omitted for brevity.
SQL> CREATE OR REPLACE PACKAGE BODY trades_load AS
  2
  3     ------------------------------------------------------------
  4
  5     FUNCTION trades_transform( ...snip...
 78
 79     ------------------------------------------------------------
 80
 81     PROCEDURE load_trades IS
 82     BEGIN
 83
 84        INSERT FIRST
 85           WHEN record_type = 'TRADES'
 86           THEN
 87              INTO trades ( trade_id
 88                          , product_type
 89                          , counterparty
 90                          , trade_timestamp
 91                          , trading_book
 92                          , maturity_date
 93                          )
 94              VALUES ( trade_id
 95                     , product_type
 96                     , counterparty
 97                     , trade_timestamp
 98                     , trading_book
 99                     , maturity_date
100                     )
101           WHEN record_type = 'TRADE_LEGS'
102           THEN
103              INTO trade_legs ( trade_id
104                              , leg_no
105                              , trade_amount
106                              , currency
107                              , trade_price
108                              )
109              VALUES ( trade_id
110                     , leg_no
111                     , trade_amount
112                     , currency
113                     , trade_price
114                     )
115           WHEN record_type = 'LEG_VALUATIONS'
116           THEN
117              INTO leg_valuations ( trade_id
118                                  , leg_no
119                                  , valuation_date
120                                  , market_value
121                                  )
122              VALUES ( trade_id
123                     , leg_no
124                     , valuation_date
125                     , market_value
126                     )
127        SELECT ilv.record_type
128        ,      COALESCE(
129                  ilv.trade_rec.transaction_id,
130                  ilv.trade_leg_rec.transaction_id,
131                  ilv.leg_valuation_rec.transaction_id
132                  )                                    AS trade_id
133        ,      COALESCE(
134                  ilv.trade_leg_rec.leg_no,
135                  ilv.leg_valuation_rec.leg_no
136                  )                                    AS leg_no
137        ,      ilv.trade_rec.product_type              AS product_type
138        ,      ilv.trade_rec.counterparty              AS counterparty
139        ,      ilv.trade_rec.trade_timestamp           AS trade_timestamp
140        ,      ilv.trade_rec.trading_book              AS trading_book
141        ,      ilv.trade_rec.maturity_date             AS maturity_date
142        ,      ilv.trade_leg_rec.trade_amount          AS trade_amount
143        ,      ilv.trade_leg_rec.currency              AS currency
144        ,      ilv.trade_leg_rec.trade_price           AS trade_price
145        ,      ilv.leg_valuation_rec.valuation_date    AS valuation_date
146        ,      ilv.leg_valuation_rec.market_value      AS market_value
147        FROM (
148              SELECT CASE
149                        WHEN VALUE(nt) IS OF TYPE (trade_ot)
150                        THEN 'TRADES'
151                        WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
152                        THEN 'TRADE_LEGS'
153                        ELSE 'LEG_VALUATIONS'
154                     END AS record_type
155              ,      TREAT(VALUE(nt) AS trade_ot)         AS trade_rec
156              ,      TREAT(VALUE(nt) AS trade_leg_ot)     AS trade_leg_rec
157              ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
158              FROM   TABLE(
159                        trades_load.trades_transform(
160                           CURSOR( SELECT * FROM trades_staging ) ) ) nt
161             ) ilv;
162
163        DBMS_OUTPUT.PUT_LINE( SQL%ROWCOUNT || ' rows inserted.' );
164
165     END load_trades;
166
167  END trades_load;
168  /
Package body created.
Our load is quite simple. We select the attributes from each record as described in earlier examples and label each row with its target table using a CASE expression. This "label" is used in the WHEN clauses of the INSERT FIRST section to determine which table the record represents and we insert accordingly. We now have a multi-table load using a pipelined function and type substitution.
testing the loads
We are now ready to test our loads. Remember that we enabled parallel at both table and function level. In multi-table inserts, the entire DML section is parallel-enabled if just one of the target tables is set to parallel. We will therefore enable parallel query and DML in our session as follows.
SQL> ALTER SESSION ENABLE PARALLEL QUERY;
Session altered.
SQL> ALTER SESSION ENABLE PARALLEL DML;
Session altered.
We will now execute our load, as follows.
SQL> exec trades_load.load_trades;
BEGIN trades_load.load_trades; END;
*
ERROR at line 1:
ORA-02291: integrity constraint (SCOTT.TRADE_LEGS_FK01) violated - parent key not found
ORA-06512: at "SCOTT.TRADES_LOAD", line 72
ORA-06512: at line 1
This is disappointing! Oracle has tried to load a TRADE_LEGS record before its parent TRADES record is present. The reason for this is simple: multi-table inserts do not guarantee order of inserts and we are seeing evidence of this. We might think that we can simply force the ordering of the input data, but this does not change the fact that Oracle has decided to load TRADE_LEGS before TRADES. In the sample code file (see bottom of article for download details) another version of the TRADES_LOAD package is included. This is the same as the above with the following additions, all added in the vain attempt to ensure that the INSERT FIRST section receives data in the order in which we need it to be loaded.
    an ORDER BY on the select from the pipelined function, to ensure that records are ordered in TRADES->TRADE_LEGS->LEG_VALUATIONS order; plus
    an ORDER BY on the CURSOR expression over the TRADES_STAGING table to ensure records are input into the pipelined function in TRADE_ID and LEG_NO order; plus
    a streaming clause on the pipelined function itself, which enables us to control the order in which data is piped to the consumer.
Despite these three combined attempts at "brute force" ordering, we receive the same error message as above. The issue lies with the multi-table insert. We must therefore workaround this problem in one of two ways:
    temporarily disable the foreign key constraints; or
    use deferrable foreign key constraints.
We will test both of these workarounds below.
disable/enable constraints
To ensure we can load all of our tables, we will disable the foreign key constraints, as follows.
SQL> ALTER TABLE trade_legs DISABLE CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations DISABLE CONSTRAINT leg_valuations_fk01;
Table altered.
We will now test our loading procedure again.
SQL> exec trades_load.load_trades;
249290 rows inserted.
PL/SQL procedure successfully completed.
SQL> COMMIT;
Commit complete.
We have successfully loaded our tables. As a quick sanity-check, we will count the records we loaded, as follows.
SQL> SELECT COUNT(*) FROM trades;
  COUNT(*)
----------
     49858
1 row selected.
SQL> SELECT COUNT(*) FROM trade_legs;
  COUNT(*)
----------
     99716
1 row selected.
SQL> SELECT COUNT(*) FROM leg_valuations;
  COUNT(*)
----------
     99716
1 row selected.
We will now enable our foreign key constraints, which will raise an exception if there are any data issues.
SQL> ALTER TABLE trade_legs ENABLE CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations ENABLE CONSTRAINT leg_valuations_fk01;
Table altered.
Success! Note that this method is possible for serialised systems, but for concurrent applications, it might not be suitable. As an alternative, we can use deferrable constraints, as demonstrated below.
deferrable constraints
Deferred constraints enable us to postpone the checking of constraint violations until the end of a transaction (i.e. a COMMIT). This enables us to load data in a state that temporarily violates one or more constraints, but which is rectified at a later stage. The deferrable property of constraints can only be set at the time of creation. We will therefore drop and re-create our two foreign keys as follows.
SQL> ALTER TABLE trade_legs DROP CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations DROP CONSTRAINT leg_valuations_fk01;
Table altered.
SQL> ALTER TABLE trade_legs ADD
  2     CONSTRAINT trade_legs_fk01
  3     FOREIGN KEY (trade_id)
  4     REFERENCES trades (trade_id)
  5     DEFERRABLE;
Table altered.
SQL> ALTER TABLE leg_valuations ADD
  2     CONSTRAINT leg_valuations_fk01
  3     FOREIGN KEY (trade_id, leg_no)
  4     REFERENCES trade_legs (trade_id, leg_no)
  5     DEFERRABLE;
Table altered.
Our foreign keys are now deferrable. To exploit this, we must set their states to deferred, as follows.
SQL> SET CONSTRAINT trade_legs_fk01 DEFERRED;
Constraint set.
SQL> SET CONSTRAINT leg_valuations_fk01 DEFERRED;
Constraint set.
Having cleared the sample tables of the data loaded in the previous example, we will re-run our loading procedure.
SQL> exec trades_load.load_trades;
249305 rows inserted.
PL/SQL procedure successfully completed.
Again, we manage to load the three tables successfully but we haven't validated the data yet. Deferred constraints are not checked until the end of a transaction, so we will now COMMIT our load.
SQL> COMMIT;
Commit complete.
Success! We will now check a small sample of the data we loaded.
SQL> SELECT * FROM trades WHERE ROWNUM <= 5;
  TRADE_ID PRODUCT_TYPE COUNTERPARTY  TRADE_TIMESTAMP            MATURITY_DATE
---------- ------------ ------------- -------------------------- -------------
      6774 TYPE         SYS           30-AUG-05 14.02.47.000000  31-MAY-06
      6778 VIEW         SYS           30-AUG-05 14.02.47.000000  04-JUN-06
      6782 VIEW         SYS           30-AUG-05 14.02.48.000000  08-JUN-06
      6786 VIEW         SYS           30-AUG-05 14.02.48.000000  12-JUN-06
      6790 TYPE         SYS           30-AUG-05 14.02.49.000000  16-JUN-06
5 rows selected.
SQL> SELECT * FROM trade_legs WHERE ROWNUM <= 5;
  TRADE_ID     LEG_NO TRADE_AMOUNT CURRENCY TRADE_PRICE
---------- ---------- ------------ -------- -----------
      6774          1   1624901166 GBP       .347733816
      6776          2    524881873 GBP       .904404062
      6778          1    622715309 GBP       .608247575
      6780          2    821449852 GBP       .508567497
      6782          1   1623359117 GBP       .674977682
5 rows selected.
SQL> SELECT * FROM leg_valuations WHERE ROWNUM <= 5;
  TRADE_ID     LEG_NO VALUATION_DATE MARKET_VALUE
---------- ---------- -------------- ------------
      2545          2 14-AUG-07        1042583556
      2546          1 14-AUG-07        1098505446
      2547          1 14-AUG-07          49145215
      2548          2 14-AUG-07          97502618
      2549          2 14-AUG-07         127231786
5 rows selected.
The primary advantage of this method over the disable/enable method is that concurrency is not affected by the constraint states of a single session. We can therefore adopt this method in a multi-user application that is likely to load the same target tables concurrently.
an alternative approach to consider
In this article, we have demonstrated a powerful new technique, combining three unique features (multi-table insert, parallel pipelined functions and type substitution) to extend the ETL potential of Oracle. As a final note, however, there is a simple alternative that some readers might wish to explore. Remember that pipelined functions are usually defined by a single object type and collection of this type. We can easily remove the type hierarchy from our example but still load multiple tables from a single function. We would instead develop the following:
    a single object type that contains all of the attributes needed for all target tables. In our example, we would have a single denormalised type containing all of the attributes in TRADES, TRADE_LEGS and LEG_VALUATIONS;
    a collection of this single object type as usual;
    a pipelined function to prepare and pipe each output record with all denormalised attributes populated; and
    a multi-table insert that loads the attributes into their respective tables.
With reference to our trading examples, the alternative multi-table insert would be based on the following pseudo-code.
INSERT ALL
   WHEN leg_no = 1
   THEN
      INTO trades VALUES (...)
   WHEN 1=1
   THEN
      INTO trade_legs VALUES (...)
   WHEN 1=1
   THEN
      INTO leg_valuations VALUES (...)
SELECT trade_id
,      leg_no
,      ...all other attributes of TRADES, TRADE_LEGS and LEG_VALUATIONS...
FROM   TABLE(
          trades_load.trades_transform(
             CURSOR( SELECT * FROM trades_staging ) ) );
Completing this can be an exercise for the reader. The advantages of this method are that fewer records are piped from the function (i.e. several tables are loaded from each record) and the SELECT is much simpler (no type semantics are required). The disadvantage is that the records can become very wide and at the time of writing, pipelined functions perform badly with wide record structures (over 50 attributes).
further reading
--修正後的procedure代碼

 PROCEDURE load_trades IS
   BEGIN
      INSERT FIRST
         WHEN record_type = 'TRADES'
         THEN
            INTO trades ( trade_id
                        , product_type
                        , counterparty
                        , trade_timestamp
                        , trading_book
                        , maturity_date
                        )
            VALUES ( trade_id
                   , product_type
                   , counterparty
                   , trade_timestamp
                   , trading_book
                   , maturity_date
                   )
         WHEN record_type = 'TRADE_LEGS'
         THEN
            INTO trade_legs ( trade_id
                            , leg_no
                            , trade_amount
                            , currency
                            , trade_price
                            )
            VALUES ( trade_id
                   , leg_no
                   , trade_amount
                   , currency
                   , trade_price
                   )
         WHEN record_type = 'LEG_VALUATIONS'
         THEN
            INTO leg_valuations ( trade_id
                                , leg_no
                                , valuation_date
                                , market_value
                                )
            VALUES ( trade_id
                   , leg_no
                   , valuation_date
                   , market_value
                   )
      SELECT ilv.record_type
      ,      COALESCE(
                ilv.trade_rec.transaction_id,
                ilv.trade_leg_rec.transaction_id,
                ilv.leg_valuation_rec.transaction_id
                )                                    AS trade_id
      ,      COALESCE(
                ilv.trade_leg_rec.leg_no,
                ilv.leg_valuation_rec.leg_no
                )                                    AS leg_no
      ,      ilv.trade_rec.product_type              AS product_type
      ,      ilv.trade_rec.counterparty              AS counterparty
      ,      ilv.trade_rec.trade_timestamp           AS trade_timestamp
      ,      ilv.trade_rec.trading_book              AS trading_book
      ,      ilv.trade_rec.maturity_date             AS maturity_date
      ,      ilv.trade_leg_rec.trade_amount          AS trade_amount
      ,      ilv.trade_leg_rec.currency              AS currency
      ,      ilv.trade_leg_rec.trade_price           AS trade_price
      ,      ilv.leg_valuation_rec.valuation_date    AS valuation_date
      ,      ilv.leg_valuation_rec.market_value      AS market_value
      FROM (
            SELECT CASE
                      WHEN VALUE(nt) IS OF TYPE (trade_ot)
                      THEN 'TRADES'
                      WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
                      THEN 'TRADE_LEGS'
                      ELSE 'LEG_VALUATIONS'
                   END AS record_type
            ,      TREAT(VALUE(nt) AS trade_ot)         AS trade_rec
            ,      TREAT(VALUE(nt) AS trade_leg_ot)     AS trade_leg_rec
            ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
            FROM   TABLE(
                      trades_load.trades_transform(
                         CURSOR( SELECT * FROM trades_staging ORDER BY trade_id, leg_no) ) ) nt
           ) ilv
      ORDER BY
            COALESCE(
               ilv.trade_rec.transaction_id,
               ilv.trade_leg_rec.transaction_id,
               ilv.leg_valuation_rec.transaction_id
               )
      ,     COALESCE(
               ilv.trade_leg_rec.leg_no,
               ilv.leg_valuation_rec.leg_no
               )
      ,     CASE ilv.record_type
               WHEN 'TRADES'
               THEN 0
               WHEN 'TRADE_LEGS'
               THEN 1
               ELSE 2
            END;
      DBMS_OUTPUT.PUT_LINE( SQL%ROWCOUNT || ' rows inserted.' );
   END load_trades;
END trades_load;
/

 

参考至:http://www.oracle-developer.net/display.php?id=420
               http://docs.oracle.com/cd/B28359_01/appdev.111/b28371/adobjadv.htm#CHDJCFAG

本文原创,转载请注明出处、作者

如有错误,欢迎指正

邮箱:czmcj@163.com

0
1
分享到:
评论

相关推荐

    pipeline ADC的设计指南

    Pipeline ADC设计指南 Pipeline ADC是一种常用的模数转换器架构,它通过将模拟信号分解成多个阶段来实现高精度的数字化。下面我们将详细介绍Pipeline ADC的设计指南。 一、Pipeline ADC的基本原理 Pipeline ADC...

    Jenkins Pipeline 部署 SpringBoot 应用的教程详解

    Jenkins Pipeline 部署 SpringBoot 应用详解 本篇文章主要介绍了使用 Jenkins Pipeline 部署 SpringBoot 应用的详细教程,从安装依赖包到环境准备、安装 Jenkins 等步骤进行详细的介绍,对读者学习或工作具有一定的...

    gltf-pipeline-3.0.2.zip

    《gltf-pipeline:构建三维图形的利器》 gltf-pipeline 是一个强大的工具集,主要用于处理和优化基于 glTF(GL Transmission Format)的三维模型数据。glTF 是一种开放标准的三维模型格式,旨在高效、轻量地传输和...

    Doris PipeLine的设计文档.pdf

    Doris PipeLine设计文档 Doris PipeLine设计文档是关于Doris执行引擎的设计文档,旨在解决当前Doris执行引擎中存在的一些问题,如无法充分利用多核计算能力、提升查询性能、手动设置并行度等问题。该设计文档提出了...

    高通QCOM camera Pipeline可视化工具 1.4版本

    高通QCOM camera Pipeline可视化工具 1.4版本

    Jenkins高级篇之Pipeline技巧篇-1-小白搭建Pipeline项目开发环境.rar

    本篇重点讲解的是Jenkins Pipeline的高级技巧,旨在帮助初学者,即"小白",快速搭建并掌握Pipeline项目的开发环境。 一、Jenkins Pipeline基础 Jenkins Pipeline是一种定义和执行CI/CD流程的声明式或脚本化方式。...

    jenkins 构建项目之 pipeline基础教程

    jenkins 构建项目之 pipeline基础教程 jenkins Pipeline 是一种基于工作流框架的自动化构建工具,它可以将原本独立运行于单个或者多个节点的任务连接起来,实现单个任务难以完成的复杂流程编排和可视化的工作。...

    Avalon-MM Pipeline Bridge

    Avalon-MM Pipeline Bridge是Qsys系统中一种重要的互联组件,用于优化基于Avalon-MM接口的SoC(System-on-a-Chip)设计中的数据传输性能。Avalon-MM是Altera公司(现已被Intel收购)开发的一种内存映射协议,广泛...

    Netty测试代码,尤其对于Pipeline顺序

    在本文中,我们将深入探讨Netty中的关键概念,尤其是Pipeline(管道)的顺序及其工作原理。 首先,让我们理解Netty的核心组件:Server和Client。在Netty中,服务器端通常被称为BossGroup和WorkerGroup,BossGroup...

    D3D render pipeline

    This book describes the Direct3D graphics pipeline, from presentation of scene data to pixels appearing on the screen. The book is organized sequentially following the data °ow through the pipeline ...

    PyPI 官网下载 | bamboo-pipeline-3.6.3.tar.gz

    标题中的"PyPI 官网下载 | bamboo-pipeline-3.6.3.tar.gz"指出这是一个从Python Package Index(PyPI)官方站点获取的软件包,名为“bamboo-pipeline”。这个软件包的版本是3.6.3,并且是以tar.gz格式压缩的。在...

    pipeline ADC

    ### Pipeline ADC概述与关键技术 #### 一、Pipeline ADC的基本概念 Pipeline ADC(流水线模数转换器)是一种高性能、高速度的模数转换技术,在现代通信系统、雷达系统及信号处理领域发挥着至关重要的作用。它能够...

    构建机器学习Pipeline

    构建机器学习Pipeline,也就是构建机器学习流程线,是数据科学和软件工程领域中的一个重要议题。在现实世界中,数据科学家通常在一个为统计和机器学习量身定做的开发环境中工作,例如使用Python等工具,在一个“沙盒...

    Android代码-pipeline

    Hosted Community Edition - Try It Now! ...Email: help@pipeline.ai Web: https://support.pipeline.ai YouTube: https://youtube.pipeline.ai Slideshare: https://slideshare.pipeline.ai Work

    jenkins-ci-pipeline-源码.rar

    Jenkins CI Pipeline 是一个强大的自动化工具,用于构建、测试和部署软件。Pipeline 允许开发者定义持续集成(CI)和持续部署(CD)的工作流程,这些工作流程是声明式的,可编写为版本化的代码,提高了可重复性和可...

    Jenkins高级篇之Pipeline技巧篇-5-pipeline中如何代码串联多个job的执行.rar

    在Jenkins自动化构建和部署的过程中,Pipeline作为一种强大的脚本化工具,可以让我们更灵活地管理持续集成和持续交付流程。本资料重点讲解了在Pipeline中如何通过代码串联多个job的执行,这对于构建复杂的CI/CD流程...

    pipeline studio快速入门

    pipeline studio快速入门,确实容易上手

    redispipeLine

    Redis Pipeline 是 Redis 数据库操作中的一个高效特性,它允许客户端一次性发送多个命令到服务器,而无需等待每个命令的响应。这种技术显著提高了批量处理和高并发环境下的性能,因为减少了网络通信的开销。 首先,...

    jenkins-common-pipeline:具有共享库的Jenkins通用管道

    共享库是Jenkins Pipeline的一个强大功能,它允许开发者将常用的Pipeline代码片段抽象出来,存储在版本控制系统(如Git)中,并在多个Pipeline脚本中复用。这样做的好处在于减少代码重复,提高代码质量,同时也便于...

    jenkins流水线任务pipeline使用JaCoCo

    https://stackoverflow.com/questions/41893846/jacoco-coverage-in-jenkins-pipeline jenkins官网介绍:https://jenkins.io/doc/pipeline/steps/jacoco/ 流水线语法的片段生成器可以选择jacoco,设置jacoco jacoco...

Global site tag (gtag.js) - Google Analytics