`

pig自定义输出文件名,重写StoreFunc

    博客分类:
  • pig
阅读更多

 

需要实现四个方法:
getOutputFormat()
prepareToWrite(RecordWriter writer)
putNext(Tuple tuple)
setStoreLocation(String location, Job job)
 
 
setStoreLocation(String location, Job job)
设置输出路径,可以在里面设置输出压缩格式,这里传进来的location就是写pig时候STORE log0 INTO '/path/of/result'这里的path
如:
FileOutputFormat.setOutputPath (job, new Path(location ));
if ( comp == Compression .bz2 || comp == Compression .bz) {
    FileOutputFormat .setCompressOutput(job, true);
    FileOutputFormat .setOutputCompressorClass(job, BZip2Codec .class);
} else if (comp == Compression.gz) {
    FileOutputFormat .setCompressOutput(job, true);
            FileOutputFormat .setOutputCompressorClass(job, GzipCodec.class);
}
 
putNext(Tuple tuple)
相当于map阶段的处理,最后调用write()
 
prepareToWrite(RecordWriter writer)
设置上面write()的writer
 
getOutputFormat()
在这块儿可以设置自定义的outPutFormat;自定义outPutFormat需要重写getRecordWriter(TaskAttemptContext context),这个方法返回的Writer会传入prepareToWrite()
 
终于到重点了,怎么根据Key自定义输出文件名
写自己的BaiduRecordWriter继承RecordWriter
重写close()和write()
在write()的时候,把默认的TextOutputFormat.LineRecordWriter<WritableComparable, Text>做一层包装,初始化为自己的输出流, 而close()就做一些关闭流的处理
DataOutputStream os = createOutputStream (key);
writer = new MyLineRecordWriter(os, keyValueSeparator );


private DataOutputStream createOutputStream(String key) throws IOException {
    Configuration conf = ctx. getConfiguration();
    TaskID taskId = ctx. getTaskAttemptID().getTaskID ();
    Path path = new Path( StringUtils.left (key, 8 ), key + '-' + NumberFormat.getInstance().format(taskId .getId()));
    Path workOutputPath = (( FileOutputCommitter) getOutputCommitter( ctx)).getWorkPath ();
    Path file = new Path( workOutputPath, path );
    FileSystem fs = file. getFileSystem(conf );
    FSDataOutputStream fileOut = fs. create(file , false );
    return fileOut ;
}


public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
    super(out, keyValueSeparator );
}


 
 

我写的pigStorage
public class BaiduMultiStorage extends StoreFunc {
    @SuppressWarnings ("unused")
    private Path outputPath ;
    private String fieldDel ; // delimiter of the output record.
    private RecordWriter <String, Tuple> writer ;


    public BaiduMultiStorage () {
         this.fieldDel = "\\t" ;
    }


    public BaiduMultiStorage (String parentPathStr) {
         this.outputPath = new Path(parentPathStr );
         this.fieldDel = "\\t" ;
    }


    @SuppressWarnings ("rawtypes")
    @Override
    public OutputFormat getOutputFormat () throws IOException {
        MultiStorageOutputFormat format = new MultiStorageOutputFormat();
        format .setKeyValueSeparator(fieldDel);
         return format;
    }


    @SuppressWarnings ({ "unchecked" , "rawtypes" })
    @Override
    public void prepareToWrite (RecordWriter writer) {
         this.writer = writer;


    }


    @Override
    public void putNext (Tuple tuple) throws IOException {
        String key = null ;
        String value = null ;
         try {
            String line = tuple. get(0 ).toString();
            key = BaiduLogFormat.getKey(line );
            value = BaiduLogFormat.getValue(line );
            tuple .set( 0, value );
            writer .write( key, tuple );
         } catch (LogFieldException e) {
            System .out. println(e );
         } catch (InterruptedException e) {
             throw new IOException(e );
         }


    }


    @Override
    public void setStoreLocation (String location, Job job) throws IOException {
        FileOutputFormat .setOutputPath(job, new Path( location));


    }


    public static class MultiStorageOutputFormat extends TextOutputFormat< String, Tuple > {
         private String keyValueSeparator = "\\t";
         private byte fieldDel = '\t';


         public String getKeyValueSeparator() {
             return keyValueSeparator;
         }


         public void setKeyValueSeparator(String keyValueSeparator ) {
             this.keyValueSeparator = keyValueSeparator;
            fieldDel = StorageUtil.parseFieldDel(keyValueSeparator );
         }


        @Override
         public RecordWriter<String, Tuple > getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException {
             return new BaiduRecordWriter(context );
         }


         public class BaiduRecordWriter extends RecordWriter <String, Tuple> {


             final TaskAttemptContext ctx;


             private Map< String, MyLineRecordWriter > hourStoreMap = new HashMap<String, MyLineRecordWriter>();
             private Map< String, MyLineRecordWriter > channelStoreMap = new HashMap<String, MyLineRecordWriter >();


             private static final int BUFFER_SIZE = 1024;


             private ByteArrayOutputStream mOut = new ByteArrayOutputStream( BUFFER_SIZE);


             public BaiduRecordWriter(TaskAttemptContext context) {
                ctx = context;
             }


            @Override
             public void close(TaskAttemptContext context ) throws IOException, InterruptedException {
                 for ( MyLineRecordWriter out : hourStoreMap .values()) {
                    out .close( context);
                 }
                 for ( MyLineRecordWriter out : channelStoreMap .values()) {
                    out .close( context);
                 }
             }


            @Override
             public void write(String key , Tuple val) throws IOException , InterruptedException {
                 int sz = val.size ();
                 for ( int i = 0 ; i < sz; i ++) {
                    Object field ;
                     try {
                        field = val. get(i );
                     } catch (ExecException ee) {
                         throw ee;
                     }
                    StorageUtil .putField(mOut, field );
                     if ( i != sz - 1) {
                        mOut .write( fieldDel);
                     }
                 }
                writeToStore (key);
                mOut .reset();


             }


             public void writeToStore(String key ) throws IOException {
                getStore (BaiduLogFormat.getHourKey(key ), hourStoreMap).write( null, new Text( mOut.toByteArray ()));
                getStore (key, channelStoreMap).write (null, new Text(mOut. toByteArray()));
             }


             private MyLineRecordWriter getStore(String key, Map <String, MyLineRecordWriter> storeMap) throws IOException {
                MyLineRecordWriter store = storeMap.get(key );
                 if ( store == null) {
                    DataOutputStream os = createOutputStream(key);
                    store = new MyLineRecordWriter(os , keyValueSeparator);
                    storeMap .put( key, store );
                 }
                 return store;
             }


             private DataOutputStream createOutputStream(String key) throws IOException {
                Configuration conf = ctx. getConfiguration();
                TaskID taskId = ctx. getTaskAttemptID().getTaskID ();
                Path path = new Path(StringUtils .left( key, 8), key + '-' + NumberFormat.getInstance().format (taskId. getId()));
                Path workOutputPath = (( FileOutputCommitter) getOutputCommitter( ctx)).getWorkPath ();
                Path file = new Path(workOutputPath , path);
                FileSystem fs = file. getFileSystem(conf );
                FSDataOutputStream fileOut = fs. create(file , false );
                 return fileOut;
             }


            @SuppressWarnings ("rawtypes")
             protected class MyLineRecordWriter extends TextOutputFormat. LineRecordWriter<WritableComparable , Text> {


                 public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
                     super(out , keyValueSeparator);
                 }
             }


         }


    }


}
 
分享到:
评论

相关推荐

    pig的源码包

    4. 查询优化:Pig使用基于成本的优化策略,包括推导规则、重写规则和基于统计信息的优化。源码中,可以看到如何通过CostBasedOptimizer类实现这些策略。同时,Pig还支持自定义函数(UDF),源码包中的FuncSpec和...

    pig编程指南源码

    源码中的示例可能包括不同格式的数据输入输出,如CSV、JSON或自定义格式,这有助于我们理解Pig的灵活性。 四、自定义函数(UDFs) Pig Latin虽然强大,但有时仍需自定义函数来实现特定的业务逻辑。Pig支持两种类型...

    pig源码0.15版

    Pig支持用户自定义函数,以扩展其功能。源码中,UDF的注册、调用及执行流程清晰可见。这包括了对Java和Python UDF的支持,以及如何将UDF集成到Pig Latin语句中进行数据转换和处理。 3. **数据类型和Schema**: 在...

    pig-0.7.0.tar.gz

    3. **UDF(用户定义函数)扩展**:0.7.0版本提供了更多的内置UDF,同时也支持用户自定义UDF,这极大地增强了Pig的功能性和灵活性。用户可以通过编写Java代码或使用其他语言(如Python或JavaScript)来扩展Pig的功能...

    pig java 编程jar包

    通过这个库,你可以创建模拟数据,然后比较实际输出和期望输出,从而调试和优化你的Pig脚本。 3. **Ant 1.8**: Ant是Apache的一个开源构建工具,常用于Java项目。在本场景中,`ant1.8`用于编译这些jar包,这表明...

    Programming Pig(pig编程).pdf

    Pig Latin的主要特点包括数据转换和过滤的高级抽象,例如使用LOAD语句读取数据、使用STORE语句输出数据、使用DUMP语句将结果输出到标准输出等。 此外,Pig还支持用户自定义函数(UDF),这使得Pig可以扩展使用Java...

    大数据之pig 命令

    3. **脚本文件**:将多个Pig Latin语句写入脚本文件中,然后通过`bin/pig 脚本文件名`来运行整个脚本。 #### 五、Pig的数据类型 - **基本数据类型**:如int、long、float、double、chararray等。 - **复合数据类型...

    pig-0.16.0.tar安装包

    - **UDFs(User Defined Functions)**: 用户可以根据需要编写自定义函数(UDFs),以扩展Pig的功能,处理特定的数据转换或分析任务。 - **Performance优化**: Pig支持多种性能优化策略,如使用`ORDER BY`和`GROUP ...

    Pig编程指南

    《Pig编程指南》不仅为初学者讲解ApachePig的基础知识,同时也向有一定使用经验的高级用户介绍更加综合全面的Pig重要特性,如PigLatin脚本语言、控制台shell交互命令以及用于对Pig进行拓展的用户自定义函数(UDF)等。...

    pig官方基础教程

    此外,教程还会介绍用户自定义函数(UDF)。UDF允许用户扩展Pig的功能,通过编写自己的函数来处理数据。Pig支持多种语言编写UDF,包括Java、Python、JavaScript等。 Pig的基础教程会以理论讲解配合实际案例的方式,...

    Apache Hadoop---Pig.docx

    4. 延迟执行:Pig 采用延迟评估策略,只有在实际需要输出时才执行操作,有利于优化执行计划。 5. 有效利用 Hadoop 功能:Pig 可以充分利用 Hadoop 的并行处理和容错能力,同时提供类似数据库的功能。 6. 非结构化...

    PIG微服务前后端源码

    【标题】"PIG微服务前后端源码"所涉及的知识点主要集中在微服务架构、前端开发和后端开发三个领域。PIG作为国内微服务热度最高的社区之一,其源码解析将帮助开发者深入理解微服务的设计理念和实现方式。 在微服务...

    pig-0.9.1.tar

    Pig的设计目标是简化大数据处理任务,通过其自定义的Pig Latin语言,用户可以编写简单的数据流转换,而无需深入理解MapReduce的工作原理。Pig Latin语言结构清晰,易于学习,能有效提高开发效率。 二、Hadoop环境...

    pig编程指南中的样例脚本、UDF、数据集

    Pig Latin本身提供的函数可能无法满足所有需求,这时就可以编写自定义的Java函数并集成到Pig作业中。UDF可以用于执行数据清洗、计算、字符串操作等任务,极大地丰富了Pig的数据处理能力。编写UDF时,需要继承Pig的...

    pig-0.12.1.tar.gz

    此外,由于Pig支持用户自定义函数(UDF),用户还可以编写Java代码来扩展其功能,以满足特定的业务需求。 总的来说,"pig-0.12.1.tar.gz"是一个强大的工具,为Linux上的Hadoop用户提供了一种高效、灵活的数据处理...

    pig学习笔记

    2. **灵活性** :Pig 支持自定义函数(UDF),允许用户添加自己的业务逻辑,大大增强了其处理复杂数据类型的能力。 3. **性能优化** :Pig 在运行时会自动优化数据处理流程,避免不必要的计算,提高整体效率。 4. **...

    pig-0.9.2.tar.gz

    - **UDF(User Defined Functions)**:用户可以编写自定义函数(Java或Python),以扩展Pig的功能,处理特定的数据转换和计算。 - **Hadoop Integration**:Pig与Hadoop紧密集成,数据存储在HDFS上,Pig作业通过...

Global site tag (gtag.js) - Google Analytics