`
brandNewUser
  • 浏览: 456077 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop map端的超时参数

阅读更多

 

 目前集群上某台机器卡住导致出现大量的Map端任务FAIL,当定位到具体的机器上时,无法ssh或进去后terminal中无响应,退出的相关信息如下:

[hadoop@xxx ~]$ Received disconnect from xxx: Timeout, your session not responding.
 
任务执行失败的错误日志:
AttemptID:attempt_1413206225298_24177_m_000001_0 Timed out after 1200 secsContainer killed by the ApplicationMaster. Container killed on request. Exit code is 143
 
经过查找后1200s是配置的参数mapreduce.task.timeout,
关于参数mapreduce.task.timeout的解释:
The number of milliseconds before a task will be terminated if it neither reads an input, writes an output, nor updates its status string. A value of 0 disables the timeout.
 
通过翻hadoop2.2.0的源代码,类TaskHeartbeatHandler会作为一个独立的线程来运行。它会定期去检查当前所有运行的TaskAttempt,时间间隔为:mapreduce.task.timeout.check-interval-ms(默认30s),
Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
          boolean taskTimedOut = (taskTimeOut > 0) &&
              (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
          
          if(taskTimedOut) {
            // task is lost, remove from the list and raise lost event
            iterator.remove();
            eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
                .getKey(), "AttemptID:" + entry.getKey().toString()
                + " Timed out after " + taskTimeOut / 1000 + " secs"));
            eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
                TaskAttemptEventType.TA_TIMED_OUT));
          }
 
如果监测到有一个task_attempt没有在规定的时间间隔内(mapreduce.task.timeout)汇报进度,那么就认为该attempt已经失败,并发送一个TA_TIMED_OUT的Event,通知ApplicationMaster去Kill掉该Attempt。
Attempt的进度会定期报告给该线程,调用progressing方法:
  
public void progressing(TaskAttemptId attemptID) {
  //only put for the registered attempts
    //TODO throw an exception if the task isn't registered.
    ReportTime time = runningAttempts.get(attemptID);
    if(time != null) {
      time.setLastProgress(clock.getTime());
    }
  }
 
 
在TaskAttemptListenerImpl类中会调用报告进度的方法,在任务的不同阶段,都会对任务向ApplicationMaster报告,提交进度信息。更详细的方法这里就不再深入研究。


 
一般情况下,我们的任务都是在运行过程中出现的这个错误,这就需要我们检查哪些资源的限制导致任务无法进行下去而出现这种问题。
在Cloudera中有一篇文章教你如何能够避免这个问题:

 Report progress

If your task reports no progress for 10 minutes (see the mapred.task.timeout property) then it will be killed by Hadoop. Most tasks don’t encounter this situation since they report progress implicitly by reading input and writing output. However, some jobs which don’t process records in this way may fall foul of this behavior and have their tasks killed. Simulations are a good example, since they do a lot of CPU-intensive processing in each map and typically only write the result at the end of the computation. They should be written in such a way as to report progress on a regular basis (more frequently than every 10 minutes). This may be achieved in a number of ways:

  • Call setStatus() on Reporter to set a human-readable description of
    the task’s progress
  • Call incrCounter() on Reporter to increment a user counter
  • Call progress() on Reporter to tell Hadoop that your task is still there (and making progress)

但是,事情还没完,集群中会不定时地有任务卡死在某个点上导致任务无法继续下去:

 

"main" prio=10 tid=0x000000000293f000 nid=0x1e06 runnable [0x0000000041b20000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x00000006e243c3f0> (a sun.nio.ch.Util$2)
- locked <0x00000006e243c3e0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006e243c1a0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 

 

 

读源码分析这个问题,找到SocketIOWithTimeout类中的doIO方法,157行附近,
/now wait for socket to be ready.
      int count = 0;
      try {
        count = selector.select(channel, ops, timeout); 
      } catch (IOException e) { //unexpected IOException.
        closed = true;
        throw e;
      }

      if (count == 0) {
        throw new SocketTimeoutException(timeoutExceptionString(channel,
                                                                timeout, ops));
      }
 
当经过超时时间,但是却并没有读出任何数据时,抛出错误:
 
Error: java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=xxx remote=/xxx]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:962)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:930)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
 
超时时间通过(dfs.client.socket-timeout)来计算,如果在该时间范围内,没有读到任何的数据,那么就抛出这个异常。
进入SocketIOTimeout.select方法,发现其中会执行一段轮询:
while (true) {
          long start = (timeout == 0) ? 0 : Time.now();
          key = channel.register(info.selector, ops);
          ret = info.selector.select(timeout);         
          if (ret != 0) {
            return ret;
          }
         
          /* Sometimes select() returns 0 much before timeout for
           * unknown reasons. So select again if required.
           */
          if (timeout > 0) {
            timeout -= Time.now() - start;
            if (timeout <= 0) {
              return 0;
            }
          }
         
          if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedIOException("Interruped while waiting for " +
                                             "IO on channel " + channel +
                                             ". " + timeout +
                                             " millis timeout left.");
          }
        }
 
此时由于是读数据,ops一般就是指SelectionKey.OP_READ,我们设置的timeout不等于0,也就是说会执行一段总时间为timeout的任务,”Sometimes select() returns 0 much before timeout for  * unknown reasons. So select again if required.” 这个注释写的有点含糊,看来NIO有些问题当前都没确定清楚。
 
我们看一下方法的介绍:
java.nio.channels.Selector
public abstract int select(long timeout)
                   throws java.io.IOException
Selects a set of keys whose corresponding channels are ready for I/O operations.
This method performs a blocking selection operation. It returns only after at least one channel is selected, this selector's wakeup method is invoked, the current thread is interrupted, or the given timeout period expires, whichever comes first.
 
Selector选择的方法,仅当下面三个事件之一发生的情况下:
  • 至少一个已经注册的Channel被选择,返回的就是被选择的Channel数量;
  • Selector被中断;
  • 给定的超时时间已到;

 

如果被中断了,会抛出中断异常,因此当前仅可能是超时时间已到,返回的ret=0,导致抛出上述的异常。

但是,这也没完,难道超时了不会重试?到底会重试几次?

 

经过继续分析,发现往下的堆栈中的DFSInputStream调用了readBuffer方法,可以看到retryCurrentNode在第一次失败后,将IOException捕获,会进行必要的重试操作,如果还是发生超时,并且找不到就将其加入黑名单作为失败的DataNode(可能下次不会进行重试?),并转移到另外的DataNode上(执行seekToNewSource方法),经过几次后才会将IOException真正抛出。

 
try {
        return reader.doRead(blockReader, off, len, readStatistics);
      } catch ( ChecksumException ce ) {
        DFSClient.LOG.warn("Found Checksum error for "
            + getCurrentBlock() + " from " + currentNode
            + " at " + ce.getPos());       
        ioe = ce;
        retryCurrentNode = false;
        // we want to remember which block replicas we have tried
        addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
            corruptedBlockMap);
      } catch ( IOException e ) {
        if (!retryCurrentNode) {
          DFSClient.LOG.warn("Exception while reading from "
              + getCurrentBlock() + " of " + src + " from "
              + currentNode, e);
        }
        ioe = e;
      }
      boolean sourceFound = false;
      if (retryCurrentNode) {
        /* possibly retry the same node so that transient errors don't
         * result in application level failures (e.g. Datanode could have
         * closed the connection because the client is idle for too long).
         */
        sourceFound = seekToBlockSource(pos);
      } else {
        addToDeadNodes(currentNode);
        sourceFound = seekToNewSource(pos);
      }
      if (!sourceFound) {
        throw ioe;
      }
      retryCurrentNode = false;
    }
 
总之,这部分的问题还是很多,继续研究中...
  • 大小: 133.8 KB
分享到:
评论

相关推荐

    Hadoop企业优化常用的调优参数.docx

    本文将详细讨论Hadoop调优的关键参数,分为资源相关参数、Shuffle性能优化参数以及容错相关参数。 1. **资源相关参数**: - `mapreduce.map.memory.mb` 和 `mapreduce.reduce.memory.mb` 分别设定MapTask和Reduce...

    基于心跳超时机制的Hadoop实时容错技术

    本文探讨了Hadoop实时容错技术,并针对其心跳超时机制进行了改进,提出了公平心跳超时容错机制。在了解该技术之前,首先要了解Hadoop的基本原理及其心跳超时机制的作用。 Hadoop是一个开源的分布式存储与计算平台,...

    hadoop-xml配置

    MapReduce是Hadoop的数据处理模型,分为Map和Reduce两个阶段。`mapreduce.framework.name`设定运行模式,可以是经典的`local`或YARN(`yarn`)。`mapreduce.job.reduces`控制reduce任务的数量,影响数据的并行度和...

    hadoop network

    例如,可以通过调整TCP的缓冲区大小、设置合适的RPC超时时间以及优化网络带宽使用等来改善Hadoop集群的效率。 标签 "源码" 暗示我们需要关注Hadoop的源代码层面。理解Hadoop源码有助于深入学习其内部工作原理,比如...

    hadoop 开发规范

    - **Task任务超时**:调整`hive.exec.max.dynamic.partitions`和`hive.exec.max.dynamic.partitions.pernode`。 - **OutOfMemoryError: Java heap space**:调整JVM参数如`-Xmx`。 #### 四、结论 通过上述内容可以...

    Hadoop MapReduce作业卡死问题的解决方法.docx

    这种情况通常发生在高并发或IO竞争激烈的场景下,需要考虑调整超时时间和容器内存配置等参数。但这个问题并非是导致作业卡死的根本原因。 3. **长时间卡死的reduce任务日志**: - syslog日志中出现的`IOException...

    hadoop 2.9.0 mapred-default.xml 属性集

    Hadoop 2.9.0版本中的mapred-default.xml文件包含了MapReduce作业的配置属性,这些属性定义了MapReduce作业执行过程中的各种行为和参数。下面我们来详细介绍mapred-site.xml文件中的一些关键属性。 1. mapreduce....

    理论部分-MapReduce-hadoop1

    总结,MapReduce模型通过将大数据处理任务分解为可并行执行的Map和Reduce任务,高效地在Hadoop集群中进行分布式计算。JobTracker和TaskTracker共同协调任务的执行,确保作业的正确性和容错性。HDFS作为存储系统,...

    Hadoop公平调度器指南.pdf

    **公平调度器**(Fair Scheduler)是Hadoop中的一种插件式Map/Reduce调度器,它为大规模集群提供了一种有效的资源共享机制。其核心目标是确保随着时间的推移,所有作业都能平均分配到等量的共享资源。 #### 二、...

    大数据技术分享 Hadoop集群监控与Hive高可用方案 共17页.pdf

    phpHiveAdmin是一个基于Web的Hive管理工具,能够实时监控Hive集群的运行状态,提供了详细的监控数据,包括Job提交、Map/Reduce过程、Hive日志等。phpHiveAdmin的优点是界面清晰,安装简单,运行方便,节省Hive操作...

    企业级IT架构分享 云计算架构师成长之路 Hadoop公平调度器指南 共8页.pdf

    ### 企业级IT架构分享:Hadoop公平调度器指南 #### 目的 ...通过对公平调度器的基本配置、参数设置及其实现原理的理解,用户可以根据自身需求灵活地调整调度策略,从而最大程度地发挥Hadoop集群的效能。

    Hadoop集群监控与Hive高可用-向磊.pdf

    - **TCPSession超时问题**:长时间运行的查询可能会导致TCPSession进入CLOSE_WAIT状态,可以通过调整HAProxy中的超时设置来缓解此问题。 - **Hive日志问题**:频繁的健康检查可能会产生大量Hive日志,可以通过增加...

    Hive优化方法整理

    Hive 优化方法整理是 Hive 数据处理过程中的重要步骤,涉及到 Hive 的类 SQL 语句本身进行调优、参数调优、Hadoop 的 HDFS 参数调优和 Map/Reduce 调优等多个方面。 Hive 类 SQL 语句优化 1. 尽量尽早地过滤数据...

    MUS:一种适用于Hadoop新颖的时限约束调度算法

    例如,金融系统中的在线支付需要在几秒钟内完成处理和验证,超时的响应是不可接受的。因此,如何设计能够在Hadoop环境下有效支持实时服务的调度算法,是研究的重点。 MUS算法的提出正是为了解决这一问题。该算法被...

    JavaSparkStreaming-kafkaDemo

    1. **设置和连接**: 首先,我们需要设置Spark和Kafka的连接参数,如Kafka的broker列表、topic名以及Spark的streaming超时时间等。 2. **创建DStream**: 使用`JavaInputDStream`从Kafka创建一个Discretized Stream...

    Google MapReduce(三)

    ### Google MapReduce 架构详解 #### 一、架构概览及核心特点 ...这些技术和思想不仅为Google内部的大规模数据处理提供了强大的支持,也为后来的大数据处理框架如Hadoop MapReduce等奠定了坚实的基础。

    Hbase官方文档

    - 配置Hadoop的核心参数,如HDFS的地址、块大小等。 - 确保Hadoop和HBase版本兼容。 **2.4 运行模式** - **单机模式**:用于开发和测试。 - **分布式模式**:生产环境中使用,需要配置ZooKeeper集群。 **2.5 ...

    hive常见的优化方案ppt

    `mapreduce.map.task.timeout`和`mapreduce.reduce.task.timeout`用于设置任务超时时间。 14. **列裁剪**:Hive会在解析阶段剔除未使用的列,减少数据读取量,`hive.optimize.pruning=true`启用该功能。 15. **...

Global site tag (gtag.js) - Google Analytics