首先介绍常用的几种 mapreduce 方法
reduce side join
reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
SemiJoin
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
reduce side join + BloomFilter
在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
数据模型
data.txt
编号 ID 数值1
201001 1003 abc
201002 1005 def
201003 1006 ghi
201004 1003 jkl
201005 1004 mno
201006 1005 pqr
info.txt
ID 数值2
1003 kaka
1004 da
1005 jue
1006 zhao
期望输出
ID 编号 数值1 数值2
1003 201001 abc kaka
1003 201004 jkl kaka
1004 201005 mno da
1005 201002 def jue
1005 201006 pqr jue
1006 201003 ghi zhao
以reduce端join为例
不使用hbase 进行join 伪代码
Map端:
mapper{
String filename = data.txt || info.txt
//为不同的来源数据打上不同标识
if(filename is data.txt){
mapper context output(ID+tag1,编号+数值1)
}else if(filename is info.txt){
mapper context output(ID+tag2,数值2)
}
}
map端输出结果
ID TAG 数值1/编号 数值2
1003,0 kaka
1004,0 da
1005,0 jue
1006,0 zhao
1003,1 201001 abc
1003,1 201004 jkl
1004,1 201005 mon
1005,1 201002 def
1005,1 201006 pqr
1006,1 201003 ghi
按照key进行忽略tag
按照key进行分组 忽略tag
最终输出
同一组:
1003,0 kaka
1003,0 201001 abc
1003,0 201004 jkl
同一组:
1004,0 da
1004,0 201005 mon
同一组:
1005,0 jue
1005,0 201002 def
1005,0 201006 pqr
同一组:
1006,0 zhao
1006,0 201003 ghi
reduce端输出
reducer{
reducer context output()
}
由此可见最大的开销应该是实在shuffle阶段,若每个ID有上百万个键值对,IO开销是非常大的
以mapper端join为例
不使用hbase 进行join 伪代码
Map端代码
Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf);//获取info.txt
BufferedReader joinReader = new BufferedReader(
new FileReader(cacheFiles[0].toString()));
while ((line = joinReader.readLine()) != null) {
tokens = line.split(" ");
hashmap.put(ID, 数值2);
}
mapper { //读取data.TXT
获取hashmap.get(ID)
拼接map输出
}
使用hbase进行map join
思路就是将cache 用 hbase代替,这样就不用每个分片都去复制一份cache,极大的节省了磁盘开销
info = HBase.connect("info.txt")
获取相应值
context.output(拼接字符串)
如此将info.txt看作一个散列表,不必为每个任务都创建散列表。
分享到:
相关推荐
### HBase二级索引与JOIN知识点详解 #### HBase简介 - **定义**: HBase是一种分布式、面向列的NoSQL数据库系统,它基于Google Bigtable论文实现。 - **底层架构**: HBase的数据存储依赖于Hadoop Distributed File ...
- **Joins**(连接):尽管HBase不支持标准SQL中的JOIN操作,但可以通过设计表结构来模拟类似功能。 - **ACID**(事务支持):虽然HBase本身不提供完整的ACID事务支持,但在某些场景下可以利用HBase的一些特性实现...
- 如何在MapReduce任务中使用HBase,例如读取和写入数据、扫描缓存、批量导入HFiles等。 - HBase作为MapReduce作业的数据源和数据接收器。 - 在MapReduce作业中访问其他HBase表。 - Map任务的拆分。 - 案例示例,...
- **Joins**: 在NoSQL数据库中,通常不鼓励使用JOIN,但HBase提供了一定的解决方案。 - **生存时间(TTL)**: 可以设置数据自动过期的时间。 - **保留删除的单元**: 删除数据后,可能会暂时保留其痕迹。 - **第二...
- **示例与访问其他表**:提供了使用HBase与MapReduce结合的实例代码,以及如何在一个MapReduce作业中访问多个HBase表的方法。 - **推测执行**:介绍了MapReduce作业中如何利用推测执行来提高整体性能。 #### 八、...
- **在 MapReduce 工作中访问其他 HBase 表**:指导如何在 MapReduce 任务中读取和写入多个 HBase 表。 - **推测执行**:说明 HBase 支持的推测执行机制及其优势。 #### 九、HBase 安全性 - **安全客户端访问 ...
- 编写代码,使用Table和Put对象将数据从本地文件读取并写入到HBase表中。 - 编译并运行Java程序,完成数据导入。 在整个过程中,确保所有组件的版本兼容,例如HBase与Hadoop、Sqoop与Hadoop之间的版本匹配。同时...
类比于传统型数据库里的一些查询方式,本文对Hbase的存储原理进行了研究,借助分布式计算框架Mapreduce在Hbase上构建了二级索引,就可以对表进行有针对性的定位和高效率的查找,同时也减轻zookeeper服务对资源调度的压力...
- **HBase MapReduce示例**:提供了一些使用HBase进行MapReduce编程的例子。 - **跨表访问**:在一个MapReduce作业中访问多个HBase表的方法。 #### 八、HBase安全 - **客户端访问控制**:确保只有授权用户可以访问...
Hive和HBase的整合,可以让用户在Hive中使用SQL语句来访问和操作HBase中的数据。整合Hive和HBase主要是利用两者对外的API接口进行通信,这一过程主要依赖于hive-hbase-handler.jar工具包中的HiveStorageHandlers类。...
13. 使用 MapReduce 实现 Join 操作:使用 MapReduce 来实现数据的 Join 操作,以便将多个数据源合并成一个结果。 14. 使用 MapReduce 实现排序:使用 MapReduce 来实现数据的排序,以便对数据进行排序处理。 15. ...
2. 空间JOIN操作:通过预计算或者MapReduce任务,模拟实现传统数据库中的JOIN操作。 3. 并行处理:利用HBase的分布式特性,进行大规模地理数据的并行处理和分析。 五、GISMaster的性能优化 1. 表分区:根据地理区域...
- **分布式模式**:生产环境中使用,需要配置ZooKeeper集群。 **2.5 ZooKeeper配置** - 设置ZooKeeper集群的地址和端口。 - 根据集群规模调整ZooKeeper实例数量。 **2.6 配置文件** - `hbase-site.xml`: 主要...
Pig Latin将复杂的数据处理任务转化为一系列简单的操作,如LOAD、FILTER、JOIN等,这些操作由Pig引擎自动转换为MapReduce作业执行。Pig提供了一种抽象层,降低了编写大数据处理程序的复杂度,使得数据科学家和分析师...
- **Accessing Other HBase Tables in a MapReduce Job**: 在MapReduce作业中访问其他HBase表的方法。 - **Speculative Execution**: 投机执行机制。 #### 九、HBase安全 - **安全客户端访问HBase**: 安全访问方法...
综上所述,MapReduce应用案例文档深入地介绍了MapReduce编程模型在Hadoop生态系统中的实际使用,包括对join操作的细节分析,以及如何搭建Hadoop环境,如何上传和管理测试数据。此外,文档还提供了Hadoop学习资源的...
通过与Hadoop生态系统的其他组件(如Hive、Pig和Hadoop MapReduce)集成,HBase可以处理更复杂的查询和分析任务,如多表JOIN操作。 总结起来,HBase是Hadoop生态中一个强大的列式存储数据库,提供大规模数据处理的...
Apache Phoenix 是一个开源的 JDBC 驱动程序,它允许用户使用 SQL 查询 HBase 数据存储。这个"apache-phoenix-4.9.0-HBase-1.1-bin.tar.gz"压缩包包含了 Phoenix 4.9.0 版本,专为 HBase 1.1 构建。通过这个包,...
技术点26 在HDFS、MapReduce、Pig 和Hive 中使用数据压缩 技术点27 在MapReduce、Hive 和Pig 中处理可分割的LZOP 5.3 本章小结 6 诊断和优化性能问题 6.1 衡量MapReduce 和你的环境 6.1.1 提取作业统计...