统计手机上网的上行流量和下行流量
数据格式:
统计手机的上网流量只需要“手机号”、“上行流量”、“下行流量”三个字段,根据这三个字段创建bean对象,该对象要实现Writable接口,以便实现序列化,并且要有无参构造方法,hadoop会使用反射创建对象
public class PhoneBean implements Writable { private String phone; private Long upPayLoad; private Long downPayLoad; private Long totalPayLoad; public PhoneBean() { } public PhoneBean(String phone, Long upPayLoad, Long downPayLoad) { super(); this.phone = phone; this.upPayLoad = upPayLoad; this.downPayLoad = downPayLoad; this.totalPayLoad = upPayLoad + downPayLoad; } @Override public String toString() { return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(phone); out.writeLong(upPayLoad); out.writeLong(downPayLoad); } @Override public void readFields(DataInput in) throws IOException { this.phone = in.readUTF(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } setter/getter略 }
程序,注意不要引错包
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PhoneCount { public static class PCMapper extends Mapper<LongWritable, Text, Text, PhoneBean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneBean>.Context context) throws IOException, InterruptedException { String val = value.toString(); String[] vals = val.split("\t"); String phone = vals[1]; Long upPayLoad = Long.parseLong(vals[8]); Long downPayLoad = Long.parseLong(vals[9]); PhoneBean bean = new PhoneBean(phone, upPayLoad, downPayLoad); context.write(new Text(phone), bean); } } public static class PCReducer extends Reducer<Text, PhoneBean, Text, PhoneBean> { @Override protected void reduce(Text key, Iterable<PhoneBean> iterable, Reducer<Text, PhoneBean, Text, PhoneBean>.Context context) throws IOException, InterruptedException { Long upTotal = 0L; Long downTotal = 0L; for (PhoneBean pb : iterable) { upTotal += pb.getUpPayLoad(); downTotal += pb.getDownPayLoad(); } context.write(key, new PhoneBean("", upTotal, downTotal)); } } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(PhoneCount.class); job.setMapperClass(PCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PhoneBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); job.setReducerClass(PCReducer.class); // reducer input key and value equals reduce output key and value ignore job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PhoneBean.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
把需要的数据上传到hdfs,程序打包后运行
hadoop jar phone2.jar /phone/phone.dat /phone/output
结果
13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 1116 954 2070 13560439658 2034 5892 7926 略
其中手机号13560439658是两次上网,其它手机号都是一次上网,该手机号作为验证数据
通过partition对手机号进行划分,使用Map来模拟从数据库中查询出来的partition的规则
public static class PCPartitioner extends Partitioner<Text, PhoneBean> { private static Map<String, Integer> dataMap = new HashMap<String, Integer>(); static { dataMap.put("135", 1); dataMap.put("136", 1); dataMap.put("137", 1); dataMap.put("138", 2); dataMap.put("139", 2); dataMap.put("150", 3); } @Override public int getPartition(Text key, PhoneBean value, int numPartitions) { String phone = key.toString(); String code = phone.substring(0, 3); Integer partition = dataMap.get(code); return partition == null ? 0 : partition; } }
设置reduce的任务数,通过参数传入程序
// set partition job.setPartitionerClass(PCPartitioner.class); job.setNumReduceTasks(Integer.parseInt(args[2]));
partition分了0、1、2、3个区总共四个分区,但如果reduce的数量小于partition的会报一个IO的异常,因为每个reduce对应一个输出文件
#设置reduce的数量为3 hadoop jar phone3.jar /phone/phone.dat /phone/output1 3 #程序执行时的异常 15/09/21 16:51:34 INFO mapreduce.Job: Task Id : attempt_1442818713228_0003_m_000000_0, Status : FAILED Error: java.io.IOException: Illegal partition for 15013685858 (3)
如果设置的reduce的数量大于partition数量,写出的reduce文件将为空文件
#设置reduce数量为5 hadoop jar phone3.jar /phone/phone.dat /phone/output2 5 [root@centos1 sbin]# hadoop fs -ls /phone/output2 Found 6 items -rw-r--r-- 1 root supergroup 0 2015-09-21 16:53 /phone/output2/_SUCCESS -rw-r--r-- 1 root supergroup 156 2015-09-21 16:53 /phone/output2/part-r-00000 -rw-r--r-- 1 root supergroup 241 2015-09-21 16:53 /phone/output2/part-r-00001 -rw-r--r-- 1 root supergroup 127 2015-09-21 16:53 /phone/output2/part-r-00002 -rw-r--r-- 1 root supergroup 27 2015-09-21 16:53 /phone/output2/part-r-00003 -rw-r--r-- 1 root supergroup 0 2015-09-21 16:53 /phone/output2/part-r-00004
partiton的注意事项:
1、partition规则要清晰
2、reduce的数量要等于或大于partition数量
相关推荐
08.mapreduce编程案例--流量统计求和--自定义数据类型.mp4
为MapReduce框架对电话号码的上行流量和下行流量及总流量进行统计的模板数据
python实现mapreduce词频统计 执行方式:打开cmd命令,cd到代码所在文件夹,输入python wordcout_map.py > words.txt | sort | python wordcout_reduce.py执行
【大数据Hadoop MapReduce词频统计】 大数据处理是现代信息技术领域的一个重要概念,它涉及到海量数据的存储、管理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的...
### MapReduce下的PCA异常流量检测系统实现 #### 1. 引言 随着信息技术的快速发展,互联网流量呈现出爆炸式增长趋势。在这种背景下,网络运营商面临着前所未有的挑战,尤其是在处理异常流量方面。网络异常流量,即...
在这个场景中,我们将讨论如何使用Hadoop的MapReduce来实现词统计和列式统计。 **一、MapReduce原理** MapReduce的工作流程主要包括三个主要阶段:Map、Shuffle(排序)和Reduce。在Map阶段,输入数据被分割成多个...
### MapReduce统计度分布知识点详解 #### 一、概述 MapReduce是一种编程模型,用于处理大规模数据集(通常是大于1TB的数据)。它是由Google提出的,随后Hadoop项目实现了这一模型,使得用户可以在集群上分布式地...
在这个"MapReduce字数统计案例"中,我们将深入理解MapReduce的工作原理,并通过一个简单的字数统计任务来学习如何应用它。这个案例适合初学者和有经验的开发者进行实践与交流。 首先,MapReduce的工作流程分为两个...
1)统计每一个手机号耗费的总上行流量、下行流量、总流量 2)将统计结果按照手机归属地不同号段(手机号前3位)输出到不同文件中 3)根据需求1)产生的结果再次对总流量进行排序。 4)按照要求2)每个手机号段输出的...
利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。
网站流量数据分析 (MapReduce+Hive综合实验)
本学习案例聚焦于MapReduce框架在天气统计中的应用,通过实际编程实践加深对框架的理解。 首先,MapReduce的工作原理可以概括为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,输入的数据被分割成多个小块,每个块...
在这个“MapReduce数据统计简单实例”中,我们将探讨如何使用Hadoop的Eclipse插件来开发MapReduce程序,以及如何实现对数据的简单统计处理和可视化结果。 首先,我们来看Map阶段。在Map阶段,输入数据被分割成多个...
本教程将聚焦于如何利用Hadoop对上网流量源数据进行统计分析,这对于我们理解网络行为、优化网络服务以及制定数据驱动的决策至关重要。 首先,我们要理解Hadoop的核心组件:HDFS(Hadoop Distributed File System)...
本项目"基于Hadoop平台使用MapReduce统计某银行信用卡违约用户数量"旨在利用Hadoop的MapReduce组件来分析银行信用卡用户的违约情况,这对于银行的风险控制和信用评估具有重要意义。 MapReduce是Hadoop的核心组成...
MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar
接下来是flowcount流量统计程序,这可能涉及到网络数据的处理。在实际应用中,可能会收集到大量的网络流量日志,例如用户访问、请求响应时间等信息。通过MapReduce,我们可以分析这些数据,例如计算每小时的请求数量...
数据存储实验5-编写MapReduce程序实现词频统计 本实验的主要目的是通过编写MapReduce程序来实现词频统计,熟悉Hadoop中的MapReduce模块的处理逻辑和编程。实验中,我们将使用Linux操作系统和Eclipse或Intellij Idea...