`
qindongliang1922
  • 浏览: 2180894 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117400
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125816
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59781
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71224
社区版块
存档分类
最新评论

玩转大数据系列之如何给Apache Pig自定义存储形式(四)

    博客分类:
  • Pig
阅读更多
Pig里面内置大量的工具函数,也开放了大量的接口,来给我们开发者使用,通过UDF,我们可以非常方便的完成某些Pig不直接支持或没有的的功能,比如散仙前面几篇文章写的将pig分析完的结果,存储到各种各样的介质里面,而不仅仅局限于HDFS,当然,我们也可以在都存。



那么如何实现自己的存储UDF呢? 提到这里,我们不得不说下pig里面的load和store函数,load函数是从某个数据源,加载数据,一般都是从HDFS上加载,而store函数则是将分析完的结果,存储到HDFS用的,所以,我们只需继承重写store的功能函数StoreFunc即可完成我们的大部分需求,懂的了这个,我们就可以将结果任意存储了,可以存到数据库,也可以存到索引文件,也可以存入本地txt,excel等等


下面先看下StoreFunc的源码:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;


/**
* StoreFuncs take records from Pig's processing and store them into a data store.  Most frequently
* this is an HDFS file, but it could also be an HBase instance, RDBMS, etc.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class StoreFunc implements StoreFuncInterface {

    /**
     * This method is called by the Pig runtime in the front end to convert the
     * output location to an absolute path if the location is relative. The
     * StoreFunc implementation is free to choose how it converts a relative 
     * location to an absolute location since this may depend on what the location
     * string represent (hdfs path or some other data source). 
     *  
     * 
     * @param location location as provided in the "store" statement of the script
     * @param curDir the current working direction based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory - 
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir) 
    throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }

    /**
     * Return the OutputFormat associated with StoreFunc.  This will be called
     * on the front end during planning and on the backend during
     * execution. 
     * @return the {@link OutputFormat} associated with StoreFunc
     * @throws IOException if an exception occurs while constructing the 
     * OutputFormat
     *
     */
    public abstract OutputFormat getOutputFormat() throws IOException;

    /**
     * Communicate to the storer the location where the data needs to be stored.  
     * The location string passed to the {@link StoreFunc} here is the 
     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} 
     * This method will be called in the frontend and backend multiple times. Implementations
     * should bear in mind that this method is called multiple times and should
     * ensure there are no inconsistent side effects due to the multiple calls.
     * {@link #checkSchema(ResourceSchema)} will be called before any call to
     * {@link #setStoreLocation(String, Job)}.
     * 

     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object
     * @throws IOException if the location is not valid.
     */
    public abstract void setStoreLocation(String location, Job job) throws IOException;
 
    /**
     * Set the schema for data to be stored.  This will be called on the
     * front end during planning if the store is associated with a schema.
     * A Store function should implement this function to
     * check that a given schema is acceptable to it.  For example, it
     * can check that the correct partition keys are included;
     * a storage function to be written directly to an OutputFormat can
     * make sure the schema will translate in a well defined way.  Default implementation
     * is a no-op.
     * @param s to be checked
     * @throws IOException if this schema is not acceptable.  It should include
     * a detailed error message indicating what is wrong with the schema.
     */
    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // default implementation is a no-op
    }

    /**
     * Initialize StoreFunc to write data.  This will be called during
     * execution on the backend before the call to putNext.
     * @param writer RecordWriter to use.
     * @throws IOException if an exception occurs during initialization
     */
    public abstract void prepareToWrite(RecordWriter writer) throws IOException;

    /**
     * Write a tuple to the data store.
     * 
     * @param t the tuple to store.
     * @throws IOException if an exception occurs during the write
     */
    public abstract void putNext(Tuple t) throws IOException;
    
    /**
     * This method will be called by Pig both in the front end and back end to
     * pass a unique signature to the {@link StoreFunc} which it can use to store
     * information in the {@link UDFContext} which it needs to store between
     * various method invocations in the front end and back end. This method 
     * will be called before other methods in {@link StoreFunc}.  This is necessary
     * because in a Pig Latin script with multiple stores, the different
     * instances of store functions need to be able to find their (and only their)
     * data in the UDFContext object.  The default implementation is a no-op.
     * @param signature a unique signature to identify this StoreFunc
     */
    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        // default implementation is a no-op
    }
    
    /**
     * This method will be called by Pig if the job which contains this store
     * fails. Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * The default implementation  deletes the output location if it
     * is a {@link FileSystem} location.
     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information. 
     */
    @Override
    public void cleanupOnFailure(String location, Job job) 
    throws IOException {
        cleanupOnFailureImpl(location, job);
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * is successful, and some cleanup of intermediate resources is required.
     * Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information. 
     */
    @Override
    public void cleanupOnSuccess(String location, Job job) 
    throws IOException {
        // DEFAULT: DO NOTHING, user-defined overrides can
        // call cleanupOnFailureImpl(location, job) or ...?
    }
    
    /**
     * Default implementation for {@link #cleanupOnFailure(String, Job)}
     * and {@link #cleanupOnSuccess(String, Job)}.  This removes a file
     * from HDFS.
     * @param location file name (or URI) of file to remove
     * @param job Hadoop job, used to access the appropriate file system.
     * @throws IOException
     */
    public static void cleanupOnFailureImpl(String location, Job job) 
    throws IOException {        
        Path path = new Path(location);
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        if(fs.exists(path)){
            fs.delete(path, true);
        }    
    }
    
    /**
     * Issue a warning.  Warning messages are aggregated and reported to
     * the user.
     * @param msg String message of the warning
     * @param warningEnum type of warning
     */
    public final void warn(String msg, Enum warningEnum) {
        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
        counter.increment(1);
    }
}



这里面有许多方法,但并不都需要我们重新定义的,一般来说,我们只需要重写如下的几个抽象方法即可:

(1)getOutputFormat方法,与Hadoop的OutFormat对应,在最终的输出时,会根据不同的format方法,生成不同的形式。
(2)setStoreLocation方法,这个方法定义了生成文件的路径,如果不是存入HDFS上,则可以忽略。
(3)prepareToWrite 在写入数据之前做一些初始化工作
(4)putNext从Pig里面传递过来最终需要存储的数据




在1的步骤我们知道,需要提供一个outputFormat的类,这时就需要我们继承hadoop里面的某个outputformat基类,然后重写getRecordWriter方法,接下来我们还可能要继承RecordWriter类,来定义我们自己的输出格式,可能是一行txt数据,也有可能是一个对象,或一个索引集合等等,如下面支持lucene索引的outputformat
package com.pig.support.lucene;



import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * 继承FileOutputFormat,重写支持Lucene格式的outputFormat策略
 * */
public class LuceneOutputFormat extends FileOutputFormat<Writable, Document> {

	String location;
	FileSystem fs;
	String taskid;

	FileOutputCommitter committer;
	AtomicInteger counter = new AtomicInteger();

	public LuceneOutputFormat(String location) {
		this.location = location;
	}
	
	@Override
	public RecordWriter<Writable, Document> getRecordWriter(
			TaskAttemptContext ctx) throws IOException, InterruptedException {

		Configuration conf = ctx.getConfiguration();
		fs = FileSystem.get(conf);

		File baseDir = new File(System.getProperty("java.io.tmpdir"));
		String baseName = System.currentTimeMillis() + "-";
		File tempDir = new File(baseDir, baseName + counter.getAndIncrement());
		tempDir.mkdirs();
		tempDir.deleteOnExit();

		return new LuceneRecordWriter(
				(FileOutputCommitter) getOutputCommitter(ctx), tempDir);
	}

	/**
	 * Write out the LuceneIndex to a local temporary location.<br/>
	 * On commit/close the index is copied to the hdfs output directory.<br/>
	 *
	 */
	static class LuceneRecordWriter extends RecordWriter<Writable, Document> {

		final IndexWriter writer;
		final FileOutputCommitter committer;
		final File tmpdir;

		public LuceneRecordWriter(FileOutputCommitter committer, File tmpdir) {
			try {
				this.committer = committer;
				this.tmpdir = tmpdir;
				IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_10_2,
						new StandardAnalyzer());
				LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
			    mergePolicy.setMergeFactor(10);
			    //mergePolicy.setUseCompoundFile(false);
			    config.setMergePolicy(mergePolicy);
			    config.setMergeScheduler(new SerialMergeScheduler());

				writer = new IndexWriter(FSDirectory.open(tmpdir),
						config);
				
			} catch (IOException e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			}
		}

		@Override
		public void close(final TaskAttemptContext ctx) throws IOException,
				InterruptedException {
			//use a thread for status polling
			final Thread th = new Thread() {
				public void run() {
					ctx.progress();
					try {
						Thread.sleep(500);
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
						return;
					}
				}
			};
			th.start();
			try {
				writer.forceMerge(1);
				writer.close();
				// move all files to part
				Configuration conf = ctx.getConfiguration();

				Path work = committer.getWorkPath();
				Path output = new Path(work, "index-"
						+ ctx.getTaskAttemptID().getTaskID().getId());
				FileSystem fs = FileSystem.get(conf);

				FileUtil.copy(tmpdir, fs, output, true, conf);
			} finally {
				th.interrupt();
			}
		}

		@Override
		public void write(Writable key, Document doc) throws IOException,
				InterruptedException {
			writer.addDocument(doc);

		}

	}
}



最后总结一下,自定义输入格式的步骤:

(1)继承StoreFunc函数,重写其方法
(2)继承一个outputformat基类,重写自己的outputformat类
(2)继承一个RecodeWriter,重写自己的writer方法


当然这并不都是必须的,比如在向数据库存储的时候,我们就可以直接在putNext的时候,获取,保存为集合,然后在OutputCommitter提交成功之后,commit我们的数据,如果保存失败,我们也可以在abort方法里回滚我们的数据。


这样以来,无论我们存储哪里,都可以通过以上步骤实现,非常灵活


欢迎扫码关注微信公众号:我是攻城师(woshigcs)

本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!





分享到:
评论

相关推荐

    3-3.新一代大数据调度+-Apache+DolphinScheduler架构演进+&+Roadmap.pdf

    自 2021 年 03 月 18 日正式成为 Apache 顶级项目以来,DolphinScheduler 致力于在数据工作流编排中“解决复杂的大数据任务依赖及触发关系,让各种大数据任务类型开箱即用”。目前,已经有 400+ 公司在生产上使用 ...

    合肥工业大学 Python玩转大数据课件+全部实验代码+大作业代码及报告

    合肥工业大学的这门“Python玩转大数据”课程,旨在让学生掌握如何利用Python进行大数据分析和处理,从而更好地理解和解决实际问题。 一、Python大数据处理基础 1. Numpy库:Numpy是Python中用于科学计算的核心库...

    apache pig 基础及应用

    apache pig 基础及应用,urldecode row_number web日志分析 根据 用户行为 做出 简易的 相似度 判断。

    flume-demo_大数据_flume_DEMO_自定义拦截器_

    这个名为 "flume-demo_大数据_flume_DEMO_自定义拦截器_" 的项目,显然是一个示例,展示了如何在 Flume 中创建并使用自定义拦截器来过滤数据。下面我们将深入探讨 Flume 的基本概念、拦截器的作用以及如何自定义拦截...

    大数据+Apache Doris资料包+示例代码

    大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例代码大数据+Apache Doris资料包+示例...

    Python玩转大数据的大作业.zip

    在本压缩包“Python玩转大数据的大作业.zip”中,我们可以看到一个名为“zgl_resource”的文件,这可能是一个包含一系列资源的文件夹,用于支持一个关于使用Python处理大数据的项目或课程作业。Python作为一门强大的...

    apache日志hadoop大数据

    Apache日志分析与Hadoop大数据生态系统中的Hive和HBase整合使用涉及到多个关键技术点。首先,我们需要了解Hadoop作为一个大数据处理框架,其核心组件包括HDFS和MapReduce。HDFS负责存储大数据,而MapReduce负责处理...

    大数据内部Hadoop系列培训经典内容,包括大数据系列架构

    大数据是21世纪信息技术发展的重要领域,它涵盖了海量数据的收集、存储、处理和分析,为企业决策提供深度洞察。在本套内部Hadoop系列培训资料中,我们将深入探讨大数据技术的核心——Hadoop及其生态系统,包括Spark...

    Apache Pig用法总结

    Pig之所以受到青睐,是因为它简化了数据处理流程,降低了编程的复杂性,并且相较于传统编程语言,可以减少大量的代码量。 在配置Pig语法高亮时需要注意的是,在Windows环境下可能没有直接的插件支持,但是可以通过...

    尚硅谷大数据技术之企业SQL面试题_大数据_sql_尚硅谷sql_doc_

    在大数据环境里,如Hadoop的Hive、Pig或Spark SQL,都提供了SQL接口,使得分析人员能够以熟悉的SQL语法操作分布式存储的数据。 2. 数据清洗与预处理:在大数据项目中,原始数据往往需要经过一系列清洗步骤,SQL可以...

    大数据系列报告之工业大数据白皮书.zip

    《大数据系列报告之工业大数据白皮书》涵盖了大数据在工业领域的广泛应用和深远影响。工业大数据是信息化与工业化深度融合的产物,它将海量、多样、高速和有价值的数据转化为企业决策、生产优化、服务创新的关键资源...

    大数据漫谈系列之:大数据怎么发挥大价值(亲测可用)

    "大数据漫谈系列之:大数据怎么发挥大价值(亲测可用)"这个标题暗示了我们将深入探讨如何利用大数据技术创造实际价值,尤其在C#编程环境下。在这个系列中,我们将会看到大数据的实际应用、处理流程以及如何在C#中实现...

    Apache pig的性能优化

    Apache Pig是一种高级数据流语言和执行框架,用于处理和分析大数据,其运行在Hadoop上。Pig提供了一种名为Pig Latin的数据处理语言,它是一种类SQL语言,可以让用户编写更简洁的代码来处理数据,相对于传统的...

    大数据系列-Hive

    【大数据系列-Hive】 Hive是Apache软件基金会下的一个数据仓库工具,主要设计用于处理和管理大规模的数据集,尤其在大规模分布式计算环境如Hadoop上。Hive提供了SQL-like的查询语言,称为HQL(Hive Query Language...

    7-2工业大数据结合医疗领域APACHE体系的研究及应用.pdf

    【标题】:“7-2工业大数据结合医疗领域APACHE体系的研究及应用.pdf”涉及的知识点 【摘要】:本文主要探讨了工业大数据在医疗领域的应用,以及如何利用APACHE(Acute Physiology and Chronic Health Evaluation)...

    阿里大数据之路——关键总结版.pdf

    阿里大数据之路的总结主要涵盖了大数据开发的关键技术和架构,包括数据的采集、存储、处理和分析等方面。以下是对这些内容的详细解析: 1. 数据采集 数据采集是大数据流程的第一步,涉及对Web和APP产生的数据进行...

    大数据系列2020-大数据基础资料汇总(精选).zip

    大数据是21世纪信息技术发展的重要领域,它涵盖了海量数据的收集、存储、处理和分析,为企业决策、科学研究、社会管理提供了前所未有的洞察力。本压缩包文件“大数据系列2020-大数据基础资料汇总(精选).zip”是...

    大数据--Apache Spark

    ### 大数据——Apache Spark 入门知识 在当今信息化高度发达的时代,数据量的爆炸性增长已经成为一种常态。随着互联网、移动设备、科学实验等各个领域的数据不断涌现,如何高效处理这些海量数据成为了亟待解决的...

Global site tag (gtag.js) - Google Analytics