1. Client
1) Job.java
A finished MapReduce program submits its job by calling Job.waitForCompletion(true).
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  // Submit the job through submit() if it has not been submitted yet
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    // Poll the job status in a loop and print it to the client console
    monitorAndPrintJob();
  } else {
    ...
  }
  return isSuccessful();
}
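The guard at the top of waitForCompletion() means a job is submitted at most once, even if the caller has already invoked submit() directly. A minimal sketch of that pattern in plain Java follows. The class JobStateSketch and its two supplier parameters are hypothetical stand-ins, not Hadoop code; the real method monitors the job through the cluster client.

```java
import java.util.function.BooleanSupplier;

/** Simplified, hypothetical model of the submit-once guard; not Hadoop code. */
final class JobStateSketch {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private int submitCount = 0;

    /** Submitting is only legal while the job is still in DEFINE. */
    void submit() {
        if (state != JobState.DEFINE) {
            throw new IllegalStateException("Job in state " + state + " instead of DEFINE");
        }
        submitCount++;
        state = JobState.RUNNING;
    }

    /** Submits if needed, then polls until the job reports completion. */
    boolean waitForCompletion(BooleanSupplier isComplete,
                              BooleanSupplier isSuccessful,
                              long pollMillis) {
        if (state == JobState.DEFINE) {
            submit();
        }
        while (!isComplete.getAsBoolean()) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // give up waiting if the caller interrupts us
            }
        }
        return isSuccessful.getAsBoolean();
    }

    int submitCount() { return submitCount; }

    JobState state() { return state; }

    public static void main(String[] args) {
        JobStateSketch job = new JobStateSketch();
        int[] polls = {0};
        boolean ok = job.waitForCompletion(() -> ++polls[0] >= 3, () -> true, 1L);
        System.out.println("success=" + ok + ", submits=" + job.submitCount());
    }
}
```

Calling submit() a second time on the same instance throws, which is the behaviour the state check is meant to enforce.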
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  // Make sure the job is still in the DEFINE state
  ensureState(JobState.DEFINE);
  // Decide whether the job uses the new MapReduce API (org.apache.hadoop.mapreduce)
  // or the old mapred API
  setUseNewAPI();
  // Initialize the Cluster used to query the job's status; it reads the
  // configuration to decide whether the job goes to YARN or to the JobTracker
  connect();
  // Build the JobSubmitter that will submit the job
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  // Update the job state
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
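The connect() step picks the client implementation from the configuration. The sketch below models that choice in plain Java under two assumptions that the listing above does not show: that the framework is selected by the mapreduce.framework.name property, and that candidate providers are asked in turn until one returns a client. Provider, providerFor and the client names are hypothetical stand-ins for illustration only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of how a cluster client is chosen; not Hadoop code. */
final class ClusterConnectSketch {

    /** Hypothetical provider: returns a client name, or null if the framework is not its own. */
    interface Provider {
        String create(Map<String, String> conf);
    }

    /** Builds a provider that only answers for a single framework name. */
    static Provider providerFor(final String framework, final String clientName) {
        return conf -> framework.equals(conf.get("mapreduce.framework.name")) ? clientName : null;
    }

    static final List<Provider> PROVIDERS = Arrays.asList(
            providerFor("yarn", "YARNRunner"),
            providerFor("classic", "JobTracker client"));

    /** Asks each provider in turn; the first non-null answer wins. */
    static String connect(Map<String, String> conf) {
        for (Provider provider : PROVIDERS) {
            String client = provider.create(conf);
            if (client != null) {
                return client;
            }
        }
        throw new IllegalStateException("No provider for framework: "
                + conf.get("mapreduce.framework.name"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("mapreduce.framework.name", "yarn");
        System.out.println(connect(conf));
    }
}
```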
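2) JobSubmitter.java
A job submitted through Job is handed over to JobSubmitter, which performs a number of checks and preparation steps before the actual submission.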
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // Validate the job's output specification
  checkSpecs(job);
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
      job.getConfiguration());
  // Record submission parameters, such as the host the job is submitted from
  Configuration conf = job.getConfiguration();
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Obtain the ID of the current job from the ClientProtocol implementation
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  // Write the job's jar, the input split information and the configuration to HDFS
  try {
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    populateTokenCache(conf, job.getCredentials());
    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
    // Write the split information
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);
    // Look up the queue the job is submitted to and record its administrator ACL
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
    // Remove the token referral settings from the configuration before it is written out
    TokenCache.cleanUpTokenReferral(conf);
    // Write the configuration
    writeConf(conf, submitJobFile);
    printTokens(jobId, job.getCredentials());
    // Submit the job through ClientProtocol.submitJob()
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
  ...
}
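The preparation steps above all write into one per-job directory under the staging area. The following sketch shows how that layout is derived, using java.nio.file.Path as a stand-in for Hadoop's Path. The file names (job.xml, job.jar, job.split, job.splitmetainfo), the staging path and the job ID are assumptions for illustration; the listing itself only shows that the directory is named after the job ID.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Simplified, hypothetical model of the per-job submit directory; not Hadoop code. */
final class StagingLayoutSketch {

    /** The submit directory is the staging area plus the job ID. */
    static Path submitJobDir(Path stagingArea, String jobId) {
        return stagingArea.resolve(jobId);
    }

    // Assumed file names inside the submit directory.
    static Path jobConf(Path submitJobDir) { return submitJobDir.resolve("job.xml"); }

    static Path jobJar(Path submitJobDir) { return submitJobDir.resolve("job.jar"); }

    static Path splits(Path submitJobDir) { return submitJobDir.resolve("job.split"); }

    static Path splitMeta(Path submitJobDir) { return submitJobDir.resolve("job.splitmetainfo"); }

    public static void main(String[] args) {
        // Illustrative staging area and job ID, not values taken from a real cluster.
        Path staging = Paths.get("/tmp/hadoop-yarn/staging/alice/.staging");
        Path dir = submitJobDir(staging, "job_1330000000000_0007");
        System.out.println(jobConf(dir));
        System.out.println(jobJar(dir));
        System.out.println(splits(dir));
        System.out.println(splitMeta(dir));
    }
}
```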
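3) YARNRunner.java
Under YARN, the submitJob work is taken over by YARNRunner, which implements ClientProtocol. It checks the run conditions, prepares everything needed to launch the MRAppMaster, and asks the ResourceManager for a Container to run the MRAppMaster in.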
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  // If a HistoryServer is available, obtain its delegation token when required
  // and add it to the credentials
  MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
  if (hsProxy != null) {
    if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
        DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
      Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
          conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
      ts.addToken(hsDT.getService(), hsDT);
    }
  }
  // Write the job's credentials (tokens) to the application tokens file
  // in the submit directory
  Path applicationTokensFile =
      new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
  try {
    ts.writeTokenStorageFile(applicationTokensFile, conf);
  } catch (IOException e) {
    throw new YarnException(e);
  }
  /* Initialize the application submission context:
       ApplicationId
       ApplicationName
       Queue: the queue the application is submitted to
       Priority: the priority of the application
       User: the user running the MRAppMaster
       AMContainerSpec: description of the Container that runs the ApplicationMaster
         ContainerId
         User: the user running the MRAppMaster
         Resource: the resources the ResourceManager allocates to the MRAppMaster
         ContainerToken: the security tokens used in secure mode
         LocalResources: the jar containing MRAppMaster, the job configuration file,
                         the job's own jar, the information about each split, etc.
         ServiceData:
         Environment: the CLASSPATH and other environment variables for the MRAppMaster
         Commands: the command that launches the MRAppMaster, e.g.
                   $JAVA_HOME/bin/java MRAppMaster.class.getName() ...
         ApplicationACLs: the access control lists of the MRAppMaster
  */
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
  // ResourceMgrDelegate submits the application to the ResourceManager
  ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
  // ResourceMgrDelegate fetches the run information of the MRAppMaster
  ApplicationReport appMaster = resMgrDelegate
      .getApplicationReport(applicationId);
  ...
  return clientCache.getClient(jobId).getJobStatus(jobId);
}
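The Commands entry of the submission context is a shell command line that the NodeManager runs to start the MRAppMaster. A rough sketch of how such a command line can be assembled follows. The JVM option, the `<LOG_DIR>` placeholder and the output redirections are assumptions for illustration, and amCommand is a hypothetical helper rather than a YARNRunner method.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of building the AM launch command; not Hadoop code. */
final class AmCommandSketch {

    /** Fully qualified name of the MapReduce ApplicationMaster class. */
    static final String AM_CLASS = "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";

    /** Joins the java binary, JVM options, main class and log redirections into one line. */
    static String amCommand(String javaOpts) {
        List<String> parts = new ArrayList<>();
        parts.add("$JAVA_HOME/bin/java"); // expanded on the node, not on the client
        parts.add(javaOpts);
        parts.add(AM_CLASS);
        parts.add("1><LOG_DIR>/stdout");  // placeholder assumed to be replaced at launch time
        parts.add("2><LOG_DIR>/stderr");
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        System.out.println(amCommand("-Xmx1024m"));
    }
}
```

Keeping `$JAVA_HOME` unexpanded matters because the client and the node that runs the Container may have Java installed in different locations.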
4) ResourceMgrDelegate.java
ResourceMgrDelegate handles the communication with the ResourceManager and submits the request to launch the ApplicationMaster (MRAppMaster).
// Connect to the ResourceManager
public ResourceMgrDelegate(YarnConfiguration conf) {
  this.conf = conf;
  YarnRPC rpc = YarnRPC.create(this.conf);
  InetSocketAddress rmAddress =
      NetUtils.createSocketAddr(this.conf.get(
          YarnConfiguration.RM_ADDRESS,
          YarnConfiguration.DEFAULT_RM_ADDRESS),
          YarnConfiguration.DEFAULT_RM_PORT,
          YarnConfiguration.RM_ADDRESS);
  this.rmAddress = rmAddress.toString();
  LOG.debug("Connecting to ResourceManager at " + rmAddress);
  applicationsManager =
      (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
          rmAddress, this.conf);
  LOG.debug("Connected to ResourceManager at " + rmAddress);
}
// Request a new application from the ResourceManager and use its ID as the job ID
public JobID getNewJobID() throws IOException, InterruptedException {
  GetNewApplicationRequest request =
      recordFactory.newRecordInstance(GetNewApplicationRequest.class);
  applicationId = applicationsManager.getNewApplication(request).getApplicationId();
  // TypeConverter converts between YARN records and the job records of older
  // Hadoop versions, e.g. JobId, ApplicationId, TaskId
  return TypeConverter.fromYarn(applicationId);
}
// Submit the application to the ResourceManager, which allocates a Container
// to run the MRAppMaster
public ApplicationId submitApplication(
    ApplicationSubmissionContext appContext)
    throws IOException {
  appContext.setApplicationId(applicationId);
  SubmitApplicationRequest request =
      recordFactory.newRecordInstance(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  applicationsManager.submitApplication(request);
  LOG.info("Submitted application " + applicationId + " to ResourceManager" +
      " at " + rmAddress);
  return applicationId;
}
// Ask the ResourceManager for the run information of the application
public ApplicationReport getApplicationReport(ApplicationId appId)
    throws YarnRemoteException {
  GetApplicationReportRequest request = recordFactory
      .newRecordInstance(GetApplicationReportRequest.class);
  request.setApplicationId(appId);
  GetApplicationReportResponse response = applicationsManager
      .getApplicationReport(request);
  ApplicationReport applicationReport = response.getApplicationReport();
  return applicationReport;
}
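getNewJobID() relies on TypeConverter to turn a YARN ApplicationId into a MapReduce JobID. The sketch below illustrates the idea on the string forms only, assuming the IDs print as application_<clusterTimestamp>_<sequence> and job_<clusterTimestamp>_<sequence>. IdConversionSketch and its methods are hypothetical; the real TypeConverter works on the record objects, not on strings.

```java
/** Simplified, hypothetical model of the ApplicationId/JobID mapping; not Hadoop code. */
final class IdConversionSketch {
    private static final String APP_PREFIX = "application_";
    private static final String JOB_PREFIX = "job_";

    /** Maps an application ID string to the job ID sharing its timestamp and sequence. */
    static String toJobId(String applicationId) {
        if (!applicationId.startsWith(APP_PREFIX)) {
            throw new IllegalArgumentException("Not an application ID: " + applicationId);
        }
        return JOB_PREFIX + applicationId.substring(APP_PREFIX.length());
    }

    /** Inverse mapping, from a job ID string back to the application ID. */
    static String toApplicationId(String jobId) {
        if (!jobId.startsWith(JOB_PREFIX)) {
            throw new IllegalArgumentException("Not a job ID: " + jobId);
        }
        return APP_PREFIX + jobId.substring(JOB_PREFIX.length());
    }

    public static void main(String[] args) {
        // Illustrative ID, not taken from a real cluster.
        String appId = "application_1330000000000_0007";
        String jobId = toJobId(appId);
        System.out.println(appId + " <-> " + jobId);
    }
}
```

Because both IDs share the same timestamp and sequence number, a job seen in the MapReduce client can be matched to its application in the ResourceManager.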
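At this point the work on the client side (the machine that submits the job) is done. The ResourceManager takes over from here and allocates a Container to run the MRAppMaster.
2. Server side (ResourceManager)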
To be continued...