前面介绍了什么是MapReduce,然后通过一个简单的例子来说明MapReduce的流程。但都是针对单个Map函数和Reduce函数。在实际业务中可能会很复杂,可能含有多个MapReduce流程配合使用才能得到想要的结果。本节介绍复杂的MapReduce流程
1.线性MapReduce Job流
线性含义很简答,就是一个一个MapReduce Job依次执行。AMap的输出交给AReduce,AReduce的输出结果交给BMap,BMap的输出交给BReduce……就这样一直下去。
实现方式:将每个Job的启动代码设置成只有上一个Job结束之后才执行,然后将Job的输入设置成上一个Job的输出路劲
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "average"); job.setJarByClass(TestMR.class); job.setMapperClass(MyMap.class); job.setCombinerClass(MyReduce.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("/user/text.txt")); //Map的输入 FileOutputFormat.setOutputPath(job, new Path("/user/helloMR/success"));//Reduce的输出 job.waitForCompletion(true); Job job2 = new Job(conf, "average2"); job2.setJarByClass(WordCount.class); job2.setMapperClass(TokenizerMapper.class); job2.setCombinerClass(IntSumReducer.class); job2.setReducerClass(IntSumReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job2, new Path("/user/helloMR/success")); //Map的输入 FileOutputFormat.setOutputPath(job2, new Path("/user/helloMR/success2"));//Reduce的输出 System.exit(job2.waitForCompletion(true) ? 0 : 1); }
以上代码仅仅是模拟线性MapReduce Job流,并没用实际的业务含义
2.各种依赖MapReduce Job流
有时候几个MapReduce Job中可能没有上面说的线性关系。可能是AMapReduce+BMapReduce两个的输出结果做为CMapReduce的输入。并且AMapReduce和BMapReduce之间没有任何关系。
实现方式:hadoop为我们提供了这种复杂的Job流API,ControlledJob类和JobControl类。先按照正常情况配置各个Job,配置完后在将所有的Job封装到对应的ControlledJob对象中,然后使用ControlledJob的addDependingJob()设置依赖关系,接着在实例化一个JobControl对象,并使用addJob()方法将多有的Job注入JobControl对象中,最后使用JobControl对象的run方法启动Job流
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //配置作业1 Job job = new Job(conf, "average"); job.setJarByClass(TestMR.class); job.setMapperClass(MyMap.class); job.setCombinerClass(MyReduce.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("/user/text.txt")); //Map的输入 FileOutputFormat.setOutputPath(job, new Path("/user/helloMR/success"));//Reduce的输出 //配置作业2 Job job2 = new Job(conf, "average2"); job2.setJarByClass(WordCount.class); job2.setMapperClass(TokenizerMapper.class); job2.setCombinerClass(IntSumReducer.class); job2.setReducerClass(IntSumReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job2, new Path("/user/text.txt")); //Map的输入 FileOutputFormat.setOutputPath(job2, new Path("/user/helloMR/success2"));//Reduce的输出 //配置作业3 Job job3 = new Job(conf, "average2"); job3.setJarByClass(WordCount.class); job3.setMapperClass(TokenizerMapper.class); job3.setCombinerClass(IntSumReducer.class); job3.setReducerClass(IntSumReducer.class); job3.setOutputKeyClass(Text.class); job3.setOutputValueClass(IntWritable.class); //***********作业3的输入是作业1和作业2的输出 FileInputFormat.addInputPath(job3, new Path("/user/helloMR/success")); //Map的输入 FileInputFormat.addInputPath(job3, new Path("/user/helloMR/success2")); FileOutputFormat.setOutputPath(job3, new Path("/user/helloMR/success3"));//Reduce的输出 /** * 特别说明: * 配置依赖关系的作用是确保作业3是在作业1和作业2执行完后才执行,利用作业1和作业2的输出作为作业3的输入。 * 所以在配置作业3时,需要将作业1和作业2的输出路劲作为作业3的输入路径。 */ //配置依赖关系 ControlledJob cj1 = new ControlledJob(conf); cj1.setJob(job); ControlledJob cj2 = new ControlledJob(conf); cj2.setJob(job2); ControlledJob cj3 = new ControlledJob(conf); cj3.setJob(job3); cj3.addDependingJob(cj1); cj3.addDependingJob(cj2); //将所有任务添加到JobControl中 JobControl JC = new JobControl("123"); JC.addJob(cj1); JC.addJob(cj2); JC.addJob(cj3); //启动线程 Thread thread = new Thread(JC); thread.start(); while(true){ if(JC.allFinished()){ System.out.println(JC.getSuccessfulJobList()); JC.stop(); System.exit(0); } if(JC.getFailedJobList().size() > 0){ System.out.println(JC.getFailedJobList()); JC.stop(); System.exit(0); } } }
特别说明:以上两个实例都是我亲自运行过的,结果也是正确的。
相关推荐
在Hadoop生态系统中,MapReduce是一种分布式计算框架,它允许我们处理海量数据并行化,非常适合大规模数据集的处理。本文将深入解析MapReduce的工作原理、核心组件以及如何编写一个基本的MapReduce程序。 MapReduce...
Hadoop 集群配置详解 Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2...Hadoop_Hadoop集群(第9期)_MapReduce初级案例 Hadoop_Hadoop集群(第10期)_MySQL关系数据库 Web(Json-Lib类库使用手册)
### Hadoop MapReduce 教程知识点详解 #### 一、Hadoop MapReduce 概述 Hadoop MapReduce 是一个强大的分布式计算框架,主要用于处理大规模数据集。它通过将任务分解成多个子任务来实现并行处理,从而极大地提高了...
3. **创建Hadoop项目**:在Eclipse的“文件”菜单中选择“新建” -> “其他”,在弹出的对话框中找到Hadoop相关选项,创建Hadoop MapReduce项目。 4. **编写MapReduce代码**:在创建的项目中,编写MapReduce程序,...
《基于Hadoop的MapReduce架构实现KNN算法详解》 在大数据处理的领域中,Hadoop作为开源的分布式计算框架,扮演着至关重要的角色。它以其高效的数据存储和处理能力,为各种复杂算法的实现提供了可能。其中,K近邻(K...
【Hadoop运行DEMO详解】 Hadoop是一款开源的分布式计算框架,由Apache基金会开发,它设计用于处理和存储海量数据。在"cs245-as1-master_Hadoop运行demo_Hadoop学习demo_DEMO_"这个项目中,我们主要关注的是如何在...
3.细细品味Hadoop_Hadoop集群(第3期)_VSFTP安装配置 4.细细品味Hadoop_Hadoop集群(第4期)_SecureCRT使用 5.细细品味Hadoop_Hadoop集群(第5期)_Hadoop安装配置 6.细细品味Hadoop_Hadoop集群(第5期副刊)_JDK和...
大数据Hadoop核心之MapReduce详解 MapReduce是Hadoop核心模块之一,作为一个分布式运算程序的编程框架,用于用户开发基于Hadoop的数据分析应用。MapReduce的核心功能是将用户编写的业务逻辑代码和自带默认组件整合...
### Hadoop集群与HBase应用详解 #### 一、HBase基本概念介绍 **1.1 引言** 随着大数据处理需求的日益增长,Hadoop生态中的HBase因其卓越的数据处理能力和灵活性,成为了众多企业的大数据解决方案之一。本文旨在...
### Hadoop集群构建与配置详解 #### 一、Hadoop概览及集群角色解析 **Hadoop**,作为Apache软件基金会旗下的一款开源分布式计算平台,以其核心组件**Hadoop分布式文件系统**(HDFS)和**MapReduce**而闻名。HDFS提供...
### Hadoop集群中WordCount运行详解 #### 一、MapReduce理论简介 ##### 1.1 MapReduce编程模型概述 MapReduce是一种编程模型,用于处理和生成大型数据集。其核心理念是“分而治之”,即将大规模数据处理任务拆分...
【Hadoop安装教程:单机与伪分布式配置详解】 在大数据处理领域,Hadoop是一个广泛使用的开源框架,它允许在廉价硬件集群上处理大规模数据。本文将指导您如何在Ubuntu 14.04 64位系统上安装Hadoop 2.6.0,无论是...
标题 "3-haddop_hadoop_" 暗示着这是一个关于Hadoop的教程或学习资源,其中包含了Hadoop集群的基本知识和操作指南。Hadoop是一个开源的分布式计算框架,广泛应用于大数据处理领域,尤其在Linux系统环境下表现出色。...
3. **测试Hadoop**:在安装完成后,应进行Hadoop的基本测试,如格式化文件系统、启动集群服务以及检查节点状态等。 4. **导入数据**:将空间数据导入到HDFS中,以便后续进行分析处理。 5. **使用Hadoop进行统计分析*...
在Hadoop 2.x中,MapReduce进行了重大改进,包括YARN(Yet Another Resource Negotiator)的引入,它作为全局资源管理系统,负责管理和调度集群上的计算资源。MapReduce作业现在通过YARN进行调度和执行,提高了集群...