package com.hadoop.sample;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MTJoin {
private static int time = 0;
public static class Map extends Mapper<Object,Text,Text,Text>{
//在map中先区分输入行属于左表还是右表,然后对两列值进行分割,
//保存连接列在key值,剩余列和左右表标志在value中,最后输出
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
int i = 0;
//输入文件首行,不处理
if(line.contains("factoryname")==true||line.contains("addressID")==true){
return;
}
//找出数据中的分割点
while(line.charAt(i)>='9'||line.charAt(i)<='0'){
i++;
}
if(line.charAt(i)>='9'||line.charAt(i)<='0'){
//左表
int j = i-1;
while(line.charAt(j)!=' ') j--;
String[] values = {line.substring(0, j),line.substring(i)};
context.write(new Text(values[1]), new Text("1+"+values[0]));
}else{//右表
int j = i+1;
while(line.charAt(j)!=' ') j++;
String[] values = {line.substring(0, i+1),line.substring(j)};
context.write(new Text(values[0]), new Text("2+"+values[1]));
}
}
}
public static class Reduce extends Reducer<Text,Text,Text,Text>{
//reduce解析map输出,将value中数据按照左右表分别保存,然后求笛卡尔积,输出
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
if(time == 0){//输入文件第一行
context.write(new Text("factoryname"),new Text("addressname"));
time++;
}
int factorynum = 0;
String factory[] = new String[10];
int adressnum = 0;
String adress[] = new String[10];
Iterator iter = values.iterator();
while(iter.hasNext()){
String record = iter.next().toString();
int len = record.length();
int i = 2;
char type = record.charAt(0);
String factoryname = new String();
String adressname = new String();
if(type == '1'){//左表
factory[factorynum] = record.substring(2);
factorynum++;
}else{//右表
adress[adressnum] = record.substring(2);
}
}
if(factorynum!=0&&adressnum!=0){//笛卡尔积
for(int m=0;m<factorynum;m++){
for(int n=0;n<adressnum;n++){
context.write(new Text(factory[m]), new Text(adress[n]));
}
}
}
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage WordCount <int> <out>");
System.exit(2);
}
Job job = new Job(conf,"word count");
job.setJarByClass(MTJoin.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
分享到:
相关推荐
《Hadoop MapReduce Cookbook》是一本专注于Hadoop MapReduce技术实践的书籍,其附带的示例代码旨在帮助读者深入理解和应用MapReduce编程模型。在这个压缩包中,我们看到的是书中的实例代码,名为“Hadoop-MapReduce...
标签"windows hadoop"进一步强调了这个资源与Windows平台和Hadoop技术的关联性。 关于压缩包内的文件"window10-hadoop-2.7.2.zip",这可能是一个解压后包含Hadoop源码、编译脚本、配置文件、文档、可执行文件等的...
1. src:存放MapReduce程序源代码。 2. conf:配置文件,包括Hadoop集群的配置信息。 3. data:原始数据或处理后的中间数据。 4. logs:运行过程中的日志文件。 5. scripts:执行脚本,如数据导入、分析任务的启动...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。...通过深入研究其源代码,不仅可以提升Hadoop技能,还能了解如何利用Perl这样的脚本语言与Hadoop集成,为大数据工作流添加更多可能性。
它支持通过Spring的依赖注入来管理Hadoop的客户端实例,允许开发者通过声明式的方式定义MapReduce作业,极大地提高了开发效率和代码可维护性。此外,它还提供了对Hadoop生态系统其他组件如YARN、Hive、Pig等的集成,...
MapReduce 的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。 MapReduce 的优点包括: 1. 易于编程:MapReduce 提供了一个简单的编程模型,...
用户无需管理硬件或安装Hadoop集群,只需专注于编写MapReduce代码。 2. **Java API for MapReduce**: Hadoop MapReduce使用Java作为其主要编程语言,提供了丰富的API供开发者使用。在这个示例中,我们需要创建...
在这个项目中,MapReduce代码用于处理气象数据,可能包括温度、湿度、风速等各项指标,通过对这些数据的分析,可以挖掘出天气模式、气候变化等有价值的信息。例如,可能会统计特定区域的历史平均气温,找出高温或...
1. 市场篮分析(Market Basket Analysis):利用Hadoop和Spark处理大量交易数据,发现购买商品之间的关联规则。 2. 数据挖掘算法:书中探讨了K-means、KNN(K最近邻算法)和朴素贝叶斯等常用的数据挖掘技术,并介绍...
人工智能-项目实践-聚类-基于MapReduce爬虫,可抽取各大新闻网站的新闻正文并进行分类和聚类 这是一个基于Hadoop的分布式爬虫,目前只支持抓取腾讯新闻中心的新闻内容。支持插件机制,可以通过实现Extractor接口自己...
在Hadoop生态系统中,HDFS(Hadoop Distributed File System)是核心组件之一,它提供了分布式存储功能,使得大数据处理成为可能。Hadoop2.2.0版本是一个重要的里程碑,引入了诸多改进和优化,增强了系统的稳定性和...
基于MapReduce的Apriori算法代码是一个使用Hadoop MapReduce框架实现的关联规则挖掘算法,称为Apriori算法。Apriori算法是一种经典的关联规则挖掘算法,用于发现事务数据库中频繁出现的项集。该算法的主要思想是生成...
- **YARN架构**:作为Hadoop 2.0的核心组件之一,YARN为MapReduce提供了资源管理和调度的功能。 - **Hadoop生态系统集成**:讲解如何与其他Hadoop组件(如Hive、Pig等)配合使用,构建更强大的数据处理流水线。 ###...
在深入理解并使用Hadoop框架时,将Hadoop源码关联到Eclipse工程中是十分有益的。这不仅能够帮助我们更好地理解Hadoop的内部工作机制,还便于进行二次开发和调试。下面我们将详细讲解如何一步步地将Hadoop源码导入到...
MapReduce是大数据处理的一种核心工具,常与Hadoop生态系统一起使用,能高效地处理大规模数据。MySQL作为关系型数据库,用于存储用户信息、电影数据以及用户行为记录。JSP(JavaServer Pages)则用于生成动态网页,...
Hadoop是Apache基金会的一个开源项目,它提供了一个开放源代码版本的MapReduce框架以及其他相关组件,如HDFS(Hadoop Distributed File System)用于存储数据,YARN(Yet Another Resource Negotiator)用于资源管理...
相比于传统数据库,MapReduce能够更加高效地处理大规模数据集,特别是在涉及多表关联的情况下。此外,实验中还学习到了如何使用Python进行数据预处理,以及如何在Eclipse中集成Hadoop进行开发调试。这些技能对于未来...
标题中的"hadoop-training-map-reduce-example-4"表明这是一个关于Hadoop MapReduce的教程实例,很可能是第四个阶段或示例。Hadoop是Apache软件基金会的一个开源项目,它提供了分布式文件系统(HDFS)和MapReduce...