MapReduce程序默认的输出文件个数:
首先,根据setNumReduceTasks(int num)这个方法,
其次,根据Map的输出文件个数。
一般情况下,同一个key的数据,可能会被分散到不同的输出文件中。倘若我们要对某一个特定的key的所有value值进行遍历,则需要将包含该key的所有文件作为输入文件。当数据比较庞大时,这样的操作会浪费资源。如果同一个Key的所有的value值都会被分配到同一个文件中,就会比较理想。
在Hadoop-core包中,有个类MultiplyOutputs可以实现以上功能(其实就是在reduce中加一两句话,其他不变)。代码如下:
package io; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class MultipleOut extends Configured implements Tool { static class Map extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); int index = line.indexOf(" "); if (index != -1) { context.write(new Text(line.substring(0, index)), new Text(value.toString().substring(index + 1))); } } } // 只需要在reduce中添加几句代码,其他部分不需要改动 static class Reduce extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { MultipleOutputs mo = new MultipleOutputs(context); for (Text val : values) { //key.toString():表示输出文件名以Key命名,注意是相对路径 mo.write(key, val, key.toString() + "/"); } //一定要close,否则无数据 mo.close(); } } @Override public int run(String[] strings) throws Exception { String path = "你的输入路径"; if (strings.length != 1) { System.out.println("input:" + path); System.out.print("arg:<out>"); return 1; } Configuration conf = getConf(); Job job = new Job(conf, "MultipleOut"); job.setJarByClass(MultipleOut.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(path)); FileOutputFormat.setOutputPath(job, new Path(strings[0])); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int rst = ToolRunner.run(conf, new MultipleOut(), args); System.exit(rst); } }
结果如图片所示,目录1,2,3,4,5是五个Key。
相关推荐
【标题】"最简单MR WordCount" 涉及到的是MapReduce编程模型中的一个经典示例,WordCount。在Hadoop生态系统中,WordCount是一个基础但非常重要的应用,用于统计文本文件中每个单词出现的次数。这个程序展示了...
文件分发首先发生在这一阶段,HDFS(Hadoop Distributed File System)将数据块分发到各个节点。 2. **数据本地性**: - Hadoop设计的核心理念之一是数据本地性,即尽量让计算任务在数据所在节点执行,减少网络...
- **提交作业**: 通过 JobClient 提交作业,JobTracker 会进行一系列操作,包括检查输入输出路径的有效性、将 job 的 jar 文件复制到 HDFS 等。 ##### 3.3 作业初始化 - **作业调度**: JobTracker 将作业加入队列,...
- 输出:将每行文本数据拆分成单词,每个单词与数字1组成键值对<单词,1>。这里,单词作为key,1作为value,两者都是Hadoop序列化框架中的类型,如Text对应String,IntWritable对应Integer。 - 注意:map阶段的输入...
- **Map 程序调用用户 map() 方法**:每当 Map 程序读取一行数据时,就会调用用户的 `map()` 方法,并将这一行数据的起始偏移量作为 key,数据内容作为 value 传入。 - **Reduce 程序调用用户 reduce() 方法**:...
Split切片是MapReduce处理数据的第一步,它将大文件分割成多个小的逻辑单元,以便于并行处理。源代码分析从提交任务开始,核心方法为`submit()`,进一步深入`submitJobInternal()`方法,其中主要检查输出规格、配置...
1. **作业提交(Job Submission)**:当用户通过Hadoop的API提交一个MapReduce作业时,JobTracker接收到请求,将作业的JAR包和配置信息分发到集群中的TaskTracker节点。 2. **作业初始化(Job Initialization)**:...
根据短文件名偏移 0x0c 处:位 3=1 表示文件名小写,位 4=1 表示文件扩展名小写。 4.可以正确识别 mkisofs 2.00/2.01 生成有 bug 的 Joliet 格式光盘。 2013-10-18 1.新增功能类似CMD的PATHEXT,可以设置默认的...
9. MR机制:MapReduce中的MapTask和ReduceTask分别负责数据的映射和归约操作,shuffle阶段负责排序、合并和传输map输出到reduce。 10. Yarn架构与工作原理:Yarn是一个资源管理平台,负责资源管理和任务调度,包括...