`
eimhee
  • 浏览: 2151057 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop Mapreduce运行原理与常用数据压缩格式(一)

 
阅读更多

我们通过提交jar包,进行MapReduce处理,那么整个运行过程分为五个环节:

1、向client端提交MapReduce job.

2、随后yarn的ResourceManager进行资源的分配.

3、由NodeManager进行加载与监控containers.

4、通过applicationMaster与ResourceManager进行资源的申请及状态的交互,由NodeManagers进行MapReduce运行时job的管理.

5、通过hdfs进行job配置文件、jar包的各节点分发。



 

Job 提交过程

job的提交通过 调用submit()方法 创建一个 JobSubmitter 实例,并 调用submitJobInternal() 方法。整个job的运行过程如下:

1、向ResourceManager申请application ID,此ID为该MapReduce的jobId。

2、检查output的路径是否正确,是否已经被创建。

3、计算input的splits。

4、拷贝运行job 需要的jar包、配置文件以及计算input的split 到各个节点。

5、在ResourceManager中调用submitAppliction()方法,执行job

Job 初始化过程

1、当resourceManager收到了submitApplication()方法的调用通知后,scheduler开始分配container,随之ResouceManager发送applicationMaster进程,告知每个nodeManager管理器。

2、 由applicationMaster决定 如何运行tasks,如果job数据量比较小,applicationMaster便选择 将tasks运行在一个JVM中 。那么如何判别这个job是大是小呢?当一个job的 mappers数量小于10个 , 只有一个reducer或者读取的文件大小要小于一个HDFS block时 ,(可通过修改配置项mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 进行调整)

3、在运行tasks之前,applicationMaster将会 调用setupJob()方法 ,随之创建output的输出路径(这就能够解释,不管你的mapreduce一开始是否报错,输出路径都会创建)

Task 任务分配

1、接下来applicationMaster向ResourceManager请求containers用于执行map与reduce的tasks(step 8),这里map task的优先级要高于reduce task,当所有的map tasks结束后,随之进行sort(这里是shuffle过程后面再说),最后进行reduce task的开始。(这里有一点,当map tasks执行了百分之5%的时候,将会请求reduce,具体下面再总结)

2、运行tasks的是需要消耗内存与CPU资源的, 默认情况下,map和reduce的task资源分配为1024MB与一个核 ,(可修改运行的最小与最大参数配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)

Task 任务执行

1、这时一个task已经被ResourceManager分配到一个container中,由applicationMaster告知nodemanager启动container,这个task将会被一个 主函数为YarnChild 的java application运行,但在运行task之前, 首先定位task需要的jar包、配置文件以及加载在缓存中的文件 。

2、YarnChild运行于一个专属的JVM中,所以 任何一个map或reduce任务出现问题,都不会影响整个nodemanager的crash或者hang 。

3、每个task都可以在相同的JVM task中完成,随之将完成的处理数据写入临时文件中。

Mapreduce数据流

运行进度与状态更新

1、MapReduce是一个较长运行时间的批处理过程,可以是一小时、几小时甚至几天,那么Job的运行状态监控就非常重要。每个job以及 每个task都有一个包含job(running,successfully completed,failed)的状态 ,以及value的计数器,状态信息及描述信息(描述信息一般都是在代码中加的打印信息),那么,这些信息是如何与客户端进行通信的呢?

2、当一个task开始执行,它将会保持运行记录,记录task完成的比例,对于map的任务,将会记录其运行的百分比,对于reduce来说可能复杂点,但系统依旧会估计reduce的完成比例。当一个map或reduce任务执行时, 子进程会持续每三秒钟与applicationMaster进行交互 。

Job 完成

最终,applicationMaster会收到一个job完成的通知,随后改变job的状态为successful。最终,applicationMaster与task containers被清空。

Shuffle与Sort

从map到reduce的过程,被称之为shuffle过程,MapReduce使到reduce的数据一定是经过key的排序的,那么shuffle是如何运作的呢?

当map任务将数据output时, 不仅仅是将结果输出到磁盘,它是将其写入内存缓冲区域,并进行一些预分类 。



 

1、The Map Side

首先map任务的 output过程是一个环状的内存缓冲区,缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存的大小到达一定比例 ,默认为80% (可通过mapreduce.map.sort.spill.percent配置项修改),便开始写入磁盘。

在写入磁盘之前,线程将会指定数据写入与reduce相应的patitions中,最终传送给reduce.在每个partition中 ,后台线程将会在内存中进行Key的排序 ,( 如果代码中有combiner方法,则会在output时就进行sort排序 ,这里,如果只有少于3个写入磁盘的文件,combiner将会在outputfile前启动,如果只有一个或两个,那么将不会调用)

这里 将map输出的结果进行压缩会大大减少磁盘IO与网络传输的开销 (配置参数mapreduce.map .output.compress 设置为true,如果使用第三方压缩jar,可通过mapreduce.map.output.compress.codec进行设置)

随后这些paritions输出文件将会通过HTTP发送至reducers,传送的最大启动线程通过mapreduce.shuffle.max.threads进行配置。

2、The Reduce Side

首先上面每个节点的map都将结果写入了本地磁盘中,现在reduce需要将map的结果通过集群拉取过来,这里要注意的是, 需要等到所有map任务结束后,reduce才会对map的结果进行拷贝 ,由于reduce函数有少数几个复制线程,以至于它 可以同时拉取多个map的输出结果。默认的为5个线程 (可通过修改配置mapreduce.reduce.shuffle.parallelcopies来修改其个数)

这里有个问题,那么reducers怎么知道从哪些机器拉取数据呢?

当所有map的任务结束后, applicationMaster通过心跳机制(heartbeat mechanism),由它知道mapping的输出结果与机器host ,所以 reducer会定时的通过一个线程访问applicationmaster请求map的输出结果 。

Map的结果将会被拷贝到reduce task的JVM的内存中(内存大小可在mapreduce.reduce.shuffle.input.buffer.percent中设置)如果不够用,则会写入磁盘。当内存缓冲区的大小到达一定比例时(可通过mapreduce.reduce.shuffle.merge.percent设置)或map的输出结果文件过多时(可通过配置mapreduce.reduce.merge.inmen.threshold),将会除法合并(merged)随之写入磁盘。

这时要注意, 所有的map结果这时都是被压缩过的,需要先在内存中进行解压缩,以便后续合并它们 。(合并最终文件的数量可通过mapreduce.task.io.sort.factor进行配置) 最终reduce进行运算进行输出。

这里附带的整理了下Parquet存储结构与SequenceFile存储结构的特点

Parquet

Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目,那么这里就总结下Parquet数据结构到底是什么样的呢?

一个Parquet文件是 由一个header以及一个或多个block块组成,以一个footer结尾。header中只包含一个4个字节的数字PAR1用来识别整个Parquet文件格式。文件中所有的metadata都存在于footer中 。footer中的metadata包含了格式的版本信息,schema信息、key-value paris以及所有block中的metadata信息。footer中最后两个字段为一个以4个字节长度的footer的metadata,以及同header中包含的一样的PAR1。

读取一个Parquet文件时,需要完全读取Footer的meatadata,Parquet格式文件不需要读取sync markers这样的标记分割查找,因为所有block的边界都存储于footer的metadata中(因为metadata的写入是在所有blocks块写入完成之后的,所以吸入操作包含的所有block的位置信息都是存在于内存直到文件close)

这里注意,不像sequence files以及Avro数据格式文件的header以及sync markers是用来分割blocks。Parquet格式文件不需要sync markers,因此block的边界存储与footer的meatada中。

 



 

在Parquet文件中,每一个block都具有一组Row group,她们是由一组Column chunk组成的列数据。继续往下,每一个column chunk中又包含了它具有的pages。每个page就包含了来自于相同列的值.Parquet同时使用更紧凑形式的编码,当写入Parquet文件时,它会自动基于column的类型适配一个合适的编码,比如,一个boolean形式的值将会被用于run-length encoding。

另一方面,Parquet文件对于每个page支持标准的压缩算法比如支持Snappy,gzip以及LZO压缩格式,也支持不压缩。

 

更多hadoop 内容请访问 https://www.hadoop.so/

  • 大小: 27.6 KB
  • 大小: 16.4 KB
  • 大小: 14.9 KB
0
0
分享到:
评论

相关推荐

    Hadoop-MapReduce项目代码ZIP压缩包 + 面向小白(注释详细清晰)

    在IT行业中,Hadoop MapReduce是一项核心技术,常用于大数据处理。这个项目代码ZIP压缩包是为初学者设计的,特别适合那些对MapReduce不熟悉但希望深入理解这一领域的“小白”用户。它提供了详细的注释,使学习过程...

    最高气温 map reduce hadoop 实例

    本实例将介绍如何使用Hadoop MapReduce解决一个实际问题——找出给定数据集中的最高气温。MapReduce是一种编程模型,用于大规模数据集的并行计算,由Google提出并在Hadoop中得以实现。 【描述】:“运行命令hadoop ...

    Hadoop_MapReduce教程

    Hadoop MapReduce 是一个用于处理大规模数据集的软件框架,它使得开发者能够编写可以在大量廉价商用服务器组成的集群上运行的应用程序。该框架支持可靠的容错机制,并能够以并行的方式处理TB级别的数据集。 - **...

    Hadoop构建数据仓库实践1_hadoop_

    MapReduce则是一种编程模型,用于大规模数据集的并行计算,其工作原理包括Map阶段(数据分片和处理)和Reduce阶段(汇总结果)。 构建数据仓库在Hadoop上,通常涉及以下几个步骤: 1. 数据获取:这是数据仓库生命...

    MapReduce数据分析实战

    在“MapReduce数据分析实战”中,作者李立松通过自己的经验分享了使用MapReduce进行数据分析的实践案例,尤其是如何通过Hadoop这一开源框架来实施MapReduce计算。 首先,文档提到了单机测试的重要性,这是在集群...

    Java写的hadoop压缩worldcount例子

    这些压缩格式可以有效地减少数据在HDFS(Hadoop分布式文件系统)中的存储空间和网络传输的数据量,从而提高整体性能。在这个例子中,可能涉及到自定义的压缩器,这通常是为了优化特定场景下的压缩效率或解压速度。 ...

    Hadoop Map Reduce教程

    - **定义**:Hadoop MapReduce 是一个基于 Java 的分布式数据处理框架,它能够高效地处理大规模数据集。该框架将任务分解为一系列较小的任务(Map 和 Reduce),并在集群中的多台计算机上并行执行这些任务。 - **...

    最短路径系列之一从零开始学习Hadoop

    6. MapReduce的输入输出:MapReduce程序通过InputFormat类来定义输入数据的解析方式,而OutputFormat类则定义输出数据的格式。InputFormat和OutputFormat共同决定了MapReduce程序如何读取输入数据和输出计算结果。 ...

    hadoop毅哥的压缩包.7z

    3. **Hadoop数据压缩**:Hadoop支持多种压缩格式,如GZIP、BZip2、LZO等。LZO压缩速度快,但压缩率相对较低。使用hadoop-lzo-0.4.20.jar,用户可以在Hadoop作业中利用LZO进行数据压缩和解压缩,以减少网络传输和存储...

    hadoop-3.1.0-windows依赖文件.7z

    Hadoop是大数据处理领域的重要工具,它是一个开源的分布式计算框架,主要由Apache软件基金会维护。Hadoop 3.1.0是Hadoop发展过程中的一个重要版本,它在Hadoop 3.0的基础上进行了多项改进和优化,提升了系统的稳定性...

    eclipse hadoop 例子源代码

    MyWordCount是Hadoop的标志性示例,展示了MapReduce的工作原理。它由两个主要部分组成:Map阶段和Reduce阶段。Map阶段将输入文本分割成单词,并为每个单词生成一个键值对(键为单词,值为1);Reduce阶段则对所有...

    深入云计算 Hadoop源代码分析

    MapReduce是Hadoop的核心计算框架之一,其主要负责处理数据。MapReduce的基本工作流程如下: 1. **Splitting(分割)**:首先,输入数据被划分为多个小块(称为Splits),这些小块会分别被不同的Map任务处理。 2. **...

    hadoop-linux-2.6.4

    该文件是一个文本格式的tar归档,包含所有必要的组件,如Hadoop Common、Hadoop HDFS(分布式文件系统)、Hadoop YARN(资源管理系统)和Hadoop MapReduce(并行计算框架)等。解压后,需要按照官方文档进行编译和...

    hadoop-3.2.2.tar.gz.7z

    该文件首先被.7z格式压缩,这是一种高效的压缩格式,可以显著减少文件大小,便于下载和存储。在.7z压缩包内,有一个名为"hadoop-3.2.2.tar.gz"的文件,这通常意味着它是一个使用gzip压缩的tar归档文件,tar用于将多...

    hadoop-2.7.1.tar.gz.zip

    这是一个压缩文件,最外层是.zip格式,内部包含的是一个名为“hadoop-2.7.1.tar.gz”的文件。在Linux或Mac系统中,通常会先使用`unzip`命令解压最外层的zip文件,然后用`tar -zxvf`命令解压内层的tar.gz文件。...

    hadoop 笔记

    - **3.2.1 支持平台**:Hadoop可以在多种操作系统上运行,如Linux、Mac OS X和Windows,但在生产环境中最常用的是Linux。 - **3.2.2 所需软件**:需要安装JDK(Java Development Kit),因为Hadoop是用Java编写的。 ...

    Hadoop_2.7.2安装包.rar

    Hadoop是一款开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储海量数据。这个"Hadoop_2.7.2安装包.rar"包含了Hadoop 2.7.2版本的所有组件,供用户在本地或者集群环境中搭建大数据处理平台。在这个版本...

    hadoop-2.7.7.zip(windows 用)

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,主要用于处理和存储海量数据。这个压缩包“hadoop-2.7.7.zip”是专为Windows 7系统设计的Hadoop版本,使得用户在Windows环境下也能搭建本地的大数据处理环境...

    hadoop-3.3.1-aarch64.tar.gz

    标题 "hadoop-3.3.1-aarch64.tar.gz" 暗示这是一个针对aarch64架构(ARM64)的Hadoop 3.3.1版本的压缩包,适合在运行M1芯片(苹果公司的ARM架构处理器)的Mac系统上使用。Hadoop是一个开源的分布式计算框架,它允许...

    hadoop+hbase jar包

    3. Jar包:在Java开发中,jar(Java Archive)文件是包含类文件和其他资源的压缩格式,使得开发者可以将多个类打包在一起方便分发和运行。对于Hadoop和HBase,开发或运行应用程序通常需要依赖相应的jar包,这些jar...

Global site tag (gtag.js) - Google Analytics