MapReduce 编程模型 在JAVA中的使用
两项核心操作:Map和Reduce
MapReduce是一个最先由Google提出的分布式计算软件构架,它可以支持大数据量的分布式处理。这个架构最初起源于函数式程式的map和reduce两个函数组成,但它们在MapReduce架构中的应用和原来的使用上的大相径庭。
MapReduce架构是用来解决大数据量的分布式计算问题,然后把计算后的结果放入文件系统或者数据库中。
明白MapReduce 程序的工作原理之后,下一步便是通过代码来实现它。我们需要三样东西:一个map 函数、一个reduce 函数和一些用来运行作业的代码。map函数由Mapper 接口实现来表示,后者声明了一个map()方法。例2-3 显示了我们的map函数实现。
例2-3. 查找最高气温的Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseIntdoesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
该Mapper接口是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键和输出值的类型。就目前的示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop自身提供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型均可在org.apache.hadoop.io包中找到。这里我们使用LongWritable类型(相当于Java中的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java 中的Integer类型)。
map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。
map()方法还提供了OutputCollector实例用于输出内容的写入。在这种情况下,我们将年份数据按Text对象进行读/写 (因为我们把年份当作键),将气温值封装在IntWritable 类型中。
我们只在气温数据不缺失并且所对应质量代码显示为正确的气温读数时,才将其写入输出记录中。
reduce函数通过Reducer进行类似的定义,如例2-4 所示。
例2-4. 查找最高气温的Reducer
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next()。get());
}
output.collect(key, new IntWritable(maxValue));
} }
同样,针对reduce函数也有四个形式参数类型用于指定其输入和输出类型。reduce 函数的输入类型必须与map 函数的输出类型相匹配:即Text类型和IntWritable类型。在这种情况下,reduce函数的输出类型也必须是Text和IntWritable这两种类型,分别输出年份和最高气温。该最高气温是通过循环比较当前气温与已看到的最高气温获得的。
第三部分代码负责运行MapReduce 作业(请参见例2-5)。
例2-5. 该应用程序在气象数据集中找出最高气温
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature<input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
} }
JobConf对象指定了作业执行规范。我们可以用它来控制整个作业的运行。在Hadoop 集群上运行这个作业时,我们需要将代码打包成一个JAR文件(Hadoop会在集群上分发这个文件)。我们无需明确指定JAR 文件的名称,而只需在JobConf的构造函数中传递一个类,Hadoop将通过该类查找包含有该类的JAR文件进而找到相关的JAR文件。
构造JobConf对象之后,需要指定输入和输出数据的路径。调用 FileInputFormat类的静态函数addInputPath()来定义输入数据的路径,该路径可以是单个文件、目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一组文件。由函数名可知,可以多次调用addInputPath()实现多路径的输入。
通过调用FileOutputFormat 类中的静态函数 setOutputPath()来指定输出路径。该函数指定了reduce 函数输出文件的写入目录。在运行任务前该目录不应该存在,否则Hadoop 会报错并拒绝运行该任务。这种预防措施是为了防止数据丢失(一个长时间运行任务的结果被意外地覆盖将是非常恼人的)。
接着,通过setMapperClass()和setReducerClass()指定map和reduce类型。
setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型往往相同。如果不同,map函数的输出类型则通过setMapOutputKeyClass()和setMapOutputValueClass()函数来设置。
输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。
在设置定义map 和reduce 函数的类后,便可以开始运行任务。JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。
相关推荐
实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...
在本文档中,我们将介绍一个使用MapReduce编程模型计算学生平均成绩的例子。该例子主要包括两个部分:Map部分和Reduce部分。Map部分负责将输入文件中的数据分解成小的数据块,然后将其传递给Reduce部分。Reduce部分...
在这个实验报告中,我们将探讨如何在Eclipse环境中设置和使用MapReduce编程。 首先,为了在Eclipse上编写和运行MapReduce程序,我们需要安装`hadoop-eclipse-plugin`。这个插件允许开发者直接在IDE中开发、调试和...
在“拓思爱诺大数据-第二次作业MapReduce编程”中,你将学习到如何使用MapReduce解决实际问题。首先,我们来看Hadoop的wordcount程序,这是一个经典的MapReduce示例,用于统计文本中单词出现的频率。在Map阶段,程序...
MapReduce 是一种编程模型,用于大规模数据集(大于 1TB)的并行计算。它的基本思想是将大任务分解为小任务(映射阶段),然后在多台机器上并行处理这些小任务,最后再将结果合并(化简阶段)。在这个案例中,我们...
【MapReduce初级编程实践】是大数据处理中的一项基础任务,主要应用于大规模数据集的并行计算。在这个实验中,我们关注的是如何利用MapReduce来实现文件的合并与去重操作。MapReduce是一种分布式计算模型,由Google...
通用编程模型是云计算中处理大规模数据的关键。例如,Google的MapReduce模型,它将大型数据处理任务分解为两个阶段——Map和Reduce,适合于批处理和大数据分析。Dryad是微软提出的编程系统,它允许程序员编写数据...
本文将深入探讨如何使用Java编程语言来操作Hadoop MapReduce进行基本实践,通过源码分析来理解其核心工作原理和编程模型。 MapReduce的核心思想是将大规模数据集分解成小块,然后在分布式集群上并行处理这些小块,...
本话题将深入探讨如何利用MapReduce编程模型来实现好友推荐功能。 首先,我们需要理解MapReduce的工作原理。MapReduce由两个主要阶段组成:Map阶段和Reduce阶段。在Map阶段,输入数据被分成多个块,并在集群的不同...
这些是Java和Python示例代码,用于在我的博客教程中显示Warehouse-Scale Computing中编程模型的HOWTO。 下面有五个示例,主要目的是让您亲身体验运行MapReduce并获得对MapReduce范例的更深入的了解,熟悉Apache ...
1. **MapReduce编程模型**:MapReduce的核心是Map和Reduce两个函数。Map负责将输入数据拆分成键值对,进行局部处理;Reduce则将Map阶段的结果进行聚合,生成最终结果。中间结果通过 Shuffle 和 Sort 阶段进行排序和...
在本Hadoop课程设计中,我们将探讨如何使用Java编程语言和MapReduce框架来实现一个贝叶斯文本分类器。这个项目旨在让学生理解大数据处理的基本原理,以及如何利用Hadoop生态系统来解决实际问题,特别是文本分类任务...
在分布式计算领域,MapReduce是一种广泛使用的编程模型,主要用于处理和生成大数据集。在这个场景下,我们将探讨如何利用Java实现MapReduce来进行矩阵乘法。矩阵乘法是线性代数中的基本运算,对于大规模的数据处理,...
MapReduce编程模型3.1 MapReduce编程模型概述3.1.1 MapReduce编程接口体系结构3.1.2 新旧MapReduce API比较3.2 MapReduce API基本概念3.2.1 序列化3.2.2 Reporter参数3.2.3 回调机制3.3 Java API解析3.3.1 ...
MapReduce编程模型是当前主流的云计算编程工具所采用的模型,其核心思想是将复杂问题拆解为Map(映射)和Reduce(化简)两个阶段。具体操作中,Map阶段将数据划分为较小的数据块,然后由计算机集群进行分布式处理。...
Java在云计算编程模型中的应用特点体现在它能适应Web上传需求,控制应用部署,实时更新和监控数据,以及自动生成使用报告。此外,Java与多种云计算平台如Cloud Foundry、Cloud Bees、Red Hat等有紧密关联,显示了...
在java 矩阵乘法的mapreduce程序实现中,主要使用了Mapper类来实现矩阵乘法的map操作。Mapper类继承自org.apache.hadoop.mapreduce.Mapper类,重写了map方法。在map方法中,程序首先获取输入文件的名称,从而区分...
在云计算编程模型中,Java的MapReduce编程模型特别重要,它是一种简化分布式编程模型,也是一种高效的并行任务调度模型,通常用于大规模数据集的并行处理。 Java在云计算编程模型中的应用特点主要体现在以下几个...
标题“MapReduce例子”可能是指一个具体的MapReduce编程示例,展示了如何使用MapReduce模型解决实际问题。MapReduce的工作流程通常包括以下几个步骤: 1. **数据分片(Input Split)**:输入的数据集被分割成多个...