`
cocoIT
  • 浏览: 50975 次
  • 性别: Icon_minigender_1
  • 来自: 福建
文章分类
社区版块
存档分类
最新评论

Hadoop学习笔记

 
阅读更多

应用开发

主要知识点如下:

Configuration类(支持overwrite,variable$)

测试(mock单元测试,本地测试,集群测试)

Tool,ToolRunner

集群测试(package,启动job,JobwebUIfornamenodeandjobtracker)

运程调试器(keep.failed.task.files=true,使用ISolationRunner)

作业调优(HPROF)

MapReduce工作流(oozie)

1.在本地运行测试数据

publicclassMaxTemperatureDriverextendsConfiguredimplementsTool{

publicintrun(String[]args)throwsException{

Jobjob=newJob(getConf(),“computemaxtemperature”);

job.setJarByClass();

job.setMapperClass();

job.setReducerClass();

FileInputFormat.addInputPath(job,newPath(args[0]));

FileOutputFormat.addOutputPath(job,newPath(args[1]));

returnjob.waitForCompletion(true);

}

publicstaticvoidmain(String[]args){

intexitCode=ToolRunner.run(newMaxTemperatureDriver(),args);

System.exit(exitCode);

}

}

编译上面的代码,在根节点处运行hadoop命令(事先将hadoop进程在本地启动):

hadoopMaxTemperature–confconf/hadoop-local.xmlinput/ncdcmax-temp

2.集群上运行

使用jar命令将class文件打包,然后使用jar命令上传并启动任务(事先将hadoop在集群中启动):

%hadoopjarjob.jarMaxTempratureDriver–confconf/hadoop-cluster.xmlinputoutput

3.Hadoop守护进程的地址和端口

RPC

namenodeRPC地址和端口hdfs://localhost:8020(fs.default.name)

jobtrackerRPC地址和端口localhost:8021(mapred.job.tracker)

datanodeTCP/IP服务器(块传输)50010(dfs.datanode.address)

datanodeRPC地址和端口localhost:50020(dfs.datanode.ipc.address)

tasktrackerRPC地址和端口(mapred.task.tracker.report.address)

HTTP

jobtracker50030(mapred.job.tracker.http.address)

tasktracker50060(mapred.task.tracker.http.address)

namenode50070(dfs.http.address)

datanode50075(dfs.datanode.http.address)

secondary50090(dfs.secondary.http.address)

4.作业调试(计数器和状态)

在map/reduce程序中可以通过计数器和状态来记录数据中的一些状态,可以通过webUI或脚本指令来查看运行后的计数器或状态。

context.setStatus(“”);

context.incrCounter(Stringgroup,Stringcounter,intnum);

命令行查询计数器:

%hadoopjob–counterjob_201111160811_0003‘MaxTemperatureMaper$Temperature’ENUM

远程调试器

在集群上运行作业很难调试,但是可以配置Hadoop保留作业运行期间产生的所有中间值,以便稍后在调试器上重新运行这些出错的任务。

1)设置属性保留中间数据keep.failed.task.files=true

2)运行作业,在web界面上查看故障节点和task_attempt_ID;

3)通过上面的ID来查找保存的中间数据文件。mapred.local.dir定义了本地缓存目录,在指定的一个或多个目录下寻找对应的job_id下的task_temp_id目录,下面存放着job.xml,map输入的序列化文件,map输出备份(在output目录下),和work目录(task_attempt的工作目录)。

4)在脚本控制台cd到上面的work目录,设置运程调试器属性并启动hadoop进入debug模式:

%exportHADOOP_OPTS=”-agentlib:jdwp=transfport=dt_socket,server=y,suspend=y,address=8787”

%hadooporg.apache.hadoop.mapred.IsolationRunner../job.xml

5)在运程客户端启动JavaIDE如Eclipse远程连接上面主机的8787端口,在map/reduce源代码中设置断点等待。

上述调试技术不只适用于失败的任务,还可以保留成功完成的任务数据来调试内部逻辑。这是,可将属性keep.task.files.pattern设置为一个正则表达式(与保留的任务ID匹配)。

其它一些调试的技巧:

在linux下dumpJavathreadstacktrace

如果是在控制台中运行,则直接ctrl+\

如果是在后台运行,可以先找到运行java的pid,然后kill-QUITPID,会将threadstack内容输出到该java进程的标准输出流里,例如tomcat就会写在catalina.out里。

jstack[-l]pid

如果java程序崩溃生成core文件,jstack工具可以用来获得core文件的javastack和nativestack的信息,从而可以轻松地知道java程序是如何崩溃和在程序何处发生问题。另外,jstack工具还可以附属到正在运行的java程序中,看到当时运行的java程序的javastack和nativestack的信息,如果现在运行的java程序呈现hung的状态,jstack是非常有用的。

5作业调优

哪些因素影响作业的运行效率?

mapper的数量:尽量将输入数据切分成数据块的整数倍。如有太多小文件,则考虑CombineFileInputFormat;

reducer的数量:为了达到最高性能,集群中reducer数应该略小于reducer的任务槽数。

combiner:充分使用合并函数减少map和reduce之间传递的数据量,combiner在map后运行;

中间值的压缩:对map输出值进行压缩减少到reduce前的传递量(conf.setCompressMapOutput(true)和setMapOutputCompressorClass(GzipCodec.class));

自定义序列:如果使用自定义的Writable对象或自定义的comparator,则必须确保已实现RawComparator

调整shuffle:MapReduce的shuffle过程可以对一些内存管理的参数进行调整,以弥补性能不足;

另一个有用的方法是启用JDK的HPROF分析来获取程序的CPU和堆栈使用情况。

conf.setProfileEnabled(true);//“mapred.task.profile”

conf.setProfileParams(“-agentlib:hprof=cpu=samples,heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s”);//“mapred.task.profile.params”

conf.setProfileTaskRange(true,“0-2”);//第一个参数表示map,false则分析reduce;第二个参数任务ID范围

将上述程序加入驱动程序后重新运行,分析结果将输出到作业日志的末尾。

MapReduce工作机制

知识点小结:

shuffle影响性能的因素

1Map–>buffer–>partition,sort,spilltodisk(输出缓冲区,溢出写磁盘比例,运行combiner最小溢出写文件数3,tasktracker工作线程数)

2Reduce

copy(5threads)–>memory(buffersize)–>disk(threhold)–>merge–>reduce

1剖析MapReduce作业运行机制

1.1作业的提交

客户端通过JobClient.runJob()来提交一个作业到jobtracker,JobClient程序逻辑如下:

a)向Jobtracker请求一个新的jobid(JobTracker.getNewJobId());

b)检查作业的输出说明,如已存在抛错误给客户端;计算作业的输入分片;

c)将运行作业所需要的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中以jobid命名的目录下。作业jar副本较多(mapred.submit.replication=10);

d)告知jobtracker作业准备执行(submitjob)。

1.2作业的初始化

jobtracker接收到对其submitJob()方法的调用后,将其放入内部队列,交由jobscheduler进行调度,并对其进行初始化,包括创建一个正在运行作业的对象(封装任务和记录信息)。

为了创建任务运行列表,jobscheduler首先从共享文件系统中获取JobClient已计算好的输入分片信息,然后为每个分片创建一个map任务;创建的reduce任务数量由JobConf的mapred.reduce.task属性决定,schedule创建相应数量的reduce任务。任务此时被执行ID。

1.3任务的分配

jobtacker应该先选择哪个job来运行?这个由jobscheduler来决定,下面会详细讲到。

jobtracker如何选择tasktracker来运行选中作业的任务呢?

每个tasktracker定期发送心跳给jobtracker,告知自己还活着,是否可以接受新的任务。jobtracker以此来决定将任务分配给谁(仍然使用心跳的返回值与tasktracker通信)。每个tasktracker会有固定数量的任务槽来处理map和reduce(比如2,表示tasktracker可以同时运行两个map和reduce),由机器内核的数量和内存大小来决定。jobtracker会先将tasktracker的map槽填满,然后分配reduce任务到tasktracker。

jobtracker选择哪个tasktracker来运行map任务需要考虑网络位置,它会选择一个离输入分片较近的tasktracker,优先级是数据本地化(data-local)–>机架本地化(rack-local)。

对于reduce任务,没有什么标准来选择哪个tasktracker,因为无法考虑数据的本地化。map的输出始终是需要经过整理(切分排序合并)后通过网络传输到reduce的,可能多个map的输出会切分出一部分送给一个reduce,所以reduce任务没有必要选择和map相同或最近的机器上。

1.4任务的执行

1.tasktracker分配到一个任务后,首先从HDFS中把作业的jar文件复制到tasktracker所在的本地文件系统(jar本地化用来启动JVM)。同时将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。

2.接下来tasktracker为任务新建一个本地工作目录work,并把jar文件的内容解压到这个文件夹下。

3.tasktracker新建一个taskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每个任务,以便客户的map/reduce不会影响tasktracker守护进程。但在不同任务之间重用JVM还是可能的。子进程通过umbilical接口(?什么含义,暂时未知)与父进程进行通信。任务的子进程每隔几秒便告知父进程的进度,直到任务完成。

Streaming和Pipes是用来运行其它语言编写的map和reduce。Streaming任务特指任务使用标准输入输出steaming与进程通信,可以是任何语言编写的。pipes特指C++语言编写的任务,其通过socket来通信(persistentsocketconnection)。

1.5进度和状态的更新

一个作业和每个任务都有一个状态信息,包括:作业或任务的运行状态(running,successful,failed),map和reduce的进度,计数器值,状态消息或描述。

这些信息通过一定的时间间隔由childJVM–>tasktracker–>jobtracker汇聚。jobtracker将产生一个表明所有运行作业及其任务状态的全局试图。你可以通过WebUI查看。同时JobClient通过每秒查询jobtracker来获得最新状态。

1.6作业的完成

1.7作业的失败

2.作业的调度

默认调度器–基于队列的FIFO调度器

公平调度器(FairScheduler)-每个用户都有自己的作业池,用map和reduce的任务槽数来定制作业池的最小容量,也可以设置每个池的权重。FairScheduler支持抢占,如果一个池在特定的一段时间内未得到公平的资源共享,它会中止运行池得到过多资源的任务,以便把任务槽让给运行资源不足的池。启动步骤:

1)拷贝contrib/fairscheduler下的jar复制到lib下;

2)mapred.jobtracker.taskScheduler=org.apache.hadoop.mapred.FairScheduler

3)重启节点hadoop

能力调度器(CapacityScheduler)-

3.shuffle和排序

shuffle特指map输出后到reduce运行前得到输入的整个过程,它是MapReduce的心脏,属于不断被优化和改进的代码库的一部分,下面主要针对0.20版本。

Map端

1)Map输出首先放在内存缓冲区(io.sort.mb属性定义,默认100MB);

2)守护进程会将缓冲区的数据按照目标reducer划分成不同的分区(partition),同时按键进行内排序;如果客户端定义了combiner,则combiner会在排序后运行,继续压缩缓存区的数据;

3)缓冲区上定义了一个阈值(io.sort.spill.percent,默认为0.8),当存储内容达到这个值时,缓冲区的值会被写到本地文件中(mapred.local.dir定义,可以是一个或多个目录);这种文件会有多个,每个的内容都是按照reducer分区且局部排序的。这个过程简称spilltodisk;

4)Map输出完毕前,这些中间的输出文件会合并成一个已分区且已排序的输出文件中,合并会分多次,每次合并的中间文件个数有io.sort.factor来定义,默认是10;这个过程也会伴随着combiner的运行,min.num.spills.for.combine定义了运行combiner之前溢出写的次数;

5)写磁盘时可以压缩文件。mapred.compress.map.output设置为true,mapred.map.output.compression.codec指定压缩实现类;

map任务完成后,会通知父tasktracker状态已更新,然后tasktracker通过心跳通知jobtracker。下面的reduce所在的tasktracker有一个线程定期询问jobtracker以便获得map输出的位置,直到它获得所有输出的位置。

Reduce端

1)每个map任务的完成时间可能不同,但只要有一个任务完成,reduce任务得知后就开始复制对应它的输出,复制线程数由mapred.reduce.parallel.copies定义,默认为5;

2)如果map输出相当小,则不用复制到文件中,而是reducetasktracker的内存中。缓冲区大小由mapred.job.shuffle.input.buffer.percent定义用于此用途的堆空间的百分比,默认0.7;一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent,默认值为0.66)或达到reduce输出阈值(mapred.inmem.merge.threshold,默认值为1000),则合并后溢出写到磁盘中;

3)随着磁盘上副本的增多,后台线程会将它们合并为更大的排好序的文件。为了合并,压缩的map输出必须在内存中被解压缩;

4)复制完所有的map输出后,reduce任务进入合并阶段(sortphase,合并多个文件,并按键排序)。io.sort.factor定义了每次合并数,默认为10,即每10个map输出合并一次。会有很多个合并后的中间文件。

5)最后直接把中间文件数据输入给reduce函数,对已排序输出中的每个键都要调用reduce函数,此阶段的输出直接写到HDFS中。

配置的调优

总原则:给shuffle过程尽量多提供内存空间,但也要确保map函数和reduce函数能得到足够的内存。

运行map和reduce任务的JVM内存大小有mapred.child.java.opts属性设置。

在map端,避免多次溢出写磁盘来获得最佳性能。计数器spilled.records计算在作业运行整个阶段中溢出写磁盘的记录数,大则表明写磁盘太频繁;

在reduce端,中间数据全部驻留在内存中就能得到最佳性能。如果reduce函数的内存需求不大,那么把mapred.inmem.merg.threshold设置为0,把mapred.job.reduce.input.buffer.percent设置为1会带来性能的提升。

4.任务的执行

Hadoop发现一个任务运行比预期慢的时候,它会尽量检测,并启动另一个相同的任务作为备份,即“推测执行”(speculativeexecution)。

推测执行是一种优化措施,并不能使作业运行更可靠。默认启用,但可以单独为map/reduce任务设置,mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution。开启此功能会减少整个吞吐量,在集群中倾向于关闭此选项,而让用户根据个别作业需要开启该功能。

Hadoop为每个任务启动一个新JVM需要耗时1秒,对于大量超短任务如果重用JVM会提升性能。当启用JVM重用后,JVM不会同时运行多个任务,而是顺序执行。tasktracker可以一次启动多个JVM然后同时运行,接着重用这些JVM。控制任务重用JVM的属性是mapred.job.reuse.jvm.num.tasks,它指定给定作业每个JVM运行的任务的最大数,默认为1,即无重用;-1表示无限制即该作业的所有的任务都是有一个JVM。

在map/reduce程序中,可以通过某些环境属性(Configuration)得知作业和任务的信息。

mapred.job.id作业ID,如job_201104121233_0001

mapred.tip.id任务ID,如task_201104121233_0001_m_000003

mapred.task.id任务尝试ID,如attempt_201104121233_0001_m_000003_0

mapred.task.partition作业中任务的ID,如3

mapred.task.is.map此任务是否为map任务,如true

MapReduce类型和格式

1.MapReduce的类型

map(K1,V1)–>list(K2,V2)//对输入数据进行抽取过滤排序等操作

combine(K2,list(V2))–>list(K2,V2)//为了减少reduce的输入,需要在map端对输出进行预处理,类似reduce。不是所有的reduce都在部分数据集上有效,比如求平均值就不能简单用于combine

partition(K2,V2)–>integer//将中间键值对划分到一个reduce分区,返回分区索引号。分区内的键会排序,相同的键的所有值会合成一个组(list(V2))

reduce(K2,list(V2))–>list(K3,V3)//每个reduce会处理具有某些特性的键,每个键上都有值的序列,是通过对所有map输出的值进行统计得来的;当获得一个分区后,tasktracker会对每条记录调用reduce。

默认的map和reduce函数是IdentityMapper和IdentityReducer,均是泛型类型,简单的将所有输入写到输出中。默认的partitioner是HashPartitioner,对每天记录的键进行哈希操作以决定该记录属于那个分区让reduce处理。

输入数据的类型有输入格式(InputFormat类)进行设置,其它的类型通过JobConf上的方法显示设置。这里显式设置中间和最终输出类型的原因是因为Java语言的泛型实现是typeerasure。另外如果K2和K3是相同类型,就不需要调用setMapOutputKeyClass(),因为它将调用setOutputKeyClass()来设置。

2.输入格式

2.1输入分片与记录

一个输入分片(split)是由单个map处理的输入块(分片个数即map所需的tasktracker个数),每个分片包含若干记录(key+value),map函数依次处理每条记录。输入分片表示为InputSplit接口,其包含一个以字节为单位的长度和一组存储位置,分片不包含数据本身,而是指向数据的引用。

InputSplit是由InputFormat创建的,一般无需应用开发人员处理。InputFormat负责产生输入分片并将它们分割成记录。

1)JobClient调用InputFormat.getSplites()方法,传入预期的map任务数(只是一个参考值);

2)InputFormat计算好分片数后,客户端将它们发送到jobtracker,jobtracker便使用其存储位置信息来调度map任务从而在tasktracker上处理这些分片数据。

3)在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader()方法来获得这个分片的RecordReader;RecordReader基本上就是记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,然后在传给map函数。

2.2FileInputFormat

输入路径可由多个函数FileInputFormat.addInputPath()指定,还可以利用FileInputFormat.setInputPathFilter()设置过滤器。输入分片的大小有上个属性控制:分片最小字节数,分片最大字节数和HDFS数据块字节数。

mapred.min.split.size,mapred.max.split.size,dfs.block.size

计算公式是:

max(minSplitSize,min(maxSplitSize,blockSize))

没有特殊需求,应该尽量让分片大小和数据块大小一致。如果HDFS中存在大批量的小文件,则需要使用CombineFileInputFormat将多个文件打包到一个分片中,以便mapper可以处理更多的数据。一个可以减少大量小文件的方法(适合于小文件在本地文件系统,在上传至HDFS之前将它们合并成大文件)是使用SequenceFile将小文件合并成一个或多个大文件,可以将文件名作为键,文件内容作为值。

有时候不希望输入文件被切分,只需覆盖InputFormat的isSplitable()方法返回false即可。

有时候map程序想知道正在处理的分片信息,可以通过Configuration中的属性得到,包括map.input.file(正在处理的输入文件的路径),map.input.start(分片开始处的字节偏移量),map.input.length(分片的字节长度)。

有时候map想访问一个文件的所有内容,需要一个RecordReader来读取文件内容作为record的值。可行的方法是实现一个FileInputFormat的子类,将文件标记为不可切分,同时指定一个特定的RecordReader;该RecordReader只是在第一次next()时返回文件的内容。

2.3文本输入

TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量;值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。由于一行的长度不定,所以极易出现split分片会跨越HDFS的数据块。

KeyValueTextInputFormat将文件的每一行看作一个键值对,使用某个分界符进行分隔,比如制表符。Hadoop默认输出的TextOutputFormat格式即键值对为一行组成一个文件,处理这类文件就可以使用键值文本输入格式。

NLineInputFormat可以保证map收到固定行数的输入分片,键是文件中行的字节偏移量,值是行内容。默认为1,即一行为一个分片,送给每个map。

2.4二进制输入

SequenceFileInputFormat存储二进制的键值对的序列。顺序文件SequenceFile是可分割的,也支持压缩,很符合MapReduce数据的格式。

2.5多种输入

Hadoop也支持在一个作业中对不同的数据集进行连接(join),即定义多个不同的数据输入源,每个源对应不同的目录、输入格式和Map函数。

MultipleInputs.addInputpath(conf,inputPath,TextInputFormat.class,MaxTemperatureMapper.class);

2.6数据库输入和输出

DBInputFormat用于使用JDBC从关系数据库中读取数据,但只适合少量的数据集。如果需要与来自HDFS的大数据集连接,要使用MultipleInputs。

在关系数据库和HDFS之间移动数据的另一个方法是Sqoop。

HBase和HDFS之间移动数据使用TableInputFormat和TableOutputFormat。

3.输出格式

TextOutputFormat是默认的输出格式,它把每条记录写为文本行,键和值可以是任意类型。

SequenceFileOutputFormat将输出写入一个顺序文件,是二进制格式。MapFileOutputFormat把MapFile作为输出,键必须顺序添加,所以必须确保reducer输出的键已经排好序。

FileOutputFormat及其子类产生的文件放在输出目录下,每个reducer一个文件并且文件由分区号命名,如part-00000,part-00001等。有时候需要对文件名进行控制,或让每个reduce输出多个文件,则可使用MultipleOutputFormat和MultipleOutputs类。

MultipleFileOuputFormat可以将数据写到多个文件,关键是如何控制输出文件的命名。它有两个子类:MultipleTextOutputFormat和MultipleSequenceFileOutputFormat。在使用多文件输出时,只需实现它们任何一个的子类,并覆盖generateFileNameForKeyValue()返回输出文件名。

MultipleOutputs类不同的是,可以为不同的输出产生不同的类型。

MultipleOutputs.addMultiNameOutput(conf,“name”,TextOutputFormat.class,KeyClass,valueClass);

新版本Hadoop中上述两个多输出类也合并。

FileOutputFormat的子类会产生输出文件,即使文件是空的。可以使用LazyOutputFormat来去除空文件。
MapReduce的特性

这章主要总结MapReduce的高级特性,包括计数器,数据集的排序和连接。

1.计数器

计数器是一种收集作业统计信息的有效手段,由于质量控制或应用统计。计数器还可辅助诊断系统故障。

Hadoop为每个作业维护若干内置计数器,以描述该作业的各项指标。计数器由关联任务维护,并定期(3秒)传到tasktracker,再由tasktracker传给jobtracker(5秒,心跳)。一个任务的计数器值每次都是完整传输的,而非增量值。

MapReduce允许用户编写程序定义计数器,一般是由一个Java枚举(enum)类型定义。枚举类型的名称即计数器组名称,枚举类型的字段即计数器名称。计数器在作业实例级别是全局的,MapReduce框架会跨所有的map和reduce来统计这些计数器,并在作业结束时产生一个最终的结果。

enumTemperature{

MISSING,MAlFORMED

}

context.incrCounter(Temperature.MISSING,1);

MapReduce同时支持非枚举类型的动态计数器。

context.incrContext(Stringgroup,Stringcounter,intamount);

计数器可以通过很多方式获取,Web界面和命令行(hadoopjob-counter指令)之外,用户可以用JavaAPI获取计数器的值。

RunningJobjob=jobClient.getJob(JobID.forName(id));

Counterscounters=job.getCounters();

longmissing=counters.getCounter(MaxTemperatue.Temperature.MISSING);

2.排序

排序是MapReduce的核心技术,尽管应用程序本身不需要对数据排序,但可以使用MapReduce的排序功能来组织数据。默认情况下,MapReduce根据输入记录的键对数据排序。键的排列顺序是由RawComparator控制的,规则如下:

1)若属性mapred.output.key.comparator.class已设置,则使用该类的实例;

2)否则键必须是WritableComparable的子类,并使用针对该键类的已登记的comparator;

3)如果还没有已登记的comparator,则使用RawComparator将字节流反序列化为一个对象,再由WritableComparable的compareTo()方法进行操作。

全排序

如何用Hadoop产生一个键全局排序的文件?(最好的回答是使用Pig或Hive,两者均可使用一条指令进行排序)

大致方法是,想办法创建一系列排好序的文件,而且这些文件直接也是排序的,比方说第一个文件的值都不第二个文件的值小,则简单的拼装这些文件就可以得到全局排序的结果。问题是如何划分这些文件,并把原始文件的值放入这些排序的文件中?可以使用map的partition来将某一范围的键放入对于的reduce,每个reduce的输入可以保证已排序(局部排序),默认直接输出到part-000×,那所有这些输出组合成一个文件就是全局排序的。为了得到合适的范围,需要对所有输入数据进行统计,实际做法是通过抽样,Hadoop提供InputSampler和IntervalSampler。使用抽样函数事先对input数据进行抽样,得到抽样范围,然后将范围写入分布式缓存,供集群上其它任务使用。

DistributedCache.addCacheFile(cacheFile,conf);

DistributedCache.createSymlink(conf);

辅助排序

MapReduce框架在记录达到reducer之前按键对记录排序,但键所对应的值并没有排序。大多情况下不需考虑值在reduce函数中的出现顺序,但是,有时也需要通过对键进行排序和分组等以实现对值的排序。

例子:设计一个MapReduce程序以计算每年最高气温。

1)使用组合键IntPair,将年份和气温都作为键;

2)按照年份来分区和分组,但排序需要按照年份升序和气温降序。

conf.setPartitionerClass();

conf.setOutputKeyComparatorClass();

conf.setOutputValueGroupingComparator();

3连接

MapReduce能执行大型数据集间的“连接”操作。

Map端连接指在数据到达map函数之前就执行连接操作。为达到此目的,各map的输入数据必须先分区并且以特定方式排序。各个数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。

map连接操作可以连接多个作业的输出,只要这些作业的reduce数量相同,键相同,并且输出文件是不可切分的(如小于HDFS块大小,或gzip压缩)。利用org.apache.mapred.join包中的CompositeInputFormat类来运行一个map端连接,其输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置。

Reduce连接不要求数据集符合特定结构,因此比Map连接更为常用。但是,由于数据集均经过mapReduce的shuffle过程,所以reduce端连接的效率往往更低一些。基本思路是mapper为各个记录标记源,并且使用连接键作为map输出键,使键相同的记录放在同一个reducer中。

1)可以使用MultipleInputs来解析和标注各个源;

2)先将某一个数据源传输到reduce。举天气数据为例,气象站信息(气象站id和名字)以气象站ID+“0”为组合键,名字为值,但是按照ID来分区和分组;气象站天气情况(气象站id,时间和气温)以气象站ID+“1”为组合键,气温为值,但是按照ID来分区和分组。两组数据经过不同的map之后,具有相同的ID的记录被合并作为一个记录输入reduce程序,值列表中的第一个是气象站名称,其余的记录都是温度信息。reduce程序只需要取出一个值,并将其作为后续每条输出记录的一部分写到输出文件即可。

conf.setPartitionerClass();

conf.setOutputValueGroupingComparator(Textpair.FirstComparator.class);

4边数据分布(sidedata)

边数据是作业所需的额外的只读数据,已辅助处理主数据集。面临的挑战是如何让所有的map和reduce都能方便高效地使用边数据。

1)如果仅需向任务传递少量元数据,则可以通过Configuration来设置每个job的属性,则map/reduce可以覆盖configure()方法来获取这些元数据值。如果你设置的值是复杂对象,则需要处理序列化工作。在几百个作业同在一个系统中运行的情况下,这种方法会增多内存开销,而且元数据信息在所有节点都缓存,即使在不需要它的jobtracker和tasktracker上。

2)针对小数据量边数据的常用办法是将在map/reduce数据缓存在内存中,并通过重用JVM使tasktracker上同一个作业的后续任务共享这些数据。

3)分布式缓存(-files,-archives)

a)启动作业时,使用files或archives传入元数据文件路径,

%hadoopjarjob.jarMaxTempratureSample–fileinput/metadata/stations-fixed-width.txtinput/alloutput

b)当tasktracker获得任务后,首先将jobtracker中的上述文件复制到本地磁盘,具体在${mapred.local.dir}/taskTracker/archive,缓存的容量是有限的,默认10GB,可以通过local.cache.size来设置。

c)在map/reduce程序中,直接读取“stations-fixed-width.txt”文件。同时可以通过JobConf.getLocalCacheFiles()和JobConf.getLocalCacheArchives()来获取本地文件路径的数组。

5MapReduce类库

Hadoop还提供了一个MapReduce类库,方便完成常用的功能。

ChainMapper,ChainReducer在一个MapReduce中运行多个mapper或reducer。(M+RM*)

IntSumReducer,LongSumReducer对各键的所有整数值进行求和操作的reducer

TokenCounterMapper输出各单词及其出现的次数

RegexMapper检查输入值是否匹配某正则表达式,输出匹配字符串和计数器值

分享到:
评论

相关推荐

    最新Hadoop学习笔记

    **Hadoop学习笔记详解** Hadoop是一个开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储海量数据。它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,两者构成了大数据处理的基础...

    Hadoop 学习笔记.md

    Hadoop 学习笔记.md

    HADOOP学习笔记

    【HADOOP学习笔记】 Hadoop是Apache基金会开发的一个开源分布式计算框架,是云计算领域的重要组成部分,尤其在大数据处理方面有着广泛的应用。本学习笔记将深入探讨Hadoop的核心组件、架构以及如何搭建云计算平台。...

    hadoop学习笔记.rar

    《Hadoop学习笔记详解》 Hadoop,作为大数据处理领域中的核心框架,是Apache软件基金会下的一个开源项目,主要用于分布式存储和并行计算。本文将根据提供的Hadoop学习笔记,深入解析Hadoop的关键概念和实战技巧,...

    3.Hadoop学习笔记.pdf

    Hadoop是一个开源框架,用于存储和处理大型数据集。由Apache软件基金会开发,Hadoop已经成为大数据处理事实上的标准。它特别适合于存储非结构化和半结构化数据,并且能够存储和运行在廉价硬件之上。Hadoop具有高可靠...

    hadoop学习笔记(三)

    在本篇"Hadoop学习笔记(三)"中,我们将探讨如何使用Hadoop的MapReduce框架来解决一个常见的问题——从大量数据中找出最大值。这个问题与SQL中的`SELECT MAX(NUMBER) FROM TABLE`查询相似,但在这里我们通过编程...

    Hadoop学习笔记整理

    "Hadoop学习笔记整理" 本篇笔记对Hadoop进行了系统的介绍和总结,从大数据的基本流程到Hadoop的发展史、特性、集群整体概述、配置文件、HDFS分布式文件系统等方面都进行了详细的讲解。 一、大数据分析的基本流程 ...

    云计算hadoop学习笔记

    云计算,hadoop,学习笔记, dd

    Hadoop学习笔记.pdf

    在初学者的角度,理解Hadoop的组成部分以及其架构设计是学习Hadoop的基础。 首先,Hadoop的分布式文件系统(HDFS)是其核心组件之一,它具有高吞吐量的数据访问能力,非常适合大规模数据集的存储和处理。HDFS的设计...

Global site tag (gtag.js) - Google Analytics