`

多个MapReduce任务实现任务间相互依赖

 
阅读更多

工作过程工遇到有些情况,需要多个mapreduce程序相互依赖执行,使用ControlledJob类可以轻松处理多个JOB之间按照顺序执行。

 

输入文件为2个,t1.txt和t2.txt,内容如下:

t1:

a
b
a
c
d
a
b

 

t2:

c
a
e
d
b
a
e
c
c

 

执行如下命令:

 

hadoop jar /home/sospdm/morejobs.jar morejobs.morejobs input output output1

 

分别得到2个输出文件夹output和output1,内容如下:

 

[sospdm@master2-dev ~]$ hadoop fs -cat output/part-r-00000
a       5
b       3
c       4
d       2
e       2

 

[sospdm@master2-dev ~]$ hadoop fs -cat output1/part-r-00000
sum     16

 

 

 

 

经过处理后第一个MAPREDUCE程序得到词频,第二个程序以第一个输出文件夹作为输入,得出总词频。

代码:

package morejobs;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;;

 public class morejobs {
   
/***************第一个MAPREDUCE实现单词计数
 * 
 * @author zhangliang
 *
 */
   //第一个Job的map函数
   public static class Map_First extends Mapper<Object, Text  ,Text , IntWritable>{                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
        private final static IntWritable one = new IntWritable(1);
        private Text keys = new Text();
        public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {
        	String s = value.toString();
            keys.set(s); 
        	context.write(keys, one);
        }
    }
  
   //第一个Job的reduce函数
    public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> {
      private IntWritable result = new IntWritable();
      public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
         int sum = 0;
         for(IntWritable value:values) {
           sum  +=  value.get();
         }
          result.set(sum);
         
          context.write(key, result);
      }
    }
    
    
/***************************第二个MAPREDUCE实现计数后求和
 * 
 * @author 11100198
 *
 */
    
    
    //第二个job的map函数
    public static class Map_Second extends Mapper<Object, Text  ,Text , IntWritable>{                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
        private final static IntWritable one = new IntWritable(1);
        private Text keys = new Text("sum");
        public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {
        	Pattern p = Pattern.compile("\\d{1,10}$");
        	Matcher m = p.matcher(value.toString());
        	m.find();
        	int s = Integer.valueOf(m.group()).intValue();
        	one.set(s);
        	context.write(keys, one);
        }
    }
    
    //第二个Job的reduce函数
    public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> {
      private IntWritable result = new IntWritable();
      public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
         int sum = 0;
         for(IntWritable value:values) {
           sum  +=  value.get();
         }
          result.set(sum);
          context.write(key, result);
      }
    }

    
/**********************
 * 以下JOB配置参考:http://www.tuicool.com/articles/N7buuy
 * @param args
 * @throws IOException
 */

    //启动函数
    public static void main(String[] args) throws IOException {
    
    JobConf conf = new JobConf(morejobs.class);
    
    //第一个job的配置
    Job job1 = new Job(conf,"join1");
    job1.setJarByClass(morejobs.class); 

      job1.setMapperClass(Map_First.class); 
      job1.setReducerClass(Reduce_First.class); 

    job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key 
    job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value 
  
    job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key 
    job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value 
    
    //加入控制容器 
    ControlledJob ctrljob1=new  ControlledJob(conf); 
    ctrljob1.setJob(job1); 
    //job1的输入输出文件路径
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
      FileOutputFormat.setOutputPath(job1, new Path(args[1])); 

      //第二个作业的配置
    	Job job2=new Job(conf,"Join2"); 
      job2.setJarByClass(morejobs.class); 
      
      job2.setMapperClass(Map_Second.class); 
     job2.setReducerClass(Reduce_Second.class); 
     
    job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key 
    job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value 

    job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key 
    job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value 

    //作业2加入控制容器 
    ControlledJob ctrljob2=new ControlledJob(conf); 
    ctrljob2.setJob(job2); 
  
     //设置多个作业直接的依赖关系 
       //如下所写: 
     //意思为job2的启动,依赖于job1作业的完成 
  
    ctrljob2.addDependingJob(ctrljob1); 
    
    //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好
    FileInputFormat.addInputPath(job2, new Path(args[1]));
    
    //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得
    //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了
    FileOutputFormat.setOutputPath(job2,new Path(args[2]) );

    //主的控制容器,控制上面的总的两个子作业 
    JobControl jobCtrl=new JobControl("myctrl"); 
  
    //添加到总的JobControl里,进行控制
    jobCtrl.addJob(ctrljob1); 
    jobCtrl.addJob(ctrljob2); 


    //在线程启动,记住一定要有这个
    Thread  t=new Thread(jobCtrl); 
    t.start(); 

    while(true){ 

    if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息 
    System.out.println(jobCtrl.getSuccessfulJobList()); 
    jobCtrl.stop(); 
    break; 
    }
    }
    }
 }

 

 

 

0
2
分享到:
评论

相关推荐

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    - **ChainMapper/ChainReducer**:可以链式连接多个mapper或reducer,但并不适用于所有情况,特别是当Job间有复杂依赖时。 - **MultipleInputs/MultipleOutputs**:Hadoop API提供的工具类,用于一个Job处理多个...

    20、MapReduce 工作流介绍

    MapReduce工作流是一种在Hadoop生态系统中处理大数据的机制,它允许多个MapReduce作业(MR作业)按照特定的依赖顺序依次执行,以完成更复杂的计算任务。这些作业之间的依赖关系通常形成一个有向无环图(DAG),其中...

    FlowS:一种MapReduce数据流公平调度方法

    用户可以通过Hadoop提交多个MapReduce任务(Job),这些任务通常由若干个MapTask和ReduceTask组成。Hadoop集群中的JobTracker负责调度这些任务至各个TaskTracker节点执行。 MapReduce数据流的概念超越了单一Job的...

    尚硅谷大数据技术之Hadoop(MapReduce)1

    - 仅支持一个Map阶段和一个Reduce阶段,复杂逻辑需通过多个MapReduce作业串联。 1.4 MapReduce进程 MapReduce执行过程中涉及的主要进程包括JobTracker、TaskTracker和Task。JobTracker负责任务调度和资源管理,...

    大数据MapReduce和YARN架构原理.pdf

    MapReduce不适合实时的交互式计算、流式计算、子任务之间相互依赖等场景。 MapReduce的原理是基于映射和化简两个步骤。映射函数对一些独立元素组成的概念上的列表的每一个元素进行指定的操作,每个元素都是被独立...

    eclipse的mapreduce插件

    7. **多用户支持**:插件支持多用户开发,这意味着多个开发者可以在同一台机器上使用各自的Eclipse实例,分别连接到不同的Hadoop集群,而不会相互干扰。 8. **源码分析**:通过插件,开发者可以查看和理解Hadoop...

    改进型MapReduce(第二版).pdf

    每个Job由一个或多个Tasks组成,而同一Job内的Task之间是相互独立且平等的,不存在依赖关系,也没有优先级之分。这就导致了在数据处理过程中可能会出现任务分配不均的情况,比如数据倾斜问题。数据倾斜是指在数据...

    企业级IT架构分享 云计算架构师成长之路 改进型MapReduce(第二版) 共5页.pdf

    每个Job由多个Tasks组成,这些Tasks之间相互独立,不存在依赖关系或优先级差异。 **2. 数据均衡策略** 针对原始MapReduce中存在的数据不均衡问题,改进型MapReduce提出了以下策略: - **Key过于聚集的情况**:...

    Azkaban 大数据任务调度器

    在大数据处理场景中,通常会涉及到多个独立或相互依赖的任务单元,如Shell脚本、Java程序、MapReduce任务和Hive脚本等。这些任务单元之间存在时间先后顺序和依赖关系,例如在数据清洗和分析过程中,原始数据首先需要...

    大数据分析——RDBMS与MapReduce的竞争与共生

    Map阶段负责将输入数据分割成多个小块并进行初步处理;Reduce阶段则对Map阶段产生的中间结果进行汇总,得到最终结果。这种设计使得MapReduce能够很好地支持数据的分布式处理,具有良好的可扩展性和容错性。 #### ...

    Python-一个Python模块帮助你构建复杂管道的批处理作业

    Luigi是一个工作流系统,能够帮助开发者组织大量相互依赖的任务,并确保它们按照正确的顺序执行。 在大数据处理领域,批处理作业通常涉及多个步骤,例如数据提取、清洗、转换和加载(ETL过程)。Luigi提供了一种...

    双鱼:在MapReduce中优化多作业应用程序执行

    如今,许多MapReduce应用程序由相互依赖的作业组组成,例如迭代机器学习应用程序和大型数据库查询。 不幸的是,MapReduce框架并未针对这些多任务应用程序进行优化。 它不会探索作业之间执行重叠的机会,而只能独立...

    Hadoop源码分析 完整版 共55章

    - **Hadoop生态系统**:Hadoop不仅包括HDFS和MapReduce两大核心组件,还包括了其他多个重要的子项目和技术,形成了一个完整的生态系统。 - **包结构及依赖**:Hadoop的包结构非常复杂,这主要归因于HDFS提供了统一的...

    MultitudeOfThreading 并行计算多线程演示

    多线程是现代计算机科学中的一个重要概念,它允许一个应用程序同时执行多个任务,从而充分利用多核处理器的能力。这个演示可能包括创建、同步、通信和管理线程的方法,以及如何避免常见的多线程问题,如竞态条件和...

    azkaban.zip

    7. **Azkaban项目结构**:阿兹卡班的项目通常由一系列相互依赖的作业(Job)组成,形成一个工作流。这些作业可以是Shell脚本、Java程序、Hadoop MapReduce任务等。用户通过Azkaban的Web界面或命令行工具将这些作业...

    appengine-crawl-mapreduce-test:一个简单的多模块appengine项目,学习多模块、crawler4j和map reduce

    在Java中,多模块项目意味着项目被分解为多个相互独立的组件,每个组件都有自己的功能和职责。这通常通过Maven或Gradle等构建工具实现,每个模块有自己的 pom.xml 或 build.gradle 文件。这样做的好处包括更好的代码...

    基于MapReduce的加权朴素贝叶斯并行算法在网络信息情感分析中的应用

    MapReduce模型能够自动将任务分配到多个节点上并行处理,极大地提高了大规模数据处理的效率。 4. 并行算法:并行算法是指能够在多个处理器或计算节点上同时执行的算法。在情感分析中,设计并行算法可以有效处理大...

    win下maven创建的hadoop程序demo

    【标题】"win下maven创建的hadoop程序demo"涉及了多个IT领域的知识点,包括Windows操作系统、Maven构建工具、Hadoop分布式框架以及MapReduce编程模型。下面将逐一详细介绍这些概念及其相互关系。 1. **Windows操作...

Global site tag (gtag.js) - Google Analytics