`

Hadoop Map-Reduce入门学习

 
阅读更多
1、Map-Reduce的逻辑过程
假设我们需要处理一批有关天气的数据,其格式如下:

按照ASCII码存储,每行一条记录
每一行字符从0开始计数,第15个到第18个字符为年
第25个到第29个字符为温度,其中第25位是符号+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+


现在需要统计出每年的最高温度。

Map-Reduce主要包括两个步骤:Map和Reduce

每一步都有key-value对作为输入和输出:

#map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
#map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应
对于上面的例子,在map过程,输入的key-value对如下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)


在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:


(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)


在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])


在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)


其逻辑过程可用如下图表示:



2、编写Map-Reduce程序

编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数reducer中的reduce函数

一般遵循以下格式:

# map: (K1, V1)  ->  list(K2, V2)

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {

  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)

  throws IOException;

}


# reduce: (K2, list(V))  ->  list(K3, V3)

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

  void reduce(K2 key, Iterator<V2> values,

              OutputCollector<K3, V3> output, Reporter reporter)

    throws IOException;

}


对于上面的例子,则实现的mapper如下:

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    @Override

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        String line = value.toString();

        String year = line.substring(15, 19);

        int airTemperature;

        if (line.charAt(25) == '+') {

            airTemperature = Integer.parseInt(line.substring(26, 30));

        } else {

            airTemperature = Integer.parseInt(line.substring(25, 30));

        }

        output.collect(new Text(year), new IntWritable(airTemperature));

    }

}


实现的reducer如下:

public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        int maxValue = Integer.MIN_VALUE;

        while (values.hasNext()) {

            maxValue = Math.max(maxValue, values.next().get());

        }

        output.collect(key, new IntWritable(maxValue));

    }

}


欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:

# 输入的数据,也即需要处理的数据
# Map-Reduce程序,也即上面实现的Mapper和Reducer
# 此任务的配置项JobConf
欲配置JobConf,需要大致了解Hadoop运行job的基本原理:

# Hadoop将Job分成task进行处理,共两种task:map task和reduce task
# Hadoop有两类的节点控制job的运行:JobTracker和TaskTracker
# JobTracker协调整个job的运行,将task分配到不同的TaskTracker上
# TaskTracker负责运行task,并将结果返回给JobTracker
# Hadoop将输入数据分成固定大小的块,我们称之input split
# Hadoop为每一个input split创建一个task,在此task中依次处理此split中的一个个记录(record)
# Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以input split的大小也一般是HDFS的block的大小。
# Reduce task的输入一般为Map Task的输出,Reduce Task的输出为整个job的输出,保存在HDFS上。

# 在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partition
partition的规则为:(K2, V2) –> Integer, 也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。

public interface Partitioner<K2, V2> extends JobConfigurable {

  int getPartition(K2 key, V2 value, int numPartitions);

}


下图大概描述了Map-Reduce的Job运行的基本原理:



下面我们讨论JobConf,其有很多的项可以进行配置:

* setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
* setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
* setMapperClass:设置Mapper,默认为IdentityMapper
* setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
* setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
* setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
* setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
* setReducerClass:设置Reducer,默认为IdentityReducer
* setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
* FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
* FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:

public class MaxTemperature {

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {

            System.err.println("Usage: MaxTemperature <input path> <output path>");

            System.exit(-1);

        }

        JobConf conf = new JobConf(MaxTemperature.class);

        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);

        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }

}


3、Map-Reduce数据流(data flow)

Map-Reduce的处理过程主要涉及以下四个部分:

* 客户端Client:用于提交Map-reduce任务job
* JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
* TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
* HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件



3.1、任务提交

JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。

* 向JobTracker请求一个新的job ID
* 检测此job的output配置
* 计算此job的input splits
* 将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
* 通知JobTracker此Job已经可以运行了
提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。


3.2、任务初始化

当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。

初始化首先创建一个对象来封装job运行的tasks, status以及progress。

在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。

其为每个input split创建一个map task。

每个task被分配一个ID。


3.3、任务分配


TaskTracker周期性的向JobTracker发送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。

在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。

TaskTracker有固定数量的位置来运行map task或者reduce task。

默认的调度器对待map task优先于reduce task

当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。



3.4、任务执行


TaskTracker被分配了一个task,下面便要运行此task。

首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。

TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。

其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。

其三,其创建一个TaskRunner来运行task。

TaskRunner创建一个新的JVM来运行task。

被创建的child JVM和TaskTracker通信来报告运行进度。



3.4.1、Map的过程

MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。

map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。

当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。

在写入硬盘之前,内存中的数据通过partitioner分成多个partition。

在同一个partition中,背景线程会将数据按照key在内存中排序。

每次从内存向硬盘flush数据,都生成一个新的spill文件。

当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。

reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。

3.4.2、Reduce的过程

当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。

对于一个job,JobTracker知道TaskTracer和map输出的对应关系。

reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。

reduce task需要其对应的partition的所有的map输出。

reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。

reduce task中有多个copy线程,可以并行拷贝map输出。

当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。

当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。

最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。



3.5、任务结束


当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。

当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。


转自:http://www.cnblogs.com/forfuture1978/archive/2010/11/14/1877086.html


  • 大小: 65.4 KB
  • 大小: 222.5 KB
  • 大小: 88.7 KB
  • 大小: 172.2 KB
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    hadoop map-reduce turorial

    在深入学习Hadoop Map-Reduce之前,确保Hadoop已安装、配置且运行正常。对于初次使用者,推荐参考Hadoop快速入门指南;对于大型分布式集群环境,则需查阅Hadoop集群设置文档,以确保系统能够高效稳定地运行Map-...

    大数据云计算技术 优酷网Hadoop及Mapreduce入门教程(共35页).pptx

    Hadoop Map-reduce Job Scheduler Resources Hadoop, Why? 数据太多了,需要能存储、快速分析Pb级数据集的系统 单机的存储、IO、内存、CPU有限,需要可扩展的集群 使用门槛低,数据分析是个庞杂的问题,MPI太复杂 ...

    【实例介绍】hadoop入门-入门必备技.pdf

    Map阶段将原始数据拆分为键值对,Reduce阶段则负责聚合和总结这些数据。 Hadoop有以下几个显著优点: - 高可靠性:HDFS通过数据冗余备份确保数据安全,即使部分节点故障,数据仍可恢复。 - 高扩展性:Hadoop可以...

    Hadoop实战-陆嘉恒(高清完整版)

    MapReduce则是Hadoop用于大规模数据处理的编程模型,通过“映射”(map)和“化简”(reduce)两个步骤,实现了并行计算的能力。 在Hadoop实战中,你将学习到如何部署和管理Hadoop集群。这包括安装配置Hadoop环境、...

    hadoop map reduce 中文教程

    每个案例都详细列出了实践步骤,包括如何编写 Map 和 Reduce 函数、如何配置 Hadoop 环境、如何运行 MapReduce 任务等。 #### 六、总结 Hadoop MapReduce 是一种非常强大的分布式数据处理工具,它通过简单的编程...

    Windows平台下Hadoop的Map/Reduce开发

    通过以上步骤,初学者可以逐步熟悉Windows环境下Hadoop的安装、配置和开发流程,从而顺利入门Map/Reduce编程。随着经验的增长,可以进一步探索更复杂的数据处理任务和优化策略,如Combiner的使用、Shuffle和Sort过程...

    hadoop3-quick-start:这是一个存储有关hadoop3-quick入门指南的所有示例的存储库

    在Hadoop 3快速入门指南中,我们主要探讨的是如何高效地使用这个分布式计算框架进行大数据处理。Hadoop是Apache软件基金会的一个开源项目,它的核心是HDFS(Hadoop Distributed File System)和MapReduce,这两个...

    Hadoop集群-WordCount运行详解.pdf

    Hadoop采用MapReduce编程模型来实现分布式计算,MapReduce是Hadoop的核心组件,它将复杂的、运行在大规模集群上的并行计算问题简化为两个关键操作:Map(映射)和Reduce(归约)。 MapReduce理论简介中,1.1.1...

    Hadoop入门学习文档

    ### Hadoop入门学习文档知识点梳理 #### 一、大数据概论 ##### 1.1 大数据概念 - **定义**:大数据是指无法在可承受的时间范围内用常规软件工具进行捕捉、管理和处理的数据集合。 - **特点**: - **Volume(大量)...

    Hadoop 2 Quick-Start Guide_ Lea - Douglas Eadline

    ### Hadoop 2 快速入门指南核心知识点详解 #### 一、Hadoop 2.x与YARN:数据处理的新纪元 随着Hadoop 2.x版本的发布及其引入的YARN架构,Hadoop不再仅仅局限于MapReduce计算模型,而是发展成为一个更为通用的数据...

    史上最强Hadoop-1.2.1安装文档

    - 通过`Window -&gt; Preferences -&gt; Hadoop Map/Reduce`配置Hadoop路径。 - 选择`Hadoop installation directory`,点击`Browse`,选中Hadoop安装目录。 - 在Eclipse中显示MapReduce工具栏。 - **创建Map/Reduce...

    Hadoop学习总结

    **Map-Reduce入门** Map-Reduce是Hadoop的另一核心组件,用于大规模数据集的并行计算。它分为两个主要阶段:Map阶段和Reduce阶段。 1. **Map阶段**:在这个阶段,输入数据被分割成多个小片(split),然后分配给...

    配置mapreduce开发环境(简单易懂,轻松上手)

    ### 配置MapReduce开发环境详解 #### 一、引言 MapReduce是Apache Hadoop框架的核心组件之一,主要用于处理大规模数据集的...希望本文能够帮助您快速入门MapReduce开发,并为进一步学习大数据处理技术打下坚实的基础。

    hadoop 中文手册

    hadoop 中文手册 Hadoop文档 下面的文档是一些概念介绍和操作教程,可帮助你开始使用Hadoop。如果遇到了问题,你可以向邮件列表求助或者浏览一下存档...Hadoop Map-Reduce教程 Hadoop本地库 API参考 维基 常见问题

    hadoop - Hadoop2 Quick-Start Guide

    ### Hadoop 2快速入门指南知识点详解 #### 一、背景与概念 1. **Apache Hadoop定义**:Hadoop是一种开源软件框架,用于分布式存储和处理大型数据集。它能够在集群环境中运行,并且能够自动处理节点故障问题,提供...

    hadoop,hive,hbase学习资料

    1. **Hadoop学习总结之三:Map-Reduce入门.doc**:这是一份关于MapReduce的入门文档,MapReduce是Hadoop的核心计算模型,它将大型数据集划分为小块,并在分布式集群上并行处理。 2. **Hadoop学习总结之一:HDFS简介...

    Hadoop 十分钟快速入门

    本快速入门将带你深入了解Hadoop生态系统的核心组件,包括HDFS、MapReduce、Hive和HBase,并通过集群配置实例让你快速上手。 首先,我们来探讨Hadoop。Hadoop是Apache基金会的一个开源项目,其核心设计思想是分布式...

    Hadoop入门中文手册

    目的是帮助你快速完成单机上的Hadoop安装与使用以便你对Hadoop分布式文件系统(HDFS)和Map-Reduce框架有所体会,比如在HDFS上运行示例程序或简单作业等,同样也介绍了Hive,HBase详细安装应用! 目前国内应用和研究...

    Hadoop开发者入门-带书签文字版

    学习如何编写Map和Reduce函数,理解shuffle和sort过程,是成为Hadoop开发者的必备技能。 4. **YARN详解**:YARN是Hadoop的资源调度器,负责管理和分配集群资源。理解应用程序生命周期,包括容器、应用Master和...

    Hadoop学习总结和源码分析

    “Hadoop学习总结之三:Map-Reduce入门.doc”介绍了MapReduce编程模型,它是Hadoop处理数据的主要计算框架。Map阶段将输入数据分割成键值对,通过映射函数进行初步处理;Reduce阶段则将Map的输出聚合,通过化简函数...

Global site tag (gtag.js) - Google Analytics