`
josephgao
  • 浏览: 15206 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

map reduce 任务串联

 
阅读更多

import com.vividsolutions.jts.io.WKTReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;

public class DependencyJob {


    static class FirstMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context) {
            String[] a = value.toString().split("\t");
            String time = a[1];
            try {
                context.write(new Text(time), new LongWritable(1L));
            } catch (IOException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            } catch (InterruptedException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }
        }
    }

    static class FirstReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context) {
            long a = 0;
            for(LongWritable l : values){
                a += l.get();
            }
            try {
                context.write(new Text(key), new LongWritable(a));
            } catch (IOException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            } catch (InterruptedException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }
        }
    }

    static class SecondMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context) {
            String[] a = value.toString().split("\t");
            String time = a[0];
            Long l = time == null? 0: Long.parseLong(time)%1000;
            try {
                context.write(new Text(l.toString()), new LongWritable(1L));
            } catch (IOException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            } catch (InterruptedException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }
        }
    }

    static class SecondReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context) {
            long a = 0;
            for(LongWritable l : values){
                a += l.get();
            }
            try {
                context.write(new Text(key), new LongWritable(a));
            } catch (IOException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            } catch (InterruptedException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }
        }
    }

    public static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");

    public static long stime, etime;

    public static WKTReader rd = new WKTReader();

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("lack arg");
            System.exit(0);
        }
        Configuration configuration = new Configuration();
        JobControl jobControl = new JobControl("dependency");

        Job job = new Job();
        job.setJobName("first");
        job.setJarByClass(DependencyJob.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(FirstMapper.class);
        job.setReducerClass(FirstReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(8);//
        ControlledJob controlledJob = new ControlledJob(configuration);
        controlledJob.setJob(job);

        Job job2 = new Job();
        job2.setJobName("second");
        job2.setJarByClass(DependencyJob.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));
        job2.setMapperClass(SecondMapper.class);
        job2.setReducerClass(SecondReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(8);//
        ControlledJob controlledJob2 = new ControlledJob(configuration);
        controlledJob2.setJob(job2);

        controlledJob2.addDependingJob(controlledJob);
        jobControl.addJob(controlledJob);
        jobControl.addJob(controlledJob2);
        jobControl.run();
    }

}
分享到:
评论

相关推荐

    尚硅谷大数据技术之Hadoop(MapReduce)1

    - MapReduce程序通常包含一个Map阶段和一个Reduce阶段,但可以利用多个MapReduce作业串联处理复杂逻辑。 - 数据的分区、排序和组合过程是MapReduce内在的一部分,确保了数据的正确性和一致性。 总之,MapReduce是...

    hadoop wordcount

    在本例中,尽管WordCount的Map和Reduce任务可以并行执行,但最终结果需要按照单词频率进行排序。由于Hadoop默认不保证Map阶段的输出顺序,因此可能需要额外的排序步骤。这可以通过创建一个新的MapReduce任务实现,该...

    mrrr:Map-Reduce Recipe Runner - 一种非常简单的构建 Hadoop 作业的方法

    MRRR 是一种将 map-reduce 工作流串联在一起的工具,完全基于简单的 Java 方法。 了解 MRRR 的最佳方式是通过示例。 给定用户提供的具有以下类的 Jar,您可以使用以下 MRRR 工作流处理数据: mylib.jar 中的类 MRRR...

    基于流水线的MapReduce.zip

    TaskTracker则负责执行具体的Map和Reduce任务。 具体到"MapReduce_Pipeline-master"这个项目,我们可以预见到其中可能包含的组件和代码结构。可能有以下几个部分: 1. 输入和输出格式:定义数据的输入和输出格式,...

    Flink笔记.md

    但不得不承认MapReduce在矢量编程结构过于简单,在完成一些比较复杂的高阶计算(例如:机器学习线性回归)的时候,需要将多个MapReduce任务串联起来才能完成一个复杂的计算逻辑,因此在早期人们需要在编写完多个job...

    Hadoop-MapReduce.docx

    MapReduce 的基本理念是将复杂的计算任务分解为两个主要阶段:Map阶段和Reduce阶段。 1. **MapReduce 定义** MapReduce 是一个分布式计算框架,允许开发者编写业务逻辑,然后在 Hadoop 集群上并行执行。这个框架...

    MapReduce核心思想图文详解

    - **并发实例**:Reduce任务也可以并行执行,但它们的输入是Map阶段所有输出的集合,而不是单独的Map任务输出。因此,Reduce任务之间不存在直接的数据交互,但它们都依赖Map阶段的全局结果。 4. **限制与扩展性**...

    java大数据作业_5Mapreduce、数据挖掘

    组合式MapReduce则是指通过多个MapReduce步骤连续处理数据,每个步骤可能涉及多个Map和Reduce任务,它们可以串联起来,形成一个处理管道。 3. **HBase的Map、Reduce继承类和序列化类**: HBase本身并不直接使用...

    大数据技术Hadoop+Spark-hadoop和spark

    - **阶段划分**:Map和Reduce阶段之间是相互独立的,但Reduce阶段的输入依赖于所有Map任务的输出。 #### 三、Spark 技术特点和概述 ##### Spark 概述 - **定义**:Apache Spark是一种快速、通用的大数据分析引擎,...

    Hadoop - Hadoop in Action

    - **ChainMapper和ChainReducer**:在一个MapReduce作业中串联多个Map和Reduce任务。 - **Counters和Progress监控**:监控作业进度和性能指标。 #### Hadoop编程实践 除了基础的MapReduce编程外,Hadoop生态系统还...

    hadoop实践

    MapReduce程序通常由Map任务和Reduce任务组成。 3. **YARN**:作为资源管理系统,负责集群资源的分配和调度,为各种应用程序提供统一的资源管理框架。 #### 四、编写基本的MapReduce程序 MapReduce是Hadoop中的一...

    基于用户共现矩阵乘子的分布式协同过滤推荐.pdf

    为了更好地实现分布式协同过滤推荐,本文提出了多子任务串联执行的策略。利用MapReduce框架的优化,能够在分布式计算环境中获得更好的预测准确性和效率。实验结果表明,本文提出的基于用户共现矩阵乘子的分布式协同...

    前端开源库-fluentflow

    在这个例子中,`filter`、`map`和`reduce`方法按照顺序定义了数据流的处理步骤,最后通过`then`来处理最终结果。 **FluentFlow的功能特性** 1. **链式操作**:FluentFlow支持链式调用,使得代码结构清晰,可读性强...

    基于hadoop编写的测试代码.zip

    MapReduce是Hadoop的计算模型,它将复杂的数据处理任务分解为两个阶段:Map阶段和Reduce阶段。在Map阶段,数据被分发到各个节点进行并行处理;在Reduce阶段,处理结果被聚合,形成最终输出。这种并行处理模式使得...

Global site tag (gtag.js) - Google Analytics