`
ghost_face
  • 浏览: 54602 次
社区版块
存档分类
最新评论

MR实现将同一个key的内容分配到同一个输出文件

 
阅读更多

MapReduce程序默认的输出文件个数:

首先,根据setNumReduceTasks(int num)这个方法,

其次,根据Map的输出文件个数。

一般情况下,同一个key的数据,可能会被分散到不同的输出文件中。倘若我们要对某一个特定的key的所有value值进行遍历,则需要将包含该key的所有文件作为输入文件。当数据比较庞大时,这样的操作会浪费资源。如果同一个Key的所有的value值都会被分配到同一个文件中,就会比较理想。

在Hadoop-core包中,有个类MultiplyOutputs可以实现以上功能(其实就是在reduce中加一两句话,其他不变)。代码如下:

 

package io;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MultipleOut extends Configured implements Tool {

    static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            int index = line.indexOf(" ");
            if (index != -1) {
                context.write(new Text(line.substring(0, index)),
                        new Text(value.toString().substring(index + 1)));
            }
        }
    }


// 只需要在reduce中添加几句代码,其他部分不需要改动
    static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            MultipleOutputs mo = new MultipleOutputs(context);
            for (Text val : values) {
             //key.toString():表示输出文件名以Key命名,注意是相对路径
                mo.write(key, val, key.toString() + "/");
            }
            //一定要close,否则无数据
            mo.close();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        String path = "你的输入路径";
        if (strings.length != 1) {
            System.out.println("input:" + path);
            System.out.print("arg:<out>");
            return 1;
        }
        Configuration conf = getConf();
        Job job = new Job(conf, "MultipleOut");
        job.setJarByClass(MultipleOut.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(strings[0]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int rst = ToolRunner.run(conf, new MultipleOut(), args);
        System.exit(rst);
    }
}

 结果如图片所示,目录1,2,3,4,5是五个Key。

 



 

  • 大小: 12.4 KB
0
2
分享到:
评论

相关推荐

    最简单MR WordCount

    【标题】"最简单MR WordCount" 涉及到的是MapReduce编程模型中的一个经典示例,WordCount。在Hadoop生态系统中,WordCount是一个基础但非常重要的应用,用于统计文本文件中每个单词出现的次数。这个程序展示了...

    大数据MapReduce文件分发

    文件分发首先发生在这一阶段,HDFS(Hadoop Distributed File System)将数据块分发到各个节点。 2. **数据本地性**: - Hadoop设计的核心理念之一是数据本地性,即尽量让计算任务在数据所在节点执行,减少网络...

    Hadoop MapReduce 入门

    - **提交作业**: 通过 JobClient 提交作业,JobTracker 会进行一系列操作,包括检查输入输出路径的有效性、将 job 的 jar 文件复制到 HDFS 等。 ##### 3.3 作业初始化 - **作业调度**: JobTracker 将作业加入队列,...

    hdp-day03-05笔记

    - 输出:将每行文本数据拆分成单词,每个单词与数字1组成键值对&lt;单词,1&gt;。这里,单词作为key,1作为value,两者都是Hadoop序列化框架中的类型,如Text对应String,IntWritable对应Integer。 - 注意:map阶段的输入...

    MapReduce学习笔记,亲自测试写出来的,1000分都不贵

    - **Map 程序调用用户 map() 方法**:每当 Map 程序读取一行数据时,就会调用用户的 `map()` 方法,并将这一行数据的起始偏移量作为 key,数据内容作为 value 传入。 - **Reduce 程序调用用户 reduce() 方法**:...

    MapReduce源码流程.pdf

    Split切片是MapReduce处理数据的第一步,它将大文件分割成多个小的逻辑单元,以便于并行处理。源代码分析从提交任务开始,核心方法为`submit()`,进一步深入`submitJobInternal()`方法,其中主要检查输出规格、配置...

    mapreduce详细流程

    1. **作业提交(Job Submission)**:当用户通过Hadoop的API提交一个MapReduce作业时,JobTracker接收到请求,将作业的JAR包和配置信息分发到集群中的TaskTracker节点。 2. **作业初始化(Job Initialization)**:...

    grub4dos-V0.4.6a-2017-02-04更新

    根据短文件名偏移 0x0c 处:位 3=1 表示文件名小写,位 4=1 表示文件扩展名小写。 4.可以正确识别 mkisofs 2.00/2.01 生成有 bug 的 Joliet 格式光盘。 2013-10-18 1.新增功能类似CMD的PATHEXT,可以设置默认的...

    【面试宝典】2021年超全超详细的最新大数据开发面试题,附答案解析(一版).pdf

    9. MR机制:MapReduce中的MapTask和ReduceTask分别负责数据的映射和归约操作,shuffle阶段负责排序、合并和传输map输出到reduce。 10. Yarn架构与工作原理:Yarn是一个资源管理平台,负责资源管理和任务调度,包括...

Global site tag (gtag.js) - Google Analytics