`

hadoop pig入门总结

阅读更多

 

在这里贴一个pig源码的分析,做pig很长时间没做笔记,不包含任何细节,以后有机会再说吧

http://blackproof.iteye.com/blog/1769219

 

hadoop pig入门总结

  • pig简介
  • pig数据类型
  • pig latin语法
  • pig udf自定义
  • pig derived衍生
  • 推荐书籍 programming pig
  • 推荐网站 http://pig.apache.org/docs/r0.10.0/basic.html

pig简介

pig是hadoop上层的衍生架构,与hive类似。对比hive(hive类似sql,是一种声明式的语言),pig是一种过程语言,类似于存储过程一步一步得进行数据转化。

 

pig数据类型

  • double > float > long > int > bytearray
  • tuple|bag|map|chararray > bytearray

double float long int chararray bytearray都相当于pig的基本类型

tuple相当于数组 ,但是可以类型不一,举例('dirkzhang','dallas',41)

Bag相当于tuple的一个集合,举例{('dirk',41),('kedde',2),('terre',31)},在group的时候会生成bag

Map相当于哈希表,key为chararray,value为任意类型,例如['name'#dirk,'age'#36,'num'#41

nulls 表示的不只是数据不存在,他更表示数据是unkown

 

pig latin语法

 

1:load

LOAD 'data' [USING function] [AS schema];

       例如:

      load = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}'    USING com.xxxx.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray,DAY_ID:chararray,PROV_ID:chararray);

 

Table = load ‘url’ as (id,name…..);    //table和load之间除了等号外 还必须有个空格 不然会出错,url一定要带引号,且只能是单引号。

 

2:filter

       alias = FILTER alias BY expression;

       Table = filter Table1 by + A; //A可以是 id > 10;not name matches ‘’,is not null 等,可以用and  和or连接各条件

       例如:

       filter = filter load20 by ( MONTH_ID == '1210' and  DAY_ID == '18' and  PROV_ID == '010' );

       

 

3:group

alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];

          pig的分组,不仅是数据上的分组,在数据的schema形式上也进行分组为groupcolumn:bag

         Table3 = group Table2 by id;也可以Table3 = group Table2 by (id,name);括号必须加

         可以使用ALL实现对所有字段的分组

 

4:foreach

alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….];

 

alias = FOREACH nested_alias {

alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]

GENERATE expression [AS schema] [expression [AS schema]….]

};

 

一般跟generate一块使用

         Table = foreach Table generate (id,name);括号可加可不加。

avg = foreach Table generate group, AVG(age);  MAX ,MIN..

 

在进行数据过滤时,建议尽早使用foreach generate将多余的数据过滤掉,减少数据交换

 

5:join

Inner  join Syntax

alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];

Outer join Syntax

alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];

 

     join/left join / right join

daily = load 'A' as (id,name, sex);

divs  = load 'B' as (id,name, sex);

 

join

jnd   = join daily by (id, name), divs by (id, name);       

 

left join

jnd   = join daily by (id, name) left outer, divs by (id, name);

也可以同时多个变量,但只用于inner join

A = load 'input1' as (x, y);

B = load 'input2' as (u, v);

C = load 'input3' as (e, f);

alpha = join A by x, B by u, C by e;

 

6: union

alias = UNION [ONSCHEMA] alias, alias [, alias …];

 

union 相当与sql中的union,但与sql不通的是pig中的union可以针对两个不同模式的变量:如果两个变量模式相同,那么union后的变量模式与 变量的模式一样;如果一个变量的模式可以由另一各变量的模式强制类型转换,那么union后的变量模式与转换后的变量模式相同;否则,union后的变量 没有模式。

 

A = load 'input1' as (x:int, y:float);

B = load 'input2' as (x:int, y:float);

C = union A, B;

describe C;

 

C: {x: int,y: float}

 

A = load 'input1' as (x:double, y:float);

B = load 'input2' as (x:int, y:double);

C = union A, B;

describe C;

C: {x: double,y: double}

 

A = load 'input1' as (x:int, y:float);

B = load 'input2' as (x:int, y:chararray);

C = union A, B;

describe C;

Schema for C unknown.

 

注意:在pig 1.0中 执行不了最后一种union。

 

如果需要对两个具有不通列名的变量union的话,可以使用onschema关键字

A = load 'input1' as (w: chararray, x:int, y:float);

B = load 'input2' as (x:int, y:double, z:chararray);

C = union onschema A, B;

describe C;

C: {w: chararray,x: int,y: double,z: chararray}

 

join和union之后alias的别名会变

 

7:Dump

     dump alias

用于在屏幕上显示数据。

 

8:Order by

alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n];

         A = order Table by id desc;

 

9:distinct

         A = distinct alias;

 

10:limit

         A = limit alias 10;

 

11:sample

SAMPLE alias size;

 

随机抽取指定比例(0到1)的数据。

some = sample divs 0.1;

 

13:cross

alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];

 

将多个数据集中的数据按照字段名进行同值组合,形成笛卡尔积。

--cross.pig

daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,

close:float, volume:int, adj_close:float);

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);

tonsodata = cross daily, divs parallel 10;

 

 

15:split

Syntax

SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE];

 

A = LOAD 'data' AS (f1:int,f2:int,f3:int);

DUMP A;

(1,2,3)

(4,5,6)

(7,8,9)

SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);

 

DUMP X;

(1,2,3)

(4,5,6)

 

DUMP Y;

(4,5,6)

 

DUMP Z;

(1,2,3)

(7,8,9)

 

16:store

         Store  … into … Using…

 

 

pig在别名维护上:

1、join

如e = join d by name,b by name;

    g = foreach e generate $0 as one:chararray, $1 as two:int, $2 as      three:chararray,$3 asfour:int;

    他生成的schemal:

 

        e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}

 

g: {one: chararray,two: int,three: chararray,four: int}

2、group

   B = GROUP A BY age;

 

----------------------------------------------------------------------
| B     | group: int | A: bag({name: chararray,age: int,gpa: float}) |
----------------------------------------------------------------------
|       | 18         | {(John, 18, 4.0), (Joe, 18, 3.8)}             |
|       | 20         | {(Bill, 20, 3.9)}                             |
----------------------------------------------------------------------

 (18,{(John,18,4.0F),(Joe,18,3.8F)})

 

 

pig udf自定义

pig支持嵌入user defined function,一个简单的udf 继承于evalFunc,通常用在filter,foreach中

Java代码 
  1. public class MyUDF extends EvalFunc<String> {  
  2.   
  3.     @Override  
  4.     public String exec(Tuple input) throws IOException {  
  5.         if(input == null || input.size() ==0)  
  6.             return null;  
  7.         try {  
  8.             String val = (String) input.get(0);  
  9.             return new StringBuffer(val).append(" pig").toString();  
  10.         } catch (Exception e) {  
  11.             throw new IOException(e.getMessage());  
  12.         }  
  13.     }  
  14.   
  15. }  

 

pig支持udf in loader and store

udf loader 需要继承于LoadFunc

udf storer 需要继承于StoreFunc

这类似于hadoop中写inputformat和outputformat

其中vertica就是写了一个DB版本的

 

这里贴一个简单的loader的例子:

Java代码 
  1. public class MyLoader extends LoadFunc{  
  2.   
  3.     protected RecordReader recordReader = null;  
  4.       
  5.     private PreparedStatement ps;  
  6.     private Connection conn;  
  7.     private final String jdbcURL;  
  8.     private final String user;  
  9.     private final String pwd;  
  10.     private final String querySql;  
  11.     private ResultSet rs;  
  12.       
  13.     public MyLoader(String driver,String jdbcURL,String user,String pwd,String querySql){  
  14.         try {  
  15.             Class.forName(driver);  
  16.         } catch (Exception e) {  
  17.             // TODO: handle exception  
  18.         }  
  19.         this.jdbcURL = jdbcURL;  
  20.         this.user = user;  
  21.         this.pwd = pwd;  
  22.         this.querySql = querySql;  
  23.     }  
  24.       
  25.     @Override  
  26.     public InputFormat getInputFormat() throws IOException {  
  27.         return new PigTextInputFormat();  
  28.     }  
  29.   
  30.     @Override  
  31.     public Tuple getNext() throws IOException {  
  32.         // TODO 重要的读取过程  
  33.         Text val = null;  
  34.         boolean next = false;  
  35.         try {  
  36.             next = rs.next();  
  37.         } catch (Exception e) {  
  38.             // TODO: handle exception  
  39.         }  
  40.         if(!next)  
  41.             return null;  
  42.         ResultSetMetaData rsmd;  
  43.         try {  
  44. //          rsmd = result  
  45.         } catch (Exception e) {  
  46.             // TODO: handle exception  
  47.         }  
  48.           
  49.         return null;  
  50.     }  
  51.   
  52.     @Override  
  53.     public void prepareToRead(RecordReader arg0, PigSplit arg1)  
  54.             throws IOException {  
  55.         this.recordReader = arg0;  
  56.     }  
  57.   
  58.     @Override  
  59.     public void setLocation(String arg0, Job arg1) throws IOException {  
  60.         //no idea  
  61.     }  
  62.       
  63.     public ResourceSchema getSchema(String location,Job job) throws IOException{  
  64.         Configuration conf = job.getConfiguration();  
  65.         Schema schema = new Schema();  
  66.         try {  
  67.             //TODO:reader from database table  
  68. //          Connection conn = DriverManager.getConnection(this.jdbcURL, this.user, this.pwd);  
  69.             FieldSchema fieldName = new FieldSchema("name", DataType.CHARARRAY);  
  70.             FieldSchema fieldPosition = new FieldSchema("position", DataType.INTEGER);  
  71.             schema.add(fieldName);  
  72.             schema.add(fieldPosition);  
  73.         } catch (Exception e) {  
  74.             //TODO log exception  
  75.         }  
  76.           
  77.         return null;  
  78.     }  
  79.       
  80.     public void prepareToRead(){  
  81.           
  82.     }  
  83.   
  84. }  

 其中getNext方法就是如何处理reader读取出的数据

        getSchema可以固定读取数据的schema

        setLocation可以处理输入的数据源

        prepareToRead是读取数据之前,可以在此做标识,等等

        

 

pig 衍生

1.penny:

1. Penny的描述

Penny是pig的贡献项目,是pig的调试和监控工具,而且支持根据API自定义penny的监视器和协作器,已实现不同的功能;

2. Penny的总架构

Penny将监视器插入到pig的工作操作中,主要用于监视pig数据流的变化,监视器可以调用协作器,完成各种功能。

3. Penny的总类图关系

ParsePigScript负责根据用户监视器生成新计划newPlan,在ToolsPigServer中根据以前的脚本执行新计划。在执行新计划时,当监视器监视对象数据发生变化,出发监视器,运行自定义的业务,也可以将数据流变化传回协作器里处理,总类图如下: 

4. Penny的使用

Penny的使用需要自定义两个类,一个类继承于监视器基类MonitorAgent,另一个继承于协作器基类Coordinator。然后根据上边类图,就可以使用PennyServer和ParsePigScript进行监控和调试

 5.在pig中就可以找到penny这个贡献的源码

 

Vertica:

   vertica是pig loader和storer的udf

   附件里是vertica,来自github,和vertica的介绍使用文档

   贴一篇将vertica的帖子 http://blackproof.iteye.com/blog/1791995

 

  推荐书籍

    programming pig

 

  推荐网址

   http://pig.apache.org/docs/r0.10.0/basic.html 官网

 

  pig pen开发工具,这个我现在玩得还不熟,就不介绍了,有兴趣的可以去搜搜玩玩

 

我在工作中pig的使用,主要是数据的ETL,所以比较适合。在选择pig hive还是其他非hadoop架构,如redis,这还是一个需要继续尝试探索的问题。

2
0
分享到:
评论
5 楼 zenoh 2013-04-19  
blackproof 写道
zenoh 写道
我的数据如下面的样子:
格式: ip , num,json
其中json有里的data有多条数据。
要求: 把json的data里面的每一天都分解出来,形成这样的格式:ip,num,data1

58.115.228.162`35998`{"data":[{"origin":"page","time":"2013041112","module":"sdk_ad"},{"origin":"download","time":"2013041182","module":"sdk_ad"},
{"origin":"download","time":"2013041182","module":"buy"}]}


我了解了下pig0.10版本的JsonLoader  和  elephant-bird的JsonStringToMap 地址:https://github.com/kevinweil/elephant-bird/blob/273_fix_changes_version/pig/src/test/java/com/twitter/elephantbird/pig/piggybank/TestJsonStringToMap.java

都满足不了我的需求,也可能是我理解不够,我该怎么处理这个问题呢?


满足不了就自己写udf loader,基本上只用要把你的逻辑,就是读json字符串的过程,加到getNext中就哦了



那样的话我需要一个能够返回多条记录,或者一个包,然后再遍历包吗?  刚用上pig,边学边用啊,很多不会
4 楼 blackproof 2013-04-18  
zenoh 写道
我的数据如下面的样子:
格式: ip , num,json
其中json有里的data有多条数据。
要求: 把json的data里面的每一天都分解出来,形成这样的格式:ip,num,data1

58.115.228.162`35998`{"data":[{"origin":"page","time":"2013041112","module":"sdk_ad"},{"origin":"download","time":"2013041182","module":"sdk_ad"},
{"origin":"download","time":"2013041182","module":"buy"}]}


我了解了下pig0.10版本的JsonLoader  和  elephant-bird的JsonStringToMap 地址:https://github.com/kevinweil/elephant-bird/blob/273_fix_changes_version/pig/src/test/java/com/twitter/elephantbird/pig/piggybank/TestJsonStringToMap.java

都满足不了我的需求,也可能是我理解不够,我该怎么处理这个问题呢?


满足不了就自己写udf loader,基本上只用要把你的逻辑,就是读json字符串的过程,加到getNext中就哦了
3 楼 zenoh 2013-04-18  
我的数据如下面的样子:
格式: ip , num,json
其中json有里的data有多条数据。
要求: 把json的data里面的每一天都分解出来,形成这样的格式:ip,num,data1

58.115.228.162`35998`{"data":[{"origin":"page","time":"2013041112","module":"sdk_ad"},{"origin":"download","time":"2013041182","module":"sdk_ad"},
{"origin":"download","time":"2013041182","module":"buy"}]}


我了解了下pig0.10版本的JsonLoader  和  elephant-bird的JsonStringToMap 地址:https://github.com/kevinweil/elephant-bird/blob/273_fix_changes_version/pig/src/test/java/com/twitter/elephantbird/pig/piggybank/TestJsonStringToMap.java

都满足不了我的需求,也可能是我理解不够,我该怎么处理这个问题呢?
2 楼 blackproof 2013-04-17  
pig中每个关系都是个tuple,动态列我一般用map存储在tuple中


zenoh 写道
你好,请问,对于复杂的日志数据使用pig怎么来处理?比如这样的数据   : 字段数据1,字段数据2,json数据 。 其中json数据的字段个数不固定

1 楼 zenoh 2013-04-17  
你好,请问,对于复杂的日志数据使用pig怎么来处理?比如这样的数据   : 字段数据1,字段数据2,json数据 。 其中json数据的字段个数不固定

相关推荐

    hadoop从入门到精通课件pdf

    《Hadoop从入门到精通》课程的PDF课件是一份全面了解和掌握Hadoop技术体系的宝贵资源。这个课程涵盖了从Hadoop的基础概念到高级应用的方方面面,旨在帮助学习者逐步提升对Hadoop的理解和实战能力。以下是根据提供的...

    Hadoop快速入门介绍文档

    ### Hadoop快速入门介绍 #### 一、Hadoop简介 Hadoop是一款开源软件框架,用于分布式存储和处理大型数据集。它能够在廉价的商用硬件上运行,并且具有高可靠性和可扩展性。Hadoop的核心组件包括HDFS(Hadoop ...

    Hadoop的xmind的入门笔记

    3. **Hadoop day03.xmind**:可能涉及Hadoop生态系统中的其他组件,如HBase(分布式数据库)、Hive(数据仓库工具)和Pig(数据分析平台),以及它们与Hadoop的交互方式。 4. **Hadoop day04.xmind**:可能深入到...

    hadoop 第三版-权威指南-从入门到精通-中文pdf版本

    hadoop 第三版-权威指南-从入门到精通-中文pdf版本。介绍hadoop分布式文件系统,MapReduce的工作原理,并手把手教你如何构建hadoop集群,同时附带介绍了pig,hive,hbase,zookeeper,sqoop等hadoop家族的开源软件。

    Hadoop开发者入门-带书签文字版

    《Hadoop开发者入门》是一本面向初学者的指南,旨在帮助读者快速掌握Hadoop的核心概念和技术。这本书的特色是带有完整的目录书签,方便查阅,且内容可复制,非常适合学习和参考。Hadoop是一个开源的大数据处理框架,...

    Hadoop入门教程

    Hadoop生态系统包括许多相关的开源项目,如Hive(数据仓库工具)、Pig(数据分析平台)、HBase(分布式数据库)、Zookeeper(协调服务)和Spark(高速计算引擎)。这些工具与Hadoop配合,可以构建完整的大数据解决...

    Hadoop入门程序java源码

    在进入Hadoop的世界之前,首先需要理解Hadoop是什么。...在实际项目中,你还可以根据需求扩展这个基础模板,例如引入更复杂的Mapper和Reducer逻辑,或者使用其他Hadoop生态中的组件,如Pig、Hive、Spark等。

    Hadoop入门到精通

    "Hadoop入门到精通"的学习资料旨在帮助初学者掌握这一强大的框架,并逐步晋升为专家。以下是对Hadoop及其相关概念的详细解读。 一、Hadoop概述 Hadoop是由Apache基金会开发的一个开源框架,主要用于处理和存储大...

    HADOOP快速入门及搭建集群环境

    HADOOP可以与Hive、Pig、Spark等技术集成,形成大数据处理的整体解决方案。 1.4 国内外HADOOP应用案例介绍 HADOOP在国内外有广泛的应用,包括搜索引擎、社交媒体、电商平台等。例如,Facebook、Twitter、LinkedIn...

    hadoop 入门

    【Hadoop 入门】 Hadoop 是一个由Apache基金会开发的开源分布式计算框架,它以其高效、可扩展和容错性著称,是大数据处理领域的重要工具。本篇将从Hadoop的基本流程、应用开发以及集群配置和使用技巧三个方面进行...

    Hadoop开发者入门专刊

    6. **Hadoop生态**: Hadoop生态系统包括Pig(数据流处理)、Hive(数据仓库工具)、Spark(快速通用计算引擎)、HBase(NoSQL数据库)等,它们共同构建了一个完整的数据处理平台。 7. **数据分发与复制策略**: HDFS...

    Hadoop开发者入门专刊.zip

    《Hadoop开发者入门专刊》是一本专门为初学者设计的指南,旨在帮助读者快速掌握Hadoop生态系统的核心概念和技术。Hadoop是Apache软件基金会开发的一个开源框架,主要用于处理和存储大量数据,尤其适合大数据分析和...

    Hadoop全套入门资源资料整理PDF

    5. **Hadoop生态组件**:包括HBase(分布式数据库)、Hive(数据仓库工具)、Pig(数据分析工具)、Spark(快速通用的大数据处理框架)等,它们是如何与Hadoop协同工作的,以及各自的优缺点和应用场景。 6. **实战...

    Hadoop_入门实践

    Hadoop生态包括许多其他项目,如Hive(SQL-like查询工具)、Pig(数据分析平台)、HBase(NoSQL数据库)、Spark(快速计算引擎)等,它们共同构成了强大的大数据处理平台。 八、优化与性能调优 优化Hadoop涉及到...

    Hadoop 快速入门及常见问题

    5. **与其他技术集成**:如何将Hadoop与Hive、Pig、Spark等大数据处理工具结合使用。 6. **Hadoop扩展**:Hadoop生态系统的其他项目,如HBase(实时数据存储)、Hue(Web界面交互)等。 总之,Hadoop是一个复杂但...

    Hadoop入门实战手册 中文版)

    Hadoop生态系统包含了众多工具和库,如Hive(基于SQL的查询工具)、Pig(数据分析平台)、HBase(NoSQL数据库)、Spark(快速数据处理框架)等。了解这些工具如何与Hadoop配合使用,可以极大地扩展Hadoop的功能。 ...

    Hadoop 大数据入门必备资料

    以上内容仅为对Hadoop大数据入门必备知识的简要概述,每个知识点深入讲解都包含大量的细节和技术要点,需要系统学习和实践操作。对于初学者来说,结合Hadoop官方文档、相关书籍以及在线资源进行学习,同时在实际的...

    Hadoop入门手册

    2. **Hadoop生态系统**:除了核心的Hadoop组件,还有许多相关的项目和工具,如Hive(数据仓库工具)、Pig(数据分析平台)、HBase(NoSQL数据库)、Zookeeper(分布式协调服务)等,它们共同构建了丰富的Hadoop生态...

    大数据云计算技术系列 Hadoop之Hbase从入门到精通(共243页).pdf

    《大数据云计算技术系列:Hadoop之Hbase从入门到精通》 HBase,全称Hadoop Database,是一款基于Hadoop生态系统的分布式列式存储系统,旨在处理海量结构化数据。它借鉴了Google Bigtable的设计思想,但开源并适应了...

Global site tag (gtag.js) - Google Analytics