package sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class SortText {
private static final String INPUT_PATH = "hdfs://hadoop.master:9000/data1";
private static final String OUTPUT_PATH = "hdfs://hadoop.master:9000/outSort";
public static void main(String[] args) throws Exception {
FileSystem fileSystem = FileSystem.get(new Configuration());
boolean exists = fileSystem.exists(new Path(OUTPUT_PATH));
if(exists){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
Job job=new Job(new Configuration(),SortText.class.getName());
job.setJarByClass(SortText.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job,new Path(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(LongWritable.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
// job.setGroupingComparatorClass(cls);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
}
static class MyMapper extends
Mapper<LongWritable, Text, MyKey, LongWritable> {
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, MyKey, LongWritable>.Context context)
throws java.io.IOException, InterruptedException {
String[] split = value.toString().split("\t");
context.write(
new MyKey(Long.parseLong(split[0]), Long
.parseLong(split[1])),
new LongWritable(Long.parseLong(split[1])));
};
}
static class MyReducer extends
Reducer<MyKey, LongWritable, LongWritable, LongWritable> {
protected void reduce(
MyKey arg0,
java.lang.Iterable<LongWritable> arg1,
org.apache.hadoop.mapreduce.Reducer<MyKey, LongWritable, LongWritable, LongWritable>.Context arg2)
throws java.io.IOException, InterruptedException {
// for (LongWritable w : arg1) {
arg2.write(new LongWritable(arg0.k), new LongWritable(arg0.v));
// }
};
}
static class MyKey implements WritableComparable<MyKey> {
long k;
long v;
public MyKey() {
}
public MyKey(long k, long v) {
this.k = k;
this.v = v;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(k);
out.writeLong(v);
}
@Override
public void readFields(DataInput in) throws IOException {
this.k = in.readLong();
this.v = in.readLong();
}
@Override
public int compareTo(MyKey o) {
if (this.k == o.k) {
return (int) (this.v - o.v);// v
} else {
return (int) (o.k - this.k);// k
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (k ^ (k >>> 32));
result = prime * result + (int) (v ^ (v >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
MyKey other = (MyKey) obj;
if (k != other.k)
return false;
if (v != other.v)
return false;
return true;
}
}
}
分享到:
相关推荐
在这个特定的案例中,我们不仅实现了基本的WordCount功能,还扩展了MapReduce的能力,通过自定义分区和自定义排序来优化数据处理流程。 首先,基础的`WordCount`实现,通常包含以下四个步骤: 1. **Map阶段**:...
这允许我们为自定义键类型提供比较规则。创建此类时,需要指定键的类类型,并在 `compare` 方法中实现具体的比较逻辑。 在实现自定义排序和分组后,需要在 JobConf 中配置这些类。例如,通过 `job....
Comparator用于自定义键的比较规则,而Partitioner则控制哪些键将在同一台Reducer上处理。 三、分区 默认情况下,MapReduce将所有键均匀地分配到所有Reducer上。然而,有时我们可能希望特定的键只在某些Reducer上...
二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时,数据是根据Key来组织和排序的,而Value则没有这种组织结构,因此排序操作只能针对Key来执行。在...
3. **实现`createRecordReader()`方法**:此方法返回一个实现了`org.apache.hadoop.mapreduce.RecordReader`接口的对象。RecordReader负责从split中读取并解析单个记录,将其转换为键值对的形式。 4. **自定义...
基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...
这段Java代码是一个Hadoop MapReduce程序,用于处理输入数据并计算每个不同词汇的最高分数。它包含了配置和运行MapReduce作业的逻辑,以及Mapper和Reducer类的定义。主要功能是读取输入数据,将数据拆分成词汇和相关...
3. **Comparator定制**:Comparator是关键所在,它用于在Reduce阶段对键进行排序。这里我们需要两个Comparator:一个用于年份的升序排序,另一个用于气温的降序排序。在Comparator中,我们首先比较年份,如果年份...
《基于Java实现的简易MapReduce框架》 在大数据处理领域,Hadoop是一个不可或缺的重要工具,它为海量数据的存储和处理提供了分布式计算框架。而MapReduce是Hadoop的核心组件之一,用于处理和生成大规模数据集。这个...
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
MapReduce之自定义 OutPutFormat,通过一个案例,实现自定义的一个OutPutFormat,来更加的深刻的理解MR的过程
实现了WritableComparable接口的类,不仅可以将对象写入到Hadoop的数据流中,还能在MapReduce框架中比较这些对象,这对于排序、分组等操作是必不可少的。 接下来,我们以Person类为例,介绍如何自定义一个数据类型...
通过以上步骤,我们就成功地实现了自定义分隔符的功能,使得MapReduce能够处理具有复杂格式的日志数据。在实际应用中,可能还需要考虑其他因素,如错误处理、性能优化等。记住,自定义分隔符的目的是为了更好地适应...
同时,系统会默认对这些键值对进行排序,这是MapReduce实现全局排序的关键步骤。在这个实例中,数据会被按照键(即原始记录)的自然顺序进行排序。 ### Reduce阶段 Reduce阶段接收到经过排序的键值对,每个reduce...
【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...
对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序
本话题将深入探讨如何使用Hadoop MapReduce实现两个矩阵相乘的算法,这在数据分析、机器学习以及高性能计算中有着重要应用。 首先,理解矩阵相乘的基本原理至关重要。矩阵相乘不是简单的元素对元素相乘,而是对应...
利用采样器实现mapreduce任务输出全排序大数据-MapReduce