`

hadoop combine 规约

 
阅读更多

 

 

0 简介:

 

a) combine发生在map流程中

b) 一般combine代码和自定义reduce代码相同,如果需要不相同,只需要继承hadoop.mapreduce.Reducer

    在其内写自己代码即可。

c) combine不是hadoop标配,其有使用局限性,

    不会对多个map的输出做处理,不能跨map任务执行,不能代替reduce的作用,

    在进行运算时,运算结果于运算总个数有关系时,就不可以使用,eg:

    求平均值计算,源文件内容为:

    1,1,1,1
    2,2,2

不使用combine下,reduce会接受到7个数字, 总和为 10,在除以7 结果为 10/7 = XXX
如果使用combine,在combine处 1,1,1,1 得到的平均值为1, 2,2,2的平均值为2, 

 然后1,2在传入reduce后在做平均值,结果为1.5

 

d)  而在非依赖总数计算时,combine的出现会减少map端的输出,

   这样shuffle时传输的数据量减少,网络开销减少。 eg:

  源文件内容如下,进行单词计数:

   hello you
   hello me me me

默认不使用combine下的结果为:组是3组 但是记录仍旧是6条 ,
因此在执行job操作时打印为:Map output records=6  Reduce input records=6  Combine input records=0   Combine output records=0
hello, {1,1}
you, {1}
me, {1,1,1}

增加了 combine后,结果就会变成:
因此在执行job操作时打印为:Map output records=6  Combine input records=6   Combine output records=3   Reduce input records=3 Reduce input groups=3  Reduce output records=3

 

hello, {2}
you, {1}
me, {3}

 

 

1 代码

 

package combine;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 实现单词计数功能
 * 增加自定义计数器功能
 * 增加自定义规约combine,使得提高map-->reduce操作传输效率
 * 测试文件 hello内容为:
 * hello	you
 * hello	me	me	me
 * @author zm
 *
 * 问:为什么使用Combiner?
 * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
 * 
 * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪?
 * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数.
 *
 * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪?
 * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
 * 
 * 输出如下:
Mapper输出<hello,1>
Mapper输出<you,1>
Mapper输出<hello,1>
Mapper输出<me,1>
Mapper输出<me,1>
Mapper输出<me,1>
Combine输入分组<hello,...>
Combine输入键值对<hello,1>
Combine输入键值对<hello,1>
Combiner输出键值对<hello,2>
Combine输入分组<me,...>
Combine输入键值对<me,1>
Combine输入键值对<me,1>
Combine输入键值对<me,1>
Combiner输出键值对<me,3>
Combine输入分组<you,...>
Combine输入键值对<you,1>
Combiner输出键值对<you,1>

MyReducer输入分组<hello,...>
MyReducer输入键值对<hello,2>
MyReducer输入分组<me,...>
MyReducer输入键值对<me,3>
MyReducer输入分组<you,...>
MyReducer输入键值对<you,1>
 */
public class MyWordCounterCombile {

	static String FILE_ROOT = "hdfs://master:9000/";
	static String FILE_INPUT = "hdfs://master:9000/hello";
	static String FILE_OUTPUT = "hdfs://master:9000/out";
	public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
		
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
		Path outpath = new Path(FILE_OUTPUT);
		if(fileSystem.exists(outpath)){
			fileSystem.delete(outpath, true);
		}
		
		// 0 定义干活的人
		Job job = new Job(conf);
		// 1.1 告诉干活的人 输入流位置     读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数
		FileInputFormat.setInputPaths(job, FILE_INPUT);
		// 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 指定自定义的map类
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		//1.3 分区
		job.setNumReduceTasks(1);
		
		//1.4 TODO 排序、分组    目前按照默认方式执行
		//1.5  规约
		job.setCombinerClass(MyCombile.class);
		
		//2.2 指定自定义reduce类
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 指定写出到哪里
		FileOutputFormat.setOutputPath(job, outpath);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// 让干活的人干活
		job.waitForCompletion(true);
		
	}
	
}

/**
 * 继承mapper 覆盖map方法,hadoop有自己的参数类型
 * 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数,
 * 这样,对于文件hello而言,调用MyMapper方法map后得到结果:
 * <hello,1>,<you,1>,<hello,1>,<me,1>
 * 方法后,得到结果为: 
 * KEYIN,      行偏移量
 * VALUEIN,    行文本内容(当前行)
 * KEYOUT,     行中出现的单词
 * VALUEOUT    行中出现单词次数,这里固定写为1
 *
 */
class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

	@Override
	protected void map(LongWritable k1, Text v1, Context context)
			throws IOException, InterruptedException {
		
		Counter helloCounter = context.getCounter("Sensitive Words", "hello");
		String line = v1.toString();
		if(line.contains("hello")){
			helloCounter.increment(1);
		}
		
		String[] v1s = v1.toString().split("\t");
		for(String word : v1s){
			context.write(new Text(word), new LongWritable(1));
			System.out.println("Mapper输出<" + word +"," + 1 + ">"); // <hello,1>  <hello,1> <you,1>  <me,1>
		}
	}
}

//自定义规约器
class MyCombile extends Reducer<Text, LongWritable, Text, LongWritable>{
	
	protected void reduce(Text k2, Iterable<LongWritable> v2s, Context ctx) throws IOException, InterruptedException {
		// 显示k2 在上面执行完mapper和自动合并分组后,有多少组
		System.out.println("Combine输入分组<" + k2.toString() + ",...>");
		long times = 0L;
		for(LongWritable l : v2s){
			times += l.get();
			System.out.println("Combine输入键值对<" + k2.toString() + "," + l.get() + ">");
		}
		ctx.write(k2, new LongWritable(times));
		System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">");
	}
	
}

/**
 * <hello,{1,1}>,<me,{1}>,<you,{1}>, 每个分组调用一次 reduce方法
 * 
 * KEYIN,     行中出现单词
 * VALUEIN,   行中出现单词个数
 * KEYOUT,    文件中出现不同单词
 * VALUEOUT   文件中出现不同单词总个数
 */
class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

	protected void reduce(Text k2, Iterable<LongWritable> v2s, Context ctx)
			throws IOException, InterruptedException {
		System.out.println("MyReducer输入分组<"+k2.toString()+",...>");
		long times = 0L;
		for(LongWritable l : v2s){
			times += l.get();
			System.out.println("MyReducer输入键值对<"+k2.toString()+","+l.get()+">");
		}
		ctx.write(k2, new LongWritable(times));
	}
	
}




 

 

 

 

分享到:
评论

相关推荐

    hadoop2.7.3 Winutils.exe hadoop.dll

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是这个框架的一个稳定版本,它包含了多个改进和优化,以提高性能和稳定性。在这个版本中,Winutils.exe和hadoop.dll是两...

    hadoop的dll文件 hadoop.zip

    Hadoop是一个开源的分布式计算框架,由Apache基金会开发,它主要设计用于处理和存储大量数据。在提供的信息中,我们关注的是"Hadoop的dll文件",这是一个动态链接库(DLL)文件,通常在Windows操作系统中使用,用于...

    hadoop winutils hadoop.dll

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在普通硬件上高效处理大量数据。在Windows环境下,Hadoop的使用与Linux有所不同,因为它的设计最初是针对Linux操作系统的。"winutils"和"hadoop.dll...

    hadoop.dll & winutils.exe For hadoop-2.7.1

    在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储和计算的能力。本文将详细探讨与"Hadoop.dll"和"winutils.exe"相关的知识点,以及它们在Hadoop-2.7.1版本中的作用。 Hadoop.dll是Hadoop在...

    hadoop的hadoop.dll和winutils.exe下载

    在Hadoop生态系统中,`hadoop.dll`和`winutils.exe`是两个关键组件,尤其对于Windows用户来说,它们在本地开发和运行Hadoop相关应用时必不可少。`hadoop.dll`是一个动态链接库文件,主要用于在Windows环境中提供...

    hadoop2.7.3的hadoop.dll和winutils.exe

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是Hadoop发展中的一个重要版本,它包含了众多的优化和改进,旨在提高性能、稳定性和易用性。在这个版本中,`hadoop.dll`...

    Hadoop下载 hadoop-2.9.2.tar.gz

    Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo 的工程师 Doug Cutting 和 Mike Cafarella Hadoop 是一个处理、存储和分析海量的分布式、非结构化数据的开源框架。最初由 Yahoo...

    hadoop2.7.7对应的hadoop.dll,winutils.exe

    在Hadoop生态系统中,Hadoop 2.7.7是一个重要的版本,它为大数据处理提供了稳定性和性能优化。Hadoop通常被用作Linux环境下的分布式计算框架,但有时开发者或学习者在Windows环境下也需要进行Hadoop相关的开发和测试...

    Hadoop下载 hadoop-3.3.3.tar.gz

    Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不...

    hadoop.dll & winutils.exe For hadoop-2.6.0

    在Hadoop生态系统中,`hadoop.dll`和`winutils.exe`是两个关键组件,尤其对于Windows用户来说。本文将详细介绍这两个文件以及它们在Hadoop 2.6.0版本中的作用。 `hadoop.dll`是Hadoop在Windows环境下运行所必需的一...

    hadoop2.6 hadoop.dll+winutils.exe

    标题 "hadoop2.6 hadoop.dll+winutils.exe" 提到的是Hadoop 2.6版本中的两个关键组件:`hadoop.dll` 和 `winutils.exe`,这两个组件对于在Windows环境中配置和运行Hadoop至关重要。Hadoop原本是为Linux环境设计的,...

    win环境 hadoop 3.1.0安装包

    在Windows环境下安装Hadoop 3.1.0是学习和使用大数据处理技术的重要步骤。Hadoop是一个开源框架,主要用于分布式存储和处理大规模数据集。在这个过程中,我们将详细讲解Hadoop 3.1.0在Windows上的安装过程以及相关...

    各个版本Hadoop,hadoop.dll以及winutils.exe文件下载大合集

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。它是由Apache软件基金会开发并维护的,旨在实现高效、可扩展的数据处理能力。Hadoop的核心由两个主要组件构成:Hadoop Distributed ...

    hadoop2.7.4 hadoop.dll包括winutils.exe

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,主要由HDFS(Hadoop Distributed File System)和MapReduce两大部分组成,旨在提供一种可靠、可扩展、高效的数据处理和存储解决方案。在标题中提到的...

    winutils+hadoop.dll+eclipse插件(hadoop2.7)

    在Hadoop生态系统中,`winutils.exe`和`hadoop.dll`是Windows环境下运行Hadoop必备的组件,尤其对于开发和测试环境来说至关重要。这里我们深入探讨这两个组件以及与Eclipse插件的相关性。 首先,`winutils.exe`是...

    hadoop2.6.0插件+64位winutils+hadoop.dll

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop2.6.0是这个框架的一个重要版本,它包含了多项优化和改进,以提高系统的稳定性和性能。在这个压缩包中,我们关注的是与Windows...

    Hadoop3.1.3.rar

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心设计是处理和存储大量数据的能力。这个名为"Hadoop3.1.3.rar"的压缩包文件包含了Hadoop 3.1.3版本的所有组件和相关文件,使得用户可以下载并进行...

    hadoop-eclipse-plugin1.2.1 and hadoop-eclipse-plugin2.8.0

    《Hadoop Eclipse Plugin:开发利器的进化》 在大数据领域,Hadoop作为开源分布式计算框架,扮演着核心角色。为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-...

    hadoop.dll & winutils.exe For hadoop-2.8.0

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分析。这个压缩包文件包含的是"Hadoop.dll"和"winutils.exe"两个关键组件,它们对于在Windows环境下配置和运行Hadoop生态系统至关重要。 首先,...

    hadoop2.7.x_winutils_exe&&hadoop_dll

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。标题"hadop2.7.x_winutils_exe&&hadoop_dll"暗示我们关注的是Hadoop 2.7.x版本在Windows环境下的两个关键组件:`winutils.exe`和`...

Global site tag (gtag.js) - Google Analytics