`

多个mapreduce工作相互依赖处理方法完整实例(JobControl)

阅读更多

        处理复杂的要求的时候,有时一个mapreduce程序时完成不了的,往往需要多个mapreduce程序,这个时候就要牵扯到各个任务之间的依赖关系所谓依赖就是一个M/R Job 的处理结果是另外的M/R 的输入,以此类推,完成几个mapreduce程序,得到最后的结果,下面将直接贴出一个例子的全部代码,因为为了找一个完整的例子实在是太难了,今天找了半天才把这个问题解决。

         代码描述,一共包括两个mapreduce作业。也就是两个map和两个reduce函数,第一个job处理后的输出是第二个job的输入,然后交由第二个job来做出最后的结果,代码里面的关键的地方已经有了注释

 

先是代码的主体部分:



 

 

上代码:

/*
 * anthor TMS
 */
package 依赖MR处理方法;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 public class MODEL {
	 
	 //第一个Job的map函数
	 public static class Map_First extends Mapper<Object, Text  ,Text , IntWritable>{                                                                                               	    private final static IntWritable one = new IntWritable(1);
		    private Text keys = new Text();
		    public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {
		    	String s = value.toString();
		    	String[]  allStr = Config.CatString(s);
		        keys.set(allStr[1]); 
		    	context.write(keys, one);
		    }
	  }
	
	 //第一个Job的reduce函数
	  public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> {
		  private IntWritable result = new IntWritable();
		  public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
			   int sum = 0;
			   for(IntWritable value:values) {
				   sum  +=  value.get();
			   }
			    result.set(sum);
			   
			    context.write(key, result);
		  }
	  }
	  
	  //第二个job的map函数
	  public static class Map_Second extends Mapper<Object, Text  ,Text , IntWritable>{        
		    private final static IntWritable one = new IntWritable(1);
		    private Text keys = new Text();
		    public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {

		    	String s = value.toString();
		    	String[]  allStr = Config.CatString(s);
		        keys.set(allStr[1]); 
		    	context.write(keys, one);
		    }
	  }
	  
	  //第二个Job的reduce函数
	  public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> {
		  private IntWritable result = new IntWritable();
		  public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
			   int sum = 0;
			   for(IntWritable value:values) {
				   sum  +=  value.get();
			   }
			    result.set(sum);
			    context.write(key, result);
		  }
	  }
	  
	  //启动函数
	  public static void main(String[] args) throws IOException {
		
		JobConf conf = new JobConf(MODEL.class);
		
		//第一个job的配置
		Job job1 = new Job(conf,"join1");
		job1.setJarByClass(MODEL.class); 

	    job1.setMapperClass(Map_First.class); 
	    job1.setReducerClass(Reduce_First.class); 

		job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key 
		job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value 
	
		job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key 
		job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value 
		
		//加入控制容器 
		ControlledJob ctrljob1=new  ControlledJob(conf); 
		ctrljob1.setJob(job1); 
		//job1的输入输出文件路径
		FileInputFormat.addInputPath(job1, new Path(args[0])); 
	    FileOutputFormat.setOutputPath(job1, new Path(args[1])); 

	    //第二个job的配置
    	Job job2=new Job(conf,"Join2"); 
	    job2.setJarByClass(MODEL.class); 
	    
	    job2.setMapperClass(Map_Second.class); 
	   job2.setReducerClass(Reduce_Second.class); 
	   
		job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key 
		job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value 

		job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key 
		job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value 

		//作业2加入控制容器 
		ControlledJob ctrljob2=new ControlledJob(conf); 
		ctrljob2.setJob(job2); 
	
	   //设置多个作业直接的依赖关系 
       //如下所写: 
	   //意思为job2的启动,依赖于job1作业的完成 
	
		ctrljob2.addDependingJob(ctrljob1); 
		
		//输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好
		FileInputFormat.addInputPath(job2, new Path(args[1]));
		
		//输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得
		//因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了
		FileOutputFormat.setOutputPath(job2,new Path(args[2]) );

		//主的控制容器,控制上面的总的两个子作业 
		JobControl jobCtrl=new JobControl("myctrl"); 
	
		//添加到总的JobControl里,进行控制
		jobCtrl.addJob(ctrljob1); 
		jobCtrl.addJob(ctrljob2); 


		//在线程启动,记住一定要有这个
		Thread  t=new Thread(jobCtrl); 
		t.start(); 

		while(true){ 

		if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息 
		System.out.println(jobCtrl.getSuccessfulJobList()); 
		jobCtrl.stop(); 
		break; 
		}
		}
		}
}

 

   工程上右键run进行配置:先配置第一个栏目main里面的Project(项目名)和Main Class(主类名) 

    

 

接下来是arguments如下所示:



 

最后点击Apply然后Run,运行成功之后,刷新DFS出现几个文件,如下分别为输入的原始数据文件,第一个mapreduce任务后输出的文件output和第二个mapreduce任务之后输出的文件output1



 

 

这里只有两个mapreduce任务,多个也是一样,主要的思想就是先写好每一个mapreduce任务的主体部分,也就是map和reduce函数,然后就是分别配置每一个mapreduce任务(这里要注意设置好输入和输出路径,很容易忘记!!!)此时将job任务加入到控制容器,每一个都要加,再就是使用addDependingJob()添加依赖关系,再用一个总的控制器控制每一个任务。最后用一个线程启动!!!

 

 

 

        

 

  • 大小: 66.1 KB
  • 大小: 73.1 KB
  • 大小: 104.4 KB
  • 大小: 145.5 KB
3
0
分享到:
评论
2 楼 MNTMs 2014-07-28  
我现在用的还是1.1.2
1 楼 SpringJava 2014-07-28  
楼主的hadoop是哪个版本的?

相关推荐

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    - **MultipleInputs/MultipleOutputs**:Hadoop API提供的工具类,用于一个Job处理多个输入源或产生多个输出结果。 3. **参数传递**: - **JobConf**:每个Job都有自己的JobConf对象,可以通过设置conf属性将参数...

    Hadoop之MapReduce编程实例完整源码

    包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作步骤。学习完此例子后,你能掌握MapReduce基础编程,及如何编译Java文件,打包jar文件,编写shell执行脚本等。后续学习还可以参看本人的...

    HBase MapReduce完整实例.rar

    在"**HBase MapReduce完整实例.zip**"这个压缩包中,可能包含了以下内容: 1. **案例介绍**:详细讲解如何使用HBase和MapReduce进行数据处理,包括设置环境、配置HBase与MapReduce的集成、编写MapReduce程序等步骤...

    20、MapReduce 工作流介绍

    总结来说,MapReduce工作流是Hadoop中处理多阶段数据处理任务的重要工具,通过`JobControl` 和`ControlledJob` ,可以管理和协调一系列相互依赖的MapReduce作业,确保它们按照正确的顺序和条件执行。这对于实现复杂...

    MapReduce数据统计简单实例

    在Map阶段,输入数据被分割成多个小块(split),然后分配给各个工作节点(mapper)进行处理。在这里,"有关114查询的数据例子.txt"可能就是我们要处理的原始数据。Mapper接收键值对(key-value pair)作为输入,对...

    HBase MapReduce完整实例

    本实例中提供的Eclipse工程是一个完整的HBase MapReduce应用。工程包含了HBase的增删改查功能,以及一个名为Test的测试类,运行这个测试类可以观察到实际效果。 1. 数据导入:使用MapReduce将数据批量导入HBase,这...

    Mapreduce工作流程-3 计算实例

    Mapreduce工作流程-3 计算实例

    mapreduce实例

    总的来说,MapReduce实例在Hadoop平台上用于处理大数据,通过Map和Reduce两个步骤,实现了数据的分布式处理。理解并熟练掌握MapReduce编程模型,对于开发高效的大数据解决方案至关重要。而实际应用中,还需要关注...

    用于多个MapReduce作业的任务调度算法.pdf

    本文提出了一个用于多个MapReduce作业的任务调度算法,解决了多个MapReduce作业之间的调度问题,并且考虑了数据依赖限制和带宽有限的数据传输成本。 二、算法设计 本文的调度算法基于优先级约束,考虑了工作流应用...

    MapReduce实例分析:单词计数

    本节通过单词计数实例来阐述采用 MapReduce 解决实际问题的基本思路和具体实现过程。 设计思路 首先,检查单词计数是否可以使用 MapReduce 进行处理。因为在单词计数程序任务中,不同单词的出现次数之间不存在...

    MapReduce编程实例:单词计数

    假设文件的量比较大,每个文档又包含大量的单词,则无法使用传统的线性程序进行处理,而这类问题正是 MapReduce 可以发挥优势的地方。 在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词...

    用mapreduce进行文本处理

    ### 使用MapReduce进行文本处理 #### 一、引言与背景 MapReduce 是一种用于处理大规模数据集(尤其是海量文本数据)的编程模型及其相关的实现。这种模型最初由 Google 提出,并被广泛应用于搜索引擎和大数据处理...

    MapReduce综合案例(4个)

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。它将复杂的并行计算任务分解为两个主要阶段:Map(映射)和Reduce(化简)。在这个综合案例中,我们将探讨四个具体的应用...

    MapReduce操作实例-数据去重.pdf

    在这个实例中,我们看到的是一个基于MapReduce的数据去重操作,这个操作在大数据处理中非常常见,尤其是当处理的数据源包含重复记录时。下面将详细解释每个部分的作用。 1. **Mapper类**: 在`DedupMapper`类中,...

    MapReduce实例

    在这个实例中,我们看到MapReduce被用来从Hbase数据库中提取海量数据,对其进行处理,然后将统计结果存储到MySQL数据库中。这个过程涉及到大数据处理的核心技术,下面我们将深入探讨这些知识点。 首先,**Hbase** ...

    分布式文件系统实例-mapreduce-排序

    这个面向新手的MapReduce排序实例,旨在帮助理解大数据处理的基本流程。通过将大任务拆分为并行的map和reduce任务,MapReduce能够在分布式环境中高效地完成数据排序。对于学习大数据处理和理解MapReduce模型的人来说...

    MapReduce简单程序示例

    在大数据处理领域,MapReduce已经成为一种重要的工具,广泛应用于日志分析、搜索引擎索引构建、机器学习等多个场景。 Map阶段是数据处理的开始,它接收输入数据,并将其划分为一系列键值对(key-value pairs)。...

Global site tag (gtag.js) - Google Analytics