- 浏览: 98419 次
- 性别:
- 来自: 深圳
文章分类
最新评论
在上一篇文章:“用 Hadoop 进行分布式并行编程 第一部分 基本概念与安装部署”中,介绍了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 并且详细介绍了如何安装 Hadoop,如何运行基于 Hadoop 的并行程序。在本文中,将针对一个具体的计算任务,介绍如何基于 Hadoop 编写并行程序,如何使用 IBM 开发的 Hadoop Eclipse plugin 在 Eclipse 环境中编译并运行程序。
我们先来看看 Hadoop 自带的示例程序 WordCount,这个程序用于统计一批文本文件中单词出现的频率,完整的代码可在下载的 Hadoop 安装包中得到(在 src/examples 目录中)。
见代码清单1。这个类实现 Mapper 接口中的 map 方法,输入参数中的 value 是文本文件中的一行,利用 StringTokenizer 将这个字符串拆成单词,然后将输出结果 <单词,1> 写入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 负责收集 Mapper 和 Reducer 的输出数据,实现 map 函数和 reduce 函数时,只需要简单地将其输出的 <key,value> 对往 OutputCollector 中一丢即可,剩余的事框架自会帮你处理好。
代码中 LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为 long, int, String 的替代品。Reporter 则可用于报告整个应用的运行进度,本例中未使用。
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } |
见代码清单 2。这个类实现 Reducer 接口中的 reduce 方法, 输入参数中的 key, values 是由 Map 任务输出的中间结果,values 是一个 Iterator, 遍历这个 Iterator, 就可以得到属于同一个 key 的所有 value. 此处,key 是一个单词,value 是词频。只需要将所有的 value 相加,就可以得到这个单词的总的出现次数。
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } |
在 Hadoop 中一次计算任务称之为一个 job, 可以通过一个 JobConf 对象设置如何运行这个 job。此处定义了输出的 key 的类型是 Text, value 的类型是 IntWritable, 指定使用代码清单1中实现的 MapClass 作为 Mapper 类, 使用代码清单2中实现的 Reduce 作为 Reducer 类和 Combiner 类, 任务的输入路径和输出路径由命令行参数指定,这样 job 运行时会处理输入路径下的所有文件,并将计算结果写到输出路径下。
然后将 JobConf 对象作为参数,调用 JobClient 的 runJob, 开始执行这个计算任务。至于 main 方法中使用的 ToolRunner 是一个运行 MapReduce 任务的辅助工具类,依样画葫芦用之即可。
public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args[0])); conf.setOutputPath(new Path(args[1])); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { if(args.length != 2){ System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); } int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } } |
以上就是 WordCount 程序的全部细节,简单到让人吃惊,您都不敢相信就这么几行代码就可以分布式运行于大规模集群上,并行处理海量数据集。
通过上文所述的 JobConf 对象,程序员可以设定各种参数,定制如何完成一个计算任务。这些参数很多情况下就是一个 java 接口,通过注入这些接口的特定实现,可以定义一个计算任务( job )的全部细节。了解这些参数及其缺省设置,您才能在编写自己的并行计算程序时做到轻车熟路,游刃有余,明白哪些类是需要自己实现的,哪些类用 Hadoop 的缺省实现即可。表一是对 JobConf 对象中可以设置的一些重要参数的总结和说明,表中第一列中的参数在 JobConf 中均会有相应的 get/set 方法,对程序员来说,只有在表中第三列中的缺省值无法满足您的需求时,才需要调用这些 set 方法,设定合适的参数值,实现自己的计算目的。针对表格中第一列中的接口,除了第三列的缺省实现之外,Hadoop 通常还会有一些其它的实现,我在表格第四列中列出了部分,您可以查阅 Hadoop 的 API 文档或源代码获得更详细的信息,在很多的情况下,您都不用实现自己的 Mapper 和 Reducer, 直接使用 Hadoop 自带的一些实现即可。
将输入的数据集切割成小数据集 InputSplits, 每一个 InputSplit 将由一个 Mapper 负责处理。此外 InputFormat 中还提供一个 RecordReader 的实现, 将一个 InputSplit 解析成 <key,value> 对提供给 map 函数。 | TextInputFormat (针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 <key,value> 对,key 是行在文件中的位置,value 是文件中的一行) |
SequenceFileInputFormat |
提供一个 RecordWriter 的实现,负责输出最终结果 | TextOutputFormat (用 LineRecordWriter 将最终结果写成纯文件文件,每个 <key,value> 对一行,key 和 value 之间用 tab 分隔) |
SequenceFileOutputFormat |
输出的最终结果中 key 的类型 | LongWritable | |
输出的最终结果中 value 的类型 | Text | |
Mapper 类,实现 map 函数,完成输入的 <key,value> 到中间结果的映射 | IdentityMapper (将输入的 <key,value> 原封不动的输出为中间结果) |
LongSumReducer, LogRegexMapper, InverseMapper |
实现 combine 函数,将中间结果中的重复 key 做合并 | null (不对中间结果中的重复 key 做合并) |
|
Reducer 类,实现 reduce 函数,对中间结果做合并,形成最终结果 | IdentityReducer (将中间结果直接输出为最终结果) |
AccumulatingReducer, LongSumReducer |
设定 job 的输入目录, job 运行时会处理输入目录下的所有文件 | null | |
设定 job 的输出目录,job 的最终结果会写入输出目录下 | null | |
设定 map 函数输出的中间结果中 key 的类型 | 如果用户没有设定的话,使用 OutputKeyClass | |
设定 map 函数输出的中间结果中 value 的类型 | 如果用户没有设定的话,使用 OutputValuesClass | |
对结果中的 key 进行排序时的使用的比较器 | WritableComparable | |
对中间结果的 key 排序后,用此 Partition 函数将其划分为R份,每份由一个 Reducer 负责处理。 | HashPartitioner (使用 Hash 函数做 partition) |
KeyFieldBasedPartitioner PipesPartitioner |
现在你对 Hadoop 并行程序的细节已经有了比较深入的了解,我们来把 WordCount 程序改进一下,目标: (1)原 WordCount 程序仅按空格切分单词,导致各类标点符号与单词混杂在一起,改进后的程序应该能够正确的切出单词,并且单词不要区分大小写。(2)在最终结果中,按单词出 现频率的降序进行排序。
实现很简单,见代码清单4中的注释。
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="[^\\w]"; //正则表达式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString().toLowerCase(); //全部转为小写字母 line = line.replaceAll(pattern, " "); //将非0-9, a-z, A-Z的字符替换为空格 StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } |
用一个并行计算任务显然是无法同时完成单词词频统计和排序的,这时我们可以利用 Hadoop 的任务管道能力,用上一个任务(词频统计)的输出做为下一个任务(排序)的输入,顺序执行两个并行计算任务。主要工作是修改代码清单3中的 run 函数,在其中定义一个排序任务并运行之。
在 Hadoop 中要实现排序是很简单的,因为在 MapReduce 的过程中,会把中间结果根据 key 排序并按 key 切成 R 份交给 R 个 Reduce 函数,而 Reduce 函数在处理中间结果之前也会有一个按 key 进行排序的过程,故 MapReduce 输出的最终结果实际上已经按 key 排好序。词频统计任务输出的 key 是单词,value 是词频,为了实现按词频排序,我们指定使用 InverseMapper 类作为排序任务的 Mapper 类( sortJob.setMapperClass(InverseMapper.class );),这个类的 map 函数简单地将输入的 key 和 value 互换后作为中间结果输出,在本例中即是将词频作为 key,单词作为 value 输出, 这样自然就能得到按词频排好序的最终结果。我们无需指定 Reduce 类,Hadoop 会使用缺省的 IdentityReducer 类,将中间结果原样输出。
还有一个问题需要解决: 排序任务中的 Key 的类型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class)), Hadoop 默认对 IntWritable 按升序排序,而我们需要的是按降序排列。因此我们实现了一个 IntWritableDecreasingComparator 类, 并指定使用这个自定义的 Comparator 类对输出结果中的 key (词频)进行排 序:sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class)
详见代码清单 5 及其中的注释。
public int run(String[] args) throws Exception { Path tempDir = new Path("wordcount-temp-" + Integer.toString( new Random().nextInt(Integer.MAX_VALUE))); //定义一个临时目录 JobConf conf = new JobConf(getConf(), WordCount.class); try { conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args[0])); conf.setOutputPath(tempDir); //先将词频统计任务的输出结果写到临时目 //录中, 下一个排序任务以临时目录为输入目录。 conf.setOutputFormat(SequenceFileOutputFormat.class); JobClient.runJob(conf); JobConf sortJob = new JobConf(getConf(), WordCount.class); sortJob.setJobName("sort"); sortJob.setInputPath(tempDir); sortJob.setInputFormat(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); //将 Reducer 的个数限定为1, 最终输出的结果 //文件就是一个。 sortJob.setOutputPath(new Path(args[1])); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(Text.class); sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class); JobClient.runJob(sortJob); } finally { FileSystem.get(conf).delete(tempDir); //删除临时目录 } return 0; } private static class IntWritableDecreasingComparator extends IntWritable.Comparator { public int compare(WritableComparable a, WritableComparable b) { return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } |
在 Eclipse 环境下可以方便地进行 Hadoop 并行程序的开发和调试。推荐使用 IBM MapReduce Tools for Eclipse, 使用这个 Eclipse plugin 可以简化开发和部署 Hadoop 并行程序的过程。基于这个 plugin, 可以在 Eclipse 中创建一个 Hadoop MapReduce 应用程序,并且提供了一些基于 MapReduce 框架的类开发的向导,可以打包成 JAR 文件,部署一个 Hadoop MapReduce 应用程序到一个 Hadoop 服务器(本地和远程均可),可以通过一个专门的视图 ( perspective ) 查看 Hadoop 服务器、Hadoop 分布式文件系统( DFS )和当前运行的任务的状态。
可在 IBM alphaWorks 网站下载这个 MapReduce Tool , 或在本文的下载清单中下载。将下载后的压缩包解压到你 Eclipse 安装目录,重新启动 Eclipse 即可使用了。
点击 Eclipse 主菜单上 Windows->Preferences, 然后在左侧选择 Hadoop Home Directory,设定你的 Hadoop 主目录,如图一所示:
点击 Eclipse 主菜单上 File->New->Project, 在弹出的对话框中选择 MapReduce Project, 输入 project name 如 wordcount, 然后点击 Finish 即可。,如图 2 所示:
此后,你就可以象一个普通的 Eclipse Java project 那样,添加入 Java 类,比如你可以定义一个 WordCount 类,然后将本文代码清单1,2,3中的代码写到此类中,添加入必要的 import 语句 ( Eclipse 快捷键 ctrl+shift+o 可以帮你),即可形成一个完整的 wordcount 程序。
在我们这个简单的 wordcount 程序中,我们把全部的内容都放在一个 WordCount 类中。实际上 IBM MapReduce tools 还提供了几个实用的向导 ( wizard ) 工具,帮你创建单独的 Mapper 类,Reducer 类,MapReduce Driver 类(就是代码清单3中那部分内容),在编写比较复杂的 MapReduce 程序时,将这些类独立出来是非常有必要的,也有利于在不同的计算任务中重用你编写的各种 Mapper 类和 Reducer 类。
如图三所示,设定程序的运行参数:输入目录和输出目录之后,你就可以在 Eclipse 中运行 wordcount 程序了,当然,你也可以设定断点,调试程序。
到目前为止,我们已经介绍了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 如何安装和部署单机 Hadoop 环境,实际编写了一个 Hadoop 并行计算程序,并了解了一些重要的编程细节,了解了如何使用 IBM MapReduce Tools 在 Eclipse 环境中编译,运行和调试你的 Hadoop 并行计算程序。但一个 Hadoop 并行计算程序,只有部署运行在分布式集群环境中,才能发挥其真正的优势,在这篇系列文章的第 3 部分中,你将了解到如何部署你的分布式 Hadoop 环境,如何利用 IBM MapReduce Tools 将你的程序部署到分布式环境中运行等内容。
声明:本文仅代表作者个人之观点,不代表 IBM 公司之观点。
改进的 wordcount 程序 | wordcount.zip | 8KB | HTTP |
IBM MapReduce Tools | mapreduce_plugin.zip | 324KB | HTTP |
学习
-
访问 Hadoop 官方网站
,了解 Hadoop 及其子项目 HBase 的信息。
-
Hadoop wiki
上, 有许多 Hadoop 的用户文档,开发文档,示例程序等。
- 阅读 Google Mapreduce 论文:
MapReduce: Simplified Data Processing on Large Clusters
, 深入了解 Mapreduce 计算模型。
- 学习 Hadoop 分布式文件系统 HDFS:
The Hadoop Distributed File System:Architecture and Design
- 学习 Google 文件系统 GFS:
The Google File System
, Hadoop HDFS 实现了与 GFS 类似的功能。
- 到 IBM alphaWorks 网站了解并且下载 IBM MapReduce Tools:
http://www.alphaworks.ibm.com/tech/mapreducetools
,
讨论
-
加入Hadoop 开发者邮件列表
,了解 Hadoop 项目开发的最新进展。
曹 羽中,在北京航空航天大学获得计算机软件与理论专业的硕士学位,具有数年的 unix 环境下的 C 语言,Java,数据库以及电信计费软件的开发经验,他的技术兴趣还包括 OSGi 和搜索技术。他目前在IBM中国系统与科技实验室从事系统管理软件的开发工作,可以通过 caoyuz@cn.ibm.com 与他联系。
原文地址: http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/index.html
发表评论
-
体系化认识RPC
2017-12-25 10:11 779RPC(Remote Procedure Call),即远程 ... -
聊一聊分布式锁的设计
2017-04-10 13:43 426起因 前段时间,看到r ... -
分布式服务框架 Zookeeper -- 管理分布式环境中的数据
2016-10-09 17:22 491Zookeeper 分布式服务框架是 Apach ... -
怎样打造一个分布式数据库
2016-08-19 09:13 846在技术方面,我自己热衷于 Open Source,写了很多 ... -
分布式系统的事务处理
2014-02-15 20:49 7282014年1月20日 陈皓 当我们在生产线上用一台服 ... -
Hadoop 新 MapReduce 框架 Yarn 详解
2013-11-02 13:22 853简介: 本文介绍了 Hadoop 自 0.23.0 ... -
NoSQL数据库的分布式算法
2013-01-16 09:59 823本文另一地址请见NoSQL数据库的分布式算法 本文译自 ... -
Brewer’s CAP Theorem
2013-01-05 15:10 1081Amazon和EBay一直在喝的酷爱(kool aid)饮料。 ... -
HBase性能调优
2011-09-22 07:29 1057因官方Book Performance Tunin ... -
用 Hadoop 进行分布式并行编程, 第 3 部分
2011-05-31 14:57 1009一 前言 ... -
用 Hadoop 进行分布式并行编程, 第 1 部分
2011-05-31 14:54 964Hadoop 简 ... -
分布式文件系统FastDFS架构剖析
2011-05-29 10:16 921分布式文件系统FastDFS架构剖析 ... -
开源分布式文件系统FastDFS和MogileFS
2011-05-29 10:15 1385开源分布式文件系统FastDFS和MogileFS ... -
当下流行的分布式文件系统大阅兵
2011-05-29 10:14 1097本文对目前数种分布式文件系统进行简单的介绍。当前比较流行的 ...
相关推荐
白色大气风格的建筑商业网站模板下载.rar
内容概要:本文详细介绍了面向对象编程语言Objective-C的基础语法,包括其历史背景、特点、环境搭建、基本语法、面向对象编程、高级特性和实际应用。具体涵盖的内容包括Objective-C的历史发展、面向对象编程的核心特性、变量和数据类型、控制结构、函数、数组和字典的使用,以及类、对象、属性和方法的定义与使用。此外,还介绍了高级特性如协议和委托、类别和扩展、ARC、块和GCD。最后,通过示例项目展示了如何在Xcode中创建和调试Objective-C程序,以及如何使用Cocoa和Cocoa Touch框架。 适合人群:具备一定的编程基础,希望学习或深入了解Objective-C编程的开发人员。 使用场景及目标:适用于需要开发macOS和iOS应用的开发者,帮助他们掌握Objective-C的基本语法和高级特性,提高编程效率和代码质量。 其他说明:本文不仅提供了详细的理论讲解,还通过实际代码示例展示了如何在Xcode中创建和调试Objective-C项目,适合初级到中级水平的开发人员学习和参考。
本次开发的微信小程球馆预约系统,有管理员,用户两个角色。管理员功能有个人中心,用户管理,场地类型管理,球馆信息管理,球馆预约管理,系统管理。用户可以在微信小程序上面注册登录,查看球馆信息,对球馆进行预约操作。 开发本程序后台用到了SSM开发技术,微信端用的是uni-app技术。数据库采用关系数据库市场占有率最高的MySQL作为本程序使用的数据库,完全符合程序使用并且有丰富的拓展余地。 用户在微信小程序注册登录后可以看到首页,首页可以搜索球馆名称,也可以查看球馆资讯,下面是导航栏。 用户点击球馆信息可以进行预约,预约需要输入相关时间等信息。 我的里面可以修改个人信息,可以退出,还可以查看球馆预约信息和我的收藏信息。
1、嵌入式物联网单片机项目开发例程,简单、方便、好用,节省开发时间。 2、代码使用KEIL 标准库开发,当前在STM32F030C8T6运行,如果是STM32F030其他型号芯片,依然适用,请自行更改KEIL芯片型号以及FLASH容量即可。 3、软件下载时,请注意keil选择项是jlink还是stlink。 4、有偿指导v:wulianjishu666; 5、如果接入其他传感器,请查看账号发布的其他资料。 6、单片机与模块的接线,在代码当中均有定义,请自行对照。 7、若硬件有差异,请根据自身情况调整代码,程序仅供参考学习。 8、代码有注释说明,请耐心阅读。 9、编译时请注意提示,请选择合适的编译器版本。
廖鹏盛 - 时代进行曲.zip
白色大气风格的人体艺术摄影网站模板下载.zip
白色大气风格的服装设计师模板下载.zip
白色大气风格的景观设计HTML网站模板.zip
优质的机器学习资源是当今科技领域的热点,其中TensorFlow作为谷歌公司的开源库,成为最受欢迎的深度学习框架之一,广泛应用于各类项目中。TensorFlow提供了丰富的功能和灵活性,使得开发者可以轻松构建和训练复杂的神经网络模型,处理图像、文本和其他类型的数据。由于其开源性质,拥有庞大的社区支持,用户可以放心使用,并从开源社区中获取宝贵的经验和资源。 mnist数据集是机器学习领域的经典数据集之一。它包含着大量的手写数字图像,供开发者用来训练和测试各种算法和模型。这个数据集的规模相对较小,因此对于绝大多数人来说,无论是数据的下载还是训练过程,都不会对电脑性能提出过高的要求。这使得mnist成为了理想的入门数据集,适合初学者探索和理解机器学习算法的基本原理。 结合Pygame与TensorFlow,你将能够为机器学习实验创建出图形化界面,以及实现交互式处理。Pygame是一款面向游戏和多媒体应用的Python库,但同样也可以用于数据可视化和图形化交互。利用Pygame,你可以展示训练过程中的图像输出、模型的预测结果等,增强对机器学习算法运行情况的直观认识。而且,Pygame的简单。内
基于两种坐标系的超螺旋滑模观测器的永磁同步电机pmsm无位置(速度)传感器控制模型 支持 dq旋转坐标系和静止坐标系建立smo 引入二阶滑模超螺旋算法替代一阶滑模 dq坐标系引入锁相环PLL估计转速及转子位置 有效削弱抖振 赠送超螺旋滑模搭建推导文档及相关参考资料 仿真模型
汇编实验算数运算程序设计.docx
小区监控视频监控方案.doc
白色大气风格的HTML商务模板下载.zip
白色大气风格响应式运动健身瑜伽企业网站模板.zip
单片机实验仿真设计报告
白色大气风格的设计公司整站网站模板下载.zip
白色大气风格的html商务模板.zip
白色大气风格的英文网站模板下载.zip
白色大气风格的科研教育模板下载.zip
本摄像头ov7670驱动程序已经通过本人的验证可以正常运行,不同的stm32开发板只需要修改引脚即可使用