`

MapReduce 两列数据升序排列

 
阅读更多

 

数据

3	3
3	2
3	1
2	2
2	1
1	1

 两列数据按升序排列

 

static class SortMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
			String val = value.toString();
			String[] vals = val.split("\t");
			if (vals.length == 2)
				context.write(new LongWritable(Long.parseLong(vals[0])), new LongWritable(Long.parseLong(vals[1])));
		}
	}

	static class SortReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

		@Override
		protected void reduce(LongWritable key, Iterable<LongWritable> value, Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
			Iterator<LongWritable> iterator = value.iterator();
			while (iterator.hasNext()) {
				LongWritable next = iterator.next();
				context.write(key, next);
			}
		}
	}

 输出结果

1	1
2	2
2	1
3	3
3	2
3	1

 

MapReduce默认只会对key进行排序,创建一个新的数据类型,用于保存两列数据,新的类型实现WritableComparable接口,复写compareTo()实现两列数据的比较

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import utils.HDPConstants;

public class SortApp2 extends Configured implements Tool {

	static class SortMapper extends Mapper<LongWritable, Text, DataBean, NullWritable> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DataBean, NullWritable>.Context context) throws IOException, InterruptedException {
			String val = value.toString();
			String[] vals = val.split("\t");
			if (vals.length == 2)
				context.write(new DataBean(Long.parseLong(vals[0]), Long.parseLong(vals[1])), NullWritable.get());
		}
	}

	static class SortReducer extends Reducer<DataBean, NullWritable, LongWritable, LongWritable> {

		@Override
		protected void reduce(DataBean key, Iterable<NullWritable> value, Reducer<DataBean, NullWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(key.getFirst()), new LongWritable(key.getSecond()));

		}
	}

	/**
	 * 
	 * @author Rock Lee
	 *  
	 * @Description 保存两列数据,进行比较
	 */
	static class DataBean implements WritableComparable<DataBean> {
		private Long first;
		private Long second;

		public DataBean() {
			super();
		}

		public DataBean(Long first, Long second) {
			super();
			this.first = first;
			this.second = second;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(first);
			out.writeLong(second);
		}

		public Long getFirst() {
			return this.first;
		}

		public void setFirst(Long first) {
			this.first = first;
		}

		public Long getSecond() {
			return this.second;
		}

		public void setSecond(Long second) {
			this.second = second;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			first = in.readLong();
			second = in.readLong();
		}

		@Override
		public int compareTo(DataBean o) {
			long offset = this.first - o.first;
			if (offset != 0)
				return (int) offset;
			return (int) (this.second - o.second);
		}

		@Override
		public int hashCode() {
			final int prime = 31;
			long result = 1L;
			result = prime * result + this.first;
			result = prime * result + this.second;
			return (int) result;
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj)
				return true;
			if (obj == null)
				return false;
			if (getClass() != obj.getClass())
				return false;
			DataBean other = (DataBean) obj;
			if (this.first != other.first)
				return false;
			if (this.second != other.second)
				return false;
			return true;
		}

		@Override
		public String toString() {
			return "DataBean [first=" + this.first + ", second=" + this.second + "]";
		}

	}

	@Override
	public int run(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = new Job(conf, SortApp2.class.getSimpleName());

		checkOutputPath(conf);

		job.setMapperClass(SortMapper.class);
		job.setReducerClass(SortReducer.class);

		job.setOutputKeyClass(DataBean.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(HDPConstants.SORT_APP_INPUT_PATH));
		FileOutputFormat.setOutputPath(job, new Path(HDPConstants.SORT_APP_OUTPUT_PATH));

		boolean exitCode = job.waitForCompletion(true);
		return exitCode ? 1 : 0;
	}

	private void checkOutputPath(Configuration configuration) throws IOException, URISyntaxException {
		final FileSystem fileSystem = FileSystem.get(new URI(HDPConstants.SORT_APP_INPUT_PATH), configuration);
		final Path outPath = new Path(HDPConstants.SORT_APP_OUTPUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
			System.out.println("delete -->" + HDPConstants.SORT_APP_OUTPUT_PATH);
		}
	}

	public static void main(String[] args) throws Exception {
		int status = ToolRunner.run(new SortApp2(), args);
		System.exit(status);
	}
}

 输出结果

1	1
2	1
2	2
3	1
3	2
3	3

 

 

 

分享到:
评论

相关推荐

    mapreduce项目 数据清洗

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。它将复杂的并行计算任务分解成两个主要阶段:Map(映射)和Reduce(化简)。在这个"MapReduce项目 数据清洗"中,我们将探讨...

    【MapReduce篇07】MapReduce之数据清洗ETL1

    MapReduce之数据清洗ETL详解 MapReduce是一种基于Hadoop的分布式计算框架,广泛应用于大数据处理领域。数据清洗(Data Cleaning)是数据处理过程中非常重要的一步,旨在清洁和转换原始数据,使其更加可靠和有用。...

    MapReduce算法

    - **数据冗余**:为了进一步提高系统的可靠性和容错性,MapReduce通常会在多个节点上复制数据,确保即使某些节点发生故障,数据也不会丢失。 总之,MapReduce作为一种高效的大规模数据处理框架,在大数据领域具有...

    使用MapReduce对数据文件进行切分

    标题中的“使用MapReduce对数据文件进行切分”是指在大数据处理领域,通过Apache Hadoop的MapReduce框架来对大规模数据文件进行分割和处理的方法。MapReduce是一种分布式计算模型,它将复杂的、大规模的数据处理任务...

    MapReduce数据分析实战

    MapReduce是一种编程模型,用于处理和生成大数据集的并行运算。它由Google提出,并被Apache Hadoop框架广泛采用。MapReduce模型将复杂的数据处理过程分解为两个阶段:Map(映射)和Reduce(归约)。简单来说,Map...

    MapReduce海量数据处理

    MapReduce是一种分布式计算模型,由Google为处理和生成大规模数据集而设计。它极大地简化了在大规模集群上执行数据密集型任务的过程,隐藏了复杂的并行处理、容错、数据分发、负载均衡等底层细节。 1. **MapReduce...

    mapreduce气象数据(用于测试)

    MapReduce是Apache Hadoop的核心组件之一,主要用于处理和分析大规模数据。在这个名为“mapreduce气象数据(用于测试)”的压缩包中,我们有一个可能是用于教学目的的数据集,旨在帮助初学者理解如何在Hadoop环境下...

    基于MapReduce的数据挖掘平台设计与实现.pdf

    总结以上内容,本文介绍的基于MapReduce的数据挖掘平台设计与实现,不仅涵盖了MapReduce和Hadoop平台的基础知识,还深入探讨了模型驱动开发方法、数据适应性、数据可视化等在大数据背景下的重要应用,对于从事数据...

    基于MapReduce+Pandas的电影排名与推荐以及数据分析与可视化展示

    基于MapReduce+Pandas的电影排名与推荐以及数据分析与可视化展示 数据科学与大数据技术领域中,电影排名与推荐系统的开发是非常重要的一部分。该系统可以通过对电影数据的分析和处理,提供电影排名和推荐服务,满足...

    使用MapReduce进行数据密集型文本处理(Jimmy Lin)Data-Intensive Text Processing with MapReduce (Jimmy Lin)

    在当前的IT行业中,MapReduce作为一种处理大规模数据集的编程模型,其在数据密集型文本处理中的应用已变得极为重要。《使用MapReduce进行数据密集型文本处理》一书由Jimmy Lin所著,是该领域内的一本重要参考文献。...

    MapReduce操作实例-数据去重.pdf

    MapReduce是分布式计算的一种编程模型,常用于处理大规模数据集。在这个实例中,我们看到的是一个基于MapReduce的数据去重操作,这个操作在大数据处理中非常常见,尤其是当处理的数据源包含重复记录时。下面将详细...

    云计算环境下基于MapReduce的并行化排列熵算法.pdf

    尽管排列熵在众多领域都取得了较好的效果,但当对海量的历史监测数据进行批量特征提取时,传统的基于企业关系型数据库或文件存储方式以及单机下的数据处理方法,难以应对存储性能和计算性能的双重挑战。因此,对于大...

    基于MapReduce的招聘数据清洗项目(免费提供源码)

    基于MapReduce的招聘数据清洗项目是一种高效处理和整理大量招聘数据的方法。MapReduce是一种分布式计算模型,由谷歌提出,广泛应用于大规模数据处理。该项目旨在通过MapReduce框架,将原始招聘数据进行清洗、规范化...

    大数据技术:MapReduce、数据仓库Hive单元测试与答案.pdf

    大数据技术:MapReduce、数据仓库Hive单元测试与答案 本资源摘要信息涵盖了大数据技术中 MapReduce 和数据仓库 Hive 的重要知识点,包括 MapReduce 框架、Hive 数据仓库、Impala 等相关概念和技术。 MapReduce ...

    0324大数据代码与数据_JAVA大数据_文本分析_运用MapReduce做数据分析_

    在大数据处理领域,Java语言因其稳定性和可扩展性,成为了实现MapReduce算法的首选工具。MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。在这个项目"0324大数据代码与数据_JAVA...

    基于MapReduce的电信数据清洗系统设计与实现

    内容概要:本文详细介绍了如何使用MapReduce框架设计和实现一个电信数据清洗系统,涵盖数据预处理、无效数据过滤、重复数据检测与删除以及数据格式转换等关键技术步骤。通过具体的代码示例,解释了各阶段的实现细节...

    java大数据作业_5Mapreduce、数据挖掘

    【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...

Global site tag (gtag.js) - Google Analytics