`

开发MapReduce常见问题

 
阅读更多

(1)当你把一个文件加入distribution cache的时候,要注意:如果你是以addCacheFile()的方式添加的,而你在mapper中取出来的时候,却是以archive的方式取出来——getLocalCacheArchives(),那么,你将得不到cache文件的路径,因为放进去和取出来的方式要一致。

(2)在mapper中获取当前正在处理的HDFS文件名/HDFS目录名

有时候,Hadoop是按行来对数据进行处理的,由于对每一行数据,map()函数会被调用一次,我们有时可以根据文件名/目录名来获取一些信息,从而把它们输出,例如,目录名中包含了日期,则我们可以取出来并输出到Reducer。在map()函数中,我们可以这样取文件名:

InputSplit inputSplit = context.getInputSplit();
String fileName = ((FileSplit) inputSplit).getName();
假设当前正在处理的HDFS文件路径为:/user/hadoop/abc/myFile.txt,则上面的 fileName 取到的是“myFile.txt”这样的字符串。但如果要获取其目录名“abc”,则可以这样做:

InputSplit inputSplit = context.getInputSplit();
String dirName = ((FileSplit) inputSplit).getPath().getParent().getName();

(5)从HDFS上下载同一目录下的一堆文件
如果是从HDFS上下载一个文件到本地文件系统,这样做:

hadoop fs -get /your/hdfs/file /your/local/fs/file
但如果是要下载一个目录下的N个M-R输出文件(到一个文件),则应这样:

hadoop fs -getmerge /your/hdfs/directory /your/local/fs/file
或者你干脆把HDFS上的文件内容打印出来,重定向到一个文件:

hadoop fs -cat /your/hdfs/directory/part* > /your/local/fs/file
(6)关于InputFormat
具体可看这个链接。这里摘抄一段下来:

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats. These types are described in more detail in Module 4.
More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.
即:InputFormat定义了如何从文件中将数据读取到Mapper的实例里。Hadoop已经自带了一些InputFormat的实现了,其中有一些用于处理文本文件,它们描述了如何解释文本文件的多个不同方法;其他的实现——例如SequenceFileInputFormat——是为读取特殊二进制文件格式而生的。
更加强大的是,你可以定义你自己的InputFormat实现来格式化输入到你程序的数据——无论你想要什么样的输入。例如,默认的TextInputFormat读取文本文件的一行行的数据。它为每条记录emit的key是正在读取的行的偏移字节(以LongWritable的形式体现),而value则是该行的内容直到结束的 \n 字符(以Text对象的形式体现)。如果你有多行记录,这些记录是以 $ 字符来分隔的,那么你可以写一个自己的InputFormat用于根据这个字符来分割解析文件。

(7)为什么要启用LZO压缩,现在有什么可用的Hadoop LZO实现
这篇文章很好地解释了Twitter的Hadoop LZO实践,看完它,你就明白为什么要用LZO了。
这个项目,就是Twitter的Hadoop LZO实现,非常有用。
一句话总结就是:gzip不能将数据分块压缩,虽然减小了存储的数据量(同时也就减小了IO),但却无法利用Map-Reduce进行并行处理;bzip可以将数据分块压缩,虽然减小了存储的数据量(同时也就减小了IO),但是却在解压的时候很慢,耗费掉太多的CPU资源,从而导致CPU处理速度跟不上读取压缩文件的速度;LZO在这二者之间达到了一个平衡,虽然其压缩比没有gzip那么高,却可以分块压缩(从而可以利用Map-Reduce进行并行处理),并且其解压速度非常快,整体上达到的效果就是:减小了数据存储量,减小了IO,虽然CPU资源比原来占用多了一些,但是Hadoop集群整体上的计算能力提升了很多。

(8)启动Haoop进程时的错误及解决方法:localhost: ssh: connect to host localhost port 22: Connection refused
启动Hadoop进程时可用Hadoop安装目录下的 bin/start-all.sh 脚本,如果执行该脚本提示错误:

localhost: ssh: connect to host localhost port 22: Connection refused
那么你应该先检查你是否安装了sshd,然后再检查防火墙是否阻止了连接本机的22端口。依据不同的Linux发行版,这些检测方法会有不同。以Ubuntu为例,执行sshd命令,如果提示你sshd没有安装,那么你可以使用以下命令安装之:

sudo apt-get install openssh-server
检查防火墙状态:

sudo ufw status
如果防火墙是打开的,那么还要确保22端口是允许连接的。
可以在开机启动时就启动Hadoop进程:编辑 /etc/rc.local 文件,添加一行即可:

/usr/local/hadoop/bin/start-all.sh
当然,需视情况改成你的Hadoop安装路径。

(9)在 Hadoop Map/Reduce Administration 的web页面中看不到运行中的job(Running Jobs)的可能原因
可能是 TaskTracker 没有启动,导致无法在页面中看到任何Running Jobs。这时你可以先查看一下其是否启动了:

ps -ef | grep java
里面应该有 org.apache.hadoop.mapred.TaskTracker 这一项。
如果没有,则可以重启Hadoop进程试试看。

(10)向HDFS中put文件时揭示“Name node is in safe mode”的原因及解决办法
向HDFS中put文件时,如果揭示:

put: org.apache.Hadoop.hdfs.server.namenode.SafeModeException: Cannot create file /XXX. Name node is in safe mode.
原因很显然了,name node处于安全模式,解决办法也很简单:kill掉name node的进程,然后重启之:

ps -ef | grep java
用该命令查看name node进程的PID,然后kill掉,然后再启动之:

start-dfs.sh
再确定一下name node进程是否启动了,如果成功启动了,就OK了。

这里有一个批量kill进程的技巧,其实就是几句shell语句:

PIDS=`ps -ef | grep -v grep | grep java | awk '{print $2}'`; for PID in $PIDS; do kill $PID; done
其中,“grep java”表示查找含有“java”关键字的进程名,“grep -v grep”表示过滤掉grep自己的这个进程名,awk 是用于打印出第2列的内容,即PID(进程号),而后面的 for 循环则是批量kill掉找到的进程。

(11)在shell中判断一个HDFS目录/文件是否存在
直接看shell代码:

hadoop fs -test -e /hdfs_dir
if [ $? -ne 0 ]; then
    echo "Directory not exists!"
fi
hadoop fs -test -e 用于判断HDFS目录/文件是否存在,下一步检测该命令的返回值,以确定其判断结果。

-test -[ezd] <path>: If file { exists, has zero length, is a directory
then return 0, else return 1.
e,z,d参数必用其一,不可缺少。

(12)一次添加多个输入目录/文件到Map-Reduce job中
使用 FileInputFormat.addInputPaths(Job job, String commaSeparatedPaths) 方法,可以一次将多个目录/文件添加到M-R job中,其中,第二个参数是一个逗号分隔的路径列表,例如“/user/root/2012-01-01,/user/root/2012-01-02,/user/root/2012-01-03”。

(13)HBase中的TTL的单位
在hbase shell中,describe '表名'可以查看一个HBase表的结构和基本参数,例如:

hbase(main):005:0> describe 'TableName'
DESCRIPTION                                                             ENABLED                               
 {NAME => 'TableName', FAMILIES => [{NAME => 'fam', COMPRESSION = > 'NONE', VERSIONS => '2', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
里面的TTL的单位是秒,不做特别设置的时候,就是这个默认值(约为69年),超过此时间的记录会被删除。

(14)HBase中的VERSIONS(版本)的含义
如上例所示,你已经看到了VERSIONS这个参数,假设其值为2,那么它表示:row key、column family、qualifier 都相同的记录最多可以有2条,这2条记录的timestamp不同。例如用hbase shell查到的下面两条记录:

abc     column=fam:\x11\x00\x00\x99, timestamp=1325260900000, value=?\x80\x00\x00
abc     column=fam:\x11\x00\x00\x99, timestamp=1326828800000, value=?\x80\x00\x00
其中,“abc”是row key,“fam”是column family,“\x11\x00\x00\x99”是qualifier,这三者均相同,而这两条记录的timestamp不同,也就是VERSIONS为2。

(15)context.progress()的作用
假设在map()方法中,你有一个从数据库读取大量数据的操作,是用一个循环来完成的,并且,在读完全部的数据之前,你不会有任何的数据输出(纯读),那么,读完全部数据所需的时间可能很长,一直没有输出的话,这个task就会因为超时被杀掉,为了避免这个问题,可在以读取数据的循环中使用context.progress()方法来报告进度,那么该task就会被认为还活着,从而解决超时问题。

(16)Map-Reduce的单元测试,用MRUnit来做
我们不可能把每个M-R job都放到到实际的环境中去运行,靠打印log来调试其中的问题,单元测试是必须的,M-R的单元测试用MRUnit来做。
①MRUnit有两个ReduceDriver,一个是 org.apache.hadoop.mrunit.mapreduce.ReduceDriver,另一个是 org.apache.hadoop.mrunit.ReduceDriver,其中,前者是为更新的Hadoop API准备的,如果你发现IDE在ReduceDriver这里提示错误,但是又不知道哪里写错了的时候,可以查看一下是否是这个问题。
②在测试一个mapper类中的时候,MRUnit的setUp()函数比mapper类的setup(Context context)函数要先执行。

(17)调用一个Java Map-Reduce程序时,在命令行传入参数“-D mapred.queue.name=XXX”的作用
Hadoop集群中的job被分在不同的队列中,如果不设置mapred.queue.name参数,则job被放置在默认队列中,否则就被放在指定的队列中。各队列之间是有优先级之分的,同一个队列中的各job也有优先级之分,所以,我们需要的话,可以既设置队列,又设置job的优先级:

-D mapred.queue.name=XXX
-D mapred.job.priority=HIGH
这表示设置优先级为HIGH。

(18)继承自org.apache.hadoop.hbase.mapreduce.TableMapper这个抽象类的一个mapper类,当它的map()方法每被调用一次时,就有HBase的一行(row)被读入处理,由于是处理一行,所以对一个map()方法来说,row key是唯一的,column family可能有N个,每个column family下又可能有M个qualifier,每一个qualifier还可能会对应X个timestamp的记录(取决于你HBase的VERSIONS设置),你可以在map()方法中,一级级地遍历得到所有记录。

(19)在大多数情况下,一个split里的数据(由一个mapper所处理)是来自于同一个文件的;少数情况下,一个split里的数据是来自多个文件的。

(20)org.apache.Hadoop.mapreduce.lib.output 和 org.apache.hadoop.mapreduce.output 这两个package都有 TextOutputFormat 类,其中,前者比后者版本新,使用的时候注意。

(21)执行Map-Reduce Java程序时,传入 -D hadoop.job.ugi=hadoop,hadoop 参数可以使得该job以hadoop用户来执行,例如,你是以Linux root用户来执行一个脚本,脚本中执行了一个M-R Java程序,那么该程序就无法将输出结果写入到HDFS上的 /user/hadoop/ 目录下,如果按上面的方法传入一个参数,就解决了这个问题:

hadoop com.codelast.DoSomething -D hadoop.job.ugi=hadoop,hadoop
其中,com.codelast.DoSomething是你的M-R Java程序。

(22)用MRUnit怎么测试含有FileSplit.getPath()的方法
如果mapper中的一个方法myMethod(Context context)含有如下代码片段:

String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
这句话是用来取当前mapper正在处理的文件名。那么,方法myMethod()就不能用MRUnit来测,因为无法使用MRUnit来设置mapper中当前正在处理的文件。为了测这个方法,你需要把上面的代码段抽取出来,单独放在一个方法中,我们假设其为:

public int getName(Context context) {
    return ((FileSplit) context.getInputSplit()).getPath().getName();
}
然后,在单元测试文件中,你的tester类里重写这个方法,自己指定一个返回值:

@Test
public void test_1() throws IOException {
 mapper = new MyMapper() {
   @Override
   public int getName(Context context) {
  return "part-r-00000";
   }
 };

 Configuration configuration = new Configuration();
 mapDriver.withConfiguration(configuration);
 mapDriver.withMapper(mapper);
 
 mapDriver.withInput(new LongWritable(1), new Text("XXXXXX"));
 //TODO:
}
其中,MyMapper是你的mapper类名,在这里我们强制指定了getName方法返回一个字段串“part-r-00000”,从而在下面的“//TODO:”测试代码中,就可以在调用待测的myMethod方法时(间接地会调用getName方法),自然会得到“part-r-00000”这个字符串。

(23)HBase中的Pair类
如果你只要保存一对对象,那么Map可能不好用,你可以用 org.apache.hadoop.hbase.util 包中的 Pair<T1, T2> 类:

Pair<String, String> aPair = new Pair<String, String>("abc", "def");
String firstStr = aPair.getFirst();
String secondStr = aPair.getSecond();
显然,getFirst()方法用于取第一个值,getScond()方法用于取第二个值。

(24)用MRUnit测试mapper时,如何避开从 DistributedCache 加载文件
可以在unit test里set一个值到 Configuration 对象中,在mapper里判断这个变量是否set了,set了就从用于测试的local file读数据,没有set就从DistributedCache读文件。

(25)只有map的job,如何在一定程度上控制map的数量
如果一个job只有map,那么,map的数量就是输出文件的数量,为了能减少输出文件的数量,可以采用减少map的数量的方法,那么,如何减少呢?其中一个办法是设置最小的input split size。例如以下代码:

FileInputFormat.setMinInputSplitSize(job, 2L * 1024 * 1024 * 1024);
将使得小于 2G 的输入文件不会被分割处理。如果你的输入文件中有很多都是小于2G的,并且你的Hadoop集群配置了一个split的大小是默认的64M,那么就会导致一个1点几G的文件就会被很多个map处理,从而导致输出文件数量很多。使用上面的方法设置了min input split size之后,减小输出文件数量的效果很明显。

(26)如何使用elephant-bird的 LzoTextOutputFormat 对纯文本数据进行LZO压缩
假设你有一堆纯文本数据,要将它们用LZO来压缩,那么,可以用elephant-bird的 LzoTextOutputFormat 来实现。
一个只有map的job就可以完成这个工作,你需要做的,首先是设置输出格式:

job.setMapperClass(MyMapper.class);
job.setOutputFormatClass(LzoTextOutputFormat.class);
其次,你需要这样的一个mapper类:

public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    context.write(null, value);
  }
}
其余代码此处省略。

(27)如何使MapReduce job执行开始时不检查某目录是否已经存在
如果M-R job的HDFS输出目录已经存在,那么job执行时会报错。为了让它不检查,或者改变默认的检查办法(例如,我们会在HDFS输出目录下生成几个子目录,在里面输出最终数据,只要确保这几个子目录不存在即可),那么就需要override checkOutputSpecs 这个方法:

  @Override
  public void checkOutputSpecs(JobContext job) throws IOException {
    //TODO:
  }
在这里面,你只要把exception吃掉即可使得输出目录存在时不会报错。

(28)使用HBase的程序报错“java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.<init>”的一个原因
如果你的程序使用了HBase,并且有HDFS操作(即使用了hadoop的jar包),那么出现上面所说的错误提示时,请检查Hadoop的安装路径下的lib目录下,HBase的jar包版本是否与你的程序路径下的HBase jar包版本相同,如果不同,那么就有可能导致这个问题。

分享到:
评论

相关推荐

    Hadoop集群搭建部署与MapReduce程序关键点个性化开发.doc

    总的来说,搭建Hadoop集群和开发MapReduce程序是一个系统性的工程,涉及到操作系统管理、网络配置、Java编程以及大数据处理原理。对于初学者来说,遵循详尽的步骤和代码示例是非常有益的,而逐步熟悉并理解这些过程...

    华为MapReduce服务应用开发指南.pdf

    总之,这份华为MapReduce服务应用开发指南是为希望在华为MRS平台上开发MapReduce应用的开发者量身打造的,其内容全面、结构清晰,并且包含了大量实例代码,是学习和参考的宝贵资源。开发者可以根据这份指南快速上手...

    配置mapreduce开发环境(简单易懂,轻松上手)

    #### 七、解决常见问题 如果程序能够运行但没有任何输出信息,可能是因为缺少日志输出配置。可以通过添加`log4j.properties`文件来解决此问题。 - 如果遇到权限问题,可以在HDFS上创建一个目录,并将该目录的权限...

    实验项目 MapReduce 编程

    接着,实验在Hadoop集群的主节点上配置了MapReduce的开发环境,这里选择了Eclipse,而描述中提到的IDEA也是常见选择。开发者需要熟悉如何在IDE中设置Hadoop的环境,以便编写和调试MapReduce程序。 实验的核心部分是...

    使用MyEclipse实现MapReduce

    WordCount是最常见的MapReduce示例,用于统计文本中各个单词的出现次数。在MyEclipse中,可以创建一个Java类,实现Mapper和Reducer接口。Mapper类负责将输入行拆分成单词,Reducer类则将相同的单词聚合并计算总数。 ...

    MapReduce求行平均值--MapReduce案例

    本案例主要探讨如何使用MapReduce来求取数据集的行平均值,这在数据分析、数据挖掘以及日志分析等场景中非常常见。我们将讨论四种不同的方法来实现这一目标,针对已处理(包含行号)和未处理的数据集。 首先,我们...

    大数据 hadoop mapreduce 词频统计

    Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,这两个组件共同为大数据处理提供了强大的支持。 ...

    MapReduce经典常见面试实操题

    MapReduce是一种分布式计算模型,由Google开发,用于处理和生成大规模数据集。它将复杂的并行数据处理过程简化为两个主要阶段:Map和Reduce。在Map阶段,输入数据被分割成多个块,然后在集群的不同节点上并行处理。...

    1.1 MapReduce服务课程资料

    - MapReduce是基于Google的Map/Reduce分布式计算框架设计开发的,用于处理大规模数据集的并行运算。MapReduce易于编程,程序员仅需描述需要做什么,具体的执行由框架处理。 - Yarn是Hadoop 2.0中的资源管理系统,它...

    MapReduce 中文版论文

    - 排序是 MapReduce 中常见的应用场景之一,通过优化可以显著提高排序任务的效率。 4. **高效的备份任务**: - 通过合理设计备份任务,可以减少因节点故障导致的重新计算时间,从而提高整体性能。 5. **失效的...

    MapReduce浪潮

    它自动处理数据分割、任务调度、错误恢复和跨机器通信等问题,使得开发者能够专注于编写Map和Reduce函数,而无需具备分布式计算的专业知识。这种抽象使得开发大规模数据处理应用变得更加简单和高效。 Google的...

    MapReduce1.doc

    MapReduce是一种分布式计算模型,由Google开发,用于处理和生成大规模数据集。它将复杂的计算任务分解为两个主要阶段:Map(映射)和Reduce(规约),并且在大规模集群中并行执行这些任务,从而提高了处理效率。...

    MapReduce2.0程序设计多语言编程(理论+实践)

    Scala由于与Java API的兼容性,也是MapReduce的常见选择。 实战部分: 1. **MapReduce实例**:实战部分通常会涵盖不同类型的MapReduce应用,例如网页链接分析、日志分析、文本挖掘等。这些实例将展示如何设计和实现...

    基于mapreduce的小型电影推荐系统

    8. **性能优化**:考虑到MapReduce处理大数据的特性,推荐系统设计时还需考虑性能优化,如合理设置Map和Reduce任务的数量,避免数据倾斜等问题,以确保系统的高效运行。 综上所述,这个项目涵盖了大数据处理、...

    阿里云 专有云企业版 V3.5.2 E-MapReduce 开发指南 - 20190326.pdf

    除了上述详细的应用场景指导外,指南还提供了常见问题的解答部分,其中涵盖了安装问题、使用问题、性能问题等多方面的问题和解决方案。通过这部分内容,开发者可以快速定位并解决在使用E-MapReduce过程中可能遇到的...

    MapReduce Simplified Data Processing on Large Clusters.pdf

    在开发MapReduce之前,作者和其他Google员工已经实现了数百种特殊的计算任务来处理大量原始数据(如爬取的文档、网页请求日志等),用于计算各种衍生数据,例如倒排索引、网页结构的多种表示、每个主机被爬取的页面...

    基于javaweb + mapreduce的小型电影推荐系统

    MapReduce是Apache Hadoop的一部分,用于分布式处理大规模数据,尤其适合处理推荐系统中常见的协同过滤或基于内容的推荐算法。Java Web是开发Web应用程序的标准技术,用于构建服务器端功能和用户界面。推荐系统则是...

    Hadoop.MapReduce.v2.Cookbook pdf

    这本书籍详细介绍了如何在Hadoop 2.x环境中有效地设计、开发和优化MapReduce作业。 Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储海量数据。它的核心组件包括HDFS(Hadoop Distributed File ...

    IBM09年推出的一款基于eclipse的mapreduce小插件

    这样的文档通常会涵盖如何配置开发环境、创建MapReduce作业、理解工作流程、以及解决常见问题等内容。 标签中的“mapreduce”、“eclipse”和“hadoop”进一步强调了这个插件与这三个关键领域的关联。Hadoop是实现...

Global site tag (gtag.js) - Google Analytics