使用到Pig来分析线上的搜索日志数据,散仙本打算使用hive来分析的,但由于种种原因,没有用成,而Pig(pig0.12-cdh)散仙一直没有接触过,所以只能临阵磨枪了,花了两天时间,大致看完了pig官网的文档,在看文档期间,也是边实战边学习,这样以来,对pig的学习,会更加容易,当然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,散仙打算介绍下如何在Pig中,使用用户自定义的UDF函数,关于学习经验,散仙会在后面的文章里介绍。
一旦你学会了UDF的使用,就意味着,你可以以更加灵活的方式来使用Pig,使它扩展一些为我们的业务场景定制的特殊功能,而这些功能,在通用的pig里是没有的,举个例子:
你从HDFS上读取的数据格式,如果使用默认的PigStorage()来加载,存储可能只支持有限的数据编码和类型,如果我们定义了一种特殊的编码存储或序列化方式,那么当我们使用默认的Pig来加载的时候,就会发现加载不了,这时候我们的UDF就派上用场了,我们只需要自定义一个LoadFunction和一个StoreFunction就可以解决,这种问题。
本篇散仙根据官方文档的例子,来实战一下,并在hadoop集群上使用Pig测试通过:
我们先来看下定义一个UDF扩展类,需要几个步骤:
序号 | 步骤 | 说明 | 1 | 在eclipse里新建一个java工程,并导入pig的核心包 | java项目 | 2 | 新建一个包,继承特定的接口或类,重写自定义部分 | 核心业务 | 3 | 编写完成后,使用ant打包成jar | 编译时需要pig依赖,但不用把pig的jar包打入UDF中 | 4 | 把打包完成后的jar上传到HDFS上 | pig运行时候需要加载使用 | 5 | 在pig脚本里,注册我们自定义的udf的jar包 | 注入运行时环境 | 6 | 编写我们的核心业务pig脚本运行 | 测试是否运行成功 |
项目工程截图如下:
核心代码如下:
- package com.pigudf;
- import java.io.IOException;
- import org.apache.pig.EvalFunc;
- import org.apache.pig.data.Tuple;
- import org.apache.pig.impl.util.WrappedIOException;
- /**
- * 自定义UDF类,对字符串转换大写
- * @author qindongliang
- * */
- public class MyUDF extends EvalFunc<String> {
- @Override
- public String exec(Tuple input) throws IOException {
- //判断是否为null或空,就跳过
- if(input==null||input.size()==0){
- return null;
- }
- try{
- //获取第一个元素
- String str=(String) input.get(0);
- //转成大写返回
- return str.toUpperCase();
- }catch(Exception e){
- throw WrappedIOException.wrap("Caught exception processing input row ",e);
- }
- }
- }
package com.pigudf; import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.WrappedIOException; /** * 自定义UDF类,对字符串转换大写 * @author qindongliang * */ public class MyUDF extends EvalFunc<String> { @Override public String exec(Tuple input) throws IOException { //判断是否为null或空,就跳过 if(input==null||input.size()==0){ return null; } try{ //获取第一个元素 String str=(String) input.get(0); //转成大写返回 return str.toUpperCase(); }catch(Exception e){ throw WrappedIOException.wrap("Caught exception processing input row ",e); } } }
关于打包的ant脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件一定要上传到HDFS上,除非你是local模式):
- grunt> cat s.txt
- zhang san,12
- Song,34
- long,34
- abC,12
- grunt>
grunt> cat s.txt zhang san,12 Song,34 long,34 abC,12 grunt>
我们在看下,操作文件和jar包是放在一起的:
- grunt> ls
- hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3> 1295
- hdfs://dnode1:8020/tmp/udf/s.txt<r 3> 36
- grunt>
grunt> ls hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3> 1295 hdfs://dnode1:8020/tmp/udf/s.txt<r 3> 36 grunt>
最后,我们看下pig脚本的定义:
- --注册自定义的jar包
- REGISTER pudf.jar;
- --加载测试文件的数据,逗号作为分隔符
- a = load 's.txt' using PigStorage(',');
- --遍历数据,对name列转成大写
- b = foreach a generate com.pigudf.MyUDF((chararray)$0);
- --启动MapReduce的Job进行数据分析
- dump b
--注册自定义的jar包 REGISTER pudf.jar; --加载测试文件的数据,逗号作为分隔符 a = load 's.txt' using PigStorage(','); --遍历数据,对name列转成大写 b = foreach a generate com.pigudf.MyUDF((chararray)$0); --启动MapReduce的Job进行数据分析 dump b
最后,我们看下结果,只要过程不出现异常和任务失败,就证明我们的udf使用成功:
- Counters:
- Total records written : 4
- Total bytes written : 64
- Spillable Memory Manager spill count : 0
- Total bags proactively spilled: 0
- Total records proactively spilled: 0
- Job DAG:
- job_1419419533357_0147
- 2014-12-30 18:10:24,394 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
- 2014-12-30 18:10:24,395 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
- 2014-12-30 18:10:24,396 [main] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
- 2014-12-30 18:10:24,405 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
- 2014-12-30 18:10:24,405 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
- (ZHANG SAN,12)
- (SONG,34)
- (LONG,34)
- (ABC,12)
Counters: Total records written : 4 Total bytes written : 64 Spillable Memory Manager spill count : 0 Total bags proactively spilled: 0 Total records proactively spilled: 0 Job DAG: job_1419419533357_0147 2014-12-30 18:10:24,394 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success! 2014-12-30 18:10:24,395 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS 2014-12-30 18:10:24,396 [main] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code. 2014-12-30 18:10:24,405 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1 2014-12-30 18:10:24,405 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1 (ZHANG SAN,12) (SONG,34) (LONG,34) (ABC,12)
结果没问题,我们的UDF加载执行成功,如果我们还想将我们的输出结果直接写入到HDFS上,可以在pig脚本的末尾,去掉dump命令,加入
store e into '/tmp/dongliang/result/'; 将结果存储到HDFS上,当然我们可以自定义存储函数,将结果写入数据库,Lucene,Hbase等关系型或一些NOSQL数据库里。
相关推荐
HiStore适用于日志分析、通信行业数据分析、大数据量分析应用、数据仓库/数据集市以及物联网等场景,其优势在于低成本存储和实时查询性能,尤其适合对数据存储成本敏感且需要实时查询的业务。 在营销领域,大数据的...
同时,Elasticsearch的聚合功能也非常出色,能够对数据进行高效的统计分析,这一点上Elasticsearch已经超越了传统搜索引擎的角色,开始向数据分析工具发展。 然而,Elasticsearch在复杂数据分析方面与Hadoop或Spark...
Hadoop生态系统中的组件如Flume、Nutch、Mahout、Solr、Sqoop、Hive、HBase、Pig、MapReduce、Tez、Spark、Storm等,为从数据采集、存储、处理、分析到数据服务提供了全面的技术支持。这些组件通过分布式存储系统...
6. **Flume**:一个可靠、高性能的日志收集系统,用于收集、聚合和移动日志数据。 #### 四、Hadoop应用场景 Hadoop广泛应用于各种场景中,包括但不限于: 1. **数据分析**:利用Hadoop处理海量数据,进行数据挖掘...
在使用 Flink 进行数据分析时,开发者可以充分利用其提供的API,编写出既快速又可靠的分布式数据处理程序。Flink 作为一个高性能、高吞吐量的处理系统,非常适合实时数据流处理、批处理以及复杂事件处理等多种数据...
- **应用场景**: 适用于需要处理大量非结构化数据的应用场景,如日志分析、搜索引擎索引等。 **1.2 Hadoop YARN (Yet Another Resource Negotiator)** - **定义**: Hadoop YARN是Hadoop的资源管理器,负责集群资源...
3. **Process**:定义处理逻辑,支持Oozie、Hive、Pig和Spark等作业流引擎,实现对数据的转换和分析。 **数据流水线**:Falcon将这些实体组合成数据流水线,通过预定义的策略来处理数据的复制、保留和存档。 **...
百度的 Hadoop 集群为整个公司的数据团队、大搜索团队、社区产品团队、广告团队,以及 LBS 团体提供统一的计算和存储服务,主要应用包括:数据挖掘与分析、日志分析平台、数据仓库系统、推荐引擎系统、用户行为分析...
6. **集成与定制**:WEBPIG商业版通常支持与其他DevOps工具集成,如CI/CD流水线、日志管理平台等,便于企业构建完整的运维生态系统。此外,还可能提供API或插件机制,满足特定需求的定制化开发。 7. **易用性**:...
- **应用场景**:适用于大规模数据集的存储,如日志数据、用户行为数据等。 #### 2. MapReduce - **定义**:MapReduce是一种编程模型,用于大规模数据集的并行处理。它将复杂的、大规模的数据处理任务分解为两个...
其产品线涵盖了数据的摄入、转换、探索、建模、存储和服务等多个环节,支持多种数据处理场景,如通过Sqoop进行数据导入,Flume处理日志流,Kafka用于消息传递,Impala实现快速查询,Solr进行全文搜索,HBase提供...
3. 数据集成:Hadoop可以与其他数据处理工具(如Spark、Hive、Pig等)结合,构建复杂的数据处理流水线,实现数据的ETL(提取、转换、加载)过程。 总结,Hadoop 3.1.2的源码分析不仅是对技术的深度探究,也是提升...
第三期期刊可能深入到Hadoop的实际应用场景,如大数据分析、日志处理、推荐系统等,同时可能会探讨Hadoop的安全性、性能优化策略以及故障排查技巧。对于想要在企业环境中部署Hadoop的读者来说,这些内容具有很高的...
大数据是指那些超出传统数据处理能力的大量、高速、多样化的信息资产,它需要新的技术和处理模式来挖掘潜在价值。本方案重点介绍了基于开源Hadoop生态体系的大数据平台,该平台具备强大的数据存储、处理和分析能力,...
- **Pig**:适合非编程人员使用的高级数据流语言和执行框架。 - **HBase**:建立在Hadoop之上的分布式列式存储系统。 - **ZooKeeper**:为分布式应用程序提供协调服务。 ### 二、Hadoop的安装与配置 #### 2.1 环境...
在大数据环境中,Flume主要用于实时流数据的采集,它可以从各种数据源(例如网络日志、应用程序日志、社交媒体流等)收集数据,并将其可靠地传输到存储系统,如Hadoop HDFS或实时处理引擎Kafka。Flume具有高度可配置...
4. 数据处理与分析:书中会讲解如何使用Hadoop进行数据预处理、清洗、转换和分析,以及如何与其他数据处理工具如Spark、Flink集成,实现更高效的数据处理流水线。 5. 安装与部署:对于实际操作,书中提供了详细的...