`
x-rip
  • 浏览: 106930 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Job初始化(一)

 
阅读更多

Job的初始化

1.WordCount.java

  public static void main(String[] args) throws Exception {
  // 初始化Configuration
    Configuration conf = new Configuration();
		--> Configuration.java 默认构造函数执行this(true);即
			  public Configuration(boolean loadDefaults) {
			  // 设定加载默认设置
				this.loadDefaults = loadDefaults;
			  // 保存最近使用最多的key和value
				updatingResource = new HashMap<String, String>();
			  // 将Configuration放入一个WeakHashMap中
				synchronized(Configuration.class) {
				  REGISTRY.put(this, null);
				}
			  }			
	...
  // 初始化Job
    Job job = new Job(conf, "word count");
		--> JobConf.java conf作为构造参数传入,将其封装为JobConf
			  private Credentials credentials = new Credentials(); // 初始化credentials
			  public JobConf(Configuration conf) {
				super(conf);// JobConf继承自Configuration,即将传进来的conf复制到另一个的Configuration中				
				if (conf instanceof JobConf) { // 该条件不符合
					...
				}
				checkAndWarnDeprecation(); // 在conf中获取已经废弃的key,如果存在则向用户警告使用了过期的参数
			  }
			  
		--> JobContextImpl.java Job的构造方法最终调用父类JobContextImpl的构造方法
			  public JobContextImpl(Configuration conf, JobID jobId) {
				if (conf instanceof JobConf) {
				  this.conf = (JobConf)conf; // 初始化JobConf
				} else {
				  this.conf = new JobConf(conf);
				}
				this.jobId = jobId;
				this.credentials = this.conf.getCredentials(); // 将JobConf中初始化的credentials赋给JobContextImpl
				try {
				  this.ugi = UserGroupInformation.getCurrentUser(); // 初始化UGI
				} catch (IOException e) {
				  throw new RuntimeException(e);
				}
			  }
    job.setJarByClass(WordCount.class); 
		--> JobConf.java 该方法会最终调用到JobConf的setJarByClass()
			  public void setJarByClass(Class cls) {
				String jar = findContainingJar(cls); // 通过class名字获取到class所在的jar包
				if (jar != null) {
				  setJar(jar); // 调用Configuration的set(JobContext.JAR, jar);方法
				}   
			  }
    job.setMapperClass(TokenizerMapper.class); // 调用Configuration的setClass();方法
	...
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		--> FileInputFormat.java中的addInputPath方法
			  public static void addInputPath(Job job, 
											  Path path) throws IOException {
				Configuration conf = job.getConfiguration();
				path = path.getFileSystem(conf).makeQualified(path); // 获取文件系统,并补全路径
				String dirStr = StringUtils.escapeString(path.toString());
				String dirs = conf.get(INPUT_DIR);
				conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
			  }  
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }	
2. Job.java
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit(); // 提交Job
		--> Job.java submit()方法
		  public void submit() 
				 throws IOException, InterruptedException, ClassNotFoundException {
			ensureState(JobState.DEFINE);
			setUseNewAPI();
			connect(); // 初始化Cluster类,代理所有Job的操作和JobSubmitter的submitInternal操作(加载yarn-site.xml,mapred-site.xml等,最近的用于ClientProtocolProvider加载服务,通过在xml中指定框架的名字。初始化Cluster需要的UGI),该类用于确定当前的job运行在何种模式(Yarn、Local、JobTracker)通过ServiceLoader<ClientProtocolProvider> frameworkLoader加载ClientProtocolProvider,通过ClientProtocolProvider启动运行的框架(Yarn等等)
			final JobSubmitter submitter = 
				getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
			// JobSubmitter:1、检查input和output 2、计算map数 3、设置DistributeCache即用户Map/Reduce程序运行需要的jar包 4、复制job的相关信息到hdfs上 5、提交Job
			status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
			  public JobStatus run() throws IOException, InterruptedException, 
			  ClassNotFoundException {
				return submitter.submitJobInternal(Job.this, cluster);
			  }
			});
			state = JobState.RUNNING;
			LOG.info("The url to track the job: " + getTrackingURL());
		   }
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
  public boolean monitorAndPrintJob() 
      throws IOException, InterruptedException {
    String lastReport = null; // 记录最近一次的report
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis = 
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }      
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }      
      String report = 
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " + 
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events = 
        getTaskCompletionEvents(eventCounter, 10); 
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() + 
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }
 
分享到:
评论

相关推荐

    quartz-job初始化数据表.zip

    "quartz-job初始化数据表.zip" 文件显然包含了用于设置Quartz作业调度系统的数据库表结构。 这个压缩包可能包含了一系列SQL脚本,用于在数据库中创建必要的表,这些表包括但不限于: 1. **QRTZ_JOB_DETAILS**:此...

    xxl-job的pgsql初始化sql

    在SQL初始化方面,XXL-JOB提供了针对MySQL的数据库脚本,但针对PostgreSQL(pgsql)的初始化脚本可能需要自行获取或构建。 在这个压缩包中,"xxlPgsql"很可能包含了XXL-JOB为PostgreSQL数据库设计的数据表结构和...

    quartz-2.2.3版本的quartz初始化sql语句

    在Quartz 2.2.3版本中,初始化数据库是使用Quartz的关键步骤,因为Quartz依赖于一个持久化存储来保存作业和触发器的信息。这个过程通常涉及执行一系列SQL语句来创建必要的表结构。 Quartz的初始化SQL语句主要用于...

    Kubernetes-高级调度(CronJob、初始化容器InitContainer、污点与容忍、亲和力)

    Kubernetes-高级调度(CronJob、初始化容器InitContainer、污点与容忍、亲和力)

    Go-实践教程用于构建和部署Kubernetes初始化程序

    3. **部署初始化器**:将Go程序部署为一个Deployment或Job,确保它在apiserver能够访问。 4. **测试**:创建一个测试Pod,观察初始化程序是否正确地在Pod启动前执行了预期的任务。 ### 总结 通过这个Go开发-学习...

    xxl-job-2.2.0版本Oracle建表及初始化语句

    xxl-job-2.2.0版本Oracle建表及初始化语句,建表后需要同步修改Mapper文件,增加oracle驱动依赖,修改数据库连接配置。

    xxl-job数据库脚本

    xxl-job数据库脚本

    【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第一节 Configuration和Job对象的初始化

    【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第一节 Configuration和Job对象的初始化前言Configuration和Job对象的初始化后记跳转 前言 不得不说,在此前我对阅读源码这件事是拒绝的,一方面也知道自己非读...

    SqlServer数据库OGG安装部署及数据初始化.docx

    接下来,清除默认的清理作业,使用`sys.sp_cdc_drop_job 'cleanup'`,并用OGG的清理脚本创建新的清理作业,如`ogg_cdc_cleanup_setup.bat createjob ogg ogg dbname (local) ogg2`。 进入OGG的ggsci控制台,创建...

    在mysql中创建 oracle scott 用户的四个表及插入初始化数据

    根据提供的文件信息,本文将详细解释如何在MySQL中创建Oracle Scott用户所使用的四个表,并向这些表中插入初始化数据。 ### 一、创建Emp表 #### 表结构 Emp表用于存储员工信息,包含以下字段: - **empno**:整型...

    xxl-job适配达梦数据库

    1. **下载数据库脚本**:提供的压缩包中应该包含了XXL-JOB针对达梦数据库的初始化脚本。这些脚本用于创建XXL-JOB所需的表结构和索引,例如`xxl_job_info`(任务信息表)、`xxl_job_log`(任务日志表)等。请根据实际...

    Oracle JOB 用法小结

    首先,为了启用JOB队列,需要设置初始化参数`job_queue_processes`。通过SQL语句`alter system set job_queue_processes=n;`设置该参数,其中`n`代表期望的并发作业数量,最大值为1000。要查看当前的作业队列后台...

    xxl-job-2.4.0 适配postgresql

    xxl-job-2.4.0 postgresql初始化脚本

    xxl job源码分析

    三、xxl-job-executors-sample:主要负责执行器的初始化工作,执行器会主动注册到调度中心,并以bean的方式注入用户线下编辑好的任务。 在进行xxl-job源码分析之前,有些预备知识点是必须了解的: 1、quartz的使用...

    xxl-job的可视化 附带sql文件

    在本文中,我们将深入探讨XXL-JOB的核心功能、架构设计以及如何使用附带的SQL文件进行数据库初始化。 XXL-JOB的主要特点包括: 1. 分布式:XXL-JOB支持分布式部署,能有效解决单节点任务调度的性能瓶颈,提高系统...

    ORACLE创建JOB脚本

    2. DBMS_JOB.SUBMIT:提交一个新任务到调度队列,指定执行的PL/SQL代码或存储过程,并设置初始执行时间和间隔。 3. DBMS_JOB.RUN:立即启动一个已经存在的JOB。 4. DBMS_JOB.SCHEDULE:重新安排一个已存在的JOB,...

    quartz-2.3.1.jar和官网发布包及数据库初始化脚本文件及快速获取其他版本方法

    这些SQL脚本用于在不同的数据库系统上创建和初始化Quartz所需的表结构。例如,对于MySQL,你需要运行相应的SQL文件来创建与Quartz相关的表,如QRTZ_TRIGGERS、QRTZ_JOB_DETAILS等。同样,对于Oracle或DB2,也有对应...

    黑马头条初始化工程项目代码

    拥有文章列表查看(freemarker、CDN、ElasticSearch)、热点文章计算(kafka、xxl-job)、CMS自媒体端文章发布审核(延迟队列)、项目部署数据迁移(Hbase、Jenkins、docker)等功能的头条文章系统的初始化工程maven...

    oracle中JOB总结

    这个周期的间隔时间由初始化参数`job_queue_interval`设定。该参数应根据实际环境中的JOB需求进行调整,避免设置得过大导致JOB无法及时执行,或过小增加不必要的系统资源消耗。 在SNP执行JOB的过程中,它首先以JOB...

Global site tag (gtag.js) - Google Analytics