DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。。它具有以下几个特点:缓存的文件是只读的,修改这些文件内容没有意义;用户可以调整文件可见范围(比如只能用户自己使用,所有用户都可以使用等),进而防止重复拷贝现象;按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且只发给任务被调度到的节点。
DistributeCache的命令方式:
- (1)-files:将指定的本地/hdfs文件分发到各个Task的工作目录下,不对文件进行任何处理;
- (2)-archives:将指定文件分发到各个Task的工作目录下,并对名称后缀为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自动解压,默认情况下,解压后的内容存放到工作目录下名称为解压前文件名的目录中,比如压缩包为dict.zip,则解压后内容存放到目录dict.zip中。为此,你可以给文件起个别名/软链接,比如dict.zip#dict,这样,压 缩包会被解压到目录dict中。
- (3)-libjars:指定待分发的jar包,Hadoop将这些jar包分发到各个节点上后,会将其自动添加到任务的CLASSPATH环境变量中。
自己在写MAR/REDUCE代码时,遇到了一个问题,一个大数据文件和一个小数据文件匹配计算,但是小数据文件太小,所以想采用HIVE的MAP JOIN的方式,把小数据文件放到直接大数据文件map的datanode的内存中,这样少了MR代码的1对N的数据文件关联。
实现这个的最佳方案就是利用distributed cache。HIVE的MAP JOIN也是利用这个技术。
首先简要介绍一下distributed cache是如何使用的,然后总结下自己在使用distributed cache遇到的问题,这些问题网上也有人遇到,但是没有给出明确的解释。希望能够帮助同样遇到此类问题的朋友。
distributed cache至少有如下的两类类应用:
- MAP、REDUCE本身和之间共享的较大数据量的数据
- 布置第三方JAR包,可以避免集群的删减导致部分依赖的机器的JAR包的丢失
distributed cache使用的流程总结如下:
- 1.在HDFS上准备好要共享的数据(text、archive、jar)
- 2.在distributed cache中添加文件
- 3.在mapper或者reducer类中获取数据
- 4.在map或者reduce函数中使用数据
1.数据本来就在HDFS上,所以省去了流程中的第一步
可以使用hadoop fs -copyFromLocal把本地文件cp到HDFS上
2.在distributed cache中添加文件
public static void disCache(String dimDir, JobConf conf) throws IOException {
FileSystem fs = FileSystem.get(URI.create(dimDir), conf);
FileStatus[] fileDir = fs.listStatus(new Path(dimDir));
for (FileStatus file : fileDir) {
DistributedCache.addCacheFile(URI.create(file.getPath().toString()), conf);
}
}
因为我利用的数据是HIVE脚本生成的,所以无法指定具体的文件路径,采用这种方式把一张HIVE表的所有数据都加载到cache中。如果能直接明确知道文件名称就简单很多了,例如:
DistributedCache.addCacheFile(URI.create(“/mytestfile/file.txt”), conf);
3.在mapper或者reducer中获取数据
public void configure(JobConf job) {
try {
urlFile = DistributedCache.getLocalCacheFiles(job);
seoUrlFile = "file://" + urlFile[0].toString();
allUrlFile = "file://" + urlFile[1].toString();
FileReader reader = new FileReader(seoUrlFile );
BufferedReader br = new BufferedReader(reader);
System.out.println("this is OK");
String s1 = null;
int i=0;
while((s1 = br.readLine())!=null){
String[] word = s1.split("\\|");
//do something you want
}
}
br.close();
reader.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
我在流程2中添加了两个文件,通过DistributedCache.getLocalCacheFiles(job)获取Path[],针对不同文件调用getData,把文件数据放在不同的List中seoUrlDim,allUrlDim
如此一来,distributed cache的过程就结束了,接下来就在map()或者reduce()中使用这些数据就OK了。
4.在map或者reduce函数中使用数据
一定要区分流程3和4,3是获取数据,4是使用数据。我就是在前期没弄明白这个的差异,导致内存溢出。
虽说好像挺简单的,但是在实现这个代码的过程中有如下几个问题困扰了我好久,网上也没找到很好的解决方案,后来在兄弟的帮助下搞定
- 1.FileNotFoundException
- 2.java.lang.OutOfMemoryError: GC overhead limit exceeded
1.FileNotFoundException
这个问题涉及到DistributedCache.getLocalCacheFiles(job) 这个函数,此函数返回的Path[]是执行map或者reduce的datanode的本地文件系统中路径,但是我在getData中利用的SequenceFile.Reader的默认filesystem是hdfs,这就导致获取数据时是从hdfs上找文件,但是这个文件是在本地文件系统中,所以就会报出这个错误FileNotFoundException。
例如,DistributedCache.getLocalCacheFiles(job)返回的PATH路径是:/home/dwapp/hugh.wangp/mytestfile/file.txt,在默认文件系统是hdfs时,获取数据时会读hdfs://hdpnn:9000/home/dwapp/hugh.wangp/mytestfile/file.txt,但是我们是在本地文件系统,为了避免数据获取函数选择"错误"的文件系统,我们在/home/dwapp/hugh.wangp/mytestfile/file.txt前加上"file://",这样就会从本地文件系统中读取数据了。就像我例子中的seoUrlFile = "file://" + urlFile[0].toString();
2.java.lang.OutOfMemoryError: GC overhead limit exceeded
这个问题是我在没搞明白如何使用distributed cache时犯下的错误,也就是我没弄明白流程3和4的区别。我把流程3中说的获取数据的过程放在map函数中,而在map函数中其实是使用数据的过程。这个错误因为使每用一个map就要获取一下数据,也就是初始化一个list容器,使一个datanode上起N个map,就要获取N个list容器,内容溢出也就是自然而然的事情了。
我们一定要把获取数据的过程放在mapper或reducer类的configure()函数中,这样对应一个datanode就只有一份数据,N个map可以共享着一份数据。
内容单薄,望志同道合之士互相学习。
原文:
http://hugh-wangp.iteye.com/blog/1468989
分享到:
相关推荐
Hadoop分布式缓存是Hadoop生态系统中的一个重要组成部分,它允许应用程序在执行MapReduce任务时共享和重用数据,从而提高整体性能。这份源码提供了深入理解Hadoop如何管理和利用分布式缓存的机会,对于想要优化...
### 深入浅出Hive企业级架构优化 #### Hive概述 Apache Hive 是一个基于 Hadoop 的数据...总之,通过对 Hive 架构、SQL 语句、数据压缩以及分布式缓存等方面的综合优化,可以有效提升 Hive 在企业级应用中的表现。
Hadoop分布式文件系统(HDFS)作为大数据处理领域的重要组成部分,其性能优化一直是研究的重点。HDFS中的集中化缓存管理提供了一种高效的缓存机制,它允许用户指定特定的HDFS路径进行缓存。通过这种方式,NameNode...
Hadoop分布式文件系统(HDFS)是Apache Hadoop项目的核心组件之一,它为大数据处理提供了可靠的、可扩展的分布式存储解决方案。在这个“HDFS实例基本操作”中,我们将深入探讨如何在已经安装好的HDFS环境中执行基本...
本文将深入探讨“go语言分布式书籍合集”中的两个关键主题:分布式对象存储和分布式缓存,以及它们如何在Go语言中得到实现。 首先,我们来看分布式对象存储。对象存储是一种以对象为基本单位的存储方式,它不再依赖...
5. **性能优化**:包括读写性能提升、I/O调度、缓存策略、并行处理等,这些都是提高分布式存储效率的关键。 6. **容错与恢复**:在分布式系统中,硬件故障是常态。如何设计有效的故障检测机制和快速的故障恢复策略...
【HDFS 透明加密KMS】是Hadoop分布式文件系统(HDFS)提供的一种安全特性,用于保护存储在HDFS中的数据,确保数据在传输和存储时的安全性。HDFS透明加密通过端到端的方式实现了数据的加密和解密,无需修改用户的应用...
综上所述,这份文档全面探讨了分布式环境下的信息缓存,从方法到具体实现,从硬件配置到软件设计,再到存储介质的选择和管理,涵盖了构建高效分布式缓存系统所需的关键技术和实践。对于理解并优化大型分布式系统的...
在Java编程环境中,Hadoop分布式文件系统(HDFS)提供了丰富的Java API,使得开发者能够方便地与HDFS进行交互,包括文件的上传、下载、读写等操作。本篇文章将详细探讨如何使用HDFS Java API来实现文件上传的功能。 ...
- `hdfs dfs -ls hdfs://server1:8020/` 操作HDFS分布式文件系统 - `hdfs dfs -ls /` 如果不指定协议,默认会使用`fs.defaultFS`配置的文件系统 此外,HDFS Shell还支持一系列管理命令,如`cacheadmin`用于缓存管理...
开发者可以自由控制调度过程,比如按照“农民工”的数量将源数据切分成多少份,然后远程分配给“农民工”节点进行计算处理,它处理完的中间结果数据不限制保存在hdfs里,而可以自由控制保存在分布式缓存、数据库、...
Hadoop分布式存储系统HDFS,全称为Hadoop Distributed File System,是Hadoop的一个核心组件,提供了高吞吐量的数据访问,适合大规模数据集的存储和处理。HDFS是一个高度容错的系统,能够被部署在廉价的硬件上,并能...
1. **层次存储体系**:通过建立多级缓存体系,包括应用程序缓存、HBase的HBlock缓存以及操作系统层面的HDFS文件缓存,Facebook极大提高了数据访问速度,减少了磁盘I/O,增强了系统的响应能力。 2. **容错机制改进**...
7. **性能优化**:考虑到HDFS的分布式特性,HDFS Explorer Installer可能包含了一些性能优化策略,如批量操作、缓存机制等,以减少网络延迟和提高整体性能。 安装文件HDFS Explorer Installer.msi是Windows平台上的...
WebHDFS Java Client是Hadoop生态系统中的一个关键组件,它为开发者提供了在Java环境中与Hadoop分布式文件系统(HDFS)交互的能力。"webhdfs-java-client-master"这个项目很可能是该客户端库的一个源码仓库主分支,...
例如,批量操作可以减少网络通信次数,缓存常用数据以减少延迟,或者利用HDFS的多线程读写特性提高效率。 通过这个“web中进行HDFS文件系统操作的demo”,开发者可以学习如何构建自己的HDFS Web应用,使非技术人员...
本文内容将围绕Hadoop 2.0版本中的HDFS新特性进行阐述,包括NameNode高可用性(HA)、NameNode联邦(Federation)、HDFS快照功能、HDFS缓存和HDFS访问控制列表(ACL)。 在Hadoop 1.0中,HDFS的NameNode仅有一个,...
分布式缓存 缓存产品 redis 业界主流 memcached 解决问题 数据库访问 使用应用服务器集群改善网站的并发处理能力 问题: 负载均衡情况下session状态的保持? 解决方案: 基于...
HDFS(Hadoop Distributed File System)是Apache Hadoop项目的核心组件,是一个分布式文件系统,旨在处理大规模数据存储和计算。本项目专注于如何在HDFS上存储和实现视频点播服务。 首先,理解HDFS的工作原理至关...