需要实现四个方法:
我写的pigStorage
getOutputFormat() prepareToWrite(RecordWriter writer) putNext(Tuple tuple) setStoreLocation(String location, Job job)
setStoreLocation(String location, Job job)
设置输出路径,可以在里面设置输出压缩格式,这里传进来的location就是写pig时候STORE log0 INTO '/path/of/result'这里的path
如:
FileOutputFormat.setOutputPath (job, new Path(location )); if ( comp == Compression .bz2 || comp == Compression .bz) { FileOutputFormat .setCompressOutput(job, true); FileOutputFormat .setOutputCompressorClass(job, BZip2Codec .class); } else if (comp == Compression.gz) { FileOutputFormat .setCompressOutput(job, true); FileOutputFormat .setOutputCompressorClass(job, GzipCodec.class); }
putNext(Tuple tuple)
相当于map阶段的处理,最后调用write()
prepareToWrite(RecordWriter writer)
设置上面write()的writer
getOutputFormat()
在这块儿可以设置自定义的outPutFormat;自定义outPutFormat需要重写getRecordWriter(TaskAttemptContext context),这个方法返回的Writer会传入prepareToWrite()
终于到重点了,怎么根据Key自定义输出文件名
写自己的BaiduRecordWriter继承RecordWriter
重写close()和write()
在write()的时候,把默认的TextOutputFormat.LineRecordWriter<WritableComparable, Text>做一层包装,初始化为自己的输出流, 而close()就做一些关闭流的处理
DataOutputStream os = createOutputStream (key); writer = new MyLineRecordWriter(os, keyValueSeparator ); private DataOutputStream createOutputStream(String key) throws IOException { Configuration conf = ctx. getConfiguration(); TaskID taskId = ctx. getTaskAttemptID().getTaskID (); Path path = new Path( StringUtils.left (key, 8 ), key + '-' + NumberFormat.getInstance().format(taskId .getId())); Path workOutputPath = (( FileOutputCommitter) getOutputCommitter( ctx)).getWorkPath (); Path file = new Path( workOutputPath, path ); FileSystem fs = file. getFileSystem(conf ); FSDataOutputStream fileOut = fs. create(file , false ); return fileOut ; } public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) { super(out, keyValueSeparator ); }
我写的pigStorage
public class BaiduMultiStorage extends StoreFunc { @SuppressWarnings ("unused") private Path outputPath ; private String fieldDel ; // delimiter of the output record. private RecordWriter <String, Tuple> writer ; public BaiduMultiStorage () { this.fieldDel = "\\t" ; } public BaiduMultiStorage (String parentPathStr) { this.outputPath = new Path(parentPathStr ); this.fieldDel = "\\t" ; } @SuppressWarnings ("rawtypes") @Override public OutputFormat getOutputFormat () throws IOException { MultiStorageOutputFormat format = new MultiStorageOutputFormat(); format .setKeyValueSeparator(fieldDel); return format; } @SuppressWarnings ({ "unchecked" , "rawtypes" }) @Override public void prepareToWrite (RecordWriter writer) { this.writer = writer; } @Override public void putNext (Tuple tuple) throws IOException { String key = null ; String value = null ; try { String line = tuple. get(0 ).toString(); key = BaiduLogFormat.getKey(line ); value = BaiduLogFormat.getValue(line ); tuple .set( 0, value ); writer .write( key, tuple ); } catch (LogFieldException e) { System .out. println(e ); } catch (InterruptedException e) { throw new IOException(e ); } } @Override public void setStoreLocation (String location, Job job) throws IOException { FileOutputFormat .setOutputPath(job, new Path( location)); } public static class MultiStorageOutputFormat extends TextOutputFormat< String, Tuple > { private String keyValueSeparator = "\\t"; private byte fieldDel = '\t'; public String getKeyValueSeparator() { return keyValueSeparator; } public void setKeyValueSeparator(String keyValueSeparator ) { this.keyValueSeparator = keyValueSeparator; fieldDel = StorageUtil.parseFieldDel(keyValueSeparator ); } @Override public RecordWriter<String, Tuple > getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { return new BaiduRecordWriter(context ); } public class BaiduRecordWriter extends RecordWriter <String, Tuple> { final TaskAttemptContext ctx; private Map< String, MyLineRecordWriter > hourStoreMap = new HashMap<String, MyLineRecordWriter>(); private Map< String, MyLineRecordWriter > channelStoreMap = new HashMap<String, MyLineRecordWriter >(); private static final int BUFFER_SIZE = 1024; private ByteArrayOutputStream mOut = new ByteArrayOutputStream( BUFFER_SIZE); public BaiduRecordWriter(TaskAttemptContext context) { ctx = context; } @Override public void close(TaskAttemptContext context ) throws IOException, InterruptedException { for ( MyLineRecordWriter out : hourStoreMap .values()) { out .close( context); } for ( MyLineRecordWriter out : channelStoreMap .values()) { out .close( context); } } @Override public void write(String key , Tuple val) throws IOException , InterruptedException { int sz = val.size (); for ( int i = 0 ; i < sz; i ++) { Object field ; try { field = val. get(i ); } catch (ExecException ee) { throw ee; } StorageUtil .putField(mOut, field ); if ( i != sz - 1) { mOut .write( fieldDel); } } writeToStore (key); mOut .reset(); } public void writeToStore(String key ) throws IOException { getStore (BaiduLogFormat.getHourKey(key ), hourStoreMap).write( null, new Text( mOut.toByteArray ())); getStore (key, channelStoreMap).write (null, new Text(mOut. toByteArray())); } private MyLineRecordWriter getStore(String key, Map <String, MyLineRecordWriter> storeMap) throws IOException { MyLineRecordWriter store = storeMap.get(key ); if ( store == null) { DataOutputStream os = createOutputStream(key); store = new MyLineRecordWriter(os , keyValueSeparator); storeMap .put( key, store ); } return store; } private DataOutputStream createOutputStream(String key) throws IOException { Configuration conf = ctx. getConfiguration(); TaskID taskId = ctx. getTaskAttemptID().getTaskID (); Path path = new Path(StringUtils .left( key, 8), key + '-' + NumberFormat.getInstance().format (taskId. getId())); Path workOutputPath = (( FileOutputCommitter) getOutputCommitter( ctx)).getWorkPath (); Path file = new Path(workOutputPath , path); FileSystem fs = file. getFileSystem(conf ); FSDataOutputStream fileOut = fs. create(file , false ); return fileOut; } @SuppressWarnings ("rawtypes") protected class MyLineRecordWriter extends TextOutputFormat. LineRecordWriter<WritableComparable , Text> { public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) { super(out , keyValueSeparator); } } } } }
相关推荐
4. 查询优化:Pig使用基于成本的优化策略,包括推导规则、重写规则和基于统计信息的优化。源码中,可以看到如何通过CostBasedOptimizer类实现这些策略。同时,Pig还支持自定义函数(UDF),源码包中的FuncSpec和...
源码中的示例可能包括不同格式的数据输入输出,如CSV、JSON或自定义格式,这有助于我们理解Pig的灵活性。 四、自定义函数(UDFs) Pig Latin虽然强大,但有时仍需自定义函数来实现特定的业务逻辑。Pig支持两种类型...
Pig支持用户自定义函数,以扩展其功能。源码中,UDF的注册、调用及执行流程清晰可见。这包括了对Java和Python UDF的支持,以及如何将UDF集成到Pig Latin语句中进行数据转换和处理。 3. **数据类型和Schema**: 在...
3. **UDF(用户定义函数)扩展**:0.7.0版本提供了更多的内置UDF,同时也支持用户自定义UDF,这极大地增强了Pig的功能性和灵活性。用户可以通过编写Java代码或使用其他语言(如Python或JavaScript)来扩展Pig的功能...
此外,教程还会介绍用户自定义函数(UDF)。UDF允许用户扩展Pig的功能,通过编写自己的函数来处理数据。Pig支持多种语言编写UDF,包括Java、Python、JavaScript等。 Pig的基础教程会以理论讲解配合实际案例的方式,...
通过这个库,你可以创建模拟数据,然后比较实际输出和期望输出,从而调试和优化你的Pig脚本。 3. **Ant 1.8**: Ant是Apache的一个开源构建工具,常用于Java项目。在本场景中,`ant1.8`用于编译这些jar包,这表明...
Pig Latin的主要特点包括数据转换和过滤的高级抽象,例如使用LOAD语句读取数据、使用STORE语句输出数据、使用DUMP语句将结果输出到标准输出等。 此外,Pig还支持用户自定义函数(UDF),这使得Pig可以扩展使用Java...
3. **脚本文件**:将多个Pig Latin语句写入脚本文件中,然后通过`bin/pig 脚本文件名`来运行整个脚本。 #### 五、Pig的数据类型 - **基本数据类型**:如int、long、float、double、chararray等。 - **复合数据类型...
- **UDFs(User Defined Functions)**: 用户可以根据需要编写自定义函数(UDFs),以扩展Pig的功能,处理特定的数据转换或分析任务。 - **Performance优化**: Pig支持多种性能优化策略,如使用`ORDER BY`和`GROUP ...
《Apache Pig 0.17.0 安装与配置指南》 Apache Pig 是一个用于大数据分析的平台,它提供了一种高级语言 Pig Latin 来处理大规模数据集。Pig-0.17.0 是该平台的一个稳定版本,包含了多项优化和改进,适用于Hadoop...
《Pig编程指南》不仅为初学者讲解ApachePig的基础知识,同时也向有一定使用经验的高级用户介绍更加综合全面的Pig重要特性,如PigLatin脚本语言、控制台shell交互命令以及用于对Pig进行拓展的用户自定义函数(UDF)等。...
4. 延迟执行:Pig 采用延迟评估策略,只有在实际需要输出时才执行操作,有利于优化执行计划。 5. 有效利用 Hadoop 功能:Pig 可以充分利用 Hadoop 的并行处理和容错能力,同时提供类似数据库的功能。 6. 非结构化...
【标题】"PIG微服务前后端源码"所涉及的知识点主要集中在微服务架构、前端开发和后端开发三个领域。PIG作为国内微服务热度最高的社区之一,其源码解析将帮助开发者深入理解微服务的设计理念和实现方式。 在微服务...
Pig的设计目标是简化大数据处理任务,通过其自定义的Pig Latin语言,用户可以编写简单的数据流转换,而无需深入理解MapReduce的工作原理。Pig Latin语言结构清晰,易于学习,能有效提高开发效率。 二、Hadoop环境...
Pig Latin本身提供的函数可能无法满足所有需求,这时就可以编写自定义的Java函数并集成到Pig作业中。UDF可以用于执行数据清洗、计算、字符串操作等任务,极大地丰富了Pig的数据处理能力。编写UDF时,需要继承Pig的...
此外,由于Pig支持用户自定义函数(UDF),用户还可以编写Java代码来扩展其功能,以满足特定的业务需求。 总的来说,"pig-0.12.1.tar.gz"是一个强大的工具,为Linux上的Hadoop用户提供了一种高效、灵活的数据处理...
2. **灵活性** :Pig 支持自定义函数(UDF),允许用户添加自己的业务逻辑,大大增强了其处理复杂数据类型的能力。 3. **性能优化** :Pig 在运行时会自动优化数据处理流程,避免不必要的计算,提高整体效率。 4. **...