import com.vividsolutions.jts.io.WKTReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.text.SimpleDateFormat;
public class DependencyJob {
static class FirstMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
public void map(LongWritable key, Text value, Context context) {
String[] a = value.toString().split("\t");
String time = a[1];
try {
context.write(new Text(time), new LongWritable(1L));
} catch (IOException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
static class FirstReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) {
long a = 0;
for(LongWritable l : values){
a += l.get();
}
try {
context.write(new Text(key), new LongWritable(a));
} catch (IOException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
static class SecondMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
public void map(LongWritable key, Text value, Context context) {
String[] a = value.toString().split("\t");
String time = a[0];
Long l = time == null? 0: Long.parseLong(time)%1000;
try {
context.write(new Text(l.toString()), new LongWritable(1L));
} catch (IOException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
static class SecondReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) {
long a = 0;
for(LongWritable l : values){
a += l.get();
}
try {
context.write(new Text(key), new LongWritable(a));
} catch (IOException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
public static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
public static long stime, etime;
public static WKTReader rd = new WKTReader();
public static void main(String[] args) throws Exception {
if (args.length != 3) {
System.out.println("lack arg");
System.exit(0);
}
Configuration configuration = new Configuration();
JobControl jobControl = new JobControl("dependency");
Job job = new Job();
job.setJobName("first");
job.setJarByClass(DependencyJob.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(FirstMapper.class);
job.setReducerClass(FirstReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(8);//
ControlledJob controlledJob = new ControlledJob(configuration);
controlledJob.setJob(job);
Job job2 = new Job();
job2.setJobName("second");
job2.setJarByClass(DependencyJob.class);
FileInputFormat.addInputPath(job2, new Path(args[1]));
FileOutputFormat.setOutputPath(job2, new Path(args[2]));
job2.setMapperClass(SecondMapper.class);
job2.setReducerClass(SecondReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(8);//
ControlledJob controlledJob2 = new ControlledJob(configuration);
controlledJob2.setJob(job2);
controlledJob2.addDependingJob(controlledJob);
jobControl.addJob(controlledJob);
jobControl.addJob(controlledJob2);
jobControl.run();
}
}
分享到:
相关推荐
- MapReduce程序通常包含一个Map阶段和一个Reduce阶段,但可以利用多个MapReduce作业串联处理复杂逻辑。 - 数据的分区、排序和组合过程是MapReduce内在的一部分,确保了数据的正确性和一致性。 总之,MapReduce是...
在本例中,尽管WordCount的Map和Reduce任务可以并行执行,但最终结果需要按照单词频率进行排序。由于Hadoop默认不保证Map阶段的输出顺序,因此可能需要额外的排序步骤。这可以通过创建一个新的MapReduce任务实现,该...
MRRR 是一种将 map-reduce 工作流串联在一起的工具,完全基于简单的 Java 方法。 了解 MRRR 的最佳方式是通过示例。 给定用户提供的具有以下类的 Jar,您可以使用以下 MRRR 工作流处理数据: mylib.jar 中的类 MRRR...
TaskTracker则负责执行具体的Map和Reduce任务。 具体到"MapReduce_Pipeline-master"这个项目,我们可以预见到其中可能包含的组件和代码结构。可能有以下几个部分: 1. 输入和输出格式:定义数据的输入和输出格式,...
但不得不承认MapReduce在矢量编程结构过于简单,在完成一些比较复杂的高阶计算(例如:机器学习线性回归)的时候,需要将多个MapReduce任务串联起来才能完成一个复杂的计算逻辑,因此在早期人们需要在编写完多个job...
MapReduce 的基本理念是将复杂的计算任务分解为两个主要阶段:Map阶段和Reduce阶段。 1. **MapReduce 定义** MapReduce 是一个分布式计算框架,允许开发者编写业务逻辑,然后在 Hadoop 集群上并行执行。这个框架...
- **并发实例**:Reduce任务也可以并行执行,但它们的输入是Map阶段所有输出的集合,而不是单独的Map任务输出。因此,Reduce任务之间不存在直接的数据交互,但它们都依赖Map阶段的全局结果。 4. **限制与扩展性**...
组合式MapReduce则是指通过多个MapReduce步骤连续处理数据,每个步骤可能涉及多个Map和Reduce任务,它们可以串联起来,形成一个处理管道。 3. **HBase的Map、Reduce继承类和序列化类**: HBase本身并不直接使用...
- **阶段划分**:Map和Reduce阶段之间是相互独立的,但Reduce阶段的输入依赖于所有Map任务的输出。 #### 三、Spark 技术特点和概述 ##### Spark 概述 - **定义**:Apache Spark是一种快速、通用的大数据分析引擎,...
- **ChainMapper和ChainReducer**:在一个MapReduce作业中串联多个Map和Reduce任务。 - **Counters和Progress监控**:监控作业进度和性能指标。 #### Hadoop编程实践 除了基础的MapReduce编程外,Hadoop生态系统还...
MapReduce程序通常由Map任务和Reduce任务组成。 3. **YARN**:作为资源管理系统,负责集群资源的分配和调度,为各种应用程序提供统一的资源管理框架。 #### 四、编写基本的MapReduce程序 MapReduce是Hadoop中的一...
为了更好地实现分布式协同过滤推荐,本文提出了多子任务串联执行的策略。利用MapReduce框架的优化,能够在分布式计算环境中获得更好的预测准确性和效率。实验结果表明,本文提出的基于用户共现矩阵乘子的分布式协同...
在这个例子中,`filter`、`map`和`reduce`方法按照顺序定义了数据流的处理步骤,最后通过`then`来处理最终结果。 **FluentFlow的功能特性** 1. **链式操作**:FluentFlow支持链式调用,使得代码结构清晰,可读性强...
MapReduce是Hadoop的计算模型,它将复杂的数据处理任务分解为两个阶段:Map阶段和Reduce阶段。在Map阶段,数据被分发到各个节点进行并行处理;在Reduce阶段,处理结果被聚合,形成最终输出。这种并行处理模式使得...