`
weitao1026
  • 浏览: 1056037 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

使用到Pig来分析线上的搜索日志数据

pig 
阅读更多

使用到Pig来分析线上的搜索日志数据,散仙本打算使用hive来分析的,但由于种种原因,没有用成,而Pig(pig0.12-cdh)散仙一直没有接触过,所以只能临阵磨枪了,花了两天时间,大致看完了pig官网的文档,在看文档期间,也是边实战边学习,这样以来,对pig的学习,会更加容易,当然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,散仙打算介绍下如何在Pig中,使用用户自定义的UDF函数,关于学习经验,散仙会在后面的文章里介绍。



 

 

一旦你学会了UDF的使用,就意味着,你可以以更加灵活的方式来使用Pig,使它扩展一些为我们的业务场景定制的特殊功能,而这些功能,在通用的pig里是没有的,举个例子:

你从HDFS上读取的数据格式,如果使用默认的PigStorage()来加载,存储可能只支持有限的数据编码和类型,如果我们定义了一种特殊的编码存储或序列化方式,那么当我们使用默认的Pig来加载的时候,就会发现加载不了,这时候我们的UDF就派上用场了,我们只需要自定义一个LoadFunction和一个StoreFunction就可以解决,这种问题。


本篇散仙根据官方文档的例子,来实战一下,并在hadoop集群上使用Pig测试通过:
我们先来看下定义一个UDF扩展类,需要几个步骤:

序号 步骤 说明 1 在eclipse里新建一个java工程,并导入pig的核心包 java项目 2 新建一个包,继承特定的接口或类,重写自定义部分 核心业务 3 编写完成后,使用ant打包成jar 编译时需要pig依赖,但不用把pig的jar包打入UDF中 4 把打包完成后的jar上传到HDFS上 pig运行时候需要加载使用 5 在pig脚本里,注册我们自定义的udf的jar包 注入运行时环境 6 编写我们的核心业务pig脚本运行 测试是否运行成功

 


项目工程截图如下:





核心代码如下:

Java代码 复制代码 收藏代码
  1. package com.pigudf;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.pig.EvalFunc;  
  6. import org.apache.pig.data.Tuple;  
  7. import org.apache.pig.impl.util.WrappedIOException;  
  8. /** 
  9.  * 自定义UDF类,对字符串转换大写 
  10.  * @author qindongliang 
  11.  * */  
  12. public class MyUDF extends EvalFunc<String> {  
  13.   
  14.     @Override  
  15.     public String exec(Tuple input) throws IOException {  
  16.           
  17.          //判断是否为null或空,就跳过  
  18.         if(input==null||input.size()==0){  
  19.             return null;  
  20.         }  
  21.         try{  
  22.             //获取第一个元素  
  23.             String str=(String) input.get(0);  
  24.             //转成大写返回  
  25.             return str.toUpperCase();  
  26.               
  27.         }catch(Exception e){  
  28.             throw WrappedIOException.wrap("Caught exception processing input row ",e);  
  29.         }  
  30.     }  
  31.       
  32.   
  33. }  
package com.pigudf;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;
/**
 * 自定义UDF类,对字符串转换大写
 * @author qindongliang
 * */
public class MyUDF extends EvalFunc<String> {

	@Override
	public String exec(Tuple input) throws IOException {
		
		 //判断是否为null或空,就跳过
		if(input==null||input.size()==0){
			return null;
		}
		try{
			//获取第一个元素
			String str=(String) input.get(0);
			//转成大写返回
			return str.toUpperCase();
			
		}catch(Exception e){
			throw WrappedIOException.wrap("Caught exception processing input row ",e);
		}
	}
	

}



关于打包的ant脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件一定要上传到HDFS上,除非你是local模式):

Java代码 复制代码 收藏代码
  1. grunt> cat s.txt  
  2. zhang san,12  
  3. Song,34  
  4. long,34  
  5. abC,12  
  6. grunt>   
grunt> cat s.txt
zhang san,12
Song,34
long,34
abC,12
grunt> 




我们在看下,操作文件和jar包是放在一起的:

Java代码 复制代码 收藏代码
  1. grunt> ls  
  2. hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3>        1295  
  3. hdfs://dnode1:8020/tmp/udf/s.txt<r 3>   36  
  4. grunt>   
grunt> ls
hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3>        1295
hdfs://dnode1:8020/tmp/udf/s.txt<r 3>   36
grunt> 



最后,我们看下pig脚本的定义:

Pig代码 复制代码 收藏代码
  1. --注册自定义的jar包  
  2. REGISTER pudf.jar;   
  3. --加载测试文件的数据,逗号作为分隔符  
  4. a = load 's.txt' using PigStorage(',');     
  5. --遍历数据,对name列转成大写  
  6. b =  foreach a generate com.pigudf.MyUDF((chararray)$0);   
  7. --启动MapReduce的Job进行数据分析  
  8. dump b  
--注册自定义的jar包
REGISTER pudf.jar; 
--加载测试文件的数据,逗号作为分隔符
a = load 's.txt' using PigStorage(',');   
--遍历数据,对name列转成大写
b =  foreach a generate com.pigudf.MyUDF((chararray)$0); 
--启动MapReduce的Job进行数据分析
dump b


最后,我们看下结果,只要过程不出现异常和任务失败,就证明我们的udf使用成功:

Java代码 复制代码 收藏代码
  1. Counters:  
  2. Total records written : 4  
  3. Total bytes written : 64  
  4. Spillable Memory Manager spill count : 0  
  5. Total bags proactively spilled: 0  
  6. Total records proactively spilled: 0  
  7.   
  8. Job DAG:  
  9. job_1419419533357_0147  
  10.   
  11.   
  12. 2014-12-30 18:10:24,394 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!  
  13. 2014-12-30 18:10:24,395 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS  
  14. 2014-12-30 18:10:24,396 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.  
  15. 2014-12-30 18:10:24,405 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1  
  16. 2014-12-30 18:10:24,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1  
  17. (ZHANG SAN,12)  
  18. (SONG,34)  
  19. (LONG,34)  
  20. (ABC,12)  
Counters:
Total records written : 4
Total bytes written : 64
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1419419533357_0147


2014-12-30 18:10:24,394 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2014-12-30 18:10:24,395 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-12-30 18:10:24,396 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2014-12-30 18:10:24,405 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2014-12-30 18:10:24,405 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(ZHANG SAN,12)
(SONG,34)
(LONG,34)
(ABC,12)


结果没问题,我们的UDF加载执行成功,如果我们还想将我们的输出结果直接写入到HDFS上,可以在pig脚本的末尾,去掉dump命令,加入
store e into '/tmp/dongliang/result/'; 将结果存储到HDFS上,当然我们可以自定义存储函数,将结果写入数据库,Lucene,Hbase等关系型或一些NOSQL数据库里。

分享到:
评论

相关推荐

    大数据分析学习路线.docx

    HiStore适用于日志分析、通信行业数据分析、大数据量分析应用、数据仓库/数据集市以及物联网等场景,其优势在于低成本存储和实时查询性能,尤其适合对数据存储成本敏感且需要实时查询的业务。 在营销领域,大数据的...

    elasticsearch与hadoop比较

    同时,Elasticsearch的聚合功能也非常出色,能够对数据进行高效的统计分析,这一点上Elasticsearch已经超越了传统搜索引擎的角色,开始向数据分析工具发展。 然而,Elasticsearch在复杂数据分析方面与Hadoop或Spark...

    2-童小军-运用Hadoop构建数据仓库平台.pdf

    Hadoop生态系统中的组件如Flume、Nutch、Mahout、Solr、Sqoop、Hive、HBase、Pig、MapReduce、Tez、Spark、Storm等,为从数据采集、存储、处理、分析到数据服务提供了全面的技术支持。这些组件通过分布式存储系统...

    Hadoop权威指南

    6. **Flume**:一个可靠、高性能的日志收集系统,用于收集、聚合和移动日志数据。 #### 四、Hadoop应用场景 Hadoop广泛应用于各种场景中,包括但不限于: 1. **数据分析**:利用Hadoop处理海量数据,进行数据挖掘...

    apache-flink

    在使用 Flink 进行数据分析时,开发者可以充分利用其提供的API,编写出既快速又可靠的分布式数据处理程序。Flink 作为一个高性能、高吞吐量的处理系统,非常适合实时数据流处理、批处理以及复杂事件处理等多种数据...

    hadoop&spark安装、环境配置、使用教程.docx

    - **应用场景**: 适用于需要处理大量非结构化数据的应用场景,如日志分析、搜索引擎索引等。 **1.2 Hadoop YARN (Yet Another Resource Negotiator)** - **定义**: Hadoop YARN是Hadoop的资源管理器,负责集群资源...

    Hadoop权威指南-中文版(前三章).doc

    2.2 使用Unix Tools分析数据 - 对比传统方法:在大量数据面前,Unix工具(如grep、awk、sort)的局限性。 2.3 使用Hadoop进行数据分析 - MapReduce的优势:分布式处理、容错性、可扩展性。 2.4 分布式 - ...

    Apache Hadoop---Falcon.docx

    3. **Process**:定义处理逻辑,支持Oozie、Hive、Pig和Spark等作业流引擎,实现对数据的转换和分析。 **数据流水线**:Falcon将这些实体组合成数据流水线,通过预定义的策略来处理数据的复制、保留和存档。 **...

    Hadoop应用案例.pptx

    百度的 Hadoop 集群为整个公司的数据团队、大搜索团队、社区产品团队、广告团队,以及 LBS 团体提供统一的计算和存储服务,主要应用包括:数据挖掘与分析、日志分析平台、数据仓库系统、推荐引擎系统、用户行为分析...

    Hadoop实战

    - **应用场景**:适用于大规模数据集的存储,如日志数据、用户行为数据等。 #### 2. MapReduce - **定义**:MapReduce是一种编程模型,用于大规模数据集的并行处理。它将复杂的、大规模的数据处理任务分解为两个...

    WEBPIG商业版

    6. **集成与定制**:WEBPIG商业版通常支持与其他DevOps工具集成,如CI/CD流水线、日志管理平台等,便于企业构建完整的运维生态系统。此外,还可能提供API或插件机制,满足特定需求的定制化开发。 7. **易用性**:...

    Cloudera大数据行业应用介绍.pptx

    其产品线涵盖了数据的摄入、转换、探索、建模、存储和服务等多个环节,支持多种数据处理场景,如通过Sqoop进行数据导入,Flume处理日志流,Kafka用于消息传递,Impala实现快速查询,Solr进行全文搜索,HBase提供...

    hadoop-3.1.2-src.tar.gz

    3. 数据集成:Hadoop可以与其他数据处理工具(如Spark、Hive、Pig等)结合,构建复杂的数据处理流水线,实现数据的ETL(提取、转换、加载)过程。 总结,Hadoop 3.1.2的源码分析不仅是对技术的深度探究,也是提升...

    《Hadoop开发者 》1-3期刊

    第三期期刊可能深入到Hadoop的实际应用场景,如大数据分析、日志处理、推荐系统等,同时可能会探讨Hadoop的安全性、性能优化策略以及故障排查技巧。对于想要在企业环境中部署Hadoop的读者来说,这些内容具有很高的...

    大数据方案介绍.pdf

    大数据是指那些超出传统数据处理能力的大量、高速、多样化的信息资产,它需要新的技术和处理模式来挖掘潜在价值。本方案重点介绍了基于开源Hadoop生态体系的大数据平台,该平台具备强大的数据存储、处理和分析能力,...

    Hadoop实战中文版

    - **Pig**:适合非编程人员使用的高级数据流语言和执行框架。 - **HBase**:建立在Hadoop之上的分布式列式存储系统。 - **ZooKeeper**:为分布式应用程序提供协调服务。 ### 二、Hadoop的安装与配置 #### 2.1 环境...

    大数据系列-协作框架

    在大数据环境中,Flume主要用于实时流数据的采集,它可以从各种数据源(例如网络日志、应用程序日志、社交媒体流等)收集数据,并将其可靠地传输到存储系统,如Hadoop HDFS或实时处理引擎Kafka。Flume具有高度可配置...

    Hadoop权威指南_第四版_中文版

    4. 数据处理与分析:书中会讲解如何使用Hadoop进行数据预处理、清洗、转换和分析,以及如何与其他数据处理工具如Spark、Flink集成,实现更高效的数据处理流水线。 5. 安装与部署:对于实际操作,书中提供了详细的...

Global site tag (gtag.js) - Google Analytics