`
mikixiyou
  • 浏览: 1105479 次
  • 性别: Icon_minigender_1
  • 来自: 南京
博客专栏
C3c8d188-c0ab-3396-821d-b68331e21226
Oracle管理和开发
浏览量:355129
社区版块
存档分类
最新评论

Oracle的pipelined函数实现高性能大数据处理

阅读更多

在plsql开发中,会涉及到一些大数据量表的数据处理,如将某记录数超亿的表的记录经过处理转换插入到另外一张或几张表。

常规的操作方法固然可以实现,但时间、磁盘IO、redo日志等等都非常大。Oracle 提供了一种高级函数,可以将这种数据处理的性能提升到极限。这种函数称为管道函数。

在实际项目中,管道函数会和表函数、数据流函数(即表函数和CURSOR结合)、数据集合、并行度一起使用,达到大数据处理的性能顶峰。

 

(miki西游 @mikixiyou 原文链接: http://mikixiyou.iteye.com/blog/1673672 )

 

下面是一个例子,将表t_ss_normal的记录插入到表t_target中,插入过程中有部分转换操作。

我分成四个方法来实现这个数据处理操作。

 

第一个方法,也是最常规的方法,代码如下:

 

create table T_SS_NORMAL
(
  owner          VARCHAR2(30),
  object_name    VARCHAR2(128),
  subobject_name VARCHAR2(30),
  object_id      NUMBER,
  data_object_id NUMBER,
  object_type    VARCHAR2(19),
  created        DATE,
  last_ddl_time  DATE,
  timestamp      VARCHAR2(19),
  status         VARCHAR2(7),
  temporary      VARCHAR2(1),
  generated      VARCHAR2(1),
  secondary      VARCHAR2(1)
);
/

create table T_TARGET
(
  owner       VARCHAR2(30),
  object_name VARCHAR2(128),
  comm        VARCHAR2(10)
);
 

这是源表和目标表的表结构。现在源表有200W条,其数据来自dba_objects视图。

 

create or replace package pkg_test is
  procedure load_target_normal;
end pkg_test;

create or replace package body pkg_test is
  procedure load_target_normal is
  begin  
    insert into t_target (owner, object_name, comm)
      select owner, object_name, 'xxx' from t_ss_normal;  
    commit;  
  end;
begin
  null;
end pkg_test; 
 

 

一个insert into select语句搞定这个数据处理,简单。

 

第二方法,采用管道函数实现这个数据处理。

 

create type obj_target as object(
owner VARCHAR2(30), object_name VARCHAR2(128), comm varchar2(10)
);
/
create or replace type typ_array_target as table of obj_target;
/

create or replace package pkg_test is

  function pipe_target(p_source_data in sys_refcursor) return typ_array_target
    pipelined;

  procedure load_target;
end pkg_test;

 

首先创建两个自定义的类型。obj_target的定义和t_target的表结构一致,用于存储每一条目标表记录。typ_array_target用于管道函数的返回值。

接着定义一个管道函数。

普通函数的结尾加一个pipelined关键字,就是管道函数。这个函数的返回参数类型为集合,这是为了使其能作为表函数使用。表函数就是在from子句中以table(v_resultset)调用的,v_resultset就是一个集合类型的参数。

最后定义一个调用存储过程。

 

在包体中定义该管道函数和调用存储过程。管道函数pipe_target的传入参数一个sys_refcursor类型。这是一个游标,可以理解为使用select * from table才能得到的结果集。

你也可以不用这个传入的游标,取而代之,在函数中定义一个游标,也一样使用。

 

  function pipe_target(p_source_data in sys_refcursor) return typ_array_target
    pipelined is
    r_target_data obj_target := obj_target(null, null, null);
    r_source_data t_ss%rowtype;

 begin
  
    loop
      fetch p_source_data
        into r_source_data;
      exit when p_source_data%notfound;    
      
      r_target_data.owner       := r_source_data.owner;
      r_target_data.object_name := r_source_data.object_name;
      r_target_data.comm        := 'xxx';    
      pipe row(r_target_data);
    
    end loop;
  
    close p_source_data;
    return;
  
  end;

  procedure load_target is
  begin  
    insert into t_target
      (owner, object_name, comm)
      select owner, object_name, comm
        from table(pipe_target(cursor(select * from t_ss_normal)));  
    commit;  
  end;
 
 

关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。

 

因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。

 

 

  function pipe_target_array(p_source_data in sys_refcursor,
                             p_limit_size  in pls_integer default c_default_limit)
    return typ_array_target
    pipelined is  
    r_target_data obj_target := obj_target(null, null, null); 
     
    type typ_source_data is table of t_ss%rowtype index by pls_integer;
    aa_source_data typ_source_data;
  
  begin
  
    loop
      fetch p_source_data bulk collect
        into aa_source_data;
      exit when aa_source_data.count = 0;
    
      for i in 1 .. aa_source_data.count loop
      
        r_target_data.owner       := aa_source_data(i).owner;
        r_target_data.object_name := aa_source_data(i).object_name;
        r_target_data.comm        := 'xxx';
      
        pipe row(r_target_data);
      
      end loop;
    
    end loop;
  
    close p_source_data;
    return;
  
  end;


  procedure load_target_array is
  begin
    insert into t_target
      (owner, object_name, comm)
      select owner, object_name, comm
        from table(pipe_target_array(cursor (select * from t_ss_normal),
                                     100));  
    commit;  
  end;

 

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

 

 

  function pipe_target_parallel(p_source_data in sys_refcursor,
                                p_limit_size  in pls_integer default c_default_limit)
    return typ_array_target
    pipelined
    parallel_enable(partition p_source_data by any) is
  
    r_target_data obj_target := obj_target(null, null, null);
  
    type typ_source_data is table of t_ss%rowtype index by pls_integer;  
    aa_source_data typ_source_data;
  
  begin  
    loop
      fetch p_source_data bulk collect
        into aa_source_data;
      exit when aa_source_data.count = 0;    
      for i in 1 .. aa_source_data.count loop      
        r_target_data.owner       := aa_source_data(i).owner;
        r_target_data.object_name := aa_source_data(i).object_name;
        r_target_data.comm        := 'xxx';      
        pipe row(r_target_data);      
      end loop;    
    end loop;  
    close p_source_data;
    return;
  
  end;
  

  procedure load_target_parallel is
  begin
    execute immediate 'alter session enable parallel dml';  
    insert /*+parallel(t,4)*/
    into t_target t
      (owner, object_name, comm)
      select owner, object_name, comm
        from table(pipe_target_array(cursor (select /*+parallel(s,4)*/
                                       *
                                        from t_ss_normal s),
                                     100));  
    commit;
  end;
 

 

 

在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。

 

 

 

 

在此数据处理操作中,涉及到集合(collection)、表函数、管道函数、流函数、bulk collect、游标等知识点。

PLSQL集合类型的使用总结  

使用bulk collect insert实现大数据快速迁移

Oracle的pipelined函数提升数据输出性能

以前写的这三篇文章,都详细介绍了这些知识点,更多可以去参考oracle官方文档。


 

 

 

分享到:
评论
2 楼 dahai639 2014-05-15  
挺好的,支持一下
1 楼 datawarehouse 2012-09-20  
好多不明白的地方啊

相关推荐

    oracle管道函数用法

    ### Oracle管道函数详解 #### 一、概述 ...通过以上介绍,我们可以看出Oracle管道函数为Oracle数据库提供了一种灵活高效的方式来处理大量数据并实时返回处理结果,对于某些特定的应用场景非常有用。

    利用PB实现DBF文件到Oracle基表的数据转换.pdf

    它提供高性能、高可用性和安全性,支持复杂的数据处理和事务处理需求。 4. 数据转换:在文中,数据转换是指将DBF文件中的数据迁移到Oracle数据库的过程。这个过程通常涉及到数据清洗、格式匹配、错误处理等步骤,...

    Streamsets Data Collector管道详细配置文档(组件、事件、函数表达式、记录头属性)

    ### Streamsets Data Collector管道详细配置文档 #### 一、概览 Streamsets Data Collector (SDC) 是一种高性能...此外,合理利用组件、事件以及函数表达式等功能,可以显著提高数据处理效率,满足复杂场景下的需求。

    orai18n.zip

    5. **多语言支持**: Oracle数据库可以存储多种语言的文本,支持Unicode标准,从而能够处理世界上大部分语言的数据。 6. **区域设置(Locale)**: 区域设置定义了一组文化和地理相关的特性,如日期和时间的格式、...

    Netty5.0架构剖析和源码解读

    3. **Grizzly**:Oracle JavaFX项目的一部分,用于构建高性能的服务器端应用。 这些框架通过封装底层复杂的网络通信细节,提供了简单易用的API,使得开发者能够更加专注于业务逻辑的开发。 #### 2. NIO入门 #####...

    数据库经常出的面试题

    这类函数通常用于需要动态生成数据源的场景,比如在数据管道(Pipeline)和ETL过程中非常有用。 - **应用场景**:例如,在ETL过程中,可能需要根据不同的条件动态生成不同的数据集,这时`TABLE Function`就可以派...

    北京中科信软PowerBuilder培训01基础

    9. 数据处理估计时间,合理规划开发周期和资源。 10. 数据库连接新特性,掌握JDBC、JDB数据库接口,以及对Unicode的支持。 11. 支持的图像格式和HTML转化,实现图像处理和Web页面生成。 五、PB仓库表的用途 1. PB...

    pb7参考手册1_3

    - **Pipeline对象**:用于创建数据处理管道。 - **ProfileCall对象**:用于跟踪函数调用。 - **ProfileClass对象**:用于跟踪类的性能。 - **ProfileLine对象**:用于跟踪代码行的性能。 - **ProfileRoutine对象**:...

    JDK 7确定B计划 部分特性延迟到JDK 8

    - 引入了XRender Pipeline for Java 2D,提高了2D图形处理的性能。 - Nimbus Look and Feel 的引入,为Swing组件提供了一套全新的外观和感觉,使得基于Swing的应用程序更加美观。 6. **安全性与加密技术**:增加...

Global site tag (gtag.js) - Google Analytics