flexible pipelined functions
Pipelined functions, type polymorphism (type hierarchies and substitution) and multi-table inserts are three features from the Oracle 9i timeframe. Pipelined functions are used primarily in ETL applications (extract, transform and load) for efficient processing of complex transformations. Type substitution is used for modelling complex data types and relationships in more object-oriented applications. Multi-table inserts (another ETL feature of Oracle) are used to load one or more tables from a single rowsource.
In this article we will combine these features by using type polymorphism to return multiple record structures from a single pipelined function. We will load the resulting dataset into multiple tables using multi-table insert.
It is assumed that readers are familiar with the technologies being described in this article.
why is this technique significant?
This technique is significant on two counts.
First, it is usually the case that pipelined functions will return a single, known record structure (unless using complex ANYDATASET structures made available by Oracle's Data Cartridge). The Oracle documentation and other articles often show pipelined functions transforming a single input record into two output records, but always of the same structure. This article will show how single input records can be transformed into multiple output rows of different structures.
-- (note: the function can return types of different structures)
Second, this technique solves a very particular, but common, performance problem. It is quite common in traditional batch environments (such as data warehouses) to receive source data in flat-files. It is equally common that some of these files contain data for more than one target table in the data warehouse. Assuming that the data transformations that must take place are complex (i.e. too complex for SQL), a typical loading approach is as follows (in pseudo-code).
-- (note: this solves the performance problem of transforming and loading the same source data several times)
FOR rec IN (SELECT * FROM source_data) LOOP
   ...prepare table_A variables...
   ...prepare table_B variables...
   ...prepare table_C variables...
   INSERT INTO table_A VALUES (...);
   INSERT INTO table_B VALUES (...);
   INSERT INTO table_C VALUES (...);
END LOOP;
This is a simple process to code and understand, yet it is inefficient and slow. Using the techniques we will describe in this article, we will combine the efficiency of bulk SQL, parallel pipelined functions and substitutable types to change this load to something that resembles the following pseudo-code.
INSERT FIRST
   --
   WHEN (record is of table_A format)
   INTO table_A VALUES (...)
   --
   WHEN (record is of table_B format)
   INTO table_B VALUES (...)
   --
   WHEN (record is of table_C format)
   INTO table_C VALUES (...)
   --
SELECT ...
FROM   TABLE(parallel_pipelined_function(CURSOR(...)));
setup
We will begin by setting up our sample application. We are going to use a simplified investment trading model for our demonstrations. This will include:
TRADES: this will store information on a trade that takes place between two counterparties;
TRADE_LEGS: child of TRADES. A single trade might comprise multiple legs (e.g. a swap trade). In our application, we are going to ensure that every trade has two legs; and
LEG_VALUATIONS: child of TRADE_LEGS. Each leg of a trade will be valued every day that it is active (i.e. not matured).
target tables
Given this simple model, we can now create our target tables. Note that all tables are defined with PARALLEL because we will exploit this later when we come to using our pipelined function.
SQL> CREATE TABLE trades
2 ( trade_id NUMBER
3 , product_type VARCHAR2(10)
4 , counterparty VARCHAR2(30)
5 , trade_timestamp TIMESTAMP
6 , trading_book VARCHAR2(30)
7 , maturity_date DATE
8 , CONSTRAINT trades_pk
9 PRIMARY KEY (trade_id)
10 )
11 PARALLEL;
Table created.
SQL> CREATE TABLE trade_legs
2 ( trade_id NUMBER
3 , leg_no NUMBER
4 , trade_amount NUMBER
5 , currency VARCHAR2(3)
6 , trade_price NUMBER
7 , CONSTRAINT trade_legs_pk
8 PRIMARY KEY (trade_id, leg_no)
9 , CONSTRAINT trade_legs_fk01
10 FOREIGN KEY (trade_id)
11 REFERENCES trades (trade_id)
12 )
13 PARALLEL;
Table created.
SQL> CREATE TABLE leg_valuations
2 ( trade_id NUMBER
3 , leg_no NUMBER
4 , valuation_date DATE
5 , market_value NUMBER
6 , CONSTRAINT leg_valuations_pk
7 PRIMARY KEY (trade_id, leg_no, valuation_date)
8 , CONSTRAINT leg_valuations_fk01
9 FOREIGN KEY (trade_id, leg_no)
10 REFERENCES trade_legs (trade_id, leg_no)
11 )
12 PARALLEL;
Table created.
source data
For our demonstration, we require some source data. This is typically provided in flat-files and loaded into a staging table (or made available directly via an external table). For simplicity, we will create a staging table and populate it with data as follows. Again, this table is defined with PARALLEL.
SQL> CREATE TABLE trades_staging
2 ( trade_id NUMBER
3 , leg_no NUMBER
4 , valuation_date DATE
5 , product_type VARCHAR2(10)
6 , counterparty VARCHAR2(30)
7 , trade_timestamp TIMESTAMP
8 , trading_book VARCHAR2(30)
9 , maturity_date DATE
10 , trade_amount NUMBER
11 , currency VARCHAR2(3)
12 , trade_price NUMBER
13 , market_value NUMBER
14 )
15 PARALLEL;
Table created.
SQL> INSERT INTO trades_staging
2 SELECT object_id AS trade_id
3 , ROW_NUMBER() OVER
4 (PARTITION BY object_id ORDER BY 1) AS leg_no
5 , TRUNC(SYSDATE)-1 AS valuation_date
6 , SUBSTR(object_type,1,10) AS product_type
7 , owner AS counterparty
8 , TO_TIMESTAMP(timestamp,'YYYY-MM-DD:HH24:MI:SS') AS trade_timestamp
9 , object_type AS trading_book
10 , created + MOD(object_id,500) AS maturity_date
11 , ABS(DBMS_RANDOM.RANDOM) AS trade_amount
12 , 'GBP' AS currency
13 , DBMS_RANDOM.VALUE AS trade_price
14 , ABS(DBMS_RANDOM.RANDOM) AS market_value
15 FROM all_objects
16 , (SELECT NULL FROM all_objects WHERE ROWNUM <= 2)
17 WHERE object_id IS NOT NULL;
99716 rows created.
The source data is manufactured so that every trade has two legs, each with one valuation. Because the source data is denormalised, the TRADES attributes are repeated on both records of each pairing. As with the scenario described in the introduction, we will need to transform and load this source data into each of our three trading tables.
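The claim that every trade has exactly two legs can be checked directly against the staging table. The following query is a minimal sketch and not part of the original setup; it counts trades by their number of staging rows. If the data was generated as described, it should return a single row for LEGS_PER_TRADE = 2.

```sql
-- Count how many trades have each number of staging rows.
-- With the generated data, only LEGS_PER_TRADE = 2 is expected.
SELECT legs_per_trade
,      COUNT(*) AS trades
FROM  (
        SELECT trade_id
        ,      COUNT(*) AS legs_per_trade
        FROM   trades_staging
        GROUP  BY trade_id
      )
GROUP  BY legs_per_trade
ORDER  BY legs_per_trade;
```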
type hierarchy
Pipelined functions require two types: one to define an output record structure and one to buffer a collection of this structure. Usually, we create a single object type and corresponding collection type for this purpose. As described earlier, however, we are going to use type polymorphism to enable us to pass around record types of different structures under the guise of a single object type. To do this, we need to create a type hierarchy. Our hierarchy will comprise the following:
a single supertype, containing a single attribute that each subtype will inherit. This will be a "generic" type, used to define parameters and variables in our PL/SQL APIs; and
three subtypes, each matching one of the structures of our sample trading tables (minus the supertype attribute). Because these are defined as subtypes, they can be used in place of the supertype (i.e. substitute for the supertype).
We will begin by creating our "generic" supertype as follows. This will be the object type that the pipelined function will stream from the PIPE ROW statements.
SQL> CREATE TYPE transaction_ot AS OBJECT
2 ( transaction_id NUMBER
3 )
4 NOT FINAL;
5 /
Type created.
Note that we have declared this as NOT FINAL to indicate that we are creating a hierarchy that will be implemented through subtypes. We have included a single attribute of TRANSACTION_ID to represent a generic primary key attribute that may or may not be extended in the subtypes (in our trading model, we have a TRADE_ID as the primary transaction key). We could also define this as NOT INSTANTIABLE if we wished to ensure that no direct instances of the supertype were coded into our programs.
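For illustration only, a NOT INSTANTIABLE version of the supertype might look like the sketch below. The type name TRANSACTION_ABSTRACT_OT is hypothetical (chosen so that it does not clash with the type used in this article); the rest of the article continues to use the instantiable TRANSACTION_OT.

```sql
-- Hypothetical variant of the supertype (the name is illustrative only).
-- NOT INSTANTIABLE prevents direct instances of the supertype;
-- NOT FINAL still allows subtypes to be created UNDER it.
CREATE TYPE transaction_abstract_ot AS OBJECT
( transaction_id NUMBER
)
NOT INSTANTIABLE NOT FINAL;
/
```

With a supertype declared this way, any code that tries to construct the supertype directly should fail to compile, while subtype instances can still be created and passed wherever the supertype is expected.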
This object type defines the structure of a single record returned by our pipelined function. As with all pipelined function implementations, we must create a corresponding collection type, as follows.
SQL> CREATE OR REPLACE TYPE transaction_ntt
2 AS TABLE OF transaction_ot;
3 /
Type created.
In "regular" pipelined function implementations, we would be ready to code the function at this point. However, such a function would only pipe out arrays of a single record structure containing a single attribute (TRANSACTION_ID). We want to be able to pipe back multiple record structures from the function, hence we need a type hierarchy. To complete this hierarchy, we will create three further object types: one for each of our target trading tables described earlier. We will begin by creating a subtype for TRADES records as follows. Note that the TRADE_ID column will be accounted for by the supertype's generic TRANSACTION_ID attribute.
SQL> CREATE TYPE trade_ot UNDER transaction_ot
2 ( product_type VARCHAR2(10)
3 , counterparty VARCHAR2(30)
4 , trade_timestamp TIMESTAMP
5 , trading_book VARCHAR2(30)
6 , maturity_date DATE
7 , CONSTRUCTOR FUNCTION trade_ot
8 RETURN SELF AS RESULT
9 )
10 FINAL;
11 /
Type created.
Note that we have defined this as a subtype of our generic supertype using the UNDER syntax. Note also that we have declared a non-default constructor function. This is purely for convenience later, when we will be able to initialise an instance of this type without having to pass a value for each attribute in the type.
-- (note: for background on constructor functions, see the following explanation from the Oracle documentation)
Constructors and Type Evolution
The attribute value constructor function saves you the trouble of defining your own constructors for a type. However, with an attribute-value constructor, you must supply a value for every attribute declared in the type. Otherwise the constructor call will fail to compile.
This requirement of an attribute-value constructor can create a problem if you evolve the type later on—by adding an attribute, for example. When you change the attributes of a type, the type's attribute-value constructor changes, too. If you add an attribute, the updated attribute-value constructor expects a value for the new attribute as well as the old ones. As a result, all the attribute-value constructor calls in your existing code, where values for only the old number of attributes are supplied, will fail to compile.
Advantages of User-Defined Constructors
User-defined constructors avoid the problem with the attribute-value constructor because user-defined constructors do not need to explicitly set a value for every attribute of a type. A user-defined constructor can have any number of arguments, of any type, and these do not need to map directly to type attributes. In your definition of the constructor, you can initialize the attributes to any appropriate values. Any attributes for which you do not supply values are initialized by the system to NULL.
If you evolve a type—for example, by adding an attribute—calls to user-defined constructors for the type do not need to be changed. User-defined constructors, like ordinary methods, are not automatically modified when the type evolves, so the call signature of a user-defined constructor remains the same. You may, however, need to change the definition of the constructor if you do not want the new attribute to be initialized to NULL.
http://docs.oracle.com/cd/B28359_01/appdev.111/b28371/adobjadv.htm#CHDJCFAG
We therefore need a type body, as follows.
SQL> CREATE TYPE BODY trade_ot AS
2 CONSTRUCTOR FUNCTION trade_ot
3 RETURN SELF AS RESULT IS
4 BEGIN
5 RETURN;
6 END;
7 END;
8 /
Type body created.
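The difference between the two constructors can be seen in a small anonymous block. This is a sketch only, assuming the TRADE_OT type and body above have been created and that SERVEROUTPUT is enabled; the literal values are made up for the example.

```sql
DECLARE
   -- Attribute-value constructor: one value per attribute, with the
   -- inherited TRANSACTION_ID first, then the subtype's own attributes.
   r_full  trade_ot := trade_ot( 1, 'SWAP', 'ACME', SYSTIMESTAMP, 'BOOK1', SYSDATE );
   -- User-defined constructor: no arguments, every attribute starts as NULL.
   r_empty trade_ot := trade_ot();
BEGIN
   -- Attributes of the "empty" instance are assigned individually.
   r_empty.transaction_id := 2;
   DBMS_OUTPUT.PUT_LINE( 'full : ' || r_full.transaction_id
                         || ', ' || r_full.product_type );
   DBMS_OUTPUT.PUT_LINE( 'empty: ' || r_empty.transaction_id
                         || ', ' || NVL(r_empty.product_type, '(null)') );
END;
/
```

The no-argument form is the one used in the pipelined function later, where each attribute is assigned from the source record.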
We will now continue the pattern and complete our type hierarchy by creating the remaining types.
SQL> CREATE TYPE trade_leg_ot UNDER transaction_ot
2 ( leg_no NUMBER
3 , trade_amount NUMBER
4 , currency VARCHAR2(3)
5 , trade_price NUMBER
6 , CONSTRUCTOR FUNCTION trade_leg_ot
7 RETURN SELF AS RESULT
8 )
9 FINAL;
10 /
Type created.
SQL> CREATE TYPE BODY trade_leg_ot AS
2 CONSTRUCTOR FUNCTION trade_leg_ot
3 RETURN SELF AS RESULT IS
4 BEGIN
5 RETURN;
6 END;
7 END;
8 /
Type body created.
SQL> CREATE TYPE leg_valuation_ot UNDER transaction_ot
2 ( leg_no NUMBER
3 , valuation_date DATE
4 , market_value NUMBER
5 , CONSTRUCTOR FUNCTION leg_valuation_ot
6 RETURN SELF AS RESULT
7 )
8 FINAL;
9 /
Type created.
SQL> CREATE TYPE BODY leg_valuation_ot AS
2 CONSTRUCTOR FUNCTION leg_valuation_ot
3 RETURN SELF AS RESULT IS
4 BEGIN
5 RETURN;
6 END;
7 END;
8 /
Type body created.
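With all four object types in place, the hierarchy can be confirmed from the data dictionary. The query below is a sketch, assuming the types were created in the current schema; it should list TRANSACTION_OT with no supertype and the three subtypes under it.

```sql
-- List the object types in the hierarchy with their supertype
-- and their FINAL / INSTANTIABLE settings.
SELECT type_name
,      supertype_name
,      final
,      instantiable
FROM   user_types
WHERE  type_name IN ( 'TRANSACTION_OT'
                    , 'TRADE_OT'
                    , 'TRADE_LEG_OT'
                    , 'LEG_VALUATION_OT' )
ORDER  BY supertype_name NULLS FIRST
,      type_name;
```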
creating the pipelined function
To recap, therefore, we have a single set of source data and three target tables to be loaded from this source data. We have described this data via a type hierarchy that we will now use in our pipelined function implementation. We will begin with a package specification, as follows.
SQL> CREATE PACKAGE trades_load AS
2 FUNCTION trades_transform( p_source_data IN SYS_REFCURSOR )
3 RETURN transaction_ntt
4 PIPELINED
5 PARALLEL_ENABLE (PARTITION p_source_data BY ANY);
6 PROCEDURE load_trades;
7 END trades_load;
8 /
Package created.
Note that our package has two public programs: our pipelined function (defined as parallel-enabled) and a procedure to invoke the load itself. Note how our pipelined function returns the generic collection type as described earlier. That is, it will return multiple instances of the TRANSACTION_OT type or any other type that is allowed to substitute for it (i.e. any of our three subtypes). This is the critical point and it highlights the flexibility we achieve by using type polymorphism in this way.
We will now implement our pipelined function by creating the package body. Note that the loading procedure is stubbed at this stage to minimise the code listing.
SQL> CREATE PACKAGE BODY trades_load AS
2
3 ------------------------------------------------------------
4
5 FUNCTION trades_transform( p_source_data IN SYS_REFCURSOR )
6 RETURN transaction_ntt
7 PIPELINED
8 PARALLEL_ENABLE (PARTITION p_source_data BY ANY) IS
9
10 /* Array of input record type... */
11 TYPE aat_source_data IS TABLE OF trades_staging%ROWTYPE
12 INDEX BY PLS_INTEGER;
13 aa_source_data aat_source_data;
14 r_source_data trades_staging%ROWTYPE;
15
16 /* Output record types... */
17 r_trade trade_ot := trade_ot();
18 r_trade_leg trade_leg_ot := trade_leg_ot();
19 r_leg_valuation leg_valuation_ot := leg_valuation_ot();
20
21 BEGIN
22
23 LOOP
24
25 FETCH p_source_data BULK COLLECT INTO aa_source_data LIMIT 100;
26
27 FOR i IN 1 .. aa_source_data.COUNT LOOP
28
29 /* Work with a single record... */
30 r_source_data := aa_source_data(i);
31
32 /* Reset the variables... */
33 r_trade := trade_ot();
34 r_trade_leg := trade_leg_ot();
35 r_leg_valuation := leg_valuation_ot();
36
37 /* Prepare and pipe the trade record... */
38 IF r_source_data.leg_no = 1 THEN
39
40 r_trade.transaction_id := r_source_data.trade_id;
41 r_trade.product_type := r_source_data.product_type;
42 r_trade.counterparty := r_source_data.counterparty;
43 r_trade.trading_book := r_source_data.trading_book;
44 r_trade.trade_timestamp := r_source_data.trade_timestamp;
45 r_trade.maturity_date := r_source_data.maturity_date;
46
47 PIPE ROW (r_trade);
48
49 END IF;
50
51 /* Prepare and pipe the trade_leg record... */
52 r_trade_leg.transaction_id := r_source_data.trade_id;
53 r_trade_leg.leg_no := r_source_data.leg_no;
54 r_trade_leg.trade_amount := r_source_data.trade_amount;
55 r_trade_leg.currency := r_source_data.currency;
56 r_trade_leg.trade_price := r_source_data.trade_price;
57
58 PIPE ROW (r_trade_leg);
59
60 /* Prepare and pipe the leg_valuation record... */
61 r_leg_valuation.transaction_id := r_source_data.trade_id;
62 r_leg_valuation.leg_no := r_source_data.leg_no;
63 r_leg_valuation.valuation_date := r_source_data.valuation_date;
64 r_leg_valuation.market_value := r_source_data.market_value;
65
66 PIPE ROW (r_leg_valuation);
67
68 END LOOP;
69
70 EXIT WHEN p_source_data%NOTFOUND;
71
72 END LOOP;
73 CLOSE p_source_data;
74
75 RETURN;
76
77 END trades_transform;
78
79 ------------------------------------------------------------
80
81 PROCEDURE load_trades IS
82 BEGIN
83 NULL;
84 END load_trades;
85
86 END trades_load;
87 /
Package body created.
We now have a pipelined function that returns three different record structures as substitutes for the supertype. Note in particular the following:
Lines 17-19: we have three different record variables to be populated and returned from each staging row;
Lines 33-35: prior to processing each source row, we reset each record by invoking the non-default type constructors we created earlier;
Lines 38-49: we only want to pipe one trade record per trade leg pairing, hence the LEG_NO test. We assign the TRADES "record" and pipe it;
Lines 52-58: we prepare and pipe a TRADE_LEGS record for each source record;
Lines 61-66: we prepare and pipe a LEG_VALUATIONS record for each source record.
For performance reasons, we have defined the pipelined function as parallel-enabled and are using bulk fetches from the cursor in small array sizes. A critical point to note is that, for clarity, this example deliberately excludes any of the complex transformations that would necessitate a PL/SQL solution. Given the format above, it would of course be much simpler and more efficient to use bulk SQL loading. The need for a PL/SQL approach is therefore assumed.
loading from the pipelined function
We will now use our pipelined function. We will begin by demonstrating how we query the function, starting with a simple SQL statement as follows.
SQL> SELECT *
2 FROM TABLE(
3 trades_load.trades_transform(
4 CURSOR( SELECT * FROM trades_staging ) ) )
5 WHERE ROWNUM <= 5;
TRANSACTION_ID
--------------
14636
14636
14637
14637
14637
5 rows selected.
First, note the syntax. We are passing in a ref cursor parameter to the pipelined function because this is a prerequisite for parallel execution. Second, note how we use "SELECT *" but only receive a single column back. Remember that the pipelined function is based on our TRANSACTION_OT type (and TRANSACTION_NTT collection type). This type only contains a single attribute, so what we see above is semantically correct, even though we have piped rows of a different structure.
The reason (and solution) is simple. When using type substitution, Oracle does not downcast a supertype into its correct subtype unless we tell it to.
-- (note: Oracle does not automatically resolve the attributes of the subtype)
We do this in two stages. First, we must retrieve the actual object instances from the function and not the individual attributes. In the first example, we tried to access the attributes using "SELECT *". We retrieve the actual object instances by using the VALUE function, as follows.
SQL> SELECT VALUE(nt)
2 FROM TABLE(
3 trades_load.trades_transform(
4 CURSOR( SELECT * FROM trades_staging ) ) ) nt
5 WHERE ROWNUM <= 5;
VALUE(NT)(TRANSACTION_ID)
-----------------------------------------------------------------------------------------
TRADE_LEG_OT(14636, 2, 386190879, 'GBP', .724850851)
LEG_VALUATION_OT(14636, 2, '12-AUG-07', 2096427733)
TRADE_OT(14637, 'SYNONYM', 'PUBLIC', '30-AUG-05 14.16.38.000000', 'SYNONYM', '14-JAN-06')
TRADE_LEG_OT(14637, 1, 292552620, 'GBP', .555342993)
LEG_VALUATION_OT(14637, 1, '12-AUG-07', 670904643)
5 rows selected.
We can now see for the first time that we have a truly flexible pipelined function! In the output above, we have three different structures being returned from the function. We can take this a stage further and decode the type of each object instance using IS OF conditions, as follows.
SQL> SELECT VALUE(nt) AS record_value
2 , CASE
3 WHEN VALUE(nt) IS OF TYPE (trade_ot)
4 THEN 'TRADES'
5 WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
6 THEN 'TRADE_LEGS'
7 ELSE 'LEG_VALUATIONS'
8 END AS record_type
9 FROM TABLE(
10 trades_load.trades_transform(
11 CURSOR( SELECT * FROM trades_staging ) ) ) nt
12 WHERE ROWNUM <= 5;
RECORD_VALUE(TRANSACTION_ID) RECORD_TYPE
---------------------------------------------------------------------- --------------------
TRADE_LEG_OT(14636, 2, 386190879, 'GBP', .724850851) TRADE_LEGS
LEG_VALUATION_OT(14636, 2, '12-AUG-07', 2096427733) LEG_VALUATIONS
TRADE_OT(14637, 'SYNONYM', 'PUBLIC', '30-AUG-05 14.16.38.000000', TRADES
'SYNONYM', '14-JAN-06')
TRADE_LEG_OT(14637, 1, 292552620, 'GBP', .555342993) TRADE_LEGS
LEG_VALUATION_OT(14637, 1, '12-AUG-07', 670904643) LEG_VALUATIONS
5 rows selected.
We have now "labelled" each object instance with the table it is eventually going to be loaded into. When we build our multi-table insert statement later, it will be obvious why we have included this CASE expression. To complete the downcasting of the object instances to their correct subtypes, we require a final step. Using the TREAT function, we can attempt to cast each object instance to each of the subtypes, as follows.
SQL> SELECT CASE
2 WHEN VALUE(nt) IS OF TYPE (trade_ot)
3 THEN 'TRADES'
4 WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
5 THEN 'TRADE_LEGS'
6 ELSE 'LEG_VALUATIONS'
7 END AS record_type
8 , TREAT(VALUE(nt) AS trade_ot) AS trade_rec
9 , TREAT(VALUE(nt) AS trade_leg_ot) AS trade_leg_rec
10 , TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
11 FROM TABLE(
12 trades_load.trades_transform(
13 CURSOR( SELECT * FROM trades_staging ) ) ) nt
14 WHERE ROWNUM <= 5;
RECORD_TYPE TRADE_REC TRADE_LEG_REC LEG_VALUATION_REC
-------------- ------------------------------ ------------------------------ --------------------------
TRADE_LEGS TRADE_LEG_OT(14636, 2, 3861908
79, 'GBP', .724850851)
LEG_VALUATIONS LEG_VALUATION_OT(14636, 2,
'12-AUG-07', 2096427733)
TRADES TRADE_OT(14637, 'SYNONYM', 'PU
BLIC', '30-AUG-05 14.16.38.000
000', 'SYNONYM', '14-JAN-06')
TRADE_LEGS TRADE_LEG_OT(14637, 1, 2925526
20, 'GBP', .555342993)
LEG_VALUATIONS LEG_VALUATION_OT(14637, 1,
'12-AUG-07', 670904643)
5 rows selected.
Of course, each record returning from the pipelined function is of one subtype only. On each record, therefore, two of the TREAT functions will return NULL and only one will yield the correct subtype. At this stage, however, we have successfully returned multiple record types from a single pipelined function and are now ready to access their respective attributes. We do this as follows.
SQL> SELECT ilv.record_type
2 , ilv.trade_rec.transaction_id AS trade_id
3 , ilv.trade_rec.product_type AS product_type
4 , ilv.trade_leg_rec.leg_no AS leg_no
5 , ilv.leg_valuation_rec.valuation_date AS valuation_date
6 FROM (
7 SELECT CASE
8 WHEN VALUE(nt) IS OF TYPE (trade_ot)
9 THEN 'TRADES'
10 WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
11 THEN 'TRADE_LEGS'
12 ELSE 'LEG_VALUATIONS'
13 END AS record_type
14 , TREAT(VALUE(nt) AS trade_ot) AS trade_rec
15 , TREAT(VALUE(nt) AS trade_leg_ot) AS trade_leg_rec
16 , TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
17 FROM TABLE(
18 trades_load.trades_transform(
19 CURSOR( SELECT * FROM trades_staging ) ) ) nt
20 WHERE ROWNUM <= 5
21 ) ilv;
RECORD_TYPE TRADE_ID PRODUCT_TY LEG_NO VALUATION
-------------------- ---------- ---------- ---------- ---------
TRADE_LEGS 2
LEG_VALUATIONS 12-AUG-07
TRADES 14637 SYNONYM
TRADE_LEGS 1
LEG_VALUATIONS 12-AUG-07
5 rows selected.
This shows a small sample of the available attributes, but we now have everything we need for our multi-table load. A restriction of multi-table insert is that we cannot carry object instances up to the VALUES clauses on the inserts. Hence we must decompose the objects in the SELECT section, as we see above. Given this, we will now add the LOAD_TRADES procedure to our package body. Note that the pipelined function code is omitted for brevity.
SQL> CREATE OR REPLACE PACKAGE BODY trades_load AS
2
3 ------------------------------------------------------------
4
5 FUNCTION trades_transform( ...snip...
78
79 ------------------------------------------------------------
80
81 PROCEDURE load_trades IS
82 BEGIN
83
84 INSERT FIRST
85 WHEN record_type = 'TRADES'
86 THEN
87 INTO trades ( trade_id
88 , product_type
89 , counterparty
90 , trade_timestamp
91 , trading_book
92 , maturity_date
93 )
94 VALUES ( trade_id
95 , product_type
96 , counterparty
97 , trade_timestamp
98 , trading_book
99 , maturity_date
100 )
101 WHEN record_type = 'TRADE_LEGS'
102 THEN
103 INTO trade_legs ( trade_id
104 , leg_no
105 , trade_amount
106 , currency
107 , trade_price
108 )
109 VALUES ( trade_id
110 , leg_no
111 , trade_amount
112 , currency
113 , trade_price
114 )
115 WHEN record_type = 'LEG_VALUATIONS'
116 THEN
117 INTO leg_valuations ( trade_id
118 , leg_no
119 , valuation_date
120 , market_value
121 )
122 VALUES ( trade_id
123 , leg_no
124 , valuation_date
125 , market_value
126 )
127 SELECT ilv.record_type
128 , COALESCE(
129 ilv.trade_rec.transaction_id,
130 ilv.trade_leg_rec.transaction_id,
131 ilv.leg_valuation_rec.transaction_id
132 ) AS trade_id
133 , COALESCE(
134 ilv.trade_leg_rec.leg_no,
135 ilv.leg_valuation_rec.leg_no
136 ) AS leg_no
137 , ilv.trade_rec.product_type AS product_type
138 , ilv.trade_rec.counterparty AS counterparty
139 , ilv.trade_rec.trade_timestamp AS trade_timestamp
140 , ilv.trade_rec.trading_book AS trading_book
141 , ilv.trade_rec.maturity_date AS maturity_date
142 , ilv.trade_leg_rec.trade_amount AS trade_amount
143 , ilv.trade_leg_rec.currency AS currency
144 , ilv.trade_leg_rec.trade_price AS trade_price
145 , ilv.leg_valuation_rec.valuation_date AS valuation_date
146 , ilv.leg_valuation_rec.market_value AS market_value
147 FROM (
148 SELECT CASE
149 WHEN VALUE(nt) IS OF TYPE (trade_ot)
150 THEN 'TRADES'
151 WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
152 THEN 'TRADE_LEGS'
153 ELSE 'LEG_VALUATIONS'
154 END AS record_type
155 , TREAT(VALUE(nt) AS trade_ot) AS trade_rec
156 , TREAT(VALUE(nt) AS trade_leg_ot) AS trade_leg_rec
157 , TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
158 FROM TABLE(
159 trades_load.trades_transform(
160 CURSOR( SELECT * FROM trades_staging ) ) ) nt
161 ) ilv;
162
163 DBMS_OUTPUT.PUT_LINE( SQL%ROWCOUNT || ' rows inserted.' );
164
165 END load_trades;
166
167 END trades_load;
168 /
Package body created.
Our load is quite simple. We select the attributes from each record as described in earlier examples and label each row with its target table using a CASE expression. This "label" is used in the WHEN clauses of the INSERT FIRST section to determine which table the record represents and we insert accordingly. We now have a multi-table load using a pipelined function and type substitution.
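Before running the load, it can be useful to see how many rows of each subtype the function will feed to each branch of the INSERT FIRST. The query below is a sketch that reuses the same CASE "label" and simply aggregates on it; given the staging data described earlier, we would expect one TRADES row per trade and one TRADE_LEGS and one LEG_VALUATIONS row per staging record.

```sql
-- Count the piped records by their target table label.
SELECT ilv.record_type
,      COUNT(*) AS records
FROM  (
        SELECT CASE
                  WHEN VALUE(nt) IS OF TYPE (trade_ot)
                  THEN 'TRADES'
                  WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
                  THEN 'TRADE_LEGS'
                  ELSE 'LEG_VALUATIONS'
               END AS record_type
        FROM   TABLE(
                  trades_load.trades_transform(
                     CURSOR( SELECT * FROM trades_staging ) ) ) nt
      ) ilv
GROUP  BY ilv.record_type
ORDER  BY ilv.record_type;
```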
testing the loads
We are now ready to test our loads. Remember that we enabled parallel at both table and function level. In multi-table inserts, the entire DML section is parallel-enabled if just one of the target tables is set to parallel. We will therefore enable parallel query and DML in our session as follows.
SQL> ALTER SESSION ENABLE PARALLEL QUERY;
Session altered.
SQL> ALTER SESSION ENABLE PARALLEL DML;
Session altered.
We will now execute our load, as follows.
SQL> exec trades_load.load_trades;
BEGIN trades_load.load_trades; END;
*
ERROR at line 1:
ORA-02291: integrity constraint (SCOTT.TRADE_LEGS_FK01) violated - parent key not found
ORA-06512: at "SCOTT.TRADES_LOAD", line 72
ORA-06512: at line 1
This is disappointing! Oracle has tried to load a TRADE_LEGS record before its parent TRADES record is present. The reason for this is simple: multi-table inserts do not guarantee order of inserts and we are seeing evidence of this. We might think that we can simply force the ordering of the input data, but this does not change the fact that Oracle has decided to load TRADE_LEGS before TRADES. In the sample code file (see bottom of article for download details) another version of the TRADES_LOAD package is included. This is the same as the above with the following additions, all added in the vain attempt to ensure that the INSERT FIRST section receives data in the order in which we need it to be loaded.
an ORDER BY on the select from the pipelined function, to ensure that records are ordered in TRADES->TRADE_LEGS->LEG_VALUATIONS order; plus
an ORDER BY on the CURSOR expression over the TRADES_STAGING table to ensure records are input into the pipelined function in TRADE_ID and LEG_NO order; plus
a streaming clause on the pipelined function itself, which enables us to control the order in which data is piped to the consumer.
Despite these three combined attempts at "brute force" ordering, we receive the same error message as above. The issue lies with the multi-table insert. We must therefore work around this problem in one of two ways:
temporarily disable the foreign key constraints; or
use deferrable foreign key constraints.
We will test both of these workarounds below.
disable/enable constraints
To ensure we can load all of our tables, we will disable the foreign key constraints, as follows.
SQL> ALTER TABLE trade_legs DISABLE CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations DISABLE CONSTRAINT leg_valuations_fk01;
Table altered.
We will now test our loading procedure again.
SQL> exec trades_load.load_trades;
249290 rows inserted.
PL/SQL procedure successfully completed.
SQL> COMMIT;
Commit complete.
We have successfully loaded our tables. As a quick sanity-check, we will count the records we loaded, as follows.
SQL> SELECT COUNT(*) FROM trades;
COUNT(*)
----------
49858
1 row selected.
SQL> SELECT COUNT(*) FROM trade_legs;
COUNT(*)
----------
99716
1 row selected.
SQL> SELECT COUNT(*) FROM leg_valuations;
COUNT(*)
----------
99716
1 row selected.
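While the foreign keys are disabled, nothing prevents orphaned child rows from being loaded. As an optional extra check (a sketch, not part of the original test), the following anti-join queries count child rows without a parent; both should return zero if the load is consistent.

```sql
-- TRADE_LEGS rows whose parent TRADES row is missing.
SELECT COUNT(*) AS orphan_legs
FROM   trade_legs tl
WHERE  NOT EXISTS ( SELECT NULL
                    FROM   trades t
                    WHERE  t.trade_id = tl.trade_id );

-- LEG_VALUATIONS rows whose parent TRADE_LEGS row is missing.
SELECT COUNT(*) AS orphan_valuations
FROM   leg_valuations lv
WHERE  NOT EXISTS ( SELECT NULL
                    FROM   trade_legs tl
                    WHERE  tl.trade_id = lv.trade_id
                    AND    tl.leg_no   = lv.leg_no );
```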
We will now enable our foreign key constraints, which will raise an exception if there are any data issues.
SQL> ALTER TABLE trade_legs ENABLE CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations ENABLE CONSTRAINT leg_valuations_fk01;
Table altered.
Success! Note that this method is possible for serialised systems, but for concurrent applications, it might not be suitable. As an alternative, we can use deferrable constraints, as demonstrated below.
deferrable constraints
Deferred constraints enable us to postpone the checking of constraint violations until the end of a transaction (i.e. a COMMIT). This enables us to load data in a state that temporarily violates one or more constraints, but which is rectified at a later stage. The deferrable property of constraints can only be set at the time of creation. We will therefore drop and re-create our two foreign keys as follows.
SQL> ALTER TABLE trade_legs DROP CONSTRAINT trade_legs_fk01;
Table altered.
SQL> ALTER TABLE leg_valuations DROP CONSTRAINT leg_valuations_fk01;
Table altered.
SQL> ALTER TABLE trade_legs ADD
2 CONSTRAINT trade_legs_fk01
3 FOREIGN KEY (trade_id)
4 REFERENCES trades (trade_id)
5 DEFERRABLE;
Table altered.
SQL> ALTER TABLE leg_valuations ADD
2 CONSTRAINT leg_valuations_fk01
3 FOREIGN KEY (trade_id, leg_no)
4 REFERENCES trade_legs (trade_id, leg_no)
5 DEFERRABLE;
Table altered.
Our foreign keys are now deferrable. To exploit this, we must set their states to deferred, as follows.
SQL> SET CONSTRAINT trade_legs_fk01 DEFERRED;
Constraint set.
SQL> SET CONSTRAINT leg_valuations_fk01 DEFERRED;
Constraint set.
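Two related statements may be helpful at this point; both are a sketch rather than part of the original test. The first reads the constraint attributes from the data dictionary. Note that the DEFERRED column reports the constraint's default mode (IMMEDIATE for our constraints, because we did not specify INITIALLY DEFERRED), not the setting made by SET CONSTRAINT in the current transaction. The second is an alternative to naming each constraint individually.

```sql
-- Confirm how the foreign keys are recorded in the data dictionary.
SELECT constraint_name
,      deferrable
,      deferred
FROM   user_constraints
WHERE  constraint_name IN ( 'TRADE_LEGS_FK01', 'LEG_VALUATIONS_FK01' );

-- Alternative: defer every deferrable constraint for the
-- current transaction, rather than naming each one.
SET CONSTRAINTS ALL DEFERRED;
```

Because SET CONSTRAINT applies to the current transaction only, it must be issued again in each transaction that needs deferred checking.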
Having cleared the sample tables of the data loaded in the previous example, we will re-run our loading procedure.
SQL> exec trades_load.load_trades;
249305 rows inserted.
PL/SQL procedure successfully completed.
Again, we manage to load the three tables successfully but we haven't validated the data yet. Deferred constraints are not checked until the end of a transaction, so we will now COMMIT our load.
SQL> COMMIT;
Commit complete.
Success! We will now check a small sample of the data we loaded.
SQL> SELECT * FROM trades WHERE ROWNUM <= 5;
TRADE_ID PRODUCT_TYPE COUNTERPARTY TRADE_TIMESTAMP MATURITY_DATE
---------- ------------ ------------- -------------------------- -------------
6774 TYPE SYS 30-AUG-05 14.02.47.000000 31-MAY-06
6778 VIEW SYS 30-AUG-05 14.02.47.000000 04-JUN-06
6782 VIEW SYS 30-AUG-05 14.02.48.000000 08-JUN-06
6786 VIEW SYS 30-AUG-05 14.02.48.000000 12-JUN-06
6790 TYPE SYS 30-AUG-05 14.02.49.000000 16-JUN-06
5 rows selected.
SQL> SELECT * FROM trade_legs WHERE ROWNUM <= 5;
  TRADE_ID     LEG_NO TRADE_AMOUNT CURRENCY TRADE_PRICE
---------- ---------- ------------ -------- -----------
      6774          1   1624901166 GBP       .347733816
      6776          2    524881873 GBP       .904404062
      6778          1    622715309 GBP       .608247575
      6780          2    821449852 GBP       .508567497
      6782          1   1623359117 GBP       .674977682
5 rows selected.
SQL> SELECT * FROM leg_valuations WHERE ROWNUM <= 5;
  TRADE_ID     LEG_NO VALUATION_DATE MARKET_VALUE
---------- ---------- -------------- ------------
      2545          2 14-AUG-07        1042583556
      2546          1 14-AUG-07        1098505446
      2547          1 14-AUG-07          49145215
      2548          2 14-AUG-07          97502618
      2549          2 14-AUG-07         127231786
5 rows selected.
The primary advantage of this method over the disable/enable method is that concurrency is not affected by the constraint states of a single session. We can therefore adopt this method in a multi-user application that is likely to load the same target tables concurrently.
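One practical point about committing with deferred constraints: if the COMMIT is the first point at which the constraints are checked, any violation causes Oracle to roll back the entire transaction. The sketch below is an optional step that is not part of the original example. It forces the check before the COMMIT, so that a failure raises an error while the loaded data is still available to inspect and repair.

```sql
-- Run after the load but before the COMMIT.
-- Every deferred constraint is checked immediately; if any is violated,
-- this statement fails but the transaction's changes are not rolled back.
SET CONSTRAINTS ALL IMMEDIATE;

-- Reached only if the check above succeeded.
COMMIT;
```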
an alternative approach to consider
In this article, we have demonstrated a powerful new technique, combining three unique features (multi-table insert, parallel pipelined functions and type substitution) to extend the ETL potential of Oracle. As a final note, however, there is a simple alternative that some readers might wish to explore. Remember that pipelined functions are usually defined by a single object type and collection of this type. We can easily remove the type hierarchy from our example but still load multiple tables from a single function. We would instead develop the following:
a single object type that contains all of the attributes needed for all target tables. In our example, we would have a single denormalised type containing all of the attributes in TRADES, TRADE_LEGS and LEG_VALUATIONS;
a collection of this single object type as usual;
a pipelined function to prepare and pipe each output record with all denormalised attributes populated; and
a multi-table insert that loads the attributes into their respective tables.
With reference to our trading examples, the alternative multi-table insert would be based on the following pseudo-code.
INSERT ALL
   WHEN leg_no = 1
   THEN
      INTO trades VALUES (...)
   WHEN 1=1
   THEN
      INTO trade_legs VALUES (...)
   WHEN 1=1
   THEN
      INTO leg_valuations VALUES (...)
SELECT trade_id
,      leg_no
,      ...all other attributes of TRADES, TRADE_LEGS and LEG_VALUATIONS...
FROM   TABLE(
          trades_load.trades_transform(
             CURSOR( SELECT * FROM trades_staging ) ) );
Completing this can be an exercise for the reader. The advantages of this method are that fewer records are piped from the function (i.e. several tables are loaded from each record) and that the SELECT is much simpler (no type semantics are required). The disadvantage is that the records can become very wide and, at the time of writing, pipelined functions perform badly with wide record structures (over 50 attributes).
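As a starting point for that exercise, the following is a minimal sketch of the denormalised type, its collection and the corresponding multi-table insert. The type names (trade_denorm_ot, trade_denorm_ntt) and all datatypes are hypothetical. The column lists are taken from the target tables as used in this article's load procedure. The sketch also assumes that trades_transform has been rewritten (not shown) to pipe one trade_denorm_ot record per leg.

```sql
-- Hypothetical denormalised type: one attribute per target-table column.
-- Names and datatypes are illustrative assumptions only.
CREATE TYPE trade_denorm_ot AS OBJECT
( trade_id        NUMBER
, leg_no          NUMBER
, product_type    VARCHAR2(30)
, counterparty    VARCHAR2(30)
, trade_timestamp TIMESTAMP
, trading_book    VARCHAR2(30)
, maturity_date   DATE
, trade_amount    NUMBER
, currency        VARCHAR2(3)
, trade_price     NUMBER
, valuation_date  DATE
, market_value    NUMBER
);
/

-- Collection type that the rewritten pipelined function would return.
CREATE TYPE trade_denorm_ntt AS TABLE OF trade_denorm_ot;
/

-- Each piped record loads TRADE_LEGS and LEG_VALUATIONS; only the first
-- leg of each trade also loads TRADES.
INSERT ALL
   WHEN leg_no = 1
   THEN
      INTO trades ( trade_id, product_type, counterparty
                  , trade_timestamp, trading_book, maturity_date )
      VALUES ( trade_id, product_type, counterparty
             , trade_timestamp, trading_book, maturity_date )
   WHEN 1=1
   THEN
      INTO trade_legs ( trade_id, leg_no, trade_amount, currency, trade_price )
      VALUES ( trade_id, leg_no, trade_amount, currency, trade_price )
   WHEN 1=1
   THEN
      INTO leg_valuations ( trade_id, leg_no, valuation_date, market_value )
      VALUES ( trade_id, leg_no, valuation_date, market_value )
SELECT nt.trade_id
,      nt.leg_no
,      nt.product_type
,      nt.counterparty
,      nt.trade_timestamp
,      nt.trading_book
,      nt.maturity_date
,      nt.trade_amount
,      nt.currency
,      nt.trade_price
,      nt.valuation_date
,      nt.market_value
FROM   TABLE(
          trades_load.trades_transform(
             CURSOR( SELECT * FROM trades_staging ) ) ) nt;
```

Because the SELECT returns scalar attributes directly, no TREAT or IS OF TYPE expressions are needed. The trade-off is the width of each piped record.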
further reading
-- corrected procedure code
   PROCEDURE load_trades IS
   BEGIN
      INSERT FIRST
         WHEN record_type = 'TRADES'
         THEN
            INTO trades ( trade_id
                        , product_type
                        , counterparty
                        , trade_timestamp
                        , trading_book
                        , maturity_date
                        )
            VALUES ( trade_id
                   , product_type
                   , counterparty
                   , trade_timestamp
                   , trading_book
                   , maturity_date
                   )
         WHEN record_type = 'TRADE_LEGS'
         THEN
            INTO trade_legs ( trade_id
                            , leg_no
                            , trade_amount
                            , currency
                            , trade_price
                            )
            VALUES ( trade_id
                   , leg_no
                   , trade_amount
                   , currency
                   , trade_price
                   )
         WHEN record_type = 'LEG_VALUATIONS'
         THEN
            INTO leg_valuations ( trade_id
                                , leg_no
                                , valuation_date
                                , market_value
                                )
            VALUES ( trade_id
                   , leg_no
                   , valuation_date
                   , market_value
                   )
      SELECT ilv.record_type
      ,      COALESCE(
                ilv.trade_rec.transaction_id,
                ilv.trade_leg_rec.transaction_id,
                ilv.leg_valuation_rec.transaction_id
                ) AS trade_id
      ,      COALESCE(
                ilv.trade_leg_rec.leg_no,
                ilv.leg_valuation_rec.leg_no
                ) AS leg_no
      ,      ilv.trade_rec.product_type           AS product_type
      ,      ilv.trade_rec.counterparty           AS counterparty
      ,      ilv.trade_rec.trade_timestamp        AS trade_timestamp
      ,      ilv.trade_rec.trading_book           AS trading_book
      ,      ilv.trade_rec.maturity_date          AS maturity_date
      ,      ilv.trade_leg_rec.trade_amount       AS trade_amount
      ,      ilv.trade_leg_rec.currency           AS currency
      ,      ilv.trade_leg_rec.trade_price        AS trade_price
      ,      ilv.leg_valuation_rec.valuation_date AS valuation_date
      ,      ilv.leg_valuation_rec.market_value   AS market_value
      FROM  (
             SELECT CASE
                       WHEN VALUE(nt) IS OF TYPE (trade_ot)
                       THEN 'TRADES'
                       WHEN VALUE(nt) IS OF TYPE (trade_leg_ot)
                       THEN 'TRADE_LEGS'
                       ELSE 'LEG_VALUATIONS'
                    END AS record_type
             ,      TREAT(VALUE(nt) AS trade_ot)         AS trade_rec
             ,      TREAT(VALUE(nt) AS trade_leg_ot)     AS trade_leg_rec
             ,      TREAT(VALUE(nt) AS leg_valuation_ot) AS leg_valuation_rec
             FROM   TABLE(
                       trades_load.trades_transform(
                          CURSOR( SELECT * FROM trades_staging ORDER BY trade_id, leg_no) ) ) nt
            ) ilv
      ORDER  BY
             COALESCE(
                ilv.trade_rec.transaction_id,
                ilv.trade_leg_rec.transaction_id,
                ilv.leg_valuation_rec.transaction_id
                )
      ,      COALESCE(
                ilv.trade_leg_rec.leg_no,
                ilv.leg_valuation_rec.leg_no
                )
      ,      CASE ilv.record_type
                WHEN 'TRADES'
                THEN 0
                WHEN 'TRADE_LEGS'
                THEN 1
                ELSE 2
             END;
      DBMS_OUTPUT.PUT_LINE( SQL%ROWCOUNT || ' rows inserted.' );
   END load_trades;
END trades_load;
/
Reference: http://www.oracle-developer.net/display.php?id=420
http://docs.oracle.com/cd/B28359_01/appdev.111/b28371/adobjadv.htm#CHDJCFAG
This article is original; please credit the source and author when reposting.
Corrections are welcome.
Email: czmcj@163.com