getOutputFormat() prepareToWrite(RecordWriter writer) putNext(Tuple tuple) setStoreLocation(String location, Job job)
setStoreLocation(String location, Job job)
设置输出路径,可以在里面设置输出压缩格式,这里传进来的location就是写pig时候STORE log0 INTO '/path/of/result'这里的path
FileOutputFormat.setOutputPath (job, new Path(location )); if ( comp == Compression .bz2 || comp == Compression .bz) { FileOutputFormat .setCompressOutput(job, true); FileOutputFormat .setOutputCompressorClass(job, BZip2Codec .class); } else if (comp == Compression.gz) { FileOutputFormat .setCompressOutput(job, true); FileOutputFormat .setOutputCompressorClass(job, GzipCodec.class); }
putNext(Tuple tuple)
prepareToWrite(RecordWriter writer)
在这里可以设置自定义的 OutputFormat;自定义 OutputFormat 需要重写 getRecordWriter(TaskAttemptContext context),该方法返回的 RecordWriter 会被传入 prepareToWrite()。
在write()的时候,把默认的TextOutputFormat.LineRecordWriter<WritableComparable, Text>做一层包装,初始化为自己的输出流, 而close()就做一些关闭流的处理
// Open an output stream for this key and wrap it in a line-oriented writer.
DataOutputStream os = createOutputStream(key);
writer = new MyLineRecordWriter(os, keyValueSeparator);

/**
 * Creates the output file for {@code key} under the task's work directory.
 *
 * The file is placed at {@code <workPath>/<first 8 chars of key>/<key>-<taskId>}.
 *
 * @param key the key whose records will be written to the returned stream
 * @return a stream on the newly created file
 * @throws IOException if the file cannot be created (for example, it already
 *         exists: {@code create} is called with overwrite disabled)
 */
private DataOutputStream createOutputStream(String key) throws IOException {
    Configuration conf = ctx.getConfiguration();
    TaskID taskId = ctx.getTaskAttemptID().getTaskID();

    // Disable grouping so that a task id >= 1000 is rendered as "1234"
    // rather than "1,234" inside the file name.
    NumberFormat taskIdFormat = NumberFormat.getInstance();
    taskIdFormat.setGroupingUsed(false);

    Path path = new Path(StringUtils.left(key, 8),
            key + '-' + taskIdFormat.format(taskId.getId()));
    Path workOutputPath = ((FileOutputCommitter) getOutputCommitter(ctx)).getWorkPath();
    Path file = new Path(workOutputPath, path);
    FileSystem fs = file.getFileSystem(conf);
    FSDataOutputStream fileOut = fs.create(file, false);
    return fileOut;
}

// Constructor of the wrapping writer: simply forwards the stream and the
// separator to the parent line writer.
public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
    super(out, keyValueSeparator);
}
public class BaiduMultiStorage extends StoreFunc { @SuppressWarnings ("unused") private Path outputPath ; private String fieldDel ; // delimiter of the output record. private RecordWriter <String, Tuple> writer ; public BaiduMultiStorage () { this.fieldDel = "\\t" ; } public BaiduMultiStorage (String parentPathStr) { this.outputPath = new Path(parentPathStr ); this.fieldDel = "\\t" ; } @SuppressWarnings ("rawtypes") @Override public OutputFormat getOutputFormat () throws IOException { MultiStorageOutputFormat format = new MultiStorageOutputFormat(); format .setKeyValueSeparator(fieldDel); return format; } @SuppressWarnings ({ "unchecked" , "rawtypes" }) @Override public void prepareToWrite (RecordWriter writer) { this.writer = writer; } @Override public void putNext (Tuple tuple) throws IOException { String key = null ; String value = null ; try { String line = tuple. get(0 ).toString(); key = BaiduLogFormat.getKey(line ); value = BaiduLogFormat.getValue(line ); tuple .set( 0, value ); writer .write( key, tuple ); } catch (LogFieldException e) { System .out. 
println(e ); } catch (InterruptedException e) { throw new IOException(e ); } } @Override public void setStoreLocation (String location, Job job) throws IOException { FileOutputFormat .setOutputPath(job, new Path( location)); } public static class MultiStorageOutputFormat extends TextOutputFormat< String, Tuple > { private String keyValueSeparator = "\\t"; private byte fieldDel = '\t'; public String getKeyValueSeparator() { return keyValueSeparator; } public void setKeyValueSeparator(String keyValueSeparator ) { this.keyValueSeparator = keyValueSeparator; fieldDel = StorageUtil.parseFieldDel(keyValueSeparator ); } @Override public RecordWriter<String, Tuple > getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { return new BaiduRecordWriter(context ); } public class BaiduRecordWriter extends RecordWriter <String, Tuple> { final TaskAttemptContext ctx; private Map< String, MyLineRecordWriter > hourStoreMap = new HashMap<String, MyLineRecordWriter>(); private Map< String, MyLineRecordWriter > channelStoreMap = new HashMap<String, MyLineRecordWriter >(); private static final int BUFFER_SIZE = 1024; private ByteArrayOutputStream mOut = new ByteArrayOutputStream( BUFFER_SIZE); public BaiduRecordWriter(TaskAttemptContext context) { ctx = context; } @Override public void close(TaskAttemptContext context ) throws IOException, InterruptedException { for ( MyLineRecordWriter out : hourStoreMap .values()) { out .close( context); } for ( MyLineRecordWriter out : channelStoreMap .values()) { out .close( context); } } @Override public void write(String key , Tuple val) throws IOException , InterruptedException { int sz = val.size (); for ( int i = 0 ; i < sz; i ++) { Object field ; try { field = val. 
get(i ); } catch (ExecException ee) { throw ee; } StorageUtil .putField(mOut, field ); if ( i != sz - 1) { mOut .write( fieldDel); } } writeToStore (key); mOut .reset(); } public void writeToStore(String key ) throws IOException { getStore (BaiduLogFormat.getHourKey(key ), hourStoreMap).write( null, new Text( mOut.toByteArray ())); getStore (key, channelStoreMap).write (null, new Text(mOut. toByteArray())); } private MyLineRecordWriter getStore(String key, Map <String, MyLineRecordWriter> storeMap) throws IOException { MyLineRecordWriter store = storeMap.get(key ); if ( store == null) { DataOutputStream os = createOutputStream(key); store = new MyLineRecordWriter(os , keyValueSeparator); storeMap .put( key, store ); } return store; } private DataOutputStream createOutputStream(String key) throws IOException { Configuration conf = ctx. getConfiguration(); TaskID taskId = ctx. getTaskAttemptID().getTaskID (); Path path = new Path(StringUtils .left( key, 8), key + '-' + NumberFormat.getInstance().format (taskId. getId())); Path workOutputPath = (( FileOutputCommitter) getOutputCommitter( ctx)).getWorkPath (); Path file = new Path(workOutputPath , path); FileSystem fs = file. getFileSystem(conf ); FSDataOutputStream fileOut = fs. create(file , false ); return fileOut; } @SuppressWarnings ("rawtypes") protected class MyLineRecordWriter extends TextOutputFormat. LineRecordWriter<WritableComparable , Text> { public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) { super(out , keyValueSeparator); } } } } }
