调度器调用JobTracker.initJob()函数对新作业进行初始化。相关代码如下:
// 调度器调用eagerTaskInitializationListener.start()方法。 class JobQueueTaskScheduler extends TaskScheduler { @Override public synchronized void start() throws IOException { super.start(); ... ... eagerTaskInitializationListener.start(); ... ... } } // EagerTaskInitializationListener.start()方法启动作业管理器线程。 class EagerTaskInitializationListener extends JobInProgressListener { ... ... public void start() throws IOException { this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager"); ... ... this.jobInitManagerThread.start(); } ... ... } // 作业初始化管理器执行作业初始化动作。 class JobInitManager implements Runnable { public void run() { ... ... threadPool.execute(new InitJob(job)); ... ... } } |
作业初始化的主要工作是构造Map Task和Reduce Task并对它们进行初始化。
Hadoop将每个作业分解成4种类型的任务,分别是Setup Task、Map Task、Reduce Task和Cleanup Task。它们的运行时信息由TaskInProgress类维护,因此,创建这些任务实际上是创建TaskInProgress对象。
上述4种任务的作用及创建过程如下。
n Setup Task:作业初始化标识性任务。它进行一些非常简单的作业初始化工作,比如将运行状态设置为“setup”,调用OutputCommitter.setupJob()函数等。该任务运行完后,作业由PREP状态变为RUNNING状态,并开始运行Map Task。该类型任务又被分为Map Setup Task和Reduce Setup Task两种,且每个作业各有一个。它们运行时分别占用一个Map slot和Reduce slot。由于这两种任务功能相同,因此有且只有一个可以获得运行的机会(即只要有一个开始运行,另一个马上被杀掉,而具体哪一个能够运行,取决于当时存在的空闲slot种类及调度策略。相关代码如下:
public class JobInProgress { TaskInProgress setup[] = new TaskInProgress[0]; ... ... public synchronized void initTasks() { ... ... // create two setup tips, one map and one reduce. setup = new TaskInProgress[2]; // setup map tip. This map doesn't use any split. Just assign an empty // split. setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1); setup[0].setJobSetupTask(); // setup reduce tip. setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this, 1); setup[1].setJobSetupTask(); ... ... } } |
n Map Task:Map阶段处理数据的任务。其数目及对应的处理数据分片由应用程序中的
InputFormat组件确定。关代码如下:
public class JobInProgress { TaskInProgress maps[] = new TaskInProgress[0]; ... ... public synchronized void initTasks() { // read input splits and create a map per a split TaskSplitMetaInfo[] splits = createSplits(jobId); numMapTasks = splits.length; ... ... maps = new TaskInProgress[numMapTasks]; for(int i=0; i < numMapTasks; ++i) { inputLength += splits[i].getInputDataLength(); maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i, numSlotsPerMap); } ... ... } } |
n Reduce Task:Reduce阶段处理数据的任务。其数目由用户通过参数mapred.reduce.tasks(默认数目为1)指定。考虑到Reduce Task能否运行依赖于Map Task的输出结果,因此,Hadoop刚开始只会调度Map Task,直到Map Task完成数目达到一定比例(由参数mapred.reduce.slowstart.completed.maps指定,默认是0.05,即5%)后,才开始调度Reduce Task。关代码如下:
public class JobInProgress { TaskInProgress reduces[] = new TaskInProgress[0]; ... ... public synchronized void initTasks() { ... ... // Create reduce tasks this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this, numSlotsPerReduce); nonRunningReduces.add(reduces[i]); } ... ... } |
n Cleanup Task:作业结束标志性任务,主要完成一些清理工作,比如删除作业运行过程中用到的一些临时目录(比如_temporary目录)。一旦该任务运行成功后,作业由RUNNING状态变为SUCCESSED状态。关代码如下:
public class JobInProgress { TaskInProgress cleanup[] = new TaskInProgress[0]; ... ... public synchronized void initTasks() { ... ... // create cleanup two cleanup tips, one map and one reduce. cleanup = new TaskInProgress[2]; // cleanup map tip. This map doesn't use any splits. Just assign an empty // split. TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT; cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1); cleanup[0].setJobCleanupTask(); // cleanup reduce tip. cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this, 1); cleanup[1].setJobCleanupTask(); ... ... } |
相关推荐
JobTracker管理整个作业的生命周期,包括初始化、监控和完成,而TaskTracker在工作节点上运行具体的Map和Reduce任务。 为了运行这些MapReduce程序,开发者需要设置Hadoop环境,包括安装Hadoop,配置集群参数,以及...
- 该脚本用于对环境变量进行初始化设置。 - 其中最关键的是定义了`HADOOP_HOME`、`HADOOP_CONF_DIR`和`HADOOP_SLAVES`等环境变量。 - `HADOOP_HOME`指向Hadoop的安装目录。 - `HADOOP_CONF_DIR`指向Hadoop的配置...
Hadoop是大数据处理的核心框架,而Zookeeper则是Hadoop生态系统中的协调服务,用于管理分布式系统中的配置信息、命名服务、集群同步等。 **SSH免密配置** SSH(Secure Shell)是用于在不同节点间安全地进行远程登录...
其中,Hadoop作为大数据领域最为流行的开源框架之一,它通过提供分布式存储与计算解决方案,有效地处理大规模数据集。对于刚接触Hadoop分布式集群的初学者来说,理解并掌握集群的初次启动步骤是学习的第一步。本文将...
此过程会创建HDFS的命名空间并初始化相关的数据结构。 2. **启动Hadoop集群** 格式化完成后,可以启动Hadoop集群。通常,我们使用`start-all.sh`脚本一次性启动所有服务,包括DataNode、NameNode、NodeManager和...
在大数据领域,Hadoop是一个关键的开源框架,用于存储和处理海量数据。Hadoop-3.1.3是Hadoop的稳定版本,提供了许多增强的功能和优化,使其更适合大规模分布式计算环境。在这个针对Linux系统的安装包中,我们将探讨...
7. **Hadoop安装与配置**:学习如何在本地或集群环境下搭建Hadoop环境,包括配置Hadoop的环境变量、初始化HDFS、启动和停止服务等。 8. **数据处理实践**:通过实例演示如何使用Hadoop处理数据,如使用MapReduce...
- Hadoop伪分布式安装包括创建用户、修改主机名与域名映射、配置SSH免密登录、安装Java环境、设置Hadoop配置文件以及初始化和启动集群。 - SSH免密码登录的基本原理涉及到Client与Server的交互,其中Client将自己...
在大数据领域,Hadoop是一个关键的技术,它是一个开源的分布式计算框架,专为处理和存储大规模数据而设计。以下是一些关于Hadoop的面试题及其解析,帮助理解Hadoop的核心组件和工作原理。 1. NameNode管理metadata...
在Map阶段,遗传算法的并行化主要体现在种群的初始化和交叉操作上。每个Map任务负责处理一部分种群,进行独立的随机初始化,然后执行交叉操作生成新的个体。由于Map任务是并行执行的,因此可以显著减少整体的计算...
1. **Hadoop**:Hadoop是Apache软件基金会开发的一个开源框架,用于分布式存储和处理大规模数据。它的主要组成部分包括HDFS(Hadoop Distributed File System)和MapReduce。HDFS提供了高容错性的文件存储,而...
在主节点上执行Hadoop的格式化和启动操作,初始化HDFS和YARN: - `hadoop namenode -format` - 启动Hadoop相关服务,如`start-dfs.sh`和`start-yarn.sh` **12. 验证启动成功** 通过Web界面或命令行检查Hadoop集群...
6. **Hadoop的安装与配置**:实例可能涵盖Hadoop环境的搭建,包括配置Hadoop的环境变量、初始化HDFS、启动MapReduce服务等步骤。 7. **编程接口**:Hadoop提供了Java API来编写MapReduce程序,但也有如Hadoop ...
课件可能会讲解Hadoop的安装与配置步骤,包括分发Hadoop软件、配置环境变量、初始化HDFS和YARN(资源调度器)等。 在Hadoop使用方面,课件可能涵盖以下几个部分: 1. **数据导入**:如何将数据加载到HDFS中,这...
启动Hadoop服务前,还需初始化HDFS文件系统,使用`hadoop namenode -format`命令。然后,通过`start-dfs.sh`和`start-yarn.sh`启动Hadoop的NameNode、DataNode、ResourceManager和NodeManager。使用`jps`命令检查...
8. **格式化NameNode**:在主节点上执行Hadoop的NameNode格式化操作,初始化HDFS文件系统。 9. **启动Hadoop**:分别启动Hadoop的各个服务,包括DataNode、NameNode、ResourceManager、NodeManager等。 10. **验证...
4. **Hadoop安装与配置**:如何在本地或集群环境中设置Hadoop环境,包括环境变量配置、HDFS初始化等。 5. **数据存储**:深入理解HDFS的工作机制,包括数据块、副本、数据读写流程等。 6. **MapReduce编程**:学习...
【大数据作业1】是针对大数据技术的一次实践性学习任务,主要涵盖了Linux操作系统的基本操作以及Hadoop分布式计算框架的使用。本次作业旨在帮助学生熟悉这两个关键领域的基础技能,为后续的深入学习和实际项目实施...
// 初始化连接池 jedisPool = new JedisPool(poolConfig, REDIS_HOST, REDIS_PORT); // 使用示例 Jedis jedis = null; try { jedis = jedisPool.getResource(); // 从连接池获取连接 jedis.set("key", ...