To create the list of tasks to run, the job scheduler first retrieves the input splits computed by the JobClient from the shared filesystem. It then creates one map task for each split.
In fact, when the input data is small enough, the job will run with the number of mappers you configured.
Tasktrackers have a fixed number of slots for map tasks and for reduce tasks: for example, a tasktracker may be able to run two map tasks and two reduce tasks simultaneously. (The precise number depends on the number of cores and the amount of memory on the tasktracker; see “Memory” on page 254.) The default scheduler fills empty map task slots before reduce task slots, so if the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise, it will select a reduce task.
To choose a reduce task the jobtracker simply takes the next in its list of yet-to-be-run reduce tasks, since there are no data locality considerations.
Reducers don't take data locality into account.
For a map task, however, it takes account of the tasktracker’s network location and picks a task whose input split is as close as possible to the tasktracker. In the optimal case, the task is data-local, that is, running on the same node that the split resides on. Alternatively, the task may be rack-local: on the same rack, but not the same node, as the split.
Mappers do take data locality into account.
TaskRunner launches a new Java Virtual Machine (step 9) to run each task in (step 10), so that any bugs in the user-defined map and reduce functions don’t affect the tasktracker (by causing it to crash or hang, for example). It is however possible to reuse the JVM between tasks; see “Task JVM Reuse” on page 170.
Both map and reduce tasks run inside a JVM.
When a task is running, it keeps track of its progress, that is, the proportion of the task completed. For map tasks, this is the proportion of the input that has been processed. For reduce tasks, it’s a little more complex, but the system can still estimate the proportion of the reduce input processed. It does this by dividing the total progress into three parts, corresponding to the three phases of the shuffle (see “Shuffle and Sort” on page 163). For example, if the task has run the reducer on half its input, then the task’s progress is ⅚, since it has completed the copy and sort phases (⅓ each) and is half way through the reduce phase (⅙).
Do the shuffle and sort phases really account for that large a share of the progress?
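The book's example can be checked with a small sketch, assuming the equal one-third weights for the copy, sort, and reduce phases that the text describes:

```python
def reduce_task_progress(copy_done, sort_done, reduce_fraction):
    """Estimate overall reduce-task progress, weighting the copy,
    sort, and reduce phases at one third each (as described above)."""
    progress = 0.0
    if copy_done:
        progress += 1 / 3
    if sort_done:
        progress += 1 / 3
    progress += reduce_fraction / 3
    return progress

# A task that has finished copy and sort and has run the reducer on
# half its input is at 5/6 overall, matching the book's example.
print(reduce_task_progress(True, True, 0.5))
```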
When Streaming goes through a pipe, there is no way to report progress updates.
Setting the timeout to a value of zero disables the timeout, so long-running tasks are never marked as failed. In this case, a hanging task will never free up its slot, and over time there may be cluster slowdown as a result. This approach should therefore be avoided, and making sure that a task is reporting progress periodically will suffice (see “What Constitutes Progress in MapReduce?” on page 158).
Even so, mapred.task.timeout=0 carries a lot of risk.
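Rather than disabling the timeout, keep it finite; a mapred-site.xml sketch (600,000 ms is, as far as I know, the default of ten minutes):

```xml
<property>
  <name>mapred.task.timeout</name>
  <!-- milliseconds; 600000 = 10 minutes. 0 would disable the timeout. -->
  <value>600000</value>
</property>
```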
If a Streaming process hangs, the tasktracker does not try to kill it (although the JVM that launched it will be killed), so you should take precautions to monitor for this scenario, and kill orphaned processes by some other means.
I've actually run into this several times. I wonder if these are what people call zombie processes?
For some applications it is undesirable to abort the job if a few tasks fail, as it may be possible to use the results of the job despite some failures. In this case, the maximum percentage of tasks that are allowed to fail without triggering job failure can be set for the job. Map tasks and reduce tasks are controlled independently, using the mapred.max.map.failures.percent and mapred.max.reduce.failures.percent properties.
Worth considering setting these when a project goes into production.
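The jobtracker's decision can be sketched as follows. This is a simplified model of the rule described above, not Hadoop's actual code; the two thresholds correspond to mapred.max.map.failures.percent and mapred.max.reduce.failures.percent:

```python
def job_should_fail(failed_maps, total_maps, failed_reduces, total_reduces,
                    max_map_failures_pct=0, max_reduce_failures_pct=0):
    """Return True if the failed-task fraction exceeds either threshold.
    With the default thresholds of 0, any single task failure fails the job."""
    map_pct = 100 * failed_maps / total_maps
    reduce_pct = 100 * failed_reduces / total_reduces
    return map_pct > max_map_failures_pct or reduce_pct > max_reduce_failures_pct

# Tolerate up to 5% failed map tasks: 3 failures out of 100 maps
# stays under the threshold, so the job survives.
print(job_should_fail(3, 100, 0, 10, max_map_failures_pct=5))  # → False
```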
Failure of a tasktracker is another failure mode. If a tasktracker fails by crashing, or running very slowly, it will stop sending heartbeats to the jobtracker (or send them very infrequently). The jobtracker will notice a tasktracker that has stopped sending heartbeats (if it hasn’t received one for 10 minutes, configured via the mapred.tasktracker.expiry.interval property, in milliseconds) and remove it from its pool of tasktrackers to schedule tasks on.
The jobtracker arranges for map tasks that were run and completed successfully on that tasktracker to be rerun if they belong to incomplete jobs, since their intermediate output residing on the failed tasktracker’s local filesystem may not be accessible to the reduce task. Any tasks in progress are also rescheduled.
The Chinese edition's translation seems a bit off here. What this means is that for any job that is still incomplete, every map task that ran on the failed tasktracker has to be rerun.
The Fair Scheduler aims to give every user a fair share of the cluster capacity over time. If a single job is running, it gets all of the cluster. As more jobs are submitted, free task slots are given to the jobs in such a way as to give each user a fair share of the cluster. A short job belonging to one user will complete in a reasonable time even while another user’s long job is running, and the long job will still make progress.
The Fair Scheduler is such a good thing, and I had never used it before. It's well suited to production use on large clusters.
In many ways, the shuffle is the heart of MapReduce, and is where the “magic” happens.
Is it really that impressive?
It is often a good idea to compress the map output as it is written to disk, since doing so makes it faster to write to disk, saves disk space, and reduces the amount of data to transfer to the reducer. By default the output is not compressed, but it is easy to enable by setting mapred.compress.map.output to true. The compression library to use is specified by mapred.map.output.compression.codec; see “Compression” on page 77 for more on compression formats.
Compression is a good idea, though.
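A mapred-site.xml sketch enabling map-output compression. GzipCodec is named here only as an illustration; any installed codec class can be used:

```xml
<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>
```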
The output file’s partitions are made available to the reducers over HTTP. The number of worker threads used to serve the file partitions is controlled by the tasktracker.http.threads property—this setting is per tasktracker, not per map task slot. The default of 40 may need increasing for large clusters running large jobs.
That's right, plain HTTP!
Split vs. partition
A split is the input to a map task. When the map spills to disk it sorts in memory by partition; the number of partitions should be tied to the number of reduce tasks, and each spill produces a spill file. When the map finishes, the spill files are merged. In the copy phase, each reduce task copies over the data belonging to its own partition, and when the reducer starts running, the pieces of its partition copied from all the maps are merged again (the sort phase).
Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to.
Furthermore, the reduce task needs the map output for its particular partition from several map tasks across the cluster. The map tasks may finish at different times, so the reduce task starts copying their outputs as soon as each completes. This is known as the copy phase of the reduce task.
The copy phase. The assignment of map output to a particular partition appears to be done by a hash function.
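It is indeed a hash: Hadoop's default HashPartitioner computes, in Java, roughly (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. A Python sketch of the same idea (the mask clears the sign bit so the result is never negative):

```python
def default_partition(key_hash, num_reduce_tasks):
    """Mimic Hadoop's HashPartitioner: mask off the sign bit of the
    32-bit hash, then take it modulo the number of reduce tasks."""
    return (key_hash & 0x7FFFFFFF) % num_reduce_tasks

# Equal keys hash equally, so all values for a key reach one reducer.
print(default_partition(123456789, 4))  # → 1
```

Because the partition depends only on the key's hash, every map task routes a given key to the same reducer, which is what makes the grouping in the reduce phase possible.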
The reduce task has a small number of copier threads so that it can fetch map outputs in parallel. The default is five threads, but this number can be changed by setting the mapred.reduce.parallel.copies property.
So that's why reduce progress starts showing before the maps have finished.
As map tasks complete successfully, they notify their parent tasktracker of the status update, which in turn notifies the jobtracker. These notifications are transmitted over the heartbeat communication mechanism described earlier. Therefore, for a given job, the jobtracker knows the mapping between map outputs and tasktrackers. A thread in the reducer periodically asks the jobtracker for map output locations until it has retrieved them all.
Tasktrackers do not delete map outputs from disk as soon as the first reducer has retrieved them, as the reducer may fail. Instead, they wait until they are told to delete them by the jobtracker, which is after the job has completed.
How is this mapping maintained? And map outputs are only deleted after the whole job completes? Doesn't the intermediate data become enormous?
The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.
Supposedly this value can be raised to 100.
The number of files merged in each round is actually more subtle than this example suggests. The goal is to merge the minimum number of files to get to the merge factor for the final round. So if there were 40 files, the merge would not merge 10 files in each of the four rounds to get 4 files. Instead, the first round would merge only 4 files, and the subsequent three rounds would merge the full 10 files. The 4 merged files, and the 6 (as yet unmerged) files make a total of 10 files for the final round.
Merge details: the point is to be left with exactly ten files for the final round. Merging 10 files in each of three rounds would leave 10 unmerged files plus 3 big ones, so the first round merges only 4 files instead, which also cuts the amount of data written to disk.
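The first-round size can be computed. This sketch uses the standard external-merge rule (my own derivation from the paragraph above, not code taken from Hadoop): merge just enough files in the first round so that every later round can merge the full factor and the final round is left with exactly the factor:

```python
def first_round_merge_size(num_files, merge_factor):
    """How many files to merge in the first round so that all
    subsequent rounds merge exactly `merge_factor` files and the
    final round has exactly `merge_factor` inputs."""
    if num_files <= merge_factor:
        return num_files
    remainder = (num_files - 1) % (merge_factor - 1)
    return merge_factor if remainder == 0 else remainder + 1

# The book's example: 40 spill files, io.sort.factor = 10.
print(first_round_merge_size(40, 10))  # → 4
```

After that first merge of 4, three rounds of 10 leave 4 merged files plus 6 untouched ones: exactly 10 inputs for the final round, as the book describes.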
using exponential backoff
Like QQ's reconnect-after-timeout: the interval between retries keeps growing.
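A minimal sketch of the idea, with illustrative base, factor, and cap values rather than the actual numbers Hadoop uses:

```python
def backoff_delays(base=1.0, factor=2.0, cap=60.0, attempts=6):
    """Retry delays that double on each attempt, capped at `cap` seconds."""
    delay = base
    out = []
    for _ in range(attempts):
        out.append(min(cap, delay))
        delay *= factor
    return out

print(backoff_delays())  # → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0]
```

Real implementations usually add random jitter to each delay so that many clients don't all retry at the same instant.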
The general principle is to give the shuffle as much memory as possible. However, there is a trade-off, in that you need to make sure that your map and reduce functions get enough memory to operate. This is why it is best to write your map and reduce functions to use as little memory as possible—certainly they should not use an unbounded amount of memory (by avoiding accumulating values in a map, for example).
Map and reduce functions should use as little memory as possible.
The amount of memory given to the JVMs in which the map and reduce tasks run is set by the mapred.child.java.opts property.
How big, though?
More generally, Hadoop’s uses a buffer size of 4 KB by default, which is low, so you should increase this across the cluster (by setting io.file.buffer.size, see also “Other Hadoop Properties” on page 264).
Will give this a try when I get a chance!
Speculative execution is an optimization, not a feature to make jobs run more reliably. If there are bugs that sometimes cause a task to hang or slow down, then relying on speculative execution to avoid these problems is unwise, and won’t work reliably, since the same bugs are likely to affect the speculative task. You should fix the bug so that the task doesn’t hang or slow down. (Speculative execution of reduce tasks is controlled by the mapred.reduce.tasks.speculative.execution property.)
This annoys me too; turn it off.
The property for controlling task JVM reuse is mapred.job.reuse.jvm.num.tasks: it specifies the maximum number of tasks to run for a given job for each JVM launched, the default is 1 (see Table 6-4). Tasks from different jobs are always run in separate JVMs. If the property is set to –1, there is no limit to the number of tasks from the same job that may share a JVM. The method setNumTasksToExecutePerJvm() on JobConf can also be used to configure this property.
Can a shared JVM reduce memory usage?
Tasks that are CPU-bound may also benefit from task JVM reuse by taking advantage of runtime optimizations applied by the HotSpot JVM. After running for a while, the HotSpot JVM builds up enough information to detect performance-critical sections in the code, and dynamically translates the Java byte codes of these hot spots into native machine code. This works well for long-running processes, but JVMs that run for seconds or a few minutes may not gain the full benefit of HotSpot. In these cases, it is worth enabling task JVM reuse.
Another place where a shared JVM is useful is for sharing state between the tasks of a job. By storing reference data in a static field, tasks get rapid access to the shared data.
A shared JVM gives CPU-bound programs an edge.
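A mapred-site.xml sketch enabling unlimited reuse within a job, using the property and the -1 sentinel described above (equivalently settable via setNumTasksToExecutePerJvm() on JobConf):

```xml
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <!-- -1: no limit on tasks of the same job sharing one JVM -->
  <value>-1</value>
</property>
```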
Skipping mode is off by default; you enable it independently for map and reduce tasks using the SkipBadRecords class. It’s important to note that skipping mode can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records (a few per task, say). You may need to increase the maximum number of task attempts (via mapred.map.max.attempts and mapred.reduce.max.attempts) to give skipping mode enough attempts to detect and skip all the bad records in an input split.
How to enable skipping mode. This option really matters when running jobs.