`
bupt04406
  • 浏览: 347365 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hadoop debug 以及ReduceTask的部分分析

 
阅读更多
分析基于hadoop-0.19.2
MapTask和ReduceTask的入口是
org.apache.hadoop.mapred.Child.main(String[] args){ }
传入的args举例如下:
//args = [127.0.0.1, 57354, attempt_201107272049_0001_m_000003_0, 497563501]
//args = [127.0.0.1, 41819, attempt_201107280338_0001_r_000000_1, -2017927773]
    String host = args[0];  // ip
    int port = Integer.parseInt(args[1]); //端口号
    InetSocketAddress address = new InetSocketAddress(host, port); //用于子进程跟TaskTracker通信用,接口是TaskUmbilicalProtocol。使用如下:
        TaskUmbilicalProtocol umbilical =
      (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
          TaskUmbilicalProtocol.versionID,
          address,
          defaultConf);
      attempt_201107272049_0001_m_000003_0
      // 201107272049_0001用于确定JobId,JobId=jobId = job_201107272049_0001
      // m 说明是Map,r说明是Reduce。
      //  其他id 还不清楚。
    

org.apache.hadoop.mapred.Child.main 被org.apache.hadoop.mapred.TaskRunner调用、启动,具体见TaskRunner的run方法。
在TaskRunner.run方法里:
String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
String [] javaOptsSplit = javaOpts.split(" ");
for (int i = 0; i < javaOptsSplit.length; i++) {
        vargs.add(javaOptsSplit[i]);
}
默认运行map和reduce的子进程是200m。
mapred.child.java.opts这个属性是只有运行map和reduce的子进程用,所以我们可以传入一下其他参数,例如:
在hadoop-site.xml里加入:
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx200m -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y</value>
</property>
这样子进程JVM运行的时候就会在8000端口监听,可以通过Eclipse远程debug进去,跟踪MapTask和ReduceTask的运行。
还需要改下其他参数比较好:
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>1</value>
  <description>The maximum number of map tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>1</value>
  <description>The maximum number of reduce tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

<property>
  <name>mapred.task.timeout</name>
  <value>600000</value>
  <description>The number of milliseconds before a task will be
  terminated if it neither reads an input, writes an output, nor
  updates its status string.
  </description>
</property>
因为我是在自己的虚拟机上启动的一个hadoop集群,所有的进程都运行在一个机器上,任何时候都只能有一个进程在8000端口监听。所以设置一下同时最后只有一个 map或者一个reduce调度运行,mapred.task.timeout默认是10分钟超时,如果运行map或者reduce的子进程没有向TaskTracker上报信息或者心跳,就会被kill,debug过程需要的时间比较长,可以一个线程一个线程debug.


运行ReduceTask:
org.apache.hadoop.mapred.Child.main(String[] args){
    Task task = null;
    JvmTask myTask = umbilical.getTask(jvmId); // 子进程通过TaskUmbilicalProtocol从父进程TaskTracker获得一个Task的信息。
    task = myTask.getTask();    
    task.run(job, umbilical);             // run the task
}
Task  task 这个Task运行map时是MapTask,在运行reduce时是ReduceTask.
ReduceTask.run(JobConf job, final TaskUmbilicalProtocol umbilical){

    // start thread that will handle communication with parent
    startCommunicationThread(umbilical);

    if (!isLocal) {
      reduceCopier = new ReduceCopier(umbilical, job);
      if (!reduceCopier.fetchOutputs()) {
      }
    }
   
    copyPhase.complete();                         // copy is already complete

    RawKeyValueIterator rIter = isLocal
      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
          reporter)
      : reduceCopier.createKVIterator(job, rfs, reporter);
   
    sortPhase.complete();                         // sort is complete
  
    // make output collector
    String finalName = getOutputName(getPartition());

    FileSystem fs = FileSystem.get(job);

    final RecordWriter out =
      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); 
   
    OutputCollector collector = new OutputCollector() {
        public void collect(Object key, Object value)
          throws IOException {
          out.write(key, value);
          reduceOutputCounter.increment(1);
          // indicate that progress update needs to be sent
          reporter.progress();
        }
      };
      // collector 直接往HDFS写。

      // apply reduce function
      Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
      // 这里的Reducer就是实现的Reducer类,如用hive提交的job就是org.apache.hadoop.hive.ql.exec.ExecReducer
      
      while (values.more()) { //最主要的循环处理
        reduceInputKeyCounter.increment(1);
        reducer.reduce(values.getKey(), values, collector, reporter);//对数据进行reduce
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
        }
        values.nextKey();
        values.informReduceProgress();
      }

      done(umbilical); //reduce处理完了通过umbilical.done(getTaskID());通知TaskTracker。
}


new的线程:按时间顺序
(1)在Child.main  71行中创建线程Thread t = new Thread() 用于同步日志:
        //every so often wake up and syncLogs so that we can track
        //logs of the currently running task
(2)在ReduceTask.run方法开始运行时调用startCommunicationThread,见Task的startCommunicationThread(final TaskUmbilicalProtocol umbilical)方法,new了一个pingProgressThread线程,该线程检查ReduceTask是否有状态如进度有变化,如有就向TaskTracker上报,如没有向TaskTracker发送心跳,保活。
(3)copy阶段有多种多个线程:
   主要是在reduceCopier.fetchOutputs()中调用。
  (3.1)MapOutputCopier,copying threads默认5个
  (3.2)LocalFSMerger,on-disk-merge thread线程,1个,merge磁盘上的文件
  (3.3)InMemFSMergeThread,in memory merger thread,1个,merge内存的文件
  一些线程的角色和分工是:
  主线程从TaskTracker获得MapTask完成的事件信息,构造获得Map输出结果的URL放入队列mapLocations。TaskTracker是从JobTracker获得MapTask已完成的信息。
    主线程遍历mapLocations把这些任务移到队列scheduledCopies中。
    每个MapOutputCopier互斥的从队列scheduledCopies获取任务,如果没有就等待在队列上,如果获得一个通过http请求从TaskTracker获得那部分map output。
    LocalFSMerger把MapOutputCopier获取过来放到disk上的map output进行归并。
    InMemFSMergeThread合并内存上的map output。



Map:
(1)Read (reading map inputs)
(2)Map (map function processing)
(3)Collect (serializing to buff er and partitioning)
(4)Spill (sorting, combining, compressing, and writing map outputs to local disk)
(5)Merge (merging sorted spill files)
Reduce:
(1)Shuffle (transferring map outputs to reduce tasks, with decompression if needed)
(2)Merge (merging sorted map outputs)
(3)Reduce (reduce function processing)
(4)Write(writing reduce outputs to the distributed fi le-system)
分享到:
评论

相关推荐

    hadoop map-reduce turorial

    通过对源代码的详细分析和实际运行测试,用户可以更深入地理解Hadoop Map-Reduce的工作原理,掌握更高级的数据处理技巧。 总之,Hadoop Map-Reduce框架不仅为大数据处理提供了强大的工具,同时也为用户提供了丰富的...

    远程调用执行Hadoop Map/Reduce

    例如,`org.apache.hadoop.mapred.MapTask`和`org.apache.hadoop.mapreduce.ReduceTask`分别对应Map和Reduce任务的实现,开发者可以通过阅读这些源码了解任务执行的详细流程。 7. **工具集成**:有许多开源工具可以...

    Hadoop Map-Reduce教程

    ### Hadoop Map-Reduce 教程 #### 一、Hadoop Map-Reduce 概述 Hadoop Map-Reduce 是一种编程模型,用于处理大规模数据集(通常为TB级或以上)。这种模型支持分布式计算,可以在成百上千台计算机上运行。Map-...

    基于Hadoop Hive健身馆可视化分析平台项目源码+数据库文件.zip

    基于Hadoop Hive健身馆可视化分析平台项目源码+数据库文件.zip启动方式 环境启动 hadoop hive2元数据库 sql导入 导入hivesql脚本,修改application.yml 启动主程序 HadoopApplication 基于Hadoop Hive健身馆可视化...

    基于Hadoop的电影影评数据分析

    总结来说,【基于Hadoop的电影影评数据分析】项目是大数据技术在生活娱乐领域的应用实例,它涵盖了Hadoop环境的搭建、MapReduce编程模型的运用以及大数据分析的实践。通过这个项目,学生可以深入理解大数据处理流程...

    基于Hadoop网站流量日志数据分析系统.zip

    基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...

    Hadoop之外卖订单数据分析系统

    本文将深入探讨“Hadoop之外卖订单数据分析系统”,并介绍如何利用Hadoop进行大规模数据处理,以及如何将分析结果通过可视化手段进行展示。 首先,我们需要理解Hadoop的核心组件:HDFS(Hadoop Distributed File ...

    基于Hadoop数据分析系统设计(需求分析).docx

    总的来说,设计一个基于Hadoop的数据分析系统涉及到多个环节,从需求分析到系统设计,再到具体的部署和优化,每个步骤都需要细致考虑和精心实施。通过这样的系统,企业能够高效地处理和分析海量数据,从而获取有价值...

    基于Hadoop豆瓣电影数据分析实验报告

    【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    在大数据处理领域,Hadoop和MapReduce是两个...然而,为了充分发挥其潜力,理解Hadoop的分布式原理以及如何优化Join策略至关重要。在实际应用中,需要根据数据规模、数据分布以及特定业务需求来选择最合适的Join方法。

    基于Hadoop的成绩分析系统.docx

    在编码实现部分,本文介绍了使用MapReduce实现成绩分析的过程,包括初始数据的处理、计算每门课程的平均成绩、最高成绩和最低成绩,以及计算每门课程学生的平均成绩等。同时,也介绍了如何计算每门课程当中出现了...

    Hadoop源代码分析(完整版).pdf

    Hadoop 的源代码分析可以分为三个部分:HDFS、MapReduce 和其他组件。 HDFS HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。...

    使用hadoop进行天气数据分析.zip

    使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...

    Hadoop豆瓣电影数据分析(Hadoop)操作源码

    Hadoop豆瓣电影数据分析(Hadoop)操作源码

    基于Hadoop的豆瓣电影影评数据分析(word文档)

    标题中的“基于Hadoop的豆瓣电影影评...综上所述,这个项目涵盖了Hadoop的基本原理、大数据处理流程、文本分析技术、情感分析以及用户行为挖掘等多个方面的知识,对于理解和实践大数据在实际生活中的应用具有重要意义。

    基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip

    基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于Hadoop框架的豆瓣大数据分析与搜索系统源码.zip基于...

    基于Hadoop的网站流量日志数据分析系统项目源码+教程.zip

    基于Hadoop网站流量日志数据分析系统项目源码+教程.zip网站流量日志数据分析系统 典型的离线流数据分析系统 技术分析 hadoop nginx flume hive sqoop mysql springboot+mybatisplus+vcharts 基于Hadoop网站流量日志...

Global site tag (gtag.js) - Google Analytics