chapter 3、Batch configuration
1、spring batch 的命名空间
spring xml中指定batch的前缀作为命名空间。
示例:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd"> <batch:job id="importProductsJob"> (...) </batch:job> </beans>
当指定batch命名空间之后,bean中的声明都需要加上batch的前缀,例如:<batch:job id="importProductsJob">
spring的命名空间的前缀可以指定任意的名称,这里采用batch作为前缀,为了方便理解。
2、spring batch XML的主要标签有:job、step、tasklet、chunk、job-repository
3、Job配置。job元素是整个配置的顶级元素,它的属性有:
a、id
b、restartable
c、incrementer
d、abstract
e、parent
f、job-repository
restartable 属性如果是false,则程序不允许重启。如果发生重启,会抛出JobRestartException异常。
<batch:job id="importProductsJob" restartable="false"> (...) </batch:job>
job除了这些属性外,还可以配置验证器<batch:validator ref="parameterValidator" />,用来校验工作参数(job parameters),可以实现JobParametersValidator接口。
如果无法通过验证,会抛出JobParametersInvalidException异常。spring batch提供了一个默认的实现类DefaultJobParametersValidator,完成绝大部分的工作。如果还是无法满足需求,可以自己编码实现接口。
实例:
<batch:job id="importProductsJob"> (...) <batch:validator ref="validator"/> </batch:job> <bean id="validator" class="org.springframework.batch.core.job.DefaultJobParametersValidator"> <property name="requiredKeys"> <set> <value>date</value> </set> </property> <property name="optionalKeys"> <set> <value>productId</value> </set> </property> </bean>
4、step步骤的配置。
step的属性:
a、next
b、parent
c、abstract
示例:
<job id="importProductsJob"> <step id="decompress" next="readWrite"> (...) </step> <step id="readWrite"> (...) </step> </job>
5、tasklet和chunk的配置。
tasklet和chunk是用来指定处理过程的。
一个tasklet对应于一个事务性的、潜在可重复的过程中发生的一个步骤。
你可以自己实现Tasklet 接口,定义自己的tasklet。这个特性很有用,比如:用于解压文件,执行系统命令,执行存储过程等待。
你也可以使用系统tasklet的属性有:
a、ref指定应用的bean
b、transaction-manager事物管理器
c、start-limittasklet重试retry的次数
d、allow-start-if-complete如果tasklet成功完成之后,是否可以进行重试retry操作。
示例:
<batch:job id="importProductsJob"> (...) <batch:step id="readWriteStep"> <batch:tasklet transaction-manager="transactionManager" start-limit="3" allow-start-if-complete="true"> (...) </batch:tasklet> </batch:step> </batch:job> <bean id="transactionManager" class="(...)"> (...) </bean>
chunk ,ChunkOrientedTasklet 类实现类“块处理(chunk processing)”。
配置tasklet很简单,但是配置“块处理”就会复杂一点,因为它涉及的内容更多。
chunk的属性有:
a、reader
b、processor
c、writer
d、commit-interval事物提交一次处理的items的数量。也是chunk的大小。
e、skip-limit跳跃的次数
f、skip-policy跳跃的策略:要实现SkipPolicy接口
g、retry-policy重试的策略:要实现RetryPolicy接口
h、retry-limit最大的重试次数
i、cache-capacity重试的缓存策略
j、reader-transactional-queue从一个拥有事务的JMS的queue读取item数据
k、processor-transactional处理器是否包含事务处理
l、chunk-completion-policychunk的完成策略
示例:
<batch:job id="importProductsJob"> (...) <batch:step id="readWrite"> <batch:tasklet> <batch:chunk reader="productItemReader" processor="productItemProcessor" writer="productItemWriter" commit-interval="100" skip-limit="20" retry-limit="3" cache-capacity="100" chunk-completion-policy="timeoutCompletionPolicy"/> </batch:tasklet> </batch:step> </batch:job> <bean id="productItemReader" class="(...)"> (...) </bean> <bean id="productItemProcessor" class="(...)"> (...) </bean> <bean id="productItemWriter" class="(...)"> (...) </bean> <bean id="timeoutCompletionPolicy" class="org.springframework.batch.repeat.policy.TimeoutTerminationPolicy"> <constructor-arg value="60"/> </bean>
chunk还有几个子标签,包括:reader、processor、writer、skip-policy、retry-policy
示例:
<batch:job id="importProductsJob"> (...) <batch:step id="readWrite"> <batch:tasklet> <batch:chunk commit-interval="100"> <batch:reader> <bean class="(...)"> (...) </bean> </batch:reader> <batch:processor> <bean class="(...)"> (...) </bean> </batch:processor> <batch:writer> <bean class="(...)"> (...) </bean> </batch:writer> </batch:chunk> </batch:tasklet> </batch:step> </batch:job>
chunk的一些其他额外的子标签:retry-listeners、skippable-exception-classes、retryable-exception-classes、streams
示例:
<batch:job id="importProductsJob"> (...) <batch:step id="readWrite"> <batch:tasklet> <batch:chunk commit-interval="100" skip-limit="10"> <skippable-exception-classes> <include class="org.springframework.batch.item.file.FlatFileParseException"/> <exclude class="java.io.FileNotFoundException"/> </skippable-exception-classes> </batch:chunk> </batch:tasklet> </batch:step> </batch:job>
在chunk里面配置streams
示例:
<batch:job id="importProductsJob"> (...) <batch:step id="readWrite"> <batch:tasklet> <batch:chunk reader="productItemReader" writer="compositeWriter"/> <streams> <stream ref="productItemWriter1"/> <stream ref="productItemWriter2"/> </streams> </batch:tasklet> </batch:step> </batch:job> <bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter"> <property name="delegates"> <list> <ref bean="productItemWriter1"/> <ref bean="productItemWriter2"/> </list> </property> </bean>
使用一个复合的stream形式,如上例的itemWriter,内部是writer1和writer2是stream的,需要在chunk里面的streams元素里面注册。
6、事务配置。
事务是spring batch的一个重要主题。主要作用于batch处理程序的健壮性,chunk处理的合并来完成它的工作。因为事务调用的对象类型不同,你可以在不同的层次配置事务。
示例:
<batch:job id="importProductsJob"> (...) <batch:step id="readWrite"> <batch:tasklet transaction-manager="transactionManager" (...)> (...) </batch:tasklet> </batch:step> </batch:job>
事务,有几个定义的属性:行为,隔离 isolation,超时 timeout。
spring batch提供transaction-attributes标签来描述这些事务的属性。
示例:
<batch:tasklet> <batch:chunk reader="productItemReader" writer="productItemReader" commit-interval="100"/> <batch:transaction-attributes isolation="DEFAULT" propagation="REQUIRED" timeout="30"/> </batch:chunk> </batch:tasklet>
isolation 定义数据库的隔离级别以及对外部事务的可见性。
spring的事务处理是基于java的异常体系的。java里面的异常分为两类:检查型异常(extends Exception)和非检查型异常(extends RuntimeException)
spring对检查型异常进行commit提交处理,对非检查型异常进行rollback回滚处理。
spring batch运行你对不需要触发rollback回滚动作的异常进行定义。你可以在tasklet的no-rollback-exception-class元素中进行指定。
示例:
<batch:tasklet> (...) <batch:no-rollback-exception-classes> <batch:include class="org.springframework.batch.item.validator.ValidationException"/> </batch:no-rollback-exception-classes> </batch:tasklet>
对于JMS,spring batch提供chunk的reader-transactional-queue 属性,提供事务处理。
7、配置job仓库
job仓库(job repository)是spring batch底层基础的一个关键特征,它为spring batch的运行提供信息。
job仓库必须实现JobRepository接口,spring batch只提供了一个具体的实现类:SimpleJobRepository。
spring batch为DAO提供了两种实现:
a、内存无持久化
b、jdbc元数据的持久化
第一种“内存无持久化”的方式可以用来spring batch的测试和开发,但是不能应用于生产环境。因为内存中的东西会丢失。
设定job仓库的参数
(1)、配置内存模式的job仓库:Configuring an in-memory job repository
示例:
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager-ref" ref="transactionManager"/> </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/> <batch:job id="importInvoicesJob" job-repository="jobRepository"> (...) </batch:job>
由于job仓库是内存模式的,所以事务管理器采用ResourcelessTransactionManager。 这个类是NOOP (NO OPeration)无操作的PlatformTransactionManager接口实现。
(2)、持久化的job仓库:Configuring a persistent job repository
关系型数据库的job仓库的属性如下:data-source,transaction-manager,isolation-level-for-create,max-varchar-length,table-prefix,lob-handler
示例:
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="${batch.jdbc.driver}" /> <property name="url" value="${batch.jdbc.url}" /> <property name="username" value="${batch.jdbc.user}" /> <property name="password" value="${batch.jdbc.password}" /> </bean> <bean id="transactionManager" lazy-init="true" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource" /> </bean> <batch:job-repository id="jobRepository" data-source="dataSource" transaction-manager="transactionManager" isolation-level-for-create="SERIALIZABLE" table-prefix="BATCH_" /> <batch:job id="importInvoicesJob" job-repository="jobRepository"> (...) </batch:job>
[问题:如果在不同的物理节点上面运行同样的job会发生什么呢?]
前提:使用同一份spring batch的元数据,即采用同一份数据库上面的表结构。
当创建job实例和执行信息的元数据的时候,job仓库扮演了一个集中维护的工作库的角色。当job并发运行的时候,spring batch 的job仓库,可以防止创建相同的工作情况。这个功能依赖于底层数据库的事务能力,完成job的同步任务。我们设置job仓库的属性isolation-level-for-create="SERIALIZABLE",来避免发生job的并发问题。由于有这种防护措施,你可以把你的spring batch的job分布到各个不同的物理节点,保证你的job实例不会被创建两次。
job仓库是spring batch底层结构中的重要部分,它记录了batch处理的信息,并且跟踪job运行的成功与失败。
8、spring batch配置的高级主题。
(1)、使用step作用域。当使用SpEL的时候,step作用域很有用,来实现属性的晚绑定。
(2)、Spring表达式语言:Spring Expression Language (SpEL)
Spring3.× 开始提供。
step的作用的实例范围包括:jobParameters、jobExecutionContext、stepExecutionContext
示例:
<bean id="decompressTasklet" class="com.manning.sbia.ch01.batch.DecompressTasklet" scope="step"> <property name="inputResource" value="#{jobParameters['inputResource']}" /> <property name="targetDirectory" value="#{jobParameters['targetDirectory']}" /> <property name="targetFile" value="#{jobParameters['targetFile']}" /> </bean>
SpEL由 #{ 和 } 组成
在jobExecutionContext 和 stepExecutionContext也可以应用SpEL表达式。
(3)、使用Linteners提供的更多的处理。
Spring batch在job和step级别上面提供listener。
Spring batch提供的listener的类型:
a、Job listener:在job级别监听处理过程
b、Step listeners:在step级别监听处理过程
c、Item listeners:监听item的retry重试和repeat重做动作
一、Job listener 可以监听job的运行,在job运行前和后添加动作。可以利用 listener标签,在job标签下面作为子元素进行添加。
示例1:
<batch:job id="importProductsJob"> <batch:listeners> <batch:listener ref="importProductsJobListener"/> </batch:listeners> </batch:job>
importProductsJobListener不管job运行成功还是失败,它都会在job运行的开始和结束的时候,接收job的通知,进行监听。
还可以通过普通的POJO的java对象来做监听器,只需要进行简单的配置即可。
示例2:
<batch:listeners> <batch:listener ref="importProductsJobListener" after-job-method="afterJob" before-job-method="beforeJob"/> </batch:listeners>
可以在listener里面的ref指定引用的POJO的bean,通过after-job-method="afterJob" before-job-method="beforeJob" 来指定job之前和之后的执行方法。不过,被指定的这两个方法的参数都需要是:JobExecution jobExecution。 这个2个方法的返回值都是void
还有一种方法是利用“注释”来配置listener,spring batch会自己发现并运行该类。
二、Step listener
Step有一系列的listener来跟踪step的处理过程。这里所有的listener接口都继承了StepListener接口。
Spring batch提供的Step的listener有:
a、ChunkListener:在chunk执行的之前和之后调用。
b、ItemProcessListener:在 ItemProcessor得到一个item之前和之后调用,在ItemProcessor抛出一个异常的时候调用。
c、ItemReadListener:在读取item之前和读取item之后调用,或者在读取item的过程中触发异常的时候调用。
d、ItemWriteListener:在一个item输出之前和之后调用,或者在item输出的过程中调用。
e、SkipListener:当读取、处理和输出的过程中产生了skip跳跃一个item的时候调用。
f、StepExecutionListener:在step运行之前和之后调用。
接口代码:
public interface StepExecutionListener extends StepListener { void beforeStep(StepExecution stepExecution); ExitStatus afterStep(StepExecution stepExecution); } public interface ChunkListener extends StepListener { void beforeChunk(); void afterChunk(); } public interface ItemProcessListener<T, S> extends StepListener { void beforeProcess(T item); void afterProcess(T item, S result); void onProcessError(T item, Exception e); } public interface ItemReadListener<T> extends StepListener { void beforeRead(); void afterRead(T item); void onReadError(Exception ex); } public interface ItemWriteListener<S> extends StepListener { void beforeWrite(List<? extends S> items); void afterWrite(List<? extends S> items); void onWriteError(Exception exception, List<? extends S> items); } public interface SkipListener<T,S> extends StepListener { void onSkipInRead(Throwable t); void onSkipInProcess(T item, Throwable t); void onSkipInWrite(S item, Throwable t); }
Step listener 作为tasklet标签的一个子标签进行配置。
上面这些所有的Step listener都可以作为tasklet标签的子标签以相同的方式和等级进行配置。
示例:
<bean id="importProductsJobListener" class="test.case01.java.batch.listener.job.ImportProductsJobListener" /> <bean id="productStepExecutionListener" class="test.case01.java.batch.listener.step.ProductStepExecutionListener" /> <bean id="productChunkListener" class="test.case01.java.batch.listener.step.chunk.ProductChunkListener" /> <bean id="productItemProcessListener" class="test.case01.java.batch.listener.step.chunk.item.ProductItemProcessListener" /> <batch:job id="importProducts" restartable="false"> <batch:step id="readWriteProducts"> <batch:tasklet> <batch:chunk reader="reader" writer="writer" processor="processor" commit-interval="100" skip-limit="5"> <batch:skippable-exception-classes> <batch:include class="org.springframework.batch.item.file.FlatFileParseException" /> </batch:skippable-exception-classes> </batch:chunk> <batch:listeners> <!-- here configure three kinds listeners for StepExecutionListener , ChunkListener , ItemProcessListener for example.--> <batch:listener ref="productStepExecutionListener" /> <batch:listener ref="productChunkListener" /> <batch:listener ref="productItemProcessListener" /> </batch:listeners> </batch:tasklet> </batch:step> <batch:validator ref="parameterValidator" /> <batch:listeners> <batch:listener ref="importProductsJobListener" /> </batch:listeners> </batch:job>
三、retry重试和repeat重做的listener
repeat listener拥有的方法名称:before、after、close(在一个item最后一次repeat重做之后调用,不管repeat成功与否)、onError、open
retry listener拥有的方法名称:close(在一个item上面最后一次尝试retry之后调用,不管retry成功与否)、onError、open
接口代码:
public interface RepeatListener { void before(RepeatContext context); void after(RepeatContext context, RepeatStatus result); void open(RepeatContext context); void onError(RepeatContext context, Throwable e); void close(RepeatContext context); } public interface RetryListener { <T> void open(RetryContext context, RetryCallback<T> callback); <T> void onError(RetryContext context, RetryCallback<T> callback, Throwable e); <T> void close(RetryContext context, RetryCallback<T> callback, Throwable e); }
这2个接口,和上面的step listener的配置位置和方式一致,都是tasklet标签的子标签位置
四、配置继承关系
spring的XML提供配置的继承。使用abstract和parent两个属性。从一个parent的bean继承,表示该bean可以利用parent bean中的所有的属性,并且可以覆盖这些属性。
示例:
<bean id="parentBean" abstract="true"> <property name="propertyOne" value="(...)"/> </bean> <bean id="childBean" parent="parentBean"> <property name="propertyOne" value="(...)"/> <property name="propertyTwo" value="(...)"/> </bean>
这种继承关系是由spring提供的,在spring batch里面也可以使用。
listeners标签,提供merge属性,可以用来合并parent和自身的listener
示例:
<job id="parentJob" abstract="true"> <listeners> <listener ref="globalListener"/> <listeners> </job> <job id="importProductsJob" parent="parentJob"> (...) <listeners merge="true"> <listener ref="specificListener"/> <listeners> </job>
spring XML的这种继承关系,使得spring batch的XML配置更简单。
相关推荐
Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供...
Spring Boot以其简洁的配置和快速的启动能力深受开发者喜爱,而Spring Batch作为Spring框架的一部分,专注于批量处理任务,提供了强大的数据处理能力和事务管理功能。下面我们将深入探讨这个主题。 首先,**Spring ...
知识点一:Spring Batch的核心概念 Spring Batch框架以处理大量记录和复杂业务逻辑为核心。它提供了一套丰富的特性来处理任务执行、数据读取和写入、事务管理等批处理常见的复杂性。Spring Batch通过批次作业(Batch...
在这个“quartz_springbatch_dynamic”项目中,我们将看到如何将这两个强大的工具结合起来,以实现动态集群环境中的定时任务执行,并使用MySQL作为数据存储。 Quartz是一个开源的作业调度框架,允许开发者创建、...
**二、Spring Batch 的特性** 1. **可重试与跳过错误**: 支持对失败记录的重试,以及在遇到错误时选择跳过某些记录。 2. **事务管理**: 内置支持分布式事务,确保数据的一致性和完整性。 3. **并发处理**: 可以并行...
Work with all aspects of batch processing in a modern Java environment using a selection of Spring frameworks. This book provides up-to-date examples using the latest configuration techniques based on...
ItemProcessor是Spring Batch的核心组件之一,它的主要职责是接收一个输入项(Item),对其进行处理,并返回一个新的处理结果。在多处理器场景下,项目可能包含多个ItemProcessor,每个处理器执行不同的转换或业务...
由于信息给定的【部分内容】并没有提供实际的技术细节,因此我将基于Spring Batch框架本身,详细介绍它的一些核心知识点。 1. 核心概念 Spring Batch定义了一些核心组件,例如Job(作业)、Step(步骤)、Tasklet和...
- **高性能与可扩展性**:Spring Batch支持高并发处理,可以通过简单的配置实现应用的纵向和横向扩展。 - **丰富的特性**:包括但不限于事务管理、重试机制、跳过机制、作业监控等。 - **全面的API支持**:提供了...
1. **增强的事务管理**:Spring Batch 4.0.0 对事务管理进行了优化,支持更灵活的事务边界定义,允许开发者根据需要调整步进事务的粒度。 2. **元数据存储升级**:此版本对元数据存储进行了优化,支持更多的数据库...
本书的第二部分深入探讨了Spring Batch的核心概念,包括如何配置批处理作业、运行作业、读取和写入数据、处理数据以及实现可靠的作业。第三部分进一步深入到高级话题,涵盖执行处理、企业集成、监控、扩展以及批量...
1. **Job配置**:Spring Batch的工作(Job)定义了整个处理流程。在分区场景中,Job配置会包含一个`Step`,该`Step`使用了`Partitioner`。 2. **Partitioner实现**:这个类会根据业务需求分割任务。例如,如果数据...
该示例程序采用的是Spring 3作为基础框架,以及Spring Batch 2.2.7版本,这两个组件都是Spring生态系统的重要部分。 **Spring Batch 的核心概念:** 1. **Job**:Job是Spring Batch中的顶级概念,代表一个完整的...
使用 `application.yml` 或 `application.properties` 文件配置 SpringBoot 和 SpringBatch 的相关参数。例如,你可以定义批处理作业(JOB)的相关配置、数据源信息、事务管理等。配置文件可能会包含如下的内容: ...
SpringBoot简化了Spring应用程序的配置和启动过程,而SpringBatch则专注于批处理任务,提供了一套全面且可扩展的解决方案。在这个“springBoot+springBatch批量处理数据demo”中,我们将探讨如何将这两个强大的工具...
在本文中,我们将深入探讨Spring Batch的基本概念、核心组件以及如何在实际项目中应用它。 1. **基本概念** - **作业(Job)**:在Spring Batch中,作业是执行批处理任务的顶级概念。它由一系列步骤组成,可以有...
- **Spring Batch** 的核心优势在于它能够简化复杂的批量数据处理逻辑,并且通过其内置的功能模块来提高系统的可维护性和可扩展性。 #### 知识点二:Spring Batch 的主要概念 - **Job**:在 Spring Batch 中,一个...
1. **Job**:是SpringBatch中的最高级别抽象,代表一个完整的批处理任务,可以包含一个或多个Step。 2. **Step**:是Job的执行单元,负责执行特定的处理任务。每个Step可以有多个Tasklet,Tasklet是实际处理数据的...
在 `springBatchDemo` 文件中,通常包含了示例代码,演示如何配置和运行一个简单的Spring Batch作业,涉及到读取数据库中的数据,进行处理,然后写回到数据库。你可以参考这些代码,理解Spring Batch的工作原理和...