reduce task启动后的第一阶段是shuffle(向map端fetch数据),每次fetch数据的时候都可能因为connect timeout,read timeout,checksum error等原因时报,因而reduce task为每个map设置了一个计数器,用以记录fetch该map输出时失败的次数,当失败次数达到一定阀值的时候。会通知MRAppMaster 从该map fetch数据时失败的次数太多了,并打印想要的log;
该阀值计算方式:
org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.java
float failureRate = runningReduceTasks == 0 ? 1.0f :
(float) fetchFailures / runningReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
boolean isMapFaulty =
(failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
LOG.info("Too many fetch-failures for output of task attempt: " +
mapId + " ... raising fetch failure to map");
job.eventHandler.handle(new TaskAttemptEvent(mapId,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
job.fetchFailuresMapping.remove(mapId);
}
默认的阀值是3,
//The maximum fraction of fetch failures allowed for a map
private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
// Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
最终的日志信息是在
org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl.TooManyFetchFailureTransition类中打印出来的
private static class TooManyFetchFailureTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
//add to diagnostic
taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
//set the finish time
taskAttempt.setFinishTime();
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptState.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
}
}
分享到:
相关推荐
hadoop-mapreduce-examples-2.7.1.jar
hadoop-mapreduce-examples-2.6.5.jar 官方案例源码
大数据分析课程设计后端大数据分析MapReduce程序和sql脚本.zip 95分以上必过项目,下载即用无需修改。 大数据分析课程设计后端大数据分析MapReduce程序和sql脚本.zip 95分以上必过项目,下载即用无需修改。大数据...
KMean算法在MapReduce上的并行化.rar KMean算法在MapReduce上的并行化.rar KMean算法在MapReduce上的并行化.rar KMean算法在MapReduce上的并行化.rar KMean算法在MapReduce上的并行化.rar
Ch6-MapReduce算法设计.ppt.ppt
hadoop中的demo,wordcount列子用到的JAR包 用法: # 在容器里运行WordCount程序,该程序需要2个参数...hadoop jar hadoop-mapreduce-examples-2.7.1-sources.jar org.apache.hadoop.examples.WordCount input output
8. **错误处理和调试**:了解如何在MapReduce作业中处理各种错误情况,以及如何使用GAE的日志和监控工具进行问题排查。 通过学习和应用这些知识点,开发者可以充分利用Google App Engine的MapReduce库,有效地处理...
17. mapreduce.job.max.map.failures 定义在作业失败之前,Map任务可以失败的最大次数。 18. mapreduce.job.reducer.preempt.delay.sec 定义Reduce任务抢占前的等待时间(以秒为单位)。 19. mapreduce.job.max....
【SpringBoot】Error: Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster报错明细问题解决后记 报错明细 IDEA SpringBoot集成hadoop运行环境,,本地启动项目,GET请求接口触发...
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public WordCount() { } public static void main(String...
《Packtpub.Hadoop.MapReduce.Cookbook.Jan.2013》是2013年出版的一本专门探讨Hadoop MapReduce技术的实战指南。这本书深入浅出地介绍了如何利用Hadoop MapReduce框架来处理大数据问题,是IT行业中针对大数据处理的...
藏经阁-基于E-MapReduce梨视频推荐系统.pdf
### MapReduce技术详解 #### 一、MapReduce概述 **MapReduce** 是一种重要的分布式计算模型,它由谷歌公司提出并广泛应用于大规模数据处理场景。该模型的核心在于将大规模数据集分割成小块,通过一系列的映射(Map)...
用java的MapReduce写了个demo,用于计算文档单词出现个数
《Google MapReduce中文版》是关于分布式计算框架MapReduce的一份详细指南,主要针对的是Google开发的这个核心技术。MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行计算,它极大地简化了在大规模集群...