On the problem of total ordering: the data-sampling approach proposed in Tom White's book. I went through it recently, so here is a write-up for future reference.

The main idea is to draw a random sample from the data to be sorted. The sampling is controlled by three parameters:
1. the total number of records (key/value pairs) to sample;
2. the number of splits to sample from;
3. the number of key/value pairs taken from each sampled split (sampling stops as soon as the total sample size is reached; see the worked example below).
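For example, with the defaults used in the code further below: a sample size of 100 key/value pairs and, say, 20 input splits (a hypothetical number), samples = min(10, 20) = 10 splits are sampled, recordsPerSample = 100 / 10 = 10, so about 10 keys are read from each sampled split, and sampleStep = 20 / 10 = 2, so every second split is opened.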
The sampled keys are then sorted globally. Based on the number of reducer tasks R configured for the job, partition keys are chosen: the sorted sample is divided into R parts, which gives R-1 split points, and the keys at those R-1 split points are written to a binary file (by default _partition.lst). That file is registered in the DistributedCache, i.e. it is shared with every map/reduce task. The partitioner class then reads this shared binary file, takes the R-1 keys from it, and builds a trie over their bytes to speed up lookup (trading space for time). Every map output key is compared against the R-1 split points and sent to the reducer responsible for its range, and each reducer then only has to sort its own portion locally. This guarantees that every key emitted by reducer i is smaller than every key emitted by reducer i+1, so the keys are globally ordered across all reducer outputs.

Note: the map class and reducer class can both be left at their defaults, which simply pass the key/value pairs through; the sort that the MapReduce framework performs on its own is already enough. (This keeps the test simple.)
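As a concrete (hypothetical) example of the split-point selection: with R = 4 reducers and a sorted sample of 100 keys, the split points are the sample keys at positions 25, 50 and 75. A map output key k then goes to reducer 0 if k <= sample[25], to reducer 1 if sample[25] < k <= sample[50], to reducer 2 if sample[50] < k <= sample[75], and to reducer 3 otherwise; this is exactly the convention implemented by createPartitions and findPartition in the code below.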
The TotalOrderPartitioner class that ships with the MapReduce framework works in exactly this way: it reads _partition.lst, builds the corresponding trie to speed up prefix matching (the same space-for-time idea), and uses it to assign each key to the proper reducer; the framework's built-in sort then orders each partition locally, which yields a globally ordered result. There is also no need to merge the per-reducer output files afterwards. A sketch of how the stock classes are wired together follows.
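For comparison, here is a minimal driver sketch of how the stock org.apache.hadoop.mapred.lib.InputSampler and TotalOrderPartitioner are typically wired together with the old mapred API, along the lines of the example in Tom White's book. The class name, input/output format choice, paths and sampler parameters are illustrative assumptions rather than part of the code in this post; only the Hadoop calls themselves are real:

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class StockTotalSort
{
    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf(StockTotalSort.class);
        conf.setJobName("StockTotalSort");
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setInputFormat(SequenceFileInputFormat.class);    // assumes SequenceFile input with Text keys/values
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(4);                             // R = 4, so 3 split points
        conf.setPartitionerClass(TotalOrderPartitioner.class); // identity mapper/reducer stay as the defaults

        // Sample roughly 0.1% of the records: at most 10000 keys, from at most 10 splits.
        InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<Text, Text>(0.001, 10000, 10);

        Path input = FileInputFormat.getInputPaths(conf)[0];
        input = input.makeQualified(input.getFileSystem(conf));
        Path partitionFile = new Path(input, "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
        InputSampler.writePartitionFile(conf, sampler);        // runs the sampling and writes the split points

        // Share the partition file with every task via the DistributedCache.
        URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
        DistributedCache.addCacheFile(partitionUri, conf);
        DistributedCache.createSymlink(conf);

        JobClient.runJob(conf);
    }
}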
A trie, also known as a prefix tree or digital search tree, is a tree structure and a variant of the hash tree. Its typical application is counting and sorting large numbers of strings (though not only strings), so it is often used by search engines for word-frequency statistics. Its advantage is that it minimizes unnecessary string comparisons, giving better lookup efficiency than a hash table. It has three basic properties:
1) The root node contains no character; every other node contains exactly one character.
2) Concatenating the characters on the path from the root to a node gives the string associated with that node.
3) The children of any node all contain different characters.
Data structure (the classic textbook key type used with a trie):

const int MaxKeySize = 25;   // maximum number of characters in a key
typedef struct {             // key type
    char ch[MaxKeySize];     // array holding the characters of the key
    int currentSize;         // current number of characters in the key
} KeyType;
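The implementation further below skips the trie and simply compares each key against the split points in turn. For illustration only, here is a rough sketch of a single-level, byte-indexed trie over the R-1 sorted split points (the class name is made up and it is not used by the code below); TeraSort's TotalOrderPartitioner builds a deeper trie along the same lines:

import org.apache.hadoop.io.Text;

// Sketch only: a one-level, byte-indexed lookup table over the sorted split points,
// a simplified version of the trie idea used by TeraSort's TotalOrderPartitioner.
public class SplitPointTrieSketch
{
    private final Text[] splitPoints;         // the R-1 split points, sorted ascending
    private final int[] lower = new int[257]; // lower[b] = number of split points whose first byte is < b

    public SplitPointTrieSketch(Text[] sortedSplitPoints)
    {
        this.splitPoints = sortedSplitPoints;
        int i = 0;
        for (int b = 0; b <= 256; b++)
        {
            while (i < splitPoints.length && firstByte(splitPoints[i]) < b)
            {
                i++;
            }
            lower[b] = i;
        }
    }

    // Same contract as the linear findPartition below: returns the index of the
    // first split point that is >= key, i.e. the reducer this key belongs to.
    public int findPartition(Text key)
    {
        int b = firstByte(key);
        int from = lower[b];    // split points with a smaller first byte are certainly < key
        int to = lower[b + 1];  // split points with a larger first byte are certainly > key
        while (from < to && splitPoints[from].compareTo(key) < 0)
        {
            from++;             // scan the (usually tiny) range of split points sharing the first byte
        }
        return from;
    }

    private static int firstByte(Text t)
    {
        return t.getLength() == 0 ? 0 : (t.getBytes()[0] & 0xff);
    }
}

Compared with the linear findPartition in the sample code, the first-byte table narrows the search to the split points that share the key's leading byte, which starts to matter once the number of reducers grows large.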
Below is my own implementation of sampling-based total sort (for simplicity it does not build a trie and compares keys against the split points directly, which costs some speed).

The custom InputFormat, which also performs the sampling over the input splits:
package com.zxx.smpler;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;

public class SamplerInputFormat extends FileInputFormat<Text, Text>
{
static final String PARTITION_FILENAME = "_partition.lst";
static final String SAMPLE_SIZE = "terasort.partitions.sample";
private static JobConf lastConf = null;
private static InputSplit[] lastResult = null;
static class TextSampler implements IndexedSortable
{
public ArrayList<Text> records=new ArrayList<Text>();
@Override
public int compare(int arg0, int arg1)
{
Text right=records.get(arg0);
Text left=records.get(arg1);
return right.compareTo(left);
}
@Override
public void swap(int arg0, int arg1)
{
Text right=records.get(arg0);
Text left=records.get(arg1);
records.set(arg0, left);
records.set(arg1, right);
}
public void addKey(Text key)
{
records.add(new Text(key));
}
public Text[] createPartitions(int numPartitions)
{
int numRecords=records.size();
if(numPartitions>numRecords)
{
throw new IllegalArgumentException
("Requested more partitions than input keys (" + numPartitions +
" > " + numRecords + ")");
}
new QuickSort().sort(this, 0, records.size());
float stepSize=numRecords/(float)numPartitions;
Text[] result=new Text[numPartitions-1];
for(int i=1;i<numPartitions;++i)
{
result[i-1]=records.get(Math.round(stepSize*i));
}
return result;
}
}
public static void writePartitionFile(JobConf conf, Path partFile) throws IOException
{
SamplerInputFormat inputFormat=new SamplerInputFormat();
TextSampler sampler=new TextSampler();
Text key =new Text();
Text value=new Text();
int partitions = conf.getNumReduceTasks(); // number of reducer tasks
long sampleSize = conf.getLong(SAMPLE_SIZE, 100); // total number of key/value pairs to sample
InputSplit[] splits = inputFormat.getSplits(conf, conf.getNumMapTasks()); // the input splits
int samples = Math.min(10, splits.length); // number of splits to sample
long recordsPerSample = sampleSize / samples; // key/value pairs sampled per split
int sampleStep = splits.length / samples; // step between sampled splits
long records = 0;
for(int i=0;i<samples;i++)
{
RecordReader<Text, Text> reader=inputFormat.getRecordReader(splits[sampleStep*i], conf, null);
while(reader.next(key, value))
{
sampler.addKey(key);
records+=1;
if((i+1)*recordsPerSample<=records)
{
break;
}
}
}
FileSystem outFs = partFile.getFileSystem(conf);
if (outFs.exists(partFile)) {
outFs.delete(partFile, false);
}
SequenceFile.Writer writer=SequenceFile.createWriter(outFs, conf, partFile, Text.class, NullWritable.class);
NullWritable nullValue = NullWritable.get();
for(Text split:sampler.createPartitions(partitions))
{
writer.append(split, nullValue);
}
writer.close();
}
static class TeraRecordReader implements RecordReader<Text,Text>
{
private LineRecordReader in;
private LongWritable junk = new LongWritable();
private Text line = new Text();
private static int KEY_LENGTH = 10;
public TeraRecordReader(Configuration job, FileSplit split) throws IOException
{
in = new LineRecordReader(job, split);
}
@Override
public void close() throws IOException
{
in.close();
}
@Override
public Text createKey()
{
return new Text();
}
@Override
public Text createValue()
{
return new Text();
}
@Override
public long getPos() throws IOException
{
return in.getPos();
}
@Override
public float getProgress() throws IOException
{
return in.getProgress();
}
@Override
public boolean next(Text arg0, Text arg1) throws IOException
{
if(in.next(junk, line))
{
if(line.getLength()<KEY_LENGTH)
{
arg0.set(line);
arg1.clear();
}else{
byte[] bytes = line.getBytes(); // by convention the first 10 bytes of each line are the key, the rest is the value
arg0.set(bytes, 0,KEY_LENGTH);
arg1.set(bytes,KEY_LENGTH, line.getLength()-KEY_LENGTH);
}
return true;
}else {
return false;
}
}
}
@Override
public InputSplit[] getSplits(JobConf conf, int splits) throws IOException
{
if(conf==lastConf)
{
return lastResult;
}
lastConf=conf;
lastResult=super.getSplits(lastConf, splits);
return lastResult;
}
@Override
public org.apache.hadoop.mapred.RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException
{
return new TeraRecordReader(arg1,(FileSplit)arg0);
}
}
The driver (main class):
package com.zxx.smpler;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SamplerSort extends Configured implements Tool
{
// Custom Partitioner: routes each key to a reducer by comparing it against the sampled split points
public static class TotalOrderPartitioner implements Partitioner<Text,Text>
{
private Text[] splitPoints;
public TotalOrderPartitioner(){}
@Override
public int getPartition(Text arg0, Text arg1, int arg2)
{
return findPartition(arg0);
}
@Override
public void configure(JobConf arg0)
{
try
{
FileSystem fs = FileSystem.getLocal(arg0);
Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);
splitPoints = readPartitions(fs, partFile, arg0); // read the split points produced by sampling
} catch (IOException ie)
{
throw new IllegalArgumentException("can't read partitions file", ie);
}
}
public int findPartition(Text key) // assign a key to one of the reducers
{
int len=splitPoints.length;
for(int i=0;i<len;i++)
{
int res =key.compareTo(splitPoints[i]);
if(res>0&&i<len-1)
{
continue;
}else if (res==0) {
return i;
}else if(res<0){
return i;
}else if (res>0&&i==len-1) {
return i+1;
}
}
return 0;
}
private static Text[] readPartitions(FileSystem fs, Path p, JobConf job) throws IOException
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
List<Text> parts = new ArrayList<Text>();
Text key=new Text();
NullWritable value = NullWritable.get();
while(reader.next(key,value))
{
parts.add(key);
key = new Text(); // allocate a fresh Text; otherwise every list entry would reference the same reused object
}
reader.close();
return parts.toArray(new Text[parts.size()]);
}
}
@Override
public int run(String[] args) throws Exception
{
JobConf job=(JobConf)getConf();
Path inputDir = new Path(args[0]);
inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
Path partitionFile = new Path(inputDir, SamplerInputFormat.PARTITION_FILENAME);
URI partitionUri = new URI(partitionFile.toString() +
"#" + SamplerInputFormat.PARTITION_FILENAME);
SamplerInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setJobName("SamplerTotalSort");
job.setJarByClass(SamplerSort.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormat(SamplerInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setNumReduceTasks(4);
SamplerInputFormat.writePartitionFile(job, partitionFile); // sample the input and write the split points
DistributedCache.addCacheFile(partitionUri, job); // share the partition file with every task via the DistributedCache
DistributedCache.createSymlink(job);
//SamplerInputFormat.setFinalSync(job, true);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new JobConf(), new SamplerSort(), args);
System.exit(res);
}
}
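Assuming the two classes above are packaged into a jar (the jar name here is just an example), the job can be launched with the usual hadoop jar sampler-sort.jar com.zxx.smpler.SamplerSort <input dir> <output dir>. With setNumReduceTasks(4) as above, the output directory contains four part files that are each sorted internally and are ordered relative to one another (every key in part-00000 is smaller than every key in part-00001, and so on).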