Hadoop Pig学习笔记(一)
Pig简介
这节总结Pig语言的方方面面。
Pig是一种探索大规模数据集的脚本语言。
pig是在HDFS和MapReduce之上的数据流处理语言,它将数据流处理翻译成多个map和reduce函数,提供更高层次的抽象将程序员从具体的编
程中解放出来。
Pig包括两部分:用于描述数据流的语言,称为Pig Latin;和用于运行Pig Latin程序的执行环境。
Pig Latin程序有一系列的operation和transformation组成。每个操作或变换对输入进行数据处理,然后产生输出结果。这些操作整体上
描述了一个数据流。Pig内部,这些变换操作被转换成一系列的MapReduce作业。
Pig不适合所有的数据处理任务,和MapReduce一样,它是为数据批处理而设计的。如果只想查询大数据集中的一小部分数据,pig的实现
不会很好,因为它要扫描整个数据集或绝大部分。
1. Pig的运行
Pig是作为客户端运行的程序,你需要将其连接到本地Hadoop或者集群上。当安装Pig之后,有三种执行pig程序的方法:pig脚本
(将程序写入.pig文件中),Grunt(运行Pig命令的交互式shell环境)和嵌入式方式。
records = Load ‘sample.txt’ as (year:chararray, temperature:int, quality:int);
filter_records = FILTER records BY temperature != 9999 AND quality == 0;
group_records = GROUP filter_records BY year;
max_temp = FOREACH group_records GENERATE group, MAX(filter_records.temperature);
DUMP max_temp;
生成上面程序的创建的数据集结构: grunt> ILLUSTRATE max_temp;
Pig和数据库的比较:
1)Pig是数据流编程语言,而SQL是一种描述型编程语言。Pig是相对于输入的一步步操作,其中每一步都是对数据的一个简单的变换;
而SQL语句是一个约束的集合,这些约束结合在一起定义了输出。Pig更像RDBMS中的查询规划器。
2)RDBMS把数据存储在严格定义了模式的表内,但pig对数据的要求更宽松,可以在运行时定义模式,而且是可选的。
3)pig对复杂、嵌套数据结构的支持更强;
4)Pig不支持事务和索引,也不支持随机读和几十毫秒级别的查询,它是针对数据批量处理的。
5)Hive是介于Pig和RDBMS之间的系统。Hive以HDFS为存储,但是查询语言是基于SQL的,而且Hive要求所有数据必须存储在表中,
表必须有模式,而模式由Hive管理。但Hive允许为预先存在HDFS中的数据关联一个模式,因此数据加载步骤是可选的。
2 Pig Latin
程序有一系列语句构成。操作和命令是大小写无关的,而别名和函数名是大小写敏感的。
Pig处理多行语句时,在整个程序逻辑计划没有构造完毕前,pig并不处理数据。
Pig Latin关系操作
类型 操作 描述
加载与存储
LOAD 将数据从外部文件或其它存储中加载数据,存入关系
STORE 将一个关系存放到文件系统或其它存储中
DUMP 将关系打印到控制台
过滤
FILTER 从关系中删除不需要的行
DISTINCT 从关系中删除重复的行
FOREACH…GENERATE 对于集合的每个元素,生成或删除字段
STREAM 使用外部程序对关系进行变换
SAMPLE 从关系中随机取样
分组与连接
JOIN 连接两个或多个关系
COGROUP 在两个或多个关系中分组
GROUP 在一个关系中对数据分组
CROSS 获取两个或更多关系的乘积(叉乘)
排序
ORDER 根据一个或多个字段对某个关系进行排序
LIMIT 限制关系的元组个数
合并与分割
UNION 合并两个或多个关系
SPLIT 把某个关系切分成两个或多个关系
Pig Latin的诊断操作
操作 描述
DESCRIBE 打印关系的模式
EXPLAIN 打印逻辑和物理计划
ILLUSTRATE 使用生成的输入子集显示逻辑计划的试运行结果
Pig Latin UDF语句
REGISTER 在Pig运行时环境中注册一个JAR文件
DEFINE 为UDF、流式脚本或命令规范新建别名
Pig Latin命令类型
kill 中止某个MapReduce任务
exec 在一个新的Grunt shell程序中以批处理模式运行一个脚本
run 在当前Grunt外壳程序中运行程序
quit 退出解释器
set 设置Pig选项
Pig Latin表达式
类型 表达式 描述 示例
字段 $n 第n个字段 $0
字段 f 字段名f year
投影 c.$n, c.f 在关系、包或元组中的字段 records.$0, records.year
Map查找 m#k 在映射m中键k对应的值 items’Coat’
类型转换 (t)f 将字段t转换成f类型 (int)year
函数型平面化 fn(f1, f2, …) 在字段上应用函数 fn isGood(quality)
FLATTEN(f) 从包和元组中去除嵌套 flatten(group)
其它的表达式,如算术、条件、比较和布尔型类似其它语言,不详述.
Pig Latin类型
数据类型包括int (32位有符号整数), long(64位有符号整数), float(32位浮点数), double(64位浮点数),
chararray(UTF16格式的字符数组), Bytearray(字节数组), tuple(元组), bag(包), map(键值对).
tuple: (1, ‘hello’) //任何类型的字段序列
bag: {(1, ‘hello’), (2)} //元组的无序多重集合(允许重复元组)
map: [‘a’ ‘hello’] //一组键值对,键必须是字符数组
关系和包在概念上是相同的,但是有细微差别。关系是顶层构造结构,只能从上表中的关系操作中创建关系,包必须在某个关系中。
举例:
A = {(1, 2), (3, 4)} //错,使用load语句从文件中加载数据
B = A.$0 //错, B = foreach A generate $0;
模式(Schema)
Pig的一个关系可以有一个关联的模式,模式为关系的字段指定名称和类型。Pig的这种模式声明方式和SQL数据库要求数据加载前必须
先声明模式截然不同,Pig设计的目的是用于分析不包含数据类型信息的纯文本输入文件的。但是尽量定义模式,会让程序运行地更高效。
缺点:在查询中声明模式的方式是灵活的,但不利于模式重用。每个查询中维护重复出现的模式会很困难。处理这一问题的办法是写
自己的加载函数来封装模式。
SQL数据库在加载数据时,会强制检查表模式中的约束。在pig中,如果一个值无法被强制转换为模式中申明的类型,pig会用空值null代替,
显示一个空位。大数据集普遍都有被损坏的值、无效值或意料之外的值,简单的方法是过滤掉无效值:
grunt>good_records = filter records by temperature is not null;
另一种技巧是使用SPLIT操作把数据划分成好和坏两个关系,然后在分别进行分析:
grunt> split records into good_records if temperature is not null,
bad_records if temperature is null;
grunt> dump good_records;
在Pig中,不用为数据流中的每个新产生的关系声明模式。大多数情况下,Pig能够根据关系操作的输入关系的模式来确定输出结果的模式。
有些操作不改变模式,如Limit。而Union会自动生成新的模式。
如果要重新定义一个关系的模式,可以使用带as子句的FOREACH…GENERATE操作来定义输入关系的一部分或全部字段的模式。
函数
Pig的函数分为计算函数,过滤函数,加载函数和存储函数。
计算函数: AVG, COUNT, CONCAT, COUNTSTAR, DIFF, MAX, MIN, SIZE, SUM, TOKENIZE
过滤函数: IsEmpty
加载/存储函数:PigStorage, BinStorage, BinaryStorage, TextLoader, PigDump
3 用户自定义函数(UDF)
public abstract class EvalFunc<T> {
public abstract T exec(Tuple input) throws IOException;
public List<FuncSpec> getAvgToFuncMapping() throws FrontendException;
public FuncSpec outputSchema() throws FrontendException;
}
输入元组的字段包含传递给函数的表达式,输出是泛型;对于过滤函数输出就是Boolean类型。建议尽量在
getAvgToFuncMapping()/outputSchema()申明输入和输出数据的类型,以便Pig进行类型转换或过滤不匹配类型的错误值。
Grunt>REGISTER pig-examples.jar;
DEFINE isGood org.hadoopbook.pig.IsGoodQuality();
加载UDF
public LoadFunc {
public void setLocation(String location, Job job);
public InputFormat getInputFormat();
public void prepareToRead(RecordReader reader, PigSplit split);
public Tuple next() throws IOException;
}
类似Hadoop,Pig的数据加载先于mapper的运行,所以保证数据可以被分割成能被各个mapper独立处理的部分非常重要。从Pig 0.7开始,
加载和存储函数接口已经进行了大幅修改,以便与Hadoop的InputFormat和OutputFormat类基本一致。
Grunt>Register loadfunc.jar
Define customLoad org.hadoopbook.pig.loadfunc()
records = load ‘input/sample.txt’ using customLoad(‘16-19, 88-92, 93-93’)
as (year:int, temperature:int, quality:int);
4 数据处理操作
加载和存储数据: store A into ‘out’ using pigStorage(‘:’) ; // 将元组存储为以分号分隔的纯文本值
过滤数据
Foreach … generate // 逐个处理一个关系中的行,来生成一个新的关系包含部分或全部的字段
例子: B = foreach A generate $0, $2+1, ‘Constant’;
分组与连接数据
Join 连接
C = join A by $0, B by $1; // 默认为内连接,将A的第一个字段和B的第二个字段连接,输出匹配的字段
// 连接后新关系的字段为输入关系的字段和
C = join A by $0, B by $1 using “replicated”; // 分段复制链接,B表中的数据将会放在内存中
C= join A by $0 left outer, B by $1; // 左外连接,左边的没有匹配项也输出
Cogroup 多关系分组
类似于Join,但默认是外连接,连接键为第一个字段,第二个字段为匹配的第一个关系中的所有元组的包,第三个字段为第二个表中匹配的
所有元组的包。
D = COGROUP A by $0, B by $1; // 新的关系的元组个数为连接键的并集(去除重复);
D= COGROUP A by $0 inner, B by $1 inner; // 新关系的元组个数是连接键取交集的个数(只输出匹配的)。每个元组中的第二个和
第三个字段都是一个包含一个元组的包
COGROUP,inner和flatten组合使用相当于实现了内连接:
G = COGROUP A by $0 innner, B by $1 inner;
H = foreach G generate flatten($1), flatten($2)
// H和join A by $0, B by $1相同
cross叉乘
I = cross A, B; // 所有可能m*n行
Group 分组
B = group A by $0; // 第一个字段为group字段,第二个字段为一个包,包含元组的其它字段
B = group A by size($1); // 长度为第一个字段,第二个字段为一个包,包含所有长度为第一个字段的元组
C = group A all; // 只有一行,第一个字段为all,第二个字段为A中所有元组的包
D = group A any; // 对关系中的元组随机分组,对取样非常有用
排序数据
Pig按什么顺序来处理关系的行是不确定的,只能在输出前排序。
B = order A by $0, $1 DESC;
C = Limit B 2;
组合和切分数据
Union可以将几个关系合在一起,即所有元组的集合,当关系的模式不匹配时,新关系就没有模式。
C = union A, B;
Split 可以将一个关系的元组按某种条件分成几个子集。
Split A into B if $0 is null, C if $0 is not null;
5 pig实用技巧
并行处理: 可以在很多语句中指定reducer的数量
group, join, cogroup, cross, distinct, order
(复习:reduce的任务个数设置为稍小于集群中的reduce任务槽数)
参数替换:在pig语句中使用$加变量名的方式使用外部定义的变量值,在运行时可以通过"-param input=”设置变量的值,
或者通过"-param_file ”来指定参数文件。
动态参数:很多Unix shell用反引号引用的命令来替换实际值,如`date “+%Y-%m-%d” `会按规定格式输出日期。
这个可以放在-param或参数文件中来动态得到一个值。
==========================================================================================================
各种SQL在PIG中实现
我这里以Mysql 5.1.x为例,Pig的版本是0.8
同时我将数据放在了两个文件,存放在/tmp/data_file_1和/tmp/data_file_2中.文件内容如下:
tmp_file_1:
Txt代码
zhangsan 23 1
lisi 24 1
wangmazi 30 1
meinv 18 0
dama 55 0
tmp_file_2:
Txt代码
1 a
23 bb
50 ccc
30 dddd
66 eeeee
1.从文件导入数据
1)Mysql (Mysql需要先创建表).
CREATE TABLE TMP_TABLE(USER VARCHAR(32),AGE INT,IS_MALE BOOLEAN);
CREATE TABLE TMP_TABLE_2(AGE INT,OPTIONS VARCHAR(50)); -- 用于Join
LOAD DATA LOCAL INFILE '/tmp/data_file_1' INTO TABLE TMP_TABLE ;
LOAD DATA LOCAL INFILE '/tmp/data_file_2' INTO TABLE TMP_TABLE_2;
2)Pig
tmp_table = LOAD '/tmp/data_file_1' USING PigStorage('\t') AS (user:chararray, age:int,is_male:int);
tmp_table_2= LOAD '/tmp/data_file_2' USING PigStorage('\t') AS (age:int,options:chararray);
2.查询整张表
1)Mysql
SELECT * FROM TMP_TABLE;
2)Pig
DUMP tmp_table;
3. 查询前50行
1)Mysql
SELECT * FROM TMP_TABLE LIMIT 50;
2)Pig
tmp_table_limit = LIMIT tmp_table 50;
DUMP tmp_table_limit;
4.查询某些列
1)Mysql
SELECT USER FROM TMP_TABLE;
2)Pig
tmp_table_user = FOREACH tmp_table GENERATE user;
DUMP tmp_table_user;
5. 给列取别名
1)Mysql
SELECT USER AS USER_NAME,AGE AS USER_AGE FROM TMP_TABLE;
2)Pig
tmp_table_column_alias = FOREACH tmp_table GENERATE user AS user_name,age AS user_age;
DUMP tmp_table_column_alias;
6.排序
1)Mysql
SELECT * FROM TMP_TABLE ORDER BY AGE;
2)Pig
tmp_table_order = ORDER tmp_table BY age ASC;
DUMP tmp_table_order;
7.条件查询
1)Mysql
SELECT * FROM TMP_TABLE WHERE AGE>20;
2) Pig
tmp_table_where = FILTER tmp_table by age > 20;
DUMP tmp_table_where;
8.内连接Inner Join
1)Mysql
SELECT * FROM TMP_TABLE A JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_inner_join = JOIN tmp_table BY age,tmp_table_2 BY age;
DUMP tmp_table_inner_join;
9.左连接Left Join
1)Mysql
SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_left_join = JOIN tmp_table BY age LEFT OUTER,tmp_table_2 BY age;
DUMP tmp_table_left_join;
10.右连接Right Join
1)Mysql
SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_right_join = JOIN tmp_table BY age RIGHT OUTER,tmp_table_2 BY age;
DUMP tmp_table_right_join;
11.全连接Full Join
1)Mysql
SELECT * FROM TMP_TABLE A JOIN TMP_TABLE_2 B ON A.AGE=B.AGE
UNION SELECT * FROM TMP_TABLE A LEFT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE
UNION SELECT * FROM TMP_TABLE A RIGHT JOIN TMP_TABLE_2 B ON A.AGE=B.AGE;
2)Pig
tmp_table_full_join = JOIN tmp_table BY age FULL OUTER,tmp_table_2 BY age;
DUMP tmp_table_full_join;
2.同时对多张表交叉查询
1)Mysql
SELECT * FROM TMP_TABLE,TMP_TABLE_2;
2)Pig
tmp_table_cross = CROSS tmp_table,tmp_table_2;
DUMP tmp_table_cross;
3.分组GROUP BY
1)Mysql
SELECT * FROM TMP_TABLE GROUP BY IS_MALE;
2)Pig
tmp_table_group = GROUP tmp_table BY is_male;
DUMP tmp_table_group;
14.分组并统计
1)Mysql
SELECT IS_MALE,COUNT(*) FROM TMP_TABLE GROUP BY IS_MALE;
2)Pig
tmp_table_group_count = GROUP tmp_table BY is_male;
tmp_table_group_count = FOREACH tmp_table_group_count GENERATE group,COUNT($1);
DUMP tmp_table_group_count;
15.查询去重DISTINCT
1)MYSQL
SELECT DISTINCT IS_MALE FROM TMP_TABLE;
2)Pig
tmp_table_distinct = FOREACH tmp_table GENERATE is_male;
tmp_table_distinct = DISTINCT tmp_table_distinct;
DUMP tmp_table_distinct;
============================================================================================
PIG是个很好的大数据查询的工具,刚好参加一个项目用到了PIG,机会难得,学习学习。
1. 执行模式
- 本地模式,在同一个JVM中运行,访问本地文件系统,适用于小数据集,使用时加上参数 –x local 或 –exectype local。
- hadoop模式,在hadoop集群中运行,适用于海量数据集。
hadoop模式配置
- 设置环境变量PIG_HADOOP_VERSION,如 export PIG_HADOOP_VERSION=18
- 设置hadoop job tracker 和 namenode 的地址(有两种方案)
如果有hadoop site的配置文件,且定义了fs.default.name和mapred.job.tracker,可以将含有改配置文件的文件夹的路径设置给环境变量PIG_CLASSPATH,如 export PIG_CLASSPATH=$HADOOP_INSTALL/conf
也可以创建一个配置文件pig.properties,在该配置文件中设置fs.default.name和mapred.job.tracker的值, 如:
fs.default.name=hdfs://localhost:9100
mapred.job.tracker=localhost:9000
2. Run PIG 程序
有三种方式可以run pig程序,script/grunt/embeded
script,直接run一个pig command script file。
grunt,交互式命令行。
embeded,在java中运行pig program。
3. An Example
?1 records = LOAD 'input/ncdc/micro-tab/simple.txt' >> AS (year:chararray,temperature:int,quality:int);
- 该语句读取源数据
- 输入是以 TAB 键分割的
- chararray相当于java的string, int相当于java的int
- LOAD 后接着用文件路径,文件路径可以是本地文件,或者是HDFS URI- AS(可选)给每一个field命名,以便后续的语句引用
- LOAD 的输出是一个关系(relation),它类似于数据库表的元组集合,由多个field组成,每个field都有名字。
DESCIBE操作符可以查看输出关系的结构,DUMP操作符可以查看结果。
?1 filtered_records = FILTER records BY temperature != 9999 AND ( quality ==0 OR quality ==1 OR quality ==4 OR quality ==5
OR quality ==9);
- 该语句过滤了一些无效的数据
?1 grouped_records = GROUP filtered_records BY year;
- 该语句根据年份来进行分组
?1 max_temp = FOREACH grouped_records GENERATE group,MAX(filtered_records.temperature);
- FOREACH 处理每一行的数据, GENERATE 关键字定义输出的数据,这里的输出是 年份(group)和该年的最高温度
- MAX是内置的一个函数,用于计算最大值
4. PIG 语句基本上由三部分组成
- LOAD语句或load/store函数, 读取数据,PigStorage是默认的load函数
- 数据处理语句
FILTER
FOREACH
GROUP
COGROUP
INNER JOIN
OUTER JOIN
UNION
SPLIT
- DUMP or STORE 语句,查询或保存结果, PigStorage是默认的store函数
5. Debug
DUMP
DESCRIBE
EXPLAIN
ILLUSTRATE
==============================================================================================
为了实现一个较为完备、实用的数据流处理机制,开始系统的学习Apache Pig——并行数据处理语言。
Pig的基本数据类型:
int
int
as (a:int)
long
long
as (a:long)
float
float
as (a:float)
double
double
as (a:double)
chararray
chararray
as (a:chararray)
bytearray
bytearray
as (a:bytearray)
map
chararray到任意数据元素的映射
as (a:map[],b:map[int])
tuple
元组——定长、有序的数据元素集合(每个数据元素的类型不必一致,但因有序而可以进行定位)。
as (a:tuple(),
b:tuple(x:int,y:int))
bag
包——由tuples组成的无序集合。如:{('bob', 55), ('sally', 52), ('john', 25)}是一个有3个tuple、每个tuple含有两个字段的bag。
bag{} or bag{t:(list_of_fields)},
list_of_fields是一个逗号分隔的字段声明列表。
要注意的是,bag内的tuple必须有名字,否则就无法直接访问该元素。
(a:bag{}, b:bag{t:
(x:int, y:int)})
关于数据类型转换,任何失败的转换都将返回NULL值(与SQL中的NULL概念一致)。
dividends = load 'NYSE_dividends' as
(exchange:chararray, symbol:chararray, date:chararray, dividend:float);
dividends = load 'NYSE_dividends' as (exchange, symbol, date, dividend);
为了让Pig能吃进任何数据,当AS部分的列不在数据源提供范围内时,会自动使用NULL填充。
Pig Latin是数据流语言。每一个处理步骤都会返回一个新的数据集(data set)或关系(relation)。如:input = load 'data' ——input就是关系名,它返回自对数据集“data”的加载。一旦一个关系被赋值,这个赋值就是永久性的。
下面介绍Pig Latins的主要元素:
输入输出
Load
在默认情况下,load方法处理的是tab分隔的文本文件,但也可以使用函数作为数据源,如:
divs = load 'NYSE_dividends' using HBaseStorage();
divs = load 'NYSE_dividends' using PigStorage(',');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
?
Matches any single character.
*
Matches zero or more characters.
[abc]
Matches a single character from character set (a,b,c).
[a-z]
Matches a single character from the character range (a..z), inclusive. The first character must be lexicographically less than or equal to the second character.
[^abc]
Matches a single character that is not in the character set (a, b, c). The ^ character must occur immediately to the right of the opening bracket.
[^a-z]
Matches a single character that is not from the character range (a..z), inclusive. The ^ character must occur immediately to the right of the opening bracket.
\c
Removes (escapes) any special meaning of character c.
{ab,cd}
Matches a string from the string set {ab, cd}.
Store
Pig默认用PigStorage以tab分隔文件的形式将数据保存到HDFS上:
store processed into '/data/examples/processed';
也可以指定保存函数:
store processed into 'processed' using HBaseStorage();
store processed into 'processed' using PigStorage(',');
Dump
用于在屏幕上显示数据。
关系操作
foreach
通过一组表达式对数据流中的每一行进行运算,产生的结果就是用于下一个算子的数据集。
下面的语句加载整个数据,但最终结果B中只保留其中的user和id字段:
A = load 'input' as (user:chararray, id:long, address:chararray, phone:chararray,preferences:map[]);
B = foreach A generate user, id;
在foreach中可以使用$+数字代表某个位置的列。下面语句中gain与gain2的值一样:
prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,volume, adj_close);
gain = foreach prices generate close - open;
gain2 = foreach prices generate $6 - $3;
在schema定义不确定或不清楚时,位置风格的引用是很有用的。
表达式中可以使用“*”代表全部列,还可以使用“..”表示范围内的列,这对简化命令文本很有用:
prices = load 'NYSE_daily' as (exchange, symbol, date, open,high, low, close, volume, adj_close);
beginning = foreach prices generate ..open; -- exchange, symbol, date, open
middle = foreach prices generate open..close; -- open, high, low, close
end = foreach prices generate volume..; -- volume, adj_close
Pig支持算术表达式以及问号表达式:
2 == 2 ? 1 : 4 --returns 1
2 == 3 ? 1 : 4 --returns 4
null == 2 ? 1 : 4 -- returns null
2 == 2 ? 1 : 'fred' -- type error; both values must be of the same type
对于复杂的数据,可以使用投影(projection)操作符。对于map而言,是“#”,其后跟随字符串形式的键名。
bball = load 'baseball' as (name:chararray, team:chararray,position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';
元组的投影符为“.”,其后可以跟字段名或位置风格的标识:
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;
filter
使数据流中仅保留符合条件表达式的那些行。如:
startswithcm = filter divs by symbol matches 'CM.*';
用户自定义的返回布尔值的filter 函数也可用于这里。
group
按照键值相同的规则归并数据。在SQL中,group操作必然与聚集函数组合使用,而在pig中,group操作将产生与键值有关的bag。
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
cnt = foreach grpd generate group, COUNT(daily);
下面的语句则可以将分组后的数据储存起来,以备将来使用:
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
store grpd into 'by_group';
下面是针对多个字段的group:
--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd = group daily by (exchange, stock);
avg = foreach grpd generate group, AVG(daily.dividends);
describe grpd;
grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,
stock: bytearray,date: bytearray,dividends: bytearray}}
可以使用ALL实现对所有字段的分组:
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily all;
cnt = foreach grpd generate COUNT(daily);
order by
与SQL中的用法一致。
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
byclose = order daily by close desc, open;
distinct
与SQL中的用法一致。
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq = distinct daily;
join
连接一组key:
jnd = join daily by symbol, divs by symbol;
连接两组key:
jnd = join daily by (symbol, date), divs by (symbol, date);
外连接:
jnd = join daily by (symbol, date) left outer, divs by (symbol, date);
多表连接:
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
自连接(必须准备两份数据):
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:int,date:chararray, dividends);
divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:int,date:chararray, dividends);
jnd = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and
divs1::dividends < divs2::dividends;
limit
用于限制行数(类似MySQL的LIMIT)。
divs = load 'NYSE_dividends';
first10 = limit divs 10;
sample
随机抽取指定比例(0到1)的数据。
some = sample divs 0.1;
parallel
用于实现指定数量的多节点并行运算。
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,volume, adj_close);
bysymbl = group daily by symbol parallel 10;
可以使用default_parallel来指定默认的并行度选项:
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted = order average by avg desc;
自定义函数(UDFS)
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);
flatten
将数据内部的包或元组扁平化(展开为底层字段——以笛卡尔积的方式扩展)。
players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
pos = foreach players generate name, flatten(position) as position;
bypos = group pos by position;
上面的players中的数据应该是这样的(已用逗号取代TAB便于查看):
Jorge Posada,New York Yankees,{(Catcher),(Designated_hitter)},...
一旦经过了flatten语句,它将变成两个数据:
Jorge Posada,Catcher
Jorge Posada,Designated_hitter
foreach内的处理流
daily = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fields
grpd = group daily by exchange;
uniqcnt = foreach grpd {
sym =daily.symbol;
uniq_sym =distinct sym;
generate group, COUNT(uniq_sym);
};
下面是在处理前对股票数据排序的例子:
--analyze_stock.pig
register 'acme.jar';
define analyze com.acme.financial.AnalyzeStock();
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float,
volume:int, adj_close:float);
grpd = group daily by symbol;
analyzed = foreach grpd {
sorted = order daily by date;
generate group, analyze(sorted);
};
下面的语句能找出每支股票最高的3次分红:
--hightest_dividend.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
grpd = group divs by symbol;
top3 = foreach grpd {
sorted = order divs by dividends desc;
top = limit sorted 3;
generate group, flatten(top);
};
内部的聚集运算:
--double_distinct.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);
grpd = group divs all;
uniq = foreach grpd {
exchanges = divs.exchange;
uniq_exchanges = distinct exchanges;
symbols = divs.symbol;
uniq_symbols = distinct symbols;
generate COUNT(uniq_exchanges), COUNT(uniq_symbols);
};
上面的代码能计算出每个exchange及symbol的组合中,不同的exchange及symbol数。
连接已排序数据
--mergejoin.pig
-- use sort_for_mergejoin.pig to build NYSE_daily_sorted and NYSE_dividends_sorted
daily = load 'NYSE_daily_sorted' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends_sorted' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
jnd = join daily by symbol, divs by symbol using 'merge';
cogroup
基于一个key,记录多个输入。
A = load 'input1' as (id:int, val:float);
B = load 'input2' as (id:int, val2:int);
C = cogroup A by id, B by id;
describe C;
C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}
用它可以达到全外连接的效果:
--semijoin.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
grpd = cogroup daily by (exchange, symbol), divs by (exchange, symbol);
sjnd = filter grpd by not IsEmpty(divs);
final = foreach sjnd generate flatten(daily);
union
与SQL中的一致。
cross
将多个数据集中的数据按照字段名进行同值组合,形成笛卡尔积。
--cross.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;
非线性数据流(split)
wlogs = load 'weblogs' as (pageid, url, timestamp);
split wlogs into
apr03 if timestamp < '20110404',
apr02 if timestamp < '20110403' and timestamp > '20110401',
apr01 if timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';
上面的split等价于:
apr03 = filter wlogs by timestamp < '20110404';
apr02 = filter wlogs by timestamp < '20110403' and timestamp > '20110401';
apr01 = filter wlogs by timestamp < '20110402' and timestamp > '20110331';
====================================================================================================
- 浏览: 39166 次
- 性别:
- 来自: 上海
相关推荐
【HADOOP学习笔记】 Hadoop是Apache基金会开发的一个开源分布式计算框架,是云计算领域的重要组成部分,尤其在大数据处理方面有着广泛的应用。本学习笔记将深入探讨Hadoop的核心组件、架构以及如何搭建云计算平台。...
本笔记基于林子雨老师在MOOC上的《大数据技术原理》课程,旨在为IT从业者和大学生提供一个全面了解大数据的基础框架。 首先,我们要认识到大数据的发展背景。随着互联网的普及,以及物联网、社交媒体、移动设备等...
Hadoop的生态系统还包括Hive、HBase、Pig、Zookeeper等多种工具和组件,用于数据分析、实时处理、任务调度和协调。 Hadoop安装通常包括下载、配置和启动集群的NameNode和DataNode。NameNode是HDFS的主节点,负责...
压缩包内的“学习笔记”可能包括以下内容:Hadoop安装与配置教程,HDFS的基本操作和管理,MapReduce编程模型的实例解析,Hadoop集群的优化策略,以及YARN、HBase、Hive和Pig的使用方法等。这些笔记可以帮助读者深入...
大数据技术学习笔记1 大数据技术学习笔记1 是一份关于大数据技术的学习笔记,涵盖了大数据技术的基本概念、Hadoop 生态系统、MapReduce 算法、Spark 框架、分布式计算平台等多个方面。 Hadoop 生态系统 Hadoop 是...
《Apache Pig 0.17.0 安装与配置指南》 Apache Pig 是一个用于大数据分析的平台,它提供了一种高级语言 Pig Latin 来处理大规模...通过不断的实践和学习,你将能更好地掌握Pig的潜力,为大数据分析带来强大的动力。
本学习笔记涵盖了Hadoop 1.0和2.0两个主要版本,旨在帮助读者全面理解Hadoop的核心概念、架构以及实际操作。 在Hadoop 1.0中,核心组件主要包括HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一种...
【标题】"pig-0.16.0.tar安装包" 涉及的主要知识点是Apache Pig的安装和使用,这是一个基于Hadoop的数据流编程平台,用于处理大规模数据集。...通过学习和理解上述概念,你可以高效地使用Pig-0.16.0进行大数据分析。
在这份斯坦福大学机器学习课程的个人笔记中,涵盖了许多关键的机器学习知识点和概念。...作为个人学习笔记,它为读者提供了一个很好的学习资源,能够帮助读者更好地理解和掌握机器学习的相关概念和方法。
《大数据学习笔记文档》 大数据领域是信息技术的热门方向,涵盖了多个关键技术,如Linux、Kafka、Python、Hadoop和Scala等。以下是对这些技术的详细介绍: **Linux**:作为大数据处理的基础平台,Linux因其开源、...
- Pig:是一种高级的脚本语言,用于编写MapReduce任务,适用于数据流和数据转换的场景。 - Hive:为数据仓库设计,提供了SQL方言HiveQL,允许用户编写类似SQL的查询语句来操作Hadoop中的数据。 - Sqoop:是一个开源...
《粉红猪小妹学习笔记》 粉红猪小妹(Peppa Pig)是一部广受欢迎的儿童动画片,它通过简单有趣的故事教给孩子们日常生活中的词汇和表达。以下是根据动画内容整理出的一些重要英语知识点: 1. **muddy** - 形容词,...
《Hadoop学习笔记》 Hadoop,作为大数据处理的核心框架,是开源社区的杰作,由Apache软件基金会维护。这份文档旨在深入解析Hadoop的基本概念、架构及其在大数据处理中的应用,帮助读者全面掌握这一重要技术。 一、...
《Hadoop学习笔记》 Hadoop,作为大数据处理的核心组件之一,是Apache软件基金会开发的开源框架,专门针对大规模数据集进行分布式计算。这个框架基于Java语言实现,它设计的目标是高容错性、可扩展性和高效的数据...
这份“大数据技术学习笔记之Hive”旨在帮助我们深入理解并掌握Hive的核心概念和技术应用。 一、Hive概述 Hive主要为非结构化或半结构化的海量数据提供数据仓库服务,通过SQL-like查询语言(HQL)进行数据查询,简化...
本资料包“大数据学习笔记,学习路线,技术案例整理”是一个全面的大数据学习资源,旨在帮助初学者和进阶者系统地掌握大数据的核心技术和应用实践。 首先,我们来了解一下大数据的关键概念和技术栈。大数据通常有四...
文章的作者通过个人学习笔记的方式整理了上述内容,反映了作者对机器学习各个方面的认识和理解。笔记中不仅涉及了理论知识,还包括了一些实践和应用,如使用Hadoop、Pig、Hive、Mahout和NoSQL等工具处理大数据和...
### Hadoop数据分析平台学习笔记 #### 一、Hadoop概述 **Hadoop**是一个开源软件框架,用于分布式存储和处理大型数据集。它的设计灵感来源于Google的论文,其中包括Google文件系统(GFS)和MapReduce计算模型。...