`

MapReduce,组合式,迭代式,链式

 
阅读更多

 

1.迭代式mapreduce

一些复杂的任务难以用一次MapReduce处理完成,需要多次 MapReduce 才能完成任务,例如Pagrank,K-means算法都需要多次的迭代,关于 MapReduce 迭代在Mahout中运用较多。有兴趣的可以参考一下Mahout的源码。

      在MapReduce的迭代思想,类似for循环,前一个 MapReduce的输出结果,作为下一个 MapReduce的输入,任务完成后中间结果都可以删除。

代码示例:

 

Configuration conf1 = new Configuration();
Job job1 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job1,InputPaht1);
FileOutputFromat.setOoutputPath(job1,Outpath1);
job1.waitForCompletion(true);
//sub Mapreduce
Configuration conf2 = new Configuration();
Job job2 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job2,Outpath1);
FileOutputFromat.setOoutputPath(job2,Outpath2);
job2.waitForCompletion(true);
//sub Mapreduce
Configuration conf3 = new Configuration();
Job job3 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job3,Outpath2);
FileOutputFromat.setOoutputPath(job3,Outpath3);
job3.waitForCompletion(true);
.....
 

 

 

下面列举一个Mahout怎样运用 MapReduce 迭代的,下面的代码快就是Mahout中k-means的算法的代码,在main函数中用一个while循环来做 MapReduce 的迭代,其中:runIteration()是一次MapReduce 的过程。

个人感觉现在的 MapReduce 迭代设计不太满意的地方。

1. 每次迭代,所有Job(task)重复创建,代价非常高。

2.每次迭代,数据都写入本地和读取本地,I/O和网络传输的代价比较大。

期待着下个版本hadoop更好的支持迭代算法。

代码示例:

//main function

while (!converged && iteration <= maxIterations) {
      log.info("K-Means Iteration {}", iteration);
      // point the output to a new directory per iteration
      Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);
      converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);
      // now point the input to the old output directory
      clustersIn = clustersOut;
      iteration++;
}

  private static boolean runIteration(Configuration conf,
                                      Path input,
                                      Path clustersIn,
                                      Path clustersOut,
                                      String measureClass,
                                      String convergenceDelta)
    throws IOException, InterruptedException, ClassNotFoundException {

    conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
    conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);

    Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ClusterObservations.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Cluster.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(KMeansMapper.class);
    job.setCombinerClass(KMeansCombiner.class);
    job.setReducerClass(KMeansReducer.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, clustersOut);

    job.setJarByClass(KMeansDriver.class);
    HadoopUtil.delete(conf, clustersOut);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);
    }
    FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);

    return isConverged(clustersOut, conf, fs);
  }
 

 

2.依赖关系组合式MapReduce

我们可以设想一下MapReduce有3个子任务job1,job2,job3构成,其中job1和job2相互独立,job3要在job1和job2完成之后才执行。这种关系就叫复杂数据依赖关系的组合时 MapReduce 。Hadoop为这种组合关系提供了一种执行和控制机制,Hadoop通过job和jobControl类提供具体的编程方法。Job除了维护子任务的配置信息,还维护子任务的依赖关系,而jobControl控制整个作业流程,把所有的子任务作业加入到JobControl中,执行JobControl的run()方法即可运行程序。

伪代码示例:

Configuration job1conf = new Configuration();

Job job1 = new Job(job1conf,"Job1");
.........//job1 其他设置
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf,"Job2");
.........//job2 其他设置
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf,"Job3");
.........//job3 其他设置
job3.addDepending(job1);//设置job3和job1的依赖关系
job3.addDepending(job2);
JobControl jc = new JobControl("123");
jc.addJob(job1);//把三个job加入到jobcontorl中
jc.addJob(job2);
jc.addJob(job3);
jc.run();
 

3.链式MapReduce

首先看一下例子,来说明为什么要有链式MapReduce,假设在统计单词是,会出现这样的词,make,made,making等,他们都属于一个词,在单词累加的时候,都归于一个词。解决的方法为增加多个MapReduce作业,但这将增加整个作业的处理周期以及I/O操作,因而处理效率不高。

一个较好的办法就是在核心的MapReduce之外,增加一个辅助的Map过程,然后将这个辅助的Map过程和核心的 MapReduce 过程合并为一个链式的 MapReduce ,从而完成整个作业。Hadoop提供了专门的链式ChainMapper和ChainReducer来处理链式任务,ChainMapper允许一个Map任务中添加多个Map的子任务,ChainReducer可以在Reducer执行之后,在加入多个Map的子任务。

 

 

 

参考:MapReduce,组合式,迭代式,链式

 


 

分享到:
评论
1 楼 chenwq 2012-05-27  
http://blog.csdn.net/RiverM?viewmode=contents

相关推荐

    MapReduce求行平均值--标准差--迭代器处理--MapReduce案例

    在“迭代器处理”方面,MapReduce框架中的迭代器可以帮助更高效地处理数据。例如,可以使用自定义的迭代器来实现累加、累乘等操作,这在计算平均值和标准差时非常有用。迭代器可以在中间结果上直接进行操作,减少了...

    MapReduce基础.pdf

    - **不适合迭代计算**:MapReduce不支持迭代式的算法,这限制了其在某些复杂数据分析场景中的应用。 - **内存管理**:虽然MapReduce能够处理大规模数据,但在处理特别大的数据集时可能会遇到内存不足的问题。 综上...

    比较Spark和MapReduce执行迭代应用的性能差异源码+学习说明(课程作业).zip

    【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业...比较Spark和MapReduce执行迭代应用的性能差异源码+学习说明(课程作业).zip

    通过实例让你真正明白mapreduce填空式、分布(分割)编程

    ### MapReduce填空式与分布式编程详解 #### 一、MapReduce概述 MapReduce是一种用于处理大规模数据集的编程模型,最初由Google提出并应用于其内部的数据处理任务中。它利用分布式计算的思想,将大数据处理任务分解...

    基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf

    尽管Hadoop提供了链式MapReduce接口来减少作业数量,但这种原生机制仍存在不足。 在本文的研究中,作者首先对MapReduce作业的执行流程进行了深入研究,并分析了现有的开源ETL项目,以理解大数据ETL处理的现状。通过...

    Experiment-of-a-Comparison-with-Iterative-MR-frameworks:我们为在迭代 MapReduce 框架上运行迭代算法而实现的示例代码

    这里的“迭代MapReduce”是指在MapReduce模型基础上支持连续多轮数据处理的框架,适用于机器学习和图处理等需要多次迭代的算法。 1. **迭代MapReduce的概念**:传统的MapReduce模型中,数据处理通常是一次性的,即...

    kmeans(mapreduce)

    这个过程可以视为“更新”步骤,但因为MapReduce模型不支持原地更新,所以需要再次运行MapReduce作业,将新的质心作为输入,开始下一轮迭代。 4. **迭代过程**:重复上述过程,直到质心不再明显变化或者达到预设的...

    mapreduce mapreduce mapreduce

    MapReduce是一种分布式计算模型,由Google开发,用于处理和生成大量数据。这个模型主要由两个主要阶段组成:Map(映射)和Reduce(规约)。MapReduce的核心思想是将复杂的大规模数据处理任务分解成一系列可并行执行...

    一种基于改进的链式MapReduce的并行ETL应用

    提出一种改进的链式MapReduce 框架,并将此框架应用于一个并行ETL 工具,同时提出一些针对ETL 处理的流程级优化规则,使ETL流程产生更少的MapReduce作业,从而减少I/O以及网络传输的消耗;利用某省份手机上网数据与...

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...

    mars_cuda_mapreduce

    在所提供的部分内容中,我们了解到Mars系统不仅在NVIDIA GPU上运行,还支持AMD GPU、多核心CPU以及它们的组合式并行处理,甚至是分布式系统如Hadoop。系统被划分为不同模块,以便在不同的硬件平台上运行。MarsCUDA指...

    MapReduce发明人关于MapReduce的介绍

    MapReduce的设计灵感来源于Lisp和其他函数式语言中的`map`和`reduce`原语。其工作流程可以分为两个阶段:`map`操作和`reduce`操作。 - **`map`操作**:对输入数据集中的每个逻辑记录应用`map`函数,计算出一系列...

    函数式编程语言和MapReduce

    在当今信息技术领域中,函数式编程语言和MapReduce技术是处理大数据问题的重要手段。函数式编程语言以高阶函数为基础,通过将函数作为参数传递和返回作为结果,提供了一种简洁而强大的编程范式。而MapReduce是一种...

    mapreduce在hadoop实现词统计和列式统计

    在这个场景中,我们将讨论如何使用Hadoop的MapReduce来实现词统计和列式统计。 **一、MapReduce原理** MapReduce的工作流程主要包括三个主要阶段:Map、Shuffle(排序)和Reduce。在Map阶段,输入数据被分割成多个...

    用MapReduce实现KMeans算法

    3. **迭代控制**:由于MapReduce的并行特性,迭代过程的控制相对复杂,通常通过设置固定的迭代次数或者判断中心点的变化幅度来决定是否停止。 4. **中间结果处理**:在MapReduce中,中间结果的存储和传递也是关键。...

    MapReduce高阶实现

    - **Spark和Hadoop的比较**:Spark提供了更高效的内存计算,适用于迭代算法和交互式数据分析,但MapReduce更适合批处理任务。 6. **案例研究**: - 实际应用中,MapReduce被广泛应用于搜索引擎的索引构建、推荐...

    Hadoop迭代式计算框架Guagua.zip

    Guagua目前主要支持的是同步的Master-Workers结构的迭代式计算框架,今后我们希望能够支持异步方式的迭代计算框架,2012年Google MapReduce之父Jeff Dean发表了一篇论文,上面提到了对神经网络深度模型的支持,文章...

Global site tag (gtag.js) - Google Analytics