`

探索Hadoop OutputFormat

 
阅读更多

转自:http://www.infoq.com/cn/articles/HadoopOutputFormat

 

Hadoop常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。从高层次角 度来看,整个过程就是Hadoop接收输入文件、使用自定义转换(Map-Reduce步骤)获得内容流,以及将输出文件的结果写回磁盘。上个月InfoQ展示了 怎 样在第一个步骤中,使用InputFormat类来更好地对接收输入文件进行控制。而在本文中,我们将同大家一起探讨怎样自定义最后一个步骤——即怎样写 入输出文件。OutputFormat将Map/Reduce作业的输出结果转换为其他应用程序可读的方式,从而轻松实现与其他系统的互操作。为了展示 OutputFormts的实用性,我们将用两个例子进行讨论:如何拆分作业结果到不同目录以及如何为提供快速键值查找的服务写入文件。

OutputFormats是做什么的?

OutputFormt接口决定了在哪里以及怎样持久化作业结果。Hadoop为不同类型的格式提供了一系列的类和接口,实现自定义操作只要继承其 中的某个类或接口即可。你可能已经熟悉了默认的OutputFormat,也就是TextOutputFormat,它是一种以行分隔,包含制表符界定的 键值对的文本文件格式。尽管如此,对多数类型的数据而言,如再常见不过的数字,文本序列化会浪费一些空间,由此带来的结果是运行时间更长且资源消耗更多。 为了避免文本文件的弊端,Hadoop提供了SequenceFileOutputformat,它将对象表示成二进制形式而不再是文本文件,并将结果进 行压缩。下面是Hadoop提供的类层次结构:

  • FileOutputFormat(实现OutputFormat接口)—— 所有OutputFormats的基类
    • MapFileOutputFormat —— 一种使用部分索引键的格式
    • SequenceFileOutputFormat —— 二进制键值数据的压缩格式
      • SequenceFileAsBinaryOutputFormat —— 原生二进制数据的压缩格式
    • TextOutputFormat —— 以行分隔、包含制表符定界的键值对的文本文件格式
    • MultipleOutputFormat —— 使用键值对参数写入文件的抽象类
      • MultipleTextOutputFormat —— 输出多个以标准行分割、制表符定界格式的文件
      • MultipleSequenceFileOutputFormat —— 输出多个压缩格式的文件

OutputFormat提供了对RecordWriter的实现,从而指定如何序列化数据。 RecordWriter类可以处理包含单个键值对的作业,并将结果写入到OutputFormat中准备好的位置。RecordWriter的实现主要 包括两个函数:“write”和“close”。“write”函数从Map/Reduce作业中取出键值对,并将其字节写入磁盘。 LineRecordWriter是默认使用的RecordWriter,它是前面提到的TextOutputFormat的一部分。它写入的内容包括:

  • 键(key)的字节 (由getBytes()函数返回)
  • 一个用以定界的制表符
  • 值(value)的字节(同样由getBytes()函数返回)
  • 一个换行符

“close”函数会关闭Hadoop到输出文件的数据流。

我们已经讨论了输出数据的格式,下面我们关心的问题是数据存储在何处?同样,你或许看到过某个作业的输出结果会以多个“部分”文件的方式存储在输出目录中,如下:

|-- output-directory
| |-- part-00000
| |-- part-00001
| |-- part-00002
| |-- part-00003
| |-- part-00004
   '-- part-00005

默认情况下,当需要写入数据时,每个进程都会在输出目录创建自己的文件。数据由reducers在作业结束时写入(如果没有reducers会由 mapper写入)。即使在本文后面提到的创建自定义输出目录时,我们仍会保持写入“部分”文件,这么做可以让多个进程同时写入同一个目录而互不干扰。

自定义OutputFormat

从前面我们已经看到,OutputFormat类的主要职责是决定数据的存储位置以及写入的方式。那么为什么要自定义这些行为呢?自定义数据位置的 原因之一是为了将Map/Reduce作业输出分离到不同的目录。例如,假设需要处理一个包含世界范围内的搜索请求的日志文件,并希望计算出每个国家的搜 索频度。你想要在不牵涉其他国家的前提下能够查看某个特定国家的结果。也许以后在你的数据管道中,会用不同的进程来处理不同的国家,或者想要把某个特定国 家的结果复制一份到该国的数据中心去。使用默认的OutputFormat时,所有的数据都会存储在同一目录下,这样在不浏览的情况下是无从知晓“部分” 文件的内容的。而通过使用自定义的OutputFormat,你可以为每个国家创建一个子目录的布局,如下:

|-- output-directory
|   |-- France
|   |   |-- part-00000
|   |   |-- part-00001
|   |   '-- part-00002
... |
|   '-- Zimbabwe
|       |-- part-00000
|       |-- part-00001
|       '-- part-00002

其中每个部分文件都具有键值对(“搜索词汇”=>频度)。现在只要简单地指定某个国家数据所在的路径,就可以只读取该国家的数据了。下面我们将看到怎样继承MultipleTextOutputFormat类,以获得所需的行为。

自定义OutputFormat还有一些其他的原因,以名为ElephantDB 的 项目为例, 它将数据以一种面向消费应用程序的“本地”形式进行存储。这个项目的设立是为了让Map/Reduece作业结果可以像分布式服务一样被查询。 ElephantDB写入的并不是文本文件,而是使用自定义的OutputFormat将结果写成BerkeleyDB文件,其中这些文件使用作业输出的 键进行索引。之后使用某个服务加载BerkeleyDB文件,可以提供低延滞的任意键查找。类似的系统还有HBase和Voldemort,它们可以存储 Hadoop生成的键值数据。ElephantDB重点关注的是怎样与Hadoop批量式更新进行简易紧密的集成。

多路输出

为了解决上面的搜索日志的问题,我们继承了MultipleTextOutputFormat类,并根据被写入的键值来选择输出目录。我们的 Map/Reduce作业将会为搜索请求所在国家生成一个键,并为搜索词汇及该搜索的频度产生一个值。由于 MultipleTextOutputFormat已经知道如何写入文本文件,因此并不需要为OutputFormat实现序列化功能。清单1实现了该 类:

1 package oddjob.hadoop;
2
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
6
7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {
8
9        /**
10        * Use they key as part of the path for the final output file.
11        */
12       @Override
13       protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
14             return new Path(key.toString(), leaf).toString();
15       }
16
17       /**
18        * When actually writing the data, discard the key since it is already in
19        * the file path.
20        */
21       @Override
22       protected Text generateActualKey(Text key, Text value) {
23             return null;
24          }
25 }

清单1:MultipleTextOutputFormat子类样例

MultipleTextOutputFormatByKey类的generateActualFileNameForKeyValue方法指定了 作业输出的存储位置(第13行)。对于每组由Map/Reduce作业生成的键值对,该类会把键加入到路径名称中作为输出。“leaf”参数就是我们之前 看到的“part-0000”,它在每个reducer中都是独一无二的,这样可以允许不同进程同时写入到输出目录而互不影响。例如,由第一个 reducer产生的键为“France”、值为“soccer 5000”的结果会被写入到“output-directory/France/part-00000”内的某个文件中。

要使用这个类,需确保Hadoop包含了这个自定义类的jar,并使用完整的类名作为“-outputformat”的参数:

hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
  -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
  -input search-logs \
  -output search-frequency-by-country \
  -mapper parse-logs.py \
  -reducer count-searches.py 

清单1是oddjob 项目中某个类的Java实现。oddjob是一个开源库,提供了多种MultipleTextOutputFormat。虽然这个库面向的是Hadoop的流特性,但是它也可以用在产生文本键值输出的其他作业中。

为服务准备输出

在我们的下一个例子中,必须实现两个接口来自定义数据序列化以及文件存放的目录结构,以使结果可被ElephantDB服务加载。正如前面所讨论 的,序列化部分会由RecordWriter的实现来处理。在LineRecordWriter类将字节流写入输出文件的同 时,ElephantRecordWriter还包含了专门的逻辑用来选择要写入的文件以及使用第三方库来格式化磁盘上的数据。

1   public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
2
3       FileSystem _fs;
4       Args _args;
5       Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
6       Progressable _progressable;
7       LocalElephantManager _localManager;
8
9       int _numWritten = 0;
10      long _lastCheckpoint = System.currentTimeMillis();
11
12      public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
13         _fs = Utils.getFS(args.outputDirHdfs, conf);
14         _args = args;
15         _progressable = progressable;
16         _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
17      }
18
19      private String remoteUpdateDirForShard(int shard) {
20          if(_args.updateDirHdfs==null) return null;
21          else return _args.updateDirHdfs + "/" + shard;
22      }
23
24      public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
25          LocalPersistence lp = null;
26          LocalPersistenceFactory fact = _args.spec.getLPFactory();
27          Map<String, Object> options = _args.persistenceOptions;
28          if(_lps.containsKey(shard.get())) {
29             lp = _lps.get(shard.get());
30          } else {
31             String updateDir = remoteUpdateDirForShard(shard.get());
32             String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
33             lp = fact.openPersistenceForAppend(localShard, options);
34             _lps.put(shard.get(), lp);
35             progress();
36          }
37
38          _args.updater.updateElephant(lp, record.key, record.val);
39
40          _numWritten++;
41          if(_numWritten % 25000 == 0) {
42             long now = System.currentTimeMillis();
43             long delta = now - _lastCheckpoint;
44             _lastCheckpoint = now;
45             LOG.info("Wrote last 25000 records in " + delta + " ms");
46             _localManager.progress();
47          }
48      }
49
50      public void close(Reporter reporter) throws IOException {
51          for(Integer shard: _lps.keySet()) {
52             String lpDir = _localManager.localTmpDir("" + shard);
53             LOG.info("Closing LP for shard " + shard + " at " + lpDir);
54             _lps.get(shard).close();
55             LOG.info("Closed LP for shard " + shard + " at " + lpDir);
56             progress();
57             String remoteDir = _args.outputDirHdfs + "/" + shard;
58             if(_fs.exists(new Path(remoteDir))) {
59                 LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
60                 _fs.delete(new Path(remoteDir), true);
61                 LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
62             }
63             LOG.info("Copying " + lpDir + " to " + remoteDir);
64             _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
65             LOG.info("Copied " + lpDir + " to " + remoteDir);
66             progress();
67          }
68          _localManager.cleanup();
69      }
70
71      private void progress() {
72           if(_progressable!=null) _progressable.progress();
73      }
74   }

清单2:从ElephantDB中摘录的某个RecordWriter子类

ElephantDB的工作方式是通过跨越若干个LocalPersistence对象(BerkeleyDB文件)来对数据进行分片(划分)。 ElephantRecordWriter类中的write函数拿到分片ID,并检查该分片是否已经打开(第28行),如果没有则打开并创建一个新的本地 文件(第33行)。第38行的updateElephant调用将作业输出的键值对写入到BerkeleyDB文件。

当关闭ElephantRecordWriter时,该类在第64行会复制BerkeleyDB文件到HDFS中,且可以随意选择是否覆盖旧文件。 接下去的progress方法调用会通知Hadoop当前的RecordWriter正在按计划进行,这有点类似于真实Map/Reduce作业中的状态 或计数器更新。

下一步是利用ElephantRecordWriter来实现OutputFormat。要理解此清单中的代码,重点是了解Hadoop JobConf对象封装了什么。顾名思义,JobConf对象包含了某项作业的全部设置,包括输入输出目录,作业名称以及mapper和reducer 类。清单3展示了两个自定义类是如何共同工作的:

1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
2     public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
3
4     public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
5         return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
6     }
7
8     public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
9         Args args = (Args) Utils.getObject(conf, ARGS_CONF);
10         fs = Utils.getFS(args.outputDirHdfs, conf);
11         if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
12             throw new InvalidJobConfException("Speculative execution should be false");
13         }
14         if(fs.exists(new Path(args.outputDirHdfs))) {
15             throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
16         }
17         if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) {
18             throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
19         }
20     }
21   }

清单3:从ElephantDB中摘录的某个OutputFormat实现

正如前面所看到的,OutputFormat有两个职责,分别是决定数据的存储位置以及数据写入的方式。ElephantOutputFormat 的数据存储位置是通过检查JobConf以及在第14和17行检查确保该位置是一个合法目标位置后来决定的。至于数据的写入方式,则是由 getRecordWriter函数处理,它的返回结果是清单2中的ElephantRecordWriter对象。

从Hadoop的角度来看,当Map/Reduce作业结束并且每个reducer产生了键值对流的时候,这些类会派上用场。Hadoop会以作业 配置为参数调用checkOutputSpecs。如果函数运行没有抛出异常,它会接下去调用getRecordWriter以返回可以写入流数据的对 象。当所有的键值对都被写入后,Hadoop会调用writer中的close函数,将数据提交到HDFS并结束该reducer的职责。

总结

OutputFormat是Hadoop框架中的重要组成部分。它们通过为目标消费应用程序产生合适的输出来提供与其他系统和服务间的互操作。自定 义作业输出位置可以简化并加速数据工作流;而自定义结果输出方式可以让其快速地工作于其他不同的环境下。虽然实现OutputFormat和覆写几个方法 一样简单,但是它足够灵活可以支持全新的磁盘上的数据格式。

分享到:
评论

相关推荐

    探索HadoopOutputFormat

    Hadoop常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。...OutputFormat将Map/Reduce作业的输出结果转换为其他应用程序可读的方式,从而

    hadoop2.7.3源码包,hadoop2.7.3zip源码包

    Hadoop是大数据处理领域中的一个核心框架,主要由Apache软件基金会开发。Hadoop 2.7.3是其一个稳定...通过这个源码包,你可以探索Hadoop如何处理大数据,学习分布式系统的设计原则,以及如何利用Java实现这样的系统。

    Hadoop-0.20.1+API

    8. **Hadoop扩展性**:探索Hadoop生态系统中的其他项目,如Pig、Hive、HBase、Zookeeper等,以及它们与Hadoop的集成方式。 9. **Hadoop工具和命令**:熟悉Hadoop提供的命令行工具,如`hadoop fs`命令,`hadoop ...

    实战hadoop中的源码

    7. **扩展性与插件开发**:学习如何为Hadoop开发自定义InputFormat、OutputFormat、Partitioner、Combiner等组件。 8. **实战项目**:结合实际案例,运用所学知识解决大数据处理问题,如日志分析、推荐系统等。 ...

    Hadoop源码分析 第一章 Hadoop脚本

    总之,Hadoop脚本的分析不仅是对工具的掌握,更是对大数据处理思想的探索。只有深入理解Hadoop的每一个细节,才能更好地利用这一强大工具解决实际问题。通过不断学习和实践,你将成为驾驭Hadoop的专家。

    Hadoop实现大矩阵乘法

    在大数据处理领域,Hadoop是一个不可或缺的开源框架...通过分析和运行提供的代码,学习者不仅可以掌握大矩阵乘法的分布式实现,还能深入理解Hadoop的工作原理和编程技巧,对于进一步探索大数据处理领域具有很高的价值。

    hadoop-2.7.6-src

    《Hadoop 2.7.6源码解析与探索》 Hadoop,作为大数据处理领域的重要框架,一直以来都是开发者和研究者关注的焦点。本文将深入探讨Hadoop 2.7.6版本的源码,带领读者理解其内部机制,为理解和使用Hadoop提供更深入的...

    Hadoop高级编程- 构建与实现大数据解决方案

    在大数据处理领域,Hadoop是不可或缺的核心技术之一。作为一个开源框架,Hadoop为海量数据的存储、处理和分析提供了高效且可扩展的解决方案。...在实践中不断探索和优化,你将成为一名真正的Hadoop专家。

    hadoop中文文档

    Hadoop是一款开源的大数据处理框架,由Apache基金会开发,它主要设计用于处理和存储海量数据。...尽管0.18版本可能不包含最新的特性,但它仍能提供一个坚实的基础,以便进一步探索更新的Hadoop版本及其生态系统。

    Hadoop入门手册

    7. **数据输入与输出**:理解Hadoop如何处理不同类型的数据源,如文本文件、CSV、JSON等,并学习使用InputFormat和OutputFormat自定义数据格式。 8. **Hadoop应用实例**:通过具体的案例,如网页日志分析、推荐系统...

    hadoop2.7.3的源码包

    6. **扩展与定制**:理解源码后,可以对Hadoop进行各种定制,例如开发自定义InputFormat、OutputFormat、Partitioner、Comparator等,以适应特定的数据处理需求。 7. **性能优化**:通过阅读源码,可以发现可能的...

    Hadoop源代码分析(IDs类和Context类)

    ### Hadoop源代码分析:IDs类与Context类详解 #### 一、引言 Hadoop作为分布式计算领域的重要工具之一,其内部结构复杂且功能...在未来的研究和实践中,这些知识点将成为进一步探索Hadoop高级特性和优化方案的基础。

    Hadoop: The Definitive Guide 中英两版

    4. **Hadoop生态系统的扩展**:探索YARN如何取代JobTracker,提供更强大的资源管理;HBase作为分布式数据库如何与Hadoop集成;以及Pig和Hive如何提供高级查询语言,简化数据分析。 5. **数据输入与输出**:了解多种...

    《Hadoop开发者》

    - Sqoop与Flume:探索数据导入导出工具Sqoop,以及日志收集工具Flume,了解如何在Hadoop与关系型数据库之间进行数据迁移。 - Oozie与Zookeeper:了解工作流管理系统Oozie和协调服务Zookeeper,以及它们在Hadoop...

    Hadoop大数据开发教程笔记软件.zip

    在大数据处理领域,Hadoop是一个不可或缺的关键技术。本教程旨在为初学者提供一套全面的Hadoop大数据开发指导,通过...在学习过程中,理论与实践相结合,不断探索和理解Hadoop的内在机制,将有助于提升大数据处理能力。

    hadoop source code

    4. 探索Hadoop的容错机制,如数据块的复制策略和故障恢复过程。 5. 实践编写MapReduce作业,通过阅读源代码来优化性能和解决实际问题。 通过以上分析,我们可以看出,Hadoop源代码是一个庞大的知识库,涵盖了分布式...

    hadoop源码

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心...总之,"hadoop源码"的深入研究将为你打开一扇通向大数据处理世界的大门,无论是对Hadoop本身的理解,还是对整个大数据生态的探索,都将受益匪浅。

    hadoop权威指南第二版源代码

    7. **Hadoop的扩展性**:源代码中会有关于如何自定义InputFormat、OutputFormat、RecordReader、RecordWriter等内容,这让你能够根据需求扩展Hadoop以适应各种数据格式和处理需求。 8. **配置与优化**:书中源码还...

    hadoop-2.2.0-sources:Hadoop 2.2.0源代码(用于Eclipse IDE的Java项目)-java project source code

    3. **扩展和定制**: 开发者可以根据需求对Hadoop进行扩展,例如实现自定义InputFormat、OutputFormat、Partitioner、Mapper和Reducer等。 4. **贡献代码**: 对于开源爱好者,可以参与Hadoop社区,根据源代码修改和...

Global site tag (gtag.js) - Google Analytics