`

hadoop patition 分区简介和自定义

 
阅读更多

 

 

0 简介:

0) 类比于新生<k,v>入学,不同的学生实现分配好了宿舍,然后进入到不同的宿舍(reduce task)

如果map发送来的数据量太大,意味着这些数据都到这个默认reduce节点执行,没有发挥reduce

并行计算的目的,IO压力也很大。 这就是分区的原因。

a) 默认下分配一个区

b) 分配几个区,则对应几个reduce任务,每个任务在执行的时候都会公用reduce内的代码

c) 自定义分区下 返回的分区数量一定要和 定义的reduce任务相同,具体来说就是:

自定义分区类 extends HashPartitioner,重写getPartition时,返回的分支个数要和

job.setNumReduceTasks(X); 中的X个数相同

如果分区格式和reducetask任务个数不同下,在hadoop不同版本中的运行情况如下:

  HashPartitioner.java  key.hashcode() & integer.maxvalue % numreducetasks =  模1恒等于0 返回值恒为0 返回值是分区的标记或者索引 part-00000 part-00001 等等
	   默认的是job.setPartitionerClass(HashPartitioner.class)  自定义分区返回的是索引数字,从0开始依次递增1返回。
	   以 手机号和座机号写在一个文件中为例:
	   如果分区数量 大于/小于 reduce数量时, 
	   2个分区 1个reduce --->  hadoop2中依旧能正常执行 只不过不会分区 所有数据都写到一个输出中   hadoop1中会报错
	   2个分区 4个reduce --->  hadoop2中依旧能正常执行 输出结果写到4个区中,第一个分区结果为手机号 第二个为座机号 剩下两个为空文件 所有数据都写到一个输出中

 

 

d) 需要打包放在hadoop环境内运行,否则在本机运行eg:eclipse环境下,会报错如下:

14/12/09 14:12:58 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Illegal partition for 84138413 (1)

 

 

 map-shuffle-reduce过程图如下:

 



 

 

 

 

 

1 代码

 

结果处理成2个区, 一个是放手机号的 一个是放固话的:

package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 
 * 实现单词计数功能,指定分区个数(分区下必须通过打包方式来运行)
 * 1 自定义规约
 * 	1.1  规约定义好处:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短
 *  1.2 因为不是所有的算法都适合使用Combiner处理,例如求平均数,因此Combiner不作为MR运行的标配
 *  1.3 Combiner本身已经执行了reduce操作仅仅是处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据
 *      这也是Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作的原因。
 * 2 自定义分区
 *   2.1 分区运行必须打成jar运行
 *   2.2 map分几个区,则reduce有几个任务数量,每个reduce任务将对应一个输出文件
 *   2.3 分区不是越多越好,要根据业务需求,分区太多,也会造成资源创建,等待等消耗
 *   2.4 多个reduce任务在运行的好处是提高整体job的运行效率
 *   
 *   结果处理成2个区, 一个是放手机号的 一个是放固话的
 *  [root@master ~]# hadoop fs -ls /out
Warning: $HADOOP_HOME is deprecated.

Found 4 items
-rw-r--r--   1 root supergroup          0 2014-08-24 16:02 /out/_SUCCESS
drwxr-xr-x   - root supergroup          0 2014-08-24 16:02 /out/_logs
-rw-r--r--   1 root supergroup        556 2014-08-24 16:02 /out/part-r-00000
-rw-r--r--   1 root supergroup         79 2014-08-24 16:02 /out/part-r-00001

 */
public class KpiAppPatition {



	// 0 定义操作地址
	static final String FILE_ROOT = "hdfs://master:9000/";
	static final String INPUT_PATH = "hdfs://master:9000/hello";
	static final String OUT_PATH = "hdfs://master:9000/out";
	
	public static void main(String[] args) throws Exception{
		
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
		Path outpath = new Path(OUT_PATH);
		if(fileSystem.exists(outpath)){
			fileSystem.delete(outpath, true);
		}
		
		// 0 定义干活的人
		Job job = new Job(conf);
		job.setJarByClass(KpiAppPatition.class);
		// 1.1 告诉干活的人 输入流位置     读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		// 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 指定自定义的map类
		job.setMapperClass(MyMapper2.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWritable2.class);
		
		//1.3 分区
		job.setPartitionerClass(KpiPartitioner.class);
		job.setNumReduceTasks(2);
		
		//1.4 TODO 排序、分组    目前按照默认方式执行
		//1.5 TODO 规约
		
		//2.2 指定自定义reduce类
		job.setReducerClass(MyReducer2.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 指定写出到哪里
		FileOutputFormat.setOutputPath(job, outpath);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// 让干活的人干活
		job.waitForCompletion(true);
		
	}
}

// 自定义分区写法如 注意返回的值是从0开始依次累加1的int值,不能跳跃
// 否则报错 说找不到编号为X的
// 你的reduce有几个,那么就会从0开始以1为累加数字返回对应个数的分区编码 然后
// 在去你代码里找对应编码  代码中随意返回patition的num 找不到就会报错
class KpiPartitioner extends HashPartitioner<Text, KpiWritable2>{
	@Override
	public int getPartition(Text key, KpiWritable2 value, int numReduceTasks) {
		System.out.println("KpiPartitioner numReduceTasks is : " + numReduceTasks );
		return (key.toString().length()==11)?0:1; // key为key2 即 电话号码,这里 如果是手机号(11位)则返回0,否则返回1 这样会生成2个分区,1个存放手机号的 1个存放固话的
	}
}

/**
 * 将 <k1,v1> --->  <k2,v2>
 * @author zm
 */
class MyMapper2 extends Mapper<LongWritable, Text, Text, KpiWritable2>{

	/**
	 * key 表示k1 即 当前行号
	 * value 表示v1 即当前行内容
	 */
	@Override
	protected void map(LongWritable k1, Text v1, Context context)
			throws IOException, InterruptedException {
		//格式: 1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
		String[] elements = v1.toString().split("\t");
		String phoneNum = elements[1];
		KpiWritable2 v2 = new KpiWritable2(elements[6],elements[7],elements[8],elements[9]);
		Text k2 = new Text(phoneNum);
		context.write(k2, v2);
	}
}

/**
 * 将 <k2,v2> --->  <k3,v3>
 * @author zm
 */
class MyReducer2 extends Reducer<Text, KpiWritable2,Text, LongWritable>{

	protected void reduce(Text k2, Iterable<KpiWritable2> v2s,
			org.apache.hadoop.mapreduce.Reducer.Context context)
			throws IOException, InterruptedException {
		
		long upPackNum = 0L;
		long downPackNum = 0L;
		long upPayLoad = 0L;
		long downPayLoad = 0L;
		
		for(KpiWritable2 kpiWritable1 : v2s){
			upPackNum = kpiWritable1.upPackNum;
			downPackNum = kpiWritable1.downPackNum;
			upPayLoad = kpiWritable1.upPayLoad;
			downPayLoad = kpiWritable1.downPayLoad;
		}
		
		KpiWritable2 v3 = new KpiWritable2(upPackNum+"",downPackNum+"",upPayLoad+"",downPayLoad+"");
		context.write(k2, v3);
	}
	
	
}


/**
 * 自定义类型类,里面封装 上网流量信息
 * @author zm
 *
 */
class KpiWritable2 implements Writable{
	
	long upPackNum; // 上传数据包个数
	long downPackNum;// 下载数据包个数
	long upPayLoad;// 上传数据
	long downPayLoad;// 下载数据

	public KpiWritable2(String upPackNum,String downPackNum,String upPayLoad,String downPayLoad){
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}
	
	public KpiWritable2(){}
	
	@Override
	public void write(DataOutput out) throws IOException {
		// 先写后读
		out.writeLong(this.upPackNum);
		out.writeLong(this.downPackNum);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// 读取的时候, 按照写方法的顺序( 队列方式) 顺序读取
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}
	
	@Override
	public String toString() {
		return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}
}

 

 

 

 

 

  • 大小: 26.3 KB
分享到:
评论

相关推荐

    Hadoop平台技术 分区操作案例.docx

    3. Partition分区操作案例: 这个案例旨在按照手机号的归属地(省份)进行分区,将136、137、138、139开头的手机号分别放入四个不同的文件,其余的放入第五个文件。具体实现步骤如下: a) 创建一个名为`...

    17、MapReduce的分区Partition介绍

    在MapReduce模型中,数据分区(Partition)是至关重要的一个环节,它决定了数据如何在不同的 Reduce Task 之间进行分布,从而影响整个计算的效率和结果的组织方式。 **一、数据分区** 数据分区是MapReduce工作流程...

    Partitioner, SortComparator and GroupingComparator in Hadoop

    在Hadoop MapReduce框架中,Partitioner、SortComparator和GroupingComparator是三个至关重要的组件,它们共同决定了数据如何被分发、排序以及分组,从而影响到整个MapReduce作业的性能和结果的正确性。接下来,我们...

    hadoop shuffle和排序1

    1. **分区内部排序(Within-partition sorting)**:首先,数据按照key进行排序,同一分区内的所有键值对都会根据key的自然顺序或者用户自定义的Comparator进行升序排序。此外,如果一个job配置了`...

    Hadoop数据仓库工具hive介绍.pdf

    例如,用户可以使用自定义脚本来实现更为复杂的映射和归约操作。 **2.3 分区管理** Hive的分区特性允许用户按照一定的标准(如日期、地区等)将表数据划分为多个分区,每个分区都存储在HDFS的不同位置。这不仅可以...

    Hadoop实战 随书源码

    - MapReduce中的数据处理通常涉及到数据的分片(Split)和分区(Partition),这些可能在某些代码示例中有所体现,例如`listing-10-1`和`listing-12-1`。 7. **容错机制**: - Hadoop设计了强大的容错机制,如...

    获取最大分区UDTF函数.doc

    为了更高效地管理和查询数据,有时我们需要找到表中的最大分区。本文将详细介绍如何在Hive中使用用户定义的表函数(User Defined Table Generating Function,简称UDTF)来实现这一功能。 #### 二、UDTF简介 UDTF...

    Hive开发规范及要点

    Hive是一款基于Hadoop的数据仓库工具,能够对大规模数据进行快速的查询和分析。它提供了一个SQL-like的查询语言,称为HQL(Hive Query Language),使得用户可以使用类似SQL的语句来查询和管理数据。在Hive开发中,...

    获取最大分区UDF函数.doc

    通常情况下,自定义UDF函数由于其针对性的设计和优化,能够更高效地完成特定任务。 总结来说,无论是临时函数还是永久函数,UDF都是扩展Hive功能的有效手段之一。通过合理设计和实现UDF,可以极大地提升数据处理...

    22_尚硅谷大数据之MapReduce_常见错误及解决方案1

    3. java.lang.Exception: java.io.IOException: Illegal partition for 13926435656(4),说明partition和reducetask个数没对上,调整reducetask个数。解决方案:调整reducetask个数,使其与partition个数相匹配。 ...

    cloudera-hive-cdh6.3.2源码包

    源码中的 `org.apache.hadoop.hive.ql.metadata.Partition` 类表示分区,`org.apache.hadoop.hive.ql.plan.CreateTableDesc` 中定义了如何创建分区的规则。 5. **MapReduce 与 Tez 执行引擎** Hive 默认使用 ...

    Hadoo数据仓库-hive入门全面介绍

    Hive 是一个基于 Hadoop 的数据仓库工具,它允许用户使用类似于 SQL 的语言(HQL,Hive Query Language)对大规模数据集进行分析和处理。Hive 的设计初衷是为了简化大数据处理,使得非 Java 开发者也能方便地利用 ...

    apache-hive-2.3.9-bin.tar大数据HIVE.zip

    1. **数据模型**:Hive 支持两种主要的数据存储结构——表(Table)和分区(Partition)。表是数据的基本单位,可以看作是关系数据库中的表格。分区则是对大表进行逻辑上的划分,通过将数据按特定字段值进行分类,...

    征服Hive小文件之困:策略、方法与实践

    Hive是一种基于Hadoop的数据仓库工具,它提供了一种SQL-like的查询语言,称为HiveQL,用于查询和分析...- **数据模型**:Hive支持表(Table)、外部表(External Table)、分区(Partition)和桶(Bucket)等数据模型。

    mapreduceDemo.zip

    在这个"mapreduceDemo.zip"压缩包中,我们可以通过一系列的示例深入理解MapReduce的工作原理和关键概念,如自定义分区、排序和分组。这些概念在大数据处理中至关重要,能够优化数据处理性能并确保结果的正确性。 ...

    Hive-Examples:Hadoop 的 Hive 的一些示例

    3. **表和分区**: 在 Hive 中,数据通常组织为表,可以进一步按分区(partition)划分,提高查询效率。分区是对大量数据进行逻辑分组的方法,每个分区对应一个目录,包含该分区内的所有文件。 4. **MapReduce 与 ...

    大数据 40 道面试题及答案.docx

    之后会进行一个partition分区操作,默认使用的是hashpartitioner,可以通过重写hashpartitioner的getPartition方法来自定义分区规则。之后会对key进行sort排序,grouping分组操作将相同key的value合并分组输出,在...

    机器学习中大数据对数据的排序

    通过自定义Map、Reduce和Partition类,可以实现对大量数据的有效排序。这种技术不仅适用于数据排序,还可以扩展到其他复杂的数据处理任务中。在实际应用中,根据具体需求调整MapReduce作业的参数是非常重要的。

    一个MapReduce简单程序示例

    4. 分区(Partition):根据键的哈希值将数据分配到不同的Reducer上,确保相同键的数据会分发到同一个Reducer。 Reduce阶段包括: 1. Reduce任务分配:Hadoop根据分区结果分配Reduce任务。 2. Copy阶段:Reducer从...

    WordCountMapReduce.zip

    自定义的Partitioner允许我们根据业务逻辑进行更精细的控制,比如保证同一键的所有值都被同一个Reducer处理,或者实现特定的数据分区策略,以优化数据局部性和减少网络传输。 最后,自定义的排序代码实例涉及到...

Global site tag (gtag.js) - Google Analytics