Spring Batch: loading an hourly-read, continuously appended daily report into the database


Requirement: read a file on an hourly schedule. The file name advances with the date, and new records are appended to the same file throughout the day, so every batch run has to set the line at which reading starts.

 

This is the approach we currently use in our project. I don't know whether there is a better way, so please share if you have one.

 

1. I won't go over creating the project and configuring the data source; mine is a Spring Boot project. Screenshot (1) shows the Spring Batch project's directory layout, (2) the file to be processed, and (3) the Spring Batch settings in the yml file. The first property stops batch jobs from running when the web application starts (the default is true, i.e. run on startup); since the job here is triggered by a scheduled task, it is set so that nothing runs at startup. The second property controls whether Spring Batch creates its metadata tables: set it to true for the very first run, then back to false once the tables have been initialized, otherwise subsequent runs will fail. Note: make sure the Spring Batch dependencies you pull in do not include an in-memory database dependency.
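The yml screenshot is not reproduced here; as a rough sketch, on Spring Boot 1.x the two properties described above would look something like this in application.yml (the exact keys in the original screenshot may differ):

spring:
  batch:
    job:
      enabled: false      # do not launch jobs automatically when the web application starts (default is true)
    initializer:
      enabled: true       # create the Spring Batch metadata tables on the first run, then switch back to false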


 
2. Job configuration

package com.st.batch.job;

import com.st.batch.listener.JobCompletionNotificationListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class JobConf {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public Step memberInfoStep;
    @Bean
    public Job MFJob(JobCompletionNotificationListener listener) {

        return jobBuilderFactory.get("MFJob")
                .incrementer(new RunIdIncrementer())
                //.listener(listener)
                .flow(memberInfoStep)
                .end()
                .build();
    }

}

 

 

3. Step configuration

 

package com.st.batch.step;

import com.st.batch.entity.MemberInfo;
import com.st.batch.listener.StepCompletionNotificationListener;
import com.st.batch.mapper.MemberInfoLineMapper;
import com.st.service.StartCountService;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.JsonLineMapper;
import org.springframework.batch.item.file.separator.JsonRecordSeparatorPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

@Configuration
@EnableBatchProcessing
public class MemberInfoStepConf {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Autowired
    StepCompletionNotificationListener steplistener;

    @Autowired
    StartCountService startCountService;


    @Bean
    public Step memberInfoStep() {
        return stepBuilderFactory.get("memberInfoStep")
                // allow the step to be started again even if a previous execution completed
                .allowStartIfComplete(true)
                .listener(steplistener)
                // read and write MemberInfo items in chunks of 10
                .<MemberInfo, MemberInfo> chunk(10)
                .reader(memberInfoReader())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<MemberInfo> memberInfoReader() {

        FlatFileItemReader<MemberInfo> reader = new FlatFileItemReader<MemberInfo>();
        // the file date is the current time shifted back 40 minutes, e.g. IF_MemberInfo-2016-12-30.txt
        String fileDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date(System.currentTimeMillis() - 40 * 60 * 1000));
        reader.setResource(new ClassPathResource("IF_MemberInfo-" + fileDate + ".txt"));
        // look up how many lines earlier runs have already read for this step and file date,
        // so this execution starts where the previous one stopped
        HashMap<String, String> parm = new HashMap<String, String>();
        parm.put("step", "memberInfoStep");
        parm.put("date", fileDate);
        Integer startLine = startCountService.getCount(parm);
        reader.setLinesToSkip(startLine == null ? 0 : startLine);
        // every line of the file is one complete JSON record
        reader.setRecordSeparatorPolicy(new JsonRecordSeparatorPolicy());
        reader.setLineMapper(new MemberInfoLineMapper(new JsonLineMapper()));
        return reader;
    }

    @Bean
    public JdbcBatchItemWriter<MemberInfo> writer() {
        JdbcBatchItemWriter<MemberInfo> writer = new JdbcBatchItemWriter<MemberInfo>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<MemberInfo>());
        writer.setSql("INSERT INTO memberInfo (" +
                "BrandCode,IFMemberId,MemberCode,MemName,Gender,MobilePhone,Email,IdentityCard,BirthDay,CounterCodeBelong,JoinDate,JoinTime,TotalPoint,MemberLevelCode,DataSource) " +
                "VALUES (:brandCode,:iFMemberId,:memberCode,:memName,:gender,:mobilePhone,:email,:identityCard,:birthDay,:counterCodeBelong,:joinDate,:joinTime,:totalPoint,:memberLevelCode,:dataSource)");
        writer.setDataSource(dataSource);
        return writer;
    }
}

The reader.setLinesToSkip() call is what sets the starting line. The value is obtained by querying the Spring Batch metadata tables shown in the screenshot: the tables with the batch_ prefix are created by the second yml property above and record the state of every batch execution, and the starting line comes from the READ_COUNT column (in BATCH_STEP_EXECUTION). Note that the reader must be annotated with @StepScope; otherwise the bean is created once at startup and the date-based file name and skip count would never be recalculated for later executions.
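The implementation of StartCountService is not shown in the post. As a rough sketch (assuming a MySQL metadata store; the class body below is an assumption, only the table and column names come from the standard Spring Batch schema), the accumulated READ_COUNT could be summed with a JdbcTemplate query like this:

package com.st.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * Hypothetical sketch: sums the READ_COUNT of all completed executions of a step
 * whose job ran for the given file date, so the next run knows how many lines to skip.
 */
@Service
public class StartCountService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public Integer getCount(Map<String, String> parm) {
        String sql =
                "SELECT SUM(se.READ_COUNT) " +
                "FROM BATCH_STEP_EXECUTION se " +
                "JOIN BATCH_JOB_EXECUTION_PARAMS p ON p.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID " +
                "WHERE se.STEP_NAME = ? " +
                "AND se.EXIT_CODE = 'COMPLETED' " +
                "AND p.KEY_NAME = 'date' " +
                "AND DATE_FORMAT(p.DATE_VAL, '%Y-%m-%d') = ?";
        // returns null when no earlier execution exists, which the reader treats as "skip 0 lines"
        return jdbcTemplate.queryForObject(sql, Integer.class, parm.get("step"), parm.get("date"));
    }
}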

4. LineMapper

 

package com.st.batch.mapper;

import com.st.batch.entity.MemberInfo;
import com.st.util.MapUtils;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.JsonLineMapper;

/**
 * Created by admin on 2016/12/29.
 */
public class MemberInfoLineMapper implements LineMapper<MemberInfo> {

    private JsonLineMapper delegate;
    @Override
    public MemberInfo mapLine(String line, int lineNumber) throws Exception {
        // the JsonLineMapper delegate parses the line into a Map;
        // MapUtils is a utility that turns that map into the MemberInfo domain object
        return MapUtils.toObject(MemberInfo.class, delegate.mapLine(line, lineNumber), true);
    }

    public MemberInfoLineMapper(JsonLineMapper delegate) {
        this.delegate = delegate;
    }

    public JsonLineMapper getDelegate() {
        return delegate;
    }

    public void setDelegate(JsonLineMapper delegate) {
        this.delegate = delegate;
    }
}
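MapUtils.toObject is a project-internal helper that isn't included in the post. A minimal sketch of one possible implementation using Jackson follows; the class, the ignore-case flag and the ObjectMapper configuration are assumptions, not the original code (the file's keys such as "BrandCode" have to be matched to entity properties such as brandCode, hence the case-insensitive setting):

package com.st.util;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
 * Hypothetical sketch of the map-to-domain-object helper used by MemberInfoLineMapper.
 */
public class MapUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper()
            // file keys start with an upper-case letter ("BrandCode"), entity properties don't ("brandCode")
            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
            // tolerate fields such as BirthYear or BaCodeBelong that the entity may not declare
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static <T> T toObject(Class<T> type, Map<String, Object> map, boolean ignoreCase) {
        // ignoreCase is kept for signature compatibility; case handling is done by the mapper configuration above
        return MAPPER.convertValue(map, type);
    }
}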

5. Calling the job on a schedule

 

package com.st.scheduled;


import com.st.batch.listener.JobCompletionNotificationListener;
import com.st.service.StartCountService;
import com.st.util.SpringContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Created by sxm on 2016/10/14.
 */
@Component
@Configurable
@EnableScheduling
public class ScheduledTask {
    private static final Logger log = LoggerFactory.getLogger(ScheduledTask.class);

    @Autowired
    StartCountService startCountService;

    // runs every minute in this demo; for the hourly requirement use "0 0 * * * ?"
    @Scheduled(cron = "0 0/1 * * * ?")
    public void runMemberInfoJob() {
        JobLauncher launcher = SpringContextUtil.getBean(JobLauncher.class);
        Job mfJob = SpringContextUtil.getBean("MFJob");
        // a fresh date parameter makes every run a distinct job instance
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date()).toJobParameters();
        try {
            launcher.run(mfJob, jobParameters);
            log.info("Batch job finished, date: " + new Date());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
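SpringContextUtil is another project-internal helper not shown in the post; a typical implementation (a sketch, not the original code) looks roughly like this:

package com.st.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Hypothetical sketch: exposes the ApplicationContext so beans such as the
 * JobLauncher and the MFJob bean can be looked up from the scheduled task.
 */
@Component
public class SpringContextUtil implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static <T> T getBean(Class<T> requiredType) {
        return context.getBean(requiredType);
    }

    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        return (T) context.getBean(name);
    }
}

Since ScheduledTask is itself a Spring bean, the JobLauncher and the Job could also simply be injected with @Autowired; the context utility is just one way to do it.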

IF_MemberInfo-2016-12-30.txt (one JSON record per line):

 

{"BrandCode":"MF","IFMemberId":"1267266","MemberCode":"13489568093","MemName":"\u5927\u5927\u5927\u6a58\u5b50458","Gender":"0","MobilePhone":"13489568093","Email":" ","IdentityCard":"","BirthYear":"","BirthDay":"    ","CounterCodeBelong":"","BaCodeBelong":" ","JoinDate":"2016-12-28","JoinTime":"          ","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"TB"}
{"BrandCode":"MF","IFMemberId":"1267265","MemberCode":"13840017311","MemName":"\u6768\u96e8\u6615","Gender":"2","MobilePhone":"13840017311","Email":" ","IdentityCard":"","BirthYear":"1999","BirthDay":"0806","CounterCodeBelong":"mf0sy003","BaCodeBelong":"14129994","JoinDate":"2016-12-28","JoinTime":"12:36:30  ","TotalPoint":"802","MemberLevelCode":"WMLC002","DataSource":"POS3"}
{"BrandCode":"MF","IFMemberId":"1267264","MemberCode":"18210648271","MemName":"","Gender":"","MobilePhone":"18210648271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267263","MemberCode":"18753740991","MemName":"","Gender":"","MobilePhone":"18753740991","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267262","MemberCode":"13918726271","MemName":"","Gender":"","MobilePhone":"13918726271","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267261","MemberCode":"15533079902","MemName":"","Gender":"","MobilePhone":"15533079902","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1267260","MemberCode":"18213506880","MemName":"\u9a6c\u5c0f\u59d0","Gender":"2","MobilePhone":"18213506880","Email":" ","IdentityCard":"","BirthYear":"1990","BirthDay":"0625","CounterCodeBelong":"MF0KM003","BaCodeBelong":"16108991","JoinDate":"2016-12-28","JoinTime":"12:14:23  ","TotalPoint":"804","MemberLevelCode":"WMLC002","DataSource":"MPOS"}
{"BrandCode":"MF","IFMemberId":"1267259","MemberCode":"15295502603","MemName":"","Gender":"","MobilePhone":"15295502603","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"WEIXIN","BaCodeBelong":"","JoinDate":"2016-12-28","JoinTime":"","TotalPoint":"","MemberLevelCode":"WMLC003","DataSource":"wechat"}
{"BrandCode":"MF","IFMemberId":"1265714","MemberCode":"18539039009","MemName":"\u6881\u4fca\u971e","Gender":"2","MobilePhone":"18539039009","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-26","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}
{"BrandCode":"MF","IFMemberId":"1262436","MemberCode":"13751786171","MemName":"\u674e\u6631\u84d3","Gender":"2","MobilePhone":"13751786171","Email":"","IdentityCard":"","BirthYear":"","BirthDay":"","CounterCodeBelong":"MF0TMALL","BaCodeBelong":"99999998","JoinDate":"2016-12-23","JoinTime":"","TotalPoint":"0","MemberLevelCode":"WMLC003","DataSource":"bycsv"}

This is only an initial demo for the project. The next steps are to define a unified reader and to replace the template-style JDBC insert in the writer with the MyBatis approach we normally use.
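For that switch, mybatis-spring ships a MyBatisBatchItemWriter; a minimal sketch of what the replacement writer bean could look like in the step configuration (the sqlSessionFactory bean and the statement id com.st.mapper.MemberInfoMapper.insert are assumptions):

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;

    @Bean
    public MyBatisBatchItemWriter<MemberInfo> myBatisWriter(SqlSessionFactory sqlSessionFactory) {
        MyBatisBatchItemWriter<MemberInfo> writer = new MyBatisBatchItemWriter<MemberInfo>();
        writer.setSqlSessionFactory(sqlSessionFactory);
        // id of the <insert> statement in the (hypothetical) MemberInfo mapper XML
        writer.setStatementId("com.st.mapper.MemberInfoMapper.insert");
        return writer;
    }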

 
