`
leonzhx
  • 浏览: 793689 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Chapter 2. MapReduce

阅读更多

1.      MapReduce is a programming model for data processing. Hadoop can run MapReduce programs written in various languages. Most important, MapReduce programs are inherently parallel, thus putting very large-scale data analysis into the hands of anyone with enough machines at their disposal.

2.      To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job. After some local, small-scale testing, we will be able to run it on a cluster of machines.

3.      MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer. The programmer also specifies two functions: the map function and the reduce function.

4.      The map function is usually a data preparation phase, setting up the data in such a way that the reducer function can do its work on it. The map function is also a good place to drop bad records.

5.      The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key.

6.      The map function is represented by an implementation of the Mapper interface, which declares a map() method. The Mapper interface is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. The map() method is passed a key and a value and provides an instance of OutputCollector to write the output to.

7.      The reduce function is similarly defined using a Reducer and four formal type parameters are used to specify the input and output types. The input types of the reduce function must match the output types of the map function.

8.      A JobConf object forms the specification of the job. It gives you control over how the job is run. When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specify the name of the JAR file, we can pass a class in the JobConf constructor, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.

9.      Having constructed a JobConf object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case, the input forms all the files in that directory), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths. The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reducer functions are written. The directory shouldn’t exist before running the job, as Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with another). Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods on JobConf object. The setOutputKeyClass() and setOutputValueClass() methods on JobConf object control the output types for the map and the reduce functions, which are often the same. If they are different, then the map output types can be set using the methods setMapOutputKeyClass() and setMapOutputValueClass(). The input types are controlled via the input format and if you don’t explicitly set, the default TextInputFormat will be used. After setting the classes that define the map and reduce functions, we are ready to run the job. The static runJob() method on JobClient submits the job and waits for it to finish, writing information about its progress to the console.

10.  Hadoop has a standalone mode in which Hadoop runs using the local filesystem with a local job runner.

11.  When the hadoop command is invoked with a classname as the first argument, it launches a JVM to run the class. It is more convenient to use hadoop than straight java since the former adds the Hadoop libraries (and their dependencies) to the classpath and picks up the Hadoop configuration, too. To add the application classes to the classpath, we’ve defined an environment variable called HADOOP_CLASSPATH, which the hadoop script picks up. The output was written to the output directory, which contains one output file per reducer.

12.  Release 0.20.0 of Hadoop included a new Java MapReduce API:

a)        The new API favors abstract classes over interfaces, since these are easier to evolve. For example, you can add a method (with a default implementation) to an abstract class without breaking old implementations of the class. In the new API, the Mapper and Reducer interfaces are now abstract classes.

b)        The new API is in the org.apache.hadoop.mapreduce package (and subpackages). The old API can still be found in org.apache.hadoop.mapred.

c)        The new API makes extensive use of context objects that allow the user code to communicate with the MapReduce system. The MapContext, for example, essentially unifies the role of the JobConf, the OutputCollector, and the Reporter.

d)        The new API supports both a “push” and a “pull” style of iteration. In both APIs, key-value record pairs are pushed to the mapper, but in addition, the new API allows a mapper to pull records from within the map() method. The same goes for the reducer. An example of how the “pull” style can be useful is processing records in batches, rather than one by one.

e)        Configuration has been unified. The old API has a special JobConf object for job configuration, which is an extension of Hadoop’s vanilla Configuration object (used for configuring daemons; see The Configuration API). In the new API, this distinction is dropped, so job configuration is done through a Configuration.

f)         Job control is performed through the Job class, rather than JobClient, which no longer exists in the new API.

g)        Output files are named slightly differently: part-m-nnnnn for map outputs, and part-r-nnnnn for reduce outputs (where nnnnn is an integer designating the part number, starting from zero).

13.  A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.

14.  There are two types of nodes that control the job execution process: a jobtracker and a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.

15.  Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.

16.  For most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default, although this can be changed for the cluster (for all newly created files), or specified when each file is created. Because Hadoop does its best to run the map task on a node where the input data resides in HDFS (data locality optimization) and it is the largest size of input that can be guaranteed to be stored on a single node.

17.  Map tasks write their output to the local disk, not to HDFS. Because Map output is intermediate output: it’s processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away. So storing it in HDFS, with replication, would be overkill.

18.  Reduce tasks don’t have the advantage of data locality. The output of the reduce is normally stored in HDFS for reliability. For each HDFS block of the reduce output, the first replica is stored on the local node, with other replicas being stored on off-rack nodes. Thus, writing the reduce output does consume network bandwidth, but only as much as a normal HDFS write pipeline consumes.

19.  When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner—which buckets keys using a hash function—works very well.



 


20.  It’s also possible to have zero reduce tasks. This can be appropriate when you don’t need the shuffle since the processing can be carried out entirely in parallel. In this case, the only off-node data transfer is when the map tasks write to HDFS:



 


21.  Hadoop allows the user to specify a combiner function to be run on the map output—the combiner function’s output forms the input to the reduce function. Since the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer. (i.e. max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25 )

22.  The combiner function is defined using the Reducer interface and we need to set the combiner class on the JobConf (setCombinerClass())

23.  Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program. Map input data is passed over standard input to your map function, which processes it line by line and writes lines to standard output. A map output key-value pair is written as a single tab-delimited line. Input to the reduce function is in the same format—a tab-separated key-value pair—passed over standard input. The reduce function reads lines from standard input, which the framework guarantees are sorted by key, and writes its results to standard output.

24.  The Java API is geared toward processing your map function one record at a time. The framework calls the map() method on your Mapper for each record in the input, whereas with Streaming the map program can decide how to process the input. The user’s Java map implementation is “pushed” records, but it’s still possible to consider multiple lines at a time by accumulating previous lines in an instance variable in the Mapper. In this case, you need to implement the close() method so that you know when the last record has been read, so you can finish processing the last group of lines.

25.  The MapReduce framework ensures that the input keys to the reduce scripts are ordered, so we know that if a key is different from the previous one, we have moved into a new key group. In contrast to the Java API, where you are provided an iterator over each key group, in Streaming you have to find key group boundaries in your program.

26.  The hadoop command doesn’t support a Streaming option; instead, you specify the Streaming JAR file along with the jar option. Options to the Streaming program specify the input and output paths, and the map and reduce scripts. This is what it looks like:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \

  -input input/ncdc/sample.txt \

  -output output \

  -mapper ch02/src/main/ruby/max_temperature_map.rb \

  -reducer ch02/src/main/ruby/max_temperature_reduce.rb

27.  Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Unlike Streaming, which uses standard input and output to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function. JNI is not used.

  • 大小: 51 KB
  • 大小: 28.1 KB
分享到:
评论

相关推荐

    Big.Data.for.Chimps.A.Guide.to.Massive-Scale.Data.Processing.in.Practice.epub

    Chapter 2. MapReduce Chapter 3. A Quick Look into Baseball Chapter 4. Introduction to Pig Part II. Tactics: Analytic Patterns Chapter 5. Map-Only Operations Chapter 6. Grouping Operations Chapter 7. ...

    Chapter7-厦门大学-林子雨-大数据技术原理与应用-第7讲-MapReduce(中国大学MOOC2018年春季学期)1

    2. MapReduce体系结构 MapReduce的体系结构主要由三个部分组成:Client、Master和Slave。Client负责提交作业,Master负责作业调度和任务分配,Slave负责执行任务。Master Node主要负责作业调度、任务分配和失败恢复...

    HBase.The.Definitive.Guide.2nd.Edition

    Chapter 2. Installation Chapter 3. Client API: The Basics Chapter 4. Client API: Advanced Features Chapter 5. Client API: Administrative Features Chapter 6. Available Clients Chapter 7. Hadoop ...

    Real-time.Big.Data.Analytics.17843

    Chapter 2. Getting Acquainted with Storm Chapter 3. Processing Data with Storm Chapter 4. Introduction to Trident and Optimizing Storm Performance Chapter 5. Getting Acquainted with Kinesis Chapter 6....

    Hadoop.Application.Architectures.1491900083

    Chapter 2. Data Movement Chapter 3. Processing Data In Hadoop Chapter 4. Common Hadoop Processing Patterns Chapter 5. Graph Processing On Hadoop Chapter 6. Orchestration Chapter 7. Near-Real-Time ...

    HBase.High.Performance.Cookbook.epub

    Chapter 2. Loading Data from Various DBs Chapter 3. Working with Large Distributed Systems Part I Chapter 4. Working with Large Distributed Systems Part II Chapter 5. Working with Scalable Structure ...

    hadoop学习资料向厦门大学教授教学课件

    2. **Hadoop处理架构**(Chapter2.pdf):这章节将详细阐述Hadoop的架构,包括Hadoop的分布式文件系统HDFS(Hadoop Distributed File System)和MapReduce编程模型。HDFS为海量数据提供高容错、高可用的存储方案,而...

    Hadoop- The Definitive Guide, 3rd Edition.pdf

    MapReduce. Chapter 3 looks at Hadoop filesystems, and in particular HDFS, in depth. Chapter 4 covers the fundamentals of I/O in Hadoop: data integrity, compression, serialization, and file-based ...

    数据库新技术复习资料

    Hadoop的MapReduce模型适合批量处理,而Spark以其内存计算和流处理能力,为实时分析提供了高效解决方案。 最后,"chapter06.ppt"可能会总结人工智能和机器学习在数据库领域的应用,如自动索引优化、智能查询预测...

    Hadoop_in_Action

    CHAPTER 2 Starting Hadoop CHAPTER 3 Components of Hadoop PART 2 - Hadoop in Action CHAPTER 4 Writing basic MapReduce programs CHAPTER 5 Advanced MapReduce CHAPTER 6 Programming practices CHAPTER 7 ...

Global site tag (gtag.js) - Google Analytics