`

Hadoop实战-中高级部分 之 Hadoop MapReduce高级编程

 
阅读更多

第一部分:重要的组件
Combiner
•什么是Combiner
•combine函数把一个map函数产生的<key,value>对(多个key, value)合并成一个新的<key2,value2>. 将新的<key2,value2>作为输入到reduce函数中,其格式与reduce函数相同。
•这样可以有效的较少中间结果,减少网络传输负荷。
 
•什么情况下可以使用Combiner
•可以对记录进行汇总统计的场景,如求和。
•求平均数的场景就不可以使用了
Combiner执行时机
•运行combiner函数的时机有可能会是merge完成之前,或者之后,这个时机可以由一个参数控制,即 min.num.spill.for.combine(default 3)
•当job中设定了combiner,并且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件之前运行
•通过这样的方式,就可以在spill非常多需要merge,并且很多数据需要做conbine的时候,减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。
•Combiner也有可能不执行, Combiner会考虑当时集群的负载情况。
Combiner如何使用
•代码示例
•继承Reducer类
public static class Combiner extends MapReduceBase implements
           Reducer<Text, Text, Text, Text> {
       public void reduce(Text key, Iterator<Text> values,
               OutputCollector<Text, Text> output, Reporter reporter)
               throws IOException {
                 }
    }
 
•配置作业时加入conf.setCombinerClass(Combiner.class)
 
Partitioner
•什么是Partitioner
•Mapreduce 通过Partitioner 对Key 进行分区,进而把数据按我们自己的需求来分发。
•什么情况下使用Partitioner
•如果你需要key按照自己意愿分发,那么你需要这样的组件。
•例如:数据文件内包含省份,而输出要求每个省份输出一个文件。
•框架默认的HashPartitioner
•public class HashPartitioner<K, V> extends Partitioner<K, V> {  

  /** Use {@link Object#hashCode()} to partition. */  
  public int getPartition(K key, V value,  
                          int numReduceTasks) {  
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;  
  } 
Partitioner如何使用
•实现Partitioner接口覆盖getPartition()方法
•配置作业时加入conf.setPartitionerClass(MyPartitioner.class);
•Partitioner示例
        public static class MyPartitioner implements Partitioner<Text, Text> {
           
         @Override 
            public int getPartition(Text key, Text value, int numPartitions) {
             }
 
}
Partitioner需求示例
•需求描述
•数据文件中含有省份
•需要相同的省份送到相同的Reduce里
•从而产生不同的文件
•数据样例
•1 liaoning
•1 代表该省份有多少个直辖市
•步骤
•实现Partitioner,覆盖getPartition
•根据省份字段进行切分
 
 
RecordReader
•什么是RecordReader
•用于在分块中读取<Key,Value>对,也就是说每一次我们读取一条记录都会调用该类。
•主要是处理经过InputFormat分片完的数据 
•什么时候使用RecordReader
•需要对输入的数据按自己的需求处理
•如:要求输入的key不是文件的偏移量而是文件的路径或者名字
•系统默认为LineRecordReader
•按照每行的偏移量做为map输出时的key值,每行的内容作为map的value值,默认的分隔符是回车和换行。
 
RecordReader需求示例
•需求
•更改map对应的输入的<key,value>值,key对应的文件的路径(或者是文件名),value对应的是文件的内容(content)。
•步骤
•重写InputFormat不对文件切分
•重写RecordReader
•在配置作业时使用自定义的组件进行数据处理
 
 
第二部分:Join
案例分析
•输入为2个文件,文件一内容如下
•空格分割:用户名 手机号 年龄
•内容样例
•Tom 1314567890 14
•文件二内容
•空格分割:手机号 地市
•内容样例
•13124567890 hubei
•需要统计出的汇总信息为 用户名 手机号 年龄 地市
MapJoin
•设计思路
•使用DistributedCache.addCacheFile()将地市的文件加入到所有Map的缓存里
•在Map函数里读取该文件,进行Join
•  将结果输出到reduce
•需要注意的是
•DistributedCache需要在生成Job作业前使用
 
 
ReduceJoin
•设计思路
•Map端读取所有文件,并在输出的内容里加上标识代表数据时从哪个文件里来的
•在reduce对按照标识对数据进行保存
•然后根据Key的Join来求出结果直接输出
 
第三部分:排序
 
普通排序
•Mapreduce本身自带排序功能
•Text对象是不适合排序的,如果内容为整型不会安照编码顺序去排序
•一般情况下我们可以考虑以IntWritable做为Key,同时将Reduce设置成0 ,进行排序
 
部分排序
•即输出的每个文件都是排过序的
•如果我们不需要全局排序,那么这是个不错的选择。
 
全局排序
•产生背景
•Hadoop平台没有提供全局数据排序,而在大规模数据处理中进行数据的全局排序是非常普遍的需求。
•使用hadoop进行大量的数据排序排序最直观的方法是把文件所有内容给map之后,map不做任何处理,直接输出给一个reduce,利用hadoop的自己的shuffle机制,对所有数据进行排序,而后由reduce直接输出。
•快速排序基本步骤就是需要现在所有数据中选取一个作为支点。然后将大于这个支点的放在一边,小于这个支点的放在另一边。
 
设想如果我们有 N 个支点(这里可以称为标尺),就可以把所有的数据分成 N+1 个part ,将这 N+1 个 part 丢给 reduce ,由 hadoop 自动排序,最后输出 N+1 个内部有序的文件,再把这 N+1 个文件首尾相连合并成一个文件,收工 。
由此我们可以归纳出这样一个用 hadoop 对大量数据排序的步骤:
1 )   对待排序数据进行抽样;
2 )   对抽样数据进行排序,产生标尺;
3 )   Map 对输入的每条数据计算其处于哪两个标尺之间;将数据发给对应区间 ID的 reduce
4 )   Reduce 将获得数据直接输出。
•Hadoop 提供了Sampler接口可以返回一组样本,该接口为Hadoop的采样器。
           public interface Sampler<K, V> {
                        K[] getSample(InputFormat<K, V> inf, Job job)
                         throws IOException, InterruptedException;
            }
•Hadoop提供了一个TotalOrderPartitioner,可以使我们来实现全局排序。
二次排序
•产生背景
•MapReduce默认会对key进行排序
•将输出到Reduce的values也进行预先的排序
•实现方式
•重写Partitioner,完成key分区,进行第一次排序
•实现WritableComparator,完成自己的排序逻辑,完成key的第2次排序
•原理
•Map之前的数据
         key1  1
         key2  2
         key2  3
         key3  4
         key1  2
•Mapduce只能排序key,所以为了二次排序我们要重新定义自己的key 简单说来就是<key value> value ,组合完后
         <key1  1 >    1
         <key2  2 >    2
         <key2  3 >    3
         <key3  4>     4
         <key1  2 >    2
 
•原理
•接下来实现自定义的排序类,分组类,数据变成
         <key1  1 >    1
         <key1  2 >    2
         <key2  2 >    2
         <key2  3 >    3
         <key3  4>     4
•最后 reduce处理后输出结果
           key1  1
           key1  2
           key2  2
           key2  3
           key3  4
 
 
 
第四部分:计数器
•什么是计数器
            计数器主要用来收集系统信息和作业运行信息,用于知道作业成功、失败等情况,比日志更便利进行分析。
•内置计数器
•Hadoop内置的计数器,记录作业执行情况和记录情况。包括MapReduce框架、文件系统、作业计数三大类。
•计数器由关联任务维护,定期传递给tasktracker,再由tasktracker传给jobtracker。
•计数器可以被全局聚集。内置的作业计数器实际上由jobtracker维护,不必在整个网络中传递。
•当一个作业执行成功后,计数器的值才是完整可靠的。
 
 
用户自定义Java计数器
•MapReduce框架允许用户自定义计数器
•计数器是全局使用的
•计数器有组的概念,可以由一个Java枚举类型来定义
•如何配置
•0.20.2以下的版本使用Reporter,
•0.20.2以上的版本使用context.getCounter(groupName, counterName) 来获取计数器配置并设置。
•动态计数器
•所谓动态计数器即不采用Java枚举的方式来定义
 
•Reporter中的获取动态计数器的方法
•public void incrCounter(String group,String counter,long amount)
            组名称,计数器名称,计数值
 
•一些原则
•创建计数器时,尽量让名称易读
 
 
•获取计数器
•Web UI
•命令行 hadoop job-counter
•Java API
•Java API
•在作业运行完成后,计数器稳定后获取。 使用job.getCounters()得到Counters
 
 
 
第五部分:合并小文件示例
•产生背景
•Hadoop不适合处理小文件
•会占用大量的内存空间
•解决方案
•文件内容读取到SequenceFile内
 
  转载请注明出处【 http://sishuok.com/forum/blogPost/list/0/5961.html
2
1
分享到:
评论

相关推荐

    Hadoop实战-中高级部分 PPT 和 源码 下载

    学习Hadoop中高级部分的内容,不仅需要对分布式系统有深入理解,还需要对Java编程有一定基础,因为Hadoop主要使用Java进行开发。此外,了解Linux操作系统和SQL也是必不可少的,因为Hadoop环境通常部署在Linux服务器...

    Hadoop实战-陆嘉恒(高清完整版)

    Hadoop主要由HDFS(Hadoop Distributed File System)和MapReduce两部分组成。HDFS是一种分布式文件系统,能够将大型数据集分布在多台廉价硬件上,提供高容错性和高可用性。MapReduce则是Hadoop用于大规模数据处理的...

    hadoop-common-2.6.0-bin-master.zip

    MapReduce则是一种编程模型,用于大规模数据集的并行计算,它将复杂计算任务分解为“映射”(map)和“化简”(reduce)两部分,便于分布式执行。 **Hadoop在Windows上的安装与配置** 在Windows上使用Hadoop通常...

    hadoop3.3.0-winutils所有bin文件

    同时,它也展示了大数据生态系统中不同组件如何协同工作,如Hadoop的HDFS、YARN和MapReduce,以及Spark的计算引擎,所有这些都依赖于Java和Scala等编程语言来实现。正确理解和使用这些工具,将能有效地利用大数据的...

    Hadoop Real-World Solutions Cookbook 源代码

    《Hadoop Real-World Solutions Cookbook 源代码》是一本针对Hadoop实际应用问题解决方案的实战指南,书中通过丰富的示例代码帮助读者理解和解决在大数据处理中遇到的各种挑战。源代码包含了书中各个章节的关键实现...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    在本安装包“spark-3.2.4-bin-hadoop3.2-scala2.13”中,包含了用于运行Spark的核心组件以及依赖的Hadoop版本和Scala编程语言支持。以下是对这些关键组成部分的详细解释: 1. **Spark**: Spark的核心在于它的弹性...

    Hadoop实战-附目录

    ### Hadoop实战知识点详解 ...通过上述对Hadoop核心组件、实战应用以及在云计算领域中的应用等方面的详细介绍,可以帮助读者更全面地理解和掌握Hadoop相关知识,为后续的实际工作打下坚实的基础。

    Hadoop高级编程- 构建与实现大数据解决方案

    在Hadoop高级编程中,我们需要掌握以下几个关键知识点: 1. **Hadoop配置**:理解Hadoop的配置文件如`core-site.xml`, `hdfs-site.xml`和`mapred-site.xml`,并知道如何根据具体需求调整配置参数,如副本数、内存...

    hadoop-eclipse插件各版本合集

    Hadoop-Eclipse插件是Apache Hadoop项目与Eclipse IDE集成的一个重要工具,它使得Hadoop开发者能够在Eclipse环境中直接创建、编辑、调试和管理Hadoop MapReduce作业,极大地提升了开发效率。本合集包含了多个版本的...

    《实战Hadoop--开启通向云计算的捷径》源码

    MapReduce则是一种编程模型,用于处理和生成大规模数据集,它将复杂任务拆分为"映射"和"化简"两部分,便于并行处理。 这本书的源码可能涵盖了以下知识点: 1. **Hadoop安装与配置**:包括单机模式、伪分布式模式和...

    hadoop实战-part1.pdf

    Hadoop实战文档的这一部分主要聚焦于大数据技术体系的核心组成部分,特别强调了Hadoop技术的重要地位,并对其核心组件HDFS(Hadoop分布式文件系统)和MapReduce进行了深入解析。同时,也讨论了与传统大数据解决方案...

    hadoop-2.6.0-cdh5.7.0版本.zip

    8. **数据处理流程**: 在Hadoop中,数据通常会经过ETL(提取、转换、加载)过程,然后使用MapReduce进行批处理,再通过Hive或Pig进行查询分析,最后可能通过HBase进行实时访问。 总结来说,"hadoop-2.6.0-cdh5.7.0...

    hadoop-eclipse-plugin

    Hadoop Eclipse Plugin是Apache Hadoop项目的一个重要组成部分,它为Eclipse IDE提供了集成开发环境的支持,使得Java开发者能够在Eclipse中直接对Hadoop项目进行创建、编辑和调试。这个压缩包包含了两个版本的Hadoop...

    Hadoop Real-world Solutions Cookbook(英文版)

    Pig和Hive,这两种高级数据处理工具,提供了SQL-like语言来简化MapReduce编程;以及Sqoop,用于导入和导出关系型数据库与Hadoop之间的数据。 通过一系列的实际案例,读者可以学习如何使用Hadoop处理各种业务场景,...

    hadoop-2.7.1.tar.gz.zip

    了解和掌握Hadoop不仅可以提升大数据处理能力,也是迈进大数据领域的必备技能之一。无论是开发者还是数据分析师,都需要深入理解Hadoop的工作原理和应用方法,以便在实际项目中发挥其最大价值。

    hadoop.dll-and-winutils.exe-for-hadoop2.9.0-on-windows_X64

    这使得你可以在Windows环境中进行大数据处理,进行MapReduce任务或者其他基于Hadoop生态的开发工作。不过需要注意的是,虽然Hadoop在Windows上的支持已经相当完善,但在生产环境中,通常还是推荐在Linux系统上部署...

    hadoop-3.1.0.rar windows 环境依赖

    4. **Hadoop的Java API**:对于开发者来说,Hadoop 3.1.0提供了更新的Java API,使得开发和集成更容易,同时支持更多高级功能。 5. **跨版本兼容性**:Hadoop 3.1.0致力于提高与其他版本的兼容性,使得升级和回滚...

Global site tag (gtag.js) - Google Analytics