处理复杂的要求的时候,有时一个mapreduce程序时完成不了的,往往需要多个mapreduce程序,这个时候就要牵扯到各个任务之间的依赖关系,所谓依赖就是一个M/R Job 的处理结果是另外的M/R 的输入,以此类推,完成几个mapreduce程序,得到最后的结果,下面将直接贴出一个例子的全部代码,因为为了找一个完整的例子实在是太难了,今天找了半天才把这个问题解决。
代码描述,一共包括两个mapreduce作业。也就是两个map和两个reduce函数,第一个job处理后的输出是第二个job的输入,然后交由第二个job来做出最后的结果,代码里面的关键的地方已经有了注释
先是代码的主体部分:
上代码:
/* * anthor TMS */ package 依赖MR处理方法; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MODEL { //第一个Job的map函数 public static class Map_First extends Mapper<Object, Text ,Text , IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text keys = new Text(); public void map(Object key,Text value, Context context ) throws IOException, InterruptedException { String s = value.toString(); String[] allStr = Config.CatString(s); keys.set(allStr[1]); context.write(keys, one); } } //第一个Job的reduce函数 public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable value:values) { sum += value.get(); } result.set(sum); context.write(key, result); } } //第二个job的map函数 public static class Map_Second extends Mapper<Object, Text ,Text , IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text keys = new Text(); public void map(Object key,Text value, Context context ) throws IOException, InterruptedException { String s = value.toString(); String[] allStr = Config.CatString(s); keys.set(allStr[1]); context.write(keys, one); } } //第二个Job的reduce函数 public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable value:values) { sum += value.get(); } result.set(sum); context.write(key, result); } } //启动函数 public static void main(String[] args) throws IOException { JobConf conf = new JobConf(MODEL.class); //第一个job的配置 Job job1 = new Job(conf,"join1"); job1.setJarByClass(MODEL.class); job1.setMapperClass(Map_First.class); job1.setReducerClass(Reduce_First.class); job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value //加入控制容器 ControlledJob ctrljob1=new ControlledJob(conf); ctrljob1.setJob(job1); //job1的输入输出文件路径 FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); //第二个job的配置 Job job2=new Job(conf,"Join2"); job2.setJarByClass(MODEL.class); job2.setMapperClass(Map_Second.class); job2.setReducerClass(Reduce_Second.class); job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value //作业2加入控制容器 ControlledJob ctrljob2=new ControlledJob(conf); ctrljob2.setJob(job2); //设置多个作业直接的依赖关系 //如下所写: //意思为job2的启动,依赖于job1作业的完成 ctrljob2.addDependingJob(ctrljob1); //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好 FileInputFormat.addInputPath(job2, new Path(args[1])); //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得 //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了 FileOutputFormat.setOutputPath(job2,new Path(args[2]) ); //主的控制容器,控制上面的总的两个子作业 JobControl jobCtrl=new JobControl("myctrl"); //添加到总的JobControl里,进行控制 jobCtrl.addJob(ctrljob1); jobCtrl.addJob(ctrljob2); //在线程启动,记住一定要有这个 Thread t=new Thread(jobCtrl); t.start(); while(true){ if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息 System.out.println(jobCtrl.getSuccessfulJobList()); jobCtrl.stop(); break; } } } }
工程上右键run进行配置:先配置第一个栏目main里面的Project(项目名)和Main Class(主类名)
接下来是arguments如下所示:
最后点击Apply然后Run,运行成功之后,刷新DFS出现几个文件,如下分别为输入的原始数据文件,第一个mapreduce任务后输出的文件output和第二个mapreduce任务之后输出的文件output1
这里只有两个mapreduce任务,多个也是一样,主要的思想就是先写好每一个mapreduce任务的主体部分,也就是map和reduce函数,然后就是分别配置每一个mapreduce任务(这里要注意设置好输入和输出路径,很容易忘记!!!)此时将job任务加入到控制容器,每一个都要加,再就是使用addDependingJob()添加依赖关系,再用一个总的控制器控制每一个任务。最后用一个线程启动!!!
相关推荐
- **MultipleInputs/MultipleOutputs**:Hadoop API提供的工具类,用于一个Job处理多个输入源或产生多个输出结果。 3. **参数传递**: - **JobConf**:每个Job都有自己的JobConf对象,可以通过设置conf属性将参数...
包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作步骤。学习完此例子后,你能掌握MapReduce基础编程,及如何编译Java文件,打包jar文件,编写shell执行脚本等。后续学习还可以参看本人的...
在"**HBase MapReduce完整实例.zip**"这个压缩包中,可能包含了以下内容: 1. **案例介绍**:详细讲解如何使用HBase和MapReduce进行数据处理,包括设置环境、配置HBase与MapReduce的集成、编写MapReduce程序等步骤...
总结来说,MapReduce工作流是Hadoop中处理多阶段数据处理任务的重要工具,通过`JobControl` 和`ControlledJob` ,可以管理和协调一系列相互依赖的MapReduce作业,确保它们按照正确的顺序和条件执行。这对于实现复杂...
在Map阶段,输入数据被分割成多个小块(split),然后分配给各个工作节点(mapper)进行处理。在这里,"有关114查询的数据例子.txt"可能就是我们要处理的原始数据。Mapper接收键值对(key-value pair)作为输入,对...
本实例中提供的Eclipse工程是一个完整的HBase MapReduce应用。工程包含了HBase的增删改查功能,以及一个名为Test的测试类,运行这个测试类可以观察到实际效果。 1. 数据导入:使用MapReduce将数据批量导入HBase,这...
Mapreduce工作流程-3 计算实例
总的来说,MapReduce实例在Hadoop平台上用于处理大数据,通过Map和Reduce两个步骤,实现了数据的分布式处理。理解并熟练掌握MapReduce编程模型,对于开发高效的大数据解决方案至关重要。而实际应用中,还需要关注...
本文提出了一个用于多个MapReduce作业的任务调度算法,解决了多个MapReduce作业之间的调度问题,并且考虑了数据依赖限制和带宽有限的数据传输成本。 二、算法设计 本文的调度算法基于优先级约束,考虑了工作流应用...
本节通过单词计数实例来阐述采用 MapReduce 解决实际问题的基本思路和具体实现过程。 设计思路 首先,检查单词计数是否可以使用 MapReduce 进行处理。因为在单词计数程序任务中,不同单词的出现次数之间不存在...
假设文件的量比较大,每个文档又包含大量的单词,则无法使用传统的线性程序进行处理,而这类问题正是 MapReduce 可以发挥优势的地方。 在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词...
### 使用MapReduce进行文本处理 #### 一、引言与背景 MapReduce 是一种用于处理大规模数据集(尤其是海量文本数据)的编程模型及其相关的实现。这种模型最初由 Google 提出,并被广泛应用于搜索引擎和大数据处理...
MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。它将复杂的并行计算任务分解为两个主要阶段:Map(映射)和Reduce(化简)。在这个综合案例中,我们将探讨四个具体的应用...
在这个实例中,我们看到的是一个基于MapReduce的数据去重操作,这个操作在大数据处理中非常常见,尤其是当处理的数据源包含重复记录时。下面将详细解释每个部分的作用。 1. **Mapper类**: 在`DedupMapper`类中,...
在这个实例中,我们看到MapReduce被用来从Hbase数据库中提取海量数据,对其进行处理,然后将统计结果存储到MySQL数据库中。这个过程涉及到大数据处理的核心技术,下面我们将深入探讨这些知识点。 首先,**Hbase** ...
这个面向新手的MapReduce排序实例,旨在帮助理解大数据处理的基本流程。通过将大任务拆分为并行的map和reduce任务,MapReduce能够在分布式环境中高效地完成数据排序。对于学习大数据处理和理解MapReduce模型的人来说...
在大数据处理领域,MapReduce已经成为一种重要的工具,广泛应用于日志分析、搜索引擎索引构建、机器学习等多个场景。 Map阶段是数据处理的开始,它接收输入数据,并将其划分为一系列键值对(key-value pairs)。...