使用 Python 编写 Hadoop MapReduce 程序
以前写 Hadoop 的 MapReduce 程序时,使用的是 Java ,利用 Java 写起来是轻车熟路,没有问题,但是使用 Java 很明显的一个弊端就是每次都要编码、打包、上传、执行,还真心是麻烦,想要更加简单的使用 Hadoop 的运算能力,想要写 MapReduce 程序不那么复杂。还真是个问题。
仔细考虑了下,熟悉的 Python 又得拿起来了,随便搜了下 Python 编写 MapReduce 程序,看了个教程,发现用起来真是方便,遂记录之。
Hadoop 框架使用 Java 开发的,对 Java 进行了原生的支持,不过对于其它语言也提供了 API 支持,如 Python 、 C++ 、 Perl 、 Ruby 等。这个工具就是 Hadoop Streaming ,顾名思义, Streaming 就是 Pipe 操作,说起 pipe ,大家肯定不陌生。最原生的 Python 支持是需要 Jython 支持的,不过这里有额外的方法来实现,大家如果只是使用的话,不用纠结 Jython 转换的问题。
前置条件:
Python 环境
Hadoop 环境( single or cluster )
最容易的 Hadoop 编程模型就是 Mapper 和 Reducer 的编写,这种编程模型大大降低了我们对于并发、同步、容错、一致性的要求,你只要编写好自己的业务逻辑,就可以提交任务。然后喝杯茶,结果就出来了,前提是你的业务逻辑没有错误。
使用 Hadoop Streaming ,能够利用 Pipe 模型,而使用 Python 的巧妙之处在于处理输入输出的数据使用的是 STDIN 和 STDOUT ,然后 Hadoop Streaming 会接管一切,转化成 MapReduce 模型。
我们还是使用 wordcount 例子,具体内容不再详细解释,如果有不理解的可以自行度之。下面我们先看下 mapper 的代码:
#!/usr/bin/env python
import sys
#input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
简单解释一下,输入从 sys.stdin 进入,然后进行分割操作,对于每行的分割结果,打印出 word 和 count=1 , Mapper 就这么简单。
大家看完 Mapper 之后,会产生疑问,这个怎么能够实现 mapper 功能?我们跳出这个 sys.stdin 模型,再回顾下 MapReduce 的程序。在 Mapper 中,程序不关心你怎么输入,只关心你的输出,这个 Mapper 代码会被放到各个 slave 机器上,去执行 Mapper 过程,其实可以理解为过滤、处理。
在示例中,程序的输入会被进行一系列的处理过程,得到 word 和 count ,这个就是 slave 机器上的数据处理之后的内容。仔细理解下这个过程,对于开发程序还是相当有帮助的。
下面我们来看下 Reduce 程序, wordcount 的 reduce 程序就是统计相同 word 的 count 数目,然后再输出。我们还是直接上代码吧:
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
看完这个reduce代码,执行一下,完全没有问题,但是未必真正能理解这个reduce的内容,我来解释一下,明确知道执行流程的可以跳过。
reduce的代码页不复杂,利用Reduce程序,可以得出count数目。如果当前的词和分出来的词一致的话,count相加,如果不一致的话,就打印出来,同时更新输入的word和count。最后的if是打印出最后一次统计结果。
reduce的执行依赖了MapReduce模型一个要点,在Shuffle过程中,同一个key会放到同一个reduce任务中,这样处理的是一系列连续的相同的key值,当key不一样的时候,就是说开始统计下一个word了。
<!--EndFragment-->
利用Python写MapReduce程序就这么多内容,更细节的内容和自己处理的业务相关。
下面测试下结果:
<!--EndFragment-->
echo "foo foo quux labs foo bar quux" | python ./mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
进一步可以看到
echo "foo foo quux labs foo bar quux" | python ./mapper.py | sort -k1,1 | ./reducer.py
bar 1
foo 3
labs 1
quux 2
下面就是执行Hadoop命令了,在使用Hadoop Streaming时,要使用一定的格式操作才能提交任务。
<!--EndFragment-->
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar –mapper mapperfile –file mapper_file_path –reducer reducefile –file reducer_file_path –input input_path –output output_path
将自己的mapper、reducer代码代入上面命令中,执行一下看结果是否正确。
<!--EndFragment-->
本文的最后列一下Hadoop Streaming操作的参数,以作备忘。
<!--EndFragment-->
Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar [options]
Options:
-input <path> DFS input file(s) for the Map step
-output <path> DFS output directory for the Reduce step
-mapper <cmd|JavaClassName> The streaming command to run
-combiner <JavaClassName> Combiner has to be a Java class
-reducer <cmd|JavaClassName> The streaming command to run
-file <file> File/dir to be shipped in the Job jar file
-dfs <h:p>|local Optional. Override DFS configuration
-jt <h:p>|local Optional. Override JobTracker configuration
-additionalconfspec specfile Optional.
-inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
-outputformat TextOutputFormat(default)|JavaClassName Optional.
-partitioner JavaClassName Optional.
-numReduceTasks <num> Optional.
-inputreader <spec> Optional.
-jobconf <n>=<v> Optional. Add or override a JobConf property
-cmdenv <n>=<v> Optional. Pass env.var to streaming commands
-cacheFile fileNameURI
-cacheArchive fileNameURI
-verbose
下面简单说下参数的意思:
-input:DFS输入,可以有多个input输入,不过我一般喜欢把输入用逗号{,}分割。
-output:DFS输入,实际上就是Reducer输出
-mapper:MapReduce中的Mapper,看清楚了,也可以是cmd shell命令
-combiner:这个必须是Java类
-reducer:MapReducer中的Reducer,也可以是shell命令
-file:这个file参数是用来提交本地的文件,如本地的mapper或者reducer
-dfs:这个是可选的,用来覆盖DFS设定。
-jt:用来覆盖jobtracker的设定
-inputformat:输入格式设定
-outputformat:输出文件的格式设定
上面的这些参数已经足够平时的应用了,如果有更为细节的需求,就要考虑Streaming是否合适,是否适应自己的业务逻辑。
最后再说一句:按照Hadoop Streaming的执行流程,这些参数应该足够了,但是如果我有更复杂的需求:如根据key值分离文件;根据key值重命名文件;读取HDFS上文件配置数据;从多个数据源中读取mapper数据,如HDFS、DataBase、HBase、Nosql等,这些比较灵活的应用使用Python Streaming都有限制,或者是我暂时还没有看到这块。但是目前来说,使用Hadoop Streaming操作能够大量减少代码和流程,比使用Java要方便许多,特别是对于日常的、临时的统计工作。
只有更复杂的统计工作和Hadoop Streaming特性,留待以后再行发掘。
<!--EndFragment-->
<!--EndFragment-->
分享到:
相关推荐
本文将深入探讨如何使用Python来编写Hadoop MapReduce程序,以实现微博关注者之间的相似用户分析。这个任务的关键在于理解并应用分布式计算原理,以及熟悉Python编程语言在大数据环境下的应用。 首先,Hadoop ...
这个压缩包“使用hadoop-streaming运行Python编写的MapReduce程序.rar”显然是一个教程或示例,旨在指导用户如何利用Python编写MapReduce任务,并通过Hadoop Streaming进行执行。 MapReduce是一种编程模型,由...
在这个例子中,"simple_Hadoop_MapReduce_example-master"可能是一个包含具体代码和说明的项目目录,用于演示如何在Python中编写MapReduce程序。Python作为一门灵活且易学的语言,被广泛用于Hadoop开发,通过Pydoop...
Hadoop和Python的Mapreduce 关于如何使用Python和Hadoop执行MapReduce的一小段回购。 映射器和化简器都是用Python编写的。 有关如何在Hadoop中实现这两个脚本的教程位于。
利用hadoop-streaming框架运行python脚本指令
在这个项目中,我们将探讨如何使用Python编写MapReduce程序,对海量的市场交易数据进行处理,以实现市场篮子分析。 首先,了解Hadoop MapReduce的基本原理是至关重要的。MapReduce是一种分布式计算模型,由Google...
在Windows上编写MapReduce程序时,UnxUtils可以帮助你模拟Linux环境,因为Hadoop通常是与Unix/Linux环境一起使用的。 3. **编程语言**:MapReduce程序通常用Java编写,因为它是最兼容的编程语言,Hadoop的API也主要...
这些API提供了方便的接口,使得开发者可以用Python编写MapReduce作业。例如,你可以在map函数中使用scikit-learn的模型进行训练,然后在reduce函数中进行模型的融合或结果的统计。 在大数据与机器学习实战中,我们...
这个库简化了在分布式计算系统中执行数据处理任务的过程,使得Python开发者无需深入理解Hadoop的底层细节也能编写MapReduce程序。 MapReduce是一种编程模型,由Google提出,用于大规模数据集的并行处理。它将大数据...
在Hadoop生态系统中,Eclipse是一个常用的集成开发环境(IDE),用于编写MapReduce程序。MapReduce是一种编程模型,用于大规模数据集的并行处理。它将大数据任务分解为两个主要阶段:映射(Map)和化简(Reduce)。...
标题 "用python编写nagios hadoop监控脚本" 暗示了本文将探讨如何使用Python编程语言来创建Nagios监控系统针对Hadoop集群的监控脚本。Nagios是一款广泛使用的开源网络监控系统,它能检测各种IT基础设施的状态,包括...
在这个项目中,我们将通过Python编写MapReduce程序,并在Ubuntu系统上配置的Hadoop集群上运行它。 首先,了解Hadoop是必要的。Hadoop是由Apache基金会开发的分布式计算框架,它允许在廉价硬件上处理PB级别的数据。...
7. **hadoop-streaming**: 这是一个可选的JAR包,用于支持使用非Java语言(如Python或Perl)编写MapReduce作业。 8. **hadoop-yarn-api**: 提供YARN的API,用于与资源管理器进行通信。 9. **hadoop-yarn-client**:...
然而,为了适应不同开发者的需求,书里也可能介绍基于Python或Scala的工具,如PySpark和Scalding,它们提供了一种更加简洁和直观的方式来编写MapReduce作业。 压缩包内的文件结构反映了电子书的组织方式。`mimetype...
在这个框架下,程序员可以使用多种语言编写应用程序,不仅限于Java,还包括Python、Scala和其他支持的脚本语言。 理论部分: 1. **MapReduce编程模型**:MapReduce的核心是Map和Reduce两个函数。Map负责将输入数据...
在Python中,虽然没有官方的原生实现,但有很多第三方库如Hadoop的PyDoop或Apache Spark的PySpark提供了MapReduce的接口,使得Python开发者也能利用这种强大的数据处理工具。我们来深入了解一下MapReduce的基本概念...
Hadoop还提供了如Hadoop Streaming和Hadoop Pipes这样的工具,允许使用非Java语言(如Python或C++)编写MapReduce应用程序,增加了开发者的灵活性。 在输入和输出方面,MapReduce处理的是键值对数据。键和值必须...
此外,文档可能还涵盖了Hadoop的其他相关工具,如Hadoop Streaming,它允许使用非Java语言(如Python或Perl)编写MapReduce作业;或者是Pig和Hive,它们提供了高级的数据处理语言,简化了MapReduce作业的编写。 总...