`
kanpiaoxue
  • 浏览: 1789396 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

spring batch(二):核心部分(2)Spring batch的启动

 
阅读更多

 chapter 4、Running batch jobs

 1、Spring Launch API:它的核心就是 JobLauncher 接口。

JobLauncher 的接口:

public interface JobLauncher {
	public JobExecution run(Job job, JobParameters jobParameters) throws (…);
}

 JobLauncher 需要2个参数:Job , JobParameters。

示例:

ApplicationContext context = (...)
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(
	job,
	new JobParametersBuilder()
	.addString("inputFile", "file:./products.txt")
	.addDate("date", new Date())
	.toJobParameters()
);

 job的参数是key-value的键值对的形式。Spring batch提供4种类型的job参数:string,long,double,date

[注意:一个job的参数定义了一个job的实例。而一个job实例有一个或更多个的执行上下文内容。这种执行的上下文,你可以把它看作是“一次尝试执行处理过程”]

一个JobLauncher必须依赖一个Job仓库(jobRepository)

示例:

<batch:job-repository id="jobRepository" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
	<property name="jobRepository" ref="jobRepository" />
</bean>

 JobExecution 接口的示例,提供job的“运行、结束、失败”等状态的查询。

由于Job运行可能消耗非常长的时间,Spring batch同步和异步两种launcher。

异步的情况,用于类似于http请求启动job的情况。如果不是异步的启动job,会耗尽http的线程资源。

指定一个launcher是异步的,只需要给launcher的配置添加一个taskExecutor属性。

示例:

<task:executor id="executor" pool-size="10" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
	<property name="jobRepository" ref="jobRepository" />
	<property name="taskExecutor" ref="executor" />
</bean>

 

一、从命令行启动job

Triggering system ---> Creates JVM process ---> Spring Batch job

CommandLineJobRunner 是Spring batch的启动类。里面包含main函数。

运行CommandLineJobRunner的必要配置有:Spring的XML配置文件,Job,Job参数,程序退出编码。

可以把整个spring batch的项目打包成jar文件,用来运行程序。也可以指定classpath,指定CommandLineJobRunner的包中的路径来运行。

[注意:]CommandLineJobRunner是利用spring的ClassPathXmlApplicationContext来使用启动spring的,需要把spring的XML配置文件放在classpath目录内

java -classpath "./lib/*" org.springframework.batch.core.launch.support.CommandLineJobRunner import-products-job.xml importProductsJob inputFile=file:./products.txt date(date)=2010/12/08

 CommandLineJobRunner 指定参数,是name=value的键值对的形式,即名字后面跟随指定的值。在指定参数的时候,可以指定参数的类型(spring batch的参数类型有:string、long、double、date),形式如下:name(type)=value  如上面的date(date)=2010/12/08 。注意date的格式是yyyy/MM/dd

Type           Java Type                                  Example

String      java.lang.String                          inputFile(string)=products.txt

Date       java.util.Date                               date(date)=2010/12/08

Long       Long                                           timeout(long)=1000

Double    Double                                       delta(double)=20.1

参数的默认类型是string,上面的 inputFile=file:./products.txt 也可以写成 inputFile(string)=file:./products.txt

 

CommandLineJobRunner是拥有运行返回值的。这个返回值默认的是:

 

0 --> The job completed successfully (COMPLETED).
1 --> The job failed (FAILED).
2 --> Used for errors from the command-line job runner—for example,the runner couldn’t find the job in the Spring application context.

 当然spring batch也允许你定制这个返回值来满足不同的需求,只要实现 ExitCodeMapper 接口即可。

示例:

 

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.launch.support.ExitCodeMapper;

public class SkippedAwareExitCodeMapper implements ExitCodeMapper {
	@Override
	public int intValue(String exitCode) {
		if(ExitStatus.COMPLETED.getExitCode().equals(exitCode)) {
			return 0;
		} else if(ExitStatus.FAILED.getExitCode().equals(exitCode)) {
			return 1;
		} else if("COMPLETED WITH SKIPS".equals(exitCode)) {
			return 3;
		} else {
			return 2;
		}
	}
}

 这个SkippedAwareExitCodeMapper类,该如何使用呢?把它生命在spring的XML里面靠近job就可以了,CommandLineJobRunner 会自己感知到它并且使用的。

示例:

 

<bean class="com.manning.sbia.ch04.SkippedAwareExitCodeMapper" />

<job id="importProductsJob" xmlns="http://www.springframework.org/schema/batch">
	(...)
</job>

 

二、Job调度

 (1)、UNIX-like的cron命令

示例:

0 4 * * ? acogoluegnes java -classpath "/usr/local/bin/sb/lib/*" org.springframework.batch.core.launch.support.CommandLineJobRunner import-products-job.xml importProductsJob inputFile=file:/home/sb/import/products.txt date=2010/12/08

 操作系统的cron命令的缺点也很明显,它会为每一个job启动一个java的进程。如果是频繁的启动job,会启动大量的java进程,消耗资源;另外,如果想在一个JVM里面启动多个job,这样job之间就可以进行内部的交互,显然这是cron做不到的。

 (2)、基于java的scheduler

 这里可以使用Quartz框架,还可以使用spring提供的task scheduler。

task scheduler的使用请参考spring reference,里面有详细的介绍。这里给出简单的例子。

java类:

public class SpringSchedulingLauncher {
	private Job job;
	private JobLauncher jobLauncher;
	
	public void launch() throws Exception {
		JobParameters jobParams = createJobParameters();
		jobLauncher.run(job, jobParams);
	}
	
	private JobParameters createJobParameters() {
		(...)
	}
	// --- setter / getter
}

 这里的setter/getter一定要书写,保证spring的XML能够注入属性。

使用spring的task配置调度:

<bean id="springSchedulingLauncher"
	class="com.manning.sbia.ch04.SpringSchedulingLauncher">
		<property name="job" ref="job" />
		<property name="jobLauncher" ref="jobLauncher" />
</bean>

<task:scheduler id="scheduler" />

<task:scheduled-tasks scheduler="scheduler">
	<task:scheduled ref="springSchedulingLauncher" method="launch" fixed-rate="1000" />
</task:scheduled-tasks>

 

三、从web触发job

可以在使用spring的web环境配置job。通过取得http request传递过来的参数,构建JobParameter,通过JobLauncher,为每一个http request请求启动一个job。可以使用Servlet,spring MVC,structs2等框架,进行触发。

还可以在spring的web环境中配置quartz的调度,来是web拥有job的能力,不过这种方法不会响应用户http request的请求。

 

四、优雅地停止job的运行

 谁需要停止job呢?从两个角度来考虑:使用者和开发者。

(1)、使用者可能由于某些原因,需要停止job的运行,比如发现job出现数据错误,或者接到领导的命令,需要停止job的工作。

(2)、开发者。在开发程序的过程中,开发者明确的知道一些业务逻辑需要停止job。比如,一个job运行的时间不能超过早上8点,如果超过这个时间需要停止job的运行,等等的情况。

使用者,如何停止job呢?

    调用JMX或者使用spring batch administration的管理web程序,停止job

在程序中调用JMX或者使用spring batch的JobOperator接口。

也可以在spring batch里面配置简单的JobOperator,

示例:

<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator">
	<property name="jobRepository" ref="jobRepository"/>
	<property name="jobLauncher" ref="jobLauncher" />
	<property name="jobRegistry" ref="jobRegistry" />
	<property name="jobExplorer" ref="jobExplorer" />
</bean>

<batch:job-repository id="jobRepository" data-source="dataSource" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
	<property name="jobRepository" ref="jobRepository" />
</bean>

<bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
	<property name="jobRegistry" ref="jobRegistry" />
</bean>

<bean id="jobRegistry"class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
	<property name="dataSource" ref="dataSource" />
</bean>

<bean class="org.springframework.jmx.export.MBeanExporter">
	<property name="beans">
		<map>
			<entry key="com.manning.sbia:name=jobOperator" value-ref="jobOperator" />
		</map>
	</property>
</bean>

 

 job停止的过程的可能情况:

a、代码中的job运行的之后,检查Thread.currentThread().isInterrupted()的状态,则job会马上停止。

b、job中的程序在运行逻辑代码,只有当这些业务完成之后,程序的管理权交回到spring batch的时候,才会被终止。如果中间的业务运行需要很长的时间,则job不会马上停止。

 

开发者,如何停止job呢?

(1)、运行的过程中抛出一个exception,造成job的停止。

(2)、利用StepExecution来设置一个标识,停止job的运行。

最好的办法是,利用StepExecution来设置一个标识,停止job的运行。

代码:StepExecution.setTerminateOnly()

如何获得StepExecution 呢?

a、在Tasklet接口的方法中有StepExecution参数,可以进行调用。

示例:

public class ProcessItemsTasklet implements Tasklet {
	@Override
	public RepeatStatus execute(StepContribution contribution,
			ChunkContext chunkContext) throws Exception {
		if(shouldStop()) {
			chunkContext.getStepContext().getStepExecution().setTerminateOnly();
		}
		processItem();
		if(moreItemsToProcess()) {
			return RepeatStatus.CONTINUABLE;
		} else {
			return RepeatStatus.FINISHED;
		}
		}
		(...)
}

 

 b、面向“块”的step中,ItemReader, ItemProcessor, 和 ItemWriter 这3个接口中,你会惊奇的发现它们中间没有 StepExecution。那么你该如何在“块”Chunk中获得StepExecution 而停止程序呢?在Listener中,可以找到StepExecution。

示例:

public class StopListener {
	private StepExecution stepExecution;
	@BeforeStep
	public void beforeStep(StepExecution stepExecution) {
		this.stepExecution = stepExecution;
	}
	
	@AfterRead
	public void afterRead() {
		if(stopConditionsMet()) {
			stepExecution.setTerminateOnly();
		}
	}
	(...)
}

 

spring batch 的listener的配置文件:

<bean id="stopListener" class="com.manning.sbia.ch04.stop.StopListener" />

<batch:job id="importProductsJob">
	<batch:step id="importProductsStep">
		<batch:tasklet>
			<batch:chunk reader="reader" writer="writer" commit-interval="100"/>
			<batch:listeners>
				<batch:listener ref="stopListener" />
			</batch:listeners>
		</batch:tasklet>
	</batch:step>
</batch:job>

 

 [备注:] 可以把schedule调度和停止job结合起来。可以定时调用一个程序,来停止job的运行。可以把这个定时停止job的任务和spring batch放在一个spring的上下文中,在一个java进程中运行。

 从业务逻辑的需要出发,停止job的最佳的方式,还是设置stepExecution.setTerminateOnly();这个job停止标识,来让job停止运行。

 

分享到:
评论

相关推荐

    SpringBatch+Spring+Mybatis+MySql (spring batch 使用jar)

    Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。 业务方案: 1、批处理定期提交。 2、并行批处理:并行处理工作。 3、...

    Spring Boot整合Spring Batch,实现批处理

    Spring Boot以其简洁的配置和快速的启动能力深受开发者喜爱,而Spring Batch作为Spring框架的一部分,专注于批量处理任务,提供了强大的数据处理能力和事务管理功能。下面我们将深入探讨这个主题。 首先,**Spring ...

    The Definitive Guide to Spring Batch, 2nd Edition.epub

    Additionally, you’ll discover how Spring Batch 4 takes advantage of Java 9, Spring Framework 5, and the new Spring Boot 2 micro-framework. After reading this book, you’ll be able to use Spring Boot ...

    Spring Batch in Action英文pdf版

    知识点一:Spring Batch的核心概念 Spring Batch框架以处理大量记录和复杂业务逻辑为核心。它提供了一套丰富的特性来处理任务执行、数据读取和写入、事务管理等批处理常见的复杂性。Spring Batch通过批次作业(Batch...

    springbatch 详解PDF附加 全书源码 压缩包

    **二、Spring Batch 的特性** 1. **可重试与跳过错误**: 支持对失败记录的重试,以及在遇到错误时选择跳过某些记录。 2. **事务管理**: 内置支持分布式事务,确保数据的一致性和完整性。 3. **并发处理**: 可以并行...

    bank-spring-batch:具有多处理器的Spring Batch项目

    ItemProcessor是Spring Batch的核心组件之一,它的主要职责是接收一个输入项(Item),对其进行处理,并返回一个新的处理结果。在多处理器场景下,项目可能包含多个ItemProcessor,每个处理器执行不同的转换或业务...

    Spring Batch批处理框架

    由于信息给定的【部分内容】并没有提供实际的技术细节,因此我将基于Spring Batch框架本身,详细介绍它的一些核心知识点。 1. 核心概念 Spring Batch定义了一些核心组件,例如Job(作业)、Step(步骤)、Tasklet和...

    SpringBoot+Batch实现

    spring batch3.x中文文档:http://www.kailing.pub/SpringBatchReference spring batch官方入门实例:https://projects.spring.io/spring-batch/ 简单的任务操作实现,当前项目只是一个demo,我也是初学,目前启动...

    quartz整合springbatch动态集群定时实现mysql参考

    在这个“quartz_springbatch_dynamic”项目中,我们将看到如何将这两个强大的工具结合起来,以实现动态集群环境中的定时任务执行,并使用MySQL作为数据存储。 Quartz是一个开源的作业调度框架,允许开发者创建、...

    基于Spring Batch的大数据量并行处理

    - **领域模型**:Spring Batch定义了一系列核心领域概念,如Job(作业)、Step(步骤)、Chunk(块)、Reader(读取器)、Processor(处理器)和Writer(写入器),这些概念构成了处理数据流的基本单元。 - **应用...

    spring-batch包

    Spring Batch 是一个强大的、全面的批处理框架,用于处理大量数据的处理任务。它由 Spring 框架提供支持,因此具有高度的可配置性和可扩展性,适用于各种企业级应用。Spring Batch 4.0.0 版本是该框架的一个重要版本...

    [原创]Spring Batch 示例程序

    该示例程序采用的是Spring 3作为基础框架,以及Spring Batch 2.2.7版本,这两个组件都是Spring生态系统的重要部分。 **Spring Batch 的核心概念:** 1. **Job**:Job是Spring Batch中的顶级概念,代表一个完整的...

    Spring batch in action

    本书的第二部分深入探讨了Spring Batch的核心概念,包括如何配置批处理作业、运行作业、读取和写入数据、处理数据以及实现可靠的作业。第三部分进一步深入到高级话题,涵盖执行处理、企业集成、监控、扩展以及批量...

    springBoot+springBatch批量处理数据demo

    SpringBoot简化了Spring应用程序的配置和启动过程,而SpringBatch则专注于批处理任务,提供了一套全面且可扩展的解决方案。在这个“springBoot+springBatch批量处理数据demo”中,我们将探讨如何将这两个强大的工具...

    spring-batch分区处理示例

    Spring Batch 是一个强大的Java框架,专门用于处理批量数据处理任务。在Spring Batch中,分区处理是一种优化策略,它将大型工作负载分解成多个较小、独立的任务,这些任务可以在不同的线程或甚至不同的节点上并行...

    Spring Batch In Action

    - **Spring Batch** 的核心优势在于它能够简化复杂的批量数据处理逻辑,并且通过其内置的功能模块来提高系统的可维护性和可扩展性。 #### 知识点二:Spring Batch 的主要概念 - **Job**:在 Spring Batch 中,一个...

    详细spring batch资料

    在本文中,我们将深入探讨Spring Batch的基本概念、核心组件以及如何在实际项目中应用它。 1. **基本概念** - **作业(Job)**:在Spring Batch中,作业是执行批处理任务的顶级概念。它由一系列步骤组成,可以有...

    SpringBatch-DataMigration SpringBatch数据迁移项目

    mybatis、springBatch、mysql、quartz、spring、springMVC 部署说明: 本项目为两个数据库,由一个数据库的表向另外一个数据库的表做数据迁移,其中数据库脚本在:/src/main/resources/sql/下面(其中data_rep中的表...

    Spring Batch API(Spring Batch 开发文档).CHM

    Spring Batch API(Spring Batch 开发文档).CHM。 官网 Spring Batch API,Spring Batch 开发文档

Global site tag (gtag.js) - Google Analytics