我们都知道在数据库里,多个表之间是可以根据某个链接键进行join的,这也是数据库的范式规范,通过主外键的关联,由此来减少数据冗余,提高性能。当然近几年,随着NOSQL的兴起,出现了基于列的的列式数据库,典型的有Hbase,MongonDB,Cassdran,等等,NOSQL数据库弱化了关联,直接将一整条数据,存入一列,以及去掉了数据库的部分事务特性,从而在海量数据面前显得游刃有余,当然,大部分的NOSQL不支持join操作,也没有绝对的必要支持,因为现在,我们完全是把一整条数据存在了一起,虽然多了许多冗余,但也换来了比较高检索性能,扩展性能,可靠性能。但某些业务场景下,我们仍然需要Join操作,这时候怎么办?
如果数据量比较大的情况下,我们可以使用Hadoop的MapReduce来完成大表join,尤其对Hbase的某些表进行join操作,当然我们也可以使用Hive或Pig来完成,其实质在后台还是运行的一个MR程序。
那么,散仙今天就来看下如何使用MapReduce来完成一个join操作,Hadoop的join分为很多种例如;Reduce链接,Map侧链接,半链接和Reduce侧链接+BloomFilter等等,各个链接都有自己特定的应用场景,没有绝对的谁好谁坏。
今天散仙要说的是,基于Reduce侧的链接,原理如下:
1、在Reudce端进行连接。
在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。
本次的实现是基于hadoop的旧版API+contribu扩展包里的,DataJoin的工具类辅助来完成的,下篇博客,将会给出,基于新版API,独立来完成Reduce侧的连接示例。
现在看下散仙的两个文件的测试数据,一个是a.txt,另一个是b.txt
- a文件的数据
- 1,三劫散仙,13575468248
- 2,凤舞九天,18965235874
- 3,忙忙碌碌,15986854789
- 4,少林寺方丈,15698745862
a文件的数据 1,三劫散仙,13575468248 2,凤舞九天,18965235874 3,忙忙碌碌,15986854789 4,少林寺方丈,15698745862
- b文件的数据
- 3,A,99,2013-03-05
- 1,B,89,2013-02-05
- 2,C,69,2013-03-09
- 3,D,56,2013-06-07
b文件的数据 3,A,99,2013-03-05 1,B,89,2013-02-05 2,C,69,2013-03-09 3,D,56,2013-06-07
源码如下:
- package com.qin.reducejoin;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.util.Iterator;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapred.FileInputFormat;
- import org.apache.hadoop.mapred.FileOutputFormat;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.KeyValueTextInputFormat;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reducer;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.mapred.TextInputFormat;
- import org.apache.hadoop.mapred.TextOutputFormat;
- import org.apache.hadoop.util.ReflectionUtils;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
- import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
- import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
- import com.qin.joinreduceerror.JoinReduce;
- /***
- *
- * Hadoop1.2的版本,旧版本实现的Reduce侧连接
- *
- * @author qindongliang
- *
- * 大数据交流群:376932160
- * 搜索技术交流群:324714439
- *
- *
- */
- public class DataJoin extends Configured implements Tool {
- /**
- *
- * Map实现
- *
- * */
- public static class MapClass extends DataJoinMapperBase {
- /**
- * 读取输入的文件路径
- *
- * **/
- protected Text generateInputTag(String inputFile) {
- //返回文件路径,做标记
- return new Text(inputFile);
- }
- /***
- * 分组的Key
- *
- * **/
- protected Text generateGroupKey(TaggedMapOutput aRecord) {
- String line = ((Text) aRecord.getData()).toString();
- String[] tokens = line.split(",");
- String groupKey = tokens[0];
- return new Text(groupKey);
- }
- protected TaggedMapOutput generateTaggedMapOutput(Object value) {
- TaggedWritable retv = new TaggedWritable((Text) value);
- retv.setTag(this.inputTag);
- return retv;
- }
- }
- /**
- *
- * Reduce进行笛卡尔积
- *
- * **/
- public static class Reduce extends DataJoinReducerBase {
- /***
- * 笛卡尔积
- *
- * */
- protected TaggedMapOutput combine(Object[] tags, Object[] values) {
- if (tags.length < 2) return null;
- String joinedStr = "";
- for (int i=0; i<values.length; i++) {
- if (i > 0) {joinedStr += ",";}
- TaggedWritable tw = (TaggedWritable) values[i];
- String line = ((Text) tw.getData()).toString();
- String[] tokens = line.split(",", 2);
- joinedStr += tokens[1];
- }
- TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
- retv.setTag((Text) tags[0]);
- return retv;
- }
- }
- /**
- *
- * 自定义的输出类型
- *
- * ***/
- public static class TaggedWritable extends TaggedMapOutput {
- private Writable data;
- /**
- * 注意加上构造方法
- *
- * */
- public TaggedWritable() {
- // TODO Auto-generated constructor stub
- }
- public TaggedWritable(Writable data) {
- this.tag = new Text("");
- this.data = data;
- }
- public Writable getData() {
- return data;
- }
- public void write(DataOutput out) throws IOException {
- this.tag.write(out);
- //此行代码很重要
- out.writeUTF(this.data.getClass().getName());
- this.data.write(out);
- }
- public void readFields(DataInput in) throws IOException {
- this.tag.readFields(in);
- //加入此部分代码,否则,可能报空指针异常
- String temp=in.readUTF();
- if (this.data == null|| !this.data.getClass().getName().equals(temp)) {
- try {
- this.data = (Writable) ReflectionUtils.newInstance(
- Class.forName(temp), null);
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
- this.data.readFields(in);
- }
- }
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
- JobConf job = new JobConf(conf, DataJoin.class);
- job.set("mapred.job.tracker","192.168.75.130:9001");
- ////读取person中的数据字段
- job.setJar("tt.jar");
- job.setJarByClass(DataJoin.class);
- System.out.println("模式: "+job.get("mapred.job.tracker"));;
- String path="hdfs://192.168.75.130:9000/root/outputjoindb";
- FileSystem fs=FileSystem.get(conf);
- Path p=new Path(path);
- if(fs.exists(p)){
- fs.delete(p, true);
- System.out.println("输出路径存在,已删除!");
- }
- Path in = new Path("hdfs://192.168.75.130:9000/root/inputjoindb");
- // Path out = new Path(args[1]);
- FileInputFormat.setInputPaths(job, in);
- FileOutputFormat.setOutputPath(job, p);
- job.setJobName("cee");
- job.setMapperClass(MapClass.class);
- job.setReducerClass(Reduce.class);
- job.setInputFormat(TextInputFormat.class);
- job.setOutputFormat(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(TaggedWritable.class);
- job.set("mapred.textoutputformat.separator", ",");
- JobClient.runJob(job);
- return 0;
- }
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(),
- new DataJoin(),
- args);
- System.exit(res);
- }
- }
package com.qin.reducejoin; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import com.qin.joinreduceerror.JoinReduce; /*** * * Hadoop1.2的版本,旧版本实现的Reduce侧连接 * * @author qindongliang * * 大数据交流群:376932160 * 搜索技术交流群:324714439 * * */ public class DataJoin extends Configured implements Tool { /** * * Map实现 * * */ public static class MapClass extends DataJoinMapperBase { /** * 读取输入的文件路径 * * **/ protected Text generateInputTag(String inputFile) { //返回文件路径,做标记 return new Text(inputFile); } /*** * 分组的Key * * **/ protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(","); String groupKey = tokens[0]; return new Text(groupKey); } protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } /** * * Reduce进行笛卡尔积 * * **/ public static class Reduce extends DataJoinReducerBase { /*** * 笛卡尔积 * * */ protected TaggedMapOutput combine(Object[] tags, Object[] values) { if (tags.length < 2) return null; String joinedStr = ""; for (int i=0; i<values.length; i++) { if (i > 0) {joinedStr += ",";} TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(",", 2); joinedStr += tokens[1]; } TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } /** * * 自定义的输出类型 * * ***/ public static class TaggedWritable extends TaggedMapOutput { private Writable data; /** * 注意加上构造方法 * * */ public TaggedWritable() { // TODO Auto-generated constructor stub } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } public Writable getData() { return data; } public void write(DataOutput out) throws IOException { this.tag.write(out); //此行代码很重要 out.writeUTF(this.data.getClass().getName()); this.data.write(out); } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); //加入此部分代码,否则,可能报空指针异常 String temp=in.readUTF(); if (this.data == null|| !this.data.getClass().getName().equals(temp)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(temp), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, DataJoin.class); job.set("mapred.job.tracker","192.168.75.130:9001"); ////读取person中的数据字段 job.setJar("tt.jar"); job.setJarByClass(DataJoin.class); System.out.println("模式: "+job.get("mapred.job.tracker"));; String path="hdfs://192.168.75.130:9000/root/outputjoindb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } Path in = new Path("hdfs://192.168.75.130:9000/root/inputjoindb"); // Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, p); job.setJobName("cee"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", ","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new DataJoin(), args); System.exit(res); } }
运行,日志
- 模式: 192.168.75.130:9001
- 输出路径存在,已删除!
- INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library
- WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
- INFO - FileInputFormat.listStatus(199) | Total input paths to process : 2
- INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404222310_0025
- INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 33% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100%
- INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404222310_0025
- INFO - Counters.log(585) | Counters: 30
- INFO - Counters.log(587) | Job Counters
- INFO - Counters.log(589) | Launched reduce tasks=1
- INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=14335
- INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Launched map tasks=3
- INFO - Counters.log(589) | Data-local map tasks=3
- INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9868
- INFO - Counters.log(587) | File Input Format Counters
- INFO - Counters.log(589) | Bytes Read=207
- INFO - Counters.log(587) | File Output Format Counters
- INFO - Counters.log(589) | Bytes Written=172
- INFO - Counters.log(587) | FileSystemCounters
- INFO - Counters.log(589) | FILE_BYTES_READ=837
- INFO - Counters.log(589) | HDFS_BYTES_READ=513
- INFO - Counters.log(589) | FILE_BYTES_WRITTEN=221032
- INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172
- INFO - Counters.log(587) | Map-Reduce Framework
- INFO - Counters.log(589) | Map output materialized bytes=849
- INFO - Counters.log(589) | Map input records=8
- INFO - Counters.log(589) | Reduce shuffle bytes=849
- INFO - Counters.log(589) | Spilled Records=16
- INFO - Counters.log(589) | Map output bytes=815
- INFO - Counters.log(589) | Total committed heap usage (bytes)=496644096
- INFO - Counters.log(589) | CPU time spent (ms)=2080
- INFO - Counters.log(589) | Map input bytes=187
- INFO - Counters.log(589) | SPLIT_RAW_BYTES=306
- INFO - Counters.log(589) | Combine input records=0
- INFO - Counters.log(589) | Reduce input records=8
- INFO - Counters.log(589) | Reduce input groups=4
- INFO - Counters.log(589) | Combine output records=0
- INFO - Counters.log(589) | Physical memory (bytes) snapshot=623570944
- INFO - Counters.log(589) | Reduce output records=4
- INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2908262400
- INFO - Counters.log(589) | Map output records=8
模式: 192.168.75.130:9001 输出路径存在,已删除! INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - FileInputFormat.listStatus(199) | Total input paths to process : 2 INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404222310_0025 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 33% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404222310_0025 INFO - Counters.log(585) | Counters: 30 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=14335 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=3 INFO - Counters.log(589) | Data-local map tasks=3 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9868 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=207 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=172 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=837 INFO - Counters.log(589) | HDFS_BYTES_READ=513 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=221032 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=849 INFO - Counters.log(589) | Map input records=8 INFO - Counters.log(589) | Reduce shuffle bytes=849 INFO - Counters.log(589) | Spilled Records=16 INFO - Counters.log(589) | Map output bytes=815 INFO - Counters.log(589) | Total committed heap usage (bytes)=496644096 INFO - Counters.log(589) | CPU time spent (ms)=2080 INFO - Counters.log(589) | Map input bytes=187 INFO - Counters.log(589) | SPLIT_RAW_BYTES=306 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | Reduce input records=8 INFO - Counters.log(589) | Reduce input groups=4 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=623570944 INFO - Counters.log(589) | Reduce output records=4 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=2908262400 INFO - Counters.log(589) | Map output records=8
运行结果,如下图所示:
可以看出,MR正确的完成了join操作,需要注意的是Reduce侧连接的不足之处:
之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。
另外一点需要注意的是,散仙在eclipse里进行调试,Local模式下会报异常,建议提交到hadoop的测试集群上进行测试。
相关推荐
在Hadoop MapReduce环境中,处理大数据时经常遇到多表关联(Join)的需求,尤其是在复杂的业务逻辑中。MapReduce提供了一种分布式计算模型,能够高效地处理大规模数据集,但面对多表关联,尤其是多个Job之间的依赖和...
《Hadoop MapReduce Cookbook》是一本专为大数据处理和分析领域的专业人士编写的指南,它深入浅出地介绍了如何使用Hadoop MapReduce框架解决实际问题。MapReduce是Hadoop生态系统中的核心组件,它允许用户在分布式...
总之,MapReduce高级特性如计数器、排序和连接等,极大地增强了Hadoop平台处理大规模数据集的能力。通过这些高级特性,开发者可以更精确地控制数据处理流程,更有效地进行数据分析,并提高数据处理效率。对于追求高...
虽然Hadoop MapReduce可以直接处理JOIN操作,但使用Hive或Pig等高级数据处理工具可以简化编写代码的过程。Hive和Pig都支持多种JOIN类型,如LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN等,并且能够自动优化JOIN策略。 ...
通常,有几种方法来实现JOIN,包括:1) 全外连接(Full Outer Join)通常通过两次MapReduce作业完成,第一次MapReduce实现一个表的JOIN键的完整复制,第二次JOIN另一个表;2) 交叉连接(Cross Join)只需将所有键值...
本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一过程。 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop ...
综上所述,MapReduce应用案例文档深入地介绍了MapReduce编程模型在Hadoop生态系统中的实际使用,包括对join操作的细节分析,以及如何搭建Hadoop环境,如何上传和管理测试数据。此外,文档还提供了Hadoop学习资源的...
org.apache.hadoop.mapreduce.lib.join org.apache.hadoop.mapreduce.lib.map org.apache.hadoop.mapreduce.lib.output org.apache.hadoop.mapreduce.lib.partition org.apache.hadoop.mapreduce.lib.reduce ...
由于Hadoop的设计初衷是为了解决大规模数据的聚合操作而非跨表操作,因此,在处理join操作时存在一定的局限性。本文旨在介绍一种新的方法——自适应连接计划生成(Adaptive Join Plan Generation),该方法针对...
6. **复杂算法实现**:可能包含机器学习算法如K-means聚类、PageRank算法等,利用MapReduce的并行处理能力来解决大规模数据集上的问题。 7. **错误处理和容错机制**:Hadoop MapReduce有内置的容错机制,如任务重试...
13. 使用 MapReduce 实现 Join 操作:使用 MapReduce 来实现数据的 Join 操作,以便将多个数据源合并成一个结果。 14. 使用 MapReduce 实现排序:使用 MapReduce 来实现数据的排序,以便对数据进行排序处理。 15. ...
在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系...
- 为了提高效率,Hadoop MapReduce支持多种优化技术,如Combiner(局部聚合)、Spill(磁盘溢出)和Map-side Join(基于广播的小表)等。 9. **Hadoop的缓存机制**: - MapReduce可以通过缓存机制将中间结果或...
在大数据处理领域,Hadoop MapReduce 是一种广泛使用的分布式计算框架。在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:...
该套源码是个人学习Hadoop HDFS和MapReduce技术的实践案例集合,采用Java语言编写,包含45个文件,涵盖34个Java源文件、4个XML配置文件、3个偏好设置文件以及1个Git忽略文件等。内容涵盖HDFS的JAVA API操作,如文件...
- **使用Map Join**:当连接表较小且适合放入内存时,使用Map Join可以避免额外的Reduce任务。 - **利用分区裁剪**:通过WHERE子句指定特定分区,减少需要扫描的数据量,从而减少map任务的数量。 - **减少GROUP BY...
描述部分列举了一些具体的知识点,包含搭建企业级Hadoop集群的步骤、搭建高可用性(HA)的Hadoop分布式集群、安装CDH5集群环境,以及如何配置和使用Zookeeper、MapReduce的高级join操作。 对于标签“Hadoop”,这是...
包org.dan.mr.order_pro_mapjoin MapReduce实现订单信息和产品信息的join逻辑,在Mapper端实现,避免数据倾斜 包org.dan.mr.wordindex MapReduce单词索引 包org.dan.mr.shared_friends MapReduce查找共同好友 包org....
而Hadoop是一个由Apache软件基金会开发的开源框架,它允许使用简单的编程模型来分布式地处理大数据,其核心是HDFS分布式文件系统和MapReduce分布式计算模型,除此之外,Hadoop生态系统还包括了Hive、HBase、Pig、...
通过学习《Pro Hadoop》,读者将能够掌握Hadoop核心的使用,并且能够处理和分析大规模数据集,最终成为所在领域的“Hadoop核心专家”。 由于文档中的部分内容是通过OCR扫描出的,可能存在一些识别错误或漏识别的...