MapReduce执行过程
2011-07-12 17:06:28| 分类: 默认分类 | 标签:mapreduce执行过程 |举报|字号 订阅
1、Map-Reduce的逻辑过程
假设我们需要处理一批有关天气的数据,其格式如下:
- 按照ASCII码存储,每行一条记录
- 每一行字符从0开始计数,第15个到第18个字符为年
- 第25个到第29个字符为温度,其中第25位是符号+/-
0067011990999991950051507+0000+ 0043011990999991950051512+0022+ 0043011990999991950051518-0011+ 0043012650999991949032412+0111+ 0043012650999991949032418+0078+ 0067011990999991937051507+0001+ 0043011990999991937051512-0002+ 0043011990999991945051518+0001+ 0043012650999991945032412+0002+ 0043012650999991945032418+0078+ |
现在需要统计出每年的最高温度。
Map-Reduce主要包括两个步骤:Map和Reduce
每一步都有key-value对作为输入和输出:
- map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
- map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应
对于上面的例子,在map过程,输入的key-value对如下:
(0, 0067011990999991950051507+0000+) (33, 0043011990999991950051512+0022+) (66, 0043011990999991950051518-0011+) (99, 0043012650999991949032412+0111+) (132, 0043012650999991949032418+0078+) (165, 0067011990999991937051507+0001+) (198, 0043011990999991937051512-0002+) (231, 0043011990999991945051518+0001+) (264, 0043012650999991945032412+0002+) (297, 0043012650999991945032418+0078+) |
在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:
(1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78) (1937, 1) (1937, -2) (1945, 1) (1945, 2) (1945, 78) |
在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入
(1950, [0, 22, –11]) (1949, [111, 78]) (1937, [1, -2]) (1945, [1, 2, 78]) |
在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:
(1950, 22) (1949, 111) (1937, 1) (1945, 78) |
其逻辑过程可用如下图表示:
2、编写Map-Reduce程序
编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数和reducer中的reduce函数。
一般遵循以下格式:
- map: (K1, V1) -> list(K2, V2)
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable { void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException; } |
- reduce: (K2, list(V)) -> list(K3, V3)
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable { void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException; } |
对于上面的例子,则实现的mapper如下:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(25) == '+') { airTemperature = Integer.parseInt(line.substring(26, 30)); } else { airTemperature = Integer.parseInt(line.substring(25, 30)); } output.collect(new Text(year), new IntWritable(airTemperature)); } } |
实现的reducer如下:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } } |
欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:
- 输入的数据,也即需要处理的数据
- Map-Reduce程序,也即上面实现的Mapper和Reducer
- 此任务的配置项JobConf
欲配置JobConf,需要大致了解Hadoop运行job的基本原理:
- Hadoop将Job分成task进行处理,共两种task:map task和reduce task
- Hadoop有两类的节点控制job的运行:JobTracker和TaskTracker
-
- JobTracker协调整个job的运行,将task分配到不同的TaskTracker上
- TaskTracker负责运行task,并将结果返回给JobTracker
- Hadoop将输入数据分成固定大小的块,我们称之input split
- Hadoop为每一个input split创建一个task,在此task中依次处理此split中的一个个记录(record)
- Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以input split的大小也一般是HDFS的block的大小。
- Reduce task的输入一般为Map Task的输出,Reduce Task的输出为整个job的输出,保存在HDFS上。
- 在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partition
-
- partition的规则为:(K2, V2) –> Integer, 也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。
public interface Partitioner<K2, V2> extends JobConfigurable { int getPartition(K2 key, V2 value, int numPartitions); } |
下图大概描述了Map-Reduce的Job运行的基本原理:
下面我们讨论JobConf,其有很多的项可以进行配置:
- setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
- setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
- setMapperClass:设置Mapper,默认为IdentityMapper
- setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
- setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
- setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
- setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
- setReducerClass:设置Reducer,默认为IdentityReducer
- setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
- FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
- FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在
当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:
public class MaxTemperature { public static void main(String[] args) throws IOException { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(MaxTemperature.class); conf.setJobName("Max temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } } |
3、Map-Reduce数据流(data flow)
Map-Reduce的处理过程主要涉及以下四个部分:
- 客户端Client:用于提交Map-reduce任务job
- JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
- TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
- HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件
3.1、任务提交
JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。
- 向JobTracker请求一个新的job ID
- 检测此job的output配置
- 计算此job的input splits
- 将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
- 通知JobTracker此Job已经可以运行了
提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。
3.2、任务初始化
当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。
初始化首先创建一个对象来封装job运行的tasks, status以及progress。
在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。
其为每个input split创建一个map task。
每个task被分配一个ID。
3.3、任务分配
TaskTracker周期性的向JobTracker发送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。
在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。
TaskTracker有固定数量的位置来运行map task或者reduce task。
默认的调度器对待map task优先于reduce task
当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。
3.4、任务执行
TaskTracker被分配了一个task,下面便要运行此task。
首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。
TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。
其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。
其三,其创建一个TaskRunner来运行task。
TaskRunner创建一个新的JVM来运行task。
被创建的child JVM和TaskTracker通信来报告运行进度。
3.4.1、Map的过程
MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。
map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。
当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。
在写入硬盘之前,内存中的数据通过partitioner分成多个partition。
在同一个partition中,背景线程会将数据按照key在内存中排序。
每次从内存向硬盘flush数据,都生成一个新的spill文件。
当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。
reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。
3.4.2、Reduce的过程
当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。
对于一个job,JobTracker知道TaskTracer和map输出的对应关系。
reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。
reduce task需要其对应的partition的所有的map输出。
reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。
reduce task中有多个copy线程,可以并行拷贝map输出。
当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。
当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。
3.5、任务结束
当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。
当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。
相关推荐
MapReduce是Hadoop框架的核心组件之一,用于处理和生成大数据集。它主要由两个主要阶段组成:Map阶段和Reduce阶段,这两个阶段都是基于键值对(key-value pair)的操作。 1. Map阶段: 在这个阶段,数据首先被输入...
MapReduce的大体流程是这样的,如图所示:由图片可以看到mapreduce执行下来主要包含这样几个步骤1.首先对输入数据源进行切片2.master调度worker执行map任务3.worker读取输入源片段4.worker执行map任务,将任务输出...
本节将对 Hadoop MapReduce 的工作机制进行介绍,主要从 MapReduce 的作业执行流程和 Shuffle 过程方面进行阐述。通过加深对 MapReduce 工作机制的了解,可以使程序开发者更合理地使用 MapReduce 解决实际问题。 ...
以下是MapReduce执行流程、Split切片、以及MapTask过程的详细解析。 1. MapReduce执行流程: MapReduce的工作流程主要分为四个步骤:作业提交、任务调度、Map任务执行和Reduce任务执行。首先,客户端将作业提交给...
近百节课视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 课程目录 000 上课方式和课程大纲介绍 ...065 回顾MapReduce执行过程以及MapReduce核心 066 Hadoop MapReduce框架数据类型讲解 067
《深入理解Hadoop MapReduce执行过程》 MapReduce是Apache Hadoop的核心组件之一,它为大规模数据处理提供了分布式计算框架。本文将从客户端、JobTracker、TaskTracker和Child四个角度,详细阐述MapReduce的工作...
### Python执行MapReduce测试知识点详解 #### 一、测试环境搭建 在进行Python MapReduce测试之前,需要确保以下环境配置已经完成: 1. **安装Hadoop**:Hadoop是实现MapReduce模型的基础软件,用于分布式处理大数据...
此外,通过Shell命令如`mapred job -status id`,可以在运行过程中和结束后跟踪作业状态,这有利于理解MapReduce的执行流程。 实验的总结与思考部分,强调了实验的目标在于理解和掌握MapReduce编程思想,了解...
对于MapReduce执行过程的理解包括以下几个要点: 1. 用户编写的MapReduce程序需要覆盖MapReduce框架提供的Mapper类和Reducer类。在Mapper类中,用户需要实现map方法,该方法处理输入的键值对,并输出中间的键值对。...
同时,也会涵盖中间数据排序、分区策略以及Shuffle过程,这些都是MapReduce执行过程中不可或缺的环节。 除了基本概念和原理,手册还可能包含MapReduce的优化技巧,比如通过本地化减少网络传输,或者利用Combiner...
### MapReduce详细流程 #### 一、MapReduce概述 MapReduce是Hadoop生态系统中的核心组件之一,主要用于处理大规模数据集的分布式计算。它基于一种简单的编程模型,将复杂的任务分解为两个基本步骤:Map(映射)和...
MapReduce详解Shuffle过程 MapReduce是Hadoop生态系统中的一种分布式计算框架,而Shuffle过程是MapReduce的核心部分。Shuffle过程是将map task的输出结果传送到reduce task的过程,顾名思义,Shuffle就是洗牌或弄乱...
Hadoop 培训课程(3)MapReduce_1 ...MapReduce执行过程** 数据类型与格式*** Writable接口与序列化机制*** ---------------------------加深拓展---------------------- MapReduce的执行过程源码分析
TaskTracker:定期与JobTracker通信,执行Map和Reduce任务 HDFS:保存作业的数据、配置、jar包、结果 作业运行流程 1.在客户端启动一个作业。 2.向JobTracker请求一个Job ID。 3.将运行作业所需要的资源文件复制到...
MapReduce执行过程可以分为两个阶段:Map阶段和Reduce阶段。 Map阶段 在Map阶段,框架使用InputFormat类的子类把输入文件(夹)划分为很多InputSplit,默认,每个HDFS的block对应一个InputSplit。通过RecordReader类...
* MapReduce 程序的执行流程 * MapReduce 程序的优化方法 MapReduce 项目实践 在实践中,我们可以使用 MapReduce 来解决各种大数据处理问题。以下是一些 MapReduce 项目实践: * WordCount 程序编写及代码分析 * ...
MapReduce执行过程中涉及的主要进程包括JobTracker、TaskTracker和Task。JobTracker负责任务调度和资源管理,TaskTracker在各个节点上执行具体的Map和Reduce任务。 1.5 WordCount源码分析 官方的WordCount示例包含...
Hadoop 执行 MapReduce 测试 Hadoop 是一个大数据处理框架,它...这个测试演示了 Hadoop 的基本使用方式,并展示了 MapReduce 程序的执行过程。这种分布式计算模型可以处理大规模数据,并将结果汇总以获得最终结果。