`
virusfu
  • 浏览: 183085 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

spring Batch实现数据库大数据量读写

阅读更多

1. data-source-context.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

	<!-- 1) USE ANNOTATIONS TO IDENTIFY AND WIRE SPRING BEANS. -->
	<context:component-scan base-package="net.etongbao.vasp.ac" />
	
	<!-- 2) DATASOURCE, TRANSACTION MANAGER AND JDBC TEMPLATE -->
 
	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"
		destroy-method="close" abstract="false" scope="singleton">
		<!-- oracle.jdbc.driver.oracleDriver -->
		<property name="driverClass" value="oracle.jdbc.OracleDriver" />
		<property name="jdbcUrl" value="jdbc:oracle:thin:@192.168.1.23:1521:orcl01" />
		<property name="user" value="USR_DEV01" />
		<property name="password" value="2AF0829C" />
		<property name="checkoutTimeout" value="30000" />
		<property name="maxIdleTime" value="120" />
		<property name="maxPoolSize" value="100" />
		<property name="minPoolSize" value="2" />
		<property name="initialPoolSize" value="2" />
		<property name="maxStatements" value="0" />
		<property name="maxStatementsPerConnection" value="0" />
		<property name="idleConnectionTestPeriod" value="30" />	
	</bean>
	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>
	
	<tx:annotation-driven transaction-manager="transactionManager" />
</beans>

 2. quartz-context.xml       commit-interval="10000"每次批量数据的条数,数值越大效率越高,可在此处添加事物处理,

每次回滚数就是commit-interval数

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
	http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

	<import resource="data-source-context.xml"/>
	
	<!--   JOB REPOSITORY - WE USE IN-MEMORY REPOSITORY FOR OUR EXAMPLE -->
	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>
	
	<!-- batch config -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!--  FINALLY OUR JOB DEFINITION. THIS IS A 1 STEP JOB -->
	<batch:job id="ledgerJob">
		<batch:listeners>
			<batch:listener ref="appJobExecutionListener" />
		</batch:listeners>
		<batch:step id="step1">
		   <batch:tasklet transaction-manager="transactionManager">
			<batch:tasklet>
				<batch:listeners>
					<batch:listener ref="itemFailureLoggerListener" />
				</batch:listeners>
				<batch:chunk reader="ledgerReader" writer="ledgerWriter"
					commit-interval="10000" /> <!-- 1万条进行一次commit -->
			</batch:tasklet>
                   </batch:tasklet>
		</batch:step>
	</batch:job>
	 
	<!--  READER -->
	<bean id="ledgerReader"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
		<property name="sql" value="select * from ledger" /> 
		<property name="rowMapper" ref="ledgerRowMapper" />
	</bean>
	 
	<!-- Spring Batch Job同一个job instance,成功执行后是不允许重新执行的【失败后是否允许重跑,可通过配置Job的restartable参数来控制,默认是true】,如果需要重新执行,可以变通处理,
		添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下却是不同的job instance -->
	<bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />
	
	<!-- 定时任务 开始 -->  
	<bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">  
		<property name="targetObject">  
			<!-- 定时执行的类 -->  
			<ref bean="quartzLedgerJob" />  
		</property>  
		<property name="targetMethod">  
			<!-- 定时执行的类方法 -->  
			<value>execute</value>  
		</property>  
	</bean>  
  
	<bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean" >  
		<!-- 这里不可以直接在属性jobDetail中引用taskJob,因为他要求的是一个jobDetail类型的对象,所以我们得通过MethodInvokingJobDetailFactoryBean来转一下 -->  
		<property name="jobDetail" >  
			<ref bean="ledgerJobDetail" />  
		</property>  
		<!--在每天下午18点到下午18:59期间的每1分钟触发  -->  
		<!--在每天上午10点40分准时触发  -->  
		<property name="cronExpression" >  
			<!-- <value>0 * 15 * * ?</value> -->
			<value>0 45 10 * * ? * </value> 
		</property>  
        
	</bean>  
      
	<!-- 触发器工厂,将所有的定时任务都注入工厂-->  
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">  
		<!-- 添加触发器 -->  
		<property name="triggers">  
			<list>  
				<!-- 将上面定义的测试定时任务注入(可以定义多个定时任务,同时注入)-->  
				<ref local="ledgerCronTrigger" />  
			</list>  
		</property>  
	</bean>  
	<!-- 定时任务 结束 -->  
</beans>  

 

3.定时调度job类 QuartzLedgerJob.java

 

 package net.etongbao.vasp.ac.quartz;

import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

/**
 * 定时调度类
 * @author Fu Wei
 *
 */

@Component("quartzLedgerJob")
public class QuartzLedgerJob {

	private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job ledgerJob;

	@Autowired
	JobParametersBuilder jobParameterBulider;

	private static long counter = 0l;
	
	/**
	 * 执行业务方法
	 * @throws Exception
	 */
	public void execute() throws Exception {
		LOG.debug("start...");
		StopWatch sw = new StopWatch();
		sw.start();
		/*
		 * Spring Batch Job同一个job instance,成功执行后是不允许重新执行的【失败后是否允许重跑,
		 * 可通过配置Job的restartable参数来控制,默认是true】,如果需要重新执行,可以变通处理,
		 * 添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下却是不同的job instance
		 */
		jobParameterBulider.addDate("date", new Date());
		jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters());
		sw.stop();
		LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", sw.prettyPrint(), ++counter);
	}
}
 

 4.程序启动类 StartQuartz.java

 package net.etongbao.vasp.ac.quartz;

import java.io.FileNotFoundException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 启动定时调度
 * @author Fu Wei
 *
 */
public class StartQuartz {

	public static void main(String[] args) throws FileNotFoundException {

		new ClassPathXmlApplicationContext("/net/etongbao/vasp/ac/resources/quartz-context.xml");
	}
}

 

5.pojo类 Ledger.java

 

package net.etongbao.vasp.ac.pojo;

import java.io.Serializable;
import java.util.Date;

public class Ledger implements Serializable {

	private int id;
	private Date receiptDate;
	private String memberName;
	private String checkNumber;
	private Date checkDate;
	private String paymentType;
	private double depositAmount;
	private double paymentAmount;
	private String comments;

	public Ledger() {
		super();
	}

	public Ledger(int id, Date receiptDate, String memberName, String checkNumber, Date checkDate, String paymentType,
	        double depositAmount, double paymentAmount, String comments) {
		super();
		this.id = id;
		this.receiptDate = receiptDate;
		this.memberName = memberName;
		this.checkNumber = checkNumber;
		this.checkDate = checkDate;
		this.paymentType = paymentType;
		this.depositAmount = depositAmount;
		this.paymentAmount = paymentAmount;
		this.comments = comments;
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public Date getReceiptDate() {
		return receiptDate;
	}

	public void setReceiptDate(Date receiptDate) {
		this.receiptDate = receiptDate;
	}

	public String getMemberName() {
		return memberName;
	}

	public void setMemberName(String memberName) {
		this.memberName = memberName;
	}

	public String getCheckNumber() {
		return checkNumber;
	}

	public void setCheckNumber(String checkNumber) {
		this.checkNumber = checkNumber;
	}

	public Date getCheckDate() {
		return checkDate;
	}

	public void setCheckDate(Date checkDate) {
		this.checkDate = checkDate;
	}

	public String getPaymentType() {
		return paymentType;
	}

	public void setPaymentType(String paymentType) {
		this.paymentType = paymentType;
	}

	public double getDepositAmount() {
		return depositAmount;
	}

	public void setDepositAmount(double depositAmount) {
		this.depositAmount = depositAmount;
	}

	public double getPaymentAmount() {
		return paymentAmount;
	}

	public void setPaymentAmount(double paymentAmount) {
		this.paymentAmount = paymentAmount;
	}

	public String getComments() {
		return comments;
	}

	public void setComments(String comments) {
		this.comments = comments;
	}
}

 

 6. LedgerDaoImpl.java

 package net.etongbao.vasp.ac.dao.impl;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import net.etongbao.vasp.ac.dao.LedgerDao;
import net.etongbao.vasp.ac.pojo.Ledger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.stereotype.Repository;

/**
 * ledger数据操作类
 * 
 * @author Fu Wei
 * 
 */

@Repository
public class LedgerDaoImpl implements LedgerDao {

	private static final String SAVE_SQL = "insert into ledger_temp (rcv_dt, mbr_nm, chk_nbr, chk_dt, pymt_typ, dpst_amt, pymt_amt, comments) values(?,?,?,?,?,?,?,?)";

	@Autowired
	private JdbcTemplate jdbcTemplate;

	@Override
	public void save(final Ledger item) {
		jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {
			public void setValues(PreparedStatement stmt) throws SQLException {
				stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime()));
				stmt.setString(2, item.getMemberName());
				stmt.setString(3, item.getCheckNumber());
				stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime()));
				stmt.setString(5, item.getPaymentType());
				stmt.setDouble(6, item.getDepositAmount());
				stmt.setDouble(7, item.getPaymentAmount());
				stmt.setString(8, item.getComments());
			}
		});
	}

}
 

7.接口 LedgerDao .java

 

 

package net.etongbao.vasp.ac.dao;

import net.etongbao.vasp.ac.pojo.Ledger;

public interface LedgerDao {
	public void save(final Ledger item) ;
}

 

 8. JdbcTemplete 需要的LedgerRowMapper.java

  package net.etongbao.vasp.ac.batch.writer;

import java.sql.ResultSet;
import java.sql.SQLException;

import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
 
/**
 * ledger行的映射类
 * @author Administrator
 *
 */
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper {
	public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
		Ledger ledger = new Ledger();
		ledger.setId(rs.getInt("id"));
		ledger.setReceiptDate(rs.getDate("rcv_dt"));
		ledger.setMemberName(rs.getString("mbr_nm"));
		ledger.setCheckNumber(rs.getString("chk_nbr"));
		ledger.setCheckDate(rs.getDate("chk_dt"));
		ledger.setPaymentType(rs.getString("pymt_typ"));
		ledger.setDepositAmount(rs.getDouble("dpst_amt"));
		ledger.setPaymentAmount(rs.getDouble("pymt_amt"));
		ledger.setComments(rs.getString("comments"));
		return ledger;
	}
}

 

9.关键类LedgerWriter.java ,写入数据,负责数据的添加 

 

package net.etongbao.vasp.ac.batch.writer;

import java.util.List;

import net.etongbao.vasp.ac.dao.LedgerDao;
import net.etongbao.vasp.ac.pojo.Ledger;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * ledger写入数据
 * 
 * @author Fu Wei
 * 
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

	@Autowired
	private LedgerDao ledgerDao;

	/**
	 * 写入数据
	 * 
	 * @param ledgers
	 */
	public void write(List<? extends Ledger> ledgers) throws Exception {
		for (Ledger ledger : ledgers) {
			ledgerDao.save(ledger);
		}
	}

}

 

 

 classPath:

 <?xml version="1.0" encoding="UTF-8"?>

<classpath>
	<classpathentry kind="src" path="src"/>
	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jrockit-jdk1.6.0_24-R28.1.3-4.0.1"/>
	<classpathentry kind="lib" path="lib/aopalliance-1.0.jar"/>
	<classpathentry kind="lib" path="lib/c3p0-0.9.1.2.jar"/>
	<classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/>
	<classpathentry kind="lib" path="lib/commons-lang-2.3.jar"/>
	<classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
	<classpathentry kind="lib" path="lib/etb-log4j-1.2.16.jar"/>
	<classpathentry kind="lib" path="lib/etb-slf4j-api-1.5.8.jar"/>
	<classpathentry kind="lib" path="lib/etb-slf4j-log4j12-1.5.8.jar"/>
	<classpathentry kind="lib" path="lib/ojdbc6.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.aop-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.asm-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.aspects-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.beans-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.context-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.context.support-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.core-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.expression-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.instrument-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.instrument.tomcat-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.jdbc-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.jms-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.orm-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.oxm-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.test-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/org.springframework.transaction-3.0.5.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/quartz-all-1.6.5.jar"/>
	<classpathentry kind="lib" path="lib/spring-batch-core-2.1.6.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/spring-batch-infrastructure-2.1.6.RELEASE.jar"/>
	<classpathentry kind="lib" path="lib/spring-batch-test-2.1.6.RELEASE.jar"/>
	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
	<classpathentry kind="output" path="bin"/>
</classpath>
 

 

  总结: 测试数据8万多条,响应时间3分多钟。

 

关键在于quartz-context.xml 中<bean id="ledgerReader"

class="org.springframework.batch.item.database.JdbcCursorItemReader">

<property name="dataSource" ref="dataSource" />

<property name="sql" value="select * from ledger" /> 

<property name="rowMapper" ref="ledgerRowMapper" />

</bean> 负责读取数据 ,在程序执行时一次性抓取全部数据后在批量的交给LedgerWriter进行写操作。当然也可以使用分页读取JdbcPagingItemReader,但要分页数量与写入数量要大写相同,还可以对分页出来的数据进行添加悲观锁

LedgerWriter.java 负责写入数据,每次写入1000条。

 

 

 

 

 

分享到:
评论
9 楼 yjc2020 2015-01-26  
问个问题,spring batch 批量写数据时能否做日志输出
8 楼 JimmyLincole 2014-08-25  
yjc2020 写道
JimmyLincole 写道
1楼的问题,解决方法可以看一下这个:


The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.

You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html


3.0与2.2.7 有什么差别,我升级jar包后配置文件出错

3.0与2.2.7有啥差别我也不太清楚呢,不好意思
7 楼 yjc2020 2014-08-19  
JimmyLincole 写道
1楼的问题,解决方法可以看一下这个:


The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.

You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html


3.0与2.2.7 有什么差别,我升级jar包后配置文件出错
6 楼 JimmyLincole 2014-08-14  
1楼的问题,解决方法可以看一下这个:


The issue here is that your reader is singleton scoped. This means that when the job runs the first time, the ItemReader is opened successfully and the job runs. However when the job attempts to run a second time, it's using the same instance it did the first time which is already initialized (hence the exception). I'd recommend changing the readyReqPoolReader to be a step scoped bean and see if that helps.

You can read more about step scope here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/core/scope/StepScope.html
5 楼 yjc2020 2014-04-26  
我也遇到了1楼的错误 Reader must be open before it can be read.

先是成功执行了2次,然后开始出错,失败2次
4 楼 dotame 2014-03-25  
1楼,应该是你传参的文件路径出错了
3 楼 dotame 2014-03-25  
应该是你传参的文件路径出错了
2 楼 mbsky 2014-03-07  
[size=x-large][color=blue][/color][/size]
很nice 的文章
1 楼 wei_jiyong 2013-03-01  
你好,在么?我想问下,我这边总是提示
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:137)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:93)
at org.springframework.batch.core.step.item.ChunkMonitor.open(ChunkMonitor.java:105)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:93)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:301)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:192)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
at net.etongbao.vasp.ac.quartz.QuartzLedgerJob.execute(QuartzLedgerJob.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:311)
at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:113)
at org.quartz.core.JobRunShell.run(JobRunShell.java:216)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549)
Caused by: java.lang.IllegalStateException: Stream is already initialized.  Close before re-opening.
at org.springframework.util.Assert.state(Assert.java:384)
at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:397)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:134)
... 25 more
10:02:34,484  INFO org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-2 listener.AppJobExecutionListener:18 - Job failed: 1
10:02:34,484  INFO org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-2 support.SimpleJobLauncher:121 - Job: [FlowJob: [name=ledgerJob]] completed with the following parameters: [{date=1362103354109}] and the following status: [FAILED]
10:02:35,875 ERROR org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-1 listener.ItemFailureLoggerListener:12 - Encountered error on read
org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:437)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:85)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:90)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:108)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:103)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:68)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
at net.etongbao.vasp.ac.quartz.QuartzLedgerJob.execute(QuartzLedgerJob.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:311)
at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:113)
at org.quartz.core.JobRunShell.run(JobRunShell.java:216)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549)
10:02:35,875 ERROR org.springframework.scheduling.quartz.SchedulerFactoryBean#0_Worker-1 step.AbstractStep:212 - Encountered an error executing the step
org.springframework.batch.core.step.skip.NonSkippableReadException: Non-skippable exception during read
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:104)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:108)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:103)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:68)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:281)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
at net.etongbao.vasp.ac.quartz.QuartzLedgerJob.execute(QuartzLedgerJob.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
at org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean$MethodInvokingJob.executeInternal(MethodInvokingJobDetailFactoryBean.java:311)
at org.springframework.scheduling.quartz.QuartzJobBean.execute(QuartzJobBean.java:113)
at org.quartz.core.JobRunShell.run(JobRunShell.java:216)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549)
Caused by: org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:437)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:85)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:90)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
... 35 more

这个是怎么回事啊!你有遇到这样的问题么?

相关推荐

    spring-batch同步数据库mysql源码

    总结来说,Spring-Batch与MySQL的数据库同步涉及到数据库驱动的适配、数据读写接口的实现、同步策略的选择以及异常处理等多个环节。通过理解并掌握这些核心概念,开发者能够有效地利用Spring-Batch进行高效、可靠的...

    Spring Batch读取txt文件并写入数据库的方法教程

    在本教程中,我们将探讨如何使用 Spring Batch 读取文本(txt)文件,并将读取到的数据处理后写入数据库。 首先,我们需要创建一个 Maven 项目,并在 `pom.xml` 文件中添加必要的依赖。这些依赖包括 `spring-boot-...

    quartz整合springbatch动态集群定时实现mysql参考

    在这个“quartz_springbatch_dynamic”项目中,我们将看到如何将这两个强大的工具结合起来,以实现动态集群环境中的定时任务执行,并使用MySQL作为数据存储。 Quartz是一个开源的作业调度框架,允许开发者创建、...

    Spring Batch in Action英文pdf版

    Spring Batch提供了多种内置的ItemReader和ItemWriter实现,支持从数据库、文件等多种数据源读取数据,以及将数据写入到数据库、文件等目的地。 知识点五:数据处理 在数据被读取之后,通常需要经过一定的处理才能...

    spring-batch+quartz处理mysql数据示例

    在本示例中,Spring Batch 被用来从MySQL数据库读取数据,然后对数据进行处理,最后将处理结果写回到MySQL。 **Quartz** Quartz 是一个开源的作业调度框架,可以用来安排任务在特定的时间执行。Quartz 提供了丰富的...

    spring batch lib

    1. **读写操作**:Spring Batch 提供了多种ItemReader和ItemWriter实现,如JDBCItemReader用于读取数据库,FlatFileItemWriter用于将数据写入文本文件。这些抽象使得处理不同类型的数据源变得简单。 2. **事务管理*...

    Spring Batch In Action

    除了基本的数据读写,Spring Batch还提供了ItemProcessor接口,用于实现复杂的数据处理逻辑。此外,框架还支持分块处理,即一次只处理一部分数据,从而提高内存效率和处理速度。 #### 4. 错误恢复与重试 在数据处理...

    Spring batch 2.1.jar

    7. **自定义分割策略**:对于大数据量处理,Spring Batch 支持自定义分割策略,例如多线程并行处理数据块,以提高处理效率。 8. **集成性**:作为Spring框架的一部分,Spring Batch 可以无缝集成其他Spring组件,如...

    Spring Batch Extensions

    如JDBCItemReader和FlatFileItemWriter,但Spring Batch Extensions可能包含更多定制化的读写器和处理器,比如支持NoSQL数据库、XML文件或其他复杂格式的数据源。 2. **分割与并行处理**:Spring Batch支持基于任务...

    Spring-batch简介.pdf

    SpringBatch是SpringSource与Accenture合作开发的一个开源大数据量并行处理框架。它提供了丰富的参考经验,Accenture在工业级别的批处理架构上拥有丰富的经验,而SpringSource则基于深刻的Spring框架编程模型。...

    spring batch批处理 教程

    Spring Batch 是一个强大的批处理框架,它为Java开发者提供了处理大量数据的能力,广泛应用于企业级应用和大数据处理中。在本文中,我们将深入探讨Spring Batch的各个方面,包括其概念、结构、执行流程以及如何在...

    spring batch简介

    总的来说,Spring Batch 是一个强大且灵活的批量处理解决方案,它的设计考虑了企业级应用的复杂性和可扩展性,使得开发者可以高效地处理大数据量的业务场景,同时保持代码的简洁和可维护性。无论是简单的数据导入...

    Spring Batch 演示

    通过这个“Spring Batch 演示”,你可以学习如何创建、配置和运行一个简单的 Spring Batch 作业,以及如何处理数据读写、错误处理和事务管理。通过实际操作,你将更好地理解 Spring Batch 如何简化和优化批处理应用...

    Spring.Batch.in.Action.pdf

    - **ItemStreamReader**、**ItemStreamWriter**:流式读写器,适用于大文件处理,减少内存消耗。 - **生命周期管理**:介绍了Spring Batch如何管理Job和Step的生命周期,包括启动、暂停、恢复和终止等操作。 - **重...

    Spring batch

    通过这个简单的 Spring Batch 示例项目,你可以了解到如何设置一个基本的批处理流程,并从中学习到如何利用 Spring Batch 的强大功能来处理大规模数据。这个项目对于理解 Spring Batch 的核心概念和实践操作非常有...

    Spring_Batch系列文章汇总

    此外,Spring_Batch也支持使用游标方式读写数据库数据表,并且可以对作业执行过程中出现的错误进行跳过处理。 Spring_Batch框架主要包含三个层次结构:应用层、核心层和基础架构层。应用层负责所有批处理作业的实现...

    spring-batch-database-to-database:Spring Batch 示例,展示了如何在 Spring Batch 的 HSQL 内存中进行读写

    下面我们将详细探讨Spring Batch的关键特性、HSQL数据库以及如何在它们之间实现数据操作。 **Spring Batch关键特性** 1. **模块化设计**:Spring Batch 提供了一个清晰的模块化架构,包括作业(Job)、步骤(Step...

    SpringBatch成功案例(CSV、XML、自定义长度等方式)

    本案例集中展示了SpringBatch在处理CSV、XML以及自定义长度数据格式时的成功应用。 1. **CSV处理**: CSV(Comma Separated Values)文件是最常见的数据交换格式之一,适用于简单的数据存储。SpringBatch提供了`...

    Spring+Batch+Learning实例

    在实际应用中,Spring Batch 可以与Spring Framework的其他模块(如Spring JDBC、Spring Integration)无缝集成,实现更复杂的数据处理场景。例如,可以结合Spring JPA来读写数据库,或者使用Spring Integration与...

Global site tag (gtag.js) - Google Analytics