`
febird
  • 浏览: 256561 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop.MapReduce.简介

 
阅读更多

本文是2009年9月为公司内部培训写得的一篇简介。


MapReduce概述

提供计算任务的自动并行化机制,使用分发-收集的并行策略,Map阶段处理(无依赖的)原始输入,Reduce阶段处理依赖关系(按Key依赖)。

架构在hadoop之上,原则上可以使用hadoop代理的所有分布式文件系统(hdfs,kfs,s3),但我们目前仅使用hdfs。

MapReduce流程

1.客户端提交MapReduce任务

2.JobTracker分发任务,依据输入文件的chunk位置信息,将相应的split分发到TaskTracker

3.Map.TaskTracker 执行Map任务

a)读取split

b)产生输出,输出按i= Hash(K) % R分发至Reduce[i]

4.Reduce.TaskTracker执行Reduce任务

a)Reduce.shuffle可与Map任务并行执行

b)甚至sort也可以和Map并行执行

c)但用户的reduce过程不能用Map并行执行

d)产生R个最终输出文件,整个MapReduce过程结束

MR矩阵与容错

NameNode处理任务调度与错误恢复,因此,在NameNode上,最基本的一个数据结构就是MR[M,R]矩阵。每个Map进程一行,每个Reduce进程一列。

R1

R2

R3

Rj

Rr

M1

M2

M..

Mi

Mm

每个Map的输出被分成R份,按hash规则,每个Reduce一份,这样当任何一个/多个Reduce任务失败时,重启的Reduce只需要从每个Map的输出读取自己的这一份(绿色列)。

当任何一个/多个Map任务失败——这个很简单,随后重启成功,每个Reduce进程只需要读取自己相应于该Map任务的那些数据(粉色行)。

Map

切分输入

如何切分

大文件(大于1 chunk)

小文件(小于1 chunk)

压缩

不切分

不切分

未压缩

按chunk切分

对于大文件,按chunk进行切分,切分程序需要处理chunk边界情况,例如,对于普通文本文件,每行一个记录,chunk边界通常在一行中间,切分程序必须处理这种情况,把跨chunk的记录归入前一个split,因此:

l 需要在网络上传输半条记录

l 并不是每个split的尺寸都精确地等于1chunk

对于定长记录文件,要简单一些,也可以安装这种方式来处理。

压缩文件不能进行切分,因为切分后找不到同步点(压缩头)。所以,把压缩文件的尺寸控制在1 chunk内,一方面可以提高Map的并行度,另一方面也可以减少网络传输,因为超出1 chunk的就不在第一个chunk所在的data server了。

解析(parse)记录

将输入的字节流parse成一条条记录

调用Map.map

调用用户定义的map方法

在Map.map中,除了处理、变换数据,一般还需要调用report,向框架报告进度(执行情况)。

Reduce

Reduce.shuffle

可与Map任务并行执行

Reduce.sort

也可以和Map并行执行,然而,一旦有Map任务失败重启,排序结果就作废了!

Reduce.reduce

调用用户定义的reduce函数。这个过程不能用Map并行执行,因为reduce函数需要接受每个Key对应的整个value集合(这个集合可能也是有序的——SecondarySort)。作为一个极端情况,最后完成的那个Map可能包含所有Key,并且value也是最小的!

这个过程,也可能需要向框架报告进度。

这一步将产生最终输出文件,每个Reduce进程一个,整个MapReduce过程结束。因此,MapReduce的输出总是/path/to/reuce/output/part-####,而不是一个单一的文件。这些输出有可能作为后续其它MapReduce过程的输入。

Hadoop.MapReduce接口

原生接口

旧接口(0.20以前)

@Deprecated

public interface Mapper<K1, V1, K2, V2>extends JobConfigurable, Closeable{

void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporterreporter)

throws IOException;

}

@Deprecated

public interface Reducer<K2, V2, K3,V3> extends JobConfigurable, Closeable {

void reduce(K2 key, Iterator<V2> values,

OutputCollector<K3, V3>output, Reporter reporter)

throws IOException;

}

新接口(0.20以后)

public classMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

public abstract class Context implementsMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

{

}

protected void setup(Context context) throws IOException,InterruptedException {

// NOTHING

}

@SuppressWarnings("unchecked")

protected void map(KEYIN key, VALUEIN value,Context context)

throws IOException, InterruptedException {

context.write((KEYOUT) key, (VALUEOUT)value);

}

protected void cleanup(Context context) throws IOException, InterruptedException {

// NOTHING

}

public void run(Context context) throws IOException, InterruptedException {

setup(context);

while (context.nextKeyValue()) {

map(context.getCurrentKey(),context.getCurrentValue(), context);

}

cleanup(context);

}

}

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{

public abstract class Context implements ReuceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

{

}

protected void setup(Context context) throws IOException, InterruptedException

{

// NOTHING

}

@SuppressWarnings("unchecked")

protected void reduce(KEYINkey, Iterable<VALUEIN> values, Context context)

throws IOException, InterruptedException {

for(VALUEIN value: values) {

context.write((KEYOUT) key, (VALUEOUT)value);

}

}

protected void cleanup(Context context) throws IOException, InterruptedException {

// NOTHING

}

@SuppressWarnings("unchecked")

public void run(Context context) throws IOException, InterruptedException {

setup(context);

while (context.nextKey()) {

reduce(context.getCurrentKey(),context.getValues(), context);

// If a back up store is used, reset it

((ReduceContext.ValueIterator)(context.getValues().iterator())).resetBackupStore();

}

cleanup(context);

}

}

新版更灵活,另一方面,默认的map和reduce都是identity,而以前版本的identity是用专门的类来表达的。

灵活性表现在context参数上,在重构中,这个叫参数提取(Introduce Parameter Object)。如果以后参数需要改变,或者需要插入新的方法,就只需要修改Parameter Object,而不需要修改接口本身。

同时,又为旧的接口提供Adapter/Bridge,以便兼容(二进制兼容+源码兼容)旧程序。

示例(WordCount)

Map

public static class MapClass extends MapReduceBase

implements Mapper<LongWritable, Text,Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(LongWritable key, Text value,

OutputCollector<Text, IntWritable> output,

Reporterreporter) throws IOException {

String line =value.toString();

StringTokenizer itr = newStringTokenizer(line);

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

output.collect(word, one);

}

}

}

Reduce

public static class Reduce extends MapReduceBase

implements Reducer<Text, IntWritable,Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable>values,

OutputCollector<Text, IntWritable> output,

Reporterreporter) throws IOException {

int sum = 0;

while (values.hasNext()) {

sum += values.next().get();

}

output.collect(key, new IntWritable(sum));

}

}

配置任务

public int run(String[] args) throws Exception {

JobConf conf = new JobConf(getConf(),WordCount.class);

conf.setJobName("wordcount");

// the keys are words (strings)

conf.setOutputKeyClass(Text.class);

// the values are counts (ints)

conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MapClass.class);

conf.setCombinerClass(Reduce.class);

conf.setReducerClass(Reduce.class);

List<String> other_args = newArrayList<String>();

for(int i=0; i < args.length; ++i) {

try {

if ("-m".equals(args[i])) {

conf.setNumMapTasks(Integer.parseInt(args[++i]));

} else if ("-r".equals(args[i])){

conf.setNumReduceTasks(Integer.parseInt(args[++i]));

} else {

other_args.add(args[i]);

}

} catch (NumberFormatException except) {

System.out.println("ERROR: Integer expected instead of " + args[i]);

return printUsage();

} catch (ArrayIndexOutOfBoundsException except) {

System.out.println("ERROR: Required parameter missing from " +

args[i-1]);

return printUsage();

}

}

// Make sure there are exactly 2 parameters left.

if (other_args.size() != 2) {

System.out.println("ERROR: Wrong number of parameters: " +

other_args.size() + " instead of 2.");

return printUsage();

}

FileInputFormat.setInputPaths(conf,other_args.get(0));

FileOutputFormat.setOutputPath(conf,new Path(other_args.get(1)));

JobClient.runJob(conf);

return 0;

}

Streaming

文本协议的接口,记录以换行符分隔,Key-Value以\t分隔。比原生接口更简单,同时,还可以进行更方便的本地测试——通过管道进行。

原则上,Streaming接口只需要遵循一条:Map.OutputKey==Reduce.InputKey

最简单的一个程序:大客户每天独立用户数计算。MapReduce的输出结果是每个用户和他对应的浏览量,再使用wc -l就可以得出总量,并且,可以从原始输出取得每个用户的浏览量。

Map(为简单起见,省略了错误处理)

#include <stdio.h>

#include <febird/trb_cxx.h>

int main(int argc, char* argv[])

{

size_t len = 0;

char* line = NULL;

using febird::trbstrmap;

trbstrmap<int> smap;

for (;;) {

ssize_t len2 = getline(&line,&len, stdin);

if (-1 == len2) break;

char* ck = strstr(line,"&xnbc=");

char* end = strstr(ck+6,"HTTP");

end[-4] = 0;

smap[ck+6]++;

}

for(trbstrmap<int>::iterator iter = smap.begin();iter; ++iter)

printf("%s\t%d\n",smap.key(iter), *iter);

if (line) free(line);

return 0;

}

这个Map的逻辑比WordCount.Map要稍微复杂一点,因为在程序中,相当于已经做了Combine,这个Combine比MapReduce本身的Combine要高效得多。

优化无止境,如果需要进一步优化,该程序可以在smap的尺寸达到一定大小时就进行输出,而不必等到处理完全部输入后再输出,这样一方面减小了内存占用,另一方面还提高了并发度——每输出一点数据框架就可以传输一点,进而shuffle、sort。

Reduce

框架传递给Reduce程序的[{key, value}]中,相等的Key总是相邻的,充分利用这一点,可以有效化简程序,并提高性能。然而要利用这一点,在编程上付出的努力就比原生接口要复杂一些,原生接口只需遍历Value集合,而使用Streaming需要自己判断相同Key的集合边界,还要处理一些其他边界问题(代码中黄色行)。

这个Reduce实际上可以作为一个通用的reduce,叫它sumlong,可以计算每个Key发生的次数,只要输入匹配key\tnum,用正则表达式就是:.*?\t[0-9]-?{1,20}

#include <stdio.h>

#include <string.h>

#include <stdlib.h>

int main(int argc, char* argv[])

{

size_t llen = 0, klen = 0;

char *line = NULL, *key =(char*)malloc(1024);

long cnt = 0;

key[0] = 0;

for (;;) {

ssize_t len2 = getline(&line,&llen, stdin);

if (-1 == len2) {

if (klen) printf("%s\t%ld\n",key, cnt);

break;

}

char* tab = (char*)memchr(line, '\t',len2);

if (tab) {

long cnt2 = atol(tab+1);

size_t klen2 = tab - line;

if (klen2 == klen &&memcmp(line, key, klen) == 0)

cnt += cnt2;

else {

if (klen)printf("%s\t%ld\n", key, cnt);

memcpy(key, line, klen2);

key[klen2] = 0;

klen = klen2;

cnt = cnt2;

}

} // if (tab)

}

if (line) free(line);

free(key);

return 0;

}

调用脚本

#Distinct User Sum

if (($# < 1)); then

echo usage: $0 yyyy_mm_dd[_hh]

exit

elif (($# < 2)); then

outFile=/data/leipeng/big/dus/$1

else

outFile=$2

fi

hdir=/opt/hadoop

year=`echo $1 | awk-F"_" '{print $1;}'`

month=`echo $1 | awk-F"_" '{print $2;}'`

day=`echo $1 | awk -F"_"'{print $3;}'`

hour=`echo $1 | awk-F"_" '{print $4;}'`

if [[ -z $hour ]] ; then

pvd=`/opt/hadoop/bin/hadoop fs –ls \
/user/root/log_data/log_in/pv/$year/$month/$day/*/\
| awk 'NF>=8{printf("-input %s", $8)}'`

else

pvd="-input \
/user/root/log_data/log_in/pv/$year/$month/$day/$hour/pvval"

fi

echo $pvd

cd $hdir

bin/hadoop fs -rmr/test/dus/output/$1

bin/hadoop jar \

contrib/streaming/hadoop-*-streaming.jar \

-conf conf/dus.xml\

$pvd \

-output /test/dus/output/$1 \

-mapper $hdir/aa_dumap \

-reducer $hdir/aa_duadd

bin/hadoop fs -cat "/test/dus/output/$1/part*"| wc -l > $outFile

如果再加上配置文件conf/dus.xml,整个调用脚本比程序本身还要长。

本地测试命令,相当于仅一个Map和一个Reduce进程:

cat pvfiles | aa_dumap | aa_duadd [|wc –l]

如果要看所有用户的结果,而非最终统计,就不许要|wc–l。这个比原生接口的测试要简单得多!

使用其它语言

Streaming不光可以使用C/C++,任何语言都可以,比如awk,上面的程序用awk可以更简单——效率也许会低一点,但的确更简单得多得多!

awk.Map

cookie之前有&xnbc=标识,之后有|| HTTP,因此,用这两个串做字段分隔符,cookie内容正好是第二个字段$2。

程序本身不做Combine,让MapReduce框架去做,或者干脆不做。

awk '-F&xnbc=|\\|\\| HTTP''{printf("%s\t1\n", $2)}'

awk.Reduce

Reduce不需要自定义字段分隔符,默认的正好

简单的,用内存多点

awk '{km[$1]+=$2}END{for (k inkm){printf("%s\t%d\n", k, km[k]);}}'

复杂点,用内存小点

awk 'p==$1{c+=$2}p!=$1{if(p!="") printf("%s\t%d\n", p, c); p=$1; c=$2;}END{if(NR>0) printf("%s\t%d\n", p, c)}'

可读形式:

p==$1{c+=$2}

p!=$1{

if(p!="")

printf("%s\t%d\n",p, c);

p=$1;c=$2;

}

END{

# 不能漏掉最后一条,并且,空的输入必须是空的输出

if(NR>0)

printf("%s\t%d\n",p, c)

}

调用脚本的复杂性还是一样的,不过,使用awk,可以直接把awk程序写在调用脚本中,就不需要任何其它程序了。

更多控制选项

每个任务可调的参数

Name

Type

Description

mapred.job.id

String

The job id

mapred.jar

String

job.jar location in job directory

job.local.dir

String

The job specific shared scratch space

mapred.tip.id

String

The task id

mapred.task.id

String

The task attempt id

mapred.task.is.map

boolean

Is this a map task

mapred.task.partition

int

The id of the task within the job

map.input.file

String

The filename that the map is reading from

map.input.start

long

The offset of the start of the map input split

map.input.length

long

The number of bytes in the map input split

mapred.work.output.dir

String

The task's temporary output directory

其它参数是全局选项,可参考hadoop官方文档(Hadoop安装目录内)。

分享到:
评论

相关推荐

    Hadoop源代码分析(包org.apache.hadoop.mapreduce)

    包org.apache.hadoop.mapreduce的Hadoop源代码分析

    Hadoop.MapReduce.分析

    - **新代码**:主要位于`org.apache.hadoop.mapreduce.*`,包含36,915行代码,这部分代码进行了重构,提高了代码质量和可维护性。 - 辅助类:分别位于`org.apache.hadoop.util.*`(148行)和`org.apache.hadoop.file...

    Hadoop.MapReduce.v2.Cookbook pdf

    《Hadoop.MapReduce.v2.Cookbook》是一本专注于Hadoop MapReduce v2(也称为YARN)的实用指南,适合那些希望深入了解和利用Hadoop处理大数据的IT专业人士。这本书籍详细介绍了如何在Hadoop 2.x环境中有效地设计、...

    hadoop-2.6.0-hadoop.dll-winutils.exe

     at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:133)  at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:437)  at org.apache....

    Packtpub.Hadoop.MapReduce.Cookbook.Jan.2013

    《Packtpub.Hadoop.MapReduce.Cookbook.Jan.2013》是2013年出版的一本专门探讨Hadoop MapReduce技术的实战指南。这本书深入浅出地介绍了如何利用Hadoop MapReduce框架来处理大数据问题,是IT行业中针对大数据处理的...

    hadoop2.6.0插件+64位winutils+hadoop.dll

    这个文件通常与Hadoop的其他组件一起使用,如Hadoop Common、Hadoop MapReduce等,确保在Windows环境下这些组件能正常运行。 在64位Windows环境中,使用64位版本的Winutils和hadoop.dll尤其重要,因为它们能更好地...

    java操作hadoop之mapreduce计算整数的最大值和最小值实战源码

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class IntMinMax { public static class IntMapper extends Mapper, Text, IntWritable, IntWritable&gt; { // 实现Mapper逻辑 } ...

    【SpringBoot】Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster

    【SpringBoot】Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster报错明细问题解决后记 报错明细 IDEA SpringBoot集成hadoop运行环境,,本地启动项目,GET请求接口触发...

    hadoop-mapreduce-client-jobclient-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...

    hadoop-mapreduce-examples-2.7.1.jar

    hadoop-mapreduce-examples-2.7.1.jar

    hadoop2.7.0版本的hadoop.dll,winutils.exe,X64下编译

    这个文件对于Hadoop在Windows上的运行至关重要,因为它提供了与Hadoop生态系统交互的接口,例如HDFS(Hadoop分布式文件系统)和MapReduce的本地操作。 `winutils.exe`则是Hadoop在Windows下的实用工具集,它包含了...

    win10下hadoo2.7.2的hadoop.dll和winutils.exe

    这个库文件使得Windows应用程序能够与Hadoop生态系统进行交互,例如执行MapReduce任务、管理HDFS文件系统等。在Windows上运行Hadoop时,必须确保这个dll文件位于系统的PATH环境变量所指向的目录下,以便系统能够找到...

    Hadoop中MapReduce基本案例及代码(一)

    概念 MapReduce是hadoop分布式计算框架。 MapReduce意味着在计算过程中实际分为两大步,...import org.apache.hadoop.mapreduce.Mapper; // 案例:统计每一个单词出现的次数 // KEYIN - 这一行的偏移量 --- // VALUEIN

    hadoop.dll & winutils.exe For hadoop-3.0.0

    Hadoop的核心由两个主要部分组成:Hadoop Distributed File System (HDFS)和MapReduce。HDFS是分布式文件系统,而MapReduce则是用于处理和生成大数据集的编程模型。 1. **hadoop.dll**: 这是在Windows操作系统上...

    hadoop0.23.9离线api

    org.apache.hadoop.mapreduce.server.jobtracker org.apache.hadoop.mapreduce.server.tasktracker org.apache.hadoop.mapreduce.tools org.apache.hadoop.mapreduce.v2 org.apache.hadoop.mapreduce.v2.app....

    hadoop-mapreduce-examples-2.7.1-sources.jar

    hadoop中的demo,wordcount列子用到的JAR包 用法: # 在容器里运行WordCount程序,该程序需要2个参数...hadoop jar hadoop-mapreduce-examples-2.7.1-sources.jar org.apache.hadoop.examples.WordCount input output

    使用命令行编译打包运行自己的MapReduce程序 Hadoop2.6.0

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper, Text, Text, IntWritable&gt; { private final static ...

    Hadoop 2.7.4 Windows 7 64位 编译bin(包含winutils.exe, hadoop.dll等)

    Hadoop的核心组成部分包括Hadoop Distributed File System (HDFS)和MapReduce计算模型。 在Windows上配置Hadoop通常比在Linux系统上更为复杂,因为Hadoop最初是为Linux环境设计的。winutils.exe是一个实用工具,...

    hadoop.dll hadoop2.7.2版本

    windows环境下运行hadoop的mapreduce程序需要的hadoop.dll winutils.exe等文件,使用方法见解压文件,该文件对应的hadoop版本是 2.7.2 , 请注意版本一致

Global site tag (gtag.js) - Google Analytics