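Requirement: read a file on an hourly schedule. There is one file per day, named by date, and new data is appended to that same file during the day, so every batch run has to set the line at which reading starts.
This is the approach we currently use in our project. I'm not sure whether there is a better one, so please share if you know of one.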
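1. I won't cover creating the project or configuring the data source. Mine is a Spring Boot project, and the relevant pieces are (1) the package layout of the Spring Batch code, (2) the file to be processed, and (3) the Spring Batch settings in the yml file. The first property controls whether batch jobs run when the web application starts. The default is true (run them), but because our jobs are launched by a scheduled task, it is set so that nothing runs at startup (in Spring Boot this is `spring.batch.job.enabled: false`). The second property controls whether Spring Batch creates its database tables on each run: set it to true for the first run, and set it to false once the tables have been created, otherwise an error is reported (in Spring Boot 1.x this is `spring.batch.initializer.enabled`). Note: the Spring Batch dependencies you add must not pull in an in-memory database.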
2. Job configuration

```java
package com.st.batch.job;

import com.st.batch.listener.JobCompletionNotificationListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class JobConf {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    // The step bean defined in MemberInfoStepConf (bean name "memberInfoStep")
    @Autowired
    public Step memberInfoStep;

    @Bean
    public Job MFJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("MFJob")
                .incrementer(new RunIdIncrementer())
                //.listener(listener)   // job-level listener, currently disabled
                .flow(memberInfoStep)
                .end()
                .build();
    }
}
```
3. Step configuration

```java
package com.st.batch.step;

import com.st.batch.entity.MemberInfo;
import com.st.batch.listener.StepCompletionNotificationListener;
import com.st.batch.mapper.MemberInfoLineMapper;
import com.st.service.StartCountService;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.JsonLineMapper;
import org.springframework.batch.item.file.separator.JsonRecordSeparatorPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableBatchProcessing
public class MemberInfoStepConf {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Autowired
    StepCompletionNotificationListener steplistener;

    @Autowired
    StartCountService startCountService;

    @Bean
    public Step memberInfoStep() {
        return stepBuilderFactory.get("memberInfoStep")
                .allowStartIfComplete(true)   // the step may run again after completing
                .listener(steplistener)
                .<MemberInfo, MemberInfo>chunk(10)
                .reader(memberInfoReader())
                .writer(writer())
                .build();
    }

    // @StepScope: a new reader is built for every step execution,
    // so the file name and start line are recomputed on each run
    @Bean
    @StepScope
    public FlatFileItemReader<MemberInfo> memberInfoReader() {
        FlatFileItemReader<MemberInfo> reader = new FlatFileItemReader<MemberInfo>();

        // Date of the file to read: now minus 40 minutes, computed once and reused below
        String date = new SimpleDateFormat("yyyy-MM-dd")
                .format(new Date(System.currentTimeMillis() - 40 * 60 * 1000L));

        // Load the resource (same file as the original "classpath:" + createRelative(...) call).
        // A file appended at runtime may be better served by a FileSystemResource.
        reader.setResource(new ClassPathResource("IF_MemberInfo-" + date + ".txt"));

        // Look up the start line (lines already processed for this step and date)
        Map<String, Object> parm = new HashMap<String, Object>();
        parm.put("step", "memberInfoStep");
        parm.put("date", date);
        Integer count = startCountService.getCount(parm);
        reader.setLinesToSkip(count == null ? 0 : count);

        reader.setRecordSeparatorPolicy(new JsonRecordSeparatorPolicy());
        reader.setLineMapper(new MemberInfoLineMapper(new JsonLineMapper()));
        return reader;
    }

    @Bean
    public JdbcBatchItemWriter<MemberInfo> writer() {
        JdbcBatchItemWriter<MemberInfo> writer = new JdbcBatchItemWriter<MemberInfo>();
        // Binds each :named parameter to the MemberInfo property of the same name
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<MemberInfo>());
        writer.setSql("INSERT INTO memberInfo ("
                + "BrandCode,IFMemberId,MemberCode,MemName,Gender,MobilePhone,Email,IdentityCard,BirthDay,"
                + "CounterCodeBelong,JoinDate,JoinTime,TotalPoint,MemberLevelCode,DataSource) "
                + "VALUES (:brandCode,:iFMemberId,:memberCode,:memName,:gender,:mobilePhone,:email,:identityCard,"
                + ":birthDay,:counterCodeBelong,:joinDate,:joinTime,:totalPoint,:memberLevelCode,:dataSource)");
        writer.setDataSource(dataSource);
        return writer;
    }
}
```
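The reader derives the file name from the current time minus 40 minutes. Presumably this lets the run at 00:00 still read the previous day's file and pick up whatever was appended during that day's last hour. The sketch below isolates just that calculation with `java.time`; `DailyFileName` and `forRun` are hypothetical names, not part of the project.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper mirroring the file-name calculation in memberInfoReader(). */
public class DailyFileName {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Shifts the run time back 40 minutes, so a run at 00:00 resolves to the previous day's file. */
    public static String forRun(LocalDateTime runTime) {
        return "IF_MemberInfo-" + runTime.minusMinutes(40).format(DAY) + ".txt";
    }
}
```

With an hourly schedule, the runs from 01:00 to 23:00 read the current day's file, and only the 00:00 run falls back to the previous day.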
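Here `reader.setLinesToSkip()` sets the line at which reading starts. The value comes from querying Spring Batch's own tables: the BATCH_-prefixed tables are created through the second yml property and record the state of every batch execution, and the start line is computed from their READ_COUNT column (in the standard schema READ_COUNT belongs to BATCH_STEP_EXECUTION). Note that the reader must be annotated with `@StepScope`. Without it, the reader is created once as a singleton at startup, and the file name and start line are never recomputed.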
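`StartCountService` itself is not shown. Assuming `getCount` returns the total READ_COUNT of the earlier executions of `memberInfoStep` for the same file date, the resume logic reduces to the plain-Java sketch below. `StepRun`, `linesToSkip` and `readNewLines` are hypothetical names, and this is not Spring Batch code.

```java
import java.io.BufferedReader;
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java sketch of "resume where the earlier runs stopped" (not Spring Batch code). */
public class IncrementalFileRead {

    /** Hypothetical stand-in for one step-execution row: step name, file date, READ_COUNT. */
    public static final class StepRun {
        final String stepName;
        final String date;     // yyyy-MM-dd of the file that run processed
        final int readCount;   // items read by that run only

        public StepRun(String stepName, String date, int readCount) {
            this.stepName = stepName;
            this.date = date;
            this.readCount = readCount;
        }
    }

    /** Lines consumed so far = sum of READ_COUNT over earlier runs of the same step and date. */
    public static int linesToSkip(List<StepRun> history, String stepName, String date) {
        return history.stream()
                .filter(r -> r.stepName.equals(stepName) && r.date.equals(date))
                .mapToInt(r -> r.readCount)
                .sum();
    }

    /** Returns only the lines that come after the first {@code skip} lines. */
    public static List<String> readNewLines(BufferedReader in, int skip) {
        return in.lines().skip(skip).collect(Collectors.toList());
    }
}
```

The sum matters: lines skipped via `setLinesToSkip` are not counted as items, so each run's READ_COUNT covers only what that run read itself. For a real file, the reader would come from `Files.newBufferedReader(path)`.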
4. LineMapper

```java
package com.st.batch.mapper;

import com.st.batch.entity.MemberInfo;
import com.st.util.MapUtils;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.JsonLineMapper;

/**
 * Created by admin on 2016/12/29.
 * Turns one JSON record into a MemberInfo; JSON parsing is delegated to JsonLineMapper.
 */
public class MemberInfoLineMapper implements LineMapper<MemberInfo> {

    private JsonLineMapper delegate;

    public MemberInfoLineMapper(JsonLineMapper delegate) {
        this.delegate = delegate;
    }

    @Override
    public MemberInfo mapLine(String line, int lineNumber) throws Exception {
        // delegate.mapLine returns a Map<String, Object>; MapUtils is the project's
        // utility that converts that map into the MemberInfo domain object
        return MapUtils.toObject(MemberInfo.class, delegate.mapLine(line, lineNumber), true);
    }

    public JsonLineMapper getDelegate() {
        return delegate;
    }

    public void setDelegate(JsonLineMapper delegate) {
        this.delegate = delegate;
    }
}
```
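`MapUtils` is a project utility that is not shown. Judging from the writer's SQL parameters (`:brandCode`, `:iFMemberId`), it maps each JSON key onto the property whose name is the key with a lower-cased first letter. The following is a hypothetical reflection-based stand-in under that assumption; `MapToObject` and the trimmed-down `Member` class are illustrative only.

```java
import java.lang.reflect.Field;
import java.util.Map;

/** Hypothetical stand-in for MapUtils.toObject (the project's real utility is not shown). */
public class MapToObject {

    /** Copies each map entry onto the field named after the key with its first letter lower-cased. */
    public static <T> T toObject(Class<T> type, Map<String, Object> values) {
        try {
            T target = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> entry : values.entrySet()) {
                String key = entry.getKey();
                if (key.isEmpty()) {
                    continue;
                }
                // "BrandCode" -> "brandCode", "IFMemberId" -> "iFMemberId"
                String fieldName = Character.toLowerCase(key.charAt(0)) + key.substring(1);
                try {
                    Field field = type.getDeclaredField(fieldName);
                    field.setAccessible(true);
                    field.set(target, entry.getValue());
                } catch (NoSuchFieldException ignored) {
                    // Keys without a matching field (e.g. BirthYear for Member below) are skipped
                }
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map values onto " + type.getName(), e);
        }
    }

    /** Trimmed-down example entity; the real MemberInfo has more fields. */
    public static class Member {
        public String brandCode;
        public String iFMemberId;
        public String memberCode;
    }
}
```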
5. Launching the job on a schedule

```java
package com.st.scheduled;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Created by sxm on 2016/10/14.
 */
@Component
@EnableScheduling
public class ScheduledTask {

    // Log under this class (the original used JobCompletionNotificationListener.class)
    private static final Logger log = LoggerFactory.getLogger(ScheduledTask.class);

    // Injected directly instead of being looked up through SpringContextUtil
    @Autowired
    private JobLauncher launcher;

    @Autowired
    @Qualifier("MFJob")
    private Job importUserJob;

    // Top of every hour, as the requirement states; the original "0 0/1 * * * ?" fires every minute
    @Scheduled(cron = "0 0 * * * ?")
    public void reportCurrentTime() {
        // A new "date" value makes every launch a new job instance
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date())
                .toJobParameters();
        try {
            JobExecution execution = launcher.run(importUserJob, jobParameters);
            log.info("Batch job finished, status: {}, date: {}", execution.getStatus(), new Date());
        } catch (Exception e) {
            log.error("Batch job failed to launch", e);
        }
    }
}
```
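The scheduler adds a fresh `date` parameter because Spring Batch identifies a job instance by the job name plus its identifying parameters, and launching an instance that already completed fails with `JobInstanceAlreadyCompleteException`. The toy model below is not Spring Batch code (`JobInstanceModel` is a hypothetical class and every run is assumed to complete); it only shows why repeating the same parameters is rejected while a new timestamp is accepted.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of job-instance identity (not Spring Batch code). */
public class JobInstanceModel {

    // Keys of instances that already completed: job name + sorted identifying parameters
    private final Set<String> completedInstances = new HashSet<>();

    /**
     * Simulates launching a job that always completes. Returns false when an instance
     * with the same name and parameters has already completed.
     */
    public boolean launch(String jobName, Map<String, ?> identifyingParams) {
        String instanceKey = jobName + new TreeMap<String, Object>(identifyingParams);
        // Set.add returns false if the key is already present
        return completedInstances.add(instanceKey);
    }
}
```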
Sample file IF_MemberInfo-2016-12-30.txt (JSON, one record per line):

```json
{"BrandCode":"MF","IFMemberId":"1267266","MemberCode":"13489568093","MemName":"\u5927\u5927\u5927\u6a58\u5b50458","Gender":"0","MobilePhone":"13489568093","Email":" ","IdentityCard":"","BirthYear":"","BirthDay":" ","CounterCodeBelong":"","BaCodeBelong":" ","JoinDate":"2016-12-28","JoinTime":" ","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"TB"}
{"BrandCode":"MF","IFMemberId":"1267265","MemberCode":"13840017311","MemName":"\u6768\u96e8\u6615","Gender":"2","MobilePhone":"13840017311","Email":" ","IdentityCard":"","BirthYear":"1999","BirthDay":"0806","CounterCodeBelong":"mf0sy003","BaCodeBelong":"14129994","JoinDate":"2016-12-28","JoinTime":"12:36:30 ","TotalPoint":"802","MemberLevelCode":"WMLC002","DataSource":"POS3"}
{"BrandCode":"MF","IFMemberId":"1267264","MemberCode":"18210648271","MemName":"","Gender":"","MobilePhone":"18210648271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267263","MemberCode":"18753740991","MemName":"","Gender":"","MobilePhone":"18753740991","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267262","MemberCode":"13918726271","MemName":"","Gender":"","MobilePhone":"13918726271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267261","MemberCode":"15533079902","MemName":"","Gender":"","MobilePhone":"15533079902","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267260","MemberCode":"18213506880","MemName":"\u9a6c\u5c0f\u59d0","Gender":"2","MobilePhone":"18213506880","Email":" ","IdentityCard":"","BirthYear":"1990","BirthDay":"0625","CounterCodeBelong":"MF0KM003","BaCodeBelong":"16108991","JoinDate":"2016-12-28","JoinTime":"12:14:23 ","TotalPoint":"804","MemberLevelCode":"WMLC002","DataSource":"MPOS"}
{"BrandCode":"MF","IFMemberId":"1267259","MemberCode":"15295502603","MemName":"","Gender":"","MobilePhone":"15295502603","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1265714","MemberCode":"18539039009","MemName":"\u6881\u4fca\u971e","Gender":"2","MobilePhone":"18539039009","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-26","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
```
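The reader is configured with `JsonRecordSeparatorPolicy`, so a JSON record may span several physical lines. The sketch below imitates that idea with a simple brace count (it ignores braces inside string values). `JsonRecordGrouper` is a hypothetical name, and the real policy may differ in details.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: groups physical lines into JSON records by counting braces. */
public class JsonRecordGrouper {

    private static int count(String s, char c) {
        int n = 0;
        for (int i = 0; i < s.length(); i++) {
            if (s.charAt(i) == c) {
                n++;
            }
        }
        return n;
    }

    /** A buffered record is complete once '{' and '}' balance and it ends with '}'. */
    static boolean isEndOfRecord(String buffered) {
        return count(buffered, '{') == count(buffered, '}') && buffered.trim().endsWith("}");
    }

    /** Concatenates consecutive lines until each record is complete. */
    public static List<String> group(List<String> physicalLines) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : physicalLines) {
            current.append(line);
            if (isEndOfRecord(current.toString())) {
                records.add(current.toString());
                current.setLength(0);
            }
        }
        // An unterminated trailing record is simply dropped in this sketch
        return records;
    }
}
```

This points to a caveat of the start-line approach. `setLinesToSkip` counts physical lines while READ_COUNT counts records, so the two agree only when every record sits on a single line, as in the sample file.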
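This is only an early demo of the project. Next I plan to define a unified Reader and to replace the writer's template-based SQL insert with the more common MyBatis approach.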