package org.apache.hadoop.examples;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
hadoop 的map/reduce例子,排序,由于map传给reduce的中间结果是排序的,所以这个例子不用写mapper和reducer。都用默认的map/reduce的实现,
IdentityMapper和IdentityReducer。例子中可以用排序采样器TotalOrderPartitioner,参数设置可以是 -totalOrder 0.1 10000 10
* This is the trivial map/reduce program that does absolutely nothing
* other than use the framework to fragment and sort the input values.
*
* To run: bin/hadoop jar build/hadoop-examples.jar sort
* [-m <i>maps</i>] [-r <i>reduces</i>]
* [-inFormat <i>input format class</i>]
* [-outFormat <i>output format class</i>]
* [-outKey <i>output key class</i>]
* [-outValue <i>output value class</i>]
* [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
* <i>in-dir</i> <i>out-dir</i>
*/
public class Sort<K,V> extends Configured implements Tool {
private RunningJob jobResult = null;
static int printUsage() {
System.out.println("sort [-m <maps>] [-r <reduces>] " +
"[-inFormat <input format class>] " +
"[-outFormat <output format class>] " +
"[-outKey <output key class>] " +
"[-outValue <output value class>] " +
"[-totalOrder <pcnt> <num samples> <max splits>] " +
"<input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
/**driver代码
* The main driver for sort program.
* Invoke this method to submit the map/reduce job.
* @throws IOException When there is communication problems with the
* job tracker.
*/
public int run(String[] args) throws Exception {
JobConf jobConf = new JobConf(getConf(), Sort.class);
jobConf.setJobName("sorter");
jobConf.setMapperClass(IdentityMapper.class); //设置mapper
jobConf.setReducerClass(IdentityReducer.class);//设置reducer
JobClient client = new JobClient(jobConf);
ClusterStatus cluster = client.getClusterStatus();//获得集群的状态
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
String sort_reduces = jobConf.get("test.sort.reduces_per_host");
if (sort_reduces != null) {
num_reduces = cluster.getTaskTrackers() *
Integer.parseInt(sort_reduces);
}
Class<? extends InputFormat> inputFormatClass =
SequenceFileInputFormat.class;
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
Class<? extends Writable> outputValueClass = BytesWritable.class;
List<String> otherArgs = new ArrayList<String>();
InputSampler.Sampler<K,V> sampler = null;
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
jobConf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])) {
num_reduces = Integer.parseInt(args[++i]);
} else if ("-inFormat".equals(args[i])) {
inputFormatClass =
Class.forName(args[++i]).asSubclass(InputFormat.class);
} else if ("-outFormat".equals(args[i])) {
outputFormatClass =
Class.forName(args[++i]).asSubclass(OutputFormat.class);
} else if ("-outKey".equals(args[i])) {
outputKeyClass =
Class.forName(args[++i]).asSubclass(WritableComparable.class);
} else if ("-outValue".equals(args[i])) {
outputValueClass =
Class.forName(args[++i]).asSubclass(Writable.class);
} else if ("-totalOrder".equals(args[i])) { //设置采样器3个参数
double pcnt = Double.parseDouble(args[++i]);
int numSamples = Integer.parseInt(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler =
new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
} else {
otherArgs.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage(); // exits
}
}
// Set user-supplied (possibly default) job configs
jobConf.setNumReduceTasks(num_reduces);
jobConf.setInputFormat(inputFormatClass);
jobConf.setOutputFormat(outputFormatClass);
jobConf.setOutputKeyClass(outputKeyClass);
jobConf.setOutputValueClass(outputValueClass);
// Make sure there are exactly 2 parameters left.
if (otherArgs.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
otherArgs.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
if (sampler != null) {
System.out.println("Sampling input to effect total-order sort...");
jobConf.setPartitionerClass(TotalOrderPartitioner.class);//设置采样器
Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
Path partitionFile = new Path(inputDir, "_sortPartitioning");
TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);//采样设置采样文件
InputSampler.<K,V>writePartitionFile(jobConf, sampler);
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, jobConf);
DistributedCache.createSymlink(jobConf);
}
System.out.println("Running on " +
cluster.getTaskTrackers() +
" nodes to sort from " +
FileInputFormat.getInputPaths(jobConf)[0] + " into " +
FileOutputFormat.getOutputPath(jobConf) +
" with " + num_reduces + " reduces.");
Date startTime = new Date();
System.out.println("Job started: " + startTime);
jobResult = JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Sort(), args);
System.exit(res);
}
/**
* Get the last job that was run using this instance.
* @return the results of the last job that was run
*/
public RunningJob getResult() {
return jobResult;
}
}
分享到:
相关推荐
他为SGI STL源代码添加的中文注释,无疑为国内开发者理解和学习STL提供了极大的便利。 首先,让我们来了解STL的核心组成部分: 1. 容器:STL中的容器是一些可以存储元素的对象,如vector(动态数组)、list(双向...
基于YOLOv5+Deepsort的深度学习的驾驶员分心驾驶行为的预警系统python源代码+文档说明,含有代码注释,新手也可看懂,个人手打98分项目,导师非常认可的高分项目,毕业设计、期末大作业和课程设计高分必看,下载下来...
源代码解释了动态内存管理、迭代器实现以及容器操作的性能优化。 4. ****:C++的字符串类,相比C语言的字符数组,提供了更高级的功能。源代码可以学习到字符串操作的封装和优化。 5. ****和****:异常处理库,定义...
### VC中qsort源代码解析 #### 知识点概览 在VC(Visual C++)环境中,`qsort`函数是C/C++标准库中用于数组排序的关键函数之一,其内部实现采用了快速排序算法。本文将深入分析VC中`qsort`源代码的核心逻辑、算法...
### 一、选择排序(Selection Sort) #### 算法思想 选择排序的基本思想是:从未排序序列中找到最小(或最大)元素放到已排序序列的末尾。 1. **第一轮**:从未排序部分中选出最小的元素,与未排序部分的第一个...
│ │ TopologicalSort.cpp │ │ TopologicalSort.h │ │ VertexType.cpp │ │ VertexType.h │ │ │ ├─插入排序 │ │ 1.txt │ │ main.cpp │ │ RedType.cpp │ │ RedType.h │ │ Sq_InsertSort.cpp │ ...
3. Map任务:在`org.apache.hadoop.mapred.MapTask`类中,源代码解释了如何将输入数据分片、执行映射函数并生成中间键值对。 4. Reduce任务:`org.apache.hadoop.mapred.ReduceTask`类负责从各个Map任务获取中间...
case 7:Sort(l);break; /*排序学生记录*/ case 8:Save(l);break; /*保存学生记录*/ case 9:system("cls");Disp(l);break; /*显示学生记录*/ default: Wrong();getchar();break; /*按键有误,必须为数值0-9*/ }...
对于这个压缩包,README可能会解释如何编译和运行源代码示例,这对于初学者了解如何开始至关重要。 3. **cont(容器)**: 这部分可能包含C++标准模板库(STL)中的容器类的实现,如`vector`, `list`, `set`, `map`...
3. 源代码分析:压缩包中的"a.txt"文件可能包含了详细的源代码注释,对理解每个步骤的实现原理非常有帮助。开发者可以从中学习如何初始化背景模型、如何进行帧差运算、如何应用滤波器进行目标追踪等。源代码通常包括...
文件内容包含了C语言的源代码片段,其中涉及到内存分配的两种策略:首次适应算法(First Fit)和最佳适应算法(Best Fit)。 以下是根据文件内容提炼出的知识点: 1. 内存分配基础: - 文件中的代码实现是基于...
标题“数据结构C语言版实验及源代码.pdf”暗示了文档内容是关于数据结构的实验练习,而这些练习是使用C语言完成的。在描述中进一步明确了这一点,说明文件包含了实验内容和源代码。由于文档内容是关于数据结构的实验...
在本文中,我们将对计算机操作系统内存分配实验源代码进行分析,并对其进行详细的解释。 首先,让我们看一下源代码的结构。源代码主要由五个部分组成:头文件的包含、宏定义、结构体定义、函数定义和主函数。头文件...
《C++ Primer 第三版》源代码是一份珍贵的学习资源,它涵盖了C++编程语言的基础到高级特性。这本书是C++初学者和进阶者的重要参考书籍,通过阅读和实践书中的源代码,读者可以深入理解C++的核心概念、语法以及编程...
Hadoop的持续发展离不开全球开发者的贡献,源代码中的注释和版本历史记录揭示了社区如何协作改进项目,以及新功能和修复是如何被引入的。 10. **学习与实践** 学习Hadoop源代码是提高开发者技能的有效途径,通过...
Java学生成绩管理系统是一款基于Java语言开发的应用程序,主要用于存储、查询、修改和删除学生分数信息。...此外,源代码的注释中提及了待办事项(TODO),表明该系统可能还在开发阶段,有待完善和优化。
他的注解不仅解释了代码的功能,还深入到设计模式和C++的底层机制,帮助读者理解STL如何实现其高效性能。通过阅读这份注释版的SGI STL源码,开发者能够掌握泛型编程的思想,提升编写高效、可维护的C++代码的能力。 ...
"php100源代码01-15"这一标题暗示了这是一份与PHP编程相关的资源,可能是一个系列教程的源代码集合,涵盖了从基础到进阶的15个部分。"php100源代码"标签进一步确认了这个主题,意味着这些代码可能来自知名的学习平台...
"算法源代码"这个主题涵盖了几个核心的计算机科学概念,包括快速排序(Quick Sort)、红黑树(Red-Black Tree)、区间树(Interval Tree)以及最近点对(Closest Pair Problem)的实现。下面将对这些知识点进行详细...