`

MapReduce 顺序组合, 迭代式,组合式,链式

 
阅读更多

1、顺序组合式

顺序组合式就是按照指定顺序执行任务如:mapreduce1 --> mapreduce2 --> mapreduce3

即:mapreduce1的输出是mapreduce2的输入,mapreduce2的输出式mapreduce3的输入

代码片段如下:

String inPath1 = "hdfs://hadoop0:9000/user/root/3D/";
		String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/";
		String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/";
		String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/";	
		
		// job1配置
		Job job1 = Job.getInstance(conf);
		job1.setJarByClass(Mode.class);
		job1.setMapperClass(Map1.class);
		job1.setReducerClass(Reduce1.class);
		job1.setMapOutputKeyClass(Text.class);
		job1.setMapOutputValueClass(IntWritable.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job1, new Path(inPath1));
		FileOutputFormat.setOutputPath(job1, new Path(outPath1));
		job1.waitForCompletion(true);
		
		// job2配置
		Job job2 = Job.getInstance(conf);
		job2.setJarByClass(Mode.class);
		job2.setMapperClass(Map2.class);
		job2.setReducerClass(Reduce2.class);
		job2.setMapOutputKeyClass(Text.class);
		job2.setMapOutputValueClass(IntWritable.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job2, new Path(inPath1));
		FileOutputFormat.setOutputPath(job2, new Path(outPath2));
		job2.waitForCompletion(true);
		
		// job3配置
		Job job3 = Job.getInstance(conf);
		job3.setJarByClass(Mode.class);
		job3.setMapperClass(Map3.class);
		job3.setReducerClass(Reduce3.class);
		job3.setMapOutputKeyClass(Text.class);
		job3.setMapOutputValueClass(IntWritable.class);
		job3.setOutputKeyClass(Text.class);
		job3.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job3, new Path(outPath2));
		FileOutputFormat.setOutputPath(job3, new Path(outPath3));
		job3.waitForCompletion(true);

子任务作业配置代码运行后,将按照顺序逐个执行每个子任务作业。由于后一个子任务需要使用前一个子任务的输出数据,因此,每一个子任务

都需要等前一个子任务执行执行完毕后才允许执行,这是通过job.waitForCompletion(true)方法加以保证的。

2、迭代组合式

迭代也可以理解为for循环或while循环,当满足某些条件时,循环结束

mapreduce的迭代算法正在研究中,后续提供完整源码....

代码如下:


3、复杂的依赖组合式

处理复杂的要求的时候,有时候一个mapreduce程序完成不了,往往需要多个mapreduce程序 这个时候就牵扯到各个任务之间的依赖关系,

所谓依赖就是一个M/R job的处理结果是另外一个M/R的输入,以此类推,

这里的顺序是 job1 和 job2 单独执行, job3依赖job1和job2执行后的结果

代码如下:

package com.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Mode {

	// 第一个Job
	public static class Map1 extends Mapper<Object, Text, Text, IntWritable>{
		Text word = new Text();
		@Override
		protected void map(Object key, Text value,Context context)
				throws IOException, InterruptedException {
			StringTokenizer st = new StringTokenizer(value.toString());
			while(st.hasMoreTokens()){
				word.set(st.nextToken());
				context.write(word, new IntWritable(1));
			}
		}
		
	}
	
	public static class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable>{
		IntWritable result = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
		
	}
	
	// 第二个Job
	public static class Map2 extends Mapper<Object, Text, Text, IntWritable>{
		Text word = new Text();
		@Override
		protected void map(Object key, Text value,Context context)
				throws IOException, InterruptedException {
			StringTokenizer st = new StringTokenizer(value.toString());
			while(st.hasMoreTokens()){
				word.set(st.nextToken());
				context.write(word, new IntWritable(1));
			}
		}
	}
	
	public static class Reduce2 extends Reducer<Text, IntWritable, Text, IntWritable>{
		IntWritable result = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	// 第三个Job
	public static class Map3 extends Mapper<Object, Text, Text, IntWritable>{
		Text word = new Text();
		@Override
		protected void map(Object key, Text value,Context context)
				throws IOException, InterruptedException {
			StringTokenizer st = new StringTokenizer(value.toString());
			while(st.hasMoreTokens()){
				word.set(st.nextToken());
				context.write(word, new IntWritable(1));
			}
		}
	}
	
	public static class Reduce3 extends Reducer<Text, IntWritable, Text, IntWritable>{
		IntWritable result = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	
	public static void main(String[] args) throws IOException{
		String inPath1 = "hdfs://hadoop0:9000/user/root/3D/";
		String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/";
		String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/";
		String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/";
		String[] inOut = {inPath1, outPath1};
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, inOut).getRemainingArgs();
		if (otherArgs.length < 2) {
			System.err.println("Usage: wordcount <in> [<in>...] <out>");
			System.exit(2);
		}
		// 判断输出路径是否存在,如存在先删除
		FileSystem hdfs = FileSystem.get(conf);
		Path findFile = new Path(outPath1);
		boolean isExists = hdfs.exists(findFile);
		if(isExists){
			hdfs.delete(findFile, true);
		}
		if(hdfs.exists(new Path(outPath2))){
			hdfs.delete(new Path(outPath2), true);
		}
		if(hdfs.exists(new Path(outPath3))){
			hdfs.delete(new Path(outPath3), true);
		}		
		
		// job1配置
		Job job1 = Job.getInstance(conf);
		job1.setJarByClass(Mode.class);
		job1.setMapperClass(Map1.class);
		job1.setReducerClass(Reduce1.class);
		job1.setMapOutputKeyClass(Text.class);
		job1.setMapOutputValueClass(IntWritable.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job1, new Path(inPath1));
		FileOutputFormat.setOutputPath(job1, new Path(outPath1));
		// 将job1加入控制容器
		ControlledJob ctrljob1 = new ControlledJob(conf);
		ctrljob1.setJob(job1);
		
		// job2配置
		Job job2 = Job.getInstance(conf);
		job2.setJarByClass(Mode.class);
		job2.setMapperClass(Map2.class);
		job2.setReducerClass(Reduce2.class);
		job2.setMapOutputKeyClass(Text.class);
		job2.setMapOutputValueClass(IntWritable.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job2, new Path(inPath1));
		FileOutputFormat.setOutputPath(job2, new Path(outPath2));
		// 将job2加入控制容器
		ControlledJob ctrljob2 = new ControlledJob(conf);
		ctrljob2.setJob(job2);
		
		// job3配置
		Job job3 = Job.getInstance(conf);
		job3.setJarByClass(Mode.class);
		job3.setMapperClass(Map3.class);
		job3.setReducerClass(Reduce3.class);
		job3.setMapOutputKeyClass(Text.class);
		job3.setMapOutputValueClass(IntWritable.class);
		job3.setOutputKeyClass(Text.class);
		job3.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job3, new Path(outPath2));
		FileOutputFormat.setOutputPath(job3, new Path(outPath3));
		ControlledJob ctrljob3 = new ControlledJob(conf);
		// 设置job3依赖job1和job2
		ctrljob3.addDependingJob(ctrljob1);	
		ctrljob3.addDependingJob(ctrljob2);
		ctrljob3.setJob(job3);
		
		
		// 主控制器
		JobControl jobCtrl = new JobControl("myctrl");
		jobCtrl.addJob(ctrljob1);
		jobCtrl.addJob(ctrljob2);
		jobCtrl.addJob(ctrljob3);
		
		// 在启动线程,记住一定要有这个
		Thread t = new Thread(jobCtrl);
		t.start();
		
		while(true){
			// 如果作业全部完成,就打印成功作业的信息
			if(jobCtrl.allFinished()){
				System.out.println(jobCtrl.getSuccessfulJobList());
				jobCtrl.stop();
				break;
			}
		}
	}
	
}


3、链式组合式

所谓连式MapReduce就是用多个Mapper处理任务,最后用一个Reducer输出结果,注意和迭代式和组合式MapReduce的不同之处

一个MapReduce作业可能会有一些前处理和后处理步骤,将这些前后处理步骤以单独的MapReduce任务实现也可以达到目的,但由于

增加了多个MapReduce作业,将增加整个作业的处理周期,而且还会增加很多I/O操作,因此处理效率不高。

Hadoop为此提供了专门的链式Mapper(ChainMapper)和链式Reducer(ChainReducer)来完成这种处理。

ChainMapper允许在一个单一Map任务中添加和使用多个Map子任务;而ChainReducer则允许在一个单一Reduce任务执行了Reduce处理

后,继续使用多个Map子任务完成一些后续处理。

package com.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Chain {

	// 第一个Job
	public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable>{
		Text word = new Text();
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			StringTokenizer st = new StringTokenizer(value.toString());
			while(st.hasMoreTokens()){
				word.set(st.nextToken());
				context.write(word, new IntWritable(1));
			}
		}
		
	}
	
	public static class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable>{
		IntWritable result = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
		
	}
	
	// 第二个Job
	public static class Map2 extends Mapper<Text, IntWritable, Text, IntWritable>{
		Text word = new Text();
		@Override
		protected void map(Text key, IntWritable value,Context context)
				throws IOException, InterruptedException {
			StringTokenizer st = new StringTokenizer(value.toString());
			while(st.hasMoreTokens()){
				word.set(st.nextToken());
				context.write(word, new IntWritable(1));
			}
		}
	}
	
	public static class Reduce2 extends Reducer<Text, IntWritable, Text, IntWritable>{
		IntWritable result = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	// 第三个Job
	public static class Map3 extends Mapper<Text, IntWritable, Text, IntWritable>{
		Text word = new Text();
		@Override
		protected void map(Text key, IntWritable value,Context context)
				throws IOException, InterruptedException {
			StringTokenizer st = new StringTokenizer(value.toString());
			while(st.hasMoreTokens()){
				word.set(st.nextToken());
				context.write(word, new IntWritable(1));
			}
		}
	}
	
	public static class Reduce3 extends Reducer<Text, IntWritable, Text, IntWritable>{
		IntWritable result = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
		String inPath1 = "hdfs://hadoop0:9000/user/root/input/";
		String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/";
		String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/";
		String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/";
		String[] inOut = {inPath1, outPath1};
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, inOut).getRemainingArgs();
		if (otherArgs.length < 2) {
			System.err.println("Usage: wordcount <in> [<in>...] <out>");
			System.exit(2);
		}
		// 判断输出路径是否存在,如存在先删除
		FileSystem hdfs = FileSystem.get(conf);
		Path findFile = new Path(outPath1);
		boolean isExists = hdfs.exists(findFile);
		if(isExists){
			hdfs.delete(findFile, true);
		}
		if(hdfs.exists(new Path(outPath2))){
			hdfs.delete(new Path(outPath2), true);
		}
		if(hdfs.exists(new Path(outPath3))){
			hdfs.delete(new Path(outPath3), true);
		}		
		
		// job1配置
		Job job1 = Job.getInstance(conf);
		job1.setJarByClass(Chain.class);
		job1.setJobName("ChainJob");
		
		FileInputFormat.addInputPath(job1, new Path(inPath1));
		FileOutputFormat.setOutputPath(job1, new Path(outPath1));
		
		// 连式编程要注意的是,可以有多个个Mapper,且后面Mapper的输入是是上一个Mapper的输出,最后一个Mapper的输出是Reducer的输入,
		// 但全局只有一个Reducer
		ChainMapper.addMapper(job1, Map1.class, LongWritable.class, Text.class, Text.class, IntWritable.class, conf);
		ChainMapper.addMapper(job1, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
		ChainMapper.addMapper(job1, Map3.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
		
		// 执行顺序 map1 --> map2 --> map3 --> reduce1
		ChainReducer.setReducer(job1, Reduce1.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
		
		job1.waitForCompletion(true);
		
		
	}
	
}


分享到:
评论

相关推荐

    Hadoop迭代式计算框架Guagua.zip

    Guagua目前主要支持的是同步的Master-Workers结构的迭代式计算框架,今后我们希望能够支持异步方式的迭代计算框架,2012年Google MapReduce之父Jeff Dean发表了一篇论文,上面提到了对神经网络深度模型的支持,文章...

    基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf

    尽管Hadoop提供了链式MapReduce接口来减少作业数量,但这种原生机制仍存在不足。 在本文的研究中,作者首先对MapReduce作业的执行流程进行了深入研究,并分析了现有的开源ETL项目,以理解大数据ETL处理的现状。通过...

    MapReduce基础.pdf

    - **不适合迭代计算**:MapReduce不支持迭代式的算法,这限制了其在某些复杂数据分析场景中的应用。 - **内存管理**:虽然MapReduce能够处理大规模数据,但在处理特别大的数据集时可能会遇到内存不足的问题。 综上...

    MapReduce求行平均值--标准差--迭代器处理--MapReduce案例

    在“迭代器处理”方面,MapReduce框架中的迭代器可以帮助更高效地处理数据。例如,可以使用自定义的迭代器来实现累加、累乘等操作,这在计算平均值和标准差时非常有用。迭代器可以在中间结果上直接进行操作,减少了...

    MapReduce模型在并行式计算机数据挖掘中的应用.pdf

    ### MapReduce模型在并行式计算机数据挖掘中的应用 #### 一、研究背景与意义 随着互联网技术的飞速发展,大数据时代产生了大量的数据。如何有效地处理这些数据,从中提取有价值的信息,成为了一个重要的研究课题。...

    比较Spark和MapReduce执行迭代应用的性能差异源码+学习说明(课程作业).zip

    【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业...比较Spark和MapReduce执行迭代应用的性能差异源码+学习说明(课程作业).zip

    Experiment-of-a-Comparison-with-Iterative-MR-frameworks:我们为在迭代 MapReduce 框架上运行迭代算法而实现的示例代码

    这里的“迭代MapReduce”是指在MapReduce模型基础上支持连续多轮数据处理的框架,适用于机器学习和图处理等需要多次迭代的算法。 1. **迭代MapReduce的概念**:传统的MapReduce模型中,数据处理通常是一次性的,即...

    Mapreduce编程模型

    MapReduce的设计灵感来源于函数式编程语言中的`map`和`reduce`操作。这些概念在函数式编程语言如Lisp中已经存在了很长时间。MapReduce将这些概念应用于大规模数据处理领域,提供了一种简单而强大的编程接口。 - **...

    mapreduce mapreduce mapreduce

    MapReduce是一种分布式计算模型,由Google开发,用于处理和生成大量数据。这个模型主要由两个主要阶段组成:Map(映射)和Reduce(规约)。MapReduce的核心思想是将复杂的大规模数据处理任务分解成一系列可并行执行...

    kmeans(mapreduce)

    这个过程可以视为“更新”步骤,但因为MapReduce模型不支持原地更新,所以需要再次运行MapReduce作业,将新的质心作为输入,开始下一轮迭代。 4. **迭代过程**:重复上述过程,直到质心不再明显变化或者达到预设的...

    一种基于改进的链式MapReduce的并行ETL应用

    提出一种改进的链式MapReduce 框架,并将此框架应用于一个并行ETL 工具,同时提出一些针对ETL 处理的流程级优化规则,使ETL流程产生更少的MapReduce作业,从而减少I/O以及网络传输的消耗;利用某省份手机上网数据与...

    MapReduce发明人关于MapReduce的介绍

    MapReduce的设计灵感来源于Lisp和其他函数式语言中的`map`和`reduce`原语。其工作流程可以分为两个阶段:`map`操作和`reduce`操作。 - **`map`操作**:对输入数据集中的每个逻辑记录应用`map`函数,计算出一系列...

    mars_cuda_mapreduce

    在所提供的部分内容中,我们了解到Mars系统不仅在NVIDIA GPU上运行,还支持AMD GPU、多核心CPU以及它们的组合式并行处理,甚至是分布式系统如Hadoop。系统被划分为不同模块,以便在不同的硬件平台上运行。MarsCUDA指...

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...

    MapReduce高阶实现

    - **Spark和Hadoop的比较**:Spark提供了更高效的内存计算,适用于迭代算法和交互式数据分析,但MapReduce更适合批处理任务。 6. **案例研究**: - 实际应用中,MapReduce被广泛应用于搜索引擎的索引构建、推荐...

    mapreduce项目 数据清洗

    为了提高效率,MapReduce作业可以采用各种优化策略,如分区(Partitioning)、排序(Sorting)和组合(Combiner)。分区决定了Reduce任务的分布,排序确保相同键的记录在一起,而组合器则在本地节点上减少网络传输...

    用MapReduce实现KMeans算法

    3. **迭代控制**:由于MapReduce的并行特性,迭代过程的控制相对复杂,通常通过设置固定的迭代次数或者判断中心点的变化幅度来决定是否停止。 4. **中间结果处理**:在MapReduce中,中间结果的存储和传递也是关键。...

    mapreduce在hadoop实现词统计和列式统计

    在这个场景中,我们将讨论如何使用Hadoop的MapReduce来实现词统计和列式统计。 **一、MapReduce原理** MapReduce的工作流程主要包括三个主要阶段:Map、Shuffle(排序)和Reduce。在Map阶段,输入数据被分割成多个...

Global site tag (gtag.js) - Google Analytics