`

hadoop系列A:多文件输出

阅读更多

 

package org.myorg; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
/**摘{@link TextOutputFormat}碌ineRecordWriter隆拢 */ 
public class LineRecordWriter<K, V> extends RecordWriter<K, V> { 
private static final String utf8 = "UTF-8"; 
private static final byte[] newline; 
static { 
try { 
newline = "\n".getBytes(utf8); 
} catch (UnsupportedEncodingException uee) { 
throw new IllegalArgumentException("can't find " + utf8 + " encoding"); 
} 
} 
protected DataOutputStream out; 
private final byte[] keyValueSeparator; 
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { 
this.out = out; 
try { 
this.keyValueSeparator = keyValueSeparator.getBytes(utf8); 
} catch (UnsupportedEncodingException uee) { 
throw new IllegalArgumentException("can't find " + utf8 + " encoding"); 
} 
} 
public LineRecordWriter(DataOutputStream out) { 
this(out, "\t"); 
} 
private void writeObject(Object o) throws IOException { 
if (o instanceof Text) { 
Text to = (Text) o; 
out.write(to.getBytes(), 0, to.getLength()); 
} else { 
out.write(o.toString().getBytes(utf8)); 
} 
} 
public synchronized void write(K key, V value) throws IOException { 
boolean nullKey = key == null || key instanceof NullWritable; 
boolean nullValue = value == null || value instanceof NullWritable; 
if (nullKey && nullValue) { 
return; 
} 
if (!nullKey) { 
writeObject(key); 
} 
if (!(nullKey || nullValue)) { 
out.write(keyValueSeparator); 
} 
if (!nullValue) { 
writeObject(value); 
} 
out.write(newline); 
} 
public synchronized void close(TaskAttemptContext context) throws IOException { 
out.close(); 
}

}
 
public static class myOutput extends MultipleOutputFormat<Text, Text> {
@Override
protected String generateFileNameForKeyValue(Text key, Text value, TaskAttemptContext taskID) {

String mykey = key.toString();
String myValue = value.toString();

String tasknum = taskID.getTaskAttemptID().getTaskID().toString();
String fileNum = tasknum.substring(tasknum.length()-3);

String newname = mykey.substring(0,3);
return fileNum+newname;
}
}
 

目的:

根据输出数据的某些特征,分类输出到不同的文件夹下以便管理,而不是放在同一个文件夹下。

实现:

1、重写MultipleOutputFormat的某些方法,参考org.apache.hadoop.mapred.lib.MultipleOutputFormat,需要在程序中实现的子类方法是:

protected String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job),即通过key和value及conf配置信息决定文件名

(含扩展名)。其中,需要改写的方法是最后一个方法:private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job,

String baseName),baseName 即为 在程序中重写的子类 generateFileNameForKeyValue 的返回值。

代码:

package org.myorg; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.util.HashMap; 
import java.util.Iterator; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.mapreduce.OutputCommitter; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.ReflectionUtils; 
public abstract class MultipleOutputFormat<K extends WritableComparable, V extends Writable> 
extends FileOutputFormat<K, V> { 
private MultiRecordWriter writer = null; 
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, 
InterruptedException { 
if (writer == null) { 
writer = new MultiRecordWriter(job, getTaskOutputPath(job)); 
} 
return writer; 
} 
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
Path workPath = null; 
OutputCommitter committer = super.getOutputCommitter(conf); 
if (committer instanceof FileOutputCommitter) { 
workPath = ((FileOutputCommitter) committer).getWorkPath(); 
} else { 
Path outputPath = super.getOutputPath(conf); 
if (outputPath == null) { 
throw new IOException("Undefined job output-path"); 
} 
workPath = outputPath; 
} 
return workPath; 
} 

protected abstract String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job);//Configuration conf); 
public class MultiRecordWriter extends RecordWriter<K, V> { 

private HashMap<String, RecordWriter<K, V>> recordWriters = null; 
private TaskAttemptContext job = null; 

private Path workPath = null; 
public MultiRecordWriter(TaskAttemptContext job, Path workPath) { 
super(); 
this.job = job; 
this.workPath = workPath; 
recordWriters = new HashMap<String, RecordWriter<K, V>>(); 
} 
@Override 
public void close(TaskAttemptContext context) throws IOException, InterruptedException { 
Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); 
while (values.hasNext()) { 
values.next().close(context); 
} 
this.recordWriters.clear(); 
} 
@Override 
public void write(K key, V value) throws IOException, InterruptedException {

String baseName = generateFileNameForKeyValue(key, value, job);//job.getConfiguration()); 
RecordWriter<K, V> rw = this.recordWriters.get(baseName); 
if (rw == null) { 
rw = getBaseRecordWriter(job, baseName); 
this.recordWriters.put(baseName, rw); 
} 
//LongWritable keys=(LongWritable)key;
//long ret=keys.get()>>1;
//keys.set(ret);
rw.write(key, value);//change 
} 

private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) 
throws IOException, InterruptedException { 
Configuration conf = job.getConfiguration(); 
boolean isCompressed = getCompressOutput(job); 
String keyValueSeparator = "\t"; //change 
String pathname=baseName.substring(12); //change
RecordWriter<K, V> recordWriter = null; 
if (isCompressed) { 
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
GzipCodec.class); 
//String pathname=baseName.substring(12); 
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); 
Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11) + codec.getDefaultExtension()); //change 
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); 
recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec 
.createOutputStream(fileOut)), keyValueSeparator); 
} else { 
Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11)); //change
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); 
recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); 
} 
return recordWriter; 
}
} 
}
 

 

2、把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共类使用。RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个子类存在,protected访问权限,因此普通程序无法访问。

代码如下:

 

3、在主程序中加载generateFileNameForKeyValue方法:

 

在main函数中需添加 job.setOutputFormatClass(myOutput.class);

更多信息请查看 java进阶网 http://www.javady.com

3
0
分享到:
评论

相关推荐

    Hadoop学习网址

    - **日志处理示例**: 介绍了如何使用Hadoop进行日志文件分析,这通常涉及到对日志文件进行过滤、排序和汇总统计。 #### 四、Hadoop杂志——《Hadoop开发者》 - **网址**: [百度文库中的《Hadoop开发者》系列]...

    《Hadoop 数据分析平台》课程毕业测试题

    根据提供的《Hadoop 数据分析平台》课程毕业测试题的相关信息,我们可以提炼出一系列与Hadoop相关的知识点,这些知识点不仅能够帮助学生更好地理解Hadoop的工作原理和技术细节,还能够加深他们对大数据处理技术的...

    拓思爱诺大数据-第二天Hadoop高级编程

    - **Hadoop Shell**:作为操作Hadoop集群的工具,Hadoop Shell提供了一系列用于文件管理、数据传输等功能的强大命令,是日常维护和开发工作中不可或缺的一部分。 - **HDPData文件夹**:这是每个Hadoop集群节点上的...

    Hadoop实时数据处理框架Spark技术教程

    sum = numbers.reduce(lambda a, b: a + b) print("Sum is: ", sum) # 释放缓存 numbers.unpersist() ``` **4. 灵活的数据处理API** - **数据处理API的灵活性:**Spark提供了丰富的数据处理API,使得...

    Hadoop实时数据处理框架spark技术

    通过以上介绍,我们可以清晰地了解到Spark作为一种现代的大数据处理框架,是如何在多个方面改进了Hadoop,并且展示了其在内存计算、数据处理API以及实时数据处理等方面的优势。这对于理解和应用Spark技术处理大规模...

    4_尚硅谷大数据之Hadoop运行模式1

    同样,首先创建输入文件夹并编写输入文件,接着运行Hadoop的wordcount程序,指定输入和输出目录。运行完成后,查看输出结果,可以发现每个单词及其对应的计数值。 **2. 伪分布式运行模式** 伪分布式模式在单台机器...

    HadoopWithPython:通过Pythn玩Hadoop

    你需要下载Hadoop发行版,配置环境变量,包括HADOOP_HOME和PATH,并修改Hadoop配置文件如`core-site.xml`和`hdfs-site.xml`来指定本地文件系统为HDFS的默认存储。启动Hadoop服务,包括NameNode、DataNode和YARN等...

    Hadoop 下单词反向索引程序实验报告.pdf

    6. **结果展示**:实验输出显示了每个单词及其对应的文档列表,例如单词"a"出现在文件3.txt的第7个位置,单词"file6.txt"出现在第6个位置,单词"file2.txt"出现在第9个位置等。 通过这个实验,我们不仅了解了Hadoop...

    Tez 的设计者在 Hadoop 大会上的分享

    例如,任务可以被组织成一系列的阶段,如预处理、聚合、分区等,每个阶段可以由不同的任务(TaskA-1, TaskB-2等)组成。 2. **灵活的输入-处理器-输出运行时模型**:Tez允许动态构造物理运行时执行器,通过连接不同...

    资源的代码

    3. **Hadoop案例之单表关联输出祖孙关系.docx**:这个文件可能介绍了一个使用Hadoop处理多表关联,特别是如何找出数据之间的层次或祖孙关系的问题。在数据处理中,这样的操作常见于构建复杂的分析模型或者数据仓库。...

    《大数据技术原理和应用操作》试卷A卷及答案.pdf

    * Hadoop 集群执行完 MapReduce 程序后,会输出_SUCCESS 和 part-r-00000 结果文件(√) * 传统文件系统存储数据时,若文件太大,会导致上传和下载非常耗时(√) * 使用虚拟机软件,可以在同一台电脑上构建多个 ...

    Inverted-Index-Using-a-Hadoop-Cluster

    `Inverted-Index-Using-a-Hadoop-Cluster-master`这个压缩包文件很可能是项目源代码,包含了实现上述逻辑的所有Java类和其他资源。通过解压并编译运行这个项目,可以在实际的Hadoop集群上构建反向索引。 总之,利用...

    MapReduce and HDFS

    - 输出:`("A", "c")`, `("A", "a")`, `("A", "t")`, `("A", "s")` #### 示例:筛选映射器 ```plaintext let map(k, v) = if(isPrime(v)) then emit(k, v) ``` - 输入:`("foo", 7)` - 输出:`("foo", 7)` ####...

    大数据核心技术A卷(word文档良心出品).doc

    根据给定的大数据核心技术A卷的部分内容,我们可以总结并扩展其中涉及的重要知识点,具体包括Hadoop生态系统中的关键组件及其功能、MapReduce编程模型的基本概念、以及数据存储与处理中的相关技术。 ### Hadoop生态...

    wordcount运行分析

    当Hadoop接收到一个大文件时,它会根据默认的块大小(通常是128MB)将其分割成多个数据块,并将这些块复制到集群的不同节点上,以确保容错性和并行性。在这个阶段,文件内容并没有被解析或处理,只是物理上的切分。...

Global site tag (gtag.js) - Google Analytics