`
ruishen
  • 浏览: 52096 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Pig 重写加载函数和存储函数UDF

 
阅读更多

pig自带的pigstorage不能指定行分隔符,所以自己重写了一个简单的UDF类,可以指定列和行的分隔符,之前研究过的简单的,

http://blog.csdn.net/ruishenh/article/details/12048067

但是弊端大,所以这次重写一下。


操作步骤打好包上传到服务器,

grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar

grunt> cat student;
1,xiaohouzi,25/2,xiaohouzi2,24/3,xiaohouzi3,23

grunt> a = load 'student' using com.hcr.hadoop.pig.MyStorage(',','/');

grunt> dump a;

(1,xiaohouzi,25)
(2,xiaohouzi2,24)
(3,xiaohouzi3,23)

grunt> store a into 'myStorageOut' using com.hcr.hadoop.pig.MyStorage(',','/');

执行提示成功后查看

grunt> cat myStorageOut
1,xiaohouzi,25/2,xiaohouzi2,24/3,xiaohouzi3,23/



源码类

package com.hcr.hadoop.pig;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.StorageUtil;

public class MyStorage extends LoadFunc implements StoreFuncInterface,LoadMetadata {

	private static final Log LOG = LogFactory.getLog(MyStorage.class);

	private static final String utf8 = "UTF-8";

	private static String fieldDel = "\t";

	private static String recordDel = "\n";

	protected RecordReader recordReader = null;

	protected RecordWriter writer = null;

	public MyStorage() {
	}

	public MyStorage(String fieldDel) {
		this(fieldDel, "\n");
	}

	public MyStorage(String fieldDel, String recordDel) {
		this.fieldDel = fieldDel;
		this.recordDel = recordDel;
	}

	@Override
	public void setLocation(String s, Job job) throws IOException {
		FileInputFormat.setInputPaths(job, s);
	}

	@Override
	public InputFormat getInputFormat() throws IOException {
		return new MyStorageInputFormat(recordDel);
	}

	@Override
	public void prepareToRead(RecordReader recordReader, PigSplit pigSplit)
			throws IOException {
		this.recordReader = recordReader;
	}

	@Override
	public Tuple getNext() throws IOException {
		try {
			boolean flag = recordReader.nextKeyValue();
			if (!flag) {
				return null;
			}
			Text value = (Text) recordReader.getCurrentValue();
			String[] strArray = value.toString().split(fieldDel);
			List lst = new ArrayList<String>();
			int i = 0;
			for (String singleItem : strArray) {
				lst.add(i++, singleItem);
			}
			return TupleFactory.getInstance().newTuple(lst);
		} catch (InterruptedException e) {
			throw new ExecException("Read data error",
					PigException.REMOTE_ENVIRONMENT, e);
		}
	}

	/**
	 * */
	@Override
	public String relToAbsPathForStoreLocation(String location, Path curDir)
			throws IOException {
		return LoadFunc.getAbsolutePath(location, curDir);
	}

	@Override
	public OutputFormat getOutputFormat() throws IOException {
		return new MyStorageOutputFormat(StorageUtil.parseFieldDel(fieldDel),
				this.recordDel);
	}

	@Override
	public void setStoreLocation(String location, Job job) throws IOException {
		job.getConfiguration().set("mapred.textoutputformat.separator", "");
		FileOutputFormat.setOutputPath(job, new Path(location));
		if ("true".equals(job.getConfiguration().get(
				"output.compression.enabled"))) {
			FileOutputFormat.setCompressOutput(job, true);
			String codec = job.getConfiguration().get(
					"output.compression.codec");
			try {
				FileOutputFormat.setOutputCompressorClass(job,
						(Class<? extends CompressionCodec>) Class
								.forName(codec));
			} catch (ClassNotFoundException e) {
				throw new RuntimeException("Class not found: " + codec);
			}
		} else {
			// This makes it so that storing to a directory ending with ".gz" or
			// ".bz2" works.
			setCompression(new Path(location), job);
		}

	}

	private void setCompression(Path path, Job job) {
		String location = path.getName();
		if (location.endsWith(".bz2") || location.endsWith(".bz")) {
			FileOutputFormat.setCompressOutput(job, true);
			FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
		} else if (location.endsWith(".gz")) {
			FileOutputFormat.setCompressOutput(job, true);
			FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
		} else {
			FileOutputFormat.setCompressOutput(job, false);
		}
	}

	@Override
	public void checkSchema(ResourceSchema s) throws IOException {
		// TODO Auto-generated method stub

	}

	@Override
	public void prepareToWrite(RecordWriter writer) throws IOException {
		this.writer = writer;
	}

	@Override
	public void putNext(Tuple t) throws IOException {
		try {
			writer.write(null, t);
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
	}

	@Override
	public void setStoreFuncUDFContextSignature(String signature) {
		// TODO Auto-generated method stub

	}

	@Override
	public void cleanupOnFailure(String location, Job job) throws IOException {
		StoreFunc.cleanupOnFailureImpl(location, job);
	}

	@Override
	public void cleanupOnSuccess(String location, Job job) throws IOException {
		// TODO Auto-generated method stub

	}

	@Override
	public ResourceSchema getSchema(String location, Job job)
			throws IOException {
			ResourceSchema rs=new ResourceSchema();
            FieldSchema c1 = new FieldSchema("c1", DataType.INTEGER);  
            FieldSchema c2 = new FieldSchema("c2", DataType.INTEGER);
            FieldSchema c3 = new FieldSchema("c3", DataType.DOUBLE);
            ResourceFieldSchema fs1 =new ResourceFieldSchema(c1);
            ResourceFieldSchema fs2 =new ResourceFieldSchema(c2);
            ResourceFieldSchema fs3 =new ResourceFieldSchema(c3);
            rs.setFields(new ResourceFieldSchema[]{fs1,fs2,fs3});
            return rs;
	}

	@Override
	public ResourceStatistics getStatistics(String location, Job job)
			throws IOException {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public String[] getPartitionKeys(String location, Job job)
			throws IOException {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public void setPartitionFilter(Expression partitionFilter)
			throws IOException {
		// TODO Auto-generated method stub
		
	}
}

class MyStorageInputFormat extends TextInputFormat {

	private final String recordDel;

	public MyStorageInputFormat(String recordDel) {
		this.recordDel = recordDel;
	}

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		String delimiter = context.getConfiguration().get(
				"textinputformat.record.delimiter");
		if (recordDel != null) {
			delimiter = recordDel;
		}
		byte[] recordDelimiterBytes = null;
		if (null != delimiter){
			try {
				recordDelimiterBytes = decode(delimiter).getBytes("UTF-8");
			} catch (UnsupportedEncodingException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return new LineRecordReader(recordDelimiterBytes);
	}
	/**
	 * 工作流传过来的列分隔符,有可能是特殊字符,用八进制或者十六进制表示
	 * @throws IOException 
	 */
	public static String decode(String str) throws IOException {
		String re = str;
		if (str != null && str.startsWith("\\")) {
			str = str.substring(1, str.length());
			String[] chars = str.split("\\\\");
			byte[] bytes = new byte[chars.length];
			for (int i = 0; i < chars.length; i++) {
				if (chars[i].equals("t")) {
					bytes[i] = 9;
				} else if (chars[i].equals("r")) {
					bytes[i] = 13;
				} else if (chars[i].equals("n")) {
					bytes[i] = 10;
				} else if (chars[i].equals("b")) {
					bytes[i] = 8;
				} else {
					bytes[i] = Byte.decode(chars[i]);
				}
			}
			try {
				re = new String(bytes, "UTF-8");
			} catch (UnsupportedEncodingException e) {
				throw new IOException(str, e);
			}
		}
		return re;
	}

}

class MyStorageOutputFormat extends TextOutputFormat<WritableComparable, Tuple> {

	private final byte fieldDel;

	private final String recordDel;

	public MyStorageOutputFormat(byte delimiter) {
		this(delimiter, "\n");
	}

	public MyStorageOutputFormat(byte delimiter, String recordDel) {
		this.fieldDel = delimiter;
		this.recordDel = recordDel;
	}

	protected static class MyRecordWriter extends
			TextOutputFormat.LineRecordWriter<WritableComparable, Tuple> {

		private static byte[] newline;

		private final byte fieldDel;

		public MyRecordWriter(DataOutputStream out, byte fieldDel)
				throws UnsupportedEncodingException {
			this(out, fieldDel, "\n".getBytes("UTF-8"));
		}

		public MyRecordWriter(DataOutputStream out, byte fieldDel, byte[] record) {
			super(out);
			this.fieldDel = fieldDel;
			this.newline = record;
		}

		public synchronized void write(WritableComparable key, Tuple value)
				throws IOException {
			int sz = value.size();
			for (int i = 0; i < sz; i++) {
				StorageUtil.putField(out, value.get(i));
				if (i != sz - 1) {
					out.writeByte(fieldDel);
				}
			}
			out.write(newline);
		}
	}

	@Override
	public RecordWriter<WritableComparable, Tuple> getRecordWriter(
			TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		boolean isCompressed = getCompressOutput(job);
		CompressionCodec codec = null;
		String extension = "";
		if (isCompressed) {
			Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
					job, GzipCodec.class);
			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
					conf);
			extension = codec.getDefaultExtension();
		}
		Path file = getDefaultWorkFile(job, extension);
		FileSystem fs = file.getFileSystem(conf);
		if (!isCompressed) {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new MyRecordWriter(fileOut, fieldDel,
					this.recordDel.getBytes());
		} else {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new MyRecordWriter(new DataOutputStream(
					codec.createOutputStream(fileOut)), fieldDel,
					this.recordDel.getBytes());
		}
	}

}

本人试验 \001 和 \002同样可以

grunt> register  /home/pig/pig-0.11.0/udflib/myStorage.jar 
grunt> cat X;
keyDataKNZKCZY:ZDKJS:616150:AFS:3842708d_20131219194420-642464756keyDataKNZKCZY:ZDKJS:616614:AFS:3843920d_20131219194420-642464756keyDataKNZKCZY:ZDKJS:616661:AFS:3844040d_20131219194420-642464756
grunt> a = load 'X' using com.hcr.hadoop.pig.MyStorage('\\001','\\002');
grunt> dump a;
(keyData,KNZKCZY:ZDKJS:616150:AFS:3842708,d_20131219194420-642464756)
(keyData,KNZKCZY:ZDKJS:616614:AFS:3843920,d_20131219194420-642464756)
(keyData,KNZKCZY:ZDKJS:616661:AFS:3844040,d_20131219194420-642464756)
grunt> 


有的时候如果加载模式不想指定具体模式(比如太多了字段,或者不够公有化)就想使用已存在的模式

实现LoadMetadata接口,然后

重写



@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
ResourceSchema rs=new ResourceSchema();
FieldSchema c1 = new FieldSchema("c1", DataType.INTEGER);
FieldSchema c2 = new FieldSchema("c2", DataType.INTEGER);
FieldSchema c3 = new FieldSchema("c3", DataType.DOUBLE);
ResourceFieldSchema fs1 =new ResourceFieldSchema(c1);
ResourceFieldSchema fs2 =new ResourceFieldSchema(c2);
ResourceFieldSchema fs3 =new ResourceFieldSchema(c3);
rs.setFields(new ResourceFieldSchema[]{fs1,fs2,fs3});
return rs;

}


这一个简单的例子中就返回了直接使用模式的形式

grunt> register /home/pig/pig-0.11.0/udflib/myStorage.jar
grunt> a = load 'student' using com.hcr.hadoop.pig.MyStorage(',','/');
grunt> describe a;
a: {c1: int,c2: int,c3: double}
grunt> b = foreach a generate c1,c2,c3;
grunt> describe b;
b: {c1: int,c2: int,c3: double}

分享到:
评论

相关推荐

    pig udf 函数(urldecode row_number tomap)

    在这个主题中,我们将深入探讨 `urldecode`、`row_number` 和 `tomap` 这三个 UDF 在 Pig 中的应用以及如何在 CDH4.1.2 版本中实现它们。 1. **URLDecode UDF**: `urldecode` 是一个用于解码 URL 编码的字符串的...

    hive自定义UDF编写函数.docx

    然后,我们可以将项目编译成 jar 包,指定生成的 jar 包名和存储位置。 四、上传至 Hive 服务器并注册自定义函数 将生成的 jar 包上传至 Hive 服务器,然后注册自定义函数。我们可以使用以下命令注册临时函数: ``...

    Fluent TUI 加载UDF 计算 超算

    2. **UDF结构**:UDF通常包括几个关键部分,如初始化函数(`initFunctionObjects`)、计算函数(如`computeProperties`)和边界条件函数(如`patchFunctionObjects`)。每个函数都有其特定的作用,需要按照OpenFOAM...

    phoenix-udf自定义函数测试jar包

    phoenix-udf自定义函数,上传到hdfs的lib目录,测试udf自定义函数功能,有需要可以下载;select QUARTER(birth) from person

    impala自定义日期处理的udf函数

    由于impala处理日期的函数如date_sub(),date_trunc(),last_day()等这些日期处理函数还需要进行日期格式化为yyyy-MM-dd使用,sql代码段过长,导致频繁嵌套过于复杂.所以自定义udf函数解决这些问题.以下为实现过程.

    Hive的Udf函数进行数据脱敏

    在Hive中,UDF分为三种类型:UDF(单行函数)、UDAF(累积聚合函数)和UDTF(多行转换函数)。在这里,我们只需要UDF,因为它适用于处理单行数据。 1. **编写Java类**: 要创建一个UDF,你需要编写一个Java类,该...

    hive UDF需要jar包

    综上所述,Hive UDF的开发和使用涉及到Java编程、Hive和Hadoop的库依赖管理、函数注册以及在实际查询中的应用。了解和掌握这些知识点,能够帮助开发者更好地利用Hive UDF解决复杂的数据处理问题。

    大数据 java hive udf函数的示例代码(手机号码脱敏)

    在本文中,我们将通过实例代码,详细介绍如何开发和使用 Java Hive UDF 函数。 UDF 函数的实现 首先,我们需要在 Maven 项目中添加依赖项,包括 Hadoop 和 Hive。 Maven 项目的 POM 文件如下所示: ```xml ...

    各种情况手机号清洗udf函数(hive impala)

    本文将详细讲解如何通过自定义函数(UDF)来处理各种格式的手机号码,确保数据的质量和一致性。我们将涵盖正则表达式在手机号码清洗中的应用,以及如何编写和使用Java UDF在Hive和Impala中实现这个过程。 1. **手机...

    hive自定义udf函数实战

    一、UDF相关概念 ...如果udf嵌套复杂,可以重写一个嵌套层数较少且可以实现相同功能的udf,使性能成倍提升 针对过滤类的udf,将过滤率高的放在前面,减少中间结果,避免不必要的计算 二、UDF的使用 1、建hive表

    数据架构师第013节函数概述和udf演示第14节UDF实战:实现udf.mp4

    数据架构师第013节函数概述和udf演示第14节UDF实战:实现udf.mp4

    java RSA加解密的udf函数

    RSAUdf.java可能包含了自定义的用户定义函数(UDF),这些函数用于执行RSA加解密操作。而KeyRSA.java可能包含了生成和管理RSA密钥对的代码。 在RSAUdf.java中,常见的方法可能有`encrypt()`和`decrypt()`。`encrypt...

    一些有用的自定义配置单元udf函数、特殊数组、json、数学、字符串函数。___下载.zip

    ___下载.zip" 提供的资源显然是关于Hive的自定义用户定义函数(UDF)和一些实用的函数库,这对于优化Hive查询和处理复杂的数据操作至关重要。 1. **自定义UDF(User Defined Functions)**: UDF是Hive提供的扩展...

    * hive脱敏UDF函数 *对一些敏感信息进行脱敏处理,替换位置可自定义,脱敏符号可随机也可自定义

    * 脱敏UDF函数 * 功能:对一些敏感信息进行脱敏处理,替换方式可选择自定义替换,如'#','*'等,,如不指定脱敏符号,使用个随机字符替换 * 脱敏位置可自定义,不指定位置,会对数据进行全脱敏 * 例如身份证信息: ...

    HIVE自定义UDF函数

    这个类需要继承 Hive 中提供的 UDF 抽象类,并重写其中的方法。 步骤二:编译打包 将编写好的 Java 类编译成一个 JAR 文件,并包含所有依赖的库。确保 JAR 文件中包含 Hive 的相关类和接口。 步骤三:将 JAR ...

    AES解密UDF函数.doc

    AES解密UDF函数的主要目的是对存储在Hive表中的加密数据进行解密,以便于进一步的数据分析和处理。此函数适用于已经使用AES算法加密的数据。 ##### 2.2 函数实现流程 实现AES解密UDF函数的流程主要包括以下几个...

    udf.zip_fluent 振动_udf_udf 振动_udf 简谐振动的控制_振动 udf

    5. **编译与验证**:UDF代码编写完成后,需要在Fluent环境中编译并加载,然后通过模拟验证其是否正确模拟了预期的振动行为。 压缩包中的多个`.c`文件(udfxx.c及副本)可能代表不同版本或尝试的UDF源代码。这可能是...

    旅游集市数仓建设可能用到的UDF函数

    在旅游集市数仓建设中,可能会用到各种UDF(User Defined Function,用户自定义函数)来处理和分析数据。这些UDF函数可以根据具体的需求和业务场景来设计和实现。以提高数据处理的效率和灵活性。以下是一些在旅游...

    Maxcompute UDF函手动打包以及注册.doc

    Maxcompute UDF 函数打包和注册详解 在大数据处理中,Maxcompute UDF 函数是非常重要的一部分,它...这些知识点对开发者来说非常重要,因为它们能够帮助开发者更好地使用Maxcompute UDF 函数,提高开发效率和质量。

    UDF实现边界源项加载.zip_UDF fluent_fluent udf源项_udf源项_源UDF_相UDF

    fluent中使用UDF代码,实现边界源相的加载

Global site tag (gtag.js) - Google Analytics