数据排序是许多实际任务在执行时要完成的第一项工作,比如学生成绩评比、数据建立索引等。
本次实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础。
实例描述:
对输入文件中的数据进行排序。输入文件中的每行内容均为一个数字,即一个数据。要求在输出中每行有两个间隔的数字,其中,第二个数字代表原始数据,第一个数字代表这个原始数据在原始数据集中的位次。
样例输入:
样例输出:
程序代码
package com.songjy.hadoop.demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Sort {
public static class MyMapper extends
Mapper<Object, Text, IntWritable, IntWritable> {
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
IntWritable data = new IntWritable(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
public static class MyReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private static IntWritable linenum = new IntWritable(1);
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
for (IntWritable v : values) {
context.write(linenum, key);
linenum = new IntWritable(linenum.get() + 1);
}
// linenum = new IntWritable(linenum.get() + 1);//代码放在这输出结果是啥样呢?o(∩_∩)o 哈哈
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.out.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, Sort.class.getName());
job.setJarByClass(Sort.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//job.setPartitionerClass(MyPartitioner.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
以上引自书籍《Hadoop实战》第2版的第五章,不过我去掉了自定义Partition部分代码,从结果来看,输出结果仍是正确(参看上面已有截图),是否仍需要自定义Partition的必要,望大牛们指点!
Partition部分代码
/**
* 自定义Partitioner函数,此函数根据输入数据的最大值和MapReduce框架中
* Partitioner的数量获取将输入数据按照大小分块的边界,然后根据输入数值和
* 边界的关系返回对应的Partitioner ID
*/
public static class MyPartitioner extends
Partitioner<IntWritable, IntWritable> {
@Override
public int getPartition(IntWritable key, IntWritable value,
int numPartitions) {
System.out.println("numPartitions=" + numPartitions);
int maxnum = 652232;
int bound = maxnum / numPartitions + 1;
System.out.println("bound=" + bound);
int keynum = key.get();
for (int i = 0; i < numPartitions; i++) {
if ((keynum < (bound * i)) && (keynum >= (bound * (i - 1))))
//return i - 1;
return (i - 1) >= 0 ? (i - 1) : 0;//partition是从0开始的,默认的返回应该给个0
}
//return -1;
return 0;//partition是从0开始的,默认的返回应该给个0
}
}
下面的错误信息是因为
partition是从0开始的,默认的返回应该给个0
15/04/06 15:32:40 INFO mapred.JobClient: map 0% reduce 0%
15/04/06 15:36:54 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000002_0, Status : FAILED
java.io.IOException: Illegal partition for 26 (-1)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:29)
at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
attempt_201503291109_0008_m_000002_0: numPartitions=1
attempt_201503291109_0008_m_000002_0: bound=652233
15/04/06 15:38:11 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000001_0, Status : FAILED
attempt_201503291109_0008_m_000001_0: numPartitions=1
attempt_201503291109_0008_m_000001_0: bound=652233
15/04/06 15:38:24 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000000_0, Status : FAILED
java.io.IOException: Illegal partition for 2 (-1)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:29)
at com.songjy.hadoop.demo.Sort$MyMapper.map(Sort.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
attempt_201503291109_0008_m_000000_0: numPartitions=1
attempt_201503291109_0008_m_000000_0: bound=652233
15/04/06 15:38:24 INFO mapred.JobClient: Task Id : attempt_201503291109_0008_m_000001_1, Status : FAILED
java.io.IOException: Illegal partition for 5956 (-1)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
- 大小: 10.8 KB
- 大小: 7.7 KB
- 大小: 6.9 KB
分享到:
相关推荐
标题中的“Hadoop排序”指的是Hadoop框架中的MapReduce排序机制。MapReduce是Apache Hadoop的核心组件,主要用于处理和生成大规模数据集。在Hadoop中,数据被分割成多个块,然后并行处理,其中排序是一个关键步骤,...
在Hadoop大数据处理中,MapReduce计算模型是一个关键的组件,尤其在面对大规模数据时,高效的数据排序至关重要。本文将深入解析Hadoop的二次排序(Secondary Sort)原理,这是一个允许用户自定义排序规则以满足特定...
在这个“hadoop分区二次排序示例.zip”压缩包中,我们重点探讨的是如何在Hadoop MapReduce中实现特定的排序逻辑,即二次排序和分区策略。 首先,我们需要理解什么是二次排序。在标准的MapReduce流程中,数据经过map...
- **样例程序**:包括TestDFSIO(用于压力测试HDFS I/O性能)、TeraSort基准套件(衡量Hadoop排序能力)和NameNode基准(nnbench,用于评估NameNode的性能)。这些样例代码详细展示了如何编写和运行Hadoop作业,有...
我们小组主要对基于[hadoop的大规模数据排序算法、海量数据的生成做了一定的研究。我们首先对于hadoop做了初步了解,其次,mapreduce是hadoop的很重要的算法,我们在第二阶段对mapreduce以及一些代码做了分析。第三...
二次排序(Secondary Sort)是Hadoop MapReduce中的一个重要概念,它允许用户自定义数据的最终排序方式,以满足更复杂的排序需求。这篇博客文章(虽然链接无法直接访问,但我们可以根据常规知识来解释这个概念)可能...
在Hadoop MapReduce框架中,shuffle和排序是两个至关重要的步骤,它们发生在map阶段和reduce阶段之间,确保数据被正确地处理和聚合。下面将详细解释这两个概念以及它们的工作流程。 首先,shuffle(洗牌)过程是...
在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储和计算的能力。本示例中的"had
45_hadoop2.x_温度排序,分区,分组,自定义封装类02 46_hadoop2.x_温度排序,分区,分组,自定义封装类03 47_hadoop2.x_温度排序,分区,分组,自定义封装类04 48_hadoop2.x_温度排序,分区,分组,自定义封装类05 ...
04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi 06-shuffle机制.avi 07-mr程序的组件全貌.avi 08-textinputformat对切片规划的源码分析.avi 09-倒排索引的mr实现.avi 10-多个job在同一个...
3. **MapReduce原理**:MapReduce的工作流程包括Map阶段和Reduce阶段,中间通过Shuffle和Sort过程进行数据排序和分区。Map函数将输入数据拆分成键值对,Reduce函数则聚合这些键值对,处理结果。书中会详述如何编写...
- Shuffle与Sort:在Map任务完成后,系统自动进行数据排序,准备进入Reduce阶段。 - Reduce阶段:对中间结果进行聚合处理,生成最终结果。 3. YARN:资源调度器 - YARN(Yet Another Resource Negotiator)是...
Hadoop大作业排序代码 由于 MapReduce 中对 key 进行比较和排序,而 key 可以是任何实 现了 Writable 接口的类。 在 java 中,要实现类的大小比较可以实现 Comparable 接口并通 过重写 compareTo 方法来实现。 在 ...
在Hadoop平台上,排序操作是MapReduce框架的核心功能,它保证了数据的有序性,使得数据分析和处理更为高效。排序分为三个主要阶段:MapTask排序、ReduceTask排序以及全局排序。 1. MapTask排序: MapTask阶段的...
Hadoop是一款开源的大数据处理框架,由Apache基金会开发,它主要设计用于分布式存储和处理海量数据。这个"hadop jar包.rar"文件很显然是包含了运行Hadoop相关程序所需的jar包集合,用户解压后可以直接使用,省去了...
3. MapReduce编程:编写MapReduce程序处理数据,理解Mapper和Reducer的工作原理,以及中间键值对的分区和排序过程。 4. 性能优化:实验可能包括如何调整Hadoop参数以优化性能,如修改Map和Reduce的任务数量,设置...
,Hadoop 技术已经在互联网领域得到了广泛的应用。互联网公司往往需要 存储海量的数据并对其进行处理,而这正是Hadoop 的强项。如Facebook 使用Hadoop ...阿里巴巴则将Hadoop 用于商业数据的排序和搜索引擎的优化等。
Hadoop数据迁移是指将存储在传统数据库系统(如Oracle)中的数据转移到Hadoop文件系统(HDFS)的过程。在这个过程中,MapReduce作为一种编程模型,用于处理和生成大数据集,被用来连接Hadoop与Oracle数据库,使得...