1.MapReduce的类型
Hadoop的MapReduce函数遵循如下常规格式:
--map:(K1, V1) -> list(K2, V2)
--combine:(K2, list(V2)) -> list(K2, V2)
--partition:(K2, V2) -> integer
--reduce:(K2, list(V2)) -> list(K3, V3)
输入数据(K1,K2)的类型由输入格式进行设置。
由于JAVA的泛型存在类型擦除,其它的类型不能从mapper,reducer等直接导出,而需要手动设置。
其余设置如下表所示:
1)默认的MapReduce作业
job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(Mapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); job.setReducerClass(Reducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class);HashPartitioner:对每条记录的键进行哈希操作以决定该记录属于那个分区让reduce处理。每个分区对应一个reducer任务,所以分区数等于作业的reduce的个数
2.输入格式
1)输入分片和记录
一个输入分片(split)是由单个map处理的输入块。每一个map只处理一个输入分片。每个分片被划分为若干个记录,每条记录就是一个key/value对,map一个接一个的处理每条记录。输入分片和记录都是逻辑的,不必将他们对应到文件上。
public abstract class InputSplit { //以字节为单位的长度 public abstract long getLength() throws IOException, InterruptedException; //分片的存储位置 public abstract String[] getLocations() throws IOException, InterruptedException; }注:一个分片并不包含数据本身,而是指向数据的引用。
长度:用来排序分片,以便优先处理最大的分片,从而最小化作业运行时间(近似贪婪算法)。
存储位置:供MapReduce系统使用以便将Map任务尽量放在分片数据附近。
2)InputFormat
InputFormat负责创建输入分片并将它们分割成记录
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
客户端通过调用getSpilts()方法获得分片列表。
JobTracker使用分片的存储位置来调度map任务。
Map任务把分片传给InputFormat的createRecordReader方法来获得这个分片的RecordReader。
RecordReader就是记录上的迭代器,map任务用一个RecordReader来生成记录的key/value对,然后再传递给map函数。
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { ...... public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
--FileInputFormat类
FileInputFormat是所有使用文件为数据源的InputFormat实现的基类.
它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现。把分片割成记录的作业由其子类来完成。
设置JobConf的输入路径:
public static void addInputPath(Job job, Path path); public static void addInputPaths(Job job, String commaSeparatedPaths); public static void setInputPaths(Job job, Path... inputPaths); public static void setInputPaths(Job job, String commaSeparatedPaths);一个路径可以表示一个文件,一个目录或是一个glob(文件模式)。
注:目录中的内容不会被递归处理。如果包含子目录,也会被当做文件,从而产生错误。
设置PathFilter排除特定文件。无论设置与否,FileInputFormat会使用默认过滤器来排除隐藏文件(以.和_开头的文件)。
public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter);也可以通过配置属性来设置:
mapred.input.dir:逗号分隔的路径 mapred.input.path.filter.class:PathFilter类名----FileInputFormat类的输入分片
分片大小的计算公式为
max(minimumSize, min(maximumSize, blockSize))
注:FileInputFormat定义的逻辑记录有时并不能很好地匹配HDFS的文件块,即逻辑记录的边界和HDFS的块的边界没有对齐。因此那些“本地”map可能会执行一些远程的读操作。
----CombineFileInputFormat
FileInputFormat生成的InputSplit是一个文件或该文件的一部分。
CombineFileInputFormat把多个文件打包到一个分片中以便每个mapper可以处理更多地数据。并且在决定哪些块放入同一个分片时,它会考虑节点和机架的因素,所以在典型的MapReduce作业中处理输入的速度并不会下降。
CombineFileInputFormat是一个抽象类,没有提供实体类,所以使用的时候要创建一个CombineFileInputFormat的具体类和实现createRecordReader()方法。
----避免切分
方法一:增加最小分片大小
方法二:覆盖FileInputFormat的isSplitable方法
----mapper中文件分片信息的相关属性
----TextInputFormat
TextInputFormat是默认的InputFormat。每条记录是一行输入。key是LongWritable类型,存储该行在整个文件中的字节偏移量,value是这行的内容,不包括任何终止符(换行符和回车符),它是Text类型
----KeyValueTextInputFormat
和TextInputFormat类似,KeyValueTextInputFormat也是将文件中的每行解析成一条记录,不同的是,它将每行的文本根据分隔符来拆分成键值对。默认的分隔符为制表符。可通过mapreduce.input.keyvaluelinerecordreader.key.value.separator属性设置。
----NLineInputFormat
如果希望mapper收到固定行数的输入,需要使用NLineInputFormat作为InputFormat。与TextInputFormat一样,key是文件中行的字节偏移量,值是行本身。
N是每个mapper收到的输入行数,默认时N=1,每个mapper会正好收到一行输入,mapreduce.input.lineinputformat.linespermap属性控制N的值。
----SequenceFileInputFormat
如果要用顺序文件数据作为MapReduce的输入,应用SequenceFileInputFormat。key和value由顺序文件决定,所以只需要保证map输入的类型匹配。
SequenceFileInputFormat可以读MapFile和SequenceFile。如果在处理顺序文件时遇到目录,SequenceFileInputFormat类会认为正在读MapFile,使用的是其数据文件,因此没有MapFileInputFormat类也是可以理解的。
----SequenceFileAsTextInputFormat和SequenceFileAsBinaryInputFormat
两者均是SequenceFileInputFormat的变体,前者将顺序文件(其实就是SequenceFile)的key和value转成Text对象,后者获取顺序文件的key和value作为二进制对象。
----多种输入MultipleInputs
MultipleInputs允许为每条输入路径指定InputFormat和Mapper,不同的Mapper输出的KEY/VALUE类型必须一致。
----数据库输入和输出
DBInputFormat和DBOutputFormat,HBase的TableInputFormat和TableOutputFormat。
运行太多的mapper数据库中读取数据可能会使数据库受不了。
另一种方法是使用Sqoop。
3.输出格式
1)TextOutputFormat
默认的输出格式,它本每条记录写成文本行。它的key/value可以是任意类型,因为TextOutputFormat调用toString()方法把它们转成字符串。
默认情况下key和value用制表符分割,可以通过mapreduce.output.textoutputformat.separator进行设置,与TextOutputFormat对应的输入格式是KeyValueTextInputFormat。
可以使用NullWritable来省略输出的key和value,这也会导致无分隔符输出。
2)SequenceFileOutputFormat & SequenceFileAsBinaryOutputFormat
SequenceFileOutputFormat将它的输出写为一个顺序文件。
SequenceFileAsBinaryOutputFormat把key/value对作为二进制格式写到一个SequenceFile容器中。
3)MapFileOutputFormat
把MapFile作为输出,MapFile中的key必需顺序添加,所以必须确保reducer输出的key已经排好序。
4)多个输出MultipleOutputs
5)延迟输出
FileOutputFormat的子类会产生输出文件,即使文件时空的。但是有些应用倾向于不创建空文件,此时就可以使用LazyOutputFormat了,它是一个封装输出格式,可以保证指定分区第一条记录输出时才真正的创建文件。要使用它,用JobConf和相关输出格式作为参数来调用setOutputFormatClass()方法即可。
相关推荐
本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...
【标题】Hadoop MapReduce 实现 WordCount ...通过理解和实践 Hadoop MapReduce 的 WordCount 示例,开发者可以快速掌握 MapReduce 的基本工作原理,为进一步学习和应用大数据处理技术打下坚实基础。
Hadoop的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,这两个组件共同为大数据处理提供了强大的支持。 MapReduce是一种分布式计算模型,由Google提出,Hadoop对其进行了实现。在MapReduce中,...
3. **数据输入与输出**:探讨InputFormat和OutputFormat接口,理解如何自定义输入输出格式以适应不同类型的数据源。 4. **错误处理与容错机制**:讲解Hadoop的检查点、重试和故障恢复策略,以确保任务的可靠性。 5...
MapReduce是Hadoop生态系统中的核心组件之一,用于处理和生成大规模数据集。该书旨在帮助读者理解并掌握如何使用MapReduce解决实际的大数据问题。 MapReduce的核心理念是将复杂的分布式计算任务分解为两个主要阶段...
总之,这个项目旨在展示如何用Python和Hadoop MapReduce解决社交网络中的相似用户分析问题。虽然代码可能不够精致,但它提供了一个起点,让人们了解如何在实际问题中应用这两个工具。对于想要进一步学习大数据处理的...
总之,《Hadoop MapReduce v2 Cookbook》第二版深入介绍了Hadoop MapReduce V2的相关技术和实践方法,适合于想要深入了解和掌握Hadoop MapReduce V2的开发者和技术人员阅读。通过本书的学习,读者不仅可以了解Hadoop...
而Hadoop MapReduce则是一个分布式计算框架,能够处理和存储海量数据。现在我们详细探讨这两个概念以及它们如何协同工作。 首先,让我们理解Apriori算法的基本原理。Apriori算法由Raghu Ramakrishnan和Gehrke在1994...
本章介绍了 Hadoop MapReduce,同时发现它有以下缺点: 1、程序设计模式不容易使用,而且 Hadoop 的 Map Reduce API 太过低级,很难提高开发者的效率。 2、有运行效率问题,MapReduce 需要将中间产生的数据保存到...
Hadoop MapReduce v2 Cookbook (第二版), Packt Publishing
1. 复杂性:Hadoop MapReduce 的编程模型和执行机制相对复杂,需要一定的学习和实践经验。 2. 资源消耗:Hadoop MapReduce 需要大量的计算资源和存储资源,以支持大规模数据处理。 Hadoop MapReduce 是一种功能强大...
Hadoop MapReduce 编程实战 ...通过了解 MapReduce 编程基础、MapReduce 项目实践、MapReduce 编程模型、Deduplication、MAC 地址统计和计数器的使用,我们可以更好地掌握 Hadoop MapReduce 的编程技术。
通过熟练掌握这些调试技巧,开发者可以更有效地定位和解决问题,从而优化 Hadoop MapReduce 应用程序的性能和效率。在实际操作中,不断实践和理解 Hadoop 生态系统将使你能够更好地驾驭这个强大的大数据处理工具。
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析...
Hadoop MapReduce是Apache Hadoop框架的核心组件之一,它设计用于分布式处理大规模数据集,而v2的引入主要是为了解决v1在资源管理和效率上的局限性。 MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段。...
在 Hadoop MapReduce 中实现 KMeans,我们可以将这个过程分解为两个主要步骤:Map 和 Reduce。 **Map 阶段:** - 输入:原始数据集,每个数据点为一行。 - 输出:<(质心ID, 数据点)> 键值对,其中质心ID表示当前...
对于需要处理大规模数据集的企业和组织来说,掌握Hadoop MapReduce的原理和使用方法是至关重要的。通过合理设计Map和Reduce函数,可以充分发挥Hadoop MapReduce的性能优势,解决实际问题中的数据处理难题。