`
pftzzg
  • 浏览: 10321 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

8-Hadoop MapReduce数据流

 
阅读更多

 

参考:

 

http://www.linuxidc.com/Linux/2012-02/54485.htm

 

Hadoop的核心组件在一起工作时如下图所示:


图4.4高层MapReduce工作流水线

 

 

 

  MapReduce的输入一般来自HDFS中的文件,这些文件分布存储在集群内的节点上。运行一个MapReduce程序会在集群的许多节点甚至所有节点上运行mapping任务,每一个mapping任务都是平等的:mappers没有特定“标识物”与其关联。因此,任意的mapper都可以处理任意的输入文件。每一个mapper会加载一些存储在运行节点本地的文件集来进行处理(译注:这是移动计算,把计算移动到数据所在节点,可以避免额外的数据传输开销)。

 

 

 

  当mapping阶段完成后,这阶段所生成的中间键值对数据必须在节点间进行交换,把具有相同键的数值发送到同一个reducer那里。Reduce任务在集群内的分布节点同mappers的一样。这是MapReduce中唯一的任务节点间的通信过程。map任务间不会进行任何的信息交换,也不会去关心别的map任务的存在。相似的,不同的reduce任务之间也不会有通信。用户不能显式的从一台机器封送信息到另外一台机器;所有数据传送都是由Hadoop MapReduce平台自身去做的,这些是通过关联到数值上的不同键来隐式引导的。这是Hadoop MapReduce的可靠性的基础元素。如果集群中的节点失效了,任务必须可以被重新启动。如果任务已经执行了有副作用(side-effect)的操作,比如说,跟外面进行通信,那共享状态必须存在可以重启的任务上。消除了通信和副作用问题,那重启就可以做得更优雅些。

 

 

 

 

 

 

 

近距离观察

 

 

 

  在上一图中,描述了Hadoop MapReduce的高层视图。从那个图你可以看到mapper和reducer组件是如何用到词频统计程序中的,它们是如何完成它们的目标的。接下来,我们要近距离的来来看看这个系统以获取更多的细节。



 

 

图4.5细节化的Hadoop MapReduce数据流

 

 

 

  图4.5展示了流线水中的更多机制。虽然只有2个节点,但相同的流水线可以复制到跨越大量节点的系统上。下去的几个段落会详细讲述MapReduce程序的各个阶段。

 



 
 

 

  1. InputFormat类定义了如何分割和读取输入文件,它提供有下面的几个功能:

 

 

 

选择作为输入的文件或对象;

 

定义把文件划分到任务的InputSplits

 

RecordReader读取文件提供了一个工厂方法;

 

 

 

 

 

  1. InputFormat



 

  1. MapReduce提供的输入格式

 

 

 

  默认的输入格式是TextInputFormat,它把输入文件每一行作为单独的一个记录,但不做解析处理。这对那些没有被格式化的数据或是基于行的记录来说是很有用的,比如日志文件。更有趣的一个输入格式是KeyValueInputFormat,这个格式也是把输入文件每一行作为单独的一个记录。然而不同的是TextInputFormat把整个文件行当做值数据,KeyValueInputFormat则是通过搜寻tab字符来把行拆分为键值对。这在把一个MapReduce的作业输出作为下一个作业的输入时显得特别有用,因为默认输出格式(下面有更详细的描述)正是按KeyValueInputFormat格式输出数据。最后来讲讲SequenceFileInputFormat,它会读取特殊的特定于Hadoop的二进制文件,这些文件包含了很多能让Hadoop的mapper快速读取数据的特性。Sequence文件是块压缩的并提供了对几种数据类型(不仅仅是文本类型)直接的序列化与反序列化操作。Squence文件可以作为MapReduce任务的输出数据,并且用它做一个MapReduce作业到另一个作业的中间数据是很高效的。

 

 

 

 

 

 

 

4. 记录读取器(RecordReader)

 

 

 

    InputSplit定义了如何切分工作,但是没有描述如何去访问它。 RecordReader类则是实际的用来加载数据并把数据转换为适合mapper读取的键值对。RecordReader实例是由输入格式定义的,默认的输入格式,TextInputFormat,提供了一个LineRecordReader,这个类的会把输入文件的每一行作为一个新的值,关联到每一行的键则是该行在文件中的字节偏移量。RecordReader会在输入块上被重复的调用直到整个输入块被处理完毕,每一次调用RecordReader都会调用Mapper的map()方法。

 

 

 

 

 

5. Mapper:

 

 

 

   Mapper执行了MapReduce程序第一阶段中有趣的用户定义的工作。给定一个键值对,map()方法会生成一个或多个键值对,这些键值对会被送到Reducer那里。对于整个作业输入部分的每一个map任务(输入块),每一个新的Mapper实例都会在单独的Java进程中被初始化,mapper之间不能进行通信。这就使得每一个map任务的可靠性不受其它map任务的影响,只由本地机器的可靠性来决定。map()方法除了键值对外还会接收额外的两个参数(译注:在0.20.×后的版本,接口已变化,由Context对象代替这两个参数):

 

 

 

•OutputCollector对象有一个叫collect()的方法,它可以利用该方法把键值对送到作业的reduce阶段。

 

•Reporter对象提供当前任务的信息,它的getInputSplit()方法会返回一个描述当前输入块的对象,并且还允许map任务提供关于系统执行进度的额外信息。setStatus()方法允许你生成一个反馈给用户的状态消息,incrCounter()方法允许你递增共享的高性能计数器,除了默认的计数器外,你还可以定义更多的你想要的计数器。每一个mapper都可以递增计数器,JobTracker会收集由不同处理得到的递增数据并把它们聚集在一起以供作业结束后的读取。

 

 

 

6. Partition & Shuffle:

 

 

 

 

 

 

 

当第一个map任务完成后,节点可能还要继续执行更多的map任务,但这时候也开始把map任务的中间输出交换到需要它们的reducer那里去,这个移动map输出到reducer的过程叫做shuffle。每一个reduce节点会分派到中间输出的键集合中的一个不同的子集合,这些子集合(被称为“partitions”)是reduce任务的输入数据。每一个map任务生成的键值对可能会隶属于任意的partition,有着相同键的数值总是在一起被reduce,不管它是来自那个mapper的。因此,所有的map节点必须就把不同的中间数据发往何处达成一致。Partitioner类就是用来决定给定键值对的去向,默认的分类器(partitioner)会计算键的哈希值并基于这个结果来把键赋到相应的partition上,自定义的分类器在第五部分有详细描述。

 

 

 

 

 

7. 排序:

 

 

 

 

 

每一个reduce任务负责归约(reduceing)关联到相同键上的所有数值,每一个节点收到的中间键集合在被送到具体的reducer那里前就已经自动被Hadoop排序过了。

 

 

 

 

 

8. 归约(Reduce):

 

 

 

 

 

 

 

每个reduce任务都会创建一个Reducer实例,这是一个用户自定义代码的实例,负责执行特定作业的第二个重要的阶段。对于每一个已赋予到reducer的partition内的键来说,reducer的reduce()方法只会调用一次,它会接收一个键和关联到键的所有值的一个迭代器,迭代器会以一个未定义的顺序返回关联到同一个键的值。reducer也要接收一个OutputCollector和Report对象,它们像在map()方法中那样被使用。

 

 

 

 

 

9.输出格式:

 

 

 

 

 

 提供给OutputCollector的键值对会被写到输出文件中,写入的方式由输出格式控制。OutputFormat的功能跟前面描述的InputFormat类很像,Hadoop提供的OutputFormat的实例会把文件写在本地磁盘或HDFS上,它们都是继承自公共的FileInputFormat类。每一个reducer会把结果输出写在公共文件夹中一个单独的文件内,这些文件的命名一般是part-nnnnn,nnnnn是关联到某个reduce任务的partition的id,输出文件夹通过FileOutputFormat.setOutputPath() 来设置。你可以通过具体MapReduce作业的JobConf对象的setOutputFormat()方法来设置具体用到的输出格式。下表给出了已提供的输出格式:

 

 

 

输出格式

 

 描述

 

 

 

TextOutputFormat

 

 默认的输出格式, 以 "key \t value" 的方式输出行

 

 

 

SequenceFileOutputFormat

 

 输出二进制文件,适合于读取为子MapReduce作业的输入

 

 

 

NullOutputFormat

 

 忽略收到的数据,即不做输出



 

Hadoop提供的输出格式

 

  Hadoop提供了一些OutputFormat实例用于写入文件,基本的(默认的)实例是TextOutputFormat,它会以一行一个键值对的方式把数据写入一个文本文件里。这样后面的MapReduce任务就可以通过KeyValueInputFormat类简单的重新读取所需的输入数据了,而且也适合于人的阅读。还有一个更适合于在MapReduce作业间使用的中间格式,那就是SequenceFileOutputFormat,它可以快速的序列化任意的数据类型到文件中,而对应SequenceFileInputFormat则会把文件反序列化为相同的类型并提交为下一个Mapper的输入数据,方式和前一个Reducer的生成方式一样。NullOutputFormat不会生成输出文件并丢弃任何通过OutputCollector传递给它的键值对,如果你在要reduce()方法中显式的写你自己的输出文件并且不想Hadoop框架输出额外的空输出文件,那这个类是很有用的。

 

  RecordWriter:这个跟InputFormat中通过RecordReader读取单个记录的实现很相似,OutputFormat类是RecordWriter对象的工厂方法,用来把单个的记录写到文件中,就像是OuputFormat直接写入的一样。

 

  Reducer输出的文件会留在HDFS上供你的其它应用使用,比如另外一个MapReduce作业,或一个给人工检查的单独程序。

 

 

 

 

 

 

 

 

  • 大小: 22.9 KB
  • 大小: 35.6 KB
  • 大小: 31.4 KB
  • 大小: 59.1 KB
  • 大小: 64.5 KB
  • 大小: 54.9 KB
分享到:
评论

相关推荐

    flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz

    在Hadoop 3.x中,引入了一些重要的改进和优化,如YARN的升级、HDFS的增强以及新的MapReduce API等。这些变化可能会影响Flink的运行,比如新的HDFS客户端API、安全认证机制等。Flink-shaded-hadoop-3-uber-jar通过...

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip

    Hadoop,则是分布式存储和计算的基石,尤其是其 MapReduce 框架,是早期大数据处理的主要方式。然而,Flink 在处理大数据任务时,可能会依赖于 Hadoop 的某些组件,如 HDFS(Hadoop 分布式文件系统)或 YARN(Hadoop...

    spark-3.1.3-bin-without-hadoop.tgz

    Spark Streaming则构建在RDD之上,通过微批处理实现对实时数据流的处理,支持复杂的窗口操作和状态管理。这对于实时监控、在线分析等应用场景非常有用。 MLlib是Spark的机器学习库,包含了多种算法如分类、回归、...

    hadoop-mapreduce-examples-2.6.0

    Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来...HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。

    spark-3.5.1-bin-hadoop3.tgz

    Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,...

    spark-2.4.0-bin-hadoop2.7.tgz

    它能处理来自多种源的数据流,如Kafka、Flume等,并且可以与其他Spark组件无缝集成。 4. **MLlib**:Spark的机器学习库,包含各种机器学习算法,如分类、回归、聚类、协同过滤等,以及模型选择和评估工具。在2.4.0...

    spark-3.2.1-bin-hadoop2.7.tgz

    Spark的核心设计理念是快速数据处理,通过内存计算大幅度提高了数据处理速度,对比传统的MapReduce模型,Spark提供了更高的计算效率。在Spark 3.2.1版本中,包含了对性能的优化、新功能的添加以及已知问题的修复。 ...

    spark-2.1.1-bin-hadoop2.7.tgz.7z

    3. **Spark Streaming**:提供了对实时数据流处理的支持,可以处理来自各种源(如Kafka、Flume或TCP套接字)的数据流。 4. **MLlib**:是Spark的机器学习库,提供了多种机器学习算法和实用工具,如分类、回归、聚类...

    spark-assembly-1.5.2-hadoop2.6.0jar包

    3. Spark Streaming:用于实时数据流处理,它可以将数据流划分为微批次,并利用Spark Core的批处理能力进行处理。 4. MLlib:Spark的机器学习库,包含多种机器学习算法,如分类、回归、聚类、协同过滤等,以及模型...

    flink-shaded-hadoop-2-uber-2.6.5-10.0.zip

    4. **数据转换**:Flink支持将Hadoop的MapReduce作业转换为流处理作业,使得基于批处理的传统作业能够无缝过渡到实时处理。 5. **容错机制**:Flink的容错机制与Hadoop的检查点机制相结合,可以提供高可用性和数据...

    spark-2.4.7-bin-hadoop2.7.tgz

    - **Spark Streaming**:处理实时数据流,通过微批处理实现高吞吐量和容错性。 - **MLlib**:机器学习库,包含多种算法(如分类、回归、聚类、协同过滤等)和实用工具,支持模型评估和管道构建。 - **GraphX**:...

    spark-3.0.0-bin-without-hadoop.tgz

    Spark的设计目标是提供比Hadoop MapReduce更快的数据处理速度,同时保持易于编程的特性。它通过引入基于内存计算的DAG(有向无环图)执行模型,大大减少了磁盘I/O,从而实现了速度上的飞跃。Spark的核心组件包括...

    spark-2.3.1-bin-hadoop2.7.zip

    3. **Spark Streaming**:提供了一个高级抽象来处理实时数据流,支持微批处理,使得实时分析变得简单。 4. **MLlib**:Spark的机器学习库,包含多种机器学习算法和实用工具,如分类、回归、聚类、协同过滤等,以及...

    spark--bin-hadoop3-without-hive.tgz

    总的来说,"spark--bin-hadoop3-without-hive.tgz"提供了一个在CentOS 8和Hadoop 3.1.3环境下运行的Spark实例,不包含Hive支持,适合那些需要高效大数据处理而不依赖Hive功能的用户。要充分利用这个版本,理解Spark...

    spark-2.0.0-bin-hadoop2.7.tgz.zip

    此外,Spark Streaming是Spark的一个模块,用于实时数据流处理。在Spark 2.0.0中,它增强了对微批处理的支持,使得流处理更加高效且易于调试。Spark Streaming还与其他模块如Spark SQL和MLlib(机器学习库)紧密集成...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    2. **Hadoop**: Hadoop是大数据处理的基础框架,包括HDFS(Hadoop Distributed File System)和MapReduce计算模型。Spark与Hadoop的集成意味着它可以读取、写入HDFS中的数据,并且可以利用YARN(Hadoop的资源调度器...

    spark-2.3.4-bin-hadoop2.7.tgz

    Spark的主要优势在于其内存计算机制,它将数据存储在内存中,从而显著提高了数据处理速度,相比传统的MapReduce模型,性能提升可达数十倍。此外,Spark提供了SQL查询支持(通过Spark SQL),机器学习库(MLlib),图...

    spark-3.0.3-bin-hadoop2.7.tgz

    它将数据流分解为微批处理,然后使用Spark Core的API进行处理,实现了低延迟的数据处理。 4. **MLlib**:Spark的机器学习库,包含各种机器学习算法,如分类、回归、聚类、协同过滤等,以及模型选择和评估工具。...

    spark-1.4.0-bin-hadoop1.tgz

    3. **Spark Streaming**:提供了基于微批次的数据流处理,能够实时处理连续的数据流。 4. **MLlib**:Spark的机器学习库,包含各种算法如分类、回归、聚类、协同过滤等,以及模型选择和评估工具。 5. **GraphX**:...

    spark-2.4.0-bin-without-hadoop.tgz

    3. 流处理(Spark Streaming):Spark Streaming 可以处理实时数据流,2.4.0 版本增加了对复杂事件处理的支持,提升了处理延迟和吞吐量。 4. 图计算(GraphX):Spark 2.4.0 对 GraphX 进行了优化,支持大规模图...

Global site tag (gtag.js) - Google Analytics