- 浏览: 52096 次
- 性别:
- 来自: 北京
文章分类
最新评论
Pig 重写加载函数和存储函数UDF
pig自带的pigstorage不能指定行分隔符,所以自己重写了一个简单的UDF类,可以指定列和行的分隔符,之前研究过的简单的,
http://blog.csdn.net/ruishenh/article/details/12048067
但是弊端大,所以这次重写一下。
操作步骤打好包上传到服务器,
grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar
grunt> cat student;
1,xiaohouzi,25/2,xiaohouzi2,24/3,xiaohouzi3,23
grunt> a = load 'student' using com.hcr.hadoop.pig.MyStorage(',','/');
grunt> dump a;
(1,xiaohouzi,25)
(2,xiaohouzi2,24)
(3,xiaohouzi3,23)
grunt> store a into 'myStorageOut' using com.hcr.hadoop.pig.MyStorage(',','/');
执行提示成功后查看
grunt> cat myStorageOut
1,xiaohouzi,25/2,xiaohouzi2,24/3,xiaohouzi3,23/
源码类
package com.hcr.hadoop.pig; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.pig.Expression; import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.ResourceStatistics; import org.apache.pig.StoreFunc; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.apache.pig.impl.util.StorageUtil; public class MyStorage extends LoadFunc implements StoreFuncInterface,LoadMetadata { private static final Log LOG = LogFactory.getLog(MyStorage.class); private static final String utf8 = "UTF-8"; private static String fieldDel = "\t"; private static String recordDel = "\n"; protected RecordReader recordReader = null; protected RecordWriter writer = null; public MyStorage() { } public MyStorage(String fieldDel) { this(fieldDel, "\n"); } public MyStorage(String fieldDel, String recordDel) { this.fieldDel = fieldDel; this.recordDel = recordDel; } @Override public void setLocation(String s, Job job) throws IOException { FileInputFormat.setInputPaths(job, s); } @Override public InputFormat getInputFormat() throws IOException { return new MyStorageInputFormat(recordDel); } @Override public void prepareToRead(RecordReader recordReader, PigSplit pigSplit) throws IOException { this.recordReader = recordReader; } @Override public Tuple getNext() throws IOException { try { boolean flag = recordReader.nextKeyValue(); if (!flag) { return null; } Text value = (Text) recordReader.getCurrentValue(); String[] strArray = value.toString().split(fieldDel); List lst = new ArrayList<String>(); int i = 0; for (String singleItem : strArray) { lst.add(i++, singleItem); } return TupleFactory.getInstance().newTuple(lst); } catch (InterruptedException e) { throw new ExecException("Read data error", PigException.REMOTE_ENVIRONMENT, e); } } /** * */ @Override public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { return LoadFunc.getAbsolutePath(location, curDir); } @Override public OutputFormat getOutputFormat() throws IOException { return new MyStorageOutputFormat(StorageUtil.parseFieldDel(fieldDel), this.recordDel); } @Override public void setStoreLocation(String location, Job job) throws IOException { job.getConfiguration().set("mapred.textoutputformat.separator", ""); FileOutputFormat.setOutputPath(job, new Path(location)); if ("true".equals(job.getConfiguration().get( "output.compression.enabled"))) { FileOutputFormat.setCompressOutput(job, true); String codec = job.getConfiguration().get( "output.compression.codec"); try { FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class .forName(codec)); } catch (ClassNotFoundException e) { throw new RuntimeException("Class not found: " + codec); } } else { // This makes it so that storing to a directory ending with ".gz" or // ".bz2" works. setCompression(new Path(location), job); } } private void setCompression(Path path, Job job) { String location = path.getName(); if (location.endsWith(".bz2") || location.endsWith(".bz")) { FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); } else if (location.endsWith(".gz")) { FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); } else { FileOutputFormat.setCompressOutput(job, false); } } @Override public void checkSchema(ResourceSchema s) throws IOException { // TODO Auto-generated method stub } @Override public void prepareToWrite(RecordWriter writer) throws IOException { this.writer = writer; } @Override public void putNext(Tuple t) throws IOException { try { writer.write(null, t); } catch (InterruptedException e) { throw new IOException(e); } } @Override public void setStoreFuncUDFContextSignature(String signature) { // TODO Auto-generated method stub } @Override public void cleanupOnFailure(String location, Job job) throws IOException { StoreFunc.cleanupOnFailureImpl(location, job); } @Override public void cleanupOnSuccess(String location, Job job) throws IOException { // TODO Auto-generated method stub } @Override public ResourceSchema getSchema(String location, Job job) throws IOException { ResourceSchema rs=new ResourceSchema(); FieldSchema c1 = new FieldSchema("c1", DataType.INTEGER); FieldSchema c2 = new FieldSchema("c2", DataType.INTEGER); FieldSchema c3 = new FieldSchema("c3", DataType.DOUBLE); ResourceFieldSchema fs1 =new ResourceFieldSchema(c1); ResourceFieldSchema fs2 =new ResourceFieldSchema(c2); ResourceFieldSchema fs3 =new ResourceFieldSchema(c3); rs.setFields(new ResourceFieldSchema[]{fs1,fs2,fs3}); return rs; } @Override public ResourceStatistics getStatistics(String location, Job job) throws IOException { // TODO Auto-generated method stub return null; } @Override public String[] getPartitionKeys(String location, Job job) throws IOException { // TODO Auto-generated method stub return null; } @Override public void setPartitionFilter(Expression partitionFilter) throws IOException { // TODO Auto-generated method stub } } class MyStorageInputFormat extends TextInputFormat { private final String recordDel; public MyStorageInputFormat(String recordDel) { this.recordDel = recordDel; } @Override public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); if (recordDel != null) { delimiter = recordDel; } byte[] recordDelimiterBytes = null; if (null != delimiter){ try { recordDelimiterBytes = decode(delimiter).getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return new LineRecordReader(recordDelimiterBytes); } /** * 工作流传过来的列分隔符,有可能是特殊字符,用八进制或者十六进制表示 * @throws IOException */ public static String decode(String str) throws IOException { String re = str; if (str != null && str.startsWith("\\")) { str = str.substring(1, str.length()); String[] chars = str.split("\\\\"); byte[] bytes = new byte[chars.length]; for (int i = 0; i < chars.length; i++) { if (chars[i].equals("t")) { bytes[i] = 9; } else if (chars[i].equals("r")) { bytes[i] = 13; } else if (chars[i].equals("n")) { bytes[i] = 10; } else if (chars[i].equals("b")) { bytes[i] = 8; } else { bytes[i] = Byte.decode(chars[i]); } } try { re = new String(bytes, "UTF-8"); } catch (UnsupportedEncodingException e) { throw new IOException(str, e); } } return re; } } class MyStorageOutputFormat extends TextOutputFormat<WritableComparable, Tuple> { private final byte fieldDel; private final String recordDel; public MyStorageOutputFormat(byte delimiter) { this(delimiter, "\n"); } public MyStorageOutputFormat(byte delimiter, String recordDel) { this.fieldDel = delimiter; this.recordDel = recordDel; } protected static class MyRecordWriter extends TextOutputFormat.LineRecordWriter<WritableComparable, Tuple> { private static byte[] newline; private final byte fieldDel; public MyRecordWriter(DataOutputStream out, byte fieldDel) throws UnsupportedEncodingException { this(out, fieldDel, "\n".getBytes("UTF-8")); } public MyRecordWriter(DataOutputStream out, byte fieldDel, byte[] record) { super(out); this.fieldDel = fieldDel; this.newline = record; } public synchronized void write(WritableComparable key, Tuple value) throws IOException { int sz = value.size(); for (int i = 0; i < sz; i++) { StorageUtil.putField(out, value.get(i)); if (i != sz - 1) { out.writeByte(fieldDel); } } out.write(newline); } } @Override public RecordWriter<WritableComparable, Tuple> getRecordWriter( TaskAttemptContext job) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass( job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false); return new MyRecordWriter(fileOut, fieldDel, this.recordDel.getBytes()); } else { FSDataOutputStream fileOut = fs.create(file, false); return new MyRecordWriter(new DataOutputStream( codec.createOutputStream(fileOut)), fieldDel, this.recordDel.getBytes()); } } }
本人试验 \001 和 \002同样可以
grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar grunt> cat X; keyDataKNZKCZY:ZDKJS:616150:AFS:3842708d_20131219194420-642464756keyDataKNZKCZY:ZDKJS:616614:AFS:3843920d_20131219194420-642464756keyDataKNZKCZY:ZDKJS:616661:AFS:3844040d_20131219194420-642464756 grunt> a = load 'X' using com.hcr.hadoop.pig.MyStorage('\\001','\\002'); grunt> dump a; (keyData,KNZKCZY:ZDKJS:616150:AFS:3842708,d_20131219194420-642464756) (keyData,KNZKCZY:ZDKJS:616614:AFS:3843920,d_20131219194420-642464756) (keyData,KNZKCZY:ZDKJS:616661:AFS:3844040,d_20131219194420-642464756) grunt>
有的时候如果加载模式不想指定具体模式(比如太多了字段,或者不够公有化)就想使用已存在的模式
实现LoadMetadata接口,然后
重写
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
ResourceSchema rs=new ResourceSchema();
FieldSchema c1 = new FieldSchema("c1", DataType.INTEGER);
FieldSchema c2 = new FieldSchema("c2", DataType.INTEGER);
FieldSchema c3 = new FieldSchema("c3", DataType.DOUBLE);
ResourceFieldSchema fs1 =new ResourceFieldSchema(c1);
ResourceFieldSchema fs2 =new ResourceFieldSchema(c2);
ResourceFieldSchema fs3 =new ResourceFieldSchema(c3);
rs.setFields(new ResourceFieldSchema[]{fs1,fs2,fs3});
return rs;
}
这一个简单的例子中就返回了直接使用模式的形式
grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar
grunt> a = load 'student' using com.hcr.hadoop.pig.MyStorage(',','/');
grunt> describe a;
a: {c1: int,c2: int,c3: double}
grunt> b = foreach a generate c1,c2,c3;
grunt> describe b;
b: {c1: int,c2: int,c3: double}
相关推荐
在这个主题中,我们将深入探讨 `urldecode`、`row_number` 和 `tomap` 这三个 UDF 在 Pig 中的应用以及如何在 CDH4.1.2 版本中实现它们。 1. **URLDecode UDF**: `urldecode` 是一个用于解码 URL 编码的字符串的...
然后,我们可以将项目编译成 jar 包,指定生成的 jar 包名和存储位置。 四、上传至 Hive 服务器并注册自定义函数 将生成的 jar 包上传至 Hive 服务器,然后注册自定义函数。我们可以使用以下命令注册临时函数: ``...
2. **UDF结构**:UDF通常包括几个关键部分,如初始化函数(`initFunctionObjects`)、计算函数(如`computeProperties`)和边界条件函数(如`patchFunctionObjects`)。每个函数都有其特定的作用,需要按照OpenFOAM...
phoenix-udf自定义函数,上传到hdfs的lib目录,测试udf自定义函数功能,有需要可以下载;select QUARTER(birth) from person
由于impala处理日期的函数如date_sub(),date_trunc(),last_day()等这些日期处理函数还需要进行日期格式化为yyyy-MM-dd使用,sql代码段过长,导致频繁嵌套过于复杂.所以自定义udf函数解决这些问题.以下为实现过程.
在Hive中,UDF分为三种类型:UDF(单行函数)、UDAF(累积聚合函数)和UDTF(多行转换函数)。在这里,我们只需要UDF,因为它适用于处理单行数据。 1. **编写Java类**: 要创建一个UDF,你需要编写一个Java类,该...
综上所述,Hive UDF的开发和使用涉及到Java编程、Hive和Hadoop的库依赖管理、函数注册以及在实际查询中的应用。了解和掌握这些知识点,能够帮助开发者更好地利用Hive UDF解决复杂的数据处理问题。
在本文中,我们将通过实例代码,详细介绍如何开发和使用 Java Hive UDF 函数。 UDF 函数的实现 首先,我们需要在 Maven 项目中添加依赖项,包括 Hadoop 和 Hive。 Maven 项目的 POM 文件如下所示: ```xml ...
本文将详细讲解如何通过自定义函数(UDF)来处理各种格式的手机号码,确保数据的质量和一致性。我们将涵盖正则表达式在手机号码清洗中的应用,以及如何编写和使用Java UDF在Hive和Impala中实现这个过程。 1. **手机...
一、UDF相关概念 ...如果udf嵌套复杂,可以重写一个嵌套层数较少且可以实现相同功能的udf,使性能成倍提升 针对过滤类的udf,将过滤率高的放在前面,减少中间结果,避免不必要的计算 二、UDF的使用 1、建hive表
数据架构师第013节函数概述和udf演示第14节UDF实战:实现udf.mp4
RSAUdf.java可能包含了自定义的用户定义函数(UDF),这些函数用于执行RSA加解密操作。而KeyRSA.java可能包含了生成和管理RSA密钥对的代码。 在RSAUdf.java中,常见的方法可能有`encrypt()`和`decrypt()`。`encrypt...
___下载.zip" 提供的资源显然是关于Hive的自定义用户定义函数(UDF)和一些实用的函数库,这对于优化Hive查询和处理复杂的数据操作至关重要。 1. **自定义UDF(User Defined Functions)**: UDF是Hive提供的扩展...
* 脱敏UDF函数 * 功能:对一些敏感信息进行脱敏处理,替换方式可选择自定义替换,如'#','*'等,,如不指定脱敏符号,使用个随机字符替换 * 脱敏位置可自定义,不指定位置,会对数据进行全脱敏 * 例如身份证信息: ...
这个类需要继承 Hive 中提供的 UDF 抽象类,并重写其中的方法。 步骤二:编译打包 将编写好的 Java 类编译成一个 JAR 文件,并包含所有依赖的库。确保 JAR 文件中包含 Hive 的相关类和接口。 步骤三:将 JAR ...
AES解密UDF函数的主要目的是对存储在Hive表中的加密数据进行解密,以便于进一步的数据分析和处理。此函数适用于已经使用AES算法加密的数据。 ##### 2.2 函数实现流程 实现AES解密UDF函数的流程主要包括以下几个...
5. **编译与验证**:UDF代码编写完成后,需要在Fluent环境中编译并加载,然后通过模拟验证其是否正确模拟了预期的振动行为。 压缩包中的多个`.c`文件(udfxx.c及副本)可能代表不同版本或尝试的UDF源代码。这可能是...
在旅游集市数仓建设中,可能会用到各种UDF(User Defined Function,用户自定义函数)来处理和分析数据。这些UDF函数可以根据具体的需求和业务场景来设计和实现。以提高数据处理的效率和灵活性。以下是一些在旅游...
Maxcompute UDF 函数打包和注册详解 在大数据处理中,Maxcompute UDF 函数是非常重要的一部分,它...这些知识点对开发者来说非常重要,因为它们能够帮助开发者更好地使用Maxcompute UDF 函数,提高开发效率和质量。
fluent中使用UDF代码,实现边界源相的加载