improving performance with pipelined table functions

In January 2009, I wrote a section on performance tuning with pipelined functions for Steven Feuerstein's Oracle PL/SQL Programming, Fifth Edition. My contribution appears in Chapter 21 of the book, but I've re-printed it here for oracle-developer.net readers, courtesy of O'Reilly Media, Inc. Note that the following article contains references to other chapters in the book as well as the source code that I wrote to accompany the topic.

Pipelined functions are where the elegance and simplicity of PL/SQL converge with the performance of SQL. Complex data transformations are straightforward to develop and support with PL/SQL, yet to achieve high-performance data processing, we often resort to set-based SQL solutions. Pipelined functions bridge the gap between the two methods, and they also have some unique performance features of their own, making them a superb performance optimization tool.

In the following pages, I'll show some examples of typical data-processing requirements and how you might tune them with pipelined functions. I'll cover the following topics: replacing row-based inserts with pipelined function-based loads; tuning pipelined functions with array fetches; exploiting parallel pipelined functions; tuning merge operations; asynchronous data unloading with parallel pipelined functions; the performance implications of the partitioning and streaming clauses; pipelined functions and the cost-based optimizer; and tuning complex (multitable) data loads.

The basic syntax for pipelined table functions is covered in Chapter 17. To recap, a pipelined function is called in the FROM clause of a SQL statement and is queried as if it were a relational table or other rowsource. Unlike standard table functions (that have to complete all of their processing before passing a potentially large collection of data back to the calling context), pipelined table functions stream their results to the client almost as soon as they are prepared. In other words, pipelined functions do not materialize their entire result set, and this optimization feature dramatically reduces their PGA memory footprint. Another unique performance feature of pipelined functions is the ability to call them in the context of a parallel query. I have taken advantage of these unique performance features many times, and in the next few pages I will show you how and when to use pipelined functions to improve the performance of some of your own programs.
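As a reminder, the calling pattern is simply to wrap the function in the TABLE operator. The following is a generic sketch only; the names are placeholders rather than objects from the examples that follow.

SELECT *
FROM   TABLE(
          my_pkg.my_pipelined_function(
             CURSOR( SELECT * FROM my_source_table ) ) );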

replacing row-based inserts with pipelined function-based loads

To demonstrate the performance of pipelined functions, let's first imagine a typical legacy loading scenario that I want to bring into the 21st century. Using the stockpivot example, I have coded a simple row-by-row load to fetch the stockpivot source data and pivot each record into two rows for insert. It is contained in a package and is as follows:

/* File on web: stockpivot_setup.sql */
   PROCEDURE load_stocks_legacy IS

      CURSOR c_source_data IS
         SELECT ticker, open_price, close_price, trade_date
         FROM   stocktable;

      r_source_data stockpivot_pkg.stocktable_rt;
      r_target_data stockpivot_pkg.tickertable_rt;

   BEGIN
      OPEN c_source_data;
      LOOP
         FETCH c_source_data INTO r_source_data;
         EXIT WHEN c_source_data%NOTFOUND;

         /* Opening price... */
         r_target_data.ticker      := r_source_data.ticker;
         r_target_data.price_type  := 'O';
         r_target_data.price       := r_source_data.open_price;
         r_target_data.price_date  := r_source_data.trade_date;
         INSERT INTO tickertable VALUES r_target_data;

         /* Closing price... */
         r_target_data.price_type := 'C';
         r_target_data.price      := r_source_data.close_price;
         INSERT INTO tickertable VALUES r_target_data;

      END LOOP;
      CLOSE c_source_data;
   END load_stocks_legacy;

I regularly see code of this format, and since Oracle8i Database I’ve typically used BULK COLLECT and FORALL as my primary tuning tool (when the logic is too complex for a set-based SQL solution). However, an alternative technique (one that I first saw described by Tom Kyte; see his discussion in Expert Oracle Database Architecture, pp. 640-643) is to use a set-based insert from a pipelined function. In other words, a pipelined function is used for all of the legacy data transformation and preparation logic, but the target-table load is handled separately as a set-based insert. Since reading about this powerful technique, I have used it successfully in my own performance optimization work, as described in the following sections.

a pipelined function implementation

As demonstrated in Chapter 17, the first thing to consider when creating a pipelined function is the data that it will return. For this, I need to create an object type to define a single row of the pipelined function's return data.

/* File on web: stockpivot_setup.sql */
CREATE TYPE stockpivot_ot AS OBJECT
( ticker      VARCHAR2(10)
, price_type  VARCHAR2(1)
, price       NUMBER 
, price_date  DATE
);

I also need to create a collection of this object as this defines the function's return type.

/* File on web: stockpivot_setup.sql */
CREATE TYPE stockpivot_ntt AS TABLE OF stockpivot_ot;

Transforming the legacy code into a pipelined function is quite simple. First I must define the function specification in the header (see the stockpivot_setup.sql file on the book’s web site). I must also include a load procedure that I will describe later:

/* File on web: stockpivot_setup.sql */
CREATE PACKAGE stockpivot_pkg AS

   TYPE stocktable_rct IS REF CURSOR
      RETURN stocktable%ROWTYPE;

   <snip>

   FUNCTION pipe_stocks( 
            p_source_data IN stockpivot_pkg.stocktable_rct 
            ) RETURN stockpivot_ntt PIPELINED;

   PROCEDURE load_stocks;

END stockpivot_pkg;

My pipelined function takes a strong REF CURSOR as an input parameter (I could also use a weak REF CURSOR in this case). The cursor parameter itself is not necessarily required. It would be just as valid for me to declare the cursor in the function itself (as I did with the legacy procedure). However, the cursor parameter is going to be required for further iterations of this pipelined function, so I’ve introduced it from the outset.

The function's implementation follows.

/* File on web: stockpivot_setup.sql */
  1     FUNCTION pipe_stocks(
  2              p_source_data IN stockpivot_pkg.stocktable_rct
  3              ) RETURN stockpivot_ntt PIPELINED IS
  4
  5        r_target_data stockpivot_ot := stockpivot_ot(NULL, NULL, NULL, NULL);
  6        r_source_data stockpivot_pkg.stocktable_rt;
  7
  8     BEGIN
  9        LOOP
 10           FETCH p_source_data INTO r_source_data;
 11           EXIT WHEN p_source_data%NOTFOUND;
 12
 13           /* First row... */
 14           r_target_data.ticker     := r_source_data.ticker;
 15           r_target_data.price_type := 'O';
 16           r_target_data.price      := r_source_data.open_price;
 17           r_target_data.price_date := r_source_data.trade_date;
 18           PIPE ROW (r_target_data);
 19
 20           /* Second row... */
 21           r_target_data.price_type := 'C';
 22           r_target_data.price      := r_source_data.close_price;
 23           PIPE ROW (r_target_data);
 24
 25        END LOOP;
 26        CLOSE p_source_data;
 27        RETURN;
 28     END pipe_stocks;

Other than the general pipelined function syntax (that you should by now be familiar with from Chapter 17), the majority of the pipelined function's code is recognizable from the legacy example. The main differences to consider are summarized here.

Line(s) Description
2 The legacy cursor is removed from the code and instead is passed as a REF CURSOR parameter.
5 My target data variable is no longer defined as the target table's ROWTYPE. It is now of the STOCKPIVOT_OT object type that defines the pipelined function's return data.
18 and 23 Instead of inserting records into tickertable, I pipe records from the function. At this stage, the database will buffer a small number of my piped object rows into a corresponding collection. Depending on the client's array size, this buffered collection of data will be available almost immediately.

loading from a pipelined function

As you can see, with only a small number of changes to the original load program, I now have a pipelined function that prepares and pipes all of the data that I need to load into tickertable. To complete the conversion of my legacy code, I only need to write an additional procedure to insert the piped data into my target table.

/* File on web: stockpivot_setup.sql */
   PROCEDURE load_stocks IS
   BEGIN

      INSERT INTO tickertable (ticker, price_type, price, price_date)
      SELECT ticker, price_type, price, price_date
      FROM   TABLE(
                stockpivot_pkg.pipe_stocks(
                   CURSOR(SELECT * FROM stocktable)));

   END load_stocks;

That completes the basic conversion of the row-by-row legacy code to a pipelined function solution. So how does this compare to the original? In my tests, I created the stocktable as an external table with a file of 500,000 records. The legacy row-by-row code completed in 57 seconds (inserting 1 million rows into tickertable) and the set-based insert using the pipelined function ran in just 16 seconds (test results for all examples are available on the book’s web site).

Considering that this is my first and most basic pipelined function implementation, the improvement in performance shown above is quite respectable. However, it is not quite the performance I can get when using a simple BULK COLLECT and FORALL solution (which runs in just over 5 seconds in my tests), so I will need to make some modifications to my pipelined function load.
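For comparison, that kind of bulk load looks broadly like the following. This is a sketch only (the procedure and type names are illustrative, and the exact code used for my timings differs); it assumes that tickertable has just the four columns used throughout these examples.

   PROCEDURE load_stocks_bulk IS

      CURSOR c_source_data IS
         SELECT ticker, open_price, close_price, trade_date
         FROM   stocktable;

      TYPE source_aat IS TABLE OF c_source_data%ROWTYPE
         INDEX BY PLS_INTEGER;
      TYPE target_aat IS TABLE OF tickertable%ROWTYPE
         INDEX BY PLS_INTEGER;

      aa_source source_aat;
      aa_target target_aat;
      n         PLS_INTEGER;

   BEGIN
      OPEN c_source_data;
      LOOP
         FETCH c_source_data BULK COLLECT INTO aa_source LIMIT 100;
         EXIT WHEN aa_source.COUNT = 0;

         /* Pivot each source row into two target records... */
         aa_target.DELETE;
         n := 0;
         FOR i IN 1 .. aa_source.COUNT LOOP

            /* Opening price... */
            n := n + 1;
            aa_target(n).ticker     := aa_source(i).ticker;
            aa_target(n).price_type := 'O';
            aa_target(n).price      := aa_source(i).open_price;
            aa_target(n).price_date := aa_source(i).trade_date;

            /* Closing price... */
            n := n + 1;
            aa_target(n)            := aa_target(n - 1);
            aa_target(n).price_type := 'C';
            aa_target(n).price      := aa_source(i).close_price;

         END LOOP;

         /* Bulk-bind each batch into the target table... */
         FORALL j IN 1 .. aa_target.COUNT
            INSERT INTO tickertable VALUES aa_target(j);

      END LOOP;
      CLOSE c_source_data;
   END load_stocks_bulk;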

Before I do this, however, notice that I retained the single-row fetches off the main cursor and did nothing to reduce the “expensive” context-switching (which would require a BULK COLLECT fetch). So why is it faster than the legacy row-by-row code?

It is faster primarily because of the switch to set-based SQL. Set-based DML (such as the INSERT...SELECT I used in my pipelined load) is almost always considerably faster than a row-based, procedural solution. In this particular case, I have benefited directly from the Oracle database's internal optimization of set-based inserts. Specifically, the database writes considerably less redo information for set-based inserts (INSERT...SELECT) than it does for singleton inserts (INSERT...VALUES). That is to say, if I insert 100 rows in a single statement, it will generate less redo than if I inserted 100 rows one-by-one. My original legacy load of 1 million tickertable rows generated over 270Mb of redo information. This was reduced to just over 37Mb when using the pipelined function-based load, contributing to a significant proportion of the time savings.
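If you want to verify the redo figures on your own system, one simple approach (assuming you have access to the V$MYSTAT and V$STATNAME views) is to snapshot the session's "redo size" statistic before and after each load:

SELECT n.name, s.value
FROM   v$mystat   s
,      v$statname n
WHERE  s.statistic# = n.statistic#
AND    n.name = 'redo size';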

I have omitted any complicated data transformations from my examples for the sake of clarity. You should assume in all cases that the data-processing rules are sufficiently complex to warrant a PL/SQL, pipelined function solution in the first place. Otherwise, I would probably just use a set-based SQL solution with analytic functions, subquery factoring, and CASE expressions to transform my high-volume data!
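For a transformation as simple as stockpivot, such a set-based alternative might be nothing more than the following sketch (the point of these examples, of course, is that real transformation rules are rarely this trivial):

INSERT INTO tickertable (ticker, price_type, price, price_date)
SELECT ticker, 'O', open_price, trade_date FROM stocktable
UNION ALL
SELECT ticker, 'C', close_price, trade_date FROM stocktable;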

tuning pipelined functions with array fetches

Despite having tuned the legacy code with a pipelined function implementation, I am not done yet. There are further optimization possibilities and I need to make my processing at least as fast as a BULK COLLECT and FORALL solution. Notice that I used single-row fetches from the main source cursor. The first simple tuning possibility is therefore to use array fetches with BULK COLLECT.

I begin by adding a default array size to my package specification. The optimal array fetch size will vary according to your specific data-processing requirements, but I always prefer to start my tests with 100 and work from there. I also add an associative array type to the package specification (it could just as well be declared in the body); this is for bulk fetches from the source cursor. Finally, I add a second parameter to the pipelined function signature so that I can control the array fetch size (this isn’t necessary of course: just good practice). My specification is now as follows.

/* File on web: stockpivot_setup.sql */
CREATE PACKAGE stockpivot_pkg AS
   <snip>
   c_default_limit CONSTANT PLS_INTEGER := 100;

   TYPE stocktable_aat IS TABLE OF stocktable%ROWTYPE
      INDEX BY PLS_INTEGER;

   FUNCTION pipe_stocks_array( 
            p_source_data IN stockpivot_pkg.stocktable_rct,
            p_limit_size  IN PLS_INTEGER DEFAULT stockpivot_pkg.c_default_limit
            ) RETURN stockpivot_ntt PIPELINED;
   <snip> 
END stockpivot_pkg;

The function itself is very similar to the original version.

/* File on web: stockpivot_setup.sql */
   FUNCTION pipe_stocks_array( 
            p_source_data IN stockpivot_pkg.stocktable_rct,
            p_limit_size  IN PLS_INTEGER DEFAULT stockpivot_pkg.c_default_limit
            ) RETURN stockpivot_ntt PIPELINED IS

      r_target_data  stockpivot_ot := stockpivot_ot(NULL, NULL, NULL, NULL);
      aa_source_data stockpivot_pkg.stocktable_aat;

   BEGIN
      LOOP
         FETCH p_source_data BULK COLLECT INTO aa_source_data LIMIT p_limit_size;
         EXIT WHEN aa_source_data.COUNT = 0;

         /* Process the batch of (p_limit_size) records... */
         FOR i IN 1 .. aa_source_data.COUNT LOOP
   
            /* First row... */
            r_target_data.ticker     := aa_source_data(i).ticker;
            r_target_data.price_type := 'O';
            r_target_data.price      := aa_source_data(i).open_price;
            r_target_data.price_date := aa_source_data(i).trade_date;
            PIPE ROW (r_target_data);

            /* Second row... */
            r_target_data.price_type := 'C';
            r_target_data.price      := aa_source_data(i).close_price;
            PIPE ROW (r_target_data);

         END LOOP;

      END LOOP;
      CLOSE p_source_data; 
      RETURN;

   END pipe_stocks_array;

The only difference from my original version is the use of BULK COLLECT…LIMIT from the source cursor. The load procedure is the same as before, modified to reference the array-version of the pipelined function. This reduced my loading time further to just 6 seconds, purely because of the reduction in context-switching from array-based PL/SQL. My pipelined function solution now has comparable performance to my BULK COLLECT and FORALL solution.
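The modified load procedure therefore looks something like the following sketch (load_stocks_array is a name I've chosen here for illustration):

   PROCEDURE load_stocks_array IS
   BEGIN

      INSERT INTO tickertable (ticker, price_type, price, price_date)
      SELECT ticker, price_type, price, price_date
      FROM   TABLE(
                stockpivot_pkg.pipe_stocks_array(
                   CURSOR(SELECT * FROM stocktable)));

   END load_stocks_array;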

exploiting parallel pipelined functions for ultimate performance

I've achieved some good performance gains from the switch to a set-based insert from a pipelined function. Yet I have one more tuning option for my stockpivot load that will give me better performance than any other solution: using the parallel capability of pipelined functions described in Chapter 17. In this next iteration, I parallel-enable my stockpivot function by adding another clause to the function signature:

/* File on web: stockpivot_setup.sql */
CREATE PACKAGE stockpivot_pkg AS
   <snip>
   FUNCTION pipe_stocks_parallel( 
            p_source_data IN stockpivot_pkg.stocktable_rct,
            p_limit_size  IN PLS_INTEGER DEFAULT stockpivot_pkg.c_default_limit
            ) RETURN stockpivot_ntt 
              PIPELINED
              PARALLEL_ENABLE (PARTITION p_source_data BY ANY);
   <snip>
END stockpivot_pkg;

By using the ANY partitioning scheme, I have instructed the Oracle database to randomly allocate my source data to the parallel processes. This is because the order in which the function receives and processes the source data has no effect on the resulting output (i.e. there are no inter-row dependencies). That is not always the case, of course.

enabling parallel pipelined function execution

Aside from the parallel-enabling syntax in the specification and body, the function implementation is the same as the array-fetch example (see the stockpivot_setup.sql file on the web site for the full package). However, I need to ensure that my tickertable load is executed in parallel. First, I must enable parallel DML at the session level; once this is done, parallel query is invoked either by hinting the SQL statement (as I do in my load procedure below) or by setting a parallel degree on the underlying objects.

Parallel query/DML is a feature of Oracle Database Enterprise Edition. If you use either Standard Edition or Standard Edition One, you are not licensed to use the parallel feature of pipelined functions.

In my load, I have enabled parallel DML at the session level and used hints to specify a degree of parallelism (DOP) of 4:

/* File on web: stockpivot_setup.sql */
   PROCEDURE load_stocks_parallel IS
   BEGIN

      EXECUTE IMMEDIATE 'ALTER SESSION ENABLE PARALLEL DML';

      INSERT /*+ PARALLEL(t, 4) */ INTO tickertable t
            (ticker, price_type, price, price_date)
      SELECT ticker, price_type, price, price_date
      FROM   TABLE(
                stockpivot_pkg.pipe_stocks_parallel(
                   CURSOR(SELECT /*+ PARALLEL(s, 4) */ * FROM stocktable s)));

   END load_stocks_parallel;

This reduces the load time to just over 3 seconds, a significant improvement on my original legacy code and all other versions of my pipelined function load. Of course, when dealing in small units of time such as this, the startup costs of parallel processes will impact the overall runtime, but I have still managed almost a 50% improvement on my array version. The fact that parallel inserts use direct path rather than conventional path also means that the redo generation dropped further still to just 25Kb!

In commercial systems, you might be tuning processes that run for an hour or more, so the gains you can achieve with parallel pipelined loads will be significant in both proportional and actual terms.

When you are using parallel pipelined functions, your source cursor must be passed as a REF CURSOR parameter. In serial pipelined functions, the source cursor can be embedded in the function itself (although I have chosen not to do this in any of my examples).
Furthermore, the REF CURSOR can be either weakly or strongly typed for functions partitioned with the ANY scheme, but for HASH or RANGE based partitioning, it must be strongly typed. See Chapter 15 for more details on REF CURSORs and cursor variables.

tuning merge operations with pipelined functions

You might now be considering serial or parallel pipelined functions as a tuning mechanism for your own high-volume data loads. Yet not all loads involve inserts like the stockpivot example. Many data loads are incremental and require periodic merges of new and modified data. The good news is that the same principle of combining PL/SQL transformations with set-based SQL applies to merges (and updates) as well.

row-based pl/sql merge processing

Consider the following procedure, taken from my employee_pkg example. I have a merge of a large number of employee records but my legacy code uses an old PL/SQL technique of attempting an update first and inserting only when the update matches zero records in the target table.

/* File on web: employees_merge_setup.sql */
   PROCEDURE upsert_employees IS
      n PLS_INTEGER := 0;
   BEGIN
      FOR r_emp IN (SELECT * FROM employees_staging) LOOP
         UPDATE employees
         SET    <snip>
         WHERE  employee_id = r_emp.employee_id;

         IF SQL%ROWCOUNT = 0 THEN
            INSERT INTO employees (<snip>)
            VALUES (<snip>);
         END IF;
      END LOOP;
   END upsert_employees;

I've removed some of the code for brevity, but you can clearly see the “upsert” technique in action. Note that I've used an implicit cursor FOR loop that will benefit from the array-fetch optimization introduced to PL/SQL in Oracle Database 10g.

To test this procedure, I created a staging table of 500,000 employee records (this is a massive corporation!) and inserted 250,000 of them into an employees table to manufacture an even split between updates and inserts. This PL/SQL “poor man's merge” solution completed in 46 seconds.

using pipelined functions for set-based merge

Converting this example to a set-based SQL MERGE from a pipelined function is, once again, quite simple. First, I create the supporting object and nested table types (see the employees_merge_setup.sql file for details) and declare the function in the package header.

/* File on web: employees_merge_setup.sql */
CREATE PACKAGE employee_pkg AS

   c_default_limit CONSTANT PLS_INTEGER := 100;

   TYPE employee_rct IS REF CURSOR RETURN employees_staging%ROWTYPE;
   TYPE employee_aat IS TABLE OF employees_staging%ROWTYPE
      INDEX BY PLS_INTEGER;

   <snip>

   FUNCTION pipe_employees( 
            p_source_data IN employee_pkg.employee_rct,
            p_limit_size  IN PLS_INTEGER DEFAULT employee_pkg.c_default_limit
            ) RETURN employee_ntt 
              PIPELINED
              PARALLEL_ENABLE (PARTITION p_source_data BY ANY);
END employee_pkg;

I have parallel-enabled the pipelined function and used the ANY partitioning scheme as before. The function implementation is as follows:

/* File on web: employees_merge_setup.sql */
   FUNCTION pipe_employees( 
            p_source_data IN employee_pkg.employee_rct,
            p_limit_size  IN PLS_INTEGER DEFAULT employee_pkg.c_default_limit
            ) RETURN employee_ntt 
              PIPELINED 
              PARALLEL_ENABLE (PARTITION p_source_data BY ANY) IS
      aa_source_data employee_pkg.employee_aat;
   BEGIN
      LOOP
         FETCH p_source_data BULK COLLECT INTO aa_source_data LIMIT p_limit_size;
         EXIT WHEN aa_source_data.COUNT = 0;
         FOR i IN 1 .. aa_source_data.COUNT LOOP
            PIPE ROW (
               employee_ot( aa_source_data(i).employee_id,
                            <snip>
                            SYSDATE ));
         END LOOP;
      END LOOP;
      CLOSE p_source_data;
      RETURN;
   END pipe_employees;

This function simply array-fetches the source data and pipes it out in the correct format. I can now use my function in a MERGE statement, which I wrap in a procedure in employee_pkg, as follows.

/* File on web: employees_merge_setup.sql */
   PROCEDURE merge_employees IS
   BEGIN

      EXECUTE IMMEDIATE 'ALTER SESSION ENABLE PARALLEL DML';

      MERGE /*+ PARALLEL(e, 4) */ 
         INTO  employees e
         USING TABLE(
                  employee_pkg.pipe_employees(
                     CURSOR(SELECT /*+ PARALLEL(es, 4) */ *
                            FROM employees_staging es))) s
         ON   (e.employee_id = s.employee_id)
      WHEN MATCHED THEN
         UPDATE
         SET    <snip>
      WHEN NOT MATCHED THEN
         INSERT ( <snip> )
         VALUES ( <snip> );

   END merge_employees;

The SQL MERGE from my parallel pipelined function reduces the load time by over 50% to just 21 seconds. So using parallel pipelined functions as a rowsource for set-based SQL operations is clearly a valuable tuning technique for volume data loads.

asynchronous data unloading with parallel pipelined functions

So far, I have demonstrated two types of data loads that have benefited from conversion to a parallel pipelined function. You might also want to exploit the parallel feature of pipelined functions for those times when you need to unload data (even well into the 21st century I have yet to see a corporate in-house ODS/DSS/warehouse that doesn't extract data for transfer to other systems).

a typical data-extract program

Imagine the following scenario. I have a daily extract of all my trading data (held in tickertable) for transfer to a middle-office system (which expects a delimited flat file). To achieve this, I write a simple utility to unload data from a cursor:

/* File on web: parallel_unload_setup.sql */
   PROCEDURE legacy_unload( 
             p_source    IN SYS_REFCURSOR,
             p_filename  IN VARCHAR2,
             p_directory IN VARCHAR2,
             p_limit_size IN PLS_INTEGER DEFAULT unload_pkg.c_default_limit
             ) IS
      TYPE row_aat IS TABLE OF VARCHAR2(32767)
         INDEX BY PLS_INTEGER;
      aa_rows row_aat;
      v_name  VARCHAR2(128) := p_filename || '.txt';
      v_file  UTL_FILE.FILE_TYPE;
   BEGIN
      v_file := UTL_FILE.FOPEN( p_directory, v_name, 'w', c_maxline );
      LOOP
         FETCH p_source BULK COLLECT INTO aa_rows LIMIT p_limit_size;
         EXIT WHEN aa_rows.COUNT = 0;
         FOR i IN 1 .. aa_rows.COUNT LOOP
            UTL_FILE.PUT_LINE(v_file, aa_rows(i));
         END LOOP;
      END LOOP;
      CLOSE p_source;
      UTL_FILE.FCLOSE(v_file);
   END legacy_unload;

I simply loop through the source cursor parameter using an array fetch size of 100 and write each batch of rows to the destination file using UTL_FILE. The source cursor has just one column; the cursor is prepared with the source columns already concatenated and delimited.
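A call to this legacy version might look something like the following sketch (the delimited cursor and the 'DIR' directory object mirror the parallel test shown later):

   DECLARE
      rc SYS_REFCURSOR;
   BEGIN
      OPEN rc FOR
         SELECT ticker     || ',' ||
                price_type || ',' ||
                price      || ',' ||
                TO_CHAR(price_date,'YYYYMMDDHH24MISS')
         FROM   tickertable;

      unload_pkg.legacy_unload(
         p_source    => rc,
         p_filename  => 'tickertable',
         p_directory => 'DIR' );
   END;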

In testing, 1 million delimited tickertable rows unloaded to a flat file in just 24 seconds (I ensured that tickertable was fully scanned a few times beforehand to reduce the impact of physical I/O). But tickertable has an average row length of just 25 bytes, and so unloads very quickly. Commercial systems will write significantly more data (in both row length and row counts) and potentially take tens of minutes.

a parallel-enabled pipelined function unloader

If you recognize this scenario from your own systems, you should consider tuning with parallel pipelined functions. If you analyze the legacy example above, all of the data manipulation can be placed within a pipelined function (specifically, there are no DML operations). So how about if I take that cursor fetch logic and UTL_FILE management and put it inside a parallel pipelined function? If I do this, I can exploit Oracle's parallel query to unload the data to multiple files much faster.

Of course, pipelined functions usually return piped data, but in this case my source rows are being written to a file and I don't need them returned to the client. Instead, I will return one row per parallel process with some very basic metadata to describe the session information and number of rows it extracted. My supporting types are as follows:

/* File on web: parallel_unload_setup.sql */
CREATE TYPE unload_ot AS OBJECT
( file_name  VARCHAR2(128)
, no_records NUMBER
, session_id NUMBER );

CREATE TYPE unload_ntt AS TABLE OF unload_ot;

My function implementation is based on the legacy processing with some additional setup required for the metadata being returned.

/* File on web: parallel_unload_setup.sql */
  1     FUNCTION parallel_unload(
  2              p_source     IN SYS_REFCURSOR,
  3              p_filename   IN VARCHAR2,
  4              p_directory  IN VARCHAR2,
  5              p_limit_size IN PLS_INTEGER DEFAULT unload_pkg.c_default_limit
  6              )
  7        RETURN unload_ntt
  8        PIPELINED PARALLEL_ENABLE (PARTITION p_source BY ANY) AS
  9        aa_rows row_aat;
 10        v_sid   NUMBER := SYS_CONTEXT('USERENV','SID');
 11        v_name  VARCHAR2(128) := p_filename || '_' || v_sid || '.txt';
 12        v_file  UTL_FILE.FILE_TYPE;
 13        v_lines PLS_INTEGER;
 14     BEGIN
 15        v_file := UTL_FILE.FOPEN(p_directory, v_name, 'w', c_maxline);
 16        LOOP
 17           FETCH p_source BULK COLLECT INTO aa_rows LIMIT p_limit_size;
 18           EXIT WHEN aa_rows.COUNT = 0;
 19           FOR i IN 1 .. aa_rows.COUNT LOOP
 20              UTL_FILE.PUT_LINE(v_file, aa_rows(i));
 21           END LOOP;
 22        END LOOP;
 23        v_lines := p_source%ROWCOUNT;
 24        CLOSE p_source;
 25        UTL_FILE.FCLOSE(v_file);
 26        PIPE ROW (unload_ot(v_name, v_lines, v_sid));
 27        RETURN;
 28     END parallel_unload;

Note the following about this function:

Line(s) Description
1 and 8 My function is parallel-enabled and will partition the source data by ANY. Therefore, I am able to declare my source cursor based on the system-defined SYS_REFCURSOR type.
10 My return metadata will include the session ID (SID). This is available in the USERENV application context. You can derive the SID from views such as V$MYSTAT in versions prior to Oracle Database 10g.
11 I want to unload in parallel to multiple files so I create a unique filename for each parallel invocation.
15-22 and 24-25 I reuse all of the processing logic from the original legacy implementation.
26 For each invocation of the function, I pipe a single row containing the filename, number of rows extracted, and session identifier.

With minimal effort, I have parallel-enabled my data unloader, using the pipelined function as an asynchronous forking mechanism. Let's see how to invoke this new version below. I've also included my test output from SQL*Plus.

/* File on web: parallel_unload_test.sql */
SELECT *
FROM   TABLE(
          unload_pkg.parallel_unload( 
             p_source => CURSOR(SELECT /*+ PARALLEL(t, 4) */
                                       ticker     || ',' || 
                                       price_type || ',' ||
                                       price      || ',' ||
                                       TO_CHAR(price_date,'YYYYMMDDHH24MISS')
                                FROM   tickertable t),
             p_filename => 'tickertable',
             p_directory => 'DIR' ));

The output is:


FILE_NAME                      NO_RECORDS SESSION_ID
------------------------------ ---------- ----------
tickertable_144.txt                260788        144
tickertable_142.txt                252342        142
tickertable_127.txt                233765        127
tickertable_112.txt                253105        112

4 rows selected.

Elapsed: 00:00:12.21

On my test system, with four parallel processes, I have roughly halved my processing time. Remember that when dealing in small numbers of seconds, as in this example, the cost of parallel startup can have an impact on processing time. For extracts that take minutes or more to complete, your potential savings (in both proportional and actual terms) might be far greater.

It is easy to improve further on this technique by “tuning” the UTL_FILE calls, using a buffering mechanism. See the PARALLEL_UNLOAD_BUFFERED function in the parallel_unload_setup.sql file on the book’s web site for the implementation. Rather than write each line to file immediately, I instead append it to a large VARCHAR2 buffer (I could alternatively use a collection), and flush it to a file periodically. Reducing the UTL_FILE calls in such a way nearly halved the extract time of my parallel unloader to just under 7 seconds.
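The buffering concept itself is simple; the inner loop of such a function might look something like the following sketch of the general idea (rather than the PARALLEL_UNLOAD_BUFFERED implementation itself):

      /* Inside the BULK COLLECT fetch loop (v_buffer is a VARCHAR2(32767)
         and c_flush_size is a constant comfortably below 32,767 bytes)... */
      FOR i IN 1 .. aa_rows.COUNT LOOP
         IF NVL(LENGTH(v_buffer), 0) + LENGTH(aa_rows(i)) + 1 > c_flush_size THEN
            UTL_FILE.PUT(v_file, v_buffer);
            UTL_FILE.FFLUSH(v_file);
            v_buffer := NULL;
         END IF;
         v_buffer := v_buffer || aa_rows(i) || CHR(10);
      END LOOP;

      /* ...and after the final fetch, write any remaining buffer
         contents before calling UTL_FILE.FCLOSE. */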

performance implications of partitioning and streaming clauses in parallel pipelined functions

All of my parallel pipelined function examples so far have used the ANY partitioning scheme because there have been no dependencies between the rows of source data. As described in Chapter 17, there are several partitioning and streaming options to control how source input data is allocated and ordered in parallel processes. To recap, the partitioning options are ANY, HASH, and RANGE, and each can optionally be combined with either ORDER or CLUSTER streaming of the rows within each parallel process.

The particular method you choose depends on your specific data-processing requirements. For example, if you need to ensure that all orders for a specific customer are processed together, but in date order, you could use HASH partitioning with ORDER streaming. If you need to ensure that all of your trading data is processed in event order, you might use a RANGE/ORDER combination.
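For example, a function specification for the customer-orders scenario just described might look something like this sketch (the names are hypothetical; note also that HASH and RANGE partitioning require the REF CURSOR parameter to be strongly typed):

   FUNCTION pipe_orders(
            p_source IN order_pkg.order_rct
            ) RETURN order_ntt
              PIPELINED
              PARALLEL_ENABLE (PARTITION p_source BY HASH (customer_id))
              ORDER p_source BY (customer_id, order_date);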

relative performance of partitioning and streaming combinations

These options have their own performance characteristics resulting from the sorting they imply. The following table summarizes the time taken to pipe 1 million tickertable rows through a parallel pipelined function (with a DOP of 4) using each of the partitioning and streaming options (to test the performance of these options for yourself, use the parallel_options_*.sql files available on the web site for this book).

Partitioning Option  Streaming Option  Elapsed Time (s)
-------------------  ----------------  ----------------
ANY                  -                             5.37
ANY                  ORDER                         8.06
ANY                  CLUSTER                       9.58
HASH                 -                             7.48
HASH                 ORDER                         7.84
HASH                 CLUSTER                       8.10
RANGE                -                             9.84
RANGE                ORDER                        10.59
RANGE                CLUSTER                      10.90

As you might expect, ANY and HASH partitioning are comparable (although the unordered ANY option is comfortably the quickest), but the RANGE partitioning mechanism is significantly slower. This is probably to be expected because the source data must be ordered before the database can divide it among the slaves. Within the parallel processes themselves, ordering is quicker than clustering for all partitioning options (this is perhaps a surprising result as clustering doesn't need to order the entire set of data). Your mileage might vary, of course.

partitioning with skewed data

A further consideration with partitioning is the division of the workload among the parallel processes. The ANY and HASH options lead to a reasonably uniform spread of data among the parallel processes, regardless of the number of rows in the source. However, depending on your data characteristics, RANGE partitioning might lead to a very uneven allocation, especially if the values in the partitioning column(s) are skewed. If one parallel process receives too large a share of the data, this can negate any benefits of parallel pipelined functions. To test this yourself, use the files named parallel_skew_*.sql available on the book’s web site.
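One way to see how evenly (or unevenly) the source rows were spread across the parallel processes is to query V$PQ_TQSTAT from the same session immediately after the parallel statement completes, for example:

SELECT dfo_number
,      tq_id
,      server_type
,      process
,      num_rows
FROM   v$pq_tqstat
ORDER  BY dfo_number, tq_id, server_type, process;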

All of my pipelined function calls include a REF CURSOR parameter supplied via the CURSOR(SELECT...) function. As an alternative, it is perfectly legal to prepare a REF CURSOR variable using the OPEN ref cursor FOR... construct and pass this variable in place of the CURSOR(SELECT...) call. If you choose to do this, beware bug 5349930! When you are using parallel-enabled pipelined functions, this bug can cause a parallel process to die unexpectedly with an ORA-01008: not all variables bound exception.
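For reference, the OPEN...FOR variant of the stockpivot load might look something like this sketch:

   DECLARE
      rc stockpivot_pkg.stocktable_rct;
   BEGIN
      OPEN rc FOR SELECT * FROM stocktable;

      INSERT INTO tickertable (ticker, price_type, price, price_date)
      SELECT ticker, price_type, price, price_date
      FROM   TABLE(stockpivot_pkg.pipe_stocks_array(rc));
   END;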

pipelined functions and the cost-based optimizer

The examples in this chapter demonstrate the use of pipelined functions as simple rowsources that generate data for loading and unloading scenarios. At some point, however, you might need to join a pipelined function to another rowsource (such as a table, a view, or the intermediate output of other joins within a SQL execution plan). Rowsource statistics (such as cardinality, data distribution, and nulls) are critical to achieving efficient execution plans, but in the case of pipelined functions (or indeed any table function), the cost-based optimizer doesn't have much information to work with.

cardinality heuristics for pipelined table functions

Up to and including Oracle Database 11g Release 1, the CBO applies a heuristic cardinality to pipelined and table functions in SQL statements and this can sometimes lead to inefficient execution plans. The default cardinality appears to be dependent on the value of the DB_BLOCK_SIZE initialization parameter, but on a database with a standard 8Kb block size Oracle uses a heuristic of 8,168 rows. I can demonstrate this quite easily with a pipelined function that pipes a subset of columns from the employees table. Using Autotrace in SQL*Plus to generate an execution plan, I see the following.

/* Files on web: cbo_setup.sql and cbo_test.sql */
SQL> SELECT *
  2  FROM   TABLE(pipe_employees) e;

Execution Plan
----------------------------------------------------------
Plan hash value: 1802204150

--------------------------------------------------------------------
| Id  | Operation                         | Name           | Rows  |
--------------------------------------------------------------------
|   0 | SELECT STATEMENT                  |                |  8168 |
|   1 |  COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES |       |
--------------------------------------------------------------------

This pipelined function actually returns 50,000 rows, so if I join this pipelined function to the departments table, I run the risk of getting a suboptimal plan.

/* File on web: cbo_test.sql */
SQL> SELECT *
  2  FROM   departments           d
  3  ,      TABLE(pipe_employees) e
  4  WHERE  d.department_id = e.department_id;

Execution Plan
----------------------------------------------------------
Plan hash value: 4098497386

----------------------------------------------------------------------
| Id  | Operation                           | Name           | Rows  |
----------------------------------------------------------------------
|   0 | SELECT STATEMENT                    |                |  8168 |
|   1 |  MERGE JOIN                         |                |  8168 |
|   2 |   TABLE ACCESS BY INDEX ROWID       | DEPARTMENTS    |    27 |
|   3 |    INDEX FULL SCAN                  | DEPT_ID_PK     |    27 |
|*  4 |   SORT JOIN                         |                |  8168 |
|   5 |    COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES |       |
----------------------------------------------------------------------

As predicted, this appears to be a suboptimal plan; it is unlikely that a sort-merge join will be more efficient than a hash join in this scenario. So how do I influence the CBO? For this example I could use simple access hints such as LEADING and USE_HASH to effectively override the CBO’s cost-based decision and secure a hash join between the table and pipelined function. However, for more complex SQL statements, it is quite difficult to provide all the hints necessary to “lock down” an execution plan. It is often far better to provide the CBO with better statistics with which to make its decisions. There are two ways to do this: use optimizer dynamic sampling, or supply the CBO with explicit cardinality statistics.

I’ll demonstrate both of these methods for my pipe_employees function below.

using optimizer dynamic sampling for pipelined functions

Dynamic sampling is an extremely useful feature that enables the optimizer to take a small statistics sample of one or more objects in a query during the parse phase. You might use dynamic sampling when you haven’t gathered statistics on all of your tables in a query or when you are using transient objects such as global temporary tables. Starting with version 11.1.0.7, the Oracle database is able to use dynamic sampling for table or pipelined functions.

To see what difference this feature can make, I’ll repeat my previous query but include a DYNAMIC_SAMPLING hint for the pipe_employees function.

/* File on web: cbo_test.sql */
SQL> SELECT /*+ DYNAMIC_SAMPLING(e 5) */
  2         *
  3  FROM   departments           d
  4  ,      TABLE(pipe_employees) e
  5  WHERE  d.department_id = e.department_id;

Execution Plan
----------------------------------------------------------
Plan hash value: 815920909

---------------------------------------------------------------------
| Id  | Operation                          | Name           | Rows  |
---------------------------------------------------------------------
|   0 | SELECT STATEMENT                   |                | 50000 |
|*  1 |  HASH JOIN                         |                | 50000 |
|   2 |   TABLE ACCESS FULL                | DEPARTMENTS    |    27 |
|   3 |   COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES |       |
---------------------------------------------------------------------

This time, the CBO has correctly computed the 50,000 rows that my function returns and has generated a more suitable plan. Note that I used the word “computed” and not “estimated” because in version 11.1.0.7 and later, the optimizer takes a 100% sample of the table or pipelined function, regardless of the dynamic sampling level being used (this is also the case in Oracle Database 11g Release 2). I used level 5, but I could have used anything between level 2 and level 10 to get exactly the same result. This means, of course, that dynamic sampling can be potentially costly or time-consuming if it is being used for queries involving high-volume or long-running pipelined functions.

providing cardinality statistics to the optimizer

The only information that I can explicitly pass to the CBO for my pipelined function is its cardinality. As is often the case with Oracle, there are several ways to do this: the CARDINALITY hint, the OPT_ESTIMATE hint, SQL profiles, or the Extensible Optimizer interface.

The CARDINALITY and OPT_ESTIMATE hints are not officially supported by Oracle Corporation. For this reason, I prefer not to use them in production code. Other than SQL profiles (or dynamic sampling, as described earlier), the only officially supported method for supplying pipelined functions’ cardinality estimates to the CBO is to use the optimizer extensibility features introduced in Oracle Database 10g.
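For completeness, the CARDINALITY hint version of my earlier join would look something like the following sketch (but again, I avoid this in production code):

SELECT /*+ CARDINALITY(e 50000) */
       *
FROM   departments           d
,      TABLE(pipe_employees) e
WHERE  d.department_id = e.department_id;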

extensible optimizer and pipelined function cardinality

Optimizer extensibility is part of Oracle’s Data Cartridge implementation – a set of well-formed interfaces that enable us to extend the database’s built-in functionality with our own code and algorithms (typically stored in object types). For pipelined and table functions, the database provides a dedicated interface specifically for cardinality estimates. In the following simple example for my pipe_employees function, I will associate my pipelined function with a special object type that will tell the CBO about the function’s cardinality. The pipe_employees function specification is as follows:

/* File on web: cbo_setup.sql */
FUNCTION pipe_employees(
         p_cardinality IN INTEGER DEFAULT 1
         ) RETURN employee_ntt PIPELINED

Note the p_cardinality parameter. My pipe_employees body doesn’t use this parameter at all; instead, I am going to use this to tell the CBO the number of rows I expect my function to return. As the Extensible Optimizer needs this to be done via an interface type, I first create my interface object type specification:

/* File on web: cbo_setup.sql */
  1 CREATE TYPE pipelined_stats_ot AS OBJECT (
  2
  3     dummy INTEGER,
  4
  5     STATIC FUNCTION ODCIGetInterfaces (
  6                     p_interfaces OUT SYS.ODCIObjectList
  7                     ) RETURN NUMBER,
  8
  9     STATIC FUNCTION ODCIStatsTableFunction (
 10                     p_function    IN  SYS.ODCIFuncInfo,
 11                     p_stats       OUT SYS.ODCITabFuncStats,
 12                     p_args        IN  SYS.ODCIArgDescList,
 13                     p_cardinality IN INTEGER
 14                     ) RETURN NUMBER
 15  );

Note the following points about this type specification:

Line(s) Description
3 All object types must have at least one attribute, so I’ve included one called “dummy”; it is not otherwise used in this example.
5 and 9 These methods are part of the well-formed interface for the Extensible Optimizer. There are several other methods available, but the two I’ve used are the ones needed to implement a cardinality interface for my pipelined function.
10-12 These ODCIStatsTableFunction parameters are mandatory. The parameter names are flexible, but their positions and datatypes are fixed.
13 All parameters in a pipelined or table function must be replicated in its associated statistics type. In my example, pipe_employees has a single parameter, p_cardinality, which I must also include in my ODCIStatsTableFunction signature.

My cardinality algorithm is implemented in the type body as follows:

/* File on web: cbo_setup.sql */
  1  CREATE TYPE BODY pipelined_stats_ot AS
  2
  3     STATIC FUNCTION ODCIGetInterfaces (
  4                     p_interfaces OUT SYS.ODCIObjectList
  5                     ) RETURN NUMBER IS
  6     BEGIN
  7        p_interfaces := SYS.ODCIObjectList(
  8                           SYS.ODCIObject ('SYS', 'ODCISTATS2')
  9                           );
 10        RETURN ODCIConst.success;
 11     END ODCIGetInterfaces;
 12
 13     STATIC FUNCTION ODCIStatsTableFunction (
 14                     p_function    IN  SYS.ODCIFuncInfo,
 15                     p_stats       OUT SYS.ODCITabFuncStats,
 16                     p_args        IN  SYS.ODCIArgDescList,
 17                     p_cardinality IN INTEGER
 18                     ) RETURN NUMBER IS
 19     BEGIN
 20        p_stats := SYS.ODCITabFuncStats(NULL);
 21        p_stats.num_rows := p_cardinality;
 22        RETURN ODCIConst.success;
 23     END ODCIStatsTableFunction;
 24
 25  END;

This is a very simple interface implementation. The key points to note are:

Line(s) Description
3-11 This mandatory assignment is needed by the Oracle database. No user-defined logic is required here.
20-21 This is my cardinality algorithm. The p_stats OUT parameter is how I tell the CBO the cardinality of my function. Any value that I pass to my pipe_employees’ p_cardinality parameter will be referenced inside my statistics type. During query optimization (i.e., a “hard parse”), the CBO will invoke the ODCIStatsTableFunction method to retrieve the p_stats parameter value and use it in its calculations.

To recap, I now have a pipelined function and a statistics type. All I need to do now is to associate the two objects using the ASSOCIATE STATISTICS SQL command. This association is what enables the “magic” I’ve described above to happen:

/* File on web: cbo_test.sql */
ASSOCIATE STATISTICS WITH FUNCTIONS pipe_employees USING pipelined_stats_ot;

Now I am ready to test. I’ll repeat my previous query but include the number of rows I expect my pipelined function to return (this function pipes 50,000 rows).

/* File on web: cbo_test.sql */
SQL> SELECT *
  2  FROM   departments                  d
  3  ,      TABLE(pipe_employees(50000)) e
  4  WHERE  d.department_id = e.department_id;

Execution Plan
----------------------------------------------------------
Plan hash value: 815920909

---------------------------------------------------------------------
| Id  | Operation                          | Name           | Rows  |
---------------------------------------------------------------------
|   0 | SELECT STATEMENT                   |                | 50000 |
|*  1 |  HASH JOIN                         |                | 50000 |
|   2 |   TABLE ACCESS FULL                | DEPARTMENTS    |    27 |
|   3 |   COLLECTION ITERATOR PICKLER FETCH| PIPE_EMPLOYEES |       |
---------------------------------------------------------------------

This time, my expected cardinality has been picked up and used by the CBO, and I have the execution plan that I was expecting. I haven’t even had to use any hints! In most cases, if the CBO is given accurate inputs, it will make a good decision, as demonstrated in this example. Of course, the example also highlights the “magic” of the Extensible Optimizer. I supplied my expected cardinality as a parameter to the pipe_employees function, and during the optimization phase, the database accessed this parameter via the associated statistics type and used it to set the rowsource cardinality accordingly (using my algorithm). I find this quite impressive.

As a final thought, note that it makes good sense to find a systematic way to derive pipelined function cardinalities. I have demonstrated one method; in fact, I should add a p_cardinality parameter to all my pipelined functions and associate them all with the pipelined_stats_ot interface type. The algorithms you use in your interface types can be as sophisticated as you require. They might be based on other function parameters (for example, you might return different cardinalities based on particular parameter values). Perhaps you might store the expected cardinalities in a lookup table and have the interface type query this instead. There are many different ways that you can use this feature.
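As a sketch of the lookup-table idea (the function_cardinalities table and its columns are hypothetical, and I have keyed it on the object name reported in ODCIFuncInfo), the ODCIStatsTableFunction method might be implemented something like this:

   STATIC FUNCTION ODCIStatsTableFunction (
                   p_function    IN  SYS.ODCIFuncInfo,
                   p_stats       OUT SYS.ODCITabFuncStats,
                   p_args        IN  SYS.ODCIArgDescList,
                   p_cardinality IN  INTEGER
                   ) RETURN NUMBER IS
      v_num_rows NUMBER;
   BEGIN
      /* Look up the expected cardinality for this function... */
      SELECT expected_rows
      INTO   v_num_rows
      FROM   function_cardinalities
      WHERE  function_name = UPPER(p_function.ObjectName);

      p_stats := SYS.ODCITabFuncStats(v_num_rows);
      RETURN ODCIConst.success;
   EXCEPTION
      WHEN NO_DATA_FOUND THEN
         /* Fall back to the parameter-supplied cardinality... */
         p_stats := SYS.ODCITabFuncStats(p_cardinality);
         RETURN ODCIConst.success;
   END ODCIStatsTableFunction;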

tuning complex data loads with pipelined functions

My stockpivot example transformed each input row into two output rows of the same record structure. All of my other examples piped a single output row of a single record structure. But some transformations or loads are not so simple. It is quite common to load multiple tables from a single staging table, so can pipelined functions be useful in such scenarios as well?

The good news is that they can; multi-table loads can also be tuned with pipelined functions. The function itself can pipe as many different record types as you need, and conditional or unconditional multitable inserts can load the corresponding tables with the relevant attributes.

one source, two targets

Consider an example of loading customers and addresses from a single file delivery. Let's imagine that a single customer record has up to three addresses stored in his or her history. This means that as many as four records are generated for each customer. For example:


CUSTOMER_ID LAST_NAME  ADDRESS_ID STREET_ADDRESS                 PRIMARY
----------- ---------- ---------- ------------------------------ -------
       1060 Kelley          60455 7310 Breathing Street          Y
       1060 Kelley         119885 7310 Breathing Street          N
     103317 Anderson        65045 57 Aguadilla Drive             Y
     103317 Anderson        65518 117 North Union Avenue         N
     103317 Anderson        61112 27 South Las Vegas Boulevard   N

I have removed most of the detail, but this example shows that Kelley has two addresses in the system and Anderson has three. My loading scenario is that I need to add a single record per customer to the customers table, and all of the address records need to be inserted into the addresses table.

piping multiple record types from pipelined functions

How can a pipelined function generate a customer record and an address record at the same time? Surprisingly, there are two relatively simple ways to achieve this: use substitutable object types (a supertype with customer and address subtypes), or pipe a single “wide” denormalized record per customer. I describe the second method at the end of this section.

using object-relational features

Let's take a look at the first method as it is the most elegant solution to this requirement. I first need to create four types to describe my data: a customer supertype, a collection of that supertype to act as the function's return type, and two subtypes for the customer details and the address details.

I've picked a small number of attributes for demonstration purposes. My types look like this:

/* File on web: multitype_setup.sql */
-- Supertype...
CREATE TYPE customer_ot AS OBJECT
( customer_id NUMBER
) NOT FINAL;

-- Collection of supertype...
CREATE TYPE customer_ntt AS TABLE OF customer_ot;

-- Customer detail subtype...
CREATE TYPE customer_detail_ot UNDER customer_ot
( first_name VARCHAR2(20)
, last_name  VARCHAR2(60)
, birth_date DATE
) FINAL;

-- Address detail subtype...
CREATE TYPE address_detail_ot UNDER customer_ot
( address_id     NUMBER
, primary        VARCHAR2(1)
, street_address VARCHAR2(40)
, postal_code    VARCHAR2(10)
) FINAL;

If you have never worked with object types, I suggest that you review the contents of Chapter 26. Briefly, however, Oracle's support for substitutability means that I can create rows of either customer_detail_ot or address_detail_ot, and use them wherever the customer_ot supertype is expected. So if I create a pipelined function to pipe a collection of the supertype, this means that I can also pipe rows of either of the subtypes. This is but one example of how an object-oriented type hierarchy can offer a simple and elegant solution.

a multitype pipelined function

Let's take a look at the pipelined function body, and then I'll explain the key concepts.

/* File on web: multitype_setup.sql */
  1     FUNCTION customer_transform_multi(
  2              p_source     IN customer_staging_rct,
  3              p_limit_size IN PLS_INTEGER DEFAULT customer_pkg.c_default_limit
  4              )
  5        RETURN customer_ntt
  6        PIPELINED
  7        PARALLEL_ENABLE (PARTITION p_source BY HASH(customer_id))
  8        ORDER p_source BY (customer_id, address_id) IS
  9  
 10        aa_source     customer_staging_aat;
 11        v_customer_id customer_staging.customer_id%TYPE := -1; /* Needs a non-null default */
 12  
 13     BEGIN
 14        LOOP
 15           FETCH p_source BULK COLLECT INTO aa_source LIMIT p_limit_size;
 16           EXIT WHEN aa_source.COUNT = 0;
 17  
 18           FOR i IN 1 .. aa_source.COUNT LOOP
 19  
 20              /* Only pipe the first instance of the customer details... */
 21              IF aa_source(i).customer_id != v_customer_id THEN
 22                 PIPE ROW ( customer_detail_ot( aa_source(i).customer_id,
 23                                                aa_source(i).first_name,
 24                                                aa_source(i).last_name,
 25                                                aa_source(i).birth_date ));
 26              END IF;
 27  
 28              PIPE ROW( address_detail_ot( aa_source(i).customer_id,
 29                                           aa_source(i).address_id,
 30                                           aa_source(i).primary,
 31                                           aa_source(i).street_address,
 32                                           aa_source(i).postal_code ));
 33  
 34              /* Save customer ID for "control break" logic... */
 35              v_customer_id := aa_source(i).customer_id;
 36  
 37           END LOOP;
 38        END LOOP;
 39        CLOSE p_source;
 40        RETURN;
 41     END customer_transform_multi;

This function is parallel-enabled, and it processes the source data in arrays for maximum performance. The main concepts specific to multityping are:

Line(s) Description
5 My function's return is a collection of the customer supertype. This allows me to pipe subtypes instead.
7-8 I have data dependencies so have used hash partitioning with ordered streaming. I need to process each customer's records together, because I will need to pick off the customer attributes from the first record only, and then allow all addresses through.
21-26 If this is the first source record for a particular customer, pipe out a row of CUSTOMER_DETAIL_OT. Only one customer details record will be piped per customer.
28-32 For every source record, pick out the address information and pipe out a row of ADDRESS_DETAIL_OT.

querying a multitype pipelined function

I now have a single function generating rows of two different types and structures. Using SQL*Plus, let's query a few rows from this function.

/* File on web: multitype_query.sql */
SQL> SELECT *
  2  FROM   TABLE(
  3            customer_pkg.customer_transform_multi(
  4               CURSOR( SELECT * FROM customer_staging ) ) ) nt
  5  WHERE  ROWNUM <= 5;

CUSTOMER_ID
-----------
          1
          1
          1
          1
          2

That's a surprise - where's my data? Even though I used SELECT *, I have only the CUSTOMER_ID column in my results. The reason for this is simple: my function is defined to return a collection of the customer_ot supertype, which has only one attribute. So unless I code explicitly for the range of subtypes being returned from my function, the database will not expose any of their attributes. In fact, if I reference any of the subtypes’ attributes using the above query format, the database will raise an ORA-00904: invalid identifier exception.

Fortunately, Oracle supplies two ways to access instances of object types: the VALUE function and the OBJECT_VALUE pseudo-column. Let's see what they do (they are interchangeable):

/* File on web: multitype_query.sql */
SQL> SELECT VALUE(nt) AS object_instance --could use “nt.OBJECT_VALUE” instead
  2  FROM   TABLE(
  3            customer_pkg.customer_transform_multi(
  4               CURSOR( SELECT * FROM customer_staging ) ) ) nt
  5  WHERE  ROWNUM <= 5;

OBJECT_INSTANCE(CUSTOMER_ID)
---------------------------------------------------------------------------
CUSTOMER_DETAIL_OT(1, 'Abigail', 'Kessel', '31/03/1949')
ADDRESS_DETAIL_OT(1, 12135, 'N', '37 North Coshocton Street', '78247')
ADDRESS_DETAIL_OT(1, 12136, 'N', '47 East Sagadahoc Road', '90285')
ADDRESS_DETAIL_OT(1, 12156, 'Y', '7 South 3rd Circle', '30828')
CUSTOMER_DETAIL_OT(2, 'Anne', 'KOCH', '23/09/1949')

This is more promising. I now have the data as it is returned from the pipelined function, so I'm going to do two things with it. First I will determine the type of each record using the IS OF condition; this will be useful to me later on. Second, I will use the TREAT function to downcast each record to its underlying subtype (until I do this, the database thinks that my data is of the supertype and so will not allow me access to any of the attributes). The query now looks something like this:

/* File on web: multitype_query.sql */
SQL> SELECT CASE
  2            WHEN VALUE(nt) IS OF TYPE (customer_detail_ot)
  3            THEN 'C'
  4            ELSE 'A'
  5         END                                    AS record_type
  6  ,      TREAT(VALUE(nt) AS customer_detail_ot) AS cust_rec
  7  ,      TREAT(VALUE(nt) AS address_detail_ot)  AS addr_rec
  8  FROM   TABLE(
  9            customer_pkg.customer_transform_multi(
 10               CURSOR( SELECT * FROM customer_staging ) ) ) nt
 11  WHERE  ROWNUM <= 5;

RECORD_TYPE CUST_REC                       ADDR_REC
----------- ------------------------------ ------------------------------
C           CUSTOMER_DETAIL_OT(1, 'Abigail
            ', 'Kessel', '31/03/1949')

A                                          ADDRESS_DETAIL_OT(1, 12135, 'N
                                           ', '37 North Coshocton Street'
                                           , '78247')

A                                          ADDRESS_DETAIL_OT(1, 12136, 'N
                                           ', '47 East Sagadahoc Road', '
                                           90285')

A                                          ADDRESS_DETAIL_OT(1, 12156, 'Y
                                           ', '7 South 3rd Circle', '3082
                                           8')

C           CUSTOMER_DETAIL_OT(2, 'Anne',
            'KOCH', '23/09/1949')

I now have my data in the correct subtype format, which means that I can access the underlying attributes. I do this by wrapping the previous query in an in-line view and accessing the attributes using dot notation, as follows.

/* File on web: multitype_query.sql */
SELECT ilv.record_type
,      NVL(ilv.cust_rec.customer_id,
           ilv.addr_rec.customer_id) AS customer_id
,      ilv.cust_rec.first_name       AS first_name
,      ilv.cust_rec.last_name        AS last_name
       <snip>
,      ilv.addr_rec.postal_code      AS postal_code
FROM  (
       SELECT CASE...
              <snip>
       FROM   TABLE(
                 customer_pkg.customer_transform_multi(
                     CURSOR( SELECT * FROM customer_staging ) ) ) nt
       ) ilv;

loading multiple tables from a multi-type pipelined function

I've removed some lines from the example above, but you should recognize the pattern. I now have all the elements needed for a multitable insert into my customers and addresses tables. Here's the loading code:

/* File on web: multitype_setup.sql */
      INSERT FIRST
         WHEN record_type = 'C'
         THEN
            INTO customers
            VALUES (customer_id, first_name, last_name, birth_date)
         WHEN record_type = 'A'
         THEN
            INTO addresses
            VALUES (address_id, customer_id, primary, street_address, postal_code)
      SELECT ilv.record_type
      ,      NVL(ilv.cust_rec.customer_id,
                 ilv.addr_rec.customer_id) AS customer_id
      ,      ilv.cust_rec.first_name       AS first_name
      ,      ilv.cust_rec.last_name        AS last_name
      ,      ilv.cust_rec.birth_date       AS birth_date
      ,      ilv.addr_rec.address_id       AS address_id
      ,      ilv.addr_rec.primary          AS primary
      ,      ilv.addr_rec.street_address   AS street_address
      ,      ilv.addr_rec.postal_code      AS postal_code
      FROM (
            SELECT CASE
                      WHEN VALUE(nt) IS OF TYPE (customer_detail_ot)
                      THEN 'C'
                      ELSE 'A'
                   END                                    AS record_type
            ,      TREAT(VALUE(nt) AS customer_detail_ot) AS cust_rec
            ,      TREAT(VALUE(nt) AS address_detail_ot)  AS addr_rec
            FROM   TABLE(
                      customer_pkg.customer_transform_multi(
                         CURSOR( SELECT * FROM customer_staging ))) nt
           ) ilv;

With this INSERT FIRST statement, I have a complex load that uses a range of object-relational features in a way that enables me to retain set-based principles. This approach might also work for you.

an alternative multitype method

The alternative to this method is to create a single “wide” object record and pipe a single row for each set of customer addresses. I'll show you the type definition to clarify what I mean by this; see the multitype_setup.sql file on the book’s web site for the full example.

/* File on web: multitype_setup.sql */
CREATE TYPE customer_address_ot AS OBJECT
( customer_id          NUMBER
, first_name           VARCHAR2(20)
, last_name            VARCHAR2(60)
, birth_date           DATE
, addr1_address_id     NUMBER
, addr1_primary        VARCHAR2(1)
, addr1_street_address VARCHAR2(40)
, addr1_postal_code    VARCHAR2(10)
, addr2_address_id     NUMBER
, addr2_primary        VARCHAR2(1)
, addr2_street_address VARCHAR2(40)
, addr2_postal_code    VARCHAR2(10)
, addr3_address_id     NUMBER
, addr3_primary        VARCHAR2(1)
, addr3_street_address VARCHAR2(40)
, addr3_postal_code    VARCHAR2(10)
, CONSTRUCTOR FUNCTION customer_address_ot 
     RETURN SELF AS RESULT
);

You can see that each of the three address instances per customer is “denormalized” into its respective attributes. Each row piped from the function is pivoted into four rows with a conditional INSERT ALL statement. The INSERT syntax is simpler and, for this particular example, quicker than the substitutable type method. The technique you choose will depend on your particular circumstances; note, however, that you may find that as the number of attributes increases, the performance of the denormalized method may degrade. Having said that, I’ve used this method successfully to tune a load that inserts up to nine records into four tables for every distinct financial transaction.
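With that wide record, the conditional INSERT ALL might look something like the following sketch (customer_transform_wide is a placeholder name; see the multitype_setup.sql file for the actual implementation):

   INSERT ALL
      WHEN 1 = 1 THEN
         INTO customers
         VALUES (customer_id, first_name, last_name, birth_date)
      WHEN addr1_address_id IS NOT NULL THEN
         INTO addresses
         VALUES (addr1_address_id, customer_id, addr1_primary,
                 addr1_street_address, addr1_postal_code)
      WHEN addr2_address_id IS NOT NULL THEN
         INTO addresses
         VALUES (addr2_address_id, customer_id, addr2_primary,
                 addr2_street_address, addr2_postal_code)
      WHEN addr3_address_id IS NOT NULL THEN
         INTO addresses
         VALUES (addr3_address_id, customer_id, addr3_primary,
                 addr3_street_address, addr3_postal_code)
   SELECT *
   FROM   TABLE(
             customer_pkg.customer_transform_wide(
                CURSOR( SELECT * FROM customer_staging ) ) );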

You can expect to experience a degradation in the performance of a pipelined function implementation when using wide rows or rows with many columns (pertinent to the denormalized multirecord example described above). For example, I tested a 50,000-row serial pipelined bulk load against row-by-row inserts using multiple columns of 10 bytes each. In Oracle9i Database, the row-based solution became faster than the pipelined solution at just 50 columns. Fortunately, this increases to somewhere between 100 and 150 columns in all major versions of Oracle Database 10g and Oracle Database 11g.

pipelined function summary

In this discussion of pipelined functions, I've shown several scenarios where such functions (serial or parallel) can help you improve the performance of your data loads and extracts. As a tuning tool, some of these techniques should prove to be useful. However, I do not recommend that you convert your entire code base to pipelined functions! They are a specific tool that is likely to apply to only a subset of your data-processing tasks. If you need to implement complex transformations that are too unwieldy when represented in SQL (typically as analytic functions, CASE expressions, subqueries, or even the frightening MODEL clause), then encapsulating them in pipelined functions, as I've shown in this section, may provide substantial performance benefits.

acknowledgements

Thanks to Steven Feuerstein for inviting me to contribute to Oracle PL/SQL Programming and to O'Reilly Media, Inc. for allowing me to re-print my material here for oracle-developer.net readers.

source code

The source code for the examples in this article can be downloaded from the O'Reilly website.

Adrian Billington, January 2010
