`

MapReduce(4)

阅读更多
一、处理多个文件
求每个同学每科成绩的总分
chinese.txt

english.txt

math.txt

1.新建 path : score , 上传源文件

Mapper.java

package com.study.score.day01;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 统计每个人三科的每科各月的总成绩
 * key : 姓名
 * value : student
 * Map : 映射数据
 * 
 * Mapper 数量 = 切片的数量 
 */
public class ScoreMapper extends Mapper<LongWritable, Text, Text, Student> {

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Student>.Context context)
			throws IOException, InterruptedException {

		// 文件名称,根据文件名称判断成绩是哪个科目的
		FileSplit split = (FileSplit) context.getInputSplit();
		String textName = split.getPath().getName();
		String [] textNames = textName.split("\\.");
		textName = textNames[0];
		// 每行的内容
		// 1 zhang 89  月份 姓名 成绩
		String lineContent = value.toString() ;
		// 
		String [] datas = lineContent.split(" ");
		String name = datas[1];
		String score = datas[2];
		
		Student student = new Student();
		student.setName(name);
		if("chinese".equals(textName)){
			student.setChinese(Integer.valueOf(score));
			student.setEnglish(0);
			student.setMath(0);
		}else if("english".equals(textName)){
			student.setEnglish(Integer.valueOf(score));
			student.setMath(0);
			student.setChinese(0);
		}else if("math".equals(textName)){
			student.setMath(Integer.valueOf(score));
			student.setChinese(0);
			student.setEnglish(0);
		}
		context.write(new Text(name), student);
	}
}



2.Reducer.java
package com.study.score.day01;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ScoreReducer extends Reducer<Text, Student, Text, Student> {

	@Override
	protected void reduce(Text key, Iterable<Student> values, Reducer<Text, Student, Text, Student>.Context context)
			throws IOException, InterruptedException {

		Student student = new Student();
		student.setName(key.toString());
		
		Integer chinese = 0;
		Integer english = 0 ;
		Integer math = 0 ;
		for(Student stu : values){
			chinese = chinese + stu.getChinese();
			english = english + stu.getEnglish();
			math = math + stu.getMath();
		}
		student.setChinese(chinese);
		student.setEnglish(english);
		student.setMath(math);
		
		context.write(key, student);
	}
}




3.Student.java

package com.study.score.day01;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Student implements Writable{

	private String name ;
	private Integer chinese ;
	private Integer english ;
	private Integer math ;
	
	
	
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public Integer getChinese() {
		return chinese;
	}
	public void setChinese(Integer chinese) {
		this.chinese = chinese;
	}
	public Integer getEnglish() {
		return english;
	}
	public void setEnglish(Integer english) {
		this.english = english;
	}
	public Integer getMath() {
		return math;
	}
	public void setMath(Integer math) {
		this.math = math;
	}
	@Override
	public void readFields(DataInput input) throws IOException {
		this.name = input.readUTF();
		this.chinese = input.readInt();
		this.english = input.readInt();
		this.math = input.readInt();
	}
	@Override
	public void write(DataOutput output) throws IOException {
		output.writeUTF(name);
		output.writeInt(chinese);
		output.writeInt(english);
		output.writeInt(math);
	}
	@Override
	public String toString() {
		return "Student [name=" + name + ", chinese=" + chinese + ", english=" + english + ", math=" + math + "]";
	}
	
	
}




4.Driver.java

public class ScoreDriver {

	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(ScoreDriver.class);
		job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Student.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Student.class);
	// 读取路径下的所有文件,此时 result 文件夹不存在	
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/score"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/score/result"));
	
		job.waitForCompletion(true);
	}

}




总结:
利用 content.getInputFile().getPath().getName() 文件名称来区分成绩的归属,即是哪个科目的成绩

二、排序

备注:若使用插件上传文件后乱码,检查 Eclipse 工作空间的编码格式,设置为UTF-8与文件的编码格式相同

根据电影热度对电影排序
惊天破 72
机械师2 83
奇异博士 67
但丁密码 79
比利林恩的中场战事 84
侠探杰克:永不回头 68
龙珠Z:复活的弗利萨 79
长城 56


1.Po

package com.study.sort.day01;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MovieBean implements WritableComparable<MovieBean>{

	private String name ;
	private Integer hotNum ;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public Integer getHotNum() {
		return hotNum;
	}
	public void setHotNum(Integer hotNum) {
		this.hotNum = hotNum;
	}
	@Override
	public void readFields(DataInput input) throws IOException {
		this.name = input.readUTF();
		this.hotNum = input.readInt();
	}
	@Override
	public void write(DataOutput output) throws IOException {
		output.writeUTF(this.name);
		output.writeInt(this.hotNum);
	}
	@Override
	public String toString() {
		return "MovieBean [name=" + name + ", hotNum=" + hotNum + "]";
	}
// 降序排序:旧对象 - 当前对象
	@Override
	public int compareTo(MovieBean o) {
		return o.getHotNum() - this.getHotNum();
	}

}



2.Mapper

public class SortMapper extends Mapper<LongWritable, Text, MovieBean, NullWritable> {

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, MovieBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString() ;
		String [] datas = line.split(" ");
		MovieBean movieBean = new MovieBean();
		movieBean.setName(datas[0]);
		movieBean.setHotNum(Integer.valueOf(datas[1]));
		
		context.write(movieBean, NullWritable.get());
	}
}



3.Driver

public class SortDriver {

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(SortDriver.class);
		job.setMapperClass(SortMapper.class);
		
		job.setMapOutputKeyClass(MovieBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/sort"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/sort/result"));
		
		job.waitForCompletion(true);
		
	}

}



重点:
PO 实现 比较接口,定义比较结果
Mapper 排序是根据KEY值进行排序的,所以 PO类作为KEY值

三、多层MR处理

在第一层MR处理基础上
添加第二个JOB处理第一个JOB的运行结果
例子:
计算每人3个月的总收入并排序

第一个MR:计算每人的总收入
第二个MR:按照收入进行排序Mapper

四、combine 减轻 reducer压力

统计单词数量
若多个mapper 读取一份文件,每个mapper 对文件处理的结果为
hello 1
hello 1
...

最后发送到reducer 上进行合并处理,若文件数量很大,则reducer的压力很大

所以,在 mapper的处理过程中进行预处理,先进行合并
再发送到 reducer 上进行二次合并,此时只是对两个数据进行合并

1.Mapper
读取数据,分割,发送

2.Combine extends Reducer

3.Reducer
接收 combine 处理后的数据

4.Driver
job.serCombineClass();

不需对 combine 的输出 key value 进行设置

分享到:
评论

相关推荐

    基于MapReduce实现决策树算法

    4. 决策树算法在MapReduce中的优化:在基于MapReduce实现决策树算法中,需要对决策树算法进行优化,以提高计算速度和效率。例如,可以对决策树算法的计算过程进行并行化,对Mapper和Reducer的计算过程进行优化等。 ...

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    mapreduce mapreduce mapreduce

    MapReduce是一种分布式计算模型,由Google开发,用于处理和生成大量数据。这个模型主要由两个主要阶段组成:Map(映射)和Reduce(规约)。MapReduce的核心思想是将复杂的大规模数据处理任务分解成一系列可并行执行...

    MapReduce综合案例(4个)

    4. QQ好友推荐案例: 在QQ这样的社交平台上,好友推荐是增加用户黏性的关键。Map阶段,处理用户的基本信息,如共同好友、兴趣爱好、在线时间等,生成键值对。Reduce阶段,通过计算用户间的相似度,找出可能有共同...

    Hadoop mapreduce实现wordcount

    4. **Shuffle & Sort**: Map 输出的键值对会被 Hadoop 自动进行分区、排序和合并。这一步是 MapReduce 的一个重要环节,确保相同键的值被聚集在一起,以便 Reduce 阶段处理。 5. **Reduce 阶段**: 在 Reduce ...

    MapReduce基础.pdf

    ### MapReduce基础知识详解 #### 一、MapReduce概述 **MapReduce** 是一种编程模型,最初由Google提出并在Hadoop中实现,用于处理大规模数据集的分布式计算问题。该模型的核心思想是将复杂的大型计算任务分解成较...

    学生mapreduce成绩分析

    MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。这个模型将复杂的计算任务分解成两个主要阶段:Map(映射)和Reduce(化简),使得在大规模分布式环境下处理大数据变得可能...

    MapReduce 设计模式

    4. Pig和Hive:这两个工具都与Hadoop紧密相关。Pig是一个高层次的数据流语言和执行框架,用于简化MapReduce程序的编写,而Hive提供了对大规模数据集进行查询和分析的简单数据仓库基础设施。 5. 数据总结模式...

    基于MapReduce的Apriori算法代码

    4. Mapper和Reducer:Mapper和Reducer是MapReduce框架中的两个主要组件,Mapper负责将输入数据集映射成键值对,Reducer负责聚合Mapper输出的键值对。 5. 并行计算:并行计算是指将计算任务分割成多个小任务,然后...

    hadoop mapreduce编程实战

    Hadoop MapReduce 编程实战 Hadoop MapReduce 是大数据处理的核心组件之一,它提供了一个编程模型和软件框架,用于大规模数据处理。下面是 Hadoop MapReduce 编程实战的知识点总结: MapReduce 编程基础 ...

    mapreduce项目 数据清洗

    4. **Hadoop生态系统**: MapReduce通常与Hadoop生态系统一起使用,Hadoop提供了一个分布式文件系统(HDFS)来存储大数据,以及YARN资源管理器来协调计算任务。在这个项目中,数据可能存储在HDFS上,由YARN调度执行...

    MapReduce发明人关于MapReduce的介绍

    ### MapReduce:大规模数据处理的简化利器 #### 引言:MapReduce的诞生与使命 在MapReduce问世之前,Google的工程师们,包括其发明者Jeffrey Dean和Sanjay Ghemawat,面临着一个共同的挑战:如何高效地处理海量...

    Hadoop原理与技术MapReduce实验

    (4)完成上课老师演示的内容 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 二、实验内容 1.单词计数实验(wordcount) (1)输入start-all.sh启动hadoop相应进程和相关的端口号 (2)...

    大数据 hadoop mapreduce 词频统计

    4. **运行Job**:配置好MapReduce作业后,提交到Hadoop集群进行执行。集群会自动调度任务,将工作分配给各个节点。 5. **结果收集**:MapReduce完成后,最终的词频统计结果会被写入HDFS,可以进一步进行可视化或...

    大数据实验四-MapReduce编程实践

    ### 大数据实验四-MapReduce编程实践 #### 一、实验内容与目的 ##### 实验内容概述 本次实验的主要内容是使用MapReduce框架来实现WordCount词频统计功能,即统计HDFS(Hadoop Distributed File System)系统中多个...

    kmeans(mapreduce)

    这个过程可以视为“更新”步骤,但因为MapReduce模型不支持原地更新,所以需要再次运行MapReduce作业,将新的质心作为输入,开始下一轮迭代。 4. **迭代过程**:重复上述过程,直到质心不再明显变化或者达到预设的...

    【MapReduce篇07】MapReduce之数据清洗ETL1

    MapReduce之数据清洗ETL详解 MapReduce是一种基于Hadoop的分布式计算框架,广泛应用于大数据处理领域。数据清洗(Data Cleaning)是数据处理过程中非常重要的一步,旨在清洁和转换原始数据,使其更加可靠和有用。...

    18、MapReduce的计数器与通过MapReduce读取-写入数据库示例

    MapReduce是一种分布式计算模型,由Google开发,广泛应用于大数据处理。在MapReduce中,计数器(Counter)是一个非常重要的工具,它允许开发者在MapReduce作业执行过程中收集和跟踪各种统计信息,帮助理解和优化程序...

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

Global site tag (gtag.js) - Google Analytics