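So how do we implement our own storage UDF? To answer that, we first need to look at Pig's load and store functions. A load function loads data from some data source, usually HDFS. A store function writes the analyzed results out, by default to HDFS. For most needs, then, it is enough to extend StoreFunc and override its methods. Once you understand this, you can store results anywhere you like: in a database, in an index file, in a local txt or Excel file, and so on.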
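To make the idea concrete, here is a stdlib-only sketch in plain Java. It does not use Pig at all: `RecordSink`, `MiniStoreFunc`, `TextLineStore`, `SqlInsertStore` and `MiniStoreRunner` are hypothetical names made up for illustration, loosely mirroring `prepareToWrite` and `putNext`. What it shows is that the driving loop stays the same and only the store function decides where, and in what format, each tuple ends up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for Hadoop's RecordWriter: accepts one serialized record at a time. */
interface RecordSink {
    void write(String record);
}

/** Hypothetical, simplified mirror of two StoreFunc hooks; this is NOT Pig's real API. */
abstract class MiniStoreFunc {
    protected RecordSink sink;

    /** Mirrors StoreFunc.prepareToWrite: receive the writer once, before any putNext. */
    void prepareToWrite(RecordSink sink) {
        this.sink = sink;
    }

    /** Mirrors StoreFunc.putNext: serialize one tuple for this particular target. */
    abstract void putNext(List<Object> tuple);
}

/** Target 1: a local text file, one tab-delimited line per tuple. */
class TextLineStore extends MiniStoreFunc {
    @Override
    void putNext(List<Object> tuple) {
        sink.write(tuple.stream().map(String::valueOf).collect(Collectors.joining("\t")));
    }
}

/** Target 2: a relational database, one INSERT statement per tuple. */
class SqlInsertStore extends MiniStoreFunc {
    private final String table;

    SqlInsertStore(String table) {
        this.table = table;
    }

    @Override
    void putNext(List<Object> tuple) {
        // Naive quoting with no escaping, for illustration only.
        String values = tuple.stream()
                .map(v -> v instanceof Number ? v.toString() : "'" + v + "'")
                .collect(Collectors.joining(", "));
        sink.write("INSERT INTO " + table + " VALUES (" + values + ")");
    }
}

/** The "runtime" never changes; only the store function plugged into it does. */
class MiniStoreRunner {
    static List<String> run(MiniStoreFunc store, List<List<Object>> tuples) {
        List<String> written = new ArrayList<>();
        store.prepareToWrite(written::add);
        for (List<Object> t : tuples) {
            store.putNext(t);
        }
        return written;
    }
}
```

Swapping `new TextLineStore()` for `new SqlInsertStore("pv")` turns the output from tab-delimited lines into INSERT statements without touching the runner. The SQL variant quotes values naively; a real database store would use parameterized statements.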
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;

/**
 * StoreFuncs take records from Pig's processing and store them into a data store. Most frequently
 * this is an HDFS file, but it could also be an HBase instance, RDBMS, etc.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class StoreFunc implements StoreFuncInterface {

    /**
     * This method is called by the Pig runtime in the front end to convert the
     * output location to an absolute path if the location is relative. The
     * StoreFunc implementation is free to choose how it converts a relative
     * location to an absolute location since this may depend on what the location
     * string represent (hdfs path or some other data source).
     *
     *
     * @param location location as provided in the "store" statement of the script
     * @param curDir the current working direction based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory -
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir)
    throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }

    /**
     * Return the OutputFormat associated with StoreFunc. This will be called
     * on the front end during planning and on the backend during
     * execution.
     * @return the {@link OutputFormat} associated with StoreFunc
     * @throws IOException if an exception occurs while constructing the
     * OutputFormat
     *
     */
    public abstract OutputFormat getOutputFormat() throws IOException;

    /**
     * Communicate to the storer the location where the data needs to be stored.
     * The location string passed to the {@link StoreFunc} here is the
     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * This method will be called in the frontend and backend multiple times. Implementations
     * should bear in mind that this method is called multiple times and should
     * ensure there are no inconsistent side effects due to the multiple calls.
     * {@link #checkSchema(ResourceSchema)} will be called before any call to
     * {@link #setStoreLocation(String, Job)}.
     *
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object
     * @throws IOException if the location is not valid.
     */
    public abstract void setStoreLocation(String location, Job job) throws IOException;

    /**
     * Set the schema for data to be stored. This will be called on the
     * front end during planning if the store is associated with a schema.
     * A Store function should implement this function to
     * check that a given schema is acceptable to it. For example, it
     * can check that the correct partition keys are included;
     * a storage function to be written directly to an OutputFormat can
     * make sure the schema will translate in a well defined way. Default implementation
     * is a no-op.
     * @param s to be checked
     * @throws IOException if this schema is not acceptable. It should include
     * a detailed error message indicating what is wrong with the schema.
     */
    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // default implementation is a no-op
    }

    /**
     * Initialize StoreFunc to write data. This will be called during
     * execution on the backend before the call to putNext.
     * @param writer RecordWriter to use.
     * @throws IOException if an exception occurs during initialization
     */
    public abstract void prepareToWrite(RecordWriter writer) throws IOException;

    /**
     * Write a tuple to the data store.
     *
     * @param t the tuple to store.
     * @throws IOException if an exception occurs during the write
     */
    public abstract void putNext(Tuple t) throws IOException;

    /**
     * This method will be called by Pig both in the front end and back end to
     * pass a unique signature to the {@link StoreFunc} which it can use to store
     * information in the {@link UDFContext} which it needs to store between
     * various method invocations in the front end and back end. This method
     * will be called before other methods in {@link StoreFunc}. This is necessary
     * because in a Pig Latin script with multiple stores, the different
     * instances of store functions need to be able to find their (and only their)
     * data in the UDFContext object. The default implementation is a no-op.
     * @param signature a unique signature to identify this StoreFunc
     */
    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        // default implementation is a no-op
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * fails. Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * The default implementation deletes the output location if it
     * is a {@link FileSystem} location.
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.
     */
    @Override
    public void cleanupOnFailure(String location, Job job)
    throws IOException {
        cleanupOnFailureImpl(location, job);
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * is successful, and some cleanup of intermediate resources is required.
     * Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.
     */
    @Override
    public void cleanupOnSuccess(String location, Job job)
    throws IOException {
        // DEFAULT: DO NOTHING, user-defined overrides can
        // call cleanupOnFailureImpl(location, job) or ...?
    }

    /**
     * Default implementation for {@link #cleanupOnFailure(String, Job)}
     * and {@link #cleanupOnSuccess(String, Job)}. This removes a file
     * from HDFS.
     * @param location file name (or URI) of file to remove
     * @param job Hadoop job, used to access the appropriate file system.
     * @throws IOException
     */
    public static void cleanupOnFailureImpl(String location, Job job)
    throws IOException {
        Path path = new Path(location);
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }

    /**
     * Issue a warning. Warning messages are aggregated and reported to
     * the user.
     * @param msg String message of the warning
     * @param warningEnum type of warning
     */
    public final void warn(String msg, Enum warningEnum) {
        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
        counter.increment(1);
    }
}
```
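The default `relToAbsPathForStoreLocation` above simply delegates to `LoadFunc.getAbsolutePath`. As a rough illustration of what such a conversion involves, the sketch below resolves a relative location against the current directory (for example `/user/<username>` when the script has no `cd`) using `java.net.URI`. `StoreLocations.toAbsolute` is a hypothetical helper and a deliberate simplification, not Pig's implementation, which handles more cases. A location containing characters that are illegal in URIs, such as spaces, would make `URI.create` throw.

```java
import java.net.URI;

/** Hypothetical, simplified relative-to-absolute store-location resolution (not Pig's code). */
final class StoreLocations {
    private StoreLocations() {
    }

    /**
     * Returns location unchanged if it already has a scheme (hdfs://, file:, ...) or starts
     * with '/'; otherwise resolves it against curDir, e.g. /user/<username>.
     */
    static String toAbsolute(String location, String curDir) {
        URI loc = URI.create(location);
        if (loc.getScheme() != null || location.startsWith("/")) {
            return location;
        }
        // A trailing '/' makes URI.resolve treat curDir as a directory rather than a file name.
        String base = curDir.endsWith("/") ? curDir : curDir + "/";
        // resolve() also normalizes "." and ".." segments.
        return URI.create(base).resolve(loc).toString();
    }
}
```

For instance, `toAbsolute("out/result", "/user/pig")` yields `/user/pig/out/result`, while a fully qualified `hdfs://...` location is passed through untouched.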
(3) prepareToWrite: performs initialization work before any data is written.
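As a sketch of what "initialization before writing" can look like in a Lucene-oriented store function, the class below keeps the writer handed to `prepareToWrite` and then, in `putNext`, maps each tuple position to a field name to build a document. Everything here is hypothetical and stdlib-only: `DocumentStoreSketch` is a made-up name, a `Map<String, String>` stands in for a Lucene `Document`, a `Consumer` stands in for the `RecordWriter`, and the field names are assumed to come from the store's schema. In real Pig code, one approach is to capture them in `checkSchema` on the front end and pass them to the back end through `UDFContext`, as the Javadoc of `setStoreFuncUDFContextSignature` describes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch (not Pig's or Lucene's API): prepareToWrite does the one-time setup,
 * putNext turns each tuple into a "document" (a Map standing in for a Lucene Document).
 */
class DocumentStoreSketch {
    private final List<String> fieldNames;        // assumed to come from the store's schema
    private Consumer<Map<String, String>> writer; // stand-in for RecordWriter<Writable, Document>

    DocumentStoreSketch(List<String> fieldNames) {
        this.fieldNames = Collections.unmodifiableList(new ArrayList<>(fieldNames));
    }

    /** One-time initialization before any tuple arrives: keep a handle to the writer. */
    void prepareToWrite(Consumer<Map<String, String>> writer) {
        this.writer = writer;
    }

    /** Maps tuple position i to fieldNames[i]; null values are left out of the document. */
    void putNext(List<Object> tuple) {
        if (writer == null) {
            throw new IllegalStateException("putNext called before prepareToWrite");
        }
        if (tuple.size() != fieldNames.size()) {
            throw new IllegalArgumentException(
                    "expected " + fieldNames.size() + " fields, got " + tuple.size());
        }
        Map<String, String> doc = new LinkedHashMap<>();
        for (int i = 0; i < tuple.size(); i++) {
            Object value = tuple.get(i);
            if (value != null) {
                doc.put(fieldNames.get(i), value.toString());
            }
        }
        writer.accept(doc);
    }
}
```

Keeping the writer (and anything else expensive to set up) in `prepareToWrite` means `putNext` only does per-tuple work, and calling `putNext` too early fails fast instead of silently dropping data.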
```java
package com.pig.support.lucene;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * Extends FileOutputFormat, overriding it with an output strategy that writes
 * the Lucene index format.
 */
public class LuceneOutputFormat extends FileOutputFormat<Writable, Document> {

    String location;
    FileSystem fs;
    String taskid;
    FileOutputCommitter committer;
    AtomicInteger counter = new AtomicInteger();

    public LuceneOutputFormat(String location) {
        this.location = location;
    }

    @Override
    public RecordWriter<Writable, Document> getRecordWriter(
            TaskAttemptContext ctx) throws IOException, InterruptedException {
        Configuration conf = ctx.getConfiguration();
        fs = FileSystem.get(conf);

        // Build the index in a unique local temp directory first
        File baseDir = new File(System.getProperty("java.io.tmpdir"));
        String baseName = System.currentTimeMillis() + "-";
        File tempDir = new File(baseDir, baseName + counter.getAndIncrement());
        tempDir.mkdirs();
        tempDir.deleteOnExit();

        return new LuceneRecordWriter(
                (FileOutputCommitter) getOutputCommitter(ctx), tempDir);
    }

    /**
     * Write out the LuceneIndex to a local temporary location.<br/>
     * On commit/close the index is copied to the hdfs output directory.<br/>
     */
    static class LuceneRecordWriter extends RecordWriter<Writable, Document> {

        final IndexWriter writer;
        final FileOutputCommitter committer;
        final File tmpdir;

        public LuceneRecordWriter(FileOutputCommitter committer, File tmpdir) {
            try {
                this.committer = committer;
                this.tmpdir = tmpdir;
                IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_10_2,
                        new StandardAnalyzer());
                LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
                mergePolicy.setMergeFactor(10);
                //mergePolicy.setUseCompoundFile(false);
                config.setMergePolicy(mergePolicy);
                config.setMergeScheduler(new SerialMergeScheduler());
                writer = new IndexWriter(FSDirectory.open(tmpdir), config);
            } catch (IOException e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            }
        }

        @Override
        public void close(final TaskAttemptContext ctx) throws IOException,
                InterruptedException {
            // Use a thread for status polling: keep reporting progress every 500 ms
            // while forceMerge and the copy run, until it is interrupted below
            final Thread th = new Thread() {
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        ctx.progress();
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            };
            th.start();
            try {
                writer.forceMerge(1);
                writer.close();
                // move all files to part
                Configuration conf = ctx.getConfiguration();
                Path work = committer.getWorkPath();
                Path output = new Path(work, "index-"
                        + ctx.getTaskAttemptID().getTaskID().getId());
                FileSystem fs = FileSystem.get(conf);
                // copy the local index to the task's work path, deleting the local copy
                FileUtil.copy(tmpdir, fs, output, true, conf);
            } finally {
                th.interrupt();
            }
        }

        @Override
        public void write(Writable key, Document doc) throws IOException,
                InterruptedException {
            writer.addDocument(doc);
        }
    }
}
```
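The status-polling thread in `close()` matters because `forceMerge(1)` and the copy to HDFS can run for a long time without the task reading input or writing output, and Hadoop kills a task that reports no progress within `mapreduce.task.timeout` (600 seconds by default). Below is a stdlib-only sketch of that heartbeat pattern pulled out into its own class. `ProgressHeartbeat` is a hypothetical name, and the `Runnable` stands in for `TaskAttemptContext.progress()`. Note the loop: without it the thread would report progress only once.

```java
/**
 * Hypothetical sketch of the status-polling pattern: a background thread calls a progress
 * callback every intervalMillis until it is closed.
 */
final class ProgressHeartbeat implements AutoCloseable {
    private final Thread thread;

    ProgressHeartbeat(Runnable progress, long intervalMillis) {
        thread = new Thread(() -> {
            // Keep reporting until interrupted; a single call would only report once.
            while (!Thread.currentThread().isInterrupted()) {
                progress.run();
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    return; // close() interrupted us while sleeping
                }
            }
        }, "progress-heartbeat");
        thread.setDaemon(true); // never keep the JVM alive on its own
    }

    ProgressHeartbeat start() {
        thread.start();
        return this;
    }

    boolean isRunning() {
        return thread.isAlive();
    }

    /** Stops the heartbeat and waits for the thread to finish. */
    @Override
    public void close() {
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Inside `close()` it could wrap the slow section, for example `try (ProgressHeartbeat hb = new ProgressHeartbeat(ctx::progress, 500).start()) { writer.forceMerge(1); ... }`, which guarantees the thread is stopped even if the merge or the copy throws.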