对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现.
我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:
人员ID 人员名称 地址ID
1 张三 1
2 李四 2
3 王五 1
4 赵六 3
5 马七 3
另外一组为地址信息:
地址ID 地址名称
1 北京
2 上海
3 广州
这里给出了一个很简单的例子,而且数据量很小,就这么用眼睛就能看过来的几行,当然,实际的情况可能是几十万上百万甚至上亿的数据量.要实现的功能很简单,就是将人员信息与地址信息进行join,将人员的地址ID完善成为地址名称.对于Hadoop文件系统的应用,目前看来,很多数据的存储都是基于文本的,而且都是将数据放在一个文件目录中进行处理.因此我们这里也采用这种模式来完成.
对于mapreduce程序来说,最主要的就是将要做的工作转化为map以及reduce两个部分.我们可以将地址以及人员都采用同样的数据结构来存储,通过一个flag标志来指定该数据结构里面存储的是地址信息还是人员信息.经过map后,使用地址ID作为key,将所有的具有相同地址的地址信息和人员信息放入一个key->value list数据结构中传送到reduce中进行处理.在reduce过程中,由于key是地址的ID,所以value list中只有一个是地址信息,其他的都是人员信息,因此,找到该地址信息后,其他的人员信息的地址就是该地址所指定的地址名称.
OK,我们的join算法基本搞定啦.剩下就是编程实现了,let’s go.
上面提到了存储人员和地址信息的数据结构,可以说这个数据结构是改程序的重要的数据载体之一.我们先来看看该数据结构:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Record implements WritableComparable {
int type; //数据类型的定义,1为人员,2为地址
String empName="";
String empId="";
String locId="";
String locationName="";
public Record(){
super();
}
public Record(Record record){
this.type = record.type;
this.empName = record.empName;
this.empId = record.empId;
this.locId = record.locId;
this.locationName = record.locationName;
}
public String toString(){
if(type == 1)
return empId+","+empName+","+locationName;
else if(type == 2)
return locId+","+locationName;
return "uninit data!";
}
public void readFields(DataInput in) throws IOException {
type = in.readInt();
empName = in.readUTF();
empId = in.readUTF();
locId = in.readUTF();
locationName = in.readUTF();
}
public void write(DataOutput out) throws IOException {
out.writeInt(type);
out.writeUTF(empName);
out.writeUTF(empId);
out.writeUTF(locId);
out.writeUTF(locationName);
}
public int compareTo(Object arg0) {
return 0;
}
}
上面的Record的实现了WritableComparable,对于Mapreduce的中间结果类来说,必须要实现Writable,从而在map完成输出中间结果时能够将中间结果写入到运行job的node文件系统中,至于Comparable接口的实现,对于作为Key的中间结果来说需要实现该接口,从而能够完成基于key的排序功能.
接下来是Join的主程序,就是mapreduce的主程序.基本的主程序如下:
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
public class Join {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
JobConf conf = new JobConf(Join.class);
conf.setJobName("Join");
FileSystem fstm = FileSystem.get(conf);
Path outDir = new Path("/Users/hadoop/outputtest");
fstm.delete(outDir, true);
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapOutputValueClass(Record.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(JoinMapper.class);
conf.setReducerClass(JoinReducer.class);
FileInputFormat.setInputPaths(conf, new Path(
"/user/hadoop/input/join"));
FileOutputFormat.setOutputPath(conf, outDir);
JobClient.runJob(conf);
Path outPutFile = new Path(outDir, "part-00000");
SequenceFile.Reader reader = new SequenceFile.Reader(fstm, outPutFile,
conf);
org.apache.hadoop.io.Text numInside = new Text();
LongWritable numOutside = new LongWritable();
while (reader.next(numOutside, numInside)) {
System.out.println(numInside.toString() + " "
+ numOutside.toString());
}
reader.close();
}
}
程序主体很简单,开始将输出目录删除,中间进行一系列的JobConf设定工作,将输出格式设为SequenceFile,最后读出程序结果到控制台.接下来我们看看Mapper的实现:
import java.io.IOException;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.*;
public class JoinMapper extends MapReduceBase
implements Mapper<LongWritable, Text, LongWritable, Record> {
public void map(LongWritable key, Text value,
OutputCollector<LongWritable, Record> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] values = line.split(",");
if(values.length == 2){ //这里使用记录的长度来区别地址信息与人员信息,当然可以通过其他方式(如文件名等)来实现
Record reco = new Record();
reco.locId = values[0];
reco.type = 2;
reco.locationName = values[1];
output.collect(new LongWritable(Long.parseLong(values[0])), reco);
}else{
Record reco = new Record();
reco.empId = values[0];
reco.empName = values[1];
reco.locId = values[2];
reco.type = 1;
output.collect(new LongWritable(Long.parseLong(values[2])), reco);
}
}
}
对于maper来说,就是从输入文件中读取相应数据构造key->value(地址id->地址或者人员对象)的数据对,并交给hadoop框架完成shuffle等工作.经过hadoop框架完成suffle之后便会将具有想同地址ID的人员信息以及地址信息交给reducer来进行处理.
好啦,剩下就是最后一步了,其实也是最重要的一步就是reduce端的join工作了.还是来看看代码吧:
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class JoinReducer extends MapReduceBase implements
Reducer<LongWritable, Record, LongWritable, Text> {
public void reduce(LongWritable key, Iterator<Record> values,
OutputCollector<LongWritable, Text> output,
Reporter reporter) throws IOException {
System.out.println("reducer for key "+key.toString());
Record thisLocation= new Record();
List<Record> employees= new Vector<Record>();
while (values.hasNext()){
Record reco = values.next();
if(reco.type == 2){ //2 is the location
thisLocation = new Record(reco);
//thisLocation = reco;
System.out.println("location is "+ thisLocation.locationName);
}else{ //1 is employee
Record recoClone = new Record(reco);
employees.add(recoClone);
//employess.add(reco);
System.out.println(" employess "+ reco.toString());
}
}
for(Record e : employees){
e.locationName = thisLocation.locationName;
output.collect(new LongWritable(0), new Text(e.toString()));
}
System.out.println("+++++++++++++++");
}
}
在reducer端,我们先构造了一个地址对象,thisLocation用来保存地址信息.在reducer的迭代器values中,如果某个value是地址,就将其保存到thisLocation中.否则就将人员信息加入到List中以供后面打印.
这个reducer中有两点需要非常注意:
一,在while (values.hasNext())的循环中的thisLocation = new Record(reco)以及Record recoClone = new Record(reco)语句,我们不能直接保存reducer的迭代器中的对象,因为迭代器中每次返回的对象都是同一个Object,但是具有不同的值.注意,一定要注意.
二,这个是一个比较蹩脚的reduce实现,从程序中我们可以看到.我们用了一个List来保存某个地址ID的所有人员信息,对于一个非常巨大的应用来说,某个地址ID可能具有大于List长度的人员信息,这就会造成List溢出.下次对该程序进行优化从而能够避免该现象.
好啦,看看数据和程序的运行结果吧!
$ ./hadoop fs -cat input/join/names
1,张三,1
2,李四,2
3,王五,1
4,赵六,3
5,马七,3
$ ./hadoop fs -cat input/join/locations
1,北京
2,上海
3,广州
运行程序:
09/11/20 21:44:09 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/11/20 21:44:10 INFO mapred.FileInputFormat: Total input paths to process : 2
09/11/20 21:44:11 INFO mapred.JobClient: Running job: job_200911202139_0001
09/11/20 21:44:12 INFO mapred.JobClient: map 0% reduce 0%
09/11/20 21:44:24 INFO mapred.JobClient: map 33% reduce 0%
09/11/20 21:44:26 INFO mapred.JobClient: map 66% reduce 0%
09/11/20 21:44:28 INFO mapred.JobClient: map 100% reduce 0%
09/11/20 21:44:35 INFO mapred.JobClient: map 100% reduce 100%
09/11/20 21:44:36 INFO mapred.JobClient: Job complete: job_200911202139_0001
09/11/20 21:44:37 INFO mapred.JobClient: Counters: 16
09/11/20 21:44:37 INFO mapred.JobClient: File Systems
09/11/20 21:44:37 INFO mapred.JobClient: HDFS bytes read=97
09/11/20 21:44:37 INFO mapred.JobClient: HDFS bytes written=246
09/11/20 21:44:37 INFO mapred.JobClient: Local bytes read=243
09/11/20 21:44:37 INFO mapred.JobClient: Local bytes written=582
09/11/20 21:44:37 INFO mapred.JobClient: Job Counters
09/11/20 21:44:37 INFO mapred.JobClient: Launched reduce tasks=1
分享到:
相关推荐
在Hadoop MapReduce中,数据处理的核心任务之一就是JOIN操作,它相当于关系数据库中的连接操作,用于合并来自不同数据源的相关信息。本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码...
本课程设计主要围绕如何使用Hadoop的MapReduce实现SQL中的统计、GROUP BY和JOIN操作,这是一次深入理解大数据处理机制的实践过程。 首先,让我们来探讨SQL的统计功能。在SQL中,统计通常涉及到COUNT、SUM、AVG、MAX...
在大数据的浪潮中,Hadoop成为了应对海量数据处理的关键技术之一。Hadoop是一个开源的分布式计算框架,由Apache基金会开发,旨在提供高可靠性和高扩展性的数据处理能力。它的核心组件包括HDFS(Hadoop Distributed ...
《基于Hadoop Spark奥运会奖牌变化大数据分析实现毕业源码案例设计》 在这个项目中,我们探讨了如何利用Hadoop和Spark两大核心技术进行大规模数据处理和分析,具体应用于奥运会奖牌变化的历史数据。Hadoop是Apache...
然而,join操作因其复杂性和数据分布的特点,在Hadoop中实现起来较为困难。具体而言: 1. **数据分布与倾斜问题**:在分布式环境中,数据的不均匀分布会导致join操作性能下降。 2. **MapReduce的局限性**:Hadoop的...
本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一过程。 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop ...
1. **Hadoop数据仓库Hive**:Hive是由Facebook开发的一种基于Hadoop的数据仓库工具,它允许SQL熟悉的用户对存储在Hadoop分布式文件系统(HDFS)上的大规模数据进行分析。Hive将结构化的数据文件映射为数据库表,提供了...
- 这个应用程序很可能是一个示例,演示了如何在Hadoop MapReduce中实现多表关联并处理Job间的依赖和参数传递。它可能包括多个Job,每个Job负责一个或多个表的处理,并通过特定机制将结果传递给后续的Job。 5. **...
MapReduce是Hadoop的核心组件之一,主要用于分布式数据处理。它采用Map(映射)和Reduce(归约)的编程模型来处理大规模数据集。MapReduce高级特性包括计数器、排序和数据集连接等,这些特性能够进一步提升数据处理...
《Hadoop MapReduce Cookbook》是一本专为大数据处理和分析领域的专业人士编写的指南,它深入浅出地介绍了如何使用Hadoop MapReduce框架解决实际问题。MapReduce是Hadoop生态系统中的核心组件,它允许用户在分布式...
在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系...
根据提供的《Hadoop 数据分析平台》课程毕业测试题的相关信息,我们可以提炼出一系列与Hadoop相关的知识点,这些知识点不仅能够帮助学生更好地理解Hadoop的工作原理和技术细节,还能够加深他们对大数据处理技术的...
该套源码是个人学习Hadoop HDFS和MapReduce技术的实践案例集合,采用Java语言编写,包含45个文件,涵盖34个Java源文件、4个XML配置文件、3个偏好设置文件以及1个Git忽略文件等。内容涵盖HDFS的JAVA API操作,如文件...
综上所述,MapReduce应用案例文档深入地介绍了MapReduce编程模型在Hadoop生态系统中的实际使用,包括对join操作的细节分析,以及如何搭建Hadoop环境,如何上传和管理测试数据。此外,文档还提供了Hadoop学习资源的...
在关系型数据库中 join是非常常见的操作,各种优化手段已经到了极致。...本文对Hadoop中最基本的join方法进行简单介绍,这也是其它许多方法和优化措施的基础。文中所采用的例子来自于《 HadoopinAc
MapReduce源码分析(主要四大模块,其他表示父目录下的.java文件的总称):1.org.apache.hadoop.mapred(旧版MapReduceAPI):( 1).jobcontrol(job作业直接控制类)(2 ).join :(作业作业中用于模仿数据连接处理...
在大数据处理领域,Hadoop MapReduce 是一种广泛使用的分布式计算框架。在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:...
具体要掌握的五个主题分别是:大数据分布式集群搭建(高可用性,HA),构建企业级MapReduce项目,Hadoop和Spark的源码编译,以及Zookeeper和MapReduce的高级Join操作。 描述部分列举了一些具体的知识点,包含搭建...