`

MapReduce2中自定义排序分组

 
阅读更多

 

1 Map 、Reduce和主类 

  

package com.wzt.mapreduce.secondsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.wzt.mapreduce.wordcount.WCRunner;

public class SecSortMain {

	public static class SecSortMapper extends Mapper<LongWritable, Text, FirstSortEntity, IntWritable> {
		
		protected void map(LongWritable key, Text value, Context context)
				throws  IOException, InterruptedException {
			 
			String line = value.toString();
			String[] spilted = line.split(" ");
			
			// 为了显示效果而输出Mapper的输出键值对信息
			System.out.println("Mapper输出<" + spilted[0] + "," + spilted[1] + ">"+this);
			context.write(new FirstSortEntity(spilted[0], Integer.parseInt(spilted[1]))  , new IntWritable(Integer.parseInt(spilted[1])) );
		};
		
	}

	public static class SecSortReducer extends Reducer<FirstSortEntity, IntWritable , FirstSortEntity, IntWritable> {
		
		@Override
		protected void reduce(
				FirstSortEntity key,
				Iterable<IntWritable> values,
				Context context)
				throws IOException, InterruptedException {
			
			// 显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
			System.out.println("Reducer输入分组<" + key+ ",N(N>=1)>"+this);
			StringBuffer sb = new StringBuffer() ; 
			for (IntWritable value : values) {
				//count += value.get();
				// 显示次数表示输入的k2,v2的键值对数量
				sb.append( value+" , " ) ;
				System.out.println("Reducer输入键值对<" + key.toString() + "," + value.get() + ">  组"+sb.toString() );
			}
//			if(sb.length()>0){
//				sb.deleteCharAt( -1 ) ;
//			}

			context.write(key, key.getSecondkey());
			//context.write(key.getFirstkey(),  new Text(sb.toString() ));
			
		}
		
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration() ; 
		Job job = Job.getInstance(conf) ;
		
		job.setJarByClass(WCRunner.class );
		
		job.setMapperClass( SecSortMapper.class );
		job.setMapOutputKeyClass( FirstSortEntity.class);
		job.setMapOutputValueClass( IntWritable.class );
		 
		//设置分区方法
		job.setPartitionerClass( SSPartintioner.class);//不同
		//会有几个reduce去执行最后的汇总数据, 有几个分区就要有几个reduce ,最后就会生成几个reduce ,这里设置为1 ,没看到调用但是确实分区了,没弄明白
		job.setNumReduceTasks(1);//当任务数为1的时候设置Partitioner是没有用的
		
		//数据做总的排序
		job.setSortComparatorClass(MySSSortComparator.class) ; //排序
		//总数据  记性分组 
		job.setGroupingComparatorClass( GroupComparator.class );//分组
		
		job.setReducerClass( SecSortReducer.class );
		job.setOutputKeyClass( FirstSortEntity.class );
		job.setOutputValueClass(IntWritable.class );
		
		
//		FileInputFormat.setInputPaths(job,  "/wc/input/xiyou.txt");
//		FileOutputFormat.setOutputPath(job,  new Path("/wc/output6"));
		FileInputFormat.setInputPaths(job,  "/sort/input");
		FileOutputFormat.setOutputPath(job,  new Path("/sort/output1"));
		
 		job.waitForCompletion(true) ; 
	}
}

 

2 自定义 组合key 

 

package com.wzt.mapreduce.secondsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * 自定义组合件 
 * @author root
 *
 */
public class FirstSortEntity implements WritableComparable<FirstSortEntity>{

	private Text firstkey ; 
	private IntWritable secondkey ;
	
	public FirstSortEntity( ) {
	}
	
	public FirstSortEntity(Text firstkey, IntWritable secondkey) {
		this.firstkey = firstkey;
		this.secondkey = secondkey;
	}
	public FirstSortEntity(String firstkey, int secondkey) {
		this.firstkey = new Text(firstkey);
		this.secondkey = new IntWritable(secondkey);
	}
	
	public Text getFirstkey() {
		return firstkey;
	}
	public void setFirstkey(Text firstkey) {
		this.firstkey = firstkey;
	}
	public IntWritable getSecondkey() {
		return secondkey;
	}
	public void setSecondkey(IntWritable secondkey) {
		this.secondkey = secondkey;
	}
	/**
	 * 对象序列化
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		 out.writeUTF(firstkey.toString() );
		 out.writeInt(  secondkey.get() );
	}

	//对象反序列化
	@Override
	public void readFields(DataInput in) throws IOException {
		 
		firstkey = new Text(in.readUTF() );
		secondkey = new IntWritable(in.readInt()); 
	}

	
	/**
	 * 排序在map执行后数据传出后 会调用这个方法对key进行排序 
	 * 数据map后,如果设置了分区并且reduce>1 的话,会执行分区类方法,进行分区
	 */
	@Override
	public int compareTo(FirstSortEntity entity) {
		//利用这个来控制升序或降序
		//this本对象写在前面代表是升序
		//this本对象写在后面代表是降序
		return this.firstkey.compareTo( entity.getFirstkey());
		//return this.secondkey.get()>entity.getSecondkey().get()?1:-1;	
	}
	@Override
	public String toString() {
		return this.getFirstkey() +" "+this.getSecondkey()+ "   "  ;
	} 

}

 3 自定义分区 

 

package com.wzt.mapreduce.secondsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

//自定义 分区
public class SSPartintioner extends Partitioner<FirstSortEntity, IntWritable>{
 
	/**
	 * key map输出的key
	 * value map 输出的value 
	 *  map后的数据 经过排序后传进这个分区方法,如果返回的值相同的数据,值相同的数据会分配到一组中 ,即 放到一堆 
	 *  到此 数据为N堆,并且数据是经过排序的 
	 */
	@Override
	public int getPartition(FirstSortEntity key, IntWritable value,
			int numPartitions) {
			System.out.println("Partitioner  key:"+key.getFirstkey()+"  value:"+value+"  "+ ( ( key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions ) +"   "+this);
			//System.out.println("Partitioner  key:"+key.getFirstkey()+"  value:"+value+"  "+ ((key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions) +"   "+this);
			
	       return (key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions;
			//return (key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions;
	}
	 
	
}

   个人理解以上都是在Map阶段进行,即本地操作,以下为Map到Reduce这段进行的

 

4  自定义整体排序 

  

package com.wzt.mapreduce.secondsort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


//组内自定义排序策略
/**
 * @author root
 *
 */
public class MySSSortComparator extends WritableComparator{

	public MySSSortComparator() {//注册处理的试题类型 
		super(FirstSortEntity.class,true);
	}
	
	/**
	 *  reduce 处理数据之前 
	 *  对全量数据排序 
	 *  逻辑:分组一样则按照第二个参数排序  ,分组不一样,则按照第一个参数排序  
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		
		FirstSortEntity e1 = (FirstSortEntity)a;
		FirstSortEntity e2 = (FirstSortEntity)b;
		System.out.println( e1.getFirstkey()+"==MySSSortComparator 排序 。。 "+e2.getFirstkey());
		//首先要保证是同一个组内,同一个组的标识就是第一个字段相同
		if(!e1.getFirstkey().equals( e2.getFirstkey())){
			return e1.getFirstkey().compareTo(e2.getFirstkey());
		}else{
			return e1.getSecondkey().get() - e2.getSecondkey().get() ; 
		}
	}
}

 

5 自定义分组  

 

   

package com.wzt.mapreduce.secondsort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


//对象分组策略 
//数据放到 reduce前 ,对数据进行分组 
public class GroupComparator extends WritableComparator{

	public GroupComparator() { //注册处理的试题类型 
		super(FirstSortEntity.class,true ) ; 
	}
	
	
	/**
	 * 对排序后的数据 分组, 
	 * 第一个参数相同的,放到一个key的 迭代器 集合中  
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		FirstSortEntity e1 = (FirstSortEntity)a;
		FirstSortEntity e2 = (FirstSortEntity)b;
		System.out.println( e1.getFirstkey()+"==GroupComparator = 分组=="+e2.getFirstkey());
		return  e1.getFirstkey().toString().compareTo( e2.getFirstkey().toString());
		//return  e1.getSecondkey().compareTo( e2.getSecondkey());
	}
}

 在以后就是主类中的reduce进行数据处理

  下面这个类作为自己的记录,这里没用:

   

package com.wzt.mapreduce.secondsort;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;

//自定义分组比较器
//这个类 暂时没用, 分组比较器的 实现,但没有测试 
public class SSGroupComparator implements RawComparator<FirstSortEntity>{

	@Override
	public int compare(FirstSortEntity o1, FirstSortEntity o2) {
	 
		return o1.getSecondkey().get()>o2.getSecondkey().get()?1:-1;
	}
 
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    	 
    	//对象可以这样反序列化 
    	//IntWritable d ; 
    	System.out.println( "SSGroupComparator   自定义分组 =" );
    	ByteArrayInputStream bis = new ByteArrayInputStream(b1);
    	DataInput in1 = new DataInputStream(bis); 
    	FirstSortEntity entity1 = new FirstSortEntity();
    	
    	ByteArrayInputStream bis2 = new ByteArrayInputStream(b2);
    	DataInput in2 = new DataInputStream(bis2); 
    	FirstSortEntity entity2 = new FirstSortEntity();
    	try {
			entity1.readFields(in1);
			entity2.readFields(in2);
		} catch (IOException e) {
			e.printStackTrace();
		}
     
        return entity1.getFirstkey().compareTo( entity2.getFirstkey());
    }
 

}

 

 

 

 

 

分享到:
评论

相关推荐

    16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN

    如果需要自定义排序规则,可以实现Partitioner和Comparator接口。Comparator用于自定义键的比较规则,而Partitioner则控制哪些键将在同一台Reducer上处理。 三、分区 默认情况下,MapReduce将所有键均匀地分配到...

    MapReduce模型--自定义数据类型

    实现了WritableComparable接口的类,不仅可以将对象写入到Hadoop的数据流中,还能在MapReduce框架中比较这些对象,这对于排序、分组等操作是必不可少的。 接下来,我们以Person类为例,介绍如何自定义一个数据类型...

    MapReduce模型--二次排序

    MapReduce模型中的二次排序是大数据处理中一项重要的技术,它通过多层排序功能来优化数据处理性能。二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时...

    Hadoop中MapReduce基本案例及代码(五)

    对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。

    Hadoop MapReduce高级特性

    在MapReduce中,连接操作通常是通过在Map阶段对键进行分组,然后在Reduce阶段对具有相同键的记录进行合并实现的。连接操作可以分为多种类型,包括Reduce端连接、Map端连接和半连接等。Reduce端连接是最通用的连接...

    mapreduce secondarysort

    通过对MapReduce二次排序原理的理解以及具体实现细节的分析,我们可以看到二次排序在MapReduce框架中的重要性和其实现的复杂性。通过合理配置排序和分组比较器,可以有效地实现复杂的排序需求,从而提高数据处理的...

    MapReduce进阶

    在MapReduce中,中间键值对的处理并不是直接传递给reduce函数,而是先经过一个分组操作。这个操作确保所有拥有相同键的键值对都被分组到一起,然后按键进行排序,最后才被传递给reduce函数。这一过程由MapReduce框架...

    mapreduce案例代码及案例涉及文件

    在这个案例中,提供的代码和文档旨在帮助初学者理解MapReduce的工作原理,包括排序、分组和分区等关键概念。 **Map阶段**: Map阶段是MapReduce流程的起点。在这个阶段,原始数据被分割成多个小块,每个块由一个Map...

    MapReduceV2笔记

    性能优化是MapReduce应用中的关键话题,它包括但不限于合理设置map和reduce任务的数量、优化数据序列化类型和格式、合理使用分区和分组机制以及压缩技术等。 实际案例分析是学习MapReduce时理解其应用和效果的最佳...

    mapreduce在hadoop实现词统计和列式统计

    2. **Shuffle & Sort**:系统按照键(列名)进行排序和分组。 3. **Reducer**:Reducer执行相应的统计操作,比如求和、平均值等,然后将结果作为新的键值对输出。 **四、Hadoop MapReduce实践** 在实际操作中,...

    Hadoop应用系列2--MapReduce原理浅析(上)

    2. **Sorting & Grouping**:Reducer接收到的键值对会先进行排序,然后按键进行分组,确保同一键的所有值在一起。 3. **Reducing**:Reducer执行用户定义的Reduce函数,对每个键及其相关值集合进行处理,生成最终...

    hadoop 二次排序 原理

    本文将深入解析Hadoop的二次排序(Secondary Sort)原理,这是一个允许用户自定义排序规则以满足特定需求的功能。 首先,二次排序是在MapReduce框架内进行的一种特殊排序方式,它遵循两个主要步骤:第一字段排序和...

    google mapreduce

    2. **Reduce函数**:Reduce函数处理的是经过排序和分组后的中间键值对,即对于每个唯一的键,Reduce函数都会被调用来处理所有与此键相关的值。 #### 三、MapReduce的实际应用场景示例 为了更好地理解MapReduce的...

    MapReduce 2.0

    MapReduce的工作流程通常是从HDFS(Hadoop Distributed File System)中读取输入数据,将其分割为InputSplits,然后由Map任务处理,之后进行Shuffle和Sort过程,将中间结果排序后传递给Reduce任务,最终生成输出结果...

    mapreduce.tar.gz

    - 在Map阶段结束后,系统会对所有Mapper的输出进行排序,按照key进行分组,这一步骤称为Shuffle。同时,系统还会对每个key的value进行排序,以确保相同key的value在一起。 - 这个阶段对优化Reduce性能至关重要,...

    MapReduce2.0程序设计多语言编程(理论+实践)

    2. **数据输入和输出**:MapReduce作业的数据输入和输出通常涉及到自定义InputFormat和OutputFormat。例如,CSVFileInputFormat用于读取逗号分隔值文件,而TextOutputFormat则将结果输出为文本格式。 3. **性能优化...

    Hadoop 培训课程(4)MapReduce_2

    Hadoop 培训课程(4)MapReduce_2 标准和自定义计数器* Combiner和Partitioner编程** 自定义排序和分组编程** 常见的MapReduce算法** ---------------------------加深拓展---------------------- 常见大数据处理方法*

    Hadoop平台技术 排序操作案例.docx

    总结来说,Hadoop的排序机制是通过MapTask和ReduceTask的组合操作实现的,而自定义排序则需要实现WritableComparable接口并重写compareTo方法。这种灵活性使得用户可以根据具体业务需求对数据进行复杂排序,从而优化...

    htuple:在 MapReduce 中简化复合字段分区、排序和分组的库

    在 MapReduce 中,使用复合映射输出键并自定义对哪些字段进行分区、排序和分组可能很乏味,尤其是在跨多个作业执行此操作时。 这个库的目标是提供一个Tuple类,它可以包含多个元素,并提供一个ShuffleUtils类,为您...

Global site tag (gtag.js) - Google Analytics