`
zxxapple
  • 浏览: 79503 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

MapReduce框架中全排序的算法思想--学习笔记

阅读更多

 

 关于全排序的问题  Tom White的书中提出的数据取样方法 ,最近学习了一下,下面做个比较,以防后患!!

 

 

主要思想就是在要排序的所有数据中随机取出一定量的数据,这些数据取自三个部分,

1. 选取总得数据 ( 键值对 ) 数目

2. 选取的 split 数目

3. 每个 split 选取的键值对数目(只要达到总得键值对数目马上停止采集)

 

接下来对整个选取得键值对进行全局排序,然后根据工作配置的 reducer task 数目 R 来选取关键 key ,将采集后而且排序的 key 分成 R 个部分,即出现 R-1 个分割点,然后取出 R-1 个分割点处的 key ,将其写到一个二进制文件 ( 默认是 _partition.lst) ,然后将这个文件设置为 DistributedCache ,及所有 map reducer 共享的文件。接下来 PartitionerClass 来读取这个共享的二进制文件,读取其中的 R-1 个可以的值,根据着 R-1 key 生成一个 Binary 形式的 Tire 树,可以加快查找(以空间换取时间),将所有的 map 输出根据这个 R-1 key 将不同范围内的 key 输出到不同 reducer ,然后每个 reducer 进行一下局部排序即可,这样可以保证第 i reducer 输出的键值对所有的可以都比第 i+1 reducer 的键值对的 key 小。从而达到所有的 key 全局有序,

 

note :这其中的 mapClass ReducerClass 都设置成默认的即可,即直接输出键值对, mapreduce 框架自带有 sort 功能,即可满足条件。(为简单测试目的)

 

在mapreduce自带的框架内的TotalOrderPartitioner类就是通过读取这个_partition.lst, 并生成对应的trie树,加快前缀匹配速度,以空间换取时间的思想,从而分配可以到对应的reduce内部,然后使用框架自带的排序功能进行局部的排序,从而达到整体的有序。我们也没有必要将这些文件进行合并,

 

Trie 树,又称单词查找树或键树,是一种树形结构,是一种哈希树的变种。典型应用是用于统计和排序大量的字符串(但不仅限于字符串),所以经常被搜索引擎系统用于文本词频统计。它的优点是:最大限度地减少无谓的字符串比较,查询效率比哈希表高。

它有 3 个基本特性:

   1 )根节点不包含字符,除根节点外每一个节点都只包含一个字符。

   2 )从根节点到某一节点,路径上经过的字符连接起来,为该节点对应的字符串。

3 )每个节点的所有子节点包含的字符都不相同。

 

数据结构:

const int MaxKeySize = 25; // 关键码最大位数

   typedef struct { // 关键码类型

   KeyType * ch[MaxKeySize]; // 关键码存放 数组

   int currentSize; // 关键码当前位数

} KeyType;

 

下面贴下我自己实现的采样全局排序(为了简单  没有实现trie树  直接进行比较  速度会有一定的影响)

自定义的InputFormat,从而在分片的时候进行采集:

 

package com.zxx.smpler;

public class SamplerInputFormat extends FileInputFormat<Text, Text>
{
    
	static final String PARTITION_FILENAME = "_partition.lst";
	static final String SAMPLE_SIZE = "terasort.partitions.sample";
	private static JobConf lastConf = null;
	private static InputSplit[] lastResult = null;
	
	static class TextSampler implements IndexedSortable
	{

		public ArrayList<Text> records=new ArrayList<Text>();
		
		@Override
		public int compare(int arg0, int arg1)
		{
			Text right=records.get(arg0);
			Text left=records.get(arg1);
			
			return right.compareTo(left);
		}

		@Override
		public void swap(int arg0, int arg1)
		{
			Text right=records.get(arg0);
			Text left=records.get(arg1);
			
			records.set(arg0, left);
			records.set(arg1, right);
		}
		
		public void addKey(Text key) 
		{
		    records.add(new Text(key));
		}
		
		public Text[] createPartitions(int numPartitions)
		{
			int numRecords=records.size();
			if(numPartitions>numRecords)
			{
				throw new IllegalArgumentException
		          ("Requested more partitions than input keys (" + numPartitions +
		           " > " + numRecords + ")");
			}
			new QuickSort().sort(this, 0, records.size());
			float stepSize=numRecords/(float)numPartitions;
			Text[] result=new Text[numPartitions-1];
			for(int i=1;i<numPartitions;++i)
			{
				result[i-1]=records.get(Math.round(stepSize*i));
			}
			return result;
		}
	}
	
	public static void writePartitionFile(JobConf conf, Path partFile) throws IOException
	{
        SamplerInputFormat inputFormat=new SamplerInputFormat();
        TextSampler sampler=new TextSampler();
        Text key =new Text();
        Text value=new Text();
        
        int partitions = conf.getNumReduceTasks();    //Reducer任务的个数
        long sampleSize = conf.getLong(SAMPLE_SIZE, 100); //采集数据-键值对的个数
        InputSplit[] splits=inputFormat.getSplits(conf, conf.getNumMapTasks());//获得数据分片
        int samples=Math.min(10, splits.length);//采集分片的个数
        long recordsPerSample = sampleSize / samples;//每个分片采集的键值对个数
        int sampleStep = splits.length / samples; //采集分片的步长
        long records = 0;
        
        for(int i=0;i<samples;i++)
        {
        	RecordReader<Text, Text> reader=inputFormat.getRecordReader(splits[sampleStep*i], conf, null);
        	while(reader.next(key, value))
        	{
        		sampler.addKey(key);
        		records+=1;
        		if((i+1)*recordsPerSample<=records)
        		{
        			break;
        		}
        	}
        }
        FileSystem outFs = partFile.getFileSystem(conf);
        if (outFs.exists(partFile)) {
            outFs.delete(partFile, false);
          }
        SequenceFile.Writer writer=SequenceFile.createWriter(outFs, conf, partFile, Text.class, NullWritable.class);
        NullWritable nullValue = NullWritable.get();
        for(Text split:sampler.createPartitions(partitions))
        {
        	writer.append(split, nullValue);
        }
        writer.close();
        
	}

	static class TeraRecordReader implements RecordReader<Text,Text>
	{

		private LineRecordReader in;
		private LongWritable junk = new LongWritable();
		private Text line = new Text();
		private static int KEY_LENGTH = 10;
		
		public TeraRecordReader(Configuration job, FileSplit split) throws IOException
		{
			in = new LineRecordReader(job, split);
		}

		@Override
		public void close() throws IOException
		{
		    in.close();	
		}

		@Override
		public Text createKey()
		{
			// TODO Auto-generated method stub
			return new Text();
		}

		@Override
		public Text createValue()
		{
            return new Text();
		}

		@Override
		public long getPos() throws IOException
		{
			// TODO Auto-generated method stub
			return in.getPos();
		}

		@Override
		public float getProgress() throws IOException
		{
			// TODO Auto-generated method stub
			return in.getProgress();
		}

		@Override
		public boolean next(Text arg0, Text arg1) throws IOException
		{
			if(in.next(junk, line))
			{
				if(line.getLength()<KEY_LENGTH)
				{
					arg0.set(line);
					arg1.clear();
				}else{
					byte[] bytes=line.getBytes();  //默认知道读取要比较值的前10个字节  作为key 后面的字节作为value;
					arg0.set(bytes, 0,KEY_LENGTH);
					arg1.set(bytes,KEY_LENGTH, line.getLength()-KEY_LENGTH);
				}
				return true;
			}else {
				return false;	
			}
		}
		
	}
	@Override
	public InputSplit[] getSplits(JobConf conf, int splits) throws IOException
	{
		if(conf==lastConf)
		{
			return lastResult;
		}
		lastConf=conf;
		lastResult=super.getSplits(lastConf, splits);
		return lastResult;

	}
	@Override
	public org.apache.hadoop.mapred.RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException
	{
		// TODO Auto-generated method stub
		return new TeraRecordReader(arg1,(FileSplit)arg0);
	}
   
}

 主函数:

package com.zxx.smpler;


public class SamplerSort extends Configured implements Tool
{
       //自定义的Partitioner
	public static class TotalOrderPartitioner implements Partitioner<Text,Text>
	{

		private Text[] splitPoints;
		
		public TotalOrderPartitioner(){}
		@Override
		public int getPartition(Text arg0, Text arg1, int arg2)
		{
			// TODO Auto-generated method stub
			return findPartition(arg0);
		}

		@Override
		public void configure(JobConf arg0)
		{
			try
			{
				FileSystem fs = FileSystem.getLocal(arg0);
				Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);
				splitPoints = readPartitions(fs, partFile, arg0);  //读取采集文件
			} catch (IOException ie)
			{
				throw new IllegalArgumentException("can't read paritions file", ie);
			}
			
		}
		
		public int findPartition(Text key)          //分配可以到多个reduce
		{
			int len=splitPoints.length;
			for(int i=0;i<len;i++)
			{
				int res =key.compareTo(splitPoints[i]);
				if(res>0&&i<len-1)
				{
					continue;
				}else if (res==0) {
					return i;
				}else if(res<0){
					return i;
				}else if (res>0&&i==len-1) {
					return i+1;
				}
			}
			return 0;
		}
		
		private static Text[] readPartitions(FileSystem fs, Path p, JobConf job) throws IOException  
		{
			SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
			List<Text> parts = new ArrayList<Text>();
			Text key=new Text();
			NullWritable value = NullWritable.get();
			while(reader.next(key,value))
			{
				parts.add(key);
			}
			reader.close();
			return parts.toArray(new Text[parts.size()]);
		}
		
	}
	@Override
	public int run(String[] args) throws Exception
	{
		JobConf job=(JobConf)getConf();
		Path inputDir = new Path(args[0]);
		inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
		Path partitionFile = new Path(inputDir, SamplerInputFormat.PARTITION_FILENAME);
		
		URI partitionUri = new URI(partitionFile.toString() +
                "#" + SamplerInputFormat.PARTITION_FILENAME);
		
		SamplerInputFormat.setInputPaths(job, new Path(args[0]));
	    FileOutputFormat.setOutputPath(job, new Path(args[1]));
	    
	    job.setJobName("SamplerTotalSort");
	    job.setJarByClass(SamplerSort.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);
	    job.setInputFormat(SamplerInputFormat.class);
	    job.setOutputFormat(TextOutputFormat.class);
	    job.setPartitionerClass(TotalOrderPartitioner.class);
	    job.setNumReduceTasks(4);
	    
	    SamplerInputFormat.writePartitionFile(job, partitionFile);  //数据采集并写入文件
	    
	    DistributedCache.addCacheFile(partitionUri, job);  //将这个文件作为共享文件  提供给partition使用
	    DistributedCache.createSymlink(job);
	    
	    //SamplerInputFormat.setFinalSync(job, true);
	    JobClient.runJob(job);
		return 0;
	}
	public static void main(String[] args) throws Exception
	{
		int res = ToolRunner.run(new JobConf(), new SamplerSort(), args);
	    System.exit(res);
	}


}
 

 

分享到:
评论
3 楼 developerinit 2013-08-07  
ok,已经明白了,谢谢。
2 楼 coobery 2013-08-04  
seqfile是Sampler通过writePartitionFile方法生成的,而且格式非常简单K是采样点的值,V是NullWritable,而且经过了sort。看源代码就很清楚了。
developerinit 写道
非常好的东西,请问下,全排序在不自定义partitioner的情况下,需要先自己用mapreduce程序生成sequenceFile序列文件,再用自带的TotalOrderPartitioner排序吗?
1 楼 developerinit 2013-03-03  
非常好的东西,请问下,全排序在不自定义partitioner的情况下,需要先自己用mapreduce程序生成sequenceFile序列文件,再用自带的TotalOrderPartitioner排序吗?

相关推荐

Global site tag (gtag.js) - Google Analytics