`
qianshangding
  • 浏览: 129177 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Hadoop之推测执行

 
阅读更多

最近在测试环境跑任务,有一部分任务出现如下情况:


推测执行(Speculative Execution)是指在集群环境下运行MapReduce,可能是程序Bug,负载不均或者其他的一些问题,导致在一个JOB下的多个TASK速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,根据木桶原理,这些任务将成为整个JOB的短板,如果集群启动了推测执行,这时为了最大限度的提高短板,Hadoop会为该task启动备份任务,让speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果,并且在运行完成后Kill掉另外一个任务。


推测执行(Speculative Execution)是通过利用更多的资源来换取时间的一种优化策略,但是在资源很紧张的情况下,推测执行也不一定能带来时间上的优化,假设在测试环境中,DataNode总的内存空间是40G,每个Task可申请的内存设置为1G,现在有一个任务的输入数据为5G,HDFS分片为128M,这样Map Task的个数就40个,基本占满了所有的DataNode节点,如果还因为每些Map Task运行过慢,启动了Speculative Task,这样就可能会影响到Reduce Task的执行了,影响了Reduce的执行,自然而然就使整个JOB的执行时间延长。所以是否启用推测执行,如果能根据资源情况来决定,如果在资源本身就不够的情况下,还要跑推测执行的任务,这样会导致后续启动的任务无法获取到资源,以导致无法执行。

默认的推测执行器是:org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator,如果要改变推测执行的策略,可以按照这个类重写,继承org.apache.hadoop.service.AbstractService,实现org.apache.hadoop.mapreduce.v2.app.speculate.Speculator接口。

DefaultSpeculator构造方法:

  public DefaultSpeculator
      (Configuration conf, AppContext context,
       TaskRuntimeEstimator estimator, Clock clock) {
    super(DefaultSpeculator.class.getName());

    this.conf = conf;
    this.context = context;
    this.estimator = estimator;
    this.clock = clock;
    this.eventHandler = context.getEventHandler();
    this.soonestRetryAfterNoSpeculate =
        conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
                MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
    this.soonestRetryAfterSpeculate =
        conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
                MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
    this.proportionRunningTasksSpeculatable =
        conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
                MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
    this.proportionTotalTasksSpeculatable =
        conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
                MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
    this.minimumAllowedSpeculativeTasks =
        conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
                MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
  }


mapreduce.map.speculative:如果为true则Map Task可以推测执行,即一个Map Task可以启动Speculative Task运行并行执行,该Speculative Task与原始Task同时处理同一份数据,谁先处理完,则将谁的结果作为最终结果。默认为true。

mapreduce.reduce.speculative:同上,默认值为true。

mapreduce.job.speculative.speculative-cap-running-tasks:能够推测重跑正在运行任务(单个JOB)的百分之几,默认是:0.1

mapreduce.job.speculative.speculative-cap-total-tasks:能够推测重跑全部任务(单个JOB)的百分之几,默认是:0.01

mapreduce.job.speculative.minimum-allowed-tasks:可以推测重新执行允许的最小任务数。默认是:10

首先,mapreduce.job.speculative.minimum-allowed-tasks和mapreduce.job.speculative.speculative-cap-total-tasks * 总任务数,取最大值。

然后,拿到上一步的值和mapreduce.job.speculative.speculative-cap-running-tasks * 正在运行的任务数,取最大值,该值就是猜测执行的运行的任务数

mapreduce.job.speculative.retry-after-no-speculate:等待时间(毫秒)做下一轮的猜测,如果没有任务,推测在这一轮。默认:1000(ms)

mapreduce.job.speculative.retry-after-speculate:等待时间(毫秒)做下一轮的猜测,如果有任务推测在这一轮。默认:15000(ms)

mapreduce.job.speculative.slowtaskthreshold:标准差,任务的平均进展率必须低于所有正在运行任务的平均值才会被认为是太慢的任务,默认值:1.0

启动服务:

  @Override
  protected void serviceStart() throws Exception {
    Runnable speculationBackgroundCore
        = new Runnable() {
            @Override
            public void run() {
              while (!stopped && !Thread.currentThread().isInterrupted()) {
                long backgroundRunStartTime = clock.getTime();
                try {
                  //计算推测,会根据Map和Reduce的任务类型,遍历mapContainerNeeds和reduceContainerNeeds,满足条件则启动推测任务。
                  int speculations = computeSpeculations(); 
                  long mininumRecomp
                      = speculations > 0 ? soonestRetryAfterSpeculate
                                         : soonestRetryAfterNoSpeculate;

                  long wait = Math.max(mininumRecomp,
                        clock.getTime() - backgroundRunStartTime);

                  if (speculations > 0) {
                    LOG.info("We launched " + speculations
                        + " speculations.  Sleeping " + wait + " milliseconds.");
                  }

                  Object pollResult
                      = scanControl.poll(wait, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                  if (!stopped) {
                    LOG.error("Background thread returning, interrupted", e);
                  }
                  return;
                }
              }
            }
          };
    speculationBackgroundThread = new Thread
        (speculationBackgroundCore, "DefaultSpeculator background processing");
    speculationBackgroundThread.start();

    super.serviceStart();
  }

最后我们看看源码,是如何启动一个推测任务的:

private int maybeScheduleASpeculation(TaskType type) {
    int successes = 0;

    long now = clock.getTime();

    ConcurrentMap<JobId, AtomicInteger> containerNeeds
        = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;

    for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {//遍历所有的JOB
      // This race conditon is okay.  If we skip a speculation attempt we
      //  should have tried because the event that lowers the number of
      //  containers needed to zero hasn't come through, it will next time.
      // Also, if we miss the fact that the number of containers needed was
      //  zero but increased due to a failure it's not too bad to launch one
      //  container prematurely.
      if (jobEntry.getValue().get() > 0) {
        continue;
      }

      int numberSpeculationsAlready = 0;
      int numberRunningTasks = 0;

      // loop through the tasks of the kind
      Job job = context.getJob(jobEntry.getKey());

      Map<TaskId, Task> tasks = job.getTasks(type);//获取JOB的task

      int numberAllowedSpeculativeTasks
          = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
                           PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());//上面有介绍

      TaskId bestTaskID = null;
      long bestSpeculationValue = -1L;

      // this loop is potentially pricey.
      // TODO track the tasks that are potentially worth looking at
      for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {//遍历所有任务
        long mySpeculationValue = speculationValue(taskEntry.getKey(), now);//获取推测值

        if (mySpeculationValue == ALREADY_SPECULATING) {
          ++numberSpeculationsAlready;
        }

        if (mySpeculationValue != NOT_RUNNING) {
          ++numberRunningTasks;
        }

        if (mySpeculationValue > bestSpeculationValue) {
          bestTaskID = taskEntry.getKey();
          bestSpeculationValue = mySpeculationValue;
        }
      }
      numberAllowedSpeculativeTasks
          = (int) Math.max(numberAllowedSpeculativeTasks,
                           PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);

      // If we found a speculation target, fire it off
      if (bestTaskID != null
          && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {//允许的个数大于准备推测执行的个数,就开始创建推测运行任务
        addSpeculativeAttempt(bestTaskID);//发送一个T_ADD_SPEC_ATTEMPT事件,启动另外一个任务。
        ++successes;
      }
    }

    return successes;
  }



private long speculationValue(TaskId taskID, long now) {
    Job job = context.getJob(taskID.getJobId());
    Task task = job.getTask(taskID);
    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
    long acceptableRuntime = Long.MIN_VALUE;
    long result = Long.MIN_VALUE;

    if (!mayHaveSpeculated.contains(taskID)) {//是否包含在推测运行的SET中
      acceptableRuntime = estimator.thresholdRuntime(taskID);//运行的阀值
      if (acceptableRuntime == Long.MAX_VALUE) {
        return ON_SCHEDULE;
      }
    }

    TaskAttemptId runningTaskAttemptID = null;

    int numberRunningAttempts = 0;

    for (TaskAttempt taskAttempt : attempts.values()) {
      if (taskAttempt.getState() == TaskAttemptState.RUNNING
          || taskAttempt.getState() == TaskAttemptState.STARTING) {//任务在运行状态下,或开始状态下
        if (++numberRunningAttempts > 1) {//重试超过一次的,直接返回,则numberSpeculationsAlready的值加1
          return ALREADY_SPECULATING;
        }
        runningTaskAttemptID = taskAttempt.getID();

        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);//估算的运行时间

        long taskAttemptStartTime
            = estimator.attemptEnrolledTime(runningTaskAttemptID);//任务的开始时间
        if (taskAttemptStartTime > now) {
          // This background process ran before we could process the task
          //  attempt status change that chronicles the attempt start
          return TOO_NEW;
        }

        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;//估算的运行时间+任务的开始时间,等于完成时间

        long estimatedReplacementEndTime
            = now + estimator.estimatedNewAttemptRuntime(taskID);//新开启一个任务的完成时间

        float progress = taskAttempt.getProgress();
        TaskAttemptHistoryStatistics data =
            runningTaskAttemptStatistics.get(runningTaskAttemptID);
        if (data == null) {
          runningTaskAttemptStatistics.put(runningTaskAttemptID,
            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
        } else {
          if (estimatedRunTime == data.getEstimatedRunTime()
              && progress == data.getProgress()) {
            // Previous stats are same as same stats
            if (data.notHeartbeatedInAWhile(now)) {
              // Stats have stagnated for a while, simulate heart-beat.
              TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
              taskAttemptStatus.id = runningTaskAttemptID;
              taskAttemptStatus.progress = progress;
              taskAttemptStatus.taskState = taskAttempt.getState();
              // Now simulate the heart-beat
              handleAttempt(taskAttemptStatus);
            }
          } else {
            // Stats have changed - update our data structure
            data.setEstimatedRunTime(estimatedRunTime);
            data.setProgress(progress);
            data.resetHeartBeatTime(now);
          }
        }

        if (estimatedEndTime < now) {//完成时间小于当前时间
          return PROGRESS_IS_GOOD;
        }

        if (estimatedReplacementEndTime >= estimatedEndTime) {//新开任务的完成时间小于或等于当前时间
          return TOO_LATE_TO_SPECULATE;
        }

        result = estimatedEndTime - estimatedReplacementEndTime;
      }
    }

    // If we are here, there's at most one task attempt.
    if (numberRunningAttempts == 0) {//任务没有运行
      return NOT_RUNNING;
    }



    if (acceptableRuntime == Long.MIN_VALUE) {
      acceptableRuntime = estimator.thresholdRuntime(taskID);
      if (acceptableRuntime == Long.MAX_VALUE) {
        return ON_SCHEDULE;
      }
    }

    return result;
  }
DefaultSpeculator依赖于一个执行时间估算器,默认采用了LegacyTaskRuntimeEstimator,此外,MRv2还提供了另外一个实现:ExponentiallySmoothedTaskRuntimeEstimator,该实现采用了平滑算法对结果进行平滑处理。

分享到:
评论

相关推荐

    基于Hadoop的研究及性能分析 (2).pdf

    Hadoop推测执行算法的性能优于SALS推测执行算法,这是因为Hadoop推测执行算法可以根据实际情况选择合适的执行方式,以提高执行效率。 7. 基于DistributedCache的改进算法 为了提高性能,我们提出了基于...

    基于Hadoop的研究及性能分析.pdf

    本文将详细介绍Hadoop和MapReduce的工作原理,并讨论Hadoop推测执行算法和SALS推测执行算法的性能分析。 Hadoop的工作原理 Hadoop是一个分布式计算框架,采用Master-Slave架构,由一个NameNode和多个DataNode组成。...

    【Yarn篇02】Yarn任务推测执行1

    Yarn任务推测执行是一种优化Hadoop集群性能的策略,其主要目标是缩短整个作业的完成时间,通过预判并启动备份任务来替换那些执行缓慢的任务。下面将详细讲解Yarn任务推测执行的工作原理、前提条件以及适用场景。 1....

    基于Hadoop的研究及性能分析.docx

    本文首先介绍了 Hadoop 及其核心技术 MapReduce 的工作原理,详细讨论了 Hadoop 推测执行算法和 SALS 推测执行算法,并对它们的性能进行分析。最后,分析了 MapReduce 框架的通用二路连接算法 RSJ。为了提高性能,...

    各个版本Hadoop,hadoop.dll以及winutils.exe文件下载大合集

    `winutils.exe`通常位于Hadoop安装目录的`bin`子目录下,对于配置Hadoop环境和执行Hadoop命令至关重要。 在压缩包`winutils-master`中,我们可以推测这可能是一个包含`winutils.exe`源码的项目,或者是对Windows...

    运行hadoop jar

    描述中提到的链接可能是一个博客文章,虽然内容未给出,但我们可以推测它可能包含了如何使用`hadoop jar`命令的详细步骤。通常,这个过程包括以下步骤: 1. **编写Java代码**:首先,你需要编写处理Hadoop数据的...

    Hadoop任务调度器

    每个作业包含若干个任务,任务是实际运行的实体,任务执行时又分为不同的尝试(Attempt),包括正常的尝试、重试尝试以及推测执行的尝试。 如果要编写自己的Hadoop调度器,需要深入了解Hadoop调度器的接口和扩展...

    基于Hadoop的研究及分析性能

    文中提到了两种推测执行算法:Hadoop推测执行算法和SALS(Selective Aggressive Load Shedding)推测执行算法。这两种算法都旨在通过智能地推测和替换慢速任务来优化集群资源的利用。 【DistributedCache优化】 ...

    hadoop2.7.6 win10 x64本地编译结果

    压缩包子文件的文件名称列表虽为 "新建文件夹",但根据上下文,我们可以推测这个压缩包可能包含了`bin`目录下的所有文件,即Hadoop的可执行脚本和其他相关配置文件。这些文件对于在Windows 10环境下运行和管理Hadoop...

    Hadoop Share

    在描述中提到了一个博客链接,虽然具体内容没有给出,但我们可以推测这可能是博主对于Hadoop使用、配置或者优化的见解。 在标签中,“源码”表明内容可能涉及到Hadoop的源代码分析,这对于深入理解Hadoop的工作原理...

    hadoop-2.6.0-cdh5.14.2.tar.gz

    2. 配置环境变量,使系统能够找到Hadoop的可执行文件。 3. 修改Hadoop配置文件,如 `core-site.xml`(核心配置)、`hdfs-site.xml`(HDFS配置)、`yarn-site.xml`(YARN配置)以及`mapred-site.xml`(MapReduce配置...

    hadoop完整安装流程

    【Hadoop 完整安装流程】是一篇针对新手的指南,详细介绍了如何一步步安装Hadoop,文中虽然没有提供具体的步骤,但我们可以从标题和描述中推测出安装Hadoop的重要性,尤其是对于想要进入大数据领域的人来说。Hadoop...

    hadoop 配置项的调优

    2. **mapred.map.tasks.speculative.execution** 和 **mapred.reduce.tasks.speculative.execution**:这两个参数控制是否启用推测执行。当某些任务由于硬件或负载问题执行较慢时,推测执行会启动新的任务副本,加快...

    8天Hadoop大数据

    虽然没有具体的文件名,但可以推测其中可能包括了各个主题的视频讲座、PPT课件、示例代码、阅读材料等,这些内容将按照8天的学习计划进行组织,每天可能涵盖一个或多个关键知识点,例如Hadoop的安装与配置、HDFS的...

    Hadoop权威指南 第二版(中文版)

     推测式执行  重用JVM  跳过坏记录  任务执行环境 第7章 MapReduce的类型与格式  MapReduce的类型  默认的MapReduce作业  输入格式  输入分片与记录  文本输入  二进制输入  多种输入  数据库输入(和...

    hadoop shell操作与程式开发

    【描述】提供的链接指向了一个博客文章,虽然具体内容未给出,但可以推测博主可能分享了个人对Hadoop操作和开发的经验,包括可能的实战案例或技巧。 【标签】"源码"和"工具"表明内容可能涉及Hadoop的源代码分析和...

    winutis-master-hadoop.7z

    这是因为在Windows环境下,某些Hadoop工具可能依赖于这个动态链接库文件来执行Hadoop相关的操作。 2. 设置环境变量:环境变量的配置至关重要,尤其是 `%HADOOP_HOME%` 和 `%PATH%`。用户需要将Hadoop安装目录(假设...

Global site tag (gtag.js) - Google Analytics