正常情况下,我们都是启动Hadoop任务的方式大概就是通过hadoop jar命令(或者写在shell中),事实上运行的hadoop就是一个包装的.sh,下面就是其中的最后一行,表示在其中执行一个java命令,调用hadoop的一些主类,同时配置一些hadoop的相关CLASSPATH,OPTS等选项:
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
当使用hadoop jar时,调用的$CLASS是下面的类型:
org.apache.hadoop.util.RunJar
而通过hadoop jar调用的主类,必须满足条件:
1,其中有main方法,类似下面的定义:
public static void main(String[] args) throws Exception { int result = ToolRunner.run(new ThisClass(), args); System.exit(result); }
2. ToolRunner中的的类需要有如下签名:
extends Configured implements Tool
并实现其中的public int run方法,在进行必要的hadoop job构造后,执行job的方法,同步等待执行结果并返回即可。
boolean success = job2.waitForCompletion(true);
大体的过程如下,以前也没有对整个过程进行质疑,直到我们有新的需要,在其他的客户端(java,而不是shell中)启动MapReduce任务,顺带好好看了这个函数waitForCompletion...
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { monitorAndPrintJob(); } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } return isSuccessful(); }
读完源码后发现,其实这个方法主要的目的就是看一下当前job的状态,如果没有提交,那么就执行submit操作(同步)将其提交到集群上。传递的参数verbose,如果是true,就是表示需要检测并打印job的相关信息(使用LOG.info()来打印到console中);否则,就等待任务的complete,反正这是个同步的操作;我们如果不需要监测任务的执行状态,仅仅进行一步submit就可以了。
那么就看一下monitorAndPrintJob这个函数吧,核心代码如下:
while (!isComplete() || !reportedAfterCompletion) { if (isComplete()) { reportedAfterCompletion = true; } else { Thread.sleep(progMonitorPollIntervalMillis); } if (status.getState() == JobStatus.State.PREP) { continue; } if (!reportedUberMode) { reportedUberMode = true; LOG.info("Job " + jobId + " running in uber mode : " + isUber()); } String report = (" map " + StringUtils.formatPercent(mapProgress(), 0)+ " reduce " + StringUtils.formatPercent(reduceProgress(), 0)); if (!report.equals(lastReport)) { LOG.info(report); lastReport = report; } TaskCompletionEvent[] events = getTaskCompletionEvents(eventCounter, 10); eventCounter += events.length; printTaskEvents(events, filter, profiling, mapRanges, reduceRanges); } boolean success = isSuccessful(); if (success) { LOG.info("Job " + jobId + " completed successfully"); } else { LOG.info("Job " + jobId + " failed with state " + status.getState() + " due to: " + status.getFailureInfo()); } Counters counters = getCounters(); if (counters != null) { LOG.info(counters.toString()); } return success;
其实就是定时循环去报告,检查状态,其中涉及到map和reduce的总体进度(通过某种算法计算出来的百分比),如果报告与上一次有变化,就进行输出。直到任务执行完成,并将其中的所有Counter均打印出来;如果任务失败,打印出任务执行失败的原因。
最终,MapReduce的执行日志大概就是这个样子:
15/04/13 15:01:08 INFO mapreduce.Job: map 96% reduce 28% 15/04/13 15:01:09 INFO mapreduce.Job: map 98% reduce 28% 15/04/13 15:01:10 INFO mapreduce.Job: map 98% reduce 32% 15/04/13 15:01:13 INFO mapreduce.Job: map 100% reduce 33% 15/04/13 15:01:16 INFO mapreduce.Job: map 100% reduce 37% 15/04/13 15:01:19 INFO mapreduce.Job: map 100% reduce 46% 15/04/13 15:01:22 INFO mapreduce.Job: map 100% reduce 54% 15/04/13 15:01:25 INFO mapreduce.Job: map 100% reduce 62% 15/04/13 15:01:28 INFO mapreduce.Job: map 100% reduce 68% 15/04/13 15:01:31 INFO mapreduce.Job: map 100% reduce 71% 15/04/13 15:01:34 INFO mapreduce.Job: map 100% reduce 76% 15/04/13 15:01:35 INFO mapreduce.Job: map 100% reduce 100% 15/04/13 15:01:37 INFO mapreduce.Job: Job job_1421455790417_222365 completed successfully 15/04/13 15:01:37 INFO mapreduce.Job: Counters: 46 File System Counters FILE: Number of bytes read=70894655 FILE: Number of bytes written=158829484 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=5151416348 HDFS: Number of bytes written=78309 HDFS: Number of read operations=1091 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0
如果我们需要将任务执行进度打印出来,就可以对这部分的功能就行改进并重写。
如果任务已经提交到集群,可以使用job对象的getTrackingURL()通过页面的形式查看到其具体详情,其中job对象还提供了一些可以操作集群任务的API,包括killTask, failTask等。
在任务执行完成后,就可以得到任务的所有Counter,使用Counter来对任务的各项指标进行详细统计是非常易用有效的方式,我们在任务中定义了大量的Counter来进行该操作(包括以后以后可能会评估任务的消耗,以便进行费用统计等…)。
如果需要启动多个任务,或以某种依赖的方式启动多个顺序MapReduce任务,可以使用JobControl来链接多个任务,JobControl的run方法,会根据任务的依赖关系来调度整个过程,并提供了一些常用的API,同样可以将任务kill/fail掉。但是如果流程的复杂性稍微比较高的情况下,建议使用一套工作流系统,例如oozie,便于管理以及应对流程上的变化。
相关推荐
"Hadoop MapReduce HelloWorld 能调试" 的主题意味着我们将深入理解如何设置、运行以及调试 MapReduce 任务,特别是针对入门级的 "wordcount" 示例。 MapReduce 分为两个主要阶段:Map 阶段和 Reduce 阶段。Map ...
4. **更好的容错性**:通过重新启动失败的任务和容器,YARN提供了更高的容错性。 5. **优化的调度器**:YARN支持多种调度策略,如FIFO、Capacity Scheduler和Fair Scheduler,以满足不同工作负载的需求。 本书...
(2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...
Hadoop MapReduce框架提供了丰富的功能和优化,如任务调度、容错处理、数据本地化等。任务调度器根据可用资源动态地分配任务,确保高效利用集群资源。如果某个任务失败,系统能够自动检测并重新启动该任务,确保作业...
- Hadoop MapReduce 能够智能地将计算任务调度到存储数据的节点上,减少网络传输延迟。 - 这种设计使得 MapReduce 能够有效地利用集群内的网络资源。 **2. 容错机制** - 当某个任务失败时,JobTracker 会自动...
4. **执行Hadoop MapReduce任务**:使用`bin/hadoop jar`命令执行jar包中的主类,例如`triangle.triangle`,并提供输入和输出目录。这一步骤会在Hadoop集群上启动MapReduce作业。 实验中还涉及到了Hadoop自带的`...
【标题】:“Hadoop MapReduce实现基于ItemCF的协同过滤物品推荐系统” 在这个项目中,我们探讨了如何利用Hadoop MapReduce框架来构建一个基于Item-Based Collaborative Filtering(ItemCF)的物品推荐系统。这是一...
通过运行一些简单的测试来检查Hadoop集群是否正确配置并能够正常工作,例如启动HDFS和MapReduce服务,然后尝试上传一些文件到HDFS,并执行一些简单的MapReduce任务。 #### 三、MapReduce开发 **3.1 Hadoop Eclipse...
学习这个项目,你将了解如何使用Hadoop MapReduce编写网络爬虫,如何设置和运行Hadoop作业,以及如何处理和分析抓取到的数据。这对于想要进入大数据处理或AI领域的开发者来说,是一个很好的实践项目。同时,这也为你...
这通常涉及到安装Java运行环境,配置Hadoop环境变量,并启动Hadoop的各个服务,如NameNode、DataNode和ResourceManager等。 4. 配置Hadoop:在Windows上设置Hadoop时,需要修改配置文件如`core-site.xml`、`hdfs-...
在实际应用中,我们通过编写Hadoop MapReduce程序对上游提供的源数据文件进行预处理,主要包括编码转换、数据去重、添加时间戳、数据清洗等一系列操作,最终生成可供后续分析使用的贴源ODS层数据。然而,在一段时间...
在大数据处理领域,Hadoop MapReduce和YARN是两个至关重要的组件,它们构成了Apache Hadoop生态系统的核心部分。MapReduce是一种编程模型,用于处理和生成大规模数据集,而YARN(Yet Another Resource Negotiator)...
例如,在Hadoop集群中,可以设置两个NameNode用于高可用性,多个DataNode用于存储数据,多个ResourceManager实现资源调度的高可用,以及多个NodeManager负责节点上的资源管理和任务执行。 2. 域名和IP关系是集群...
### hadoop MapReduce编程教程知识点概述 #### 一、MapReduce基本概念 - **MapReduce**:这是一种分布式计算模型,由Google提出,并被Apache Hadoop所实现。它主要用于大规模数据集(多TB甚至PB级别)的并行处理,...
这里执行的命令是启动一个Hadoop MapReduce作业,使用的是名为“max.jar”的可执行JAR文件。这个JAR文件包含了我们编写的MapReduce程序,用于查找最高气温。执行命令时,'upload.MaxTemperature'是主类名,指示...
总结来说,Hadoop MapReduce通过客户端提交任务,JobTracker分配任务,TaskTracker执行任务,以及Child执行map和reduce操作,实现了分布式数据处理。这种模型充分利用了集群资源,能高效地处理海量数据。然而,由于...
- **任务执行**: TaskTracker 启动 Map 或 Reduce 任务,执行具体的计算逻辑。 - **错误处理**: 包括 JobTracker、TaskTracker 和 Task 的故障恢复机制。 ##### 3.2 作业提交流程 - **编写 MR 程序**: 用户需要实现...
用户通过提交作业来启动MapReduce任务。作业由一系列的Map任务和Reduce任务组成。 2. **Map阶段** - **输入格式化(InputFormat)**:首先,InputFormat会根据输入文件的类型(如文本文件或数据库表)确定如何读取...