`
wangzjie
  • 浏览: 74978 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop 新版api中的JobControl实现

阅读更多

依赖关系组合式MapReduce

 

答案是采取JobControl,直接上代码。

JobControl依赖关系组合式MpaReduce。

旧版实现:org.apache.hadoop.mapred包下

Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf,"Job1");
.........//job1 其他设置
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf,"Job2");
.........//job2 其他设置
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf,"Job3");
.........//job3 其他设置
job3.addDepending(job1);//设置job3和job1的依赖关系
job3.addDepending(job2);
JobControl JC = new JobControl("123");
JC.addJob(job1);//把三个job加入到jobcontorl中
JC.addJob(job2);
JC.addJob(job3);
JC.run();

 新版api实现:org.apache.hadoop.mapreduce.lib.jobcontrol.*(在hadoop 0.20.2还没有,hadoop 1.x已经有了)

使用该包下的ControlledJob与JobControl

/** 
     * job2 依赖于 job1 
     * @param job1 
     * @param job2 
     * @param chainName 
     * @return 
     * @throws IOException 
     */  
    public static int handleJobChain(Job job1 ,Job job2, String chainName) throws IOException{  
        ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
        controlledJob1.setJob(job1);         
        ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration()) 
        controlledJob2.setJob(job2);  
        controlledJob2.addDependingJob(controlledJob1);          
        JobControl jc = new JobControl(chainName);  
        jc.addJob(controlledJob1);  
        jc.addJob(controlledJob2);  
        Thread jcThread = new Thread(jc);  
        jcThread.start();  
        while(true){  
            if(jc.allFinished()){  
                System.out.println(jc.getSuccessfulJobList());  
                jc.stop();  
                return 0;  
            }  
            if(jc.getFailedJobList().size() > 0){  
                System.out.println(jc.getFailedJobList());  
                jc.stop();  
                return 1;  
            }  
        }  
    } 

 

要注意的地方就是hadoop的JobControl类实现了线程Runnable接口。我们需要实例化一个线程来让它启动。直接调用JobControl的run()方法,线程将无法结束。要调用JobControl.stop()才能停止

分享到:
评论

相关推荐

    hadoop0.23.9离线api

    org.apache.hadoop.mapreduce.lib.jobcontrol org.apache.hadoop.mapreduce.lib.join org.apache.hadoop.mapreduce.lib.map org.apache.hadoop.mapreduce.lib.output org.apache.hadoop.mapreduce.lib.partition...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    MapReduce编程模型3.1 MapReduce编程模型概述3.1.1 MapReduce编程接口体系结构3.1.2...API解析3.4.1 Hadoop Streaming的实现原理3.4.2 Hadoop Pipes的实现原理3.5 Hadoop工作流3.5.1 JobControl的实现原理3.5.2 ...

    hadoop-3.1.3-src.tar.gz

    - **Hadoop的权限和ACLs**:理解HDFS的文件权限模型和Access Control Lists,实现细粒度的权限控制。 7. **Hadoop的运维实践** - **监控与报警**:监控Hadoop集群的健康状态,设置告警机制,及时发现和解决问题。...

    《Hadoop开发者》第四期

    包括但不限于海量数据处理平台的架构演变、Hive中的计算不均衡问题解决方法、...API管理Job的方法、集群配置调优、Java规范及经验、MapReduce开发经验总结、Hadoop中的tar命令实现以及Hadoop技术论坛的运营数据分析等...

    Hadoop开发者第四期

    #### 九、通过Hadoop的API管理Job Hadoop提供了丰富的API来管理和控制MapReduce作业。主要包括以下几个方面: 1. **提交作业**:使用API提交MapReduce作业。 2. **监控作业**:通过API查询作业的状态和进度。 3. *...

    Hadoop开发第四期

    #### 三、Join算子在Hadoop中的实现 - **Join操作简介**: - **Join**是数据库中的一种基本操作,用于合并两个表的数据。在Hadoop中,由于数据分布在不同的节点上,因此实现Join操作变得更加复杂。 - **实现方法**: ...

    Hadoop_MapReduce教程

    - **JobControl**:JobControl 提供了一组用于管理多个作业的工具。 - **数据压缩**:为了节省存储空间和网络带宽,Hadoop 支持多种压缩算法,可以在作业的不同阶段启用压缩功能。 #### 例子:WordCount v1.0 - **...

    hadoop_the_definitive_guide_3nd_edition

    JobControl 182 Apache Oozie 182 6. How MapReduce Works . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 187 Anatomy of a MapReduce Job Run 187 Classic ...

    MapReduce-Code:Hadoop平台下的MapReduce源码分析

    MapReduce源码分析(主要四大模块,其他表示父目录下的.java文件的总称):1.org.apache.hadoop.mapred(旧版MapReduceAPI):( 1).jobcontrol(job作业直接控制类)(2 ).join :(作业作业中用于模仿数据连接处理...

    行业分类-设备装置-在线印刷服务系统及其存储媒体.zip

    这可能需要特定的设备驱动程序和接口协议,如JDF(Job Definition Format)或PCL(Printer Control Language)。 6. 存储媒体:在系统中,大量数据包括用户文件、订单信息、设计模板等需要安全存储。这需要云存储...

Global site tag (gtag.js) - Google Analytics