`

Hadoop读书笔记(十一)MapReduce中的partition分组

阅读更多

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855

1.partition分组

partition是指定分组算法,以及通过setNumReduceTasks设定Reduce的任务个数

2.代码

KpiApp.ava
package cmd;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 
 * <p> 
 * Title: KpiApp.java 
 * Package mapReduce 
 * </p>
 * <p>
 * Description: 本例demo中统计给定的流量文件每个手机号码的流量,使用Partitioner分组,如果setNumReduceTasks大于1,必须将代码打成jar
 * <p>
 * @author Tom.Cai
 * @created 2014-11-25 下午10:23:33 
 * @version V1.0 
 *
 */
public class KpiApp extends Configured implements Tool{

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new KpiApp(), args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		String INPUT_PATH = arg0[0];
		String OUT_PATH = arg0[1];
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), new Configuration());
		Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);

		job.setMapperClass(KpiMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWite.class);

		job.setPartitionerClass(kpiPartitioner.class);
		job.setNumReduceTasks(2);
		

		job.setReducerClass(KpiReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWite.class);

		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);

		job.waitForCompletion(true);
		return 0;
	}
	
	
	static class KpiMapper extends Mapper<LongWritable, Text, Text, KpiWite> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] splited = value.toString().split("\t");
			String num = splited[1];
			KpiWite kpi = new KpiWite(splited[6], splited[7], splited[8], splited[9]);
			context.write(new Text(num), kpi);
		}
	}

	static class KpiReducer extends Reducer<Text, KpiWite, Text, KpiWite> {
		@Override
		protected void reduce(Text key, Iterable<KpiWite> value, Context context) throws IOException, InterruptedException {
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			for (KpiWite kpi : value) {
				upPackNum += kpi.upPackNum;
				downPackNum += kpi.downPackNum;
				upPayLoad += kpi.upPayLoad;
				downPayLoad += kpi.downPayLoad;
			}
			context.write(key, new KpiWite(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)));
		}
	}
	
	static class kpiPartitioner extends HashPartitioner<Text, KpiWite>{

		@Override
		public int getPartition(Text key, KpiWite value, int numReduceTasks) {
			return (key.toString().length() == 11) ? 0:1; 
		}
		
	}
}

class KpiWite implements Writable {
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;

	public KpiWite() {
	}

	public KpiWite(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

}

3.备注:
使用Partitioner分组,如果setNumReduceTasks大于1,必须将代码打成jar包运行
否则将报以下错误:
java.io.IOException: Illegal partition for 84138413 (1)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
	at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
	at mapReduce.KpiApp$KpiMapper.map(KpiApp.java:75)
	at mapReduce.KpiApp$KpiMapper.map(KpiApp.java:1)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)

欢迎大家一起讨论学习!

有用的自己收!

记录与分享,让你我共成长!欢迎查看我的其他博客;我的博客地址:http://blog.csdn.net/caicongyang

 

分享到:
评论

相关推荐

    Hive操作笔记(呕心沥血制作)

    Hive 依赖于 Hadoop 生态系统,包括 MapReduce 或 Spark 来执行查询。 2. **Hive 结构图** Hive 的架构通常包括一个 Metastore 服务,用于存储元数据(如表结构、分区信息等),一个 Hiveserver2 服务,处理客户端...

    大数据hive笔记.zip

    本笔记将全面深入地探讨Hive在大数据处理中的应用、原理及其实战技巧。 一、Hive简介 Hive是Apache软件基金会下的一个开源项目,它提供了一种基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表...

    hive自学笔记.docx

    Hive 是一种基于 Hadoop 的数据仓库工具,用于在 Hadoop 文件系统(HDFS)上存储和处理大规模数据。它提供了SQL-like查询语言...同时,结合Hadoop的其他组件如MapReduce或Spark,可以构建更复杂的数据处理和分析流程。

    Hadoop-2.8.0-Day07-HA-Hive安装部署与HQL-课件与资料.zip

    在本课程中,我们将深入探讨Hadoop 2.8.0版本中的高可用性(HA)配置,以及如何安装和部署Hive以及使用HQL进行数据处理。Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它使得在大规模数据集上运行应用程序...

    项目笔记1

    Hadoop的Shuffle过程是MapReduce计算模型中至关重要的一环,它发生在MapTask和ReduceTask之间。首先,MapTask将处理的数据分块存储在内存缓冲区,当达到预设的溢写阈值时,数据会被溢写到磁盘,并在溢写过程中进行...

    hive基础知识复习笔记.zip

    - **减少 Shuffle 和 Sort**:避免全表扫描,利用JOIN策略和PARTITION BY。 - **使用Bucketing**:根据指定列对数据进行哈希分桶,优化JOIN操作。 - **使用Materialized Views**:预先计算并存储查询结果,加速...

    大数据学习笔记.pdf

    1.3 Spark与Hadoop集成 ................................................................................................ 7 1.4 Spark组件 ....................................................................

    面试总结.txt

    - **GroupByKey**: 这是Spark中的一种Transformation操作,用于将数据按照键进行分组,然后对每个键的所有值执行聚合操作。需要注意的是,GroupByKey通常会导致大量的Shuffle操作,因此在大数据集上可能不是最佳选择...

    分布式系统原理与范型期末复习资料

    9. **CAP理论**:任何分布式系统只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition Tolerance)中的两个,不能三者兼得。 10. **故障恢复与检测**:心跳检测、冗余备份、检查点...

    distributed-systems:与我有关分布式系统的讲座有关的材料和源代码-Source material

    Hadoop是大数据处理的基石,它的HDFS(Hadoop Distributed File System)和MapReduce是分布式存储和计算的经典范例。Kafka是一个高吞吐量的实时流处理平台,而Elasticsearch则是一个分布式搜索引擎,它们都展示了...

Global site tag (gtag.js) - Google Analytics