`

使用 Python 编写 Hadoop MapReduce 程序

 
阅读更多

使用 Python 编写 Hadoop MapReduce 程序

 

以前写 Hadoop MapReduce 程序时,使用的是 Java ,利用 Java 写起来是轻车熟路,没有问题,但是使用 Java 很明显的一个弊端就是每次都要编码、打包、上传、执行,还真心是麻烦,想要更加简单的使用 Hadoop 的运算能力,想要写 MapReduce 程序不那么复杂。还真是个问题。

仔细考虑了下,熟悉的 Python 又得拿起来了,随便搜了下 Python 编写 MapReduce 程序,看了个教程,发现用起来真是方便,遂记录之。

 

 

Hadoop 框架使用 Java 开发的,对 Java 进行了原生的支持,不过对于其它语言也提供了 API 支持,如 Python C++ Perl Ruby 等。这个工具就是 Hadoop Streaming ,顾名思义, Streaming 就是 Pipe 操作,说起 pipe ,大家肯定不陌生。最原生的 Python 支持是需要 Jython 支持的,不过这里有额外的方法来实现,大家如果只是使用的话,不用纠结 Jython 转换的问题。

 

 

 

前置条件:

Python 环境

Hadoop 环境( single or cluster

 

最容易的 Hadoop 编程模型就是 Mapper Reducer 的编写,这种编程模型大大降低了我们对于并发、同步、容错、一致性的要求,你只要编写好自己的业务逻辑,就可以提交任务。然后喝杯茶,结果就出来了,前提是你的业务逻辑没有错误。

使用 Hadoop Streaming ,能够利用 Pipe 模型,而使用 Python 的巧妙之处在于处理输入输出的数据使用的是 STDIN STDOUT ,然后 Hadoop Streaming 会接管一切,转化成 MapReduce 模型。

 

我们还是使用 wordcount 例子,具体内容不再详细解释,如果有不理解的可以自行度之。下面我们先看下 mapper 的代码:

 

#!/usr/bin/env python

import sys
#input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

 

简单解释一下,输入从 sys.stdin 进入,然后进行分割操作,对于每行的分割结果,打印出 word count=1 Mapper 就这么简单。

大家看完 Mapper 之后,会产生疑问,这个怎么能够实现 mapper 功能?我们跳出这个 sys.stdin 模型,再回顾下 MapReduce 的程序。在 Mapper 中,程序不关心你怎么输入,只关心你的输出,这个 Mapper 代码会被放到各个 slave 机器上,去执行 Mapper 过程,其实可以理解为过滤、处理。

在示例中,程序的输入会被进行一系列的处理过程,得到 word count ,这个就是 slave 机器上的数据处理之后的内容。仔细理解下这个过程,对于开发程序还是相当有帮助的。

 

下面我们来看下 Reduce 程序, wordcount reduce 程序就是统计相同 word count 数目,然后再输出。我们还是直接上代码吧:

#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

 

 

看完这个reduce代码,执行一下,完全没有问题,但是未必真正能理解这个reduce的内容,我来解释一下,明确知道执行流程的可以跳过。

reduce的代码页不复杂,利用Reduce程序,可以得出count数目。如果当前的词和分出来的词一致的话,count相加,如果不一致的话,就打印出来,同时更新输入的wordcount。最后的if是打印出最后一次统计结果。

reduce的执行依赖了MapReduce模型一个要点,在Shuffle过程中,同一个key会放到同一个reduce任务中,这样处理的是一系列连续的相同的key值,当key不一样的时候,就是说开始统计下一个word了。

<!--EndFragment-->

 

 

利用PythonMapReduce程序就这么多内容,更细节的内容和自己处理的业务相关。

下面测试下结果:

<!--EndFragment-->
echo "foo foo quux labs foo bar quux" | python ./mapper.py
  foo     1
  foo     1
  quux    1
  labs    1
  foo     1
  bar     1
  quux    1

 

 进一步可以看到

 

echo "foo foo quux labs foo bar quux" | python ./mapper.py | sort -k1,1 | ./reducer.py
  bar     1
  foo     3
  labs    1
  quux    2

 

下面就是执行Hadoop命令了,在使用Hadoop Streaming时,要使用一定的格式操作才能提交任务。

<!--EndFragment--> 

 

 

   hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar –mapper mapperfile –file mapper_file_path –reducer reducefile –file reducer_file_path –input input_path –output output_path

 

将自己的mapperreducer代码代入上面命令中,执行一下看结果是否正确。

<!--EndFragment-->

 

本文的最后列一下Hadoop Streaming操作的参数,以作备忘。

<!--EndFragment-->

Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar [options]
 Options:
   -input    <path>                   DFS input file(s) for the Map step
   -output   <path>                   DFS output directory for the Reduce step
   -mapper   <cmd|JavaClassName>      The streaming command to run
   -combiner <JavaClassName>          Combiner has to be a Java class
   -reducer  <cmd|JavaClassName>      The streaming command to run
   -file     <file>                   File/dir to be shipped in the Job jar file
   -dfs    <h:p>|local                Optional. Override DFS configuration
   -jt     <h:p>|local                Optional. Override JobTracker configuration
   -additionalconfspec specfile       Optional.
   -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
   -outputformat TextOutputFormat(default)|JavaClassName  Optional.
   -partitioner JavaClassName         Optional.
   -numReduceTasks <num>              Optional.
   -inputreader <spec>                Optional.
   -jobconf  <n>=<v>                  Optional. Add or override a JobConf property
   -cmdenv   <n>=<v>                  Optional. Pass env.var to streaming commands
   -cacheFile fileNameURI
   -cacheArchive fileNameURI
   -verbose

 

下面简单说下参数的意思:

-inputDFS输入,可以有多个input输入,不过我一般喜欢把输入用逗号{,}分割。

-outputDFS输入,实际上就是Reducer输出

-mapperMapReduce中的Mapper,看清楚了,也可以是cmd shell命令

-combiner:这个必须是Java

-reducerMapReducer中的Reducer,也可以是shell命令

-file:这个file参数是用来提交本地的文件,如本地的mapper或者reducer

-dfs:这个是可选的,用来覆盖DFS设定。

-jt:用来覆盖jobtracker的设定

-inputformat:输入格式设定

-outputformat:输出文件的格式设定

 

上面的这些参数已经足够平时的应用了,如果有更为细节的需求,就要考虑Streaming是否合适,是否适应自己的业务逻辑。

 

最后再说一句:按照Hadoop Streaming的执行流程,这些参数应该足够了,但是如果我有更复杂的需求:如根据key值分离文件;根据key值重命名文件;读取HDFS上文件配置数据;从多个数据源中读取mapper数据,如HDFS、DataBase、HBase、Nosql等,这些比较灵活的应用使用Python Streaming都有限制,或者是我暂时还没有看到这块。但是目前来说,使用Hadoop Streaming操作能够大量减少代码和流程,比使用Java要方便许多,特别是对于日常的、临时的统计工作。

 

只有更复杂的统计工作和Hadoop Streaming特性,留待以后再行发掘。

<!--EndFragment-->

<!--EndFragment--> 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    python hadoop mapreduce 相似用户|mapreduce.rar

    本文将深入探讨如何使用Python来编写Hadoop MapReduce程序,以实现微博关注者之间的相似用户分析。这个任务的关键在于理解并应用分布式计算原理,以及熟悉Python编程语言在大数据环境下的应用。 首先,Hadoop ...

    使用hadoop-streaming运行Python编写的MapReduce程序.rar

    这个压缩包“使用hadoop-streaming运行Python编写的MapReduce程序.rar”显然是一个教程或示例,旨在指导用户如何利用Python编写MapReduce任务,并通过Hadoop Streaming进行执行。 MapReduce是一种编程模型,由...

    Python中Hadoop MapReduce的一个简单示例.zip

    在这个例子中,"simple_Hadoop_MapReduce_example-master"可能是一个包含具体代码和说明的项目目录,用于演示如何在Python中编写MapReduce程序。Python作为一门灵活且易学的语言,被广泛用于Hadoop开发,通过Pydoop...

    hadoop-python-mapreduce:有关如何使用Python运行Hadoop MapReduce的教程

    Hadoop和Python的Mapreduce 关于如何使用Python和Hadoop执行MapReduce的一小段回购。 映射器和化简器都是用Python编写的。 有关如何在Hadoop中实现这两个脚本的教程位于。

    hadoop运行python编写的mapreduce程序

    利用hadoop-streaming框架运行python脚本指令

    Python_Hadoop_MapReduce_MarketBasketAnalysis:在Python中使用Hadoop MapReduce进行市场分析

    在这个项目中,我们将探讨如何使用Python编写MapReduce程序,对海量的市场交易数据进行处理,以实现市场篮子分析。 首先,了解Hadoop MapReduce的基本原理是至关重要的。MapReduce是一种分布式计算模型,由Google...

    windows下编写mapreduce程序

    在Windows上编写MapReduce程序时,UnxUtils可以帮助你模拟Linux环境,因为Hadoop通常是与Unix/Linux环境一起使用的。 3. **编程语言**:MapReduce程序通常用Java编写,因为它是最兼容的编程语言,Hadoop的API也主要...

    Python——机器学习实战——大数据与MapReduce

    这些API提供了方便的接口,使得开发者可以用Python编写MapReduce作业。例如,你可以在map函数中使用scikit-learn的模型进行训练,然后在reduce函数中进行模型的融合或结果的统计。 在大数据与机器学习实战中,我们...

    Python-mrjob在Hadoop或AmazonWebServices上运行MapReduce作业

    这个库简化了在分布式计算系统中执行数据处理任务的过程,使得Python开发者无需深入理解Hadoop的底层细节也能编写MapReduce程序。 MapReduce是一种编程模型,由Google提出,用于大规模数据集的并行处理。它将大数据...

    hadoop eclipse mapreduce 下开发所有需要用到的 JAR 包

    在Hadoop生态系统中,Eclipse是一个常用的集成开发环境(IDE),用于编写MapReduce程序。MapReduce是一种编程模型,用于大规模数据集的并行处理。它将大数据任务分解为两个主要阶段:映射(Map)和化简(Reduce)。...

    用python编写nagios hadoop监控脚本

    标题 "用python编写nagios hadoop监控脚本" 暗示了本文将探讨如何使用Python编程语言来创建Nagios监控系统针对Hadoop集群的监控脚本。Nagios是一款广泛使用的开源网络监控系统,它能检测各种IT基础设施的状态,包括...

    词频统计(基于hadoop集群,python实现)

    在这个项目中,我们将通过Python编写MapReduce程序,并在Ubuntu系统上配置的Hadoop集群上运行它。 首先,了解Hadoop是必要的。Hadoop是由Apache基金会开发的分布式计算框架,它允许在廉价硬件上处理PB级别的数据。...

    hadoop eclipse mapreduce下开发所有需要用到的JAR包

    7. **hadoop-streaming**: 这是一个可选的JAR包,用于支持使用非Java语言(如Python或Perl)编写MapReduce作业。 8. **hadoop-yarn-api**: 提供YARN的API,用于与资源管理器进行通信。 9. **hadoop-yarn-client**:...

    Packtpub.Hadoop.MapReduce.Cookbook.Jan.2013

    然而,为了适应不同开发者的需求,书里也可能介绍基于Python或Scala的工具,如PySpark和Scalding,它们提供了一种更加简洁和直观的方式来编写MapReduce作业。 压缩包内的文件结构反映了电子书的组织方式。`mimetype...

    MapReduce2.0程序设计多语言编程(理论+实践)

    在这个框架下,程序员可以使用多种语言编写应用程序,不仅限于Java,还包括Python、Scala和其他支持的脚本语言。 理论部分: 1. **MapReduce编程模型**:MapReduce的核心是Map和Reduce两个函数。Map负责将输入数据...

    Mapreduce-1python中的MapReduce的孙子祖父母对.zip

    在Python中,虽然没有官方的原生实现,但有很多第三方库如Hadoop的PyDoop或Apache Spark的PySpark提供了MapReduce的接口,使得Python开发者也能利用这种强大的数据处理工具。我们来深入了解一下MapReduce的基本概念...

    java__Hadoop_MapReduce教程.pdf

    Hadoop还提供了如Hadoop Streaming和Hadoop Pipes这样的工具,允许使用非Java语言(如Python或C++)编写MapReduce应用程序,增加了开发者的灵活性。 在输入和输出方面,MapReduce处理的是键值对数据。键和值必须...

    hadoop编写MR和运行测试共12页.pdf.zip

    此外,文档可能还涵盖了Hadoop的其他相关工具,如Hadoop Streaming,它允许使用非Java语言(如Python或Perl)编写MapReduce作业;或者是Pig和Hive,它们提供了高级的数据处理语言,简化了MapReduce作业的编写。 总...

Global site tag (gtag.js) - Google Analytics