`

探索Hadoop OutputFormat

 
阅读更多

转自:http://www.infoq.com/cn/articles/HadoopOutputFormat

 

Hadoop常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。从高层次角 度来看,整个过程就是Hadoop接收输入文件、使用自定义转换(Map-Reduce步骤)获得内容流,以及将输出文件的结果写回磁盘。上个月InfoQ展示了 怎 样在第一个步骤中,使用InputFormat类来更好地对接收输入文件进行控制。而在本文中,我们将同大家一起探讨怎样自定义最后一个步骤——即怎样写 入输出文件。OutputFormat将Map/Reduce作业的输出结果转换为其他应用程序可读的方式,从而轻松实现与其他系统的互操作。为了展示 OutputFormts的实用性,我们将用两个例子进行讨论:如何拆分作业结果到不同目录以及如何为提供快速键值查找的服务写入文件。

OutputFormats是做什么的?

OutputFormt接口决定了在哪里以及怎样持久化作业结果。Hadoop为不同类型的格式提供了一系列的类和接口,实现自定义操作只要继承其 中的某个类或接口即可。你可能已经熟悉了默认的OutputFormat,也就是TextOutputFormat,它是一种以行分隔,包含制表符界定的 键值对的文本文件格式。尽管如此,对多数类型的数据而言,如再常见不过的数字,文本序列化会浪费一些空间,由此带来的结果是运行时间更长且资源消耗更多。 为了避免文本文件的弊端,Hadoop提供了SequenceFileOutputformat,它将对象表示成二进制形式而不再是文本文件,并将结果进 行压缩。下面是Hadoop提供的类层次结构:

  • FileOutputFormat(实现OutputFormat接口)—— 所有OutputFormats的基类
    • MapFileOutputFormat —— 一种使用部分索引键的格式
    • SequenceFileOutputFormat —— 二进制键值数据的压缩格式
      • SequenceFileAsBinaryOutputFormat —— 原生二进制数据的压缩格式
    • TextOutputFormat —— 以行分隔、包含制表符定界的键值对的文本文件格式
    • MultipleOutputFormat —— 使用键值对参数写入文件的抽象类
      • MultipleTextOutputFormat —— 输出多个以标准行分割、制表符定界格式的文件
      • MultipleSequenceFileOutputFormat —— 输出多个压缩格式的文件

OutputFormat提供了对RecordWriter的实现,从而指定如何序列化数据。 RecordWriter类可以处理包含单个键值对的作业,并将结果写入到OutputFormat中准备好的位置。RecordWriter的实现主要 包括两个函数:“write”和“close”。“write”函数从Map/Reduce作业中取出键值对,并将其字节写入磁盘。 LineRecordWriter是默认使用的RecordWriter,它是前面提到的TextOutputFormat的一部分。它写入的内容包括:

  • 键(key)的字节 (由getBytes()函数返回)
  • 一个用以定界的制表符
  • 值(value)的字节(同样由getBytes()函数返回)
  • 一个换行符

“close”函数会关闭Hadoop到输出文件的数据流。

我们已经讨论了输出数据的格式,下面我们关心的问题是数据存储在何处?同样,你或许看到过某个作业的输出结果会以多个“部分”文件的方式存储在输出目录中,如下:

|-- output-directory
| |-- part-00000
| |-- part-00001
| |-- part-00002
| |-- part-00003
| |-- part-00004
   '-- part-00005

默认情况下,当需要写入数据时,每个进程都会在输出目录创建自己的文件。数据由reducers在作业结束时写入(如果没有reducers会由 mapper写入)。即使在本文后面提到的创建自定义输出目录时,我们仍会保持写入“部分”文件,这么做可以让多个进程同时写入同一个目录而互不干扰。

自定义OutputFormat

从前面我们已经看到,OutputFormat类的主要职责是决定数据的存储位置以及写入的方式。那么为什么要自定义这些行为呢?自定义数据位置的 原因之一是为了将Map/Reduce作业输出分离到不同的目录。例如,假设需要处理一个包含世界范围内的搜索请求的日志文件,并希望计算出每个国家的搜 索频度。你想要在不牵涉其他国家的前提下能够查看某个特定国家的结果。也许以后在你的数据管道中,会用不同的进程来处理不同的国家,或者想要把某个特定国 家的结果复制一份到该国的数据中心去。使用默认的OutputFormat时,所有的数据都会存储在同一目录下,这样在不浏览的情况下是无从知晓“部分” 文件的内容的。而通过使用自定义的OutputFormat,你可以为每个国家创建一个子目录的布局,如下:

|-- output-directory
|   |-- France
|   |   |-- part-00000
|   |   |-- part-00001
|   |   '-- part-00002
... |
|   '-- Zimbabwe
|       |-- part-00000
|       |-- part-00001
|       '-- part-00002

其中每个部分文件都具有键值对(“搜索词汇”=>频度)。现在只要简单地指定某个国家数据所在的路径,就可以只读取该国家的数据了。下面我们将看到怎样继承MultipleTextOutputFormat类,以获得所需的行为。

自定义OutputFormat还有一些其他的原因,以名为ElephantDB 的 项目为例, 它将数据以一种面向消费应用程序的“本地”形式进行存储。这个项目的设立是为了让Map/Reduece作业结果可以像分布式服务一样被查询。 ElephantDB写入的并不是文本文件,而是使用自定义的OutputFormat将结果写成BerkeleyDB文件,其中这些文件使用作业输出的 键进行索引。之后使用某个服务加载BerkeleyDB文件,可以提供低延滞的任意键查找。类似的系统还有HBase和Voldemort,它们可以存储 Hadoop生成的键值数据。ElephantDB重点关注的是怎样与Hadoop批量式更新进行简易紧密的集成。

多路输出

为了解决上面的搜索日志的问题,我们继承了MultipleTextOutputFormat类,并根据被写入的键值来选择输出目录。我们的 Map/Reduce作业将会为搜索请求所在国家生成一个键,并为搜索词汇及该搜索的频度产生一个值。由于 MultipleTextOutputFormat已经知道如何写入文本文件,因此并不需要为OutputFormat实现序列化功能。清单1实现了该 类:

1 package oddjob.hadoop;
2
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
6
7 public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {
8
9        /**
10        * Use they key as part of the path for the final output file.
11        */
12       @Override
13       protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
14             return new Path(key.toString(), leaf).toString();
15       }
16
17       /**
18        * When actually writing the data, discard the key since it is already in
19        * the file path.
20        */
21       @Override
22       protected Text generateActualKey(Text key, Text value) {
23             return null;
24          }
25 }

清单1:MultipleTextOutputFormat子类样例

MultipleTextOutputFormatByKey类的generateActualFileNameForKeyValue方法指定了 作业输出的存储位置(第13行)。对于每组由Map/Reduce作业生成的键值对,该类会把键加入到路径名称中作为输出。“leaf”参数就是我们之前 看到的“part-0000”,它在每个reducer中都是独一无二的,这样可以允许不同进程同时写入到输出目录而互不影响。例如,由第一个 reducer产生的键为“France”、值为“soccer 5000”的结果会被写入到“output-directory/France/part-00000”内的某个文件中。

要使用这个类,需确保Hadoop包含了这个自定义类的jar,并使用完整的类名作为“-outputformat”的参数:

hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
  -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
  -input search-logs \
  -output search-frequency-by-country \
  -mapper parse-logs.py \
  -reducer count-searches.py 

清单1是oddjob 项目中某个类的Java实现。oddjob是一个开源库,提供了多种MultipleTextOutputFormat。虽然这个库面向的是Hadoop的流特性,但是它也可以用在产生文本键值输出的其他作业中。

为服务准备输出

在我们的下一个例子中,必须实现两个接口来自定义数据序列化以及文件存放的目录结构,以使结果可被ElephantDB服务加载。正如前面所讨论 的,序列化部分会由RecordWriter的实现来处理。在LineRecordWriter类将字节流写入输出文件的同 时,ElephantRecordWriter还包含了专门的逻辑用来选择要写入的文件以及使用第三方库来格式化磁盘上的数据。

1   public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
2
3       FileSystem _fs;
4       Args _args;
5       Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
6       Progressable _progressable;
7       LocalElephantManager _localManager;
8
9       int _numWritten = 0;
10      long _lastCheckpoint = System.currentTimeMillis();
11
12      public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
13         _fs = Utils.getFS(args.outputDirHdfs, conf);
14         _args = args;
15         _progressable = progressable;
16         _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
17      }
18
19      private String remoteUpdateDirForShard(int shard) {
20          if(_args.updateDirHdfs==null) return null;
21          else return _args.updateDirHdfs + "/" + shard;
22      }
23
24      public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
25          LocalPersistence lp = null;
26          LocalPersistenceFactory fact = _args.spec.getLPFactory();
27          Map<String, Object> options = _args.persistenceOptions;
28          if(_lps.containsKey(shard.get())) {
29             lp = _lps.get(shard.get());
30          } else {
31             String updateDir = remoteUpdateDirForShard(shard.get());
32             String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
33             lp = fact.openPersistenceForAppend(localShard, options);
34             _lps.put(shard.get(), lp);
35             progress();
36          }
37
38          _args.updater.updateElephant(lp, record.key, record.val);
39
40          _numWritten++;
41          if(_numWritten % 25000 == 0) {
42             long now = System.currentTimeMillis();
43             long delta = now - _lastCheckpoint;
44             _lastCheckpoint = now;
45             LOG.info("Wrote last 25000 records in " + delta + " ms");
46             _localManager.progress();
47          }
48      }
49
50      public void close(Reporter reporter) throws IOException {
51          for(Integer shard: _lps.keySet()) {
52             String lpDir = _localManager.localTmpDir("" + shard);
53             LOG.info("Closing LP for shard " + shard + " at " + lpDir);
54             _lps.get(shard).close();
55             LOG.info("Closed LP for shard " + shard + " at " + lpDir);
56             progress();
57             String remoteDir = _args.outputDirHdfs + "/" + shard;
58             if(_fs.exists(new Path(remoteDir))) {
59                 LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
60                 _fs.delete(new Path(remoteDir), true);
61                 LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
62             }
63             LOG.info("Copying " + lpDir + " to " + remoteDir);
64             _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
65             LOG.info("Copied " + lpDir + " to " + remoteDir);
66             progress();
67          }
68          _localManager.cleanup();
69      }
70
71      private void progress() {
72           if(_progressable!=null) _progressable.progress();
73      }
74   }

清单2:从ElephantDB中摘录的某个RecordWriter子类

ElephantDB的工作方式是通过跨越若干个LocalPersistence对象(BerkeleyDB文件)来对数据进行分片(划分)。 ElephantRecordWriter类中的write函数拿到分片ID,并检查该分片是否已经打开(第28行),如果没有则打开并创建一个新的本地 文件(第33行)。第38行的updateElephant调用将作业输出的键值对写入到BerkeleyDB文件。

当关闭ElephantRecordWriter时,该类在第64行会复制BerkeleyDB文件到HDFS中,且可以随意选择是否覆盖旧文件。 接下去的progress方法调用会通知Hadoop当前的RecordWriter正在按计划进行,这有点类似于真实Map/Reduce作业中的状态 或计数器更新。

下一步是利用ElephantRecordWriter来实现OutputFormat。要理解此清单中的代码,重点是了解Hadoop JobConf对象封装了什么。顾名思义,JobConf对象包含了某项作业的全部设置,包括输入输出目录,作业名称以及mapper和reducer 类。清单3展示了两个自定义类是如何共同工作的:

1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
2     public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
3
4     public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
5         return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
6     }
7
8     public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
9         Args args = (Args) Utils.getObject(conf, ARGS_CONF);
10         fs = Utils.getFS(args.outputDirHdfs, conf);
11         if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
12             throw new InvalidJobConfException("Speculative execution should be false");
13         }
14         if(fs.exists(new Path(args.outputDirHdfs))) {
15             throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
16         }
17         if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) {
18             throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
19         }
20     }
21   }

清单3:从ElephantDB中摘录的某个OutputFormat实现

正如前面所看到的,OutputFormat有两个职责,分别是决定数据的存储位置以及数据写入的方式。ElephantOutputFormat 的数据存储位置是通过检查JobConf以及在第14和17行检查确保该位置是一个合法目标位置后来决定的。至于数据的写入方式,则是由 getRecordWriter函数处理,它的返回结果是清单2中的ElephantRecordWriter对象。

从Hadoop的角度来看,当Map/Reduce作业结束并且每个reducer产生了键值对流的时候,这些类会派上用场。Hadoop会以作业 配置为参数调用checkOutputSpecs。如果函数运行没有抛出异常,它会接下去调用getRecordWriter以返回可以写入流数据的对 象。当所有的键值对都被写入后,Hadoop会调用writer中的close函数,将数据提交到HDFS并结束该reducer的职责。

总结

OutputFormat是Hadoop框架中的重要组成部分。它们通过为目标消费应用程序产生合适的输出来提供与其他系统和服务间的互操作。自定 义作业输出位置可以简化并加速数据工作流;而自定义结果输出方式可以让其快速地工作于其他不同的环境下。虽然实现OutputFormat和覆写几个方法 一样简单,但是它足够灵活可以支持全新的磁盘上的数据格式。

分享到:
评论

相关推荐

    免费的防止锁屏小软件,可用于域统一管控下的锁屏机制

    免费的防止锁屏小软件,可用于域统一管控下的锁屏机制

    Python代码实现带装饰的圣诞树控制台输出

    内容概要:本文介绍了一段简单的Python代码,用于在控制台中输出一棵带有装饰的圣诞树。具体介绍了代码结构与逻辑,包括如何计算并输出树形的各层,如何加入装饰元素以及打印树干。还提供了示例装饰字典,允许用户自定义圣诞树装饰位置。 适用人群:所有对Python编程有一定了解的程序员,尤其是想要学习控制台图形输出的开发者。 使用场景及目标:适用于想要掌握如何使用Python代码创建控制台艺术,特别是对于想要增加节日氛围的小项目。目标是帮助开发者理解和实现基本的字符串操作与格式化技巧,同时享受创造乐趣。 其他说明:本示例不仅有助于初学者理解基本的字符串处理和循环机制,而且还能激发学习者的编程兴趣,通过调整装饰物的位置和树的大小,可以让输出更加个性化和丰富。

    白色大气风格的设计师作品模板下载.zip

    白色大气风格的设计师作品模板下载.zip

    电商平台开发需求文档.doc

    电商平台开发需求文档.doc

    白色简洁风格的办公室室内设计门户网站模板下载.zip

    白色简洁风格的办公室室内设计门户网站模板下载.zip

    VB+access干部档案管理系统(源代码+系统)(20246t).7z

    1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于计算机科学与技术等相关专业,更为适合;

    VB+ACCESS服装专卖店管理系统设计(源代码+系统+开题报告+答辩PPT)(2024ra).7z

    1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于计算机科学与技术等相关专业,更为适合;

    (179065812)基于Android stduio的手机银行开发与设计-用于课程设计

    课程设计---基于Android stduio的手机银行开发与设计 现今,手机已经成为人们生活和工作的必备品,在手机各种系统中Android系统是人们用的比较多的系统。手机银行也是人们在生活中比较常用的功能之一。本项目基于Android的手机银行开发与设计主要功能有登录注册、转账、转账记录查询、修改及查询个人信息、添加好友、向好友转账的功能。本项目主要用Android Studio 开发,数据库SQLite数据库,和夜神模拟器。 基于Android stduio的手机银行开发与设计项目主要功能有登录注册、转账、转账记录查询、修改及查询个人信息、添加好友、向好友转账的功能。。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。

    白色大气风格的婚礼现场倒计时模板下载.zip

    白色大气风格的婚礼现场倒计时模板下载.zip

    轮式移动机器人轨迹跟踪的MATHLAB程序,运用运动学和动力学模型的双闭环控制,借鉴自抗扰控制技术结合了非线性ESO,跟踪效果良好,控制和抗扰效果较优,可分享控制结构图 这段程序主要是一个小车的动力

    轮式移动机器人轨迹跟踪的MATHLAB程序,运用运动学和动力学模型的双闭环控制,借鉴自抗扰控制技术结合了非线性ESO,跟踪效果良好,控制和抗扰效果较优,可分享控制结构图。 这段程序主要是一个小车的动力学仿真程序,用于模拟小车在参考轨迹下的运动。下面我将对程序进行详细的分析解释。 首先,程序开始时使用`clear`、`clc`和`close all`命令来清除工作空间、命令窗口和图形窗口中的内容。 接下来,程序定义了一系列参数和变量,用于设置仿真的参数和存储仿真过程中的数据。这些参数包括小车的质量、车宽、驱动轮半径等,还有参考轨迹的振幅和频率,仿真步长,仿真时间等。 然后,程序定义了一些元胞数组,用于存储不同阶段的数据。这些数组包括参考轨迹位姿、真实运动轨迹位姿、参考轨迹一阶导数、参考轨迹速度、期望速度、真实速度、控制器输出的控制力矩、控制输入、期望速度与真实速度误差、摩擦值、外界扰动值、总扰动、位姿跟踪误差、扰动观测值等。 接下来,程序给这些变量赋初始值,包括小车的初始位姿和速度,初始速度,期望初始速度,控制器输出的控制力矩,扰动观测值等。 然后,程序进入一个循环,仿真时间从

    vb+ACCESS学生档案管理系统(论文+源代码)(2024ql).7z

    1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于计算机科学与技术等相关专业,更为适合;

    数据分析-31-疫情数据分析(包含代码和数据)

    这是一份来自开源的全球新冠肺炎数据集,每日时间序列汇总,包括确诊、死亡和治愈。所有数据来自每日病例报告。数据持续更新中。 由于数据集中没有美国的治愈数据,所以在统计全球的现有确诊人员和治愈率的时候会有很大误差,代码里面先不做这个处理,期待数据集的完善。

    白色大气风格的时装设计公司模板下载.zip

    白色大气风格的时装设计公司模板下载.zip

    白色大气风格的商务会议活动模板下载.rar

    白色大气风格的商务会议活动模板下载.rar

    vb+access工资管理系统(论文+程序+开题报告+外文翻译+答辩PPT)(2024k3).7z

    1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于计算机科学与技术等相关专业,更为适合;

    基于微信小程序的学生签到系统设计与实现ssm.zip

    本次开发一套基于微信小程序的生签到系统,有管理员,教师,学生三个角色。管理员功能有个人中心,学生管理,教师管理,签到管理,学生签到管理,班课信息管理,加入班课管理,请假信息管理,审批信息管理,销假信息管理,系统管理。教师和学生都可以在微信端注册和登录,教师可以管理签到信息,管理班课信息,审批请假信息,查看学生签到,查看加入班级,查看审批信息和销假信息。学生可以查看教师发布的学生签到信息,可以自己选择加入班课信息,添加请假信息,查看审批信息,进行销假操作。基于微信小程序的生签到系统服务端用Java开发的网站后台,接收并且处理微信小程序端传入的json数据,数据库用到了MySQL数据库作为数据的存储。

    技术资源分享-我的运维人生-《新年的奇妙团聚与希望之旅》

    **脚本描述**:本脚本围绕着新年这个充满欢乐与希望的时刻展开。故事发生在一个热闹的小镇,主要角色有在外打拼多年的年轻人小李,他的父母,以及一群充满活力的小镇居民。新年将至,小李踏上回家的旅途,满心期待与家人团聚。在小镇上,大家都在积极筹备新年,贴春联、挂灯笼、准备年夜饭。小李与家人重逢后,一起分享着彼此的故事和喜悦。同时,他们也和小镇居民一起举办了热闹的庆祝活动,在欢声笑语中迎接新年的到来。这个新年不仅让小李重新感受到了家的温暖,也让他对未来充满了信心和希望,他决定和小镇一起成长发展。通过这个脚本,展现新年带给人们的幸福、温暖和对未来的憧憬。

    Python 自动办公- Python分类汇总278张Excel表中的数据 Python源码

    Python 自动办公- Python分类汇总278张Excel表中的数据

    白色创意风格的用户信息登记源码下载.zip

    白色创意风格的用户信息登记源码下载.zip

    白色大气的音乐专辑博客整站网站模板下载.zip

    白色大气的音乐专辑博客整站网站模板下载.zip

Global site tag (gtag.js) - Google Analytics