`

MapReduce牛逼(3)(继承WritableComparable)实现自定义key键,实现二重排序

 
阅读更多

package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortText {
	

	private static final String INPUT_PATH = "hdfs://hadoop.master:9000/data1";
	private static final String OUTPUT_PATH = "hdfs://hadoop.master:9000/outSort";

	public static void main(String[] args) throws Exception { 

		FileSystem fileSystem = FileSystem.get(new Configuration());
		boolean exists = fileSystem.exists(new Path(OUTPUT_PATH));
		if(exists){
			fileSystem.delete(new Path(OUTPUT_PATH),true);
		}
		
		Job job=new Job(new Configuration(),SortText.class.getName());
		job.setJarByClass(SortText.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		FileInputFormat.setInputPaths(job,new Path(INPUT_PATH));
		
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(MyKey.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		
//		job.setGroupingComparatorClass(cls);
		
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		
		job.waitForCompletion(true);
		
	}

	static class MyMapper extends
			Mapper<LongWritable, Text, MyKey, LongWritable> {
		protected void map(
				LongWritable key,
				Text value,
				org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, MyKey, LongWritable>.Context context)
				throws java.io.IOException, InterruptedException {

			String[] split = value.toString().split("\t");
			context.write(
					new MyKey(Long.parseLong(split[0]), Long
							.parseLong(split[1])),
					new LongWritable(Long.parseLong(split[1])));

		};
	}

	static class MyReducer extends
			Reducer<MyKey, LongWritable, LongWritable, LongWritable> {
		protected void reduce(
				MyKey arg0,
				java.lang.Iterable<LongWritable> arg1,
				org.apache.hadoop.mapreduce.Reducer<MyKey, LongWritable, LongWritable, LongWritable>.Context arg2)
				throws java.io.IOException, InterruptedException {

//			for (LongWritable w : arg1) {
				arg2.write(new LongWritable(arg0.k), new LongWritable(arg0.v));
//			}

		};
	}

	static class MyKey implements WritableComparable<MyKey> {

		long k;
		long v;

		public MyKey() {
		}

		public MyKey(long k, long v) {
			this.k = k;
			this.v = v;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(k);
			out.writeLong(v);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.k = in.readLong();
			this.v = in.readLong();
		}

		@Override
		public int compareTo(MyKey o) {
			if (this.k == o.k) {
				return (int) (this.v - o.v);// v
			} else {
				return (int) (o.k - this.k);// k
			}
		}

		@Override
		public int hashCode() {
			final int prime = 31;
			int result = 1;
			result = prime * result + (int) (k ^ (k >>> 32));
			result = prime * result + (int) (v ^ (v >>> 32));
			return result;
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj)
				return true;
			if (obj == null)
				return false;
			if (getClass() != obj.getClass())
				return false;
			MyKey other = (MyKey) obj;
			if (k != other.k)
				return false;
			if (v != other.v)
				return false;
			return true;
		}

	}

}


分享到:
评论

相关推荐

    mapreduce wc单词计数 自定义分区 自定义排序实现

    在这个特定的案例中,我们不仅实现了基本的WordCount功能,还扩展了MapReduce的能力,通过自定义分区和自定义排序来优化数据处理流程。 首先,基础的`WordCount`实现,通常包含以下四个步骤: 1. **Map阶段**:...

    MapReduce2中自定义排序分组

    这允许我们为自定义键类型提供比较规则。创建此类时,需要指定键的类类型,并在 `compare` 方法中实现具体的比较逻辑。 在实现自定义排序和分组后,需要在 JobConf 中配置这些类。例如,通过 `job....

    16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN

    Comparator用于自定义键的比较规则,而Partitioner则控制哪些键将在同一台Reducer上处理。 三、分区 默认情况下,MapReduce将所有键均匀地分配到所有Reducer上。然而,有时我们可能希望特定的键只在某些Reducer上...

    MapReduce模型--二次排序

    二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时,数据是根据Key来组织和排序的,而Value则没有这种组织结构,因此排序操作只能针对Key来执行。在...

    自定义MapReduce的InputFormat

    3. **实现`createRecordReader()`方法**:此方法返回一个实现了`org.apache.hadoop.mapreduce.RecordReader`接口的对象。RecordReader负责从split中读取并解析单个记录,将其转换为键值对的形式。 4. **自定义...

    基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...

    MapReduce自定义Key实现获取学生最高成绩 课程设计

    这段Java代码是一个Hadoop MapReduce程序,用于处理输入数据并计算每个不同词汇的最高分数。它包含了配置和运行MapReduce作业的逻辑,以及Mapper和Reducer类的定义。主要功能是读取输入数据,将数据拆分成词汇和相关...

    mapreduce二次排序

    3. **Comparator定制**:Comparator是关键所在,它用于在Reduce阶段对键进行排序。这里我们需要两个Comparator:一个用于年份的升序排序,另一个用于气温的降序排序。在Comparator中,我们首先比较年份,如果年份...

    基于Java实现的简易MapReduce框架.zip

    《基于Java实现的简易MapReduce框架》 在大数据处理领域,Hadoop是一个不可或缺的重要工具,它为海量数据的存储和处理提供了分布式计算框架。而MapReduce是Hadoop的核心组件之一,用于处理和生成大规模数据集。这个...

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    MapReduce之自定义OutPutFormat.md

    MapReduce之自定义 OutPutFormat,通过一个案例,实现自定义的一个OutPutFormat,来更加的深刻的理解MR的过程

    MapReduce模型--自定义数据类型

    实现了WritableComparable接口的类,不仅可以将对象写入到Hadoop的数据流中,还能在MapReduce框架中比较这些对象,这对于排序、分组等操作是必不可少的。 接下来,我们以Person类为例,介绍如何自定义一个数据类型...

    mapreduce 自定义分隔符源码

    通过以上步骤,我们就成功地实现了自定义分隔符的功能,使得MapReduce能够处理具有复杂格式的日志数据。在实际应用中,可能还需要考虑其他因素,如错误处理、性能优化等。记住,自定义分隔符的目的是为了更好地适应...

    分布式文件系统实例-mapreduce-排序

    同时,系统会默认对这些键值对进行排序,这是MapReduce实现全局排序的关键步骤。在这个实例中,数据会被按照键(即原始记录)的自然顺序进行排序。 ### Reduce阶段 Reduce阶段接收到经过排序的键值对,每个reduce...

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...

    Hadoop中MapReduce基本案例及代码(五)

    对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。

    MapReduce二次排序

    MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序

    MapReduce实现矩阵相乘算法

    本话题将深入探讨如何使用Hadoop MapReduce实现两个矩阵相乘的算法,这在数据分析、机器学习以及高性能计算中有着重要应用。 首先,理解矩阵相乘的基本原理至关重要。矩阵相乘不是简单的元素对元素相乘,而是对应...

    利用采样器实现mapreduce任务输出全排序

    利用采样器实现mapreduce任务输出全排序大数据-MapReduce

Global site tag (gtag.js) - Google Analytics