`

MapReduce(2)

阅读更多
一、去除重复的内容

源文件:
192.168.234.21
192.168.234.22
192.168.234.21
192.168.234.21
192.168.234.23
192.168.234.21
192.168.234.21
192.168.234.21
192.168.234.25
192.168.234.21
192.168.234.21
192.168.234.26
192.168.234.21

1.新建 Path : distinct ,上传文件

2.Mapper.java

public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(value, NullWritable.get());
	}
}



因为无需统计任务内容,所以value传空,故可用 NullWritable

3.Reducer.java

public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<NullWritable> value,
			Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
		
		context.write(key, NullWritable.get());
	}
}


4.Driver

public class DistinctDriver {

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(DistinctDriver.class);
		job.setMapperClass(DistinctMapper.class);
		job.setReducerClass(DistinctReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/distinct/distinct.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/distinct/result"));
	
		job.waitForCompletion(true);
	}

}




二、面向对象编程

源文件:
手机号码 地区 姓名 流量使用
13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987

1.面向对象编程,建立对象
package com.study.flow.day01;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable{

	private String phone ;
	private String addr ;
	private String name ;
	private Integer flow ;
	
	
	
	public String getPhone() {
		return phone;
	}
	public void setPhone(String phone) {
		this.phone = phone;
	}
	public String getAddr() {
		return addr;
	}
	public void setAddr(String addr) {
		this.addr = addr;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public Integer getFlow() {
		return flow;
	}
	public void setFlow(Integer flow) {
		this.flow = flow;
	}
	
	// 反序列化,顺序要与序列化时的顺序相同
	@Override
	public void readFields(DataInput input) throws IOException {
		
		this.phone = input.readUTF();
		this.addr = input.readUTF();
		this.name = input.readUTF();
		this.flow = input.readInt();
	}
	
	// 序列化
	@Override
	public void write(DataOutput output) throws IOException {
		output.writeUTF(phone);
		output.writeUTF(addr);
		output.writeUTF(name);
		output.writeInt(flow);
		
	}
	@Override
	public String toString() {
		return "FlowBean [phone=" + phone + ", addr=" + addr + ", name=" + name + ", flow=" + flow + "]";
	}
	
	
}



2.Mapper

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String [] datas = line.split(" ");
		FlowBean bean = new FlowBean();
		bean.setPhone(datas[0]);
		bean.setAddr(datas[1]);
		bean.setName(datas[2]);
		bean.setFlow(Integer.valueOf(datas[3]));
		
		context.write(new Text(bean.getName()), bean);
	}
}


3.Reducer


public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		
		Integer sum = 0 ;
		FlowBean bean = new FlowBean();
		for(FlowBean flowBean : values){
			sum = sum + flowBean.getFlow();
			bean.setPhone(flowBean.getPhone());
			bean.setAddr(flowBean.getAddr());
			bean.setName(flowBean.getName());
		}
		bean.setFlow(sum);
		
		context.write(key, bean);
	}
}



4.Driver

public class FlowDriver {

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

		// 配置文件
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		// 设置运行入口,Mapper,Reducer 
		job.setJarByClass(FlowDriver.class);
		job.setMapperClass(FlowMapper.class);
		job.setReducerClass(FlowReducer.class);
		// 设置Mapper 的key value 类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		// 设置Reducer 的 key value 类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		// 设置文件的读入 输出目录
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/flow"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/flow/result"));
		
		job.waitForCompletion(true);
		
	}

}


分享到:
评论

相关推荐

    Hadoop MapReduce v2 Cookbook, 2nd Edition-Packt Publishing(2015) 高清完整版PDF下载

    ### Hadoop MapReduce V2 知识点概览 #### 一、Hadoop MapReduce V2 生态系统介绍 **Hadoop MapReduce V2** 是Hadoop生态系统中的一个关键组件,用于处理大规模数据集。相较于V1版本,V2版本在架构上进行了重大...

    Hadoop MapReduce v2 Cookbook.pdf

    2. **Container概念**:应用程序的执行单元是Container,它包含了运行应用程序所需的所有资源(如内存、CPU等)。 3. **更细粒度的资源分配**:YARN允许更精细的资源管理,使得系统能够更好地利用硬件资源。 4. **更...

    MapReduce2中自定义排序分组

    MapReduce2(也称为 YARN)是 Hadoop 2.x 版本引入的重要改进,旨在解决原 MapReduce 的资源管理和调度问题。本文将详细探讨在 MapReduce2 中如何实现自定义排序和分组,以满足特定的数据处理需求。 首先,了解 ...

    Hadoop.MapReduce.v2.Cookbook pdf

    这本书籍详细介绍了如何在Hadoop 2.x环境中有效地设计、开发和优化MapReduce作业。 Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储海量数据。它的核心组件包括HDFS(Hadoop Distributed File ...

    Hadoop MapReduce v2 Cookbook (第二版)

    Hadoop MapReduce v2 Cookbook (第二版), Packt Publishing

    hadoop mapreduce2

    标题“hadoop mapreduce2”可能指的是在Hadoop 2.x版本下进行MapReduce编程,这个版本引入了YARN(Yet Another Resource Negotiator),作为资源管理和调度器,改进了原本单一JobTracker的架构,使得系统更加灵活和...

    MapReduceV1和V2的API区别

    随着时间的推移,MapReduce经历了两次主要的版本升级,即MapReduce V1(MRv1)和MapReduce V2(MRv2,也称为YARN)。这两个版本在API设计和执行模型上有所不同,影响了开发人员的工作流程和系统性能。下面将详细讨论...

    大数据实验四-MapReduce编程实践

    2. **实际编程经验积累**:通过编写MapReduce程序,积累了实际编程经验,熟悉了Hadoop和MapReduce的API。 3. **分布式计算的认识**:认识到分布式计算的局限性与优势,在实际应用中需要权衡数据规模和计算需求。 4. ...

    Hadoop MapReduce v2 Cookbook(PACKT,2ed,2015)

    Starting with installing Hadoop YARN, MapReduce, HDFS, and other Hadoop ecosystem components, with this book, you will soon learn about many exciting topics such as MapReduce patterns, using Hadoop to...

    MapReduce计算框架

    MapReduce是一种编程模型和处理大数据集的相关实现,它是Google提出的一个软件框架。Hadoop的MapReduce是一个更加著名的开源实现。MapReduce模型主要用于将大任务拆分成许多小任务,然后在集群的不同节点上并行计算...

    hadoop-mapreduce-client-common-2.7.3-API文档-中英对照版.zip

    赠送jar包:hadoop-mapreduce-client-common-2.7.3.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.7.3-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.7.3-sources.jar; 赠送Maven依赖信息...

    数据存储实验5-编写MapReduce程序实现词频统计.doc(实验报告)

    file2.txt: mapreduce science big data hadoop file3.txt: hadoop mapreduce wordcount computer science 然后,我们需要启动Hadoop伪分布式,并将input文件夹上传到HDFS上。接下来,我们需要编写MapReduce程序来...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理 PDF高清扫描版.pdf-218M

    2. **MapReduce编程模型**:详细解释Map函数和Reduce函数的编写,以及如何定义输入输出格式,理解JobConf配置参数,以及如何使用Hadoop API编写MapReduce程序。 3. **数据分片与任务分配**:探讨如何将大数据集切...

    hadoop大数据平台技术与应用-第4章MapReduce.pdf

    典型应用案例展示了MapReduce在处理不同类型数据上的高效性,如何将复杂的数据处理问题简化为MapReduce模型,并通过具体的案例分析来指导开发人员编写更加高效的MapReduce程序。 4.6 MapReduce网站浏览量统计分析 ...

    第5章大数据技术教程-MapReduce运行原理及Yarn介绍.docx

    于是产生了 MapReduce2,为了区别版本,将老版本的 MapReduce 称为 MapReduce1,MapReduce2 称之为 YARN(Yet Another Resource Negotiator,另一种资源协调者)。 YARN 的设计思想是为了改善 MapReduce 的实现,但...

    mapreduce快速入门

    1.什么是mapreduce 2.编写mapreduce典型demo 3.理解mapreduce核心思想 4.熟练编写mapreduce典型demo

    MapReduce技术揭秘.pptx

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大量数据集。该模型的核心思想是将复杂的大规模数据处理任务分解为两个主要阶段:Map(映射)和Reduce(归约),使其能够在大规模分布式...

    基于MapReduce实现决策树算法

    2. 基于MapReduce的决策树算法实现:在Reducer中,主要实现了决策树算法的计算工作,包括对树的构建、决策树的分裂和叶节点的计算等。Reducer需要对Mapper输出的结果进行处理和计算,以生成最终的决策树模型。 3. ...

    藏经阁-Apache Hadoop 3.0_ What’s new in YARN & MapReduce.pdf

    1. MapReduce 2.x:MapReduce 2.x 是 Hadoop 3.0 版本中的一个重要更新。MapReduce 2.x 提高了应用程序的性能和可靠性。 2. MapReduce Scheduling Enhancements:MapReduce 的调度机制得到了改进,提高了资源利用率...

Global site tag (gtag.js) - Google Analytics