学习Hadoop编程,以前看过《Hadoop权威指南》这本书,但是看完了HDFS这一章之后,后面的内容就难以再看懂了,说实话,之前一直对MapReduce程序敬而远之,毫不理解这种类型的程序的执行过程。这一周花了些时间看了Hadoop的实战,现在能够看懂简单的MapReduce程序,也能自己动手写几个简单的例子程序。下面是两个简单的MapReduce程序,用到了一些简单的Hadoop知识点,总结如下文。
例子一 求最大数
问题描述是这样的,从一系列数中,求出最大的那一个。这个需求应该说是很简单的,如果不用MapReduce来实现,普通的Java程序要实现这个需求,应该说是轻而易举的,几行代码就能搞定。这里用这个例子是想说说Hadoop中的Combiner的用法。
我们知道,Hadoop使用Mapper函数将数据处理成一个一个的<key, value>键值对,再在网络节点间对这些键值对进行整理(shuffle),然后使用Reducer函数处理这些键值对,并最终将结果输出。那么可以这样想,如果我们有1亿个数据(Hadoop就是为大数据而生),Mapper函数将会产生1亿个键值对在网络中进行传输,如果我们只是要求出这1亿个数当中的最大值,那么显然,Mapper只需要输出它所知道的最大值即可。这样一来可以减轻网络带宽的压力,二来,可以减轻Reducer的压力,提高程序的效率。
如果Reducer只是运行简单的诸如求最大值、最小值、计数,那么我们可以使用Combiner,但是,如果是求一组数的平均值,千万别用Combiner,道理很简单,你自己分析看。Combiner可以看作是Reducer的帮手,或者看成是Mapper端的Reducer,它能减少Mapper函数的输出从而减少网络数据传输并能减少Reducer上的负载。下面是Combiner的例子程序。
程序的输入是这样的:
12 5 9 21 43 99 65 32 10
MapReduce程序需要找到这一组数字中的最大值99,Mapper函数是这样的:
public class MyMapper extends Mapper<Object, Text, Text, IntWritable>{
@Override
protected void map(Object key, Text value,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
context.write(new Text(), new IntWritable(Integer.parseInt(value.toString())));
}
}
Mapper函数非常简单,它是负责读取HDFS中的数据的,负责将这些数据组成<key, value>对,然后传输给Reducer函数。Reducer函数如下:
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
int temp = Integer.MIN_VALUE;
for(IntWritable value : values){
if(value.get() > temp){
temp = value.get();
}
}
context.write(new Text(), new IntWritable(temp));
}
}
Reducer函数也很简单,就是负责找到从Mapper端传来的数据中找到最大值。那么在Mapper函数与Reducer函数之间,有个Combiner,它的代码是这样的:
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
int temp = Integer.MIN_VALUE;
for(IntWritable value : values){
if(value.get() > temp){
temp = value.get();
}
}
context.write(new Text(), new IntWritable(temp));
}
}
我们可以看到,combiner也是继承了Reducer类,其写法与写reduce函数一样,reduce和combiner对外的功能是一样的,只是使用时的位置和上下文(Context)不一样而已。定义好了自己的Combiner函数之后,需要在Job类中加入一行代码,告诉Job你使用要在Mapper端使用Combiner:
job.setCombinerClass(MyCombiner.class);
那么这个求最大数的例子的Job类是这样的:
public class MyMaxNum {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf,"My Max Num");
job.setJarByClass(MyMaxNum.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setCombinerClass(MyCombiner.class);
FileInputFormat.addInputPath(job, new Path("/huhui/nums.txt"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
当然你还可以对输出进行压缩。只要在函数中添加两行代码,就能对Reducer函数的输出结果进行压缩。当然这里没有必要对结果进行压缩,只是作为一个知识点而已。
//对输出进行压缩
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
例子二 自定义Key的类型
str1 2
str2 5
str3 9
str1 1
str2 3
str3 12
str1 8
str2 7
str3 18
希望得到的输出如下:str1 1,2,8
str2 3,5,7
str3 9,12,19
public class IntPaire implements WritableComparable<IntPaire> {
private String firstKey;
private int secondKey;
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
firstKey = in.readUTF();
secondKey = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(firstKey);
out.writeInt(secondKey);
}
@Override
public int compareTo(IntPaire o) {
// TODO Auto-generated method stub
return o.getFirstKey().compareTo(this.firstKey);
}
public String getFirstKey() {
return firstKey;
}
public void setFirstKey(String firstKey) {
this.firstKey = firstKey;
}
public int getSecondKey() {
return secondKey;
}
public void setSecondKey(int secondKey) {
this.secondKey = secondKey;
}
}
上面重写的readFields方法和write方法,都是这样写的,几乎成为模板。getPartition(KEY key, VALUE value, int numPartitions)
其中,第一个参数key和第二个参数value是Mapper端的输出<key, value>,第三个参数numPartitions表示的是当前Hadoop集群一共有多少个Reducer。输出则是分配的Reducer编号,就是指的是Mapper端输出的键对应到哪一个Reducer中去。我们一般实现Partitioner是哈希散列的方式,它以key的hash值对Reducer的数目取模,得到对应的Reducer编号。这样就能保证相同的key值,必定会分配到同一个reducer上。如果有N个Reducer,那么编号就是0,1,2,3......(N-1)。public class PartitionByText extends Partitioner<IntPaire, IntWritable> {
@Override
public int getPartition(IntPaire key, IntWritable value, int numPartitions) {//reduce的个数
// TODO Auto-generated method stub
return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
本例还用到了Hadoop的比较器WritableComparator,它实现的是RawComparator接口。public class TextIntComparator extends WritableComparator {
public TextIntComparator(){
super(IntPaire.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
IntPaire o1 = (IntPaire) a;
IntPaire o2 = (IntPaire) b;
if(!o1.getFirstKey().equals(o2.getFirstKey())){
return o1.getFirstKey().compareTo(o2.getFirstKey());
}else{
return o1.getSecondKey() - o2.getSecondKey();
}
}
}
public class TextComparator extends WritableComparator {
public TextComparator(){
super(IntPaire.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
IntPaire o1 = (IntPaire) a;
IntPaire o2 = (IntPaire) b;
return o1.getFirstKey().compareTo(o2.getFirstKey());
}
}
下面将写出Mapper函数,它是以KeyValueTextInputFormat的输入形式读取HDFS中的数据,设置输入格式将在job中。public class SortMapper extends Mapper<Object, Text, IntPaire, IntWritable>{
public IntPaire intPaire = new IntPaire();
public IntWritable intWritable = new IntWritable(0);
@Override
protected void map(Object key, Text value,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
int intValue = Integer.parseInt(value.toString());
intPaire.setFirstKey(key.toString());
intPaire.setSecondKey(intValue);
intWritable.set(intValue);
context.write(intPaire, intWritable);//key:str1 value:5
}
}
下面是Reducer函数,public class SortReducer extends Reducer<IntPaire, IntWritable, Text, Text> {
@Override
protected void reduce(IntPaire key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
StringBuffer combineValue = new StringBuffer();
Iterator<IntWritable> itr = values.iterator();
while(itr.hasNext()){
int value = itr.next().get();
combineValue.append(value + ",");
}
int length = combineValue.length();
String str = "";
if(combineValue.length() > 0){
str = combineValue.substring(0, length-1);//去除最后一个逗号
}
context.write(new Text(key.getFirstKey()), new Text(str));
}
}
Job类是这样的:public class SortJob {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "Sortint");
job.setJarByClass(SortJob.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
//设置输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
//设置map的输出类型
job.setMapOutputKeyClass(IntPaire.class);
job.setMapOutputValueClass(IntWritable.class);
//设置排序
job.setSortComparatorClass(TextIntComparator.class);
//设置group
job.setGroupingComparatorClass(TextComparator.class);//以key进行grouping
job.setPartitionerClass(PartitionByText.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/huhui/input/words.txt"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true)?0:1);
}
}
这样一来,程序就写完了,按照需求,完成了相应的功能。
相关推荐
Hadoop 架构有两个主要的组件:分布式文件系统 HDFS 和 MapReduce 引擎。 在 Hadoop 中,MapReduce 底层的分布式文件系统是独文模块,用户可按照约定 的一套接口实现自己的分布式文件系统,然后经过简单的配置后,...
Hadoop-Eclipse-Plugin-3.1.1是一款专为Eclipse集成开发环境设计的插件,用于方便地在Hadoop分布式文件系统(HDFS)上进行开发和调试MapReduce程序。这款插件是Hadoop生态系统的组成部分,它使得Java开发者能够更加...
这个压缩包“使用hadoop-streaming运行Python编写的MapReduce程序.rar”显然是一个教程或示例,旨在指导用户如何利用Python编写MapReduce任务,并通过Hadoop Streaming进行执行。 MapReduce是一种编程模型,由...
Hadoop由两个核心部分组成:Hadoop Distributed File System(HDFS)和MapReduce。HDFS为大规模数据提供了高容错、高吞吐量的分布式存储解决方案,而MapReduce则是一种用于大规模数据集并行计算的编程模型。 二、...
主要由两个核心组件构成:HDFS(Hadoop Distributed File System)和MapReduce。HDFS提供高容错性的分布式存储,而MapReduce则负责大规模数据的并行处理。 Eclipse是一款广泛使用的Java集成开发环境,对于Hadoop...
Hadoop主要由两个核心组件构成:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一种分布式文件系统,能够跨多台机器存储和管理海量数据。它具有高容错性和高吞吐量,确保数据的可靠性和快速访问。 ...
Hadoop由Apache软件基金会开发,它主要由两个核心组件构成:Hadoop Distributed File System (HDFS) 和 MapReduce。HDFS是一个分布式文件系统,设计为跨多台计算机(节点)存储和处理大量数据,提供高容错性和高可用...
这个版本的Hadoop包含了核心库、HDFS(Hadoop Distributed File System)、MapReduce以及YARN(Yet Another Resource Negotiator)等关键模块,是大数据处理的基础平台。 描述中提到的"下载资源hadoop2.7.2资源包...
Hadoop-Eclipse-Plugin是Apache Hadoop项目的一个插件,专为Eclipse集成开发环境设计,使得开发者可以在Eclipse中直接编写、调试和运行Hadoop MapReduce程序。这个压缩包包含的是1.0.0版本的插件,并且提供了5个不同...
`hadoop jar`命令,用于运行MapReduce程序。 7. **Hadoop安全性**:Hadoop 2.7.4支持Kerberos认证,可以实现集群的安全访问和数据保护,防止未授权的访问。 8. **Hadoop生态环境**:Hadoop并不只是单一的工具,它...
这两个部分是Hadoop的核心基石,为大数据处理提供了基础架构。这里我们将深入探讨"Hadoop-core-0.20.2"和"hadoop-2.5.1-src"的源码,以便更好地理解Hadoop的工作原理和内部机制。 **Hadoop Core源码分析** Hadoop-...
在Windows系统上配置Hadoop环境时,这两个文件经常会出现问题,因为它们通常不包含在标准的Hadoop发行版中,需要额外下载或从特定路径获取。 1. **hadoop.dll**:这是一个动态链接库文件,对于在Windows系统中运行...
Hadoop 架构有两个主要的组件:分布式文件系统 HDFS 和 MapReduce 引擎。 在 Hadoop 中,MapReduce 底层的分布式文件系统是独文模块,用户可按照约定 的一套接口实现自己的分布式文件系统,然后经过简单的配置后,...
Hadoop的核心由两个主要部分组成:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一个分布式文件系统,它将大文件分割成块,并在多台服务器上进行存储,提供高容错性和高可用性。MapReduce则是一种编程...
Hadoop的核心由两个主要组件构成:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是一种分布式文件系统,它将大型数据集分割成块,并在集群中的多个节点上存储这些块,以提高容错性和性能。MapReduce则是...
当这两个世界相遇,便诞生了Hadoop Eclipse Plugin,它使得在Eclipse环境中直接操作Hadoop集群成为可能。本文将深入探讨Hadoop Eclipse Plugin 2.7.4的使用,以及如何将其集成到Eclipse中,提升Hadoop项目的开发效率...
首先,MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,原始数据被分割成多个小块,每个块都会在集群中的某个节点上执行一个用户定义的“Mapper”函数,将原始数据转化为键值对的形式。...
Hadoop由两个主要组件构成:Hadoop Distributed File System (HDFS) 和 MapReduce。HDFS是一个分布式文件系统,提供高容错性和高可用性,适合处理和存储大量数据。MapReduce则是一种编程模型,用于大规模数据集的...
1. MapReduce编程基础:包括如何配置Hadoop环境,创建并运行第一个MapReduce程序。 2. 数据输入与输出格式:了解InputFormat和OutputFormat接口,以及自定义输入输出格式的必要性。 3. 键值对处理:深入理解Writable...