spring batch的处理流程:
读取数据->处理数据->写数据
reader->process->writer
maven 依赖:
<properties> <spring.version>3.2.2.RELEASE</spring.version> <spring.batch.version>2.2.0.RELEASE</spring.batch.version> <mysql.driver.version>5.1.25</mysql.driver.version> <junit.version>4.11</junit.version> </properties> <dependencies> <!-- Spring Core --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring jdbc, for database --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring XML to/back object --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>${spring.version}</version> </dependency> <!-- MySQL database driver --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.driver.version}</version> </dependency> <!-- Spring Batch dependencies --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>${spring.batch.version}</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-infrastructure</artifactId> <version>${spring.batch.version}</version> </dependency> <!-- Spring Batch unit test --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <version>${spring.batch.version}</version> </dependency> <!-- Junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> </dependencies>
POJO类:
package com.tch.test.spring.batch.entity; import java.text.SimpleDateFormat; import java.util.Date; public class Report { private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); private int id; private Date date; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getDate() { return date; } public void setDate(Date date) { this.date = date; } @Override public String toString() { return "Report [id=" + id + ", date=" + dateFormat.format(date) + "]"; } }
mapper类:
package com.tch.test.spring.batch; import java.text.ParseException; import java.text.SimpleDateFormat; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.validation.BindException; import com.tch.test.spring.batch.entity.Report; public class ReportFieldSetMapper implements FieldSetMapper<Report> { private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); @Override public Report mapFieldSet(FieldSet fieldSet) throws BindException { Report report = new Report(); report.setId(fieldSet.readInt(0)); String date = fieldSet.readString(1); try { report.setDate(dateFormat.parse(date)); } catch (ParseException e) { e.printStackTrace(); } return report; } }
processor类:
package com.tch.test.spring.batch; import org.springframework.batch.item.ItemProcessor; import com.tch.test.spring.batch.entity.Report; public class CustomItemProcessor implements ItemProcessor<Report, Report> { @Override public Report process(Report report) throws Exception { System.out.println("Processing..." + report); return report; } }
writer类:
package com.tch.test.spring.batch; import java.util.List; import org.springframework.batch.item.ItemWriter; import com.tch.test.spring.batch.entity.Report; public class ReportItemWriter implements ItemWriter<Report>{ public void write(List<? extends Report> reports) throws Exception { for (Report m : reports) { System.out.println("write results : "+m); } } }
beans.xml:(commit-interval表示批处理每次事务处理记录数,这里每次处理十条)
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd "> <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" /> <batch:job id="helloWorldJob"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="cvsFileItemReader" writer="reportWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="classpath:report.txt" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> </bean> </property> <property name="fieldSetMapper"> <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" /> </property> </bean> </property> </bean> <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" /> </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> </beans>
report.txt:
1001, 29/7/2013 1002, 30/7/2013 1003, 31/7/2013 1004, 29/7/2013 1005,30/7/2013 1006, 31/7/2013 1007, 29/7/2013 1008,30/7/2013 1009, 31/7/2013 1010, 29/7/2013 1011,30/7/2013 1012, 31/7/2013 1013, 29/7/2013 1014,30/7/2013 1015, 31/7/2013 1016, 29/7/2013 1017,30/7/2013 1018, 31/7/2013
Test:
package com.tch.test.spring.batch; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Test { public static void main(String[] args) throws Exception { String[] springConfig = { "beans.xml" }; ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig); JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); Job job = (Job) context.getBean("helloWorldJob"); JobExecution execution = jobLauncher.run(job, new JobParameters()); System.out.println("Exit Status : " + execution.getStatus()); System.out.println("Done"); context.close(); } }
如果要读取多个文件,则只需要使用MultiResourceItemReader即可:
beans.xml:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" /> <batch:job id="helloWorldJob"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="multiResourceReader" writer="reportWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="multiResourceReader" class=" org.springframework.batch.item.file.MultiResourceItemReader"> <property name="resources" value="classpath:report*.txt" /> <property name="delegate" ref="flatFileItemReader" /> </bean> <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="classpath:report.txt" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> </bean> </property> <property name="fieldSetMapper"> <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" /> </property> </bean> </property> </bean> <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" /> </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> </beans>
beans.xml:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- 处理数据 --> <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" /> <!-- 处理数据的job --> <batch:job id="helloWorldJob"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="multiResourceReader" writer="reportWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <!-- 读取多个资源的reader --> <bean id="multiResourceReader" class=" org.springframework.batch.item.file.MultiResourceItemReader"> <!-- 资源位置 --> <property name="resources" value="classpath:report*.txt" /> <!-- 使用读取单个资源的reader --> <property name="delegate" ref="flatFileItemReader" /> </bean> <!-- 读取单个资源的reader --> <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <!-- 分词器 --> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> </bean> </property> <!-- 数据和实体的映射处理 --> <property name="fieldSetMapper"> <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" /> </property> </bean> </property> </bean> <!-- 数据处理完成之后,自定义写操作 --> <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean> <!-- job启动 --> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <!-- job仓库 --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" /> </bean> <!-- 事务管理 --> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> <!-- job注册 --> <bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor"> <property name="jobRegistry" ref="jobRegistry" /> </bean> <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" /> <!-- 使用quartz进行调度管理(每5秒钟执行一次) --> <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean"> <!-- 执行的操作 --> <property name="jobDetail" ref="jobDetail" /> <!-- 定时表达式 --> <property name="cronExpression" value="*/5 * * * * ?" /> </bean> </property> </bean> <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean"> <!-- 执行操作的class --> <property name="jobClass" value="com.tch.test.spring.batch.JobLauncherDetails" /> <property name="group" value="quartz-batch" /> <property name="jobDataAsMap"> <map> <entry key="jobName" value="helloWorldJob" /> <entry key="jobLocator" value-ref="jobRegistry" /> <entry key="jobLauncher" value-ref="jobLauncher" /> <entry key="param1" value="value1" /> <entry key="param2" value="value2" /> </map> </property> </bean> </beans>
JobLauncherDetails:
package com.tch.test.spring.batch; import java.util.Date; import java.util.Map; import java.util.Map.Entry; import org.quartz.JobExecutionContext; import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.configuration.JobLocator; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.scheduling.quartz.QuartzJobBean; public class JobLauncherDetails extends QuartzJobBean { static final String JOB_NAME = "jobName"; private JobLocator jobLocator; private JobLauncher jobLauncher; public void setJobLocator(JobLocator jobLocator) { this.jobLocator = jobLocator; } public void setJobLauncher(JobLauncher jobLauncher) { this.jobLauncher = jobLauncher; } @SuppressWarnings("unchecked") protected void executeInternal(JobExecutionContext context) { Map<String, Object> jobDataMap = context.getMergedJobDataMap(); //拿到beans.xml中jobDetail里面配置的"jobName"对应的value String jobName = (String) jobDataMap.get(JOB_NAME); JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap); try { //运行对应的job jobLauncher.run(jobLocator.getJob(jobName), jobParameters); } catch (JobExecutionException e) { e.printStackTrace(); } } // 读取配置的参数 private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) { JobParametersBuilder builder = new JobParametersBuilder(); for (Entry<String, Object> entry : jobDataMap.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); //过滤掉"jobName" if (value instanceof String && !key.equals(JOB_NAME)) { builder.addString(key, (String) value); } else if (value instanceof Float || value instanceof Double) { builder.addDouble(key, ((Number) value).doubleValue()); } else if (value instanceof Integer || value instanceof Long) { builder.addLong(key, ((Number) value).longValue()); } else if (value instanceof Date) { builder.addDate(key, (Date) value); } else { //过滤掉beans.xml中jobDetail的jobDataAsMap配置的"jobLocator"、"jobLauncher"属性 // JobDataMap contains values which are not job parameters // (ignoring) } } // need unique job parameter to rerun the completed job builder.addDate("run date", new Date()); return builder.toJobParameters(); } }
运行:
package com.tch.test.spring.batch; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Test { public static void main(String[] args) throws Exception { String[] springConfig = { "beans.xml" }; context = new ClassPathXmlApplicationContext(springConfig); } }
相关推荐
轻松引领进入数据批处理世界:基本特性...深度探索Spring Batch 批处理框架的核心概念:作业配置、作业步配置,以及Spring Batch 框架中经典的三步走策略。 快速提升数据批处理的能力:高性能、高可靠性、并行处理。
入门篇介绍了批处理、Spring Batch的基本特性和新特性,快速入门的Hello World等内容引领读者入门,从而进入数据批处理的世界。基本篇重点讲述了数据批处理的核心概念、典型的作业配置、作业步配置,以及Spring ...
在本文中,我们将深入探讨SpringBatch的学习入门,以及如何在IDEA环境中进行配置和实践。 首先,我们需要了解SpringBatch的核心概念: 1. **Job**:是SpringBatch中的最高级别抽象,代表一个完整的批处理任务,...
《Pro Spring Batch》一书深入探讨了Java Spring Batch框架的核心概念与实际应用,为开发者提供了全面的批处理技术指南。本书由Apress出版,旨在帮助读者掌握Spring Batch框架的精髓,提升在批量数据处理领域的技能...
一个SpringBatch的入门案例,SpringBatch是spring体系下的一个轻量级的批处理框架,使用简单方便,是非常优秀的一个批处理框架。如果你的工作中涉及到了批处理作业,可以考虑使用SpringBatch框架,本案例有详细的...
Spring Batch 入门教程篇 Spring Batch 是一个大数据量的并行处理框架,通常用于数据的离线迁移和数据处理,具有事务、并发、流程、监控、纵向和横向扩展等特性,提供统一的接口管理和任务管理。Spring Batch 是 ...
Spring_Batch项目在实际开发中可能会遇到一些问题,因为它所提供的高级特性在网上资料尤其是中文资料非常有限,官方文档也可能只是覆盖了入门级别的基础知识。因此,学习者往往需要根据自己的项目需求,结合Spring_...
详解 Spring Batch 入门学习教程 Spring Batch 是一个开源的批处理框架,执行一系列的任务。在 Spring Batch 中,一个 Job 是由许多 Step 组成的,而每一个 Step 又是由 READ-PROCESS-WRITE 任务或者单个 Task 组成...
- 这份教程可能包含实战案例,讲解如何使用Spring进行数据库操作(如JDBC、Hibernate、MyBatis集成)、Spring Security安全框架、Spring Batch批处理等进阶主题。 5. **spring基础教程.rar**: - 这是一份针对...
13. **性能优化**:包括懒加载(Lazy Loading)、批处理(Batch Processing)、缓存(Caching)等策略,以提升JPA应用的性能。 通过学习《Spring Data JPA从入门到精通》,读者不仅可以掌握Spring Data JPA的基本...
GKE上的Spring Batch Spring Batch Job在GKE上运行 描述 Spring批处理模型 工作 它是一个封装整个批处理过程的实体 它只是Step实例的容器 它组合了逻辑上属于流程的多个步骤,并允许配置所有步骤全局的属性 步它是...
Spring Batch则专注于批处理任务,提供了一组高级服务,如跳过失败的记录、重试和分步处理。 10. **实战示例** "Spring入门经典源代码"中的每个章节都配有示例代码,这些示例涵盖了上述各种概念,包括简单的Hello...
8. **Spring Batch**:处理批量数据和批处理任务的框架,提供可重试、事务管理、分割和分区等高级特性。 9. **Spring Cloud**:基于Spring Boot的微服务工具集合,用于构建分布式系统,包括服务发现、配置中心、...
《Spring入门经典》是一本深度剖析Spring框架的入门书籍,旨在帮助初学者快速掌握Spring的核心概念和使用方法。书中的示例代码是理论知识的实践体现,通过这些代码,读者可以直观地理解Spring如何工作,如何配置,...
Spring MVC可以无缝集成Spring的其他模块,如Spring Security(安全控制)、Spring Data(数据访问)、Spring Batch(批处理)等,构建完整的应用程序。 总的来说,Spring MVC 提供了一套完善的Web开发解决方案,...
5. **Spring Batch**:讨论批处理功能,如何设计和执行大规模数据处理任务。 6. **Spring Integration**:介绍Spring Integration,用于实现不同系统间的集成和消息传递。 7. **Spring Cloud Stream**:讲解如何利用...
Spring Batch是一个用于处理大量数据的批处理框架,它提供了许多核心功能,如读取、处理和写入数据,以及错误处理和事务管理。 九、Spring Integration Spring Integration提供了一套强大的工具,用于实现不同系统...
Spring Batch是针对大规模批处理操作的框架,它提供了一套完整的解决方案,包括读取、处理和写入大量数据,支持事务、错误处理和重启等功能。 10. **测试支持** Spring提供了全面的测试支持,包括单元测试、集成...
Spring Batch是一个用于处理大量数据的批处理框架。它提供了灵活的事务管理、错误处理和重复作业防止机制。 以上只是Spring框架部分核心知识点的简要介绍,实际学习过程中,还需要深入了解每个知识点的具体用法和...