- 浏览: 61302 次
- 性别:
- 来自: 北京
最新评论
-
scu_cxh:
您好,我在学习hadoop方面的东西,想做一个对task监控的 ...
JobClient应用概述 -
bennie19870116:
看不到图呢...
Eclipse下配置使用Hadoop插件
一、背景
早在8月份的时候,我就做了一些MR的Join查询,但是发现回北京之后,2个月不用,居然有点生疏,所以今天早上又花时间好好看了一下,顺便写下这个文档,以供以后查阅。
二、环境
JDK 1.6、Linux操作系统、hadoop0.20.2
三、资料数据
在做这个Join查询的时候,必然涉及数据,我这里设计了2张表,分别较data.txt和info.txt,字段之间以\t划分。
data.txt内容如下:
201001 1003 abc
201002 1005 def
201003 1006 ghi
201004 1003 jkl
201005 1004 mno
201006 1005 pqr
info.txt内容如下:
1003 kaka
1004 da
1005 jue
1006 zhao
期望输出结果:
1003 201001 abc kaka
1003 201004 jkl kaka
1004 201005 mno da
1005 201002 def jue
1005 201006 pqr jue
1006 201003 ghi zhao
四、Map代码
首先是map的代码,我贴上,然后简要说说
public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取输入文件的全路径和名称
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
if (pathName.contains("data.txt")) {
String values[] = value.toString().split("\t");
if (values.length < 3) {
// data数据格式不规范,字段小于3,抛弃数据
return;
} else {
// 数据格式规范,区分标识为1
TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
context.write(tp, new Text(values[0] + "\t" + values[2]));
}
}
if (pathName.contains("info.txt")) {
String values[] = value.toString().split("\t");
if (values.length < 2) {
// data数据格式不规范,字段小于2,抛弃数据
return;
} else {
// 数据格式规范,区分标识为0
TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
context.write(tp, new Text(values[1]));
}
}
}
}
这里需要注意以下部分:
A、pathName是文件在HDFS中的全路径(例如:hdfs://M1:9000/dajuezhao/join/data/info.txt),可以以endsWith()的方法来判断。
B、资料表,也就是这里的info.txt需要放在前面,也就是标识号是0.否则无法输出理想结果。
C、Map执行完成之后,输出的中间结果如下:
1003,1 201001 abc
1003,1 201004 jkl
1004,1 201005 mon
1005,1 201002 def
1005,1 201006 pqr
1006,1 201003 ghi
1003,0 kaka
1004,0 da
1005,0 jue
1006,0 zhao
五、分区和分组
1、map之后的输出会进行一些分区的操作,代码贴出来:
public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
@Override
public int getPartition(TextPair key, Text value, int numParititon) {
return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
}
}
分区我在以前的文档中写过,这里不做描述了,就说是按照map输出的符合key的第一个字段做分区关键字。分区之后,相同key会划分到一个reduce中去处理(如果reduce设置是1,那么就是分区有多个,但是还是在一个reduce中处理。但是结果会按照分区的原则排序)。分区后结果大致如下:
同一区:
1003,1 201001 abc
1003,1 201004 jkl
1003,0 kaka
同一区:
1004,1 201005 mon
1004,0 da
同一区:
1005,1 201002 def
1005,1 201006 pqr
1005,0 jue
同一区:
1006,1 201003 ghi
1006,0 zhao
2、分组操作,代码如下
public static class Example_Join_01_Comparator extends WritableComparator {
public Example_Join_01_Comparator() {
super(TextPair.class, true);
}
@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
TextPair t1 = (TextPair) a;
TextPair t2 = (TextPair) b;
return t1.getFirst().compareTo(t2.getFirst());
}
}
分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,是按照复合key的第一个字段做分组原则。输出后结果如下:
同一组:
1003,0 kaka
1003,0 201001 abc
1003,0 201004 jkl
同一组:
1004,0 da
1004,0 201005 mon
同一组:
1005,0 jue
1005,0 201002 def
1005,0 201006 pqr
同一组:
1006,0 zhao
1006,0 201003 ghi
六、reduce操作
贴上代码如下:
public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {
protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
Text pid = key.getFirst();
String desc = values.iterator().next().toString();
while (values.iterator().hasNext()) {
context.write(pid, new Text(values.iterator().next().toString() + "\t" + desc));
}
}
}
1、代码比较简单,首先获取关键的ID值,就是key的第一个字段。
2、获取公用的字段,通过排组织后可以看到,一些共有字段是在第一位,取出来即可。
3、遍历余下的结果,输出。
七、其他的支撑代码
1、首先是TextPair代码,没有什么可以细说的,贴出来:
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
}
2、Job的入口函数
public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
String[] otherArgs = parser.getRemainingArgs();
if (agrs.length < 3) {
System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");
System.exit(2);
}
//conf.set("hadoop.job.ugi", "root,hadoop");
Job job = new Job(conf, "Example_Join_01");
// 设置运行的job
job.setJarByClass(Example_Join_01.class);
// 设置Map相关内容
job.setMapperClass(Example_Join_01_Mapper.class);
// 设置Map的输出
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
// 设置partition
job.setPartitionerClass(Example_Join_01_Partitioner.class);
// 在分区之后按照指定的条件分组
job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
// 设置reduce
job.setReducerClass(Example_Join_01_Reduce.class);
// 设置reduce的输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出的目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
// 执行,直到结束就退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
八、总结
1、这是个简单的join查询,可以看到,我在处理输入源的时候是在map端做来源判断。其实在0.19可以用MultipleInputs.addInputPath()的方法,但是它用了JobConf做参数。这个方法原理是多个数据源就采用多个map来处理。方法各有优劣。
2、对于资源表,如果我们采用0和1这样的模式来区分,资源表是需要放在前的。例如本例中info.txt就是资源表,所以标识位就是0.如果写为1的话,可以试下,在分组之后,资源表对应的值放在了迭代器最后一位,无法追加在最后所有的结果集合中。
3、关于分区,并不是所有的map都结束才开始的,一部分数据完成就会开始执行。同样,分组操作在一个分区内执行,如果分区完成,分组将会开始执行,也不是等所有分区完成才开始做分组的操作。
4、有疑问或是写的不对的地方,欢迎大家发邮件沟通交流:dajuezhao@gmail.com
早在8月份的时候,我就做了一些MR的Join查询,但是发现回北京之后,2个月不用,居然有点生疏,所以今天早上又花时间好好看了一下,顺便写下这个文档,以供以后查阅。
二、环境
JDK 1.6、Linux操作系统、hadoop0.20.2
三、资料数据
在做这个Join查询的时候,必然涉及数据,我这里设计了2张表,分别较data.txt和info.txt,字段之间以\t划分。
data.txt内容如下:
201001 1003 abc
201002 1005 def
201003 1006 ghi
201004 1003 jkl
201005 1004 mno
201006 1005 pqr
info.txt内容如下:
1003 kaka
1004 da
1005 jue
1006 zhao
期望输出结果:
1003 201001 abc kaka
1003 201004 jkl kaka
1004 201005 mno da
1005 201002 def jue
1005 201006 pqr jue
1006 201003 ghi zhao
四、Map代码
首先是map的代码,我贴上,然后简要说说
public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取输入文件的全路径和名称
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
if (pathName.contains("data.txt")) {
String values[] = value.toString().split("\t");
if (values.length < 3) {
// data数据格式不规范,字段小于3,抛弃数据
return;
} else {
// 数据格式规范,区分标识为1
TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
context.write(tp, new Text(values[0] + "\t" + values[2]));
}
}
if (pathName.contains("info.txt")) {
String values[] = value.toString().split("\t");
if (values.length < 2) {
// data数据格式不规范,字段小于2,抛弃数据
return;
} else {
// 数据格式规范,区分标识为0
TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
context.write(tp, new Text(values[1]));
}
}
}
}
这里需要注意以下部分:
A、pathName是文件在HDFS中的全路径(例如:hdfs://M1:9000/dajuezhao/join/data/info.txt),可以以endsWith()的方法来判断。
B、资料表,也就是这里的info.txt需要放在前面,也就是标识号是0.否则无法输出理想结果。
C、Map执行完成之后,输出的中间结果如下:
1003,1 201001 abc
1003,1 201004 jkl
1004,1 201005 mon
1005,1 201002 def
1005,1 201006 pqr
1006,1 201003 ghi
1003,0 kaka
1004,0 da
1005,0 jue
1006,0 zhao
五、分区和分组
1、map之后的输出会进行一些分区的操作,代码贴出来:
public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
@Override
public int getPartition(TextPair key, Text value, int numParititon) {
return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
}
}
分区我在以前的文档中写过,这里不做描述了,就说是按照map输出的符合key的第一个字段做分区关键字。分区之后,相同key会划分到一个reduce中去处理(如果reduce设置是1,那么就是分区有多个,但是还是在一个reduce中处理。但是结果会按照分区的原则排序)。分区后结果大致如下:
同一区:
1003,1 201001 abc
1003,1 201004 jkl
1003,0 kaka
同一区:
1004,1 201005 mon
1004,0 da
同一区:
1005,1 201002 def
1005,1 201006 pqr
1005,0 jue
同一区:
1006,1 201003 ghi
1006,0 zhao
2、分组操作,代码如下
public static class Example_Join_01_Comparator extends WritableComparator {
public Example_Join_01_Comparator() {
super(TextPair.class, true);
}
@SuppressWarnings("unchecked")
public int compare(WritableComparable a, WritableComparable b) {
TextPair t1 = (TextPair) a;
TextPair t2 = (TextPair) b;
return t1.getFirst().compareTo(t2.getFirst());
}
}
分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,是按照复合key的第一个字段做分组原则。输出后结果如下:
同一组:
1003,0 kaka
1003,0 201001 abc
1003,0 201004 jkl
同一组:
1004,0 da
1004,0 201005 mon
同一组:
1005,0 jue
1005,0 201002 def
1005,0 201006 pqr
同一组:
1006,0 zhao
1006,0 201003 ghi
六、reduce操作
贴上代码如下:
public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {
protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
Text pid = key.getFirst();
String desc = values.iterator().next().toString();
while (values.iterator().hasNext()) {
context.write(pid, new Text(values.iterator().next().toString() + "\t" + desc));
}
}
}
1、代码比较简单,首先获取关键的ID值,就是key的第一个字段。
2、获取公用的字段,通过排组织后可以看到,一些共有字段是在第一位,取出来即可。
3、遍历余下的结果,输出。
七、其他的支撑代码
1、首先是TextPair代码,没有什么可以细说的,贴出来:
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}
}
2、Job的入口函数
public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
String[] otherArgs = parser.getRemainingArgs();
if (agrs.length < 3) {
System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");
System.exit(2);
}
//conf.set("hadoop.job.ugi", "root,hadoop");
Job job = new Job(conf, "Example_Join_01");
// 设置运行的job
job.setJarByClass(Example_Join_01.class);
// 设置Map相关内容
job.setMapperClass(Example_Join_01_Mapper.class);
// 设置Map的输出
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
// 设置partition
job.setPartitionerClass(Example_Join_01_Partitioner.class);
// 在分区之后按照指定的条件分组
job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
// 设置reduce
job.setReducerClass(Example_Join_01_Reduce.class);
// 设置reduce的输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出的目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
// 执行,直到结束就退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
八、总结
1、这是个简单的join查询,可以看到,我在处理输入源的时候是在map端做来源判断。其实在0.19可以用MultipleInputs.addInputPath()的方法,但是它用了JobConf做参数。这个方法原理是多个数据源就采用多个map来处理。方法各有优劣。
2、对于资源表,如果我们采用0和1这样的模式来区分,资源表是需要放在前的。例如本例中info.txt就是资源表,所以标识位就是0.如果写为1的话,可以试下,在分组之后,资源表对应的值放在了迭代器最后一位,无法追加在最后所有的结果集合中。
3、关于分区,并不是所有的map都结束才开始的,一部分数据完成就会开始执行。同样,分组操作在一个分区内执行,如果分区完成,分组将会开始执行,也不是等所有分区完成才开始做分组的操作。
4、有疑问或是写的不对的地方,欢迎大家发邮件沟通交流:dajuezhao@gmail.com
发表评论
-
Hadoop的基准测试工具使用(部分转载)
2011-01-21 11:58 1611一、背景由于以前没有 ... -
分布式集群中的硬件选择
2011-01-21 11:58 1040一、背景最近2个月时间一直在一个阴暗的地下室的角落里工作,主要 ... -
Map/Reduce的内存使用设置
2011-01-21 11:57 1651一、背景今天采用10台 ... -
Hadoop开发常用的InputFormat和OutputFormat(转)
2011-01-21 11:55 1489Hadoop中的Map Reduce框架依 ... -
SecondaryNamenode应用摘记
2010-11-04 15:54 1069一、环境 Hadoop 0.20.2、JDK 1.6、 ... -
Zookeeper分布式安装手册
2010-10-27 09:41 1333一、安装准备1、下载zookeeper-3.3.1,地址:ht ... -
Hadoop分布式安装
2010-10-27 09:41 1014一、安装准备1、下载hadoop 0.20.2,地址:http ... -
Map/Reduce使用杂记
2010-10-27 09:40 967一、硬件环境1、CPU:Intel(R) Core(TM)2 ... -
Hadoop中自定义计数器
2010-10-27 09:40 1538一、环境1、hadoop 0.20.22、操作系统Linux二 ... -
Map/Reduce中的Partiotioner使用
2010-10-27 09:39 919一、环境1、hadoop 0.20.22 ... -
Map/Reduce中的Combiner的使用
2010-10-27 09:38 1197一、作用1、combiner最基本是实现本地key的聚合,对m ... -
Hadoop中DBInputFormat和DBOutputFormat使用
2010-10-27 09:38 2443一、背景 为了方便MapReduce直接访问关系型数据 ... -
Hadoop的MultipleOutputFormat使用
2010-10-27 09:37 1698一、背景 Hadoop的MapReduce中多文件输出默 ... -
Map/Reduce中公平调度器配置
2010-10-27 09:37 1546一、背景一般来说,JOB ... -
无法启动Datanode的问题
2010-10-27 09:37 2398一、背景早上由于误删namenode上的hadoop文件夹,在 ... -
Map/Reduce的GroupingComparator排序简述
2010-10-27 09:36 1349一、背景排序对于MR来说是个核心内容,如何做好排序十分的重要, ... -
Map/Reduce中分区和分组的问题
2010-10-27 09:35 1140一、为什么写分区和分组在排序中的作用是不一样的,今天早上看书, ... -
关于Map和Reduce最大的并发数设置
2010-10-27 09:34 1249一、环境1、hadoop 0.20.22、操作系统 Linux ... -
关于集群数据负载均衡
2010-10-27 09:33 897一、环境1、hadoop 0.20.22、操作系统 Linux ... -
Map/Reduce执行流程简述
2010-10-27 09:33 989一、背景最近总在弄MR的东西,所以写点关于这个方面的内容,总结 ...
相关推荐
例如,在订单表和公司名称表的关联分析中,可以使用 MapJoin 或 ReduceJoin 实现数据 Join 操作。 小结 MapReduce 之 MapJoin 和 ReduceJoin 是两种常见的 Join 操作,用于连接来自不同数据源的数据。MapJoin 可以...
本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...
本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...
Map/Reduce是海量离线数据分析中广泛应用的并行编程模型.Hive数据仓库基于Map/Reduce实现了查询处理引擎,然而Map/Reduce框架在处理偏斜数据时会出现工作负载分布不均的问题.均衡计算模型(computation balanced model...
对于Join操作,优化的原则是将较小的表或子查询放在Join的左侧,这样在Reduce阶段,内存中加载的数据会较少,从而降低内存溢出的风险。在多个Join操作的情况下,如果Join条件相同,Hive会将它们合并成一个MapReduce...
然后,Master、Worker、Map 操作(M 个)、Reduce 操作(R 个)是构成执行程序的主要部分,其中 Map/Reduce 的处理任务会由 Master 伴随物联网的快速发展和广泛应用,人们可以有效利用物联网来实现信息交换和通信的...
该参数决定了是否根据输入小表的大小,自动将Reduce端的Common Join转化为Map Join,从而加快大表关联小表的Join速度。如果设置为true,则Hive将自动将Reduce端的Common Join转化为Map Join,默认值为false。 5. ...
通过Hadoop平台和Map/Reduce模式的运用,该研究展示了如何高效地处理大量物联网数据并实现数据挖掘的目的。 ### Hadoop平台操作流程 1. **RFID数据处理**:首先,文档提到需要对物联网环境中的RFID数据进行预处理...
传统的Map/Reduce设计用于批处理分析,并不适合交互式查询。由于Map/Reduce在HBase上的性能比在HDFS上慢3到4倍,启动开销大,且依赖磁盘计算,不适合快速查找。 接下来,文章描述了如何实现交互式查询。它介绍了...
Map-Reduce-Join-Locate: a Data Processing Framework for
本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一过程。 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop ...
- 小表放前原则:在Join操作中,将较小的表放在前面,这样在Reduce阶段可以减少内存溢出的风险。设置多个Join时,若Join条件相同,所有表会合并成一个MapReduce任务;若条件不同,每个条件对应一个单独的任务。 除...
是为Groovy提供的一款开源并行类库,给我们提供了多种高级抽象,包括:map/reduce、fork/join、asynchronous closures、actors、agents、dataflow concurrency及其它概念。这样用Groovy编写并行程序就方便多了。 ...
Hive Summit 2011-join介绍了Facebook中Hive实现的多种join策略。Hive作为一个数据仓库工具,主要用于处理大规模数据集的分析和查询,而join操作是数据仓库中常见且关键的操作之一。在大数据的背景下,如何高效地...
- **含义**:决定查询中最后一个map/reduce job的输出是否为压缩格式。 - **默认值**:`false` - **建议设置**:根据数据量和存储需求调整,开启压缩可以节省存储空间和传输时间。 20. **hive.exec.compress....
在MapReduce环境中实现Theta连接,通常分为三个主要步骤:Map、Shuffle和Reduce。 1. **Map阶段**:在这个阶段,我们对输入的数据进行预处理。对于每个表,我们分别读取其记录并生成键值对。键通常是连接字段,值则...
在Hadoop MapReduce中实现Join,通常涉及将不同数据集的数据联接在一起。由于数据可能分布在不同的数据块中,因此必须在Reduce阶段进行Join,因为这是数据聚合的地方。然而,这可能导致数据倾斜,即某些键对应的数据...
Hive 优化方法整理是 Hive 数据处理过程中的重要步骤,涉及到 Hive 的类 SQL 语句本身进行调优、参数调优、Hadoop 的 HDFS 参数调优和 Map/Reduce 调优等多个方面。 Hive 类 SQL 语句优化 1. 尽量尽早地过滤数据...