2. MapReduce
2.1. A Weather Dataset 一个天气数据集
数据是NCDC的数据,我们关注以下特点:
1) 数据是半格式化的
2) 目录里面存放的是从1901-2001年一个世纪的记录,是gzip压缩过的文件。
3) 以行为单位,使用ASCII格式存储,每行就是一条记录
4) 每条记录我们关注一些基本的元素,比如温度,这些数据在每条数据中都会出现,并且宽度也是固定的。
下面是一条记录的格式,为了便于显示,做了一部分调整。
Example 2-1. Format of a National Climate Data Center record
0057
332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4
+51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000)
FM-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
CN
010000 # visibility distance (meters)
1 # quality code
N9
-0128 # air temperature (degrees Celsius x 10)
1 # quality code
-0139 # dew point temperature (degrees Celsius x 10)
1 # quality code
10268 # atmospheric pressure (hectopascals x 10)
1 # quality code
2.2. Analyzing the Data with Unix Tools 使用Unix工具分析数据
以分析某年份的最高温度为例,下面是一段Unix的脚本程序:
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done
这段脚本的执行过程如下:
脚本循环处理每一个压缩的年份文件,首先打印出年份,然后对每一个文件使用awk处理。Awk脚本从数据中解析出两个字段:一个air temperature,一个quality code。air temperature值加0被转换成整形。接下来查看温度数据是否有效(9999表示在NCDC数据集中丢失的值),并且检查quality code是不是可信并没有错误的。如果读取一切正常,temp将与目前的最大值比较,如果出现新的最大值,则更新当前max的值。当文件中所有行的数据都被处理之后,开始执行End程序块,并且打印出最大值。
程序执行之后将产生如下样式的输出:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
处理结果之中,温度的值被放大了10倍。所以,1901年的温度应该是31.7度,1902年的温度应该是24.4度……
所有的,一个世纪的气象记录在一台EC2 High-CPU Extra Large Instance上耗时42分钟。
为了加速处理速度,我们将程序的某些部分进行并行执行。这在理论上是比较简单的,我们可以按照年份来在不同的处理器上执行,使用所有可用的硬件线程,但是还是有些问题:
1) 把任务切分成相同大小的块不总是那么容易的。这这种情况下,不同年份的文件大小有很大不同,这样就会导致一些过程较早的完成,尽管这些他们可以进行下一步的工作,但是总的运行时间是由耗费时间最长的文件所决定的。一种可供选择的尝试是将输入分成固定大小的块,并把它们分配给处理进程。
2) 合并单独处理出来的结果还需要进一步的处理。在这种情况下,一个年份的结果对于其他年份来说是独立的,并且可能经过联接所有的结果,并按照年份进行排序之后被合并。如果使用固定大小块的方式,合并是很脆弱的。例如,某一年份的数据可能被分到不同的块中,并且被单独处理。我们最终也会得每块数据的最高温度,但是这时候我们最后一步变成了在这些最大值中,为每一个年份找出最大值。
3) 人们仍旧被单机的处理能力所束缚。如果在一台拥有确定数量处理器的计算机上面执行程序的的开销是20分钟的话,你也不能可能再有所提高了。并且有些数据集的数据量已经超出了单台计算机的处理能力。当我们开始使用多台机器的时候,其它一大堆因素就跳了出来,主要是协调和可靠性的问题。谁掌控全局?怎么进行处理器的失效处理?
所以,尽管在理论上并行处理是可行的,但是实践上却是麻烦的。使用一个类似于Hadoop的框架将会有很大的帮助。
2.3. Analyzing the Data with Hadoop 使用Hadoop分析数据
为了使用Hadoop并行处理的长处,我们需要将程序做成MapReduce格式。经过一些本地的、小数据量的测试之后,我们将可以把程序放在集群上进行运行。
2.3.1. Map and Reduce
MapReduce将工作分为map阶段和reduce阶段,每个阶段都将键值对作为输入输入,键值对的类型可以由程序员选择。程序员还指定两个函数:map和reduce函数。
Map阶段的输入数据是NCDC的原始数据,我们选择文本格式输入,这样可以把记录中的每一行作为文本value。Key是当前行离开始行的偏移量,但是我们并不需要这个key,所以省去。
我们的Map函数比较简单,仅仅从输入中析取出temperature。从这个意义上来说,map函数仅仅是完成了数据的准备阶段,这样使得reducer函数可以基于它查找历年的最高温度。Map函数也是一个很好的过滤阶段,这里可以过滤掉丢失、置疑、错误的temperature数据。
形象化一点:下面是输入数据
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
下面的键值对给map函数处理,其中加粗的是有用的数据
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
处理之后的结果如下:
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
经过以上的处理之后还需要在mapreduce框架中进行进一步的处理,主要有排序和按照key给键值对给key-value排序。经过这一番处理之后的结果如下:
(1949, [111, 78])
(1950, [0, 22, −11])
上面的数据将传递给reduce之后,reduce所需要做的工作仅仅是遍历这些数据,找出最大值,产生最终的输出结果:
(1949, 111)
(1950, 22)
以上过程可以用下图描述:
2.3.2. Java MapReduce
Map函数实现了mapper接口,此接口声明了一个map()函数。
下面是map实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
Mapper是一个泛型类型,有四个参数分别代表Map函数的input key, input value, output key, output value 的类型。对于本例来说,input key是一个long integer的偏移量,input value是一行文本,output key是年份,output value是气温(整形)。除了Java的数据类型之外,Hadoop也提供了他自己的基本类型,这些类型为网络序列化做了专门的优化。可以在org.apache.hadoop.io包中找到他们。比如LongWritable相当于Java中的Long,Text相当于String而IntWritable在相当于Integer。map()方法传入一个key和一个value。我们将Text类型的value转化成Java的String,然后用String的substring方法取出我偶们需要的部分。map()方法也提供了OutputCollector的一个实例,用来写输出数据。在次情况下,我们将year封装成Text,而将temperature包装成IntWritable类型。只要在temperature值出现并且quality code表示temperature读取正常的情况下我们才进行数据的写入。
下面是Reduce函数的类似实现,仅用了一个Reducer接:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
类似的,Reducer也有四个参数来分别标记输入输出。Reduce函数的输入类型必须对应于Map函数的输出,拿本例来说,输入必须是:Text,IntWritable类型。Reduce在本例输出结果是Text和IntWritbale类型,year和与其对应的maxValue是经过遍历、比较之后得到的。
下面的一段代码执行了MapReduce工作:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
JobConf对象构成了mapReduce job的说明,给出了job的总体控制。当执行MapReduce工作的时候我们需要将代码打包成一个jar文件(这个文件将被Hadoop在集群中分发)。我们并没有指定jar文件,但是我们传递了一个class给JobConf的构造函数,Hadoop将利用它通过查找包含这个类的jar文件而去定位相关的jar件。之后我们指定input、output路径。
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileInputFormat的静态方法addInputPath来添加input path,input path可以是文件名或者目录,如果是目录的话,在目录下面的文件都会作为输入。addInputPath可以调用多次。
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
FileOutputFormat的setOutput Path()方法指定output path。这个目录在运行job之前是不应该存在的,这样可以阻止数据丢失。
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
指定了mapper和reducer类。
conf.setOutputKeyClass(Text.class);设置output key类型
conf.setOutputValueClass(IntWritable.class);设置output value类型
一般map和reduce的key、value类型都是一样的,如果不一样的话可以调用setMapOutputKeyClass() 和 setMapOutputValueClass()来设置。
输入类型由input format控制,本例使用的是默认的Text格式,所以没有显式指定。
JobClient.runJob(conf);提交工作,等待工作完成。
2.4. The New Java MapReduce API
0.20.0版本的Hadoop新增了一个Context Object,为API将来进化做准备。新旧api不兼容,要想使用新api的特性,程序需要重写。
主要有以下几处重大改进:
1)The new API favors abstract classes over interfaces, since these are easier to evolve.
For example, you can add a method (with a default implementation) to an abstract
class without breaking old implementations of the class. In the new API, the
Mapper and Reducer interfaces are now abstract classes.
2) The new API is in the org.apache.hadoop.mapreduce package (and subpackages).
The old API is found in org.apache.hadoop.mapred.
3) The new API makes extensive use of context objects that allow the user code to
communicate with the MapReduce system. The MapContext, for example, essentially
unifies the role of the JobConf, the OutputCollector, and the Reporter.
4)The new API supports both a “push” and a “pull” style of iteration. In both APIs,
key-value record pairs are pushed to the mapper, but in addition, the new API
allows a mapper to pull records from within the map() method. The same goes for
the reducer. An example of how the “pull” style can be useful is processing records
in batches, rather than one by one.
5)Configuration has been unified. The old API has a special JobConf object for job
configuration, which is an extension of Hadoop’s vanilla Configuration object
(used for configuring daemons; see “The Configuration API” on page 116). In the
new API, this distinction is dropped, so job configuration is done through a
Configuration.
6)Job control is performed through the Job class, rather than JobClient, which no
longer exists in the new API.
使用新api重写的程序如下:
public class NewMaxTemperature {
static class NewMaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
static class NewMaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err
.println("Usage: NewMaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(NewMaxTemperature.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(NewMaxTemperatureMapper.class);
job.setReducerClass(NewMaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.5. Scaling Out
2.5.1. DataFlow数据流
1)MapReduce Job:客户端要处理的一批工作,包括input data、mapreduce程序、配置信息。
2)Hadoop工作分为map task和reduce task两种。
3)有两种节点控制job运行,一种是jobtracker,一种是tasktracker。Jobtracker通过调度tasktracker协调所有工作的执行。Tasktracker运行任务并将报告发送给jobtracker,jobtracker所有工作的进度。如果一个任务失败,jobtracker再重新调度一个不同的tasktracker进行工作。
4)splits,input splits。Hadoop将输入划分成固定大小的块,这些块就叫splits。分块不能太大,也不能太小,一般是64MB,也就是HDFS默认的块大小。
5)data locality map处理本机HDFS的数据,不经过网络。
6)Map将输出写到本地磁盘,没有写到HDFS中。
7)reduce task没有data locality优势
下面是mapreduce的几种执行方式:
MapReduce data flow with a single reduce task
MapReduce data flow with multiple reduce tasks
MapReduce data flow with no reduce task
2.5.2. Combiner Functions
Combiner将map出来的中间数据进行处理,减少网络传输量。
指定combiner方法:conf.setCombinerClass(MaxTemperatureReducer.class);
2.6. Hadoop streaming(略)
2.7. Hadoop pipes(略)
2.1. A Weather Dataset 一个天气数据集
数据是NCDC的数据,我们关注以下特点:
1) 数据是半格式化的
2) 目录里面存放的是从1901-2001年一个世纪的记录,是gzip压缩过的文件。
3) 以行为单位,使用ASCII格式存储,每行就是一条记录
4) 每条记录我们关注一些基本的元素,比如温度,这些数据在每条数据中都会出现,并且宽度也是固定的。
下面是一条记录的格式,为了便于显示,做了一部分调整。
Example 2-1. Format of a National Climate Data Center record
0057
332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4
+51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000)
FM-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
CN
010000 # visibility distance (meters)
1 # quality code
N9
-0128 # air temperature (degrees Celsius x 10)
1 # quality code
-0139 # dew point temperature (degrees Celsius x 10)
1 # quality code
10268 # atmospheric pressure (hectopascals x 10)
1 # quality code
2.2. Analyzing the Data with Unix Tools 使用Unix工具分析数据
以分析某年份的最高温度为例,下面是一段Unix的脚本程序:
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done
这段脚本的执行过程如下:
脚本循环处理每一个压缩的年份文件,首先打印出年份,然后对每一个文件使用awk处理。Awk脚本从数据中解析出两个字段:一个air temperature,一个quality code。air temperature值加0被转换成整形。接下来查看温度数据是否有效(9999表示在NCDC数据集中丢失的值),并且检查quality code是不是可信并没有错误的。如果读取一切正常,temp将与目前的最大值比较,如果出现新的最大值,则更新当前max的值。当文件中所有行的数据都被处理之后,开始执行End程序块,并且打印出最大值。
程序执行之后将产生如下样式的输出:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
处理结果之中,温度的值被放大了10倍。所以,1901年的温度应该是31.7度,1902年的温度应该是24.4度……
所有的,一个世纪的气象记录在一台EC2 High-CPU Extra Large Instance上耗时42分钟。
为了加速处理速度,我们将程序的某些部分进行并行执行。这在理论上是比较简单的,我们可以按照年份来在不同的处理器上执行,使用所有可用的硬件线程,但是还是有些问题:
1) 把任务切分成相同大小的块不总是那么容易的。这这种情况下,不同年份的文件大小有很大不同,这样就会导致一些过程较早的完成,尽管这些他们可以进行下一步的工作,但是总的运行时间是由耗费时间最长的文件所决定的。一种可供选择的尝试是将输入分成固定大小的块,并把它们分配给处理进程。
2) 合并单独处理出来的结果还需要进一步的处理。在这种情况下,一个年份的结果对于其他年份来说是独立的,并且可能经过联接所有的结果,并按照年份进行排序之后被合并。如果使用固定大小块的方式,合并是很脆弱的。例如,某一年份的数据可能被分到不同的块中,并且被单独处理。我们最终也会得每块数据的最高温度,但是这时候我们最后一步变成了在这些最大值中,为每一个年份找出最大值。
3) 人们仍旧被单机的处理能力所束缚。如果在一台拥有确定数量处理器的计算机上面执行程序的的开销是20分钟的话,你也不能可能再有所提高了。并且有些数据集的数据量已经超出了单台计算机的处理能力。当我们开始使用多台机器的时候,其它一大堆因素就跳了出来,主要是协调和可靠性的问题。谁掌控全局?怎么进行处理器的失效处理?
所以,尽管在理论上并行处理是可行的,但是实践上却是麻烦的。使用一个类似于Hadoop的框架将会有很大的帮助。
2.3. Analyzing the Data with Hadoop 使用Hadoop分析数据
为了使用Hadoop并行处理的长处,我们需要将程序做成MapReduce格式。经过一些本地的、小数据量的测试之后,我们将可以把程序放在集群上进行运行。
2.3.1. Map and Reduce
MapReduce将工作分为map阶段和reduce阶段,每个阶段都将键值对作为输入输入,键值对的类型可以由程序员选择。程序员还指定两个函数:map和reduce函数。
Map阶段的输入数据是NCDC的原始数据,我们选择文本格式输入,这样可以把记录中的每一行作为文本value。Key是当前行离开始行的偏移量,但是我们并不需要这个key,所以省去。
我们的Map函数比较简单,仅仅从输入中析取出temperature。从这个意义上来说,map函数仅仅是完成了数据的准备阶段,这样使得reducer函数可以基于它查找历年的最高温度。Map函数也是一个很好的过滤阶段,这里可以过滤掉丢失、置疑、错误的temperature数据。
形象化一点:下面是输入数据
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
下面的键值对给map函数处理,其中加粗的是有用的数据
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
处理之后的结果如下:
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
经过以上的处理之后还需要在mapreduce框架中进行进一步的处理,主要有排序和按照key给键值对给key-value排序。经过这一番处理之后的结果如下:
(1949, [111, 78])
(1950, [0, 22, −11])
上面的数据将传递给reduce之后,reduce所需要做的工作仅仅是遍历这些数据,找出最大值,产生最终的输出结果:
(1949, 111)
(1950, 22)
以上过程可以用下图描述:
2.3.2. Java MapReduce
Map函数实现了mapper接口,此接口声明了一个map()函数。
下面是map实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
Mapper是一个泛型类型,有四个参数分别代表Map函数的input key, input value, output key, output value 的类型。对于本例来说,input key是一个long integer的偏移量,input value是一行文本,output key是年份,output value是气温(整形)。除了Java的数据类型之外,Hadoop也提供了他自己的基本类型,这些类型为网络序列化做了专门的优化。可以在org.apache.hadoop.io包中找到他们。比如LongWritable相当于Java中的Long,Text相当于String而IntWritable在相当于Integer。map()方法传入一个key和一个value。我们将Text类型的value转化成Java的String,然后用String的substring方法取出我偶们需要的部分。map()方法也提供了OutputCollector的一个实例,用来写输出数据。在次情况下,我们将year封装成Text,而将temperature包装成IntWritable类型。只要在temperature值出现并且quality code表示temperature读取正常的情况下我们才进行数据的写入。
下面是Reduce函数的类似实现,仅用了一个Reducer接:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
类似的,Reducer也有四个参数来分别标记输入输出。Reduce函数的输入类型必须对应于Map函数的输出,拿本例来说,输入必须是:Text,IntWritable类型。Reduce在本例输出结果是Text和IntWritbale类型,year和与其对应的maxValue是经过遍历、比较之后得到的。
下面的一段代码执行了MapReduce工作:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
JobConf对象构成了mapReduce job的说明,给出了job的总体控制。当执行MapReduce工作的时候我们需要将代码打包成一个jar文件(这个文件将被Hadoop在集群中分发)。我们并没有指定jar文件,但是我们传递了一个class给JobConf的构造函数,Hadoop将利用它通过查找包含这个类的jar文件而去定位相关的jar件。之后我们指定input、output路径。
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileInputFormat的静态方法addInputPath来添加input path,input path可以是文件名或者目录,如果是目录的话,在目录下面的文件都会作为输入。addInputPath可以调用多次。
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
FileOutputFormat的setOutput Path()方法指定output path。这个目录在运行job之前是不应该存在的,这样可以阻止数据丢失。
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
指定了mapper和reducer类。
conf.setOutputKeyClass(Text.class);设置output key类型
conf.setOutputValueClass(IntWritable.class);设置output value类型
一般map和reduce的key、value类型都是一样的,如果不一样的话可以调用setMapOutputKeyClass() 和 setMapOutputValueClass()来设置。
输入类型由input format控制,本例使用的是默认的Text格式,所以没有显式指定。
JobClient.runJob(conf);提交工作,等待工作完成。
2.4. The New Java MapReduce API
0.20.0版本的Hadoop新增了一个Context Object,为API将来进化做准备。新旧api不兼容,要想使用新api的特性,程序需要重写。
主要有以下几处重大改进:
1)The new API favors abstract classes over interfaces, since these are easier to evolve.
For example, you can add a method (with a default implementation) to an abstract
class without breaking old implementations of the class. In the new API, the
Mapper and Reducer interfaces are now abstract classes.
2) The new API is in the org.apache.hadoop.mapreduce package (and subpackages).
The old API is found in org.apache.hadoop.mapred.
3) The new API makes extensive use of context objects that allow the user code to
communicate with the MapReduce system. The MapContext, for example, essentially
unifies the role of the JobConf, the OutputCollector, and the Reporter.
4)The new API supports both a “push” and a “pull” style of iteration. In both APIs,
key-value record pairs are pushed to the mapper, but in addition, the new API
allows a mapper to pull records from within the map() method. The same goes for
the reducer. An example of how the “pull” style can be useful is processing records
in batches, rather than one by one.
5)Configuration has been unified. The old API has a special JobConf object for job
configuration, which is an extension of Hadoop’s vanilla Configuration object
(used for configuring daemons; see “The Configuration API” on page 116). In the
new API, this distinction is dropped, so job configuration is done through a
Configuration.
6)Job control is performed through the Job class, rather than JobClient, which no
longer exists in the new API.
使用新api重写的程序如下:
public class NewMaxTemperature {
static class NewMaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
static class NewMaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err
.println("Usage: NewMaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(NewMaxTemperature.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(NewMaxTemperatureMapper.class);
job.setReducerClass(NewMaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.5. Scaling Out
2.5.1. DataFlow数据流
1)MapReduce Job:客户端要处理的一批工作,包括input data、mapreduce程序、配置信息。
2)Hadoop工作分为map task和reduce task两种。
3)有两种节点控制job运行,一种是jobtracker,一种是tasktracker。Jobtracker通过调度tasktracker协调所有工作的执行。Tasktracker运行任务并将报告发送给jobtracker,jobtracker所有工作的进度。如果一个任务失败,jobtracker再重新调度一个不同的tasktracker进行工作。
4)splits,input splits。Hadoop将输入划分成固定大小的块,这些块就叫splits。分块不能太大,也不能太小,一般是64MB,也就是HDFS默认的块大小。
5)data locality map处理本机HDFS的数据,不经过网络。
6)Map将输出写到本地磁盘,没有写到HDFS中。
7)reduce task没有data locality优势
下面是mapreduce的几种执行方式:
MapReduce data flow with a single reduce task
MapReduce data flow with multiple reduce tasks
MapReduce data flow with no reduce task
2.5.2. Combiner Functions
Combiner将map出来的中间数据进行处理,减少网络传输量。
指定combiner方法:conf.setCombinerClass(MaxTemperatureReducer.class);
2.6. Hadoop streaming(略)
2.7. Hadoop pipes(略)
相关推荐
包org.apache.hadoop.mapreduce的Hadoop源代码分析
总之,《Hadoop.MapReduce.v2.Cookbook》是一本全面的实战指南,涵盖了从基础概念到高级应用的各个层面,对于想要在Hadoop 2.x平台上利用MapReduce进行大数据处理的开发者和数据工程师来说,是一份不可多得的参考...
- **新代码**:主要位于`org.apache.hadoop.mapreduce.*`,包含36,915行代码,这部分代码进行了重构,提高了代码质量和可维护性。 - 辅助类:分别位于`org.apache.hadoop.util.*`(148行)和`org.apache.hadoop.file...
【SpringBoot】Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster报错明细问题解决后记 报错明细 IDEA SpringBoot集成hadoop运行环境,,本地启动项目,GET请求接口触发...
2. 编写或获取MapReduce程序的Java源码。 3. 解决编译错误,确保所有需要的jar包都已导入。 4. 将编写好的程序打包成JAR文件。 5. 将JAR文件上传至Hadoop集群中的某个节点。 6. 准备输入数据文件,并上传至HDFS的...
2. **MapReduce算法设计** - **局部聚合**:讨论如何在Mapper阶段进行局部聚合操作,从而减少后续Reducer阶段的数据量。 - **配对与条纹化**:介绍如何通过特定的映射策略来优化数据处理流程。 - **相对频率计算*...
2. MapReduce作业提交与执行流程:一个MapReduce作业的执行涉及客户端提交作业、JobTracker分配任务、TaskTracker执行任务和处理结果输出等多个步骤。客户端需要定义map和reduce函数,提交作业到Hadoop集群。...
2. MapReduce和Hadoop:介绍了MapReduce的历史及其与Hadoop的关系。Hadoop是一个开源框架,支持分布式存储和计算,MapReduce是Hadoop的核心组件之一。 3. MapReduce案例分析:通过“Hadoop Example: WordCount”...
2. MapReduce体系结构 MapReduce的体系结构主要由三个部分组成:Client、Master和Slave。Client负责提交作业,Master负责作业调度和任务分配,Slave负责执行任务。Master Node主要负责作业调度、任务分配和失败恢复...
org.apache.hadoop.mapreduce.server.tasktracker org.apache.hadoop.mapreduce.tools org.apache.hadoop.mapreduce.v2 org.apache.hadoop.mapreduce.v2.app.webapp.dao org.apache.hadoop.mapreduce.v2.hs....
1. Hadoop简介2. Hadoop安装配置3. MapReduce计算模型4. MapReduce应用程序5. MapReduce应用案例6. MapReduce工作机制7. Hadoop I/O操作8. 下一代MapReduce: Yarn9. HDFS简介10. HDFS文件结构11. Hive详解12. HBase...
主要的测试类包括`MiniMRCluster`、`MiniDFSCluster`以及`org.apache.hadoop.mapreduce.lib.map.WrappedMapper`和`org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer`。 1. MiniMRCluster:这是一个用于本地...
藏经阁-Apache Hadoop 3.0_ What’s new in YARN & MapReduce.pdf Apache Hadoop 3.0 版本中,YARN(Yet Another Resource Negotiator)和 MapReduce 组件发生了许多变化。本文将对这些变化进行详细的介绍和分析。 ...
2. MapReduce算法设计:详细阐述了算法设计的各个重要方面,比如局部聚合(Local Aggregation)、键值对的处理(Pairs and Stripes)、相对频率的计算(Computing Relative Frequencies)、次级排序(Secondary ...