`

Hadoop MapReduce统计手机流量案例学习(结合Partitioner)

 
阅读更多
统计手机上网的上行流量和下行流量
数据格式:



统计手机的上网流量只需要“手机号”、“上行流量”、“下行流量”三个字段,根据这三个字段创建bean对象,该对象要实现Writable接口,以便实现序列化并且要有无参构造方法,hadoop会使用反射创建对象

public class PhoneBean implements Writable {
	private String phone;
	private Long upPayLoad;
	private Long downPayLoad;
	private Long totalPayLoad;

	public PhoneBean() {
	}

	public PhoneBean(String phone, Long upPayLoad, Long downPayLoad) {
		super();
		this.phone = phone;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = upPayLoad + downPayLoad;
	}

	@Override
	public String toString() {
		return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phone);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.phone = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}
        setter/getter略
}


MapReduce程序

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneCount {

	public static class PCMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneBean>.Context context) throws IOException, InterruptedException {
			String val = value.toString();
			String[] vals = val.split("\t");
			String phone = vals[1];
			Long upPayLoad = Long.parseLong(vals[8]);
			Long downPayLoad = Long.parseLong(vals[9]);
			PhoneBean bean = new PhoneBean(phone, upPayLoad, downPayLoad);           // 输出map结果 
			context.write(new Text(phone), bean);
		}
	}

	public static class PCReducer extends Reducer<Text, PhoneBean, Text, PhoneBean> {
		@Override
		protected void reduce(Text key, Iterable<PhoneBean> iterable, Reducer<Text, PhoneBean, Text, PhoneBean>.Context context) throws IOException, InterruptedException {
			Long upTotal = 0L;
			Long downTotal = 0L;
			for (PhoneBean pb : iterable) {
				upTotal += pb.getUpPayLoad();
				downTotal += pb.getDownPayLoad();
			}
                        // reduce输出结果
			context.write(key, new PhoneBean("", upTotal, downTotal));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
// 设置jar对应的class文件
		job.setJarByClass(PhoneCount.class);
// 设置map class文件
		job.setMapperClass(PCMapper.class);
// 设置reduce class文件
                job.setReducerClass(PCReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(PhoneBean.class);
                job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(PhoneBean.class);
// 设置输入文件位置
		FileInputFormat.setInputPaths(job, new Path(args[0]));
// 设置输出文件位置
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}


把需要的数据上传到hdfs,程序打包后运行

hadoop jar phone2.jar /phone/phone.dat /phone/output


通过partition对手机号进行划分,使用Map来模拟从数据库中查询出来的partition的规则

public static class PCPartitioner extends Partitioner<Text, PhoneBean> {

		private static Map<String, Integer> dataMap = new HashMap<String, Integer>();
		static {
// 第一分区
			dataMap.put("135", 1);
			dataMap.put("136", 1);
			dataMap.put("137", 1);
// 第二分区
			dataMap.put("138", 2);
			dataMap.put("139", 2);
// 第三分区
			dataMap.put("150", 3);
		}

		@Override
		public int getPartition(Text key, PhoneBean value, int numPartitions) {
			String phone = key.toString();
			String code = phone.substring(0, 3);
			Integer partition = dataMap.get(code);
			return partition == null ? 0 : partition;
		}
	}


总结:分区Partitioner主要作用在于以下两点

(1)根据业务需要,产生多个输出文件;

(2)多个reduce任务并发运行,提高整体job的运行效率

设置reduce的任务数,通过参数传入程序

		// 指定Partitioner文件
		job.setPartitionerClass(PCPartitioner.class);
// 设置Reduce任务数量
		job.setNumReduceTasks(Integer.parseInt(args[2]));


partition分了0、1、2、3个区总共四个分区,但如果reduce的数量小于partition的会报一个IO的异常,因为每个reduce对应一个输出文件

#设置reduce的数量为3  
hadoop jar phone3.jar /phone/phone.dat /phone/output1 3  
#程序执行时的异常  
15/09/21 16:51:34 INFO mapreduce.Job: Task Id : attempt_1442818713228_0003_m_000000_0, Status : FAILED  
Error: java.io.IOException: Illegal partition for 15013685858 (3)  


如果设置的reduce的数量大于partition数量,写出的reduce文件将为空文件

#设置reduce数量为5
hadoop jar phone3.jar /phone/phone.dat /phone/output2 5

[root@centos1 sbin]# hadoop fs -ls /phone/output2
Found 6 items
-rw-r--r--   1 root supergroup          0 2015-09-21 16:53 /phone/output2/_SUCCESS
-rw-r--r--   1 root supergroup        156 2015-09-21 16:53 /phone/output2/part-r-00000
-rw-r--r--   1 root supergroup        241 2015-09-21 16:53 /phone/output2/part-r-00001
-rw-r--r--   1 root supergroup        127 2015-09-21 16:53 /phone/output2/part-r-00002
-rw-r--r--   1 root supergroup         27 2015-09-21 16:53 /phone/output2/part-r-00003
-rw-r--r--   1 root supergroup          0 2015-09-21 16:53 /phone/output2/part-r-00004



partiton的注意事项:
1、partition规则要清晰
2、reduce的数量要等于或大于partition数量


转自:http://mvplee.iteye.com/blog/2245011

  • 大小: 53.8 KB
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    Hadoop MapReduce Cookbook 源码

    在阅读和实践过程中,建议读者结合Hadoop官方文档和其他相关资料,以便更全面地学习。同时,不断进行代码调试和性能调优,是提升自身技能的关键步骤。多加练习,逐步积累经验,将有助于在大数据处理领域实现稳步上升...

    Hadoop MapReduce实战手册(完整版)

    总之,《Hadoop MapReduce实战手册》全面覆盖了MapReduce的基本概念、工作流程、编程模型以及在大数据处理中的实际应用,是学习和理解大数据处理技术的理想读物。通过深入阅读,读者可以提升在大数据环境下的编程和...

    007_hadoop中MapReduce应用案例_1_数据去重

    在这个"007_hadoop中MapReduce应用案例_1_数据去重"的主题中,我们将深入探讨如何利用MapReduce解决数据去重的问题。这个案例可能涉及到对大数据集进行清洗和预处理,以确保数据的准确性和一致性。 首先,我们来看`...

    大数据-hadoop-mapreduce代码

    在大数据处理领域,Hadoop MapReduce 是一个至关重要的组件,它为海量数据的...通过研究这些代码,你可以学习到如何处理Hadoop集群上的大规模数据,如何编写高效的Mapper和Reducer,以及如何设置和调优MapReduce作业。

    Hadoop MapReduce Cookbook

    ### Hadoop MapReduce Cookbook知识点概览 #### 一、Hadoop MapReduce简介 ...通过对本书的学习,读者不仅可以掌握Hadoop MapReduce的基本操作,还能学会如何解决实际工作中遇到的各种复杂问题。

    hadoop mapreduce2

    在IT行业中,Hadoop MapReduce是一个关键的分布式计算框架,尤其在大数据处理领域。MapReduce的设计理念是将大型数据集的处理任务分解成两个主要阶段:...因此,不断学习和实践是成为Hadoop MapReduce专家的必经之路。

    006_hadoop中MapReduce详解_3

    总的来说,"006_hadoop中MapReduce详解_3"可能涵盖了MapReduce的高级话题,包括优化技巧、错误处理和实战案例分析。通过阅读`TestMR.java`和`WordCount.java`的源代码,我们可以深入理解MapReduce的编程模型和实现...

    MapReduce字数统计案例

    在这个"MapReduce字数统计案例"中,我们将深入理解MapReduce的工作原理,并通过一个简单的字数统计任务来学习如何应用它。这个案例适合初学者和有经验的开发者进行实践与交流。 首先,MapReduce的工作流程分为两个...

    hadoop-mapreduce

    这个项目是一个学习Hadoop MapReduce的实践项目,利用Maven构建,无需单独安装Hadoop环境,只需在IDE中打开即可运行,方便初学者进行快速上手和实践。 MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段。...

    基于Java的Hadoop HDFS和MapReduce实践案例设计源码

    该套源码是个人学习Hadoop HDFS和MapReduce技术的实践案例集合,采用Java语言编写,包含45个文件,涵盖34个Java源文件、4个XML配置文件、3个偏好设置文件以及1个Git忽略文件等。内容涵盖HDFS的JAVA API操作,如文件...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    有前三章的内容前言第一部分 基础篇第1章 阅读源代码前的准备1.1 准备源代码学习环境1.1.1 基础软件下载1.1.2 如何准备Windows环境1.1.3 如何准备Linux环境1.2 获取Hadoop源代码1.3 搭建Hadoop源代码阅读...

    Hadoop按日期统计访问次数及测试数据

    本篇将深入探讨如何利用Hadoop按日期统计访问次数,并结合测试数据进行实战解析。 首先,Hadoop的数据处理通常涉及日志分析,例如网站访问日志。这些日志记录了用户的访问行为,包括访问时间、页面、IP地址等信息。...

    hadoop eclipse mapreduce下开发所有需要用到的JAR包

    Eclipse通过特定的插件与Hadoop相结合,使得开发者可以在本地环境中编写、测试和调试MapReduce作业,然后部署到Hadoop集群上运行。 标题提到的“hadoop eclipse mapreduce下开发所有需要用到的JAR包”是指为了在...

    Hadoop_MapReduce教程

    通过本教程的学习,读者将掌握如何使用Hadoop MapReduce处理大规模数据集的基本技能。 #### 先决条件 在开始学习本教程之前,请确保已满足以下条件: - **Hadoop 安装与配置**:请确认Hadoop 已经被正确安装、配置...

    Hadoop 自定义 Partitioner 源代码

    其中,Partitioner 是 Hadoop MapReduce 框架中的关键组件,它负责决定 map 函数产生的中间键值对(key-value pairs)应被哪些 reduce task 处理。自定义 Partitioner 允许用户根据业务需求定制键的分发策略,从而...

    大数据课程设计-Hadoop-MapReduce实现sql的统计、groupby和join-全部源码

    本课程设计主要围绕如何使用Hadoop的MapReduce实现SQL中的统计、GROUP BY和JOIN操作,这是一次深入理解大数据处理机制的实践过程。 首先,让我们来探讨SQL的统计功能。在SQL中,统计通常涉及到COUNT、SUM、AVG、MAX...

    005_hadoop中MapReduce详解_2

    在Hadoop生态系统中,MapReduce是一种分布式计算框架,它允许我们处理海量数据并行化,非常适合大规模数据集的处理。本文将深入解析MapReduce的工作原理、核心组件以及如何编写一个基本的MapReduce程序。 MapReduce...

    Partitioner.zip

    通过解压并学习这些内容,你可以深入理解Hadoop MapReduce的分区机制,并掌握如何根据实际需求优化数据处理流程。 总的来说,Partitioner是Hadoop MapReduce中一个非常关键的组件,它允许用户根据业务逻辑调整数据...

    基于MapReduce的词频统计程序及其重构、MapReduce编程之Combiner、Partitioner组件应用.zip

    《基于MapReduce的词频统计程序及其重构与MapReduce编程中的Combiner、Partitioner组件应用》 在当今大数据处理的领域中,Hadoop作为分布式计算框架的重要代表,以其高效、可扩展的特性受到了广泛关注。尤其在人工...

    大数据mapreduce案例

    MapReduce是大数据处理领域中的一个核心框架,由Google在2004年提出,主要用于海量...在"大数据mapreduce案例"中,深入学习MapReduce的工作流程、编程模型以及相关优化策略,将有助于提升你在大数据领域的专业技能。

Global site tag (gtag.js) - Google Analytics