转载请注明:http://hanlaiming.freetzi.com/?p=123
在mapreduce上编写简单应用后,开始学习稍微高级一点的单表关联和多表关联。
在学习过程中我参考了这篇文章,谢谢http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html,里面很多基本的内容很实用。
一、单表关联。
实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。
样例输入如下所示。
- file:child parent
- Tom Lucy
- Tom Jack
- Jone Lucy
- Jone Jack
- Lucy Mary
- Lucy Ben
- Jack Alice
- Jack Jesse
- Terry Alice
- Terry Jesse
- Philip Terry
- Philip Alma
- Mark Terry
- Mark Alma
-
- 家族树状关系谱:
家族谱
样例输出如下所示。
- file:grandchild grandparent
- Tom Alice
- Tom Jesse
- Jone Alice
- Jone Jesse
- Tom Mary
- Tom Ben
- Jone Mary
- Jone Ben
- Philip Alice
- Philip Jesse
- Mark Alice
- Mark Jesse
设计思路
分析这个实例,显然需要进行单表连接,连接的是左表的parent列和右表的child列,且左表和右表是同一个表。
连接结果中除去连接的两列就是所需要的结果——"grandchild--grandparent"表。要用MapReduce解决这个实例,首先应该考虑如何实现表的自连接;其次就是连接列的设置;最后是结果的整理。
考虑到MapReduce的shuffle过程会将相同的key会连接在一起,所以可以将map结果的key设置成待连接的列,然后列中相同的值就自然会连接在一起了。再与最开始的分析联系起来:
要连接的是左表的parent列和右表的child列,且左表和右表是同一个表,所以在map阶段将读入数据分割成child和parent之后,会将parent设置成key,child设置成value进行输出,并作为左表;再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"grandchild--grandparent"关系。取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。
代码实现:
- import java.io.IOException;
- import java.util.*;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- public class STjoin {
- public static int time = 0;
- /*
- * map将输出分割child和parent,然后正序输出一次作为右表,
- * 反序输出一次作为左表,需要注意的是在输出的value中必须
- * 加上左右表的区别标识。
- */
- public static class Map extends Mapper<Object, Text, Text, Text> {
- // 实现map函数
- public void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- String childname = new String();// 孩子名称
- String parentname = new String();// 父母名称
- String relationtype = new String();// 左右表标识
- // 输入的一行预处理文本
- StringTokenizer itr=new StringTokenizer(value.toString());
- String[] values=new String[2];
- int i=0;
- while(itr.hasMoreTokens()){
- values[i]=itr.nextToken();
- i++;
- }
- if (values[0].compareTo("child") != 0) {
- childname = values[0];
- parentname = values[1];
- // 输出左表
- relationtype = "1";
- context.write(new Text(values[1]), new Text(relationtype +
- "+"+ childname + "+" + parentname));
- // 输出右表
- relationtype = "2";
- context.write(new Text(values[0]), new Text(relationtype +
- "+"+ childname + "+" + parentname));
- }
- }
- }
- public static class Reduce extends Reducer<Text, Text, Text, Text> {
- // 实现reduce函数
- public void reduce(Text key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- // 输出表头
- if (0 == time) {
- context.write(new Text("grandchild"), new Text("grandparent"));
- time++;
- }
- int grandchildnum = 0;
- String[] grandchild = new String[10];
- int grandparentnum = 0;
- String[] grandparent = new String[10];
- Iterator ite = values.iterator();
- while (ite.hasNext()) {
- String record = ite.next().toString();
- int len = record.length();
- int i = 2;
- if (0 == len) {
- continue;
- }
- // 取得左右表标识
- char relationtype = record.charAt(0);
- // 定义孩子和父母变量
- String childname = new String();
- String parentname = new String();
- // 获取value-list中value的child
- while (record.charAt(i) != '+') {
- childname += record.charAt(i);
- i++;
- }
- i = i + 1;
- // 获取value-list中value的parent
- while (i < len) {
- parentname += record.charAt(i);
- i++;
- }
- // 左表,取出child放入grandchildren
- if ('1' == relationtype) {
- grandchild[grandchildnum] = childname;
- grandchildnum++;
- }
- // 右表,取出parent放入grandparent
- if ('2' == relationtype) {
- grandparent[grandparentnum] = parentname;
- grandparentnum++;
- }
- }
- // grandchild和grandparent数组求笛卡尔儿积
- if (0 != grandchildnum && 0 != grandparentnum) {
- for (int m = 0; m < grandchildnum; m++) {
- for (int n = 0; n < grandparentnum; n++) {
- // 输出结果
- context.write(new Text(grandchild[m]), new Text(grandparent[n]));
- }
- }
- }
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: Single Table Join <in> <out>");
- System.exit(2);
- }
- Job job = new Job(conf, "Single Table Join");
- job.setJarByClass(STjoin.class);
- // 设置Map和Reduce处理类
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
- // 设置输出类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- // 设置输入和输出目录
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
二、多表关联
输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。
样例输入如下所示。
- 1)factory:
- factoryname addressed
- Beijing Red Star 1
- Shenzhen Thunder 3
- Guangzhou Honda 2
- Beijing Rising 1
- Guangzhou Development Bank 2
- Tencent 3
- Back of Beijing 1
-
- 2)address:
- addressID addressname
- 1 Beijing
- 2 Guangzhou
- 3 Shenzhen
- 4 Xian
-
- 样例输出如下所示。
- factoryname addressname
- Back of Beijing Beijing
- Beijing Red Star Beijing
- Beijing Rising Beijing
- Guangzhou Development Bank Guangzhou
- Guangzhou Honda Guangzhou
- Shenzhen Thunder Shenzhen
- Tencent Shenzhen
多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。
代码实现:
- import java.io.IOException;
- import java.util.*;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- public class MTjoin {
- public static int time = 0;
- /*
- * 在map中先区分输入行属于左表还是右表,然后对两列值进行分割,
- * 保存连接列在key值,剩余列和左右表标志在value中,最后输出
- */
- public static class Map extends Mapper<Object, Text, Text, Text> {
- // 实现map函数
- public void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- String line = value.toString();// 每行文件
- String relationtype = new String();// 左右表标识
- // 输入文件首行,不处理
- if (line.contains("factoryname") == true
- || line.contains("addressed") == true) {
- return;
- }
- // 输入的一行预处理文本
- StringTokenizer itr = new StringTokenizer(line);
- String mapkey = new String();
- String mapvalue = new String();
- int i = 0;
- while (itr.hasMoreTokens()) {
- // 先读取一个单词
- String token = itr.nextToken();
- // 判断该地址ID就把存到"values[0]"
- if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
- mapkey = token;
- if (i > 0) {
- relationtype = "1";
- } else {
- relationtype = "2";
- }
- continue;
- }
- // 存工厂名
- mapvalue += token + " ";
- i++;
- }
- // 输出左右表
- context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue));
- }
- }
- /*
- * reduce解析map输出,将value中数据按照左右表分别保存,
- * 然后求出笛卡尔积,并输出。
- */
- public static class Reduce extends Reducer<Text, Text, Text, Text> {
- // 实现reduce函数
- public void reduce(Text key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- // 输出表头
- if (0 == time) {
- context.write(new Text("factoryname"), new Text("addressname"));
- time++;
- }
- int factorynum = 0;
- String[] factory = new String[10];
- int addressnum = 0;
- String[] address = new String[10];
- Iterator ite = values.iterator();
- while (ite.hasNext()) {
- String record = ite.next().toString();
- int len = record.length();
- int i = 2;
- if (0 == len) {
- continue;
- }
- // 取得左右表标识
- char relationtype = record.charAt(0);
- // 左表
- if ('1' == relationtype) {
- factory[factorynum] = record.substring(i);
- factorynum++;
- }
- // 右表
- if ('2' == relationtype) {
- address[addressnum] = record.substring(i);
- addressnum++;
- }
- }
- // 求笛卡尔积
- if (0 != factorynum && 0 != addressnum) {
- for (int m = 0; m < factorynum; m++) {
- for (int n = 0; n < addressnum; n++) {
- // 输出结果
- context.write(new Text(factory[m]),
- new Text(address[n]));
- }
- }
- }
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: Multiple Table Join <in> <out>");
- System.exit(2);
- }
- Job job = new Job(conf, "Multiple Table Join");
- job.setJarByClass(MTjoin.class);
- // 设置Map和Reduce处理类
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
- // 设置输出类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- // 设置输入和输出目录
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
分享到:
相关推荐
- 这个应用程序很可能是一个示例,演示了如何在Hadoop MapReduce中实现多表关联并处理Job间的依赖和参数传递。它可能包括多个Job,每个Job负责一个或多个表的处理,并通过特定机制将结果传递给后续的Job。 5. **...
在运行WordCount程序时,输入了自定义的数据文件,模拟工厂和地址的对应关系,通过Hadoop的MapReduce功能,找出并输出工厂名与地址名的关联列表,按照工厂名排序。 **实验结果分析:** 通过实验,学生成功实现了...
总的来说,基于Hadoop的倒排索引实现是一个结合了分布式计算和高效数据结构的优秀实践,它展示了如何利用MapReduce模型解决大数据场景下的文本检索问题。通过理解这一过程,开发者可以更好地运用Hadoop来处理复杂的...
《基于Hadoop实现的关联规则挖掘在图书数据分析推荐系统中的应用》 图书数据分析推荐系统是当前数字图书馆和在线书店中广泛应用的技术,它旨在通过分析用户的阅读习惯和偏好,为用户提供个性化的图书推荐。本项目...
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。Storm是一个分布式的、容错的实时计算系统。两者整合,优势互补。
《基于Hadoop实现朴素贝叶斯文本分类器》 在当今大数据时代,高效的数据处理和分析成为了企业竞争力的关键。Hadoop作为开源的分布式计算框架,以其高容错性、可扩展性和低成本的优势,广泛应用于海量数据的处理。而...
它使得开发者可以直接查看源码,了解Hadoop的内部实现,从而更好地优化和定制自己的Hadoop应用。记住,理解源码对于任何开源技术的学习都是至关重要的。现在,你已经具备了将Hadoop源码与Eclipse集成的能力,可以...
【标题】中的“基于hadoop实现的图书推荐系统”指的是使用Hadoop这一开源大数据处理框架来构建一个能够为用户推荐图书的系统。Hadoop是Apache软件基金会开发的分布式存储和计算平台,它允许高效地处理海量数据,尤其...
### 基于Hadoop的Apriori算法设计与实现 #### 一、背景与问题概述 随着信息技术的发展,海量数据的处理与分析变得日益重要。传统的数据挖掘算法和技术已经难以满足当前的需求,尤其是在处理多维度、含有大量噪声的...
本教程将深入探讨如何利用Hadoop集群和Python实现词频统计,这是一个经典的WordCount示例,适合初学者入门。在这个项目中,我们将通过Python编写MapReduce程序,并在Ubuntu系统上配置的Hadoop集群上运行它。 首先,...
在大数据处理领域,Hadoop是一个不可或缺的开源框架,主要用于分布式存储和计算。本文将深入探讨与"Hadoop.dll"和"winutils.exe...在实际操作中,确保安全配置和充分的测试将有助于实现高效、可靠的Hadoop解决方案。
Hadoop的分布式文件系统(HDFS)可以将数据分布在多台廉价服务器上,确保高可用性和容错性。而MapReduce则是一种编程模型,用于大规模数据集的并行计算,使得处理大量数据变得高效且易于管理。 接下来是Hive,它是...
Hadoop 2.6是Hadoop的一个重要版本,它包含了多项改进和优化,以提高性能和稳定性。在Windows 7 64位操作系统上配置和运行Hadoop可能会遇到一些挑战,因为Hadoop最初是为Linux设计的。不过,通过一些特定的工具和...
本实验是关于如何在Hadoop平台上并行实现经典的Apriori算法,这是一门重要的数据挖掘技术,主要用于关联规则学习。下面将详细阐述Apriori算法的基本原理、并行化实现的关键点以及在Hadoop上的应用。 Apriori算法是...
本项目“基于Hadoop的推荐系统简单实现”旨在利用Hadoop的强大功能来构建一个基本的推荐系统,该系统能根据用户的历史行为和兴趣进行个性化推荐。 推荐系统的核心目标是预测用户对未接触过的项目或内容的喜好程度,...