`
ld_hust
  • 浏览: 170274 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

hadoop 学习(转)

阅读更多

Hadoop 的文件系统,最重要是 FileSystem 类,以及它的两个子类 LocalFileSystem 和 DistributedFileSystem。 这里先分析 FileSystem。
抽象类 FileSystem,提高了一系列对文件/目录操作的接口,还有一些辅助方法。分别说明一下:
1. open,create,delete,rename等,非abstract,部分返回 FSDataOutputStream,作为流进行处理。
2. openRaw,createRaw,renameRaw,deleteRaw等,abstract,部分返回 FSInputStream,可以随机访问。
3. lock,release,copyFromLocalFile,moveFromLocalFile,copyToLocalFile 等abstract method,提供便利作用,从方法命名可以看出作用。
特别说明,Hadoop的文件系统,每个文件都有一个checksum,一个crc文件。因此FileSystem里面的部分代码对此进行了特别的处理,比如 rename。
LocalFileSystem 和 DistributedFileSystem,理应对用户透明,这里不多做分析,和 FSDataInputStream,FSInputStream 结合一起说明一下。
查看两个子类的 getFileCacheHints 方法,可以看到 LocalFileSystem 是使用'localhost'来命名,这里暂且估计两个FileSystem都是通过网络进行数据通讯,一个是Internet,一个是Intranet。
LocalFileSystem 里面有两个内部类 LocalFSFileInputStream和LocalFSFileOutputStream,查看代码可以看到它是使用 FileChannel进行操作的。另外 lock和release 两个方法使用了TreeMap来保存文件和对应的锁。
DistributedFileSystem 代码量少于 LocalFileSystem,但是更加复杂,它里面使用了 DFSClient 来进行分布式文件系统的操作:
public DistributedFileSystem(InetSocketAddress namenode, Configuration conf) throws IOException
{
   super(conf);
   this.dfs = new DFSClient(namenode, conf);
   this.name = namenode.getHostName() + ":" + namenode.getPort();
}
DFSClient 类接收一个InetSocketAddress 和Configuration 作为输入,对网络传输细节进行了封装。DistributedFileSystem中绝大多数方法都是调用DFSClient进行处理,它只是一个 Warpper。下面着重分析DFSClient。
DFSClient中,主要使用RPC来进行网络的通讯,而不是直接在内部使用Socket。如果要详细了解传输细节,可以查看 org.apache.hadoop.ipc 这个包里面的3个Class。
DFSClient 中的路径,基本上都是UTF8类型,而非String,在DistributedFileSystem中,通过getPath和getDFSPath来转换,这样做可以保证路径格式的标准和数据传输的一致性。
DFSClient 中的大多数方法,也是直接委托ClientProtocol类型的namenode来执行,这里主要分析其它方法。
LeaseChecker 内部类。一个守护线程,定期对namenode进行renewLease操作,注释说明:
Client programs can cause stateful changes in the NameNode that affect other clients. A client may obtain a file and neither abandon nor complete it. A client might hold a series of locks that prevent other clients from proceeding. Clearly, it would be bad if a client held a bunch of locks that it never gave up. This can happen easily if the client dies unexpectedly. So, the NameNode will revoke the locks and live file-creates for clients that it thinks have died. A client tells the NameNode that it is still alive by periodically calling renewLease(). If a certain amount of time passes since the last call to renewLease(), the NameNode assumes the client has died.
作用是对client进行心跳监测,若client挂掉了,执行解锁操作。
DFSInputStream 和 DFSOutputStream,比LocalFileSystem里面的更为复杂,也是通过 ClientProtocol 进行操作,里面使用到了 org.apache.hadoop.dfs 包中的数据结构,如DataNode,Block等,这里不对这些细节进行分析。

对FileSystem的分析(1)到此结束,个人感觉它的封装还是做的不错的,从Nutch项目分离出来后,比原先更为清晰。 下面就接着进行MapReduce的第二部分分析,从MapReduce如何进行分布式


###############################################################################

之前的MapReduce Demo只能在一台机器上运行,现在是时候让它分布式运行了。在对MapReduce的运行流程和FileSystem进行了简单研究之后,现在尝试从配置着手,看看怎样让Hadoop在两台机器上面同时运行MapReduce。
首先看回这里
   String tracker = conf.get("mapred.job.tracker", "local");
   if ("local".equals(tracker)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
   } else {
       this.jobSubmitClient = (JobSubmissionProtocol)
      RPC.getProxy(JobSubmissionProtocol.class,
                      JobTracker.getAddress(conf), conf);
   }
当tracker地址不为local,则tracker为Remote Client的 JobTracker 类,这里重点分析。
JobTracker有一个main函数,注释显示它仅仅用于调试,正常情况是作为DFS Namenode进程的一部分来运行。不过这里我们可以先从它着手开始分析。
      tracker = new JobTracker(conf); //构造
构造函数先获取一堆常量的值,然后清空'systemDir',接着启动RPC服务器。
       InetSocketAddress addr = getAddress(conf);
       this.localMachine = addr.getHostName();
       this.port = addr.getPort();
       this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
       this.interTrackerServer.start();
启动TrackInfoServer:
       this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
       this.infoServer = new JobTrackerInfoServer(this, infoPort);
       this.infoServer.start();
TrackInfoServer 提供了通过HTTP方式获取JobTracker信息的方式,可以方便用于监测工作任务的进度。
启动三个守护线程:
       new Thread(this.expireTrackers).start();   //Used to expire TaskTrackers that have gone down
       new Thread(this.retireJobs).start();   //Used to remove old finished Jobs that have been around for too long
       new Thread(this.initJobs).start();   //Used to init new jobs that have just been created
三个线程的用处已经注释,这里不作分析。下面开始分析 JobTracker.submitJob()
之前已经分析过 LocalJobRunner.submitJob(),它实例化内部类Job,在里面实现MapReduce流程。JobTracker就复杂一些,它实例化 JobInProgress,然后将这个Job提交到队列:
       JobInProgress job = new JobInProgress(jobFile, this, this.conf);
       synchronized (jobs) {
         synchronized (jobsByArrival) {
            synchronized (jobInitQueue) {
                   jobs.put(job.getProfile().getJobId(), job);
                   jobsByArrival.add(job);
                   jobInitQueue.add(job);
                   jobInitQueue.notifyAll();
            }
         }
       }
此时RetireJobs线程开始处理超时和出错的Job,JobInitThread线程初始化工作任务: job.initTasks();
开始分析 JobInProgress
在构造函数中,Tracker从发起端的DFS获取任务文件(xml和jar),然后保存到本地目录下面
       JobConf default_job_conf = new JobConf(default_conf);
       this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,
         jobid + ".xml");
       this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,
         jobid + ".jar");
       FileSystem fs = FileSystem.get(default_conf);
       fs.copyToLocalFile(new File(jobFile), localJobFile);

       conf = new JobConf(localJobFile);
       this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url,
                                     conf.getJobName());
       String jarFile = conf.getJar();
       if (jarFile != null) {
      fs.copyToLocalFile(new File(jarFile), localJarFile);
      conf.setJar(localJarFile.getCanonicalPath());
       }

这里要注意jarFile,JobConf的构造函数:
   public JobConf(Configuration conf, Class aClass) {
this(conf);
String jar = findContainingJar(aClass);
if (jar != null) {
   setJar(jar);
}
   }
如果 aClass 是在一个jar里面,那么setJar(jar);就会被执行,这个jar会被copy到 LocalJobRunner 或是 JobTracker 的工作目录下面。所以这里有一个原则: 将要执行的MapReduce操作的所有class打包到一个jar中,这样才能执行分布式的MapReduce计算
再看 JobInProgress.initTasks()
先从Jar中加载InputFormat
       String ifClassName = jd.get("mapred.input.format.class");
       InputFormat inputFormat;
       if (ifClassName != null && localJarFile != null) {
      try {
         ClassLoader loader =
             new URLClassLoader(new URL[]{ localJarFile.toURL() });
         Class inputFormatClass = loader.loadClass(ifClassName);
         inputFormat = (InputFormat)inputFormatClass.newInstance();
      } catch (Exception e) {
         throw new IOException(e.toString());
      }
       } else {
      inputFormat = jd.getInputFormat();
       }
接下来对文件块的大小进行排序
创建对应的Map任务
       this.numMapTasks = splits.length;
       // create a map task for each split
       this.maps = new TaskInProgress[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
         maps = new TaskInProgress(jobFile, splits, jobtracker, conf, this);
       }
创建Reduce任务
       this.reduces = new TaskInProgress[numReduceTasks];
       for (int i = 0; i < numReduceTasks; i++) {
         reduces = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this);
       }
最后对于每Split的信息进行缓存,并且创建状态类
       for (int i = 0; i < maps.length; i++) {
         String hints[][] = fs.getFileCacheHints(splits.getFile(), splits.getStart(), splits.getLength());
         cachedHints.put(maps.getTIPId(), hints);
       }

       this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
现在轮到 TaskInProgress,它将Job里面的Map和Reduce操作进行了封装,但是JobInProgress.initTasks()仅仅对task进行了初始化,并没有执行Task,经过一番跟踪,发现Task的执行,是由 TaskTracker 来处理。
TaskTracker,实现了TaskUmbilicalProtocol接口。在之前的文章中,LocalJobRunner的内部类Job也实现了这个接口,这里对比一下:
接口 JobSubmissionProtocol: LocalJobRunner <---> JobTracker
接口 TaskUmbilicalProtocol: LocalJobRunner.Job <---> TaskTracker
下面对TaskTracker进行分析,首先也是从main入口开始。
TaskTracker实现了Runnable,main实例化TaskTracker对象,然后执行run()方法。
在构造函数中,主要进行初始化
       this.mapOutputFile = new MapOutputFile();
       this.mapOutputFile.setConf(conf);
       initialize();
initialize()里面,初始化一些变量值 ,然后初始化RPC服务器:
       while (true) {
         try {
            this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);
            this.taskReportServer.start();
            break;
         } catch (BindException e) {
            LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");
            this.taskReportPort++;
         }
      
       }
       while (true) {
         try {
            this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);
            this.mapOutputServer.start();
            break;
         } catch (BindException e) {
            LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port");
            this.mapOutputPort++;
         }
       }
mapOutputServer使用一个循环来尝试各个端口绑定。
最后一句
       this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
这里有一个新的接口InterTrackerProtocol,是TaskTracker和中央JobTracker通讯用的协议。通过这个接口, TaskTracker可以用来执行JobTracker中的Task了。接下来分析TaskServer的主流程,run()函数。
run()中, 有两个while循环。在内部while循环里面,执行 offerService() 方法。它里面也是一个while循环,开始几段代码用于JobTracker的心跳监测。接下来,它通过协议接口调用JobTracker,获取Task并执行:
         if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
            Task t = jobClient.pollForNewTask(taskTrackerName);
            if (t != null) {
                   TaskInProgress tip = new TaskInProgress(t, this.fConf);
                   synchronized (this) {
                  tasks.put(t.getTaskId(), tip);
                  if (t.isMapTask()) {
                         mapTotal++;
                  } else {
                         reduceTotal++;
                  }
                  runningTasks.put(t.getTaskId(), tip);
                   }
                   tip.launchTask();
            }
         }
tip.launchTask(); 开始执行这个Task,在方法内部:
         this.runner = task.createRunner(TaskTracker.this);
         this.runner.start();
Task 有两个子类 MapTask和ReduceTask,它们的createRunner()方法都会创建一个TaskRunner的子类,TaskRunner继承Thread,run()方法中:
   String sep = System.getProperty("path.separator");
   File workDir = new File(new File(t.getJobFile()).getParent(), "work");
   workDir.mkdirs();
           
   StringBuffer classPath = new StringBuffer();
   // start with same classpath as parent process
   classPath.append(System.getProperty("java.class.path"));
   classPath.append(sep);
   JobConf job = new JobConf(t.getJobFile());
   String jar = job.getJar();
   if (jar != null) {                   // if jar exists, it into workDir
       unJar(new File(jar), workDir);
       File[] libs = new File(workDir, "lib").listFiles();
       if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
         classPath.append(sep);          // add libs from jar to classpath
         classPath.append(libs);
      }
       }
       classPath.append(sep);
       classPath.append(new File(workDir, "classes"));
       classPath.append(sep);
       classPath.append(workDir);
   }

获取工作目录,获取classpath。然后解压工作任务的jar包。
   //   Build exec child jmv args.
   Vector vargs = new Vector(8);
   File jvm =                               // use same jvm as parent
       new File(new File(System.getProperty("java.home"), "bin"), "java");

   vargs.add(jvm.toString());
   String javaOpts = handleDeprecatedHeapSize(
      job.get("mapred.child.java.opts", "-Xmx200m"),
      job.get("mapred.child.heap.size"));
   javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
   int port = job.getInt("mapred.task.tracker.report.port", 50050) + 1;
   javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
   String [] javaOptsSplit = javaOpts.split(" ");
   for (int i = 0; i < javaOptsSplit.length; i++) {
      vargs.add(javaOptsSplit);
   }

   // Add classpath.
   vargs.add("-classpath");
   vargs.add(classPath.toString());
   // Add main class and its arguments
   vargs.add(TaskTracker.Child.class.getName());   // main of Child
   vargs.add(tracker.taskReportPort + "");        // pass umbilical port
   vargs.add(t.getTaskId());                   // pass task identifier
   // Run java
   runChild((String[])vargs.toArray(new String[0]), workDir);
这里是构造启动Java进程的classpath和其它vm参数,最后在 runChild 中开一个子进程来执行这个Task。感觉够复杂的。
最后分析TaskTracker的内部类Child。它就是上面子进程执行的类。在main函数中
      TaskUmbilicalProtocol umbilical =
         (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                             new InetSocketAddress(port), conf);
        
      Task task = umbilical.getTask(taskid);
      JobConf job = new JobConf(task.getJobFile());

      conf.addFinalResource(new File(task.getJobFile()));
可见该子进程也是通过RPC跟TaskTracker进行通讯。
      startPinging(umbilical, taskid);        // start pinging parent
开一个进程,对TaskTracker进行心跳监测。
             String workDir = job.getWorkingDirectory();
             if (workDir != null) {
            FileSystem file_sys = FileSystem.get(job);
            file_sys.setWorkingDirectory(new File(workDir));
             }
             task.run(job, umbilical);           // run the task
这里才真正开始执行Task。

分享到:
评论

相关推荐

    Hadoop学习资料

    以上总结的知识点均来自给定文件的内容,涵盖了Hadoop的学习资料、版本历史、生态圈、安装、HDFS、MapReduce、Zookeeper、HBase、Hive、Storm以及数据挖掘和推荐系统等多个方面,为学习和使用Hadoop提供了全面的理论...

    java及hadoop学习资料

    这份“java及hadoop学习资料”压缩包提供了丰富的学习资源,帮助初学者或进阶者深入理解和掌握这两个领域的核心概念。 Java部分: 1. **Java基础知识**:Java的学习通常从基础语法开始,包括数据类型、变量、控制...

    hadoop学习资料

    与孙老师交流Hadoop学习方法也是一种宝贵的学习机会。在学习过程中遇到问题时,可以向孙老师请教,获取及时的帮助和支持。此外,加入相关的学习社区或论坛,与其他学习者互动交流,也是提高学习效率的有效途径之一。...

    Hadoop学习时间轴

    根据提供的信息,我们可以详细地解析出关于Hadoop学习时间轴中的关键知识点,这些知识点主要集中在Hadoop的基础架构、MapReduce工作原理以及Hive在实际应用中的优化等方面。 ### Hadoop学习时间轴概述 Hadoop是一...

    Python+Spark 2.0+Hadoop机器学习与大数据

    《Python+Spark 2.0+Hadoop机器学习与大数据》是一本深入探讨大数据处理与机器学习技术结合的著作。作者林大贵以其丰富的经验,详细介绍了如何利用Python、Spark 2.0以及Hadoop这一组合来构建高效的数据分析解决方案...

    Hadoop学习笔记.pdf

    在初学者的角度,理解Hadoop的组成部分以及其架构设计是学习Hadoop的基础。 首先,Hadoop的分布式文件系统(HDFS)是其核心组件之一,它具有高吞吐量的数据访问能力,非常适合大规模数据集的存储和处理。HDFS的设计...

    hadoop学习总结(面试必备)

    【Hadoop学习总结(面试必备)】 Hadoop作为大数据处理的核心框架,因其分布式存储和计算的能力,成为业界处理海量数据的首选工具。本总结将深入探讨Hadoop的主要组件、工作原理以及在面试中可能遇到的相关知识点。...

    hadoop学习源码学习

    ### Hadoop源码学习——MapReduce核心解析 #### 一、基本概念 ##### 1.1 MapReduce逻辑过程 在Hadoop中,MapReduce是一种分布式计算框架,用于处理大规模数据集。其工作流程主要包括以下几个阶段: 1. **...

    Hadoop 大数据学习ppt

    这份Hadoop学习课件不仅会讲解每个组件的基本概念和工作原理,还会涉及它们的安装配置、使用方法以及实际案例分析。通过深入学习这些组件,你可以掌握大数据处理的核心技术,为解决实际业务问题打下坚实基础。在实际...

    基于Hadoop的数据仓库Hive学习指南.doc

    【标题】:“基于Hadoop的数据仓库Hive学习指南” 【描述】:该文档是一份针对Hive的学习资料,旨在引导读者理解如何在Hadoop平台上利用Hive进行数据仓库操作和编程实践。它涵盖了Hive的基本概念、安装步骤、实验...

    Hadoop Spark大数据巨量分析与机器学习整合开发实战 ,林大贵

    《Hadoop Spark大数据巨量分析与机器学习整合开发实战》一书由林大贵编著,主要讲解了如何将大数据分析技术和机器学习技术结合起来进行实战开发。本书的重点是Hadoop和Spark这两个在大数据处理领域占据重要地位的...

    hadoop学习假数据

    本主题将深入探讨“hadoop学习假数据”,重点关注movies.dat、ratings.dat和users.dat这三份文件,它们通常用于模拟电影推荐系统中的数据集。 首先,`movies.dat`文件包含了电影的相关信息,如电影ID、电影标题以及...

    hadoop+spark机器学习实例

    通过学习和实践这些示例,你可以了解到如何在Hadoop和Spark环境下搭建和运行机器学习项目,理解如何利用这两个强大的工具进行数据处理、特征提取、模型训练和验证。这不仅有助于提升大数据处理技能,还能为未来的...

    最新Hadoop生态圈开发学习资料——尚硅谷

    本文将深入探讨Hadoop生态圈中的关键组件及其功能,帮助你掌握最新的开发学习资料。 首先,我们从基础开始,Linux是Hadoop运行的基础平台,其稳定性和可扩展性使得它成为大数据处理的理想选择。熟悉Linux操作系统的...

    hadoop大数据基础学习

    ### Hadoop大数据基础学习知识点概览 #### 一、Hadoop简介 1. **定义**: - Hadoop是一个能够对大量数据进行分布式处理的软件框架。 - 它能够可靠地存储和处理PB级别的数据。 2. **背景**: - 随着互联网的...

    hadoop几个实例

    这些工具可能在实例中被提及或用作扩展学习。 6. **Hadoop的安装与配置**:实例可能涵盖Hadoop环境的搭建,包括配置Hadoop的环境变量、初始化HDFS、启动MapReduce服务等步骤。 7. **编程接口**:Hadoop提供了Java ...

    Hadoop2.7.7安装过程

    在安装过程中,我们会遇到各种问题,但通过学习和实践,我们可以克服这些困难,从而深入理解Hadoop及其依赖环境。 首先,安装Hadoop的第一步是创建一个专门的用户账户。在本例中,创建了一个名为"hadoop"的用户,这...

    大数据开发--hadoop全套学习课程--百度网盘

    本"大数据开发--hadoop全套学习课程"涵盖了大数据技术的多个关键组成部分,包括Hadoop2.x及其生态系统中的其他工具,如Hive、HBase、Flume、Storm和Spark。此外,还涉及到NoSQL数据库MongoDB和内存数据存储系统Redis...

Global site tag (gtag.js) - Google Analytics