`

浅析Hadoop文件格式

 
阅读更多

Hadoop 作为MR 的开源实现,一直以动态运行解析文件格式并获得比MPP数据库快上几倍的装载速度为优势。不过,MPP数据库社区也一直批评Hadoop由于文件格式并非 为特定目的而建,因此序列化和反序列化的成本过高[7]。本文介绍Hadoop目前已有的几种文件格式,分析其特点、开销及使用场景。希望加深读者对 Hadoop文件格式及其影响性能的因素的理解。

Hadoop 中的文件格式

1 SequenceFile

SequenceFile是Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用Hadoop 的标准的Writable 接口实现序列化和反序列化。它与Hadoop API中的MapFile 是互相兼容的。Hive 中的SequenceFile 继承自Hadoop API 的SequenceFile,不过它的key为空,使用value 存放实际的值, 这样是为了避免MR 在运行map 阶段的排序过程。如果你用Java API 编写SequenceFile,并让Hive 读取的话,请确保使用value字段存放数据,否则你需要自定义读取这种SequenceFile 的InputFormat class 和OutputFormat class。

图1:Sequencefile 文件结构

2 RCFile

RCFile是Hive推出的一种专门面向列的数据格式。 它遵循“先按列划分,再垂直划分”的设计理念。当查询过程中,针对它并不关心的列时,它会在IO上跳过这些列。需要说明的是,RCFile在map阶段从 远端拷贝仍然是拷贝整个数据块,并且拷贝到本地目录后RCFile并不是真正直接跳过不需要的列,并跳到需要读取的列, 而是通过扫描每一个row group的头部定义来实现的,但是在整个HDFS Block 级别的头部并没有定义每个列从哪个row group起始到哪个row group结束。所以在读取所有列的情况下,RCFile的性能反而没有SequenceFile高。

图2:RCFile 文件结构

3 Avro

Avro是一种用于支持数据密集型的二进制文件格式。它的文件格式更为紧凑,若要读取大量数据时,Avro能够提供更好的序列化和反序列化性能。并 且Avro数据文件天生是带Schema定义的,所以它不需要开发者在API 级别实现自己的Writable对象。最近多个Hadoop 子项目都支持Avro 数据格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。

图3:Avro MR 文件格式

4. 文本格式

除上面提到的3种二进制格式之外,文本格式的数据也是Hadoop中经常碰到的。如TextFile 、XML和JSON。 文本格式除了会占用更多磁盘资源外,对它的解析开销一般会比二进制格式高几十倍以上,尤其是XML 和JSON,它们的解析开销比Textfile 还要大,因此强烈不建议在生产系统中使用这些格式进行储存。 如果需要输出这些格式,请在客户端做相应的转换操作。 文本格式经常会用于日志收集,数据库导入,Hive默认配置也是使用文本格式,而且常常容易忘了压缩,所以请确保使用了正确的格式。另外文本格式的一个缺 点是它不具备类型和模式,比如销售金额、利润这类数值数据或者日期时间类型的数据,如果使用文本格式保存,由于它们本身的字符串类型的长短不一,或者含有 负数,导致MR没有办法排序,所以往往需要将它们预处理成含有模式的二进制格式,这又导致了不必要的预处理步骤的开销和储存资源的浪费。

5. 外部格式

Hadoop实际上支持任意文件格式,只要能够实现对应的RecordWriter和RecordReader即可。其中数据库格式也是会经常储存 在Hadoop中,比如Hbase,Mysql,Cassandra,MongoDB。 这些格式一般是为了避免大量的数据移动和快速装载的需求而用的。他们的序列化和反序列化都是由这些数据库格式的客户端完成,并且文件的储存位置和数据布局 (Data Layout)不由Hadoop控制,他们的文件切分也不是按HDFS的块大小(blocksize)进行切割。

文件存储大小比较与分析

我们选取一个TPC-H标准测试来说明不同的文件格式在存储上的开销。因为此数据是公开的,所以读者如果对此结果感兴趣,也可以对照后面的实验自行 做一遍。Orders 表文本格式的原始大小为1.62G。 我们将其装载进Hadoop 并使用Hive 将其转化成以上几种格式,在同一种LZO 压缩模式下测试形成的文件的大小。

 

Orders_text1

1732690045

1.61G

非压缩

TextFile

Orders_tex2

772681211

736M

LZO 压缩

TextFile

Orders_seq1

1935513587

1.80G

非压缩

SequenceFile

Orders_seq2

822048201

783M

LZO 压缩

SequenceFile

Orders_rcfile1

1648746355

1.53G

非压缩

RCFile

Orders_rcfile2

686927221

655M

LZO 压缩

RCFile

Orders_avro_table1

1568359334

1.46G

非压缩

Avro

Orders_avro_table2

652962989

622M

LZO 压缩

Avro

表1:不同格式文件大小对比

从上述实验结果可以看到,SequenceFile无论在压缩和非压缩的情况下都比原始纯文本TextFile大,其中非压缩模式下大11%, 压缩模式下大6.4%。这跟SequenceFile的文件格式的定义有关: SequenceFile在文件头中定义了其元数据,元数据的大小会根据压缩模式的不同略有不同。一般情况下,压缩都是选取block 级别进行的,每一个block都包含key的长度和value的长度,另外每4K字节会有一个sync-marker的标记。对于TextFile文件格 式来说不同列之间只需要用一个行间隔符来切分,所以TextFile文件格式比SequenceFile文件格式要小。但是TextFile 文件格式不定义列的长度,所以它必须逐个字符判断每个字符是不是分隔符和行结束符。因此TextFile 的反序列化开销会比其他二进制的文件格式高几十倍以上。

RCFile文件格式同样也会保存每个列的每个字段的长度。但是它是连续储存在头部元数据块中,它储存实际数据值也是连续的。另外RCFile 会每隔一定块大小重写一次头部的元数据块(称为row group,由hive.io.rcfile.record.buffer.size控制,其默认大小为4M),这种做法对于新出现的列是必须的,但是如 果是重复的列则不需要。RCFile 本来应该会比SequenceFile 文件大,但是RCFile 在定义头部时对于字段长度使用了Run Length Encoding进行压缩,所以RCFile 比SequenceFile又小一些。Run length Encoding针对固定长度的数据格式有非常高的压缩效率,比如Integer、Double和Long等占固定长度的数据类型。在此提一个特例—— Hive 0.8引入的TimeStamp 时间类型,如果其格式不包括毫秒,可表示为”YYYY-MM-DD HH:MM:SS”,那么就是固定长度占8个字节。如果带毫秒,则表示为”YYYY-MM-DD HH:MM:SS.fffffffff”,后面毫秒的部分则是可变的。

Avro文件格式也按group进行划分。但是它会在头部定义整个数据的模式(Schema), 而不像RCFile那样每隔一个row group就定义列的类型,并且重复多次。另外,Avro在使用部分类型的时候会使用更小的数据类型,比如Short或者Byte类型,所以Avro的数 据块比RCFile 的文件格式块更小。

序列化与反序列化开销分析

我们可以使用Java的profile工具来查看Hadoop 运行时任务的CPU和内存开销。以下是在Hive 命令行中的设置:

hive>set mapred.task.profile=true;
hive>set mapred.task.profile.params =-agentlib:hprof=cpu=samples,heap=sites, depth=6,force=n,thread=y,verbose=n,file=%s

当map task 运行结束后,它产生的日志会写在$logs/userlogs/job- 文件夹下。当然,你也可以直接在JobTracker的Web界面的logs或jobtracker.jsp 页面找到日志。

我们运行一个简单的SQL语句来观察RCFile 格式在序列化和反序列化上的开销:

hive> select O_CUSTKEY,O_ORDERSTATUS from orders_rc2 where O_ORDERSTATUS='P';

其中的O_CUSTKEY列为integer类型,O_ORDERSTATUS为String类型。在日志输出的最后会包含内存和CPU 的消耗。

下表是一次CPU 的开销:

 

rank

self

accum

count

trace

method

20          

0.48%

79.64%

65

315554

org.apache.hadoop.hive.ql.io.RCFile$Reader.getCurrentRow

28

0.24%

82.07%

32

315292

org.apache.hadoop.hive.serde2.columnar.ColumnarStruct.init

55

0.10%

85.98%

14

315788

org.apache.hadoop.hive.ql.io.RCFileRecordReader.getPos

56

0.10%

86.08%

14

315797

org.apache.hadoop.hive.ql.io.RCFileRecordReader.next

表2:一次CPU的开销

其中第五列可以对照上面的Track信息查看到底调用了哪些函数。比如CPU消耗排名20的函数对应Track:

TRACE 315554: (thread=200001)
    org.apache.hadoop.hive.ql.io.RCFile$Reader.getCurrentRow(RCFile.java:1434)
    org.apache.hadoop.hive.ql.io.RCFileRecordReader.next(RCFileRecordReader.java:88)
    org.apache.hadoop.hive.ql.io.RCFileRecordReader.next(RCFileRecordReader.java:39)
org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:98)
org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:42)    
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:67)

其中,比较明显的是RCFile,它为了构造行而消耗了不必要的数组移动开销。其主要是因为RCFile 为了还原行,需要构造RowContainer,顺序读取一行构造RowContainer,然后给其中对应的列进行赋值,因为RCFile早期为了兼容 SequenceFile所以可以合并两个block,又由于RCFile不知道列在哪个row group结束,所以必须维持数组的当前位置,类似如下格式定义:

   Array<RowContainer extends List<Object>>

而此数据格式可以改为面向列的序列化和反序列化方式。如:

Map<array<col1Type>,array<col2Type>,array<col3Type>....>

这种方式的反序列化会避免不必要的数组移动,当然前提是我们必须知道列在哪个row group开始到哪个row group结束。这种方式会提高整体反序列化过程的效率。

关于Hadoop文件格式的思考

1 高效压缩

Hadoop目前尚未出现针对数据特性的高效编码(Encoding)和解码(Decoding)数据格式。尤其是支持Run Length Encoding、Bitmap 这些极为高效算法的数据格式。HIVE-2065 讨论过使用更加高效的压缩形式,但是对于如何选取列的顺序没有结论。关于列顺序选择可以看Daniel Lemire的一篇论文 《Reordering Columns for Smaller Indexes》[1]。作者同时也是Hive 0.8中引入的bitmap 压缩算法基础库的作者。该论文的结论是:当某个表需要选取多个列进行压缩时,需要根据列的选择性(selectivity)进行升序排列,即唯一值越少的 列排得越靠前。 事实上这个结论也是Vertica多年来使用的数据格式。其他跟压缩有关的还有HIVE-2604和HIVE-2600。

2 基于列和块的序列化和反序列化

不论排序后的结果是不是真的需要,目前Hadoop的整体框架都需要不断根据数据key进行排序。除了上面提到的基于列的排序,序列化和反序列化之 外,Hadoop的文件格式应该支持某种基于块(Block) 级别的排序和序列化及反序列化方式,只有当数据满足需要时才进行这些操作。来自Google Tenzing论文中曾将它作为MR 的优化手段提到过。

“Block Shuffle:正常来说,MR 在Shuffle 的时候使用基于行的编码和解码。为了逐个处理每一行,数据必须先排序。然而,当排序不是必要的时候这种方式并不高效,我们在基于行的shuffle基础上 实现了一种基于block的shuffle方式,每一次处理大概1M的压缩block,通过把整个block当成一行,我们能够避免MR框架上的基于行的 序列化和反序列化消耗,这种方式比基于行的shuffle 快上3倍以上。”

3 数据过滤(Skip List)

除常见的分区和索引之外,使用排序之后的块(Block)间隔也是常见列数据库中使用的过滤数据的方法。Google Tenzing同样描述了一种叫做ColumnIO 的数据格式,ColumnIO在头部定义该Block的最大值和最小值,在进行数据判断的时候,如果当前Block的头部信息里面描述的范围中不包含当前 需要处理的内容,则会直接跳过该块。Hive社区里曾讨论过如何跳过不需要的块 ,可是因为没有排序所以一直没有较好的实现方式。包括RCFile格式,Hive的index 机制里面目前还没有一个高效的根据头部元数据就可以跳过块的实现方式。

4 延迟物化

真正好的列数据库,都应该可以支持直接在压缩数据之上不需要通过解压和排序就能够直接操作块。通过这种方式可以极大的降低MR 框架或者行式数据库中先解压,再反序列化,然后再排序所带来的开销。Google Tenzing里面描述的Block Shuffle 也属于延迟物化的一种。更好的延迟物化可以直接在压缩数据上进行操作,并且可以做内部循环, 此方面在论文《Integrating Compression and Execution in Column-Oriented Database System》[5]的5.2 章节有描述。 不过考虑到它跟UDF 集成也有关系,所以,它会不会将文件接口变得过于复杂也是一件有争议的事情。

5 与Hadoop框架集成

无论文本亦或是二进制格式,都只是最终的储存格式。Hadoop运行时产生的中间数据却没有办法控制。包括一个MR Job在map和reduce之间产生的数据或者DAG Job上游reduce 和下游map之间的数据,尤其是中间格式并不是列格式,这会产生不必要的IO和CPU 开销。比如map 阶段产生的spill,reduce 阶段需要先copy 再sort-merge。如果这种中间格式也是面向列的,然后将一个大块切成若干小块,并在头部加上每个小块的最大最小值索引,就可以避免大量sort- mege操作中解压—反序列化—排序—合并(Merge)的开销,从而缩短任务的运行时间。

其他文件格式

Hadoop社区也曾有对其他文件格式的研究。比如,IBM 研究过面向列的数据格式并发表论文《Column-Oriented Storage Techniques for MapReduce》[4],其中特别提到IBM 的CIF(Column InputFormat)文件格式在序列化和反序列化的IO消耗上比RCFile 的消耗要小20倍。里面提到的将列分散在不同的HDFS Block 块上的实现方式RCFile 也有考虑过,但是最后因为重组行的消耗可能会因分散在远程机器上产生的延迟而最终放弃了这种实现。此外,最近Avro也在实现一种面向列的数据格式,不过 目前Hive 与Avro 集成尚未全部完成。有兴趣的读者可以关注avro-806 和hive-895。

总结

Hadoop 可以与各种系统兼容的前提是Hadoop MR 框架本身能够支持多种数据格式的读写。但如果要提升其性能,Hadoop 需要一种高效的面向列的基于整个MR 框架集成的数据格式。尤其是高效压缩,块重组(block shuffle),数据过滤(skip list)等高级功能,它们是列数据库相比MR 框架在文件格式上有优势的地方。相信随着社区的发展以及Hadoop 的逐步成熟,未来会有更高效且统一的数据格式出现。

参考资料

[1]压缩列顺序选择 http://lemire.me/en/ Reordering Columns for Smaller Indexes 论文地址

[2]Hive与Avro 集成 https://issues.apache.org/jira/browse/HIVE-895

[3]Google 的Tenzing 论文 http://research.google.com/pubs/DistributedSystemsandParallelComputing.html
Tenzing A SQL Implementation On The MapReduce Framework

[4]IBM Column-Oriented Storage Techniques for MapReduce http://pages.cs.wisc.edu/~jignesh/publ/colMR.pdf

[5]Integrating compression and execution in column-oriented database systems http://db.lcs.mit.edu/projects/cstore/abadisigmod06.pdf

[6]Avro 项目主页 http://avro.apache.org/

[7]MapReduce and Parallel DBMSs: Friends or Foes , repetitive record parsing 小节 http://cacm.acm.org/magazines/2010/1/55743-mapreduce-and-parallel-dbmss-friends-or-foes/fulltext

作者简介

江志伟,关注分析型MPP数据库和Hadoop,建有个人博客http://www.gemini5201314.net/ , 五月份Hadoop Definitive Guide 3rd 要出了,如果找到有兴趣合作翻译的朋友,可能会翻译这本经典书籍。

分享到:
评论

相关推荐

    大数据云计算技术 Hadoop应用浅析(共16页).pptx

    NameNode和SecondNameNode是Hadoop文件系统(HDFS)的关键组件,负责元数据管理和备份。JobTracker则负责任务调度和监控,DataNode存储数据并执行MapReduce任务。集群的硬件配置较高,如NameNode和SecondNameNode的...

    基于Hadoop的气象云储存与数据处理应用浅析.pdf

    基于Hadoop的气象云储存与数据处理应用浅析 本文主要介绍了Hadoop架构的构成,并对Hadoop架构的MapReduce实现进行了详细的描述。同时,本文还开发出一个在Hadoop架构的基础上进行气象数值统计的实例,并根据这个...

    Hadoop环境搭建与WordCount实例浅析.pdf

    【Hadoop环境搭建与WordCount实例浅析】 Hadoop是一个分布式计算框架,广泛应用于大数据处理。要搭建Hadoop环境并实现WordCount实例,你需要遵循以下步骤: 1. **环境准备**: - 首先,你需要一个Linux操作系统...

    (hadoop HDFS 和 Mapreduce 架构浅析

    HDFS的设计理念是将大文件分割成固定大小的数据块(block),默认大小为128MB(在Hadoop 2.x版本中),这些数据块分散存储在集群的不同节点上。Namenode和Datanode是HDFS的两个关键组件,其中Namenode负责管理文件...

    Hadoop+HDFS和MapReduce架构浅析

    ### Hadoop+HDFS和MapReduce架构浅析 #### 摘要 本文旨在深入剖析Hadoop中的两大核心组件——HDFS(Hadoop Distributed File System)和MapReduce的工作原理及其实现机制。首先,我们将介绍Hadoop NameNode与...

    大数据云计算技术 Hadoop应用浅析(共16页).rar

    本资料《大数据云计算技术 Hadoop应用浅析》旨在深入探讨Hadoop在大数据处理中的核心价值及其实际应用场景。 Hadoop是Apache软件基金会开发的一个开源框架,主要用于处理和存储海量数据。它基于分布式计算模型,即...

    亿赞普Hadoop应用浅析 共16页.pptx

    为了确保数据安全,NameNode的镜像备份通过定时任务和网络文件系统进行,以防止数据丢失。此外,通过设置机架感知,优化任务分配,避免集群负载不均衡,预防JobTracker可能出现的死锁问题。 总结来说,IZP的Hadoop...

    Hadoop应用系列2--MapReduce原理浅析(上)

    《Hadoop应用系列2--MapReduce原理浅析(上)》 MapReduce是Apache Hadoop框架的核心组件之一,主要用于处理和生成大规模数据集。本文将深入浅出地探讨MapReduce的工作原理,帮助读者理解其核心概念和流程。 一、...

    hadoop开发者第三期

    DBInputFormat是Hadoop自0.19.0版本开始引入的一种特殊输入格式,它允许Hadoop应用程序通过标准的JDBC接口直接与现有数据库系统进行交互。这为那些需要处理结构化数据(通常存储在关系型数据库中)与非结构化或半...

    《Hadoop开发者》第三期

    为了实现这一目标,Hadoop提供了一个特殊的输出格式`MultiFileOutputFormat`,它允许用户根据自定义的逻辑将MapReduce作业的结果输出到多个文件中。 #### 实现原理 `MultiFileOutputFormat`的核心思想是根据用户...

    深入浅析Java Object Serialization与 Hadoop 序列化

    深入浅析Java Object Serialization与 Hadoop 序列化 序列化是指将结构化对象转化为字节流以便在网络上传输或者写到磁盘永久存储的过程。Java 中的序列化是通过实现 Serializable 接口来实现的,而 Hadoop 序列化则...

    Hadoop生态圈的基石有两个,一个是HDFS文件系统,一个是MR编程框架:HDFS性能压测工具浅析

    Hadoop生态圈的基石有两个,一个是HDFS文件系统,一个是MR编程框架。第一弹中提到应用MR编程框架实现大规模多机联合负载压测场景的方案,则突出了MR的能力,实际上HDFS作为这一切的  引言  Hadoop生态圈的基石有两...

    HDFS存储系统浅析.doc

    Hadoop Distributed File System(HDFS)是Apache Hadoop框架的核心组件之一,是一个分布式文件系统,旨在处理和存储大量数据。HDFS的设计目标是提供高容错性和高吞吐量的数据访问,适用于运行在低成本硬件上的大...

    WordCount 源码浅析(1)

    《WordCount 源码浅析(1)》 在大数据处理领域,Hadoop 是一个不可或缺的名字,而 WordCount 是 Hadoop 的经典示例程序,它用于统计文本中单词出现的频率。这篇博客将对 WordCount 的源码进行初步解析,帮助初学者...

    HDFS性能压测工具浅析

    在Hadoop分布式文件系统(HDFS)的性能评估中,有几个经典的压测工具,如Terasort、Slive和DFSIO,它们对理解HDFS的工作原理和优化至关重要。这些工具不仅帮助开发者了解系统的吞吐率,还能揭示不同组件的性能瓶颈。 ...

    浅析人工智能的电子信息资源实时存储方法.pdf

    文章“浅析人工智能的电子信息资源实时存储方法”探讨了在信息时代背景下,如何通过人工智能技术优化电子信息资源的存储问题。随着互联网技术的发展,电子信息资源的存储需求不断增长,传统的存储方法面临着效率低下...

    Java编写Mapreduce程序过程浅析

    在大数据处理领域,Apache Hadoop的MapReduce框架是不可或缺的一部分,尤其对于Java开发者而言,学习如何用Java编写MapReduce程序是提升数据处理能力的关键。本文将深入浅出地解析Java MapReduce程序的编写过程,...

    浅析HDFS架构和设计

    Hadoop Distributed File System(HDFS)是Hadoop生态中的核心组件,专门设计用于处理大规模数据集的分布式文件系统。HDFS的核心理念是在多台独立的计算机上分布式存储和处理大数据,以克服单机存储和计算能力的限制...

Global site tag (gtag.js) - Google Analytics