`
xiashenghai
  • 浏览: 23147 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

spring batch的批处理框架简单介绍

阅读更多

有关spring batch的介绍我就不多说了,可以去下面的网址看看:

http://www.infoq.com/cn/news/2008/07/spring-batch-zh

 

刚接触到spring batch的时候无从下手,javaeye有关的帖子博文也非常的少,很郁闷只能看它自己提供的文档,说真的,那文档帮助不大,顶多就是让你知道spring batch靠这么几个类玩的。没办法只能自己一步步看代码调试,走了不少弯路呢。

 

这篇文章简单介绍一下spring batch是怎么处理单个文件的。

 

首先看下spring batch关键的几个类:

 

 

JobLauncher负责启动Job,Job中干事的是Step,想让Step干活,就要给它点工 具,itemReader,ItemProcessor,itemWriter就是它需要的工具,这些工具可以是你自己提供,以可以用spring batch现成的。就看实际情况了。

 

接下来就具体看看这些类究竟包含了什么东西,干了哪些事情。

先看配置文件:

<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
</bean>
	
	
<bean id="jobRepository"class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
		p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" />

 jobLauncher的配置很简单,只是将需要的jobRepository注进来而已。复杂的是job和step的配置

<!-- 对账单批处理 begin -->
	<bean id="checkSheetFileImportJob" parent="simpleJob">
		<property name="steps">
			<list>
				<bean id="checkSheetTransactionLoadStep" parent="simpleStep">
					<property name="commitInterval" value="3"/>
					<property name="allowStartIfComplete" value="true"/>
					<property name="itemReader" ref="checkSheetTransactionItemReader"/>
					<property name="itemWriter" ref="checkSheetTransactionWriter"/>
				</bean>
			</list>
		</property>
	</bean>

        <bean id="checkSheetTransactionWriter" class="com.longtop.netbank.checksheet.CheckSheetTransactionJDBCWriter">
		<property name="dataSource" ref="dataSource"/>
		<property name="resource" ref="checkSheetFileResource"></property>
	</bean>
	
	<bean id="checkSheetTransactionItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" ref="checkSheetFileResource"/>
		<property name="linesToSkip" value="0"/>
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean
						class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
						<property name="names"
							value="accountNo,transactionDate,transactionType,oppositeAccountNo,oppositeAccountNickName,summary,lending,transactionAmount,balance"/>
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="com.longtop.netbank.checksheet.CheckSheetTransactionFieldSetMapper"/>
				</property>
			</bean>
		</property>
	</bean>

 

        <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
		<property name="jobRepository" ref="jobRepository" />
		<property name="restartable" value="true" />
	</bean>

	<bean id="simpleStep" class="org.springframework.batch.core.step.item.SimpleStepFactoryBean"
		abstract="true">
		<property name="transactionManager" ref="transactionManager" />
		<property name="jobRepository" ref="jobRepository" />
		<property name="commitInterval" value="1" />
	</bean>

 job是springbatch自带的,我们配置steps属性,以便执行批处理。

在示例中,step的工作就是通过itemReader读取数据文件,然后用itemProcessor进行处理,然后通过itemWriter写入到数据库中,示例中的step也是用的springbatch自带的类SimpleStepFactoryBean。

 

对于SimpleStepFactoryBean需要花功夫好好的看看它究竟做了哪些事情。

 

public class SimpleStepFactoryBean<T,S> implements FactoryBean, BeanNameAware {

private int commitInterval = 0;

/**
	 * Create a {@link Step} from the configuration provided.
	 * 
	 * @see org.springframework.beans.factory.FactoryBean#getObject()
	 */
	public final Object getObject() throws Exception {
		TaskletStep step = new TaskletStep(getName());
		applyConfiguration(step);
		return step;
	}

	public Class<Step> getObjectType() {
		return Step.class;
	}
...
}

 

 我们可以知道这个类返回的是TaskletStep对象,并通过applyConfiguration方法设置TaskletStep对象的属性。applyConfiguration比较长,着了就不贴出来了。再讲到后面的时候在回过头来介绍这个方法。

commitInterval 大家通过单词的意思就应该知道该变量是控制itemReader每次读取的量,和itemWriter每次写入的量,简单的说就是:

如果commitInterval=10,数据文件有30个数据,当读到写到24个数据的时候抛出了异常,那么成功写入数据库的数据以后20条,第21--第24条数据放弃。下次如果执行断点回复时,就从第21条数据开始

 

 

/**
	 * @return a {@link CompletionPolicy} consistent with the commit interval
	 * and injected policy (if present).
	 */
	private CompletionPolicy getChunkCompletionPolicy() {
		Assert.state(!(chunkCompletionPolicy != null && commitInterval != 0),
				"You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
		Assert.state(commitInterval >= 0, "The commitInterval must be positive or zero (for default value).");
	
		if (chunkCompletionPolicy != null) {
			return chunkCompletionPolicy;
		}
		if (commitInterval == 0) {
			logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");
			commitInterval = DEFAULT_COMMIT_INTERVAL;
		}
		return new SimpleCompletionPolicy(commitInterval);
	}

 这个方法就是设置将commitInterval设置到SimpleCompetionPolicy中以便以后使用。 SimpleCompetionPolicy是用来控制循环什么时候结束,控制的方法就是通过判断执行的次数是否超过了commitInterval。

 

下面就大概的讲下执行步骤(只贴类名和部分代码,类名方便查找源代码):

 

1 jobLauncher调用job:

public class SimpleJobLauncher implements JobLauncher, InitializingBean {
/**
	 * Run the provided job with the given {@link JobParameters}. The
	 * {@link JobParameters} will be used to determine if this is an execution
	 * of an existing job instance, or if a new one should be created.
	 * 
	 * @param job the job to be run.
	 * @param jobParameters the {@link JobParameters} for this particular
	 * execution.
	 * @return JobExecutionAlreadyRunningException if the JobInstance already
	 * exists and has an execution already running.
	 * @throws JobRestartException if the execution would be a re-start, but a
	 * re-start is either not allowed or not needed.
	 * @throws JobInstanceAlreadyCompleteException if this instance has already
	 * completed successfully
	 */
	public JobExecution run(final Job job, final JobParameters jobParameters)
			throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

		Assert.notNull(job, "The Job must not be null.");
		Assert.notNull(jobParameters, "The JobParameters must not be null.");

		boolean exists = jobRepository.isJobInstanceExists(job.getName(), jobParameters);
		if (exists && !job.isRestartable()) {
			throw new JobRestartException("JobInstance already exists and is not restartable");
		}
		/**
		 * There is a very small probability that a non-restartable job can be
		 * restarted, but only if another process or thread manages to launch
		 * <i>and</i> fail a job execution for this instance between the last assertion
		 * and the next method returning successfully.
		 */
		final JobExecution jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

		taskExecutor.execute(new Runnable() {

			public void run() {
				try {
					
					job.execute(jobExecution);
					
				}
				catch (Throwable t) {
					logger.info("Job: [" + job + "] failed with the following parameters: [" + jobParameters + "]", t);
					rethrow(t);
				}
			}

			private void rethrow(Throwable t) {
				if (t instanceof RuntimeException) {
					throw (RuntimeException) t;
				}
				throw new RuntimeException(t);
			}
		});

		return jobExecution;
	}

 执行job前先判断下job实例是否已经存在或能否重复执行。不满足条件就抛异常。这段代码很简单不用多讲。

 

2 job执行step

 

public class SimpleJob extends AbstractJob {
	public void execute(JobExecution execution) {
...
		getCompositeListener().beforeJob(execution);
	
		StepExecution lastStepExecution = handleSteps(steps, execution);
...
                try {
				getCompositeListener().afterJob(execution);
			}
...
       }

 代码很多,贴了一写关键的。在job执行step之前,会先执行JobExecutionListener的beforeJob方法。执行结束之 后又会执行JobExecutionListener的afterJob方法。可以根据需要来决定这些方法干什么事情。如果这个job有多个step

那么程序会根据最后一个step的stepExecution来更新jobExecution.

 

3 step的执行

 

public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {

/**
	 * Template method for step execution logic - calls abstract methods for
	 * resource initialization ({@link #open(ExecutionContext)}), execution
	 * logic ({@link #doExecute(StepExecution)}) and resource closing ({@link #close(ExecutionContext)}).
	 */
	public final void execute(StepExecution stepExecution) throws JobInterruptedException,
			UnexpectedJobExecutionException 『
        ...
        }
}

 step就是执行这个方法干活的。首先是将数据文件放入流中:

open(stepExecution.getExecutionContext());

也就是调用 FlatFileItemReader的open方法,因为在配置文件中itemReader的实现类型就是FlatFileItemReader,在SimpleStepBeanFacotry的applyConfiguration中讲FlatFileItemReader配置到TaskletStep中。(详情请看代码)。

然后调用TaskletStep的doExecute方法

exitStatus = doExecute(stepExecution);

 看下该方法的内容(代码不贴了,请看源码)

return stepOperations.iterate(ReapCallBack callback)

 这个callback就是从流中读取commitInterval指定个数的数据并写入到数据库中它是调用tasklet.execute方法

exitStatus = tasklet.execute(contribution, attributes);

 该tasklet类型是SimpleChunkOrientedTasklet,至于为什么是这个类型,就要看看SimpleStepBeanFactory的applyConfiguration方法了。

SimpleChunkOrientedTasklet的execute简单明了,不多说了。

 

这里有一点饶人的地方就是stepOperations.iterate了,看代码的时候需要明确的一点是stepOperations(RepeatTemplate)是使用CompletionPolicy接口来控制循环的,对于TaskletStep和SimpleChunkOrientedTasklet使用哪个CompletionPolicy来控制循环,那么还是要看下SimpleStepBeanFactory的

applyConfiguration方法。

 

 

上面介绍的就是一个job的执行过程。因为是第一次写。所以很多地方都写有说清楚,如果有疑问,请提出来。

附件是demo,要创建一个springbatch的数据就可以运行。当然这个demo有小陷阱(每次执行的时候会删除数据表重新创建)大家可以根据需要修改一下,当然如果每次都删除数据表的话,就看不到spring batch断点回复的功能了。

因为附件太大不能上传,各位只能自己去下了。

 

 

 

 

 

 

 

 

  • 大小: 28 KB
分享到:
评论
2 楼 mengfei86 2012-09-17  
,好文章
1 楼 xiandaoyan 2009-11-27  
没有看见你的附件啊!!!!!!!!!!!!

相关推荐

    Spring Batch批处理框架

    Spring Batch批处理框架Spring Batch批处理框架Spring Batch批处理框架

    Spring.Batch批处理框架

    Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用 Spring 框架的开发者或者企业更...

    SpringBatch批处理框架

    资源名称:Spring Batch 批处理框架内容简介:《Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的...

    SpringBatch批处理 刘相编

    《Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的架构设计、源码做了特定的剖析;在帮助读者掌握...

    springbatch 详解PDF附加 全书源码 压缩包

    通过阅读《Spring.Batch批处理框架.pdf》和源码,你将能够掌握 Spring Batch 的核心概念和实践技巧,为你的企业级应用开发带来高效、可靠的批量处理能力。同时,源码可以直接运行,提供了动手实践的机会,加深理解和...

    Spring Boot整合Spring Batch,实现批处理

    在Java开发领域,Spring Boot和Spring Batch的整合是构建高效批处理系统的一种常见方式。Spring Boot以其简洁的配置和快速的启动能力深受开发者喜爱,而Spring Batch作为Spring框架的一部分,专注于批量处理任务,...

    Spring Batch批处理框架使用解析

    Spring Batch批处理框架使用解析 Spring Batch是一个开源的批处理框架,主要用于处理大量数据的批处理任务。其主要思想是将复杂的批处理任务分解成多个小的任务,通过将任务分配给多个步骤来实现批处理。Spring ...

    Spring的批处理框架

    Spring的批处理框架,开源界少见的后台ETL框架。在此基础上定制,有望建立企业级的ETL支撑系统。

    Batch批处理框架.zip

    这个压缩包"Batch批处理框架.zip"包含了深入学习Spring Batch的相关资源,适合初学者到高级开发者,旨在帮助你成为批处理领域的专家。 Spring Batch 提供了一套标准的API和实现,用于处理批量数据处理任务,包括...

    Spring家族的新成员Spring Batch批处理作业中间件.zip_中间件举例

    通过上述介绍,我们可以看出Spring Batch是一个强大且灵活的批处理框架,它为企业级应用提供了解决大量数据处理问题的有效工具。结合Spring生态的其他组件,如Spring Integration、Spring Data等,可以构建出更加...

    【SpringBatch】批处理框架整合配置过程文档.docx

    Spring Batch是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。 Spring Batch构建了人们期望的Spring Framework特性(生产力,基于POJO的开发方法和一般易用性),同时使...

    Spring batch批处理框架

    Spring Batch 是一个强大的批处理框架,它为Java应用程序提供了处理大量数据的能力。Spring Batch 的设计目标是处理批量数据,如数据库记录、文件等,而无需人工干预,这在许多业务场景中至关重要。它通过一系列精心...

    SpringBatch+Spring+Mybatis+MySql (spring batch 使用jar)

    Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供...

    Java中批处理框架spring batch详细介绍

    Java中批处理框架Spring Batch详细介绍 Spring Batch是Spring提供的一个数据处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。它提供了可重用的功能,这些功能对于处理大量的数据至关重要,...

    SpringBatch+SpringBoot构建海量数据企业批处理系统和性能优化

    SpringBatch+SpringBoot构建海量数据企业批处理系统和性能优化,Spring Batch是一个基于Spring的企业级批处理框架,所谓企业批处理就是指在企业级应用中,不需要人工干预,定期读取数据,进行相应的业务处理之后,再...

    详解Spring Batch 轻量级批处理框架实践

    在本文中,我们将通过示例代码详细介绍 Spring Batch 轻量级批处理框架实践,展示如何使用 Spring Batch 实现批处理任务,从 MariaDB 一张表内读取 10 万条记录,经处理后写到 MongoDB。 Spring Batch 介绍 Spring...

    spring batch

    Spring Batch 是一个强大的、全面的批处理框架,用于处理大量数据。它被设计为高度可扩展和可配置,适用于各种企业级应用。Spring Batch 与 Spring 框架紧密集成,能够利用 Spring 的核心功能,如依赖注入和事务管理...

    quartz整合springbatch动态集群定时实现mysql参考

    在这个“quartz_springbatch_dynamic”项目中,我们将看到如何将这两个强大的工具结合起来,以实现动态集群环境中的定时任务执行,并使用MySQL作为数据存储。 Quartz是一个开源的作业调度框架,允许开发者创建、...

Global site tag (gtag.js) - Google Analytics