本文是2009年9月为公司内部培训写得的一篇简介。
MapReduce概述
提供计算任务的自动并行化机制,使用分发-收集的并行策略,Map阶段处理(无依赖的)原始输入,Reduce阶段处理依赖关系(按Key依赖)。
架构在hadoop之上,原则上可以使用hadoop代理的所有分布式文件系统(hdfs,kfs,s3),但我们目前仅使用hdfs。
MapReduce流程
1.客户端提交MapReduce任务
2.JobTracker分发任务,依据输入文件的chunk位置信息,将相应的split分发到TaskTracker
3.Map.TaskTracker 执行Map任务
a)读取split
b)产生输出,输出按i= Hash(K) % R分发至Reduce[i]
4.Reduce.TaskTracker执行Reduce任务
a)Reduce.shuffle可与Map任务并行执行
b)甚至sort也可以和Map并行执行
c)但用户的reduce过程不能用Map并行执行
d)产生R个最终输出文件,整个MapReduce过程结束
MR矩阵与容错
NameNode处理任务调度与错误恢复,因此,在NameNode上,最基本的一个数据结构就是MR[M,R]矩阵。每个Map进程一行,每个Reduce进程一列。
|
R1
|
R2
|
R3
|
|
Rj
|
|
|
|
Rr
|
M1
|
|
|
|
|
|
|
|
|
|
M2
|
|
|
|
|
|
|
|
|
|
M..
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Mi
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Mm
|
|
|
|
|
|
|
|
|
|
每个Map的输出被分成R份,按hash规则,每个Reduce一份,这样当任何一个/多个Reduce任务失败时,重启的Reduce只需要从每个Map的输出读取自己的这一份(绿色列)。
当任何一个/多个Map任务失败——这个很简单,随后重启成功,每个Reduce进程只需要读取自己相应于该Map任务的那些数据(粉色行)。
Map
切分输入
如何切分
|
大文件(大于1 chunk)
|
小文件(小于1 chunk)
|
压缩
|
不切分
|
不切分
|
未压缩
|
按chunk切分
|
对于大文件,按chunk进行切分,切分程序需要处理chunk边界情况,例如,对于普通文本文件,每行一个记录,chunk边界通常在一行中间,切分程序必须处理这种情况,把跨chunk的记录归入前一个split,因此:
l 需要在网络上传输半条记录
l 并不是每个split的尺寸都精确地等于1chunk
对于定长记录文件,要简单一些,也可以安装这种方式来处理。
压缩文件不能进行切分,因为切分后找不到同步点(压缩头)。所以,把压缩文件的尺寸控制在1 chunk内,一方面可以提高Map的并行度,另一方面也可以减少网络传输,因为超出1 chunk的就不在第一个chunk所在的data server了。
解析(parse)记录
将输入的字节流parse成一条条记录
调用Map.map
调用用户定义的map方法
在Map.map中,除了处理、变换数据,一般还需要调用report,向框架报告进度(执行情况)。
Reduce
Reduce.shuffle
可与Map任务并行执行
Reduce.sort
也可以和Map并行执行,然而,一旦有Map任务失败重启,排序结果就作废了!
Reduce.reduce
调用用户定义的reduce函数。这个过程不能用Map并行执行,因为reduce函数需要接受每个Key对应的整个value集合(这个集合可能也是有序的——SecondarySort)。作为一个极端情况,最后完成的那个Map可能包含所有Key,并且value也是最小的!
这个过程,也可能需要向框架报告进度。
这一步将产生最终输出文件,每个Reduce进程一个,整个MapReduce过程结束。因此,MapReduce的输出总是/path/to/reuce/output/part-####,而不是一个单一的文件。这些输出有可能作为后续其它MapReduce过程的输入。
Hadoop.MapReduce接口
原生接口
旧接口(0.20以前)
@Deprecated
public interface Mapper<K1, V1, K2, V2>extends
JobConfigurable, Closeable{
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporterreporter)
throws IOException;
}
@Deprecated
public interface Reducer<K2, V2, K3,V3>
extends JobConfigurable, Closeable {
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3>output, Reporter reporter)
throws IOException;
}
新接口(0.20以后)
public classMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context implementsMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
{
}
protected void setup(Context context) throws IOException,InterruptedException {
// NOTHING
}
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT)value);
}
protected void cleanup(Context context) throws IOException, InterruptedException {
// NOTHING
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(),context.getCurrentValue(), context);
}
cleanup(context);
}
}
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{
public abstract class Context implements ReuceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
{
}
protected void setup(Context context) throws IOException, InterruptedException
{
// NOTHING
}
@SuppressWarnings("unchecked")
protected void reduce(KEYINkey, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT)value);
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
// NOTHING
}
@SuppressWarnings("unchecked")
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(),context.getValues(), context);
// If a back up store is used, reset it
((ReduceContext.ValueIterator)(context.getValues().iterator())).resetBackupStore();
}
cleanup(context);
}
}
新版更灵活,另一方面,默认的map和reduce都是identity,而以前版本的identity是用专门的类来表达的。
灵活性表现在context参数上,在重构中,这个叫参数提取(Introduce Parameter Object)。如果以后参数需要改变,或者需要插入新的方法,就只需要修改Parameter Object,而不需要修改接口本身。
同时,又为旧的接口提供Adapter/Bridge,以便兼容(二进制兼容+源码兼容)旧程序。
示例(WordCount)
Map
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text,Text, IntWritable> {
private final static IntWritable one =
new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporterreporter) throws IOException {
String line =value.toString();
StringTokenizer itr = newStringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
}
Reduce
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable,Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable>values,
OutputCollector<Text, IntWritable> output,
Reporterreporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
配置任务
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(),WordCount.class);
conf.setJobName("wordcount");
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = newArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
conf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])){
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
} else {
other_args.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(conf,other_args.get(0));
FileOutputFormat.setOutputPath(conf,new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
Streaming
文本协议的接口,记录以换行符分隔,Key-Value以\t分隔。比原生接口更简单,同时,还可以进行更方便的本地测试——通过管道进行。
原则上,Streaming接口只需要遵循一条:Map.OutputKey==Reduce.InputKey
最简单的一个程序:大客户每天独立用户数计算。MapReduce的输出结果是每个用户和他对应的浏览量,再使用wc -l就可以得出总量,并且,可以从原始输出取得每个用户的浏览量。
Map(为简单起见,省略了错误处理)
#include <stdio.h>
#include <febird/trb_cxx.h>
int main(int argc, char* argv[])
{
size_t len = 0;
char* line = NULL;
using febird::trbstrmap;
trbstrmap<int> smap;
for (;;) {
ssize_t len2 = getline(&line,&len, stdin);
if (-1 == len2) break;
char* ck = strstr(line,"&xnbc=");
char* end = strstr(ck+6,"HTTP");
end[-4] = 0;
smap[ck+6]++;
}
for(trbstrmap<int>::iterator iter = smap.begin();iter; ++iter)
printf("%s\t%d\n",smap.key(iter), *iter);
if (line) free(line);
return 0;
}
这个Map的逻辑比WordCount.Map要稍微复杂一点,因为在程序中,相当于已经做了Combine,这个Combine比MapReduce本身的Combine要高效得多。
优化无止境,如果需要进一步优化,该程序可以在smap的尺寸达到一定大小时就进行输出,而不必等到处理完全部输入后再输出,这样一方面减小了内存占用,另一方面还提高了并发度——每输出一点数据框架就可以传输一点,进而shuffle、sort。
Reduce
框架传递给Reduce程序的[{key, value}]中,相等的Key总是相邻的,充分利用这一点,可以有效化简程序,并提高性能。然而要利用这一点,在编程上付出的努力就比原生接口要复杂一些,原生接口只需遍历Value集合,而使用Streaming需要自己判断相同Key的集合边界,还要处理一些其他边界问题(代码中黄色行)。
这个Reduce实际上可以作为一个通用的reduce,叫它sumlong,可以计算每个Key发生的次数,只要输入匹配key\tnum,用正则表达式就是:.*?\t[0-9]-?{1,20}
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
int main(int argc, char* argv[])
{
size_t llen = 0, klen = 0;
char *line = NULL, *key =(char*)malloc(1024);
long cnt = 0;
key[0] = 0;
for (;;) {
ssize_t len2 = getline(&line,&llen, stdin);
if (-1 == len2) {
if (klen) printf("%s\t%ld\n",key, cnt);
break;
}
char* tab = (char*)memchr(line, '\t',len2);
if (tab) {
long cnt2 = atol(tab+1);
size_t klen2 = tab - line;
if (klen2 == klen &&memcmp(line, key, klen) == 0)
cnt += cnt2;
else {
if (klen)printf("%s\t%ld\n", key, cnt);
memcpy(key, line, klen2);
key[klen2] = 0;
klen = klen2;
cnt = cnt2;
}
} // if (tab)
}
if (line) free(line);
free(key);
return 0;
}
调用脚本
#Distinct User Sum
if (($# < 1)); then
echo usage: $0 yyyy_mm_dd[_hh]
exit
elif (($# < 2)); then
outFile=/data/leipeng/big/dus/$1
else
outFile=$2
fi
hdir=/opt/hadoop
year=`echo $1 | awk-F"_" '{print $1;}'`
month=`echo $1 | awk-F"_" '{print $2;}'`
day=`echo $1 | awk -F"_"'{print $3;}'`
hour=`echo $1 | awk-F"_" '{print $4;}'`
if [[ -z $hour ]] ; then
pvd=`/opt/hadoop/bin/hadoop fs –ls \
/user/root/log_data/log_in/pv/$year/$month/$day/*/\
| awk 'NF>=8{printf("-input %s", $8)}'`
else
pvd="-input \
/user/root/log_data/log_in/pv/$year/$month/$day/$hour/pvval"
fi
echo $pvd
cd $hdir
bin/hadoop fs -rmr/test/dus/output/$1
bin/hadoop jar \
contrib/streaming/hadoop-*-streaming.jar \
-conf conf/dus.xml\
$pvd \
-output /test/dus/output/$1 \
-mapper $hdir/aa_dumap \
-reducer $hdir/aa_duadd
bin/hadoop fs -cat "/test/dus/output/$1/part*"| wc -l > $outFile
如果再加上配置文件conf/dus.xml,整个调用脚本比程序本身还要长。
本地测试命令,相当于仅一个Map和一个Reduce进程:
cat pvfiles | aa_dumap | aa_duadd [|wc –l]
如果要看所有用户的结果,而非最终统计,就不许要|wc–l。这个比原生接口的测试要简单得多!
使用其它语言
Streaming不光可以使用C/C++,任何语言都可以,比如awk,上面的程序用awk可以更简单——效率也许会低一点,但的确更简单得多得多!
awk.Map
cookie之前有&xnbc=标识,之后有|| HTTP,因此,用这两个串做字段分隔符,cookie内容正好是第二个字段$2。
程序本身不做Combine,让MapReduce框架去做,或者干脆不做。
awk '-F&xnbc=|\\|\\| HTTP''{printf("%s\t1\n", $2)}'
awk.Reduce
Reduce不需要自定义字段分隔符,默认的正好
简单的,用内存多点
awk '{km[$1]+=$2}END{for (k inkm){printf("%s\t%d\n", k, km[k]);}}'
复杂点,用内存小点
awk 'p==$1{c+=$2}p!=$1{if(p!="") printf("%s\t%d\n", p, c); p=$1; c=$2;}END{if(NR>0) printf("%s\t%d\n", p, c)}'
可读形式:
p==$1{c+=$2}
p!=$1{
if(p!="")
printf("%s\t%d\n",p, c);
p=$1;c=$2;
}
END{
# 不能漏掉最后一条,并且,空的输入必须是空的输出
if(NR>0)
printf("%s\t%d\n",p, c)
}
调用脚本的复杂性还是一样的,不过,使用awk,可以直接把awk程序写在调用脚本中,就不需要任何其它程序了。
更多控制选项
每个任务可调的参数
Name
|
Type
|
Description
|
mapred.job.id
|
String
|
The job id
|
mapred.jar
|
String
|
job.jar location in job directory
|
job.local.dir
|
String
|
The job specific shared scratch space
|
mapred.tip.id
|
String
|
The task id
|
mapred.task.id
|
String
|
The task attempt id
|
mapred.task.is.map
|
boolean
|
Is this a map task
|
mapred.task.partition
|
int
|
The id of the task within the job
|
map.input.file
|
String
|
The filename that the map is reading from
|
map.input.start
|
long
|
The offset of the start of the map input split
|
map.input.length
|
long
|
The number of bytes in the map input split
|
mapred.work.output.dir
|
String
|
The task's temporary output directory
|
其它参数是全局选项,可参考hadoop官方文档(Hadoop安装目录内)。
分享到:
相关推荐
包org.apache.hadoop.mapreduce的Hadoop源代码分析
- **新代码**:主要位于`org.apache.hadoop.mapreduce.*`,包含36,915行代码,这部分代码进行了重构,提高了代码质量和可维护性。 - 辅助类:分别位于`org.apache.hadoop.util.*`(148行)和`org.apache.hadoop.file...
《Hadoop.MapReduce.v2.Cookbook》是一本专注于Hadoop MapReduce v2(也称为YARN)的实用指南,适合那些希望深入了解和利用Hadoop处理大数据的IT专业人士。这本书籍详细介绍了如何在Hadoop 2.x环境中有效地设计、...
at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:133) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:437) at org.apache....
《Packtpub.Hadoop.MapReduce.Cookbook.Jan.2013》是2013年出版的一本专门探讨Hadoop MapReduce技术的实战指南。这本书深入浅出地介绍了如何利用Hadoop MapReduce框架来处理大数据问题,是IT行业中针对大数据处理的...
这个文件通常与Hadoop的其他组件一起使用,如Hadoop Common、Hadoop MapReduce等,确保在Windows环境下这些组件能正常运行。 在64位Windows环境中,使用64位版本的Winutils和hadoop.dll尤其重要,因为它们能更好地...
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class IntMinMax { public static class IntMapper extends Mapper, Text, IntWritable, IntWritable> { // 实现Mapper逻辑 } ...
【SpringBoot】Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster报错明细问题解决后记 报错明细 IDEA SpringBoot集成hadoop运行环境,,本地启动项目,GET请求接口触发...
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
hadoop-mapreduce-examples-2.7.1.jar
这个文件对于Hadoop在Windows上的运行至关重要,因为它提供了与Hadoop生态系统交互的接口,例如HDFS(Hadoop分布式文件系统)和MapReduce的本地操作。 `winutils.exe`则是Hadoop在Windows下的实用工具集,它包含了...
这个库文件使得Windows应用程序能够与Hadoop生态系统进行交互,例如执行MapReduce任务、管理HDFS文件系统等。在Windows上运行Hadoop时,必须确保这个dll文件位于系统的PATH环境变量所指向的目录下,以便系统能够找到...
概念 MapReduce是hadoop分布式计算框架。 MapReduce意味着在计算过程中实际分为两大步,...import org.apache.hadoop.mapreduce.Mapper; // 案例:统计每一个单词出现的次数 // KEYIN - 这一行的偏移量 --- // VALUEIN
Hadoop的核心由两个主要部分组成:Hadoop Distributed File System (HDFS)和MapReduce。HDFS是分布式文件系统,而MapReduce则是用于处理和生成大数据集的编程模型。 1. **hadoop.dll**: 这是在Windows操作系统上...
org.apache.hadoop.mapreduce.server.jobtracker org.apache.hadoop.mapreduce.server.tasktracker org.apache.hadoop.mapreduce.tools org.apache.hadoop.mapreduce.v2 org.apache.hadoop.mapreduce.v2.app....
hadoop中的demo,wordcount列子用到的JAR包 用法: # 在容器里运行WordCount程序,该程序需要2个参数...hadoop jar hadoop-mapreduce-examples-2.7.1-sources.jar org.apache.hadoop.examples.WordCount input output
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper, Text, Text, IntWritable> { private final static ...
Hadoop的核心组成部分包括Hadoop Distributed File System (HDFS)和MapReduce计算模型。 在Windows上配置Hadoop通常比在Linux系统上更为复杂,因为Hadoop最初是为Linux环境设计的。winutils.exe是一个实用工具,...
windows环境下运行hadoop的mapreduce程序需要的hadoop.dll winutils.exe等文件,使用方法见解压文件,该文件对应的hadoop版本是 2.7.2 , 请注意版本一致