`
dreamoftch
  • 浏览: 496509 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

spring batch批处理 入门

阅读更多

 

参考:spring batch参考

 

spring batch的处理流程:

 

读取数据->处理数据->写数据

 

reader->process->writer

 

 

maven 依赖:

<properties>
		<spring.version>3.2.2.RELEASE</spring.version>
		<spring.batch.version>2.2.0.RELEASE</spring.batch.version>
		<mysql.driver.version>5.1.25</mysql.driver.version>
		<junit.version>4.11</junit.version>
	</properties>
 
	<dependencies>
 
		<!-- Spring Core -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
 
		<!-- Spring jdbc, for database -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>${spring.version}</version>
		</dependency>
 
		<!-- Spring XML to/back object -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
			<version>${spring.version}</version>
		</dependency>
 
		<!-- MySQL database driver -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql.driver.version}</version>
		</dependency>
 
		<!-- Spring Batch dependencies -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-infrastructure</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
 
		<!-- Spring Batch unit test -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
 
		<!-- Junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
 
	</dependencies>

 

POJO类:

 

package com.tch.test.spring.batch.entity;
 
import java.text.SimpleDateFormat;
import java.util.Date;
 
public class Report {
 
	private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
	
	private int id;
	private Date date;
 
	public int getId() {
		return id;
	}
 
	public void setId(int id) {
		this.id = id;
	}
 
	public Date getDate() {
		return date;
	}
 
	public void setDate(Date date) {
		this.date = date;
	}

	@Override
	public String toString() {
		return "Report [id=" + id + ", date=" + dateFormat.format(date) + "]";
	}
 
}

 

mapper类:

 

package com.tch.test.spring.batch;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.tch.test.spring.batch.entity.Report;
 
public class ReportFieldSetMapper implements FieldSetMapper<Report> {
 
	private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
 
	@Override
	public Report mapFieldSet(FieldSet fieldSet) throws BindException {
		Report report = new Report();
		report.setId(fieldSet.readInt(0));
		String date = fieldSet.readString(1);
		try {
			report.setDate(dateFormat.parse(date));
		} catch (ParseException e) {
			e.printStackTrace();
		}
		return report;
	}
 
}

 

 

 

processor类:

 

package com.tch.test.spring.batch;
import org.springframework.batch.item.ItemProcessor;

import com.tch.test.spring.batch.entity.Report;
 
public class CustomItemProcessor implements ItemProcessor<Report, Report> {
 
	@Override
	public Report process(Report report) throws Exception {
		System.out.println("Processing..." + report);
		return report;
	}
 
}

 

 

writer类:

 

package com.tch.test.spring.batch;
import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.tch.test.spring.batch.entity.Report;

public class ReportItemWriter implements ItemWriter<Report>{
	public void write(List<? extends Report> reports) throws Exception {
		for (Report m : reports) {
			System.out.println("write results : "+m);
		}
	}
}

 

 

beans.xml:(commit-interval表示批处理每次事务处理记录数,这里每次处理十条)

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
	">

	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="cvsFileItemReader" writer="reportWriter" processor="itemProcessor" commit-interval="10">
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" value="classpath:report.txt" />
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>

	</bean>

	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

</beans>

 

 

report.txt:

 

1001, 29/7/2013
1002, 30/7/2013
1003, 31/7/2013
1004, 29/7/2013
1005,30/7/2013
1006, 31/7/2013
1007, 29/7/2013
1008,30/7/2013
1009, 31/7/2013
1010, 29/7/2013
1011,30/7/2013
1012, 31/7/2013
1013, 29/7/2013
1014,30/7/2013
1015, 31/7/2013
1016, 29/7/2013
1017,30/7/2013
1018, 31/7/2013

 

Test:

 

package com.tch.test.spring.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Test {
	public static void main(String[] args) throws Exception {

		String[] springConfig = { "beans.xml" };

		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
		
		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
		Job job = (Job) context.getBean("helloWorldJob");

		JobExecution execution = jobLauncher.run(job, new JobParameters());
		System.out.println("Exit Status : " + execution.getStatus());
		System.out.println("Done");
		
		context.close();
	}
}

 

 

 

 

如果要读取多个文件,则只需要使用MultiResourceItemReader即可:

 

beans.xml:

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="multiResourceReader" writer="reportWriter"
					processor="itemProcessor" commit-interval="10">
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<bean id="multiResourceReader"
		class=" org.springframework.batch.item.file.MultiResourceItemReader">
		<property name="resources" value="classpath:report*.txt" />
		<property name="delegate" ref="flatFileItemReader" />
	</bean>

	<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" value="classpath:report.txt" />
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean
						class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>

	</bean>

	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="transactionManager"
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

</beans>

 

 

 

spring batch 和 quartz 定时批处理

 

beans.xml:

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

	<!-- 处理数据 -->
	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<!-- 处理数据的job -->
	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="multiResourceReader" writer="reportWriter"
					processor="itemProcessor" commit-interval="10">
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<!-- 读取多个资源的reader -->
	<bean id="multiResourceReader"
		class=" org.springframework.batch.item.file.MultiResourceItemReader">
		<!-- 资源位置 -->
		<property name="resources" value="classpath:report*.txt" />
		<!-- 使用读取单个资源的reader -->
		<property name="delegate" ref="flatFileItemReader" />
	</bean>

	<!-- 读取单个资源的reader -->
	<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<!-- 分词器 -->
				<property name="lineTokenizer">
					<bean
						class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
					</bean>
				</property>
				<!-- 数据和实体的映射处理 -->
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>

	</bean>

	<!-- 数据处理完成之后,自定义写操作 -->
	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

	<!-- job启动 -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!-- job仓库 -->
	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>
	
	<!-- 事务管理 -->
	<bean id="transactionManager"
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

	<!-- job注册 -->
	<bean
		class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
		<property name="jobRegistry" ref="jobRegistry" />
	</bean>

	<bean id="jobRegistry"
		class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

	<!-- 使用quartz进行调度管理(每5秒钟执行一次) -->
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="triggers">
			<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
				<!-- 执行的操作 -->
				<property name="jobDetail" ref="jobDetail" />
				<!-- 定时表达式 -->
				<property name="cronExpression" value="*/5 * * * * ?" />
			</bean>
		</property>
	</bean>

	<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
		<!-- 执行操作的class -->
		<property name="jobClass" value="com.tch.test.spring.batch.JobLauncherDetails" />
		<property name="group" value="quartz-batch" />
		<property name="jobDataAsMap">
			<map>
				<entry key="jobName" value="helloWorldJob" />
				<entry key="jobLocator" value-ref="jobRegistry" />
				<entry key="jobLauncher" value-ref="jobLauncher" />
				<entry key="param1" value="value1" />
				<entry key="param2" value="value2" />
			</map>
		</property>
	</bean>
</beans>

 

 

JobLauncherDetails:

 

package com.tch.test.spring.batch;

import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;

import org.quartz.JobExecutionContext;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class JobLauncherDetails extends QuartzJobBean {

	static final String JOB_NAME = "jobName";

	private JobLocator jobLocator;

	private JobLauncher jobLauncher;

	public void setJobLocator(JobLocator jobLocator) {
		this.jobLocator = jobLocator;
	}

	public void setJobLauncher(JobLauncher jobLauncher) {
		this.jobLauncher = jobLauncher;
	}

	@SuppressWarnings("unchecked")
	protected void executeInternal(JobExecutionContext context) {

		Map<String, Object> jobDataMap = context.getMergedJobDataMap();

		//拿到beans.xml中jobDetail里面配置的"jobName"对应的value
		String jobName = (String) jobDataMap.get(JOB_NAME);
		JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
		try {
			//运行对应的job
			jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
		} catch (JobExecutionException e) {
			e.printStackTrace();
		}
	}

	// 读取配置的参数
	private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
		JobParametersBuilder builder = new JobParametersBuilder();
		for (Entry<String, Object> entry : jobDataMap.entrySet()) {
			String key = entry.getKey();
			Object value = entry.getValue();
			//过滤掉"jobName"
			if (value instanceof String && !key.equals(JOB_NAME)) {
				builder.addString(key, (String) value);
			} else if (value instanceof Float || value instanceof Double) {
				builder.addDouble(key, ((Number) value).doubleValue());
			} else if (value instanceof Integer || value instanceof Long) {
				builder.addLong(key, ((Number) value).longValue());
			} else if (value instanceof Date) {
				builder.addDate(key, (Date) value);
			} else {
				//过滤掉beans.xml中jobDetail的jobDataAsMap配置的"jobLocator"、"jobLauncher"属性
				// JobDataMap contains values which are not job parameters
				// (ignoring)
			}
		}
		// need unique job parameter to rerun the completed job
		builder.addDate("run date", new Date());
		return builder.toJobParameters();
	}

}

 

 

运行:

 

package com.tch.test.spring.batch;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Test {
	public static void main(String[] args) throws Exception {

		String[] springConfig = { "beans.xml" };
		
		context = new ClassPathXmlApplicationContext(springConfig);

	}
}

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Spring Batch批处理框架

    轻松引领进入数据批处理世界:基本特性...深度探索Spring Batch 批处理框架的核心概念:作业配置、作业步配置,以及Spring Batch 框架中经典的三步走策略。 快速提升数据批处理的能力:高性能、高可靠性、并行处理。

    SpringBatch批处理 刘相编

    入门篇介绍了批处理、Spring Batch的基本特性和新特性,快速入门的Hello World等内容引领读者入门,从而进入数据批处理的世界。基本篇重点讲述了数据批处理的核心概念、典型的作业配置、作业步配置,以及Spring ...

    SpringBatch学习入门

    在本文中,我们将深入探讨SpringBatch的学习入门,以及如何在IDEA环境中进行配置和实践。 首先,我们需要了解SpringBatch的核心概念: 1. **Job**:是SpringBatch中的最高级别抽象,代表一个完整的批处理任务,...

    Pro Spring Batch

    《Pro Spring Batch》一书深入探讨了Java Spring Batch框架的核心概念与实际应用,为开发者提供了全面的批处理技术指南。本书由Apress出版,旨在帮助读者掌握Spring Batch框架的精髓,提升在批量数据处理领域的技能...

    spring-batch-demo.rar

    一个SpringBatch的入门案例,SpringBatch是spring体系下的一个轻量级的批处理框架,使用简单方便,是非常优秀的一个批处理框架。如果你的工作中涉及到了批处理作业,可以考虑使用SpringBatch框架,本案例有详细的...

    Spring Batch入门教程篇

    Spring Batch 入门教程篇 Spring Batch 是一个大数据量的并行处理框架,通常用于数据的离线迁移和数据处理,具有事务、并发、流程、监控、纵向和横向扩展等特性,提供统一的接口管理和任务管理。Spring Batch 是 ...

    Spring_Batch系列文章汇总

    Spring_Batch项目在实际开发中可能会遇到一些问题,因为它所提供的高级特性在网上资料尤其是中文资料非常有限,官方文档也可能只是覆盖了入门级别的基础知识。因此,学习者往往需要根据自己的项目需求,结合Spring_...

    详解Spring batch 入门学习教程(附源码)

    详解 Spring Batch 入门学习教程 Spring Batch 是一个开源的批处理框架,执行一系列的任务。在 Spring Batch 中,一个 Job 是由许多 Step 组成的,而每一个 Step 又是由 READ-PROCESS-WRITE 任务或者单个 Task 组成...

    一些Spring的入门与进阶教程

    - 这份教程可能包含实战案例,讲解如何使用Spring进行数据库操作(如JDBC、Hibernate、MyBatis集成)、Spring Security安全框架、Spring Batch批处理等进阶主题。 5. **spring基础教程.rar**: - 这是一份针对...

    Spring Data JPA从入门到精通

    13. **性能优化**:包括懒加载(Lazy Loading)、批处理(Batch Processing)、缓存(Caching)等策略,以提升JPA应用的性能。 通过学习《Spring Data JPA从入门到精通》,读者不仅可以掌握Spring Data JPA的基本...

    spring-batch-kubernetes-gs:Kuernetes上的Spring Batch(GKE)-入门

    GKE上的Spring Batch Spring Batch Job在GKE上运行 描述 Spring批处理模型 工作 它是一个封装整个批处理过程的实体 它只是Step实例的容器 它组合了逻辑上属于流程的多个步骤,并允许配置所有步骤全局的属性 步它是...

    spring入门经典源代码

    Spring Batch则专注于批处理任务,提供了一组高级服务,如跳过失败的记录、重试和分步处理。 10. **实战示例** "Spring入门经典源代码"中的每个章节都配有示例代码,这些示例涵盖了上述各种概念,包括简单的Hello...

    《Spring入门经典》源代码

    8. **Spring Batch**:处理批量数据和批处理任务的框架,提供可重试、事务管理、分割和分区等高级特性。 9. **Spring Cloud**:基于Spring Boot的微服务工具集合,用于构建分布式系统,包括服务发现、配置中心、...

    《Spring入门经典》书中示例代码

    《Spring入门经典》是一本深度剖析Spring框架的入门书籍,旨在帮助初学者快速掌握Spring的核心概念和使用方法。书中的示例代码是理论知识的实践体现,通过这些代码,读者可以直观地理解Spring如何工作,如何配置,...

    Spring MVC 详细入门教程(含jar包)

    Spring MVC可以无缝集成Spring的其他模块,如Spring Security(安全控制)、Spring Data(数据访问)、Spring Batch(批处理)等,构建完整的应用程序。 总的来说,Spring MVC 提供了一套完善的Web开发解决方案,...

    spring入门教程书籍

    5. **Spring Batch**:讨论批处理功能,如何设计和执行大规模数据处理任务。 6. **Spring Integration**:介绍Spring Integration,用于实现不同系统间的集成和消息传递。 7. **Spring Cloud Stream**:讲解如何利用...

    Spring 快速入门教程(PDF)

    Spring Batch是一个用于处理大量数据的批处理框架,它提供了许多核心功能,如读取、处理和写入数据,以及错误处理和事务管理。 九、Spring Integration Spring Integration提供了一套强大的工具,用于实现不同系统...

    Spring入门教程doc

    Spring Batch是针对大规模批处理操作的框架,它提供了一套完整的解决方案,包括读取、处理和写入大量数据,支持事务、错误处理和重启等功能。 10. **测试支持** Spring提供了全面的测试支持,包括单元测试、集成...

    spring 入门学习资料

    Spring Batch是一个用于处理大量数据的批处理框架。它提供了灵活的事务管理、错误处理和重复作业防止机制。 以上只是Spring框架部分核心知识点的简要介绍,实际学习过程中,还需要深入了解每个知识点的具体用法和...

Global site tag (gtag.js) - Google Analytics