这两天在网上看了个MapReduce的多文件输出的帖子: http://blog.csdn.net/inkfish。写的不错。
我试着完成了一下。也是分为三个文件:我这三个文件,跟原作者的稍有不同。其中有些类是我原来写的,我直接拷贝过来的,所以有点不同。
My_LineRead.java
public class My_LineRead<K, V> extends RecordWriter<K, V>{ private static final String utf8 = "UTF-8"; private static final String colon = "----"; //划分符号 private static final byte[] newline; static { try { newline = "/n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public My_LineRead(DataOutputStream out) { this(out, colon); //调用下面的构造函数 } public My_LineRead(DataOutputStream out, String keyValueSeparator) { // TODO Auto-generated constructor stub this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } @Override public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { // TODO Auto-generated method stub out.close(); } @Override public void write(K key, V value) throws IOException, InterruptedException { if (!(key == null && key instanceof NullWritable)){ //如果key不为空者输出key if ((Object)key instanceof Text){ Text to = (Text) key; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(key.toString().getBytes(utf8)); } out.write(keyValueSeparator); } if (!(value == null && value instanceof NullWritable)){ //如果value不为空则输出value if ((Object)value instanceof Text){ Text to = (Text) value; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(value.toString().getBytes(utf8)); } out.write(newline); } } }
MyMultipleOutputFormat.java //这个类,我添加了些注释便于理解
public abstract class MyMultipleOutputFormat <K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> { //接口类,需要在主程序中实现generateFileNameForKeyValue来获取文件名 private MultiRecordWriter writer = null; @Override public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // TODO Auto-generated method stub //如果第一次调用那么writer=null if (writer == null) { //getTaskOutputPath获取output路径 writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } /**通过key, value, conf来确定输出文件名(含扩展名)*/ //返回值就是文件名。可以根据key,value来判断 protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf); //MultiRecordWriter类 public class MultiRecordWriter extends RecordWriter<K, V> { /**RecordWriter的缓存*/ private HashMap<String, RecordWriter<K, V>> recordWriters = null; private TaskAttemptContext job = null; /**输出目录*/ private Path workPath = null; //构造函数 public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap<String, RecordWriter<K, V>>(); } //关闭,应该可能是多个文件进行关闭,所有采用循环 //recordWriters.values() 就是指的getBaseRecordWriter返回的值。 @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { //得到输出文件名 String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration()); //如果recordWriters里没有文件名,那么就建立。否则就直接写值。 RecordWriter<K, V> rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); //放入HashMap this.recordWriters.put(baseName, rw); } rw.write(key, value); } // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension} private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { //获取配置文件 Configuration conf = job.getConfiguration(); //查看是否使用解码器 boolean isCompressed = getCompressOutput(job); String keyValueSeparator = ","; RecordWriter<K, V> recordWriter = null; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath, baseName + codec.getDefaultExtension()); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new My_LineRead<K, V>(new DataOutputStream(codec .createOutputStream(fileOut)), keyValueSeparator); } //如果不使用解码器 else { Path file = new Path(workPath, baseName); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); //recordWriter = new My_LineRead<K, V>(fileOut, keyValueSeparator); //这里我使用的我自己的OutputFormat recordWriter = new My_LineRead<K, V>(fileOut); } return recordWriter; } } }
最后就是测试类,WordCount_MulFileOut.java
public class WordCount_MulFileOut { public static class wordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{ String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while(itr.hasMoreElements()){ word.set(itr.nextToken()); context.write(word, one); } } } public static class wordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(Text key, Iterable<IntWritable>values, Context context)throws IOException, InterruptedException{ int sum = 0; for (IntWritable str : values){ sum += str.get(); } context.write(key, new IntWritable(sum)); } } public static class MyMultiple extends MyMultipleOutputFormat{ @Override protected String generateFileNameForKeyValue(WritableComparable key, Writable value, Configuration conf) { // TODO Auto-generated method stub return "other.txt"; } } public static void main(String args[])throws Exception{ Configuration conf = new Configuration(); Job job = new Job(conf, "wordcount"); job.setJarByClass(WordCount_MulFileOut.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(MyMultiple.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(wordcountMapper.class); job.setReducerClass(wordcountReduce.class); job.setCombinerClass(wordcountReduce.class); FileInputFormat.setInputPaths(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); job.waitForCompletion(true); } }
您还没有登录,请您登录后再发表评论
WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的次数。在这个案例中,我们将深入探讨如何在 Hadoop 环境中使用 MapReduce 实现 WordCount。 【描述】在 Hadoop 环境中,WordCount 的...
本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...
【大数据与云计算培训学习资料 Hadoop的MapReduce中多文件输出】 MapReduce是Hadoop框架的核心部分,主要用于处理大规模数据的分布式计算。在默认情况下,MapReduce任务的输出是一个单一的文件,由TextOutputFormat...
在这个"大数据Hadoop MapReduce词频统计"的场景中,我们利用MapReduce来统计文本中的词汇出现频率。这是数据分析中一个常见的任务,有助于理解文本内容的概貌。Map阶段的任务是对输入的文本进行分词,并形成<单词, 1...
1. **Hadoop环境搭建**:讲解如何安装配置Hadoop分布式文件系统(HDFS)和MapReduce框架,包括集群部署和单机模拟。 2. **MapReduce编程模型**:介绍Map和Reduce函数的编写,以及Combiner和Partitioner的使用,它们...
Hadoop中的多文件输出 Hadoop 的 MapReduce 框架中,默认的输出格式是 TextOutputFormat,这种格式的输出文件名不可定制。在 Hadoop 0.19.X 版本中,提供了一个 org.apache.hadoop.mapred.lib.MultipleOutputFormat...
在大数据处理领域,Hadoop MapReduce是一个至关重要的组件,它为海量数据的并行处理提供了分布式计算框架。本文将深入探讨如何使用...通过深入学习和实践,开发者可以利用Hadoop MapReduce解决大数据处理中的各种问题。
Hadoop是Apache软件基金会开发的一个开源项目,它提供了一个分布式文件系统(HDFS)和MapReduce编程模型。MapReduce将大型任务分解为许多小任务,分发到集群中的多台机器上并行执行,然后将结果合并。这种设计非常...
在这个特定的案例中,`Uniq.java`可能包含了逻辑来识别重复的键,并确保在输出中只保留一个副本。 `datadel1.txt`和`datadel2.txt`是可能包含重复数据的输入文件。在MapReduce任务中,这些文件会被Hadoop分片...
在压缩包文件“CH 10.1 - KMeans”中,可能包含了关于如何在 Hadoop MapReduce 上实现 KMeans 的具体代码示例、步骤指南或者理论讲解。这些资源可以帮助你更深入地理解如何将这两个技术结合,以便在大规模数据集上...
5. **输出**:Reduce任务的输出经过排序后被写入到输出文件中。 #### 三、WordCount示例解析 1. **实验目标**:计算一个文本文件中每个单词出现的次数。 2. **实验步骤详解**: - **步骤1:编辑WordCount.java...
从给定的文件信息来看,文档标题为"Hadoop MapReduce教程.pdf",描述与标题相同,标签为"Hadoop Map Reduce",部分内容虽然无法完全解析,但可以推断出与Hadoop MapReduce的基本概念、操作流程以及相关的编程模型...
这个例子项目是关于在单机环境中运行WordCount程序的,这是一个经典的MapReduce示例,用于统计文本文件中每个单词出现的次数。 首先,让我们深入了解Hadoop。Hadoop是由Apache软件基金会开发的开源框架,专门设计...
在Map阶段,输入数据集被分割成多个块,每个块分配给集群中的一个节点进行处理。开发者定义的`map()`函数在这个阶段运行,它接收键值对作为输入,并产生中间键值对。Map任务通常在数据实际存储的节点上执行,以减少...
附件是一个简单的 Hadoop MapReduce 程序示例,用于统计文本文件中单词出现的次数。、 要运行这个程序,你需要将上述代码保存为 .java 文件,然后编译并打包成一个 JAR 文件。之后,你可以使用 Hadoop 的命令行工具...
通常情况下,MapReduce作业的输入和输出都会存储在文件系统中。框架负责任务的调度、监控以及重执行失败的任务。 计算节点和存储节点往往是同一个,即MapReduce框架和Hadoop分布式文件系统(HDFS)运行在同一组节点...
### Hadoop MapReduce 教程知识点详解 #### 一、Hadoop MapReduce 概述 Hadoop MapReduce 是一个强大的分布式计算框架,主要用于处理大规模数据集。它通过将任务分解成多个子任务来实现并行处理,从而极大地提高了...
实验中还涉及到了Hadoop自带的`wordcount`程序,这是一个经典的MapReduce示例,用于统计文本文件中各个单词出现的次数。实验者创建了输入文件,上传到HDFS(Hadoop Distributed File System),然后运行`wordcount`...
相关推荐
WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的次数。在这个案例中,我们将深入探讨如何在 Hadoop 环境中使用 MapReduce 实现 WordCount。 【描述】在 Hadoop 环境中,WordCount 的...
本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...
【大数据与云计算培训学习资料 Hadoop的MapReduce中多文件输出】 MapReduce是Hadoop框架的核心部分,主要用于处理大规模数据的分布式计算。在默认情况下,MapReduce任务的输出是一个单一的文件,由TextOutputFormat...
在这个"大数据Hadoop MapReduce词频统计"的场景中,我们利用MapReduce来统计文本中的词汇出现频率。这是数据分析中一个常见的任务,有助于理解文本内容的概貌。Map阶段的任务是对输入的文本进行分词,并形成<单词, 1...
1. **Hadoop环境搭建**:讲解如何安装配置Hadoop分布式文件系统(HDFS)和MapReduce框架,包括集群部署和单机模拟。 2. **MapReduce编程模型**:介绍Map和Reduce函数的编写,以及Combiner和Partitioner的使用,它们...
Hadoop中的多文件输出 Hadoop 的 MapReduce 框架中,默认的输出格式是 TextOutputFormat,这种格式的输出文件名不可定制。在 Hadoop 0.19.X 版本中,提供了一个 org.apache.hadoop.mapred.lib.MultipleOutputFormat...
在大数据处理领域,Hadoop MapReduce是一个至关重要的组件,它为海量数据的并行处理提供了分布式计算框架。本文将深入探讨如何使用...通过深入学习和实践,开发者可以利用Hadoop MapReduce解决大数据处理中的各种问题。
Hadoop是Apache软件基金会开发的一个开源项目,它提供了一个分布式文件系统(HDFS)和MapReduce编程模型。MapReduce将大型任务分解为许多小任务,分发到集群中的多台机器上并行执行,然后将结果合并。这种设计非常...
在这个特定的案例中,`Uniq.java`可能包含了逻辑来识别重复的键,并确保在输出中只保留一个副本。 `datadel1.txt`和`datadel2.txt`是可能包含重复数据的输入文件。在MapReduce任务中,这些文件会被Hadoop分片...
在压缩包文件“CH 10.1 - KMeans”中,可能包含了关于如何在 Hadoop MapReduce 上实现 KMeans 的具体代码示例、步骤指南或者理论讲解。这些资源可以帮助你更深入地理解如何将这两个技术结合,以便在大规模数据集上...
5. **输出**:Reduce任务的输出经过排序后被写入到输出文件中。 #### 三、WordCount示例解析 1. **实验目标**:计算一个文本文件中每个单词出现的次数。 2. **实验步骤详解**: - **步骤1:编辑WordCount.java...
从给定的文件信息来看,文档标题为"Hadoop MapReduce教程.pdf",描述与标题相同,标签为"Hadoop Map Reduce",部分内容虽然无法完全解析,但可以推断出与Hadoop MapReduce的基本概念、操作流程以及相关的编程模型...
这个例子项目是关于在单机环境中运行WordCount程序的,这是一个经典的MapReduce示例,用于统计文本文件中每个单词出现的次数。 首先,让我们深入了解Hadoop。Hadoop是由Apache软件基金会开发的开源框架,专门设计...
在Map阶段,输入数据集被分割成多个块,每个块分配给集群中的一个节点进行处理。开发者定义的`map()`函数在这个阶段运行,它接收键值对作为输入,并产生中间键值对。Map任务通常在数据实际存储的节点上执行,以减少...
附件是一个简单的 Hadoop MapReduce 程序示例,用于统计文本文件中单词出现的次数。、 要运行这个程序,你需要将上述代码保存为 .java 文件,然后编译并打包成一个 JAR 文件。之后,你可以使用 Hadoop 的命令行工具...
通常情况下,MapReduce作业的输入和输出都会存储在文件系统中。框架负责任务的调度、监控以及重执行失败的任务。 计算节点和存储节点往往是同一个,即MapReduce框架和Hadoop分布式文件系统(HDFS)运行在同一组节点...
### Hadoop MapReduce 教程知识点详解 #### 一、Hadoop MapReduce 概述 Hadoop MapReduce 是一个强大的分布式计算框架,主要用于处理大规模数据集。它通过将任务分解成多个子任务来实现并行处理,从而极大地提高了...
实验中还涉及到了Hadoop自带的`wordcount`程序,这是一个经典的MapReduce示例,用于统计文本文件中各个单词出现的次数。实验者创建了输入文件,上传到HDFS(Hadoop Distributed File System),然后运行`wordcount`...