`
firecool
  • 浏览: 83825 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

Chapter 6. ItemReaders and ItemWriters

 
阅读更多

All batch processing can be described in its most simple form as reading in large amounts of data, performing some type of calculation or transformation, and writing the result out. Spring Batch provides three key interfaces to help perform bulk reading and writing: ItemReader,ItemProcessor and ItemWriter.

所有的批处理均可将其描述成最简单的形式,比如读取大量数据、进行某种类型的计算、转换、或者将结果写出。Spring Batch提供3个接口帮助进行批读取和批写出:ItemReader、ItemProcessor、ItemWriter。

6.1. ItemReader

Although a simple concept, an ItemReader is the means for providing data from many different types of input. The most general examples include:

  • Flat File- Flat File Item Readers read lines of data from a flat file that typically describe records with fields of data defined by fixed positions in the file or delimited by some special character (e.g. Comma).

  • XML - XML ItemReaders process XML independently of technologies used for parsing, mapping and validating objects. Input data allows for the validation of an XML file against an XSD schema.

  • Database - A database resource is accessed to return resultsets which can be mapped to objects for processing. The default SQL ItemReaders invoke a RowMapper to return objects, keep track of the current row if restart is required, store basic statistics, and provide some transaction enhancements that will be explained later.

There are many more possibilities, but we'll focus on the basic ones for this chapter. A complete list of all available ItemReaders can be found in Appendix A.

ItemReader is a basic interface for generic input operations:

 

ItemReader的概念虽然简单,但是提供了一种方式,以提供不同类型的输入数据。最典型的例子包括:

  • Flat File- Flat File Item Readers从平面文件中读取数据;一般情况下,这些数据用文件中固定位置定义的数据字段或特殊字符定界的数据字段(比如逗号)来描述记录。
  • XML - XML ItemReaders 处理XML语言且与对象解析、映射和确认技术无关。输入数据支持根据XSD模式对XML文件进行确认。
  • Database - 访问数据库资源后,可以返回结果集,且这些结果集可以映射到可以处理的对象。默认的SQL ItemReaders通过调用RowMapper可以返回对象,根据重启需要跟踪当前行,存储基本统计数据,同时对事务功能进行一些改进;这些改进将在下文讨论。

There are many more possibilities, but we'll focus on the basic ones for this chapter. A complete list of all available ItemReaders can be found in Appendix A.

虽然内容涉及许多方面,但是我们将在本章中重点讨论基础部分。ItemReaders全面内容请见附录A。

ItemReader is a basic interface for generic input operations:

ItemReaders是通用输入操作的基本接口:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException;

}

 The read method defines the most essential contract of the ItemReader; calling it returns one Item or null if no more items are left. An item might represent a line in a file, a row in a database, or an element in an XML file. It is generally expected that these will be mapped to a usable domain object (i.e. Trade, Foo, etc) but there is no requirement in the contract to do so.

读取方法定义了ItemReader最核心的原则(contract);如果无更多项目留下,则调用它之后可以返回一个项目或null。一个项目可以表示文件中的一行(line),数据库中的一列,或者XML文件中的一个元素。人们普遍希望这些项目可以映射到一个可用的域对象(比如Trade, Foo,等等),但是原则对此未做要求。

It is expected that implementations of the ItemReader interface will be forward only. However, if the underlying resource is transactional (such as a JMS queue) then calling read may return the same logical item on subsequent calls in a rollback scenario. It is also worth noting that a lack of items to process by an ItemReader will not cause an exception to be thrown. For example, a database ItemReader that is configured with a query that returns 0 results will simply return null on the first invocation of read.

人们还希望仅支持前向ItemReader接口部署。然而,如果底层资源是事务性资源(比如JMS队列),则调用读取(接口)可能会返回与回转(rollback)情况后续调用时相同的逻辑项目。请注意,ItemReader即使缺少待处理项目也不会抛出异常。例如,数据库ItemReader如果用返回0个结果的查询进行配置,则在首次调用读取(接口)时只会返回空。

 

6.2. ItemWriter

ItemWriter is similar in functionality to an ItemReader, but with inverse operations. Resources still need to be located, opened and closed but they differ in that an ItemWriter writes out, rather than reading in. In the case of databases or queues these may be inserts, updates, or sends. The format of the serialization of the output is specific to each batch job.

As with ItemReaderItemWriter is a fairly generic interface:

ItemWriter的功能与ItemReader类似,但是支持逆操作。虽然还需对资源进行定位、开启和关闭操作,但是区别在于ItemWriter是写出而不是读入。对数据库或队列,它们可能是插入、更新或发送。

每个批作业的输出串行化格式都不同。

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

 As with read on ItemReaderwrite provides the basic contract of ItemWriter; it will attempt to write out the list of items passed in as long as it is open. Because it is generally expected that items will be 'batched' together into a chunk and then output, the interface accepts a list of items, rather than an item by itself. After writing out the list, any flushing that may be necessary can be performed before returning from the write method. For example, if writing to a Hibernate DAO, multiple calls to write can be made, one for each item. The writer can then call close on the hibernate Session before returning.

与ItemReader读取类似,写入(接口)也提供基本的ItemWriter原则;它将试图写出与开启时长度相同的项目列表。因为我们普遍希望项目被“批量”装进箱中然后输出,所以接口可以接收一列项目而不是一个项目。写出列表后,任何必须的清洗操作必须在从写入方法返回之前执行。例如,在写入Hibernate DAO前,可以调用多次写入操作,每个项目一次。然后,写入器在返回前调用关闭Hibernate会话。

6.3. ItemProcessor

The ItemReader and ItemWriter interfaces are both very useful for their specific tasks, but what if you want to insert business logic before writing? One option for both reading and writing is to use the composite pattern: create an ItemWriter that contains another ItemWriter, or an ItemReaderthat contains another ItemReader. For example:

ItemReader和ItemWriter接口对其各自任务都很有帮助。但是如果你想在写入前插入业务逻辑,该如何处理?一种读取和写出选择就是使用混合模式:创建一个包括另一个ItemWriter的ItemWriter,或者创建一个包括另一个ItemReader的ItemReader。例如:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(item);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

 The class above contains another ItemWriter to which it delegates after having provided some business logic. This pattern could easily be used for an ItemReader as well, perhaps to obtain more reference data based upon the input that was provided by the main ItemReader. It is also useful if you need to control the call to write yourself. However, if you only want to 'transform' the item passed in for writing before it is actually written, there isn't much need to call write yourself: you just want to modify the item. For this scenario, Spring Batch provides the ItemProcessor interface:

以上类包括了另一个ItemWriter,它在提供了部分业务逻辑后委派该ItemWriter。这种模式也可以用于ItemReader,以根据主ItemReader提供的输入来获得更多的参考数据。然而,如果你只希望在传过来的写入项目被切实写入前对其进行“变换”,则不必亲自调用write操作:你只是想修改项目。此时,Spring Batch提供了ItemProcessorinterface接口。

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

 An ItemProcessor is very simple; given one object, transform it and return another. The provided object may or may not be of the same type. The point is that business logic may be applied within process, and is completely up to the developer to create. An ItemProcessor can be wired directly into a step, For example, assuming an ItemReader provides a class of type Foo, and it needs to be converted to type Bar before being written out. An ItemProcessor can be written that performs the conversion:

ItemProcessor非常简单;给定一个对象后,对其转换并返回另一个对象。提供的对象可以是相同类型,也可以是不同类型。关键是,业务逻辑可能应用在进程中,且完全由开发人员创建。例如,假设ItemReader 提供Foo 类,则ItemProcessor可以直接接线(wired)至Step,在写出前必须转换为Bar类型。执行转换操作的ItemProcessor可以被写入(written):

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //write bars
    }
}

 In the very simple example above, there is a class Foo, a class Bar, and a class FooProcessor that adheres to the ItemProcessor interface. The transformation is simple, but any type of transformation could be done here. The BarWriter will be used to write out Bar objects, throwing an exception if any other type is provided. Similarly, the FooProcessor will throw an exception if anything but a Foo is provided. The FooProcessor can then be injected into a Step:

在上面简单的例子中,类Foo、Bar和类FooProcessor依附于ItemProcessor接口。转换过程很简单,这里可以执行任何类型的转换。将使用BarWriter来写出Bar对象,如果提供的是其他类型,则会抛出一个异常。类似地,如果提供的是类Foo,则FooProcessor也会抛出异常。然后,FooProcessor就会被插入Step:

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter" 
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

 

6.3.1. Chaining ItemProcessors

Performing a single transformation is useful in many scenarios, but what if you want to 'chain' together multiple ItemProcessors? This can be accomplished using the composite pattern mentioned previously. To update the previous, single transformation, example, Foo will be transformed to Bar, which will be transformed to Foobar and written out:

在许多情况下执行一次变换就非常有效,但是如果你想要把多个ItemProcessors“拴”在一起呢?可以使用上文提到的混合模式来实现这一点。为了对前面简单的转换例子进行更新,我们将把Foo转换为Bar,再把Bar转换为Foobar并写出:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar{
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,FooBar>{
    public FooBar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<FooBar>{
    public void write(List<? extends FooBar> items) throws Exception {
        //write items
    }
}

 A FooProcessor and BarProcessor can be 'chained' together to give the resultant Foobar:

FooProcessor和BarProcessor将被“拴”在一起,生成Foobar:

 

CompositeItemProcessor<Foo,Foobar> compositeProcessor = 
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);
 Just as with the previous example, the composite processor can be configured into the Step:

 

与前面例子类似,可以把混合处理器配置成Step:

 

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeProcessor" writer="foobarWriter" 
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor" 
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

 

6.3.2. Filtering Records

One typical use for an item processor is to filter out records before they are passed to the ItemWriter. Filtering is an action distinct from skipping; skipping indicates that a record is invalid whereas filtering simply indicates that a record should not be written.

For example, consider a batch job that reads a file containing three different types of records: records to insert, records to update, and records to delete. If record deletion is not supported by the system, then we would not want to send any "delete" records to the ItemWriter. But, since these records are not actually bad records, we would want to filter them out, rather than skip. As a result, the ItemWriter would receive only "insert" and "update" records.

To filter a record, one simply returns "null" from the ItemProcessor. The framework will detect that the result is "null" and avoid adding that item to the list of records delivered to the ItemWriter. As usual, an exception thrown from the ItemProcessor will result in a skip.

项目处理器的一个典型用途就是在记录被传送给ItemWriter前过滤记录。过滤操作与跳转操作完全不同;跳转操作表明记录是非法的,而过滤操作仅表明记录不该被写入。

例如,考虑一个批作业将要读取一份包括三种类型记录的文件:插入式记录,更新式记录及删除式记录。如果系统不支持记录删除,则我们不会向ItemWriter发送任何“删除”记录。但是,由于这些记录实际上并不是不良记录,所以我们可以过滤这些记录,而不是跳转。于是,ItemWriter将只接收”插入“和”更新“记录。

在过滤记录时,ItemProcessor只返回”null”。框架将会检测出结果是”null”,然后避免将该项目添加到将会传递给ItemWriter的记录列表中。与往常一样,ItemProcessor抛出的异常将会导致跳转。

6.4. ItemStream

Both ItemReaders and ItemWriters serve their individual purposes well, but there is a common concern among both of them that necessitates another interface. In general, as part of the scope of a batch job, readers and writers need to be opened, closed, and require a mechanism for persisting state:

ItemReaders和ItemWriters有各自的作用,但是二者有一个共同的问题:需要涉及另一个接口。总体来说,作为批作业作用域(scope)的一部分,读取器和写入器需要被开启、关闭,同时还需要一种机制来维持状态:

 

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;
  
    void close() throws ItemStreamException;
}
 Before describing each method, we should mention the ExecutionContext. Clients of an ItemReader that also implement ItemStream should call openbefore any calls to read in order to open any resources such as files or to obtain connections. A similar restriction applies to an ItemWriter that implements ItemStream. As mentioned in Chapter 2, if expected data is found in the ExecutionContext, it may be used to start the ItemReader orItemWriter at a location other than its initial state. Conversely, close will be called to ensure that any resources allocated during open will be released safely. update is called primarily to ensure that any state currently being held is loaded into the provided ExecutionContext. This method will be called before committing, to ensure that the current state is persisted in the database before commit.

 

在描述每个方法前,我们需要提及一下ExecutionContext。部署ItemStream的ItemReader客户端(client)在调用read前应该调用open,以便开启文件等各种资源或建立连接。部署ItemStream的ItemWriter也有类似的限制。根据第2章内容,如果在ExecutionContext中有预期数据,则可以用它在其初始状态之外的其他地方开启ItemReader或ItemWriter。相反,close将被调用以确保在open期间分配的所有资源将被安全释放。Updata被调用的主要目的是为了确保当前正被保持的各

In the special case where the client of an ItemStream is a Step (from the Spring Batch Core), an ExecutionContext is created for each StepExecution to allow users to store the state of a particular execution, with the expectation that it will be returned if the same JobInstance is started again. For those familiar with Quartz, the semantics are very similar to a Quartz JobDataMap.

如果在特殊情况下,ItemStream的客户端(client)是一个Step(来自Spring Batch Core),则将为每个StepExecution创建一个ExecutionContext,以允许用户存储某次运行的状态,但有一种例外,就是如果再次开启相同的JobInstance则它将被返回。如果你很熟悉Quartz,则相关语法与Quartz的JobDataMap非常类似。

6.5. The Delegate Pattern and Registering with the Step

Note that the CompositeItemWriter is an example of the delegation pattern, which is common in Spring Batch. The delegates themselves might implement callback interfaces like ItemStream or StepListener. If they do, and they are being used in conjunction with Spring Batch Core as part of a Step in a Job, then they almost certainly need to be registered manually with the Step. A reader, writer, or processor that is directly wired into the Step will be registered automatically if it implements ItemStream or a StepListener interface. But because the delegates are not known to theStep, they need to be injected as listeners or streams (or both if appropriate):

请注意,CompositeItemWriter是委托模式的一个例子,在Spring Batch中很常见。受委方本身可能会部署ItemStream或StepListener等回调接口。如果确实回调,并且在一项作业的Step中与Spring Batch Core共同使用,则它们基本上必须要用Step人工注册。如果它部署了ItemStream或StepListener接口,则直接连接(wired)至Step的读取器、写入器或处理器将被自动注册。但是受委人(delegate)对Step是未知的,它们需要作为接听器(listener)或流(stream)(或者合适情况下同时作为接听器和流)被注入:

 

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter" 
                   commit-interval="2">
                    <streams>
                    <stream ref="barWriter" />
                </streams>
            </chunk>
        </tasklet>
    </step>
</job>

<bean id="compositeItemWriter" class="...CompositeItemWriter">
    <property name="delegate" ref="barWriter" />
</bean>

<bean id="barWriter" class="...BarWriter" />

 

6.6. Flat Files

One of the most common mechanisms for interchanging bulk data has always been the flat file. Unlike XML, which has an agreed upon standard for defining how it is structured (XSD), anyone reading a flat file must understand ahead of time exactly how the file is structured. In general, all flat files fall into two types: Delimited and Fixed Length. Delimited files are those in which fields are separated by a delimiter, such as a comma. Fixed Length files have fields that are a set length.

交换批量数据最常见的机制之一就是平面文件。与XML不同,XML有一个公认的架构定义(XSD),但是要读取一份平面文件则必须要事先知道该文件是如何架构的。总体来说,各种平面文件分为两类:Delimited定界型和Fixed Length固定长度型。Delimited型文件是指字段被定界符(比如逗号)隔开;Fixed Length型文件的字段的长度是固定的。

6.6.1. The FieldSet

When working with flat files in Spring Batch, regardless of whether it is for input or output, one of the most important classes is the FieldSet. Many architectures and libraries contain abstractions for helping you read in from a file, but they usually return a String or an array of Strings. This really only gets you halfway there. A FieldSet is Spring Batch’s abstraction for enabling the binding of fields from a file resource. It allows developers to work with file input in much the same way as they would work with database input. A FieldSet is conceptually very similar to a JdbcResultSet. FieldSets only require one argument, a String array of tokens. Optionally, you can also configure in the names of the fields so that the fields may be accessed either by index or name as patterned after ResultSet:

如果使用Spring Batch中的平面文件,不论其用于输入还是输出,最重要的一个类就是FieldSet。许多架构和库均包括一些抽象技术帮助你从文件中读取数据,但是它们往往返回一个String或一组String。但这对你来说远远不够。FieldSet是Spring Batch的抽象类,用于支持文件源字段绑定。有了它之后,开发人员可以用处理数据库输入的方式来处理文件输入。FieldSet的概念与Jdbc的ResultSet非常类似。FieldSets只需要一个参数,一组String标记(token)。你也可以用字段名义进行配置,然后可以模仿ResultSet通过索引、名称来访问字段:

 

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
 There are many more options on the FieldSet interface, such as Date, long, BigDecimal, etc. The biggest advantage of the FieldSet is that it provides consistent parsing of flat file input. Rather than each batch job parsing differently in potentially unexpected ways, it can be consistent, both when handling errors caused by a format exception, or when doing simple data conversions.

 

FieldSet接口还有许多其他选项,比如Date、long、BigDecimal等。FieldSet的最大特征是,它可以进行一致的平面文件输入解析。也就是说,它不是用难以预料的方式对每个批作业做出不同的解析,它的解析具有一致性,无论是处理格式异常导致的错误,还是进行简单的数据转换,均是如此。

6.6.2. FlatFileItemReader

A flat file is any type of file that contains at most two-dimensional (tabular) data. Reading flat files in the Spring Batch framework is facilitated by the class FlatFileItemReader, which provides basic functionality for reading and parsing flat files. The two most important required dependencies of FlatFileItemReader are Resource and LineMapper. The LineMapper interface will be explored more in the next sections. The resource property represents a Spring Core Resource. Documentation explaining how to create beans of this type can be found in Spring Framework, Chapter 5.Resources. Therefore, this guide will not go into the details of creating Resource objects. However, a simple example of a file system resource can be found below:

平面文件是指包括最多二维(表格)数据的各种文件。利用FlatFileItemReader 类可以方便地在Spring Batch框架下读取平面文件,该类提供了基本的平面文件读取和解析功能。FlatFileItemReader需要的两个最重的附属类是Resource和LineMapper。LineMapper接口将在下文中详细讨论。资源属性表示一种Spring Core Resource(资源)。《Spring Framework,第5章:Resource(资源)》给出了这种类型的bean。因此,本手册不会详细讨论如何创建Resource对象。但仍在下面给出一个简单的文件系统资源示例:

 

Resource resource = new FileSystemResource("resources/trades.csv");
 In complex batch environments the directory structures are often managed by the EAI infrastructure where drop zones for external interfaces are established for moving files from ftp locations to batch processing locations and vice versa. File moving utilities are beyond the scope of the spring batch architecture but it is not unusual for batch job streams to include file moving utilities as steps in the job stream. It is sufficient that the batch architecture only needs to know how to locate the files to be processed. Spring Batch begins the process of feeding the data into the pipe from this starting point. However, Spring Integration provides many of these types of services.

 

The other properties in FlatFileItemReader allow you to further specify how your data will be interpreted:

在复杂的批应用环境下,往往通过EAI基础设施来管理目录结构,在EAI基础设施中建立了外部结点投入区域(drop zones)以将文件从ftp位置移至批处理位置,反之亦然。文件移动功能(程序)超出了spring batch架构的范围,但是批作业流经常将文件移动程序作为step而包含在其作业流中。批架构只需知道如何定位将被处理的文件即可。Spring Batch以此为起点,开始将数据传递给管道(pipe)。然而,Spring Integration提供了多种这种类型的服务。

利用FlatFileItemReader的其他属性,你可以进一步指定如何解释你的数据:


Table 6.1. FlatFileItemReader Properties

Property Type Description
comments String[] Specifies line prefixes that indicate comment rows
encoding String Specifies what text encoding to use - default is "ISO-8859-1"
lineMapper LineMapper Converts a String to an Object representing the item.
linesToSkip int Number of lines to ignore at the top of the file
recordSeparatorPolicy RecordSeparatorPolicy Used to determine where the line endings are and do things like continue over a line ending if inside a quoted string.
resource Resource The resource from which to read.
skippedLinesCallback LineCallbackHandler Interface which passes the raw line content of the lines in the file to be skipped. If linesToSkip is set to 2, then this interface will be called twice.
strict boolean In strict mode, the reader will throw an exception on ExecutionContext if the input resource does not exist.

 

 

 

6.6.2.1. LineMapper

As with RowMapper, which takes a low level construct such as ResultSet and returns an Object, flat file processing requires the same construct to convert a String line into an Object:

与只需要一个低级别的结构(construct)(比如ResultSet )并返回一个对象的RowMapper类似,处理平面文件需要同样的结构(construct)将一个String line转换为一个对象:

 

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}
 The basic contract is that, given the current line and the line number with which it is associated, the mapper should return a resulting domain object. This is similar to RowMapper in that each line is associated with its line number, just as each row in a ResultSet is tied to its row number. This allows the line number to be tied to the resulting domain object for identity comparison or for more informative logging. However, unlikeRowMapper, the LineMapper is given a raw line which, as discussed above, only gets you halfway there. The line must be tokenized into a FieldSet, which can then be mapped to an object, as described below.

 

基本原则是,已知当前line及与之关联的line号,映像程序应该返回生成的域对象。这与RowMapper类似;在RowMapper中,每条line关联一个line号,就像ResultSet中的每一行关联一个行号一样。这样一来,line号就会关联生成的域对象,便于身份鉴定及信息量更大的日志处理。然而,与RowMapper不同,根据上文讨论内容,LineMapper只被给了一条原生line, 这对你来说远远不够。Line必须要被标记化(tokenize)为一个FieldSet,FieldSet然后被映射到一个对象,见下文。

6.6.2.2. LineTokenizer

An abstraction for turning a line of input into a line into a FieldSet is necessary because there can be many formats of flat file data that need to be converted to a FieldSet. In Spring Batch, this interface is the LineTokenizer:

必须要有一个抽象接口,将输入line转换化为FieldSet line,因为可能会有多种格式的平面文件数据需要被转化为FieldSet。在Spring Batch中,该接口为LineTokenizer:

 

public interface LineTokenizer {
  
    FieldSet tokenize(String line);

}
The contract of a LineTokenizer is such that, given a line of input (in theory the String could encompass more than one line), a FieldSetrepresenting the line will be returned. This FieldSet can then be passed to a FieldSetMapper. Spring Batch contains the following LineTokenizerimplementations:

 

  • DelmitedLineTokenizer - Used for files where fields in a record are separated by a delimiter. The most common delimiter is a comma, but pipes or semicolons are often used as well.
  • FixedLengthTokenizer - Used for files where fields in a record are each a 'fixed width'. The width of each field must be defined for each record type.
  • PatternMatchingCompositeLineTokenizer - Determines which among a list of LineTokenizers should be used on a particular line by checking against a pattern.
LineTokenizer的原则是,已知一个输入line(理论来说,String可能包括多个line),返回一个表示line的FieldSet。该FieldSet然后被传递给FieldSetMapper。在Spring Batch中, LineTokenizer部署内容如下:
  • DelmitedLineTokenizer—用于记录中的字段被定界符隔开的文件。最常见的定界符是逗号,管道符号(pipe)和分号也可以。
  • FixedLengthTokenizer—用于记录中的字段均为固定长度的文件。每种记录类型必须明确各字段的宽度。
  • PatternMatchingCompositeLineTokenizer— 通过检查相关模式,从LineTokenizers列表中确定一项用到具体的line上。

6.6.2.3. FieldSetMapper

The FieldSetMapper interface defines a single method, mapFieldSet, which takes a FieldSet object and maps its contents to an object. This object may be a custom DTO, a domain object, or a simple array, depending on the needs of the job. The FieldSetMapper is used in conjunction with theLineTokenizer to translate a line of data from a resource into an object of the desired type:

FieldSetMapper接口只定义了一个方法mapFieldSet,该方法以FieldSet为输入,将其内容映射到一个对象上(映射为一个对象)。该对象可能是一个定制的DTO,域对象,或者是一个简单的数组,具体取决于作业的需要。FieldSetMapper可以与LineTokenizer结合使用,将资源的一行数据转化为所需类型的一个对象:

public interface FieldSetMapper<T> {
  
    T mapFieldSet(FieldSet fieldSet);

}

 The pattern used is the same as the RowMapper used by JdbcTemplate.

使用的模式与JdbcTemplate使用的RowMapper相同。

6.6.2.4. DefaultLineMapper

Now that the basic interfaces for reading in flat files have been defined, it becomes clear that three basic steps are required:

  • Read one line from the file.
  • Pass the string line into the LineTokenizer#tokenize() method, in order to retrieve a FieldSet.
  • Pass the FieldSet returned from tokenizing to a FieldSetMapper, returning the result from the ItemReader#read() method.

The two interfaces described above represent two separate tasks: converting a line into a FieldSet, and mapping a FieldSet to a domain object. Because the input of a LineTokenizer matches the input of the LineMapper (a line), and the output of a FieldSetMapper matches the output of theLineMapper, a default implementation that uses both a LineTokenizer and FieldSetMapper is provided. The DefaultLineMapper represents the behavior most users will need:

现在,已经定义了读取平面文件所需要的基本接口,很明显还需要3个基本步骤:

  • 从文件中读取一行数据
  • 将string line传递给LineTokenizer#tokenize()方法,以检索FieldSet
  • 将标记化过程返回的FieldSet传递给FieldSetMapper,返回ItemReader#read() method生成的结果
上面描述的两个接口表示两个不同的任务:将line转化为一个FieldSet,将一个FieldSet映射为(到)一个域对象。因为LineTokenizer的输入与LineMapper (一个line)的输入相匹配,且FieldSetMapper的输出与LineMapper的输出相匹配,所以提供的默认部署既使用了LineTokenizer又使用了FieldSetMapper。DefaultLineMapper表示大多数用户都会需要的行为:
public class DefaultLineMapper<T> implements LineMapper<T>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) { 
        this.fieldSetMapper = fieldSetMapper;
    }
}

 The above functionality is provided in a default implementation, rather than being built into the reader itself (as was done in previous versions of the framework) in order to allow users greater flexibility in controlling the parsing process, especially if access to the raw line is needed.

上述功能在默认部署中提供,而不是集成在读取器内部(像框架以前版本一样),目的是在解析过程控制方面,尤其是需要访问原生line时,给用户更大的灵活性。

6.6.2.5. Simple Delimited File Reading Example

The following example will be used to illustrate this using an actual domain scenario. This particular batch job reads in football players from the following file:

将使用下面的例子,结合一个实际的域场景对其进行阐述。该批作业从以下文件读取足球运动员数据:

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003" 

 The contents of this file will be mapped to the following Player domain object:

该文件的内容将映射到如下player域对象中:

public class Player implements Serializable {
        
    private String ID; 
    private String lastName; 
    private String firstName; 
    private String position; 
    private int birthYear; 
    private int debutYear;
        
    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName + 
            ",First Name=" + firstName + ",Position=" + position + 
            ",Birth Year=" + birthYear + ",DebutYear=" + 
            debutYear;
    }
   
    // setters and getters...
}

 In order to map a FieldSet into a Player object, a FieldSetMapper that returns players needs to be defined:

为了将FieldSet映射进player对象,需要定义返回球员的FieldSetMapper:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2)); 
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}   

 The file can then be read by correctly constructing a FlatFileItemReader and calling read:

然后,通过正确建立FlatFileItemReader并调用read,便可以读取文件:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
LineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

 Each call to read will return a new Player object from each line in the file. When the end of the file is reached, null will be returned.

每次调用read都会从文件的每一行(line)中返回一个新的player对象。当到达文件末尾时,将会返回null。

6.6.2.6. Mapping Fields by Name

There is one additional piece of functionality that is allowed by both DelimitedLineTokenizer and FixedLengthTokenizer that is similar in function to a Jdbc ResultSet. The names of the fields can be injected into either of these LineTokenizer implementations to increase the readability of the mapping function. First, the column names of all fields in the flat file are injected into the tokenizer:

DelimitedLineTokenizert和FixedLengthTokenizer均支持另一项与Jdbc ResultSet比较类似的功能。字段名称可以插入LineTokenizer某一种部署中,以增加映射函数的可读性。首先,平面文件所有字段列名称被插入tokenizer(标记器)。

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"}); 

 A FieldSetMapper can use this information as follows:

FieldSetMapper可以按如下方式利用该信息:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {
                        
       if(fs == null){
           return null;
       }
                        
       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));
                       
       return player;
   }
}

 

6.6.2.7. Automapping FieldSets to Domain Objects

For many, having to write a specific FieldSetMapper is equally as cumbersome as writing a specific RowMapper for a JdbcTemplate. Spring Batch makes this easier by providing a FieldSetMapper that automatically maps fields by matching a field name with a setter on the object using the JavaBean specification. Again using the football example, the BeanWrapperFieldSetMapper configuration looks like the following:

对许多用户来说,编写一个具体的FieldSetMapper,与为JdbcTemplate编写一个具体的RowMapper一样麻烦。Spring Batch通过提供FieldSetMapper使其简化,FieldSetMapper可以使用JavaBean规格,通过对字段名称与对象调节器(setter)进行匹配来实现字段的自动映射。再次使用足球示例,BeanWrapperFieldSetMapper配置应该如下:

<bean id="fieldSetMapper"
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
    <property name="prototypeBeanName" value="player" />
</bean>

<bean id="player"
      class="org.springframework.batch.sample.domain.Player"
      scope="prototype" />

 For each entry in the FieldSet, the mapper will look for a corresponding setter on a new instance of the Player object (for this reason, prototype scope is required) in the same way the Spring container will look for setters matching a property name. Each available field in the FieldSet will be mapped, and the resultant Player object will be returned, with no code required.

对FieldSet中的每个条目,映射器将用与Spring容器寻找属性名称匹配调节器相同的方式,寻找与player对象新的实例相对应的调节器(为此,需要原型scope)。FieldSet中可获得的字段均将被映射,并返回生成的player对象而不需要其他代码。

6.6.2.8. Fixed Length File Formats

So far only delimited files have been discussed in much detail, however, they represent only half of the file reading picture. Many organizations that use flat files use fixed length formats. An example fixed length file is below:

迄今只详细讨论了分隔符文件;然而,它们只是文件读取所有情况中的一部分情况。许多使用文本文件的机构使用固定长度格式。固定长度文件的一个示例如下:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

 While this looks like one large field, it actually represent 4 distinct fields:

  • ISIN: Unique identifier for the item being order - 12 characters long.
  • Quantity: Number of this item being ordered - 3 characters long.
  • Price: Price of the item - 5 characters long.
  • Customer: Id of the customer ordering the item - 9 characters long.

When configuring the FixedLengthLineTokenizer, each of these lengths must be provided in the form of ranges:

这个字段看上去比较大,它实际上表示了4个不同的字段:

ISIN: 被预定的项目的(the item being order)唯一标识符—12字符长

Quantity:被预定的项目的(this item being ordered)数量—3字符长

Price:项目价格—5字符长

Customer:预定该项目的客户ID—9字符长

当配置FixedLengthLineTokenizer时,均需按照范围形式提供这些长度:

<bean id="fixedLengthLineTokenizer"
      class="org.springframework.batch.io.file.transform.FixedLengthTokenizer">
    <property name="names" value="ISIN,Quantity,Price,Customer" />
    <property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>

 Because the FixedLengthLineTokenizer uses the same LineTokenizer interface as discussed above, it will return the same FieldSet as if a delimiter had been used. This allows the same approaches to be used in handling its output, such as using the BeanWrapperFieldSetMapper.

如上文所述,因为FixedLengthLineTokenizer使用相同的LineTokenizer接口,所以它将返回相同的FieldSet,就好像已经使用了一个分隔符一样。于是,在处理其输出时可以使用相同的方法,比如使用BeanWrapperFieldSetMapper。

Note

Supporting the above syntax for ranges requires that a specialized property editor, RangeArrayPropertyEditor, be configured in theApplicationContext. However, this bean is automatically declared in an ApplicationContext where the batch namespace is used.

如果对范围(ranges)支持上述语法,则需要在ApplicationContext对专门的属性编辑器RangeArrayPropertyEditor进行配置。然而,该bean已经在使用批命名空间的ApplicationContext中自动声明。

6.6.2.9. Multiple Record Types within a Single File

All of the file reading examples up to this point have all made a key assumption for simplicity's sake: all of the records in a file have the same format. However, this may not always be the case. It is very common that a file might have records with different formats that need to be tokenized differently and mapped to different objects. The following excerpt from a file illustrates this:

迄今讨论的所有文件读取示例均为了简便而做了一个非常关键的假设:文件中所有记录的格式相同。然而,实际情况有时并非如此。许多情况下,一份文件中的许多记录的格式并不相同,需要不同的标记化处理和映射到不同的对象上去。比如说下面给出的一份文件的一个片断:

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

 In this file we have three types of records, "USER", "LINEA", and "LINEB". A "USER" line corresponds to a User object. "LINEA" and "LINEB" both correspond to Line objects, though a "LINEA" has more information than a "LINEB".

在该文件中,我们有三种类型的记录:"USER", "LINEA", 和 "LINEB"。一个"USER" line对应于一个User对象。"LINEA" 和 "LINEB"均对应于Line对象,虽然一个"LINEA"拥有的信息量要大于"LINEB"。

The ItemReader will read each line individually, but we must specify different LineTokenizer and FieldSetMapper objects so that the ItemWriter will receive the correct items. The PatternMatchingCompositeLineMapper makes this easy by allowing maps of patterns to LineTokenizers and patterns toFieldSetMappers to be configured:

ItemReader将分别读取每个line,但是我们必须指明不同的LineTokenizer和FieldSetMapper对象以便ItemWriter可以接收合适的项目。PatternMatchingCompositeLineMapper通过支持LineTokenizer模式(pattern)映射和FieldSetMapper模式(pattern)映射进行配置,来简化上述过程:

<bean id="orderFileLineMapper"
      class="org.spr...PatternMatchingCompositeLineMapper">
    <property name="tokenizers">
        <map>
            <entry key="USER*" value-ref="userTokenizer" />
            <entry key="LINEA*" value-ref="lineATokenizer" />
            <entry key="LINEB*" value-ref="lineBTokenizer" />
        </map>
    </property>
    <property name="fieldSetMappers">
        <map>
            <entry key="USER*" value-ref="userFieldSetMapper" />
            <entry key="LINE*" value-ref="lineFieldSetMapper" />
        </map>
    </property>
</bean>

 In this example, "LINEA" and "LINEB" have separate LineTokenizers but they both use the same FieldSetMapper.

在该例子中,"LINEA"和"LINEB"有不同的LineTokenizer,但是有相同的FieldSetMapper。

The PatternMatchingCompositeLineMapper makes use of the PatternMatcher's match method in order to select the correct delegate for each line. ThePatternMatcher allows for two wildcard characters with special meaning: the question mark ("?") will match exactly one character, while the asterisk ("*") will match zero or more characters. Note that in the configuration above, all patterns end with an asterisk, making them effectively prefixes to lines. The PatternMatcher will always match the most specific pattern possible, regardless of the order in the configuration. So if "LINE*" and "LINEA*" were both listed as patterns, "LINEA" would match pattern "LINEA*", while "LINEB" would match pattern "LINE*". Additionally, a single asterisk ("*") can serve as a default by matching any line not matched by any other pattern.

PatternMatchingCompositeLineMapper使用PatternMatcher的match方法以便为第一个line选择合适的代表(delegate)。PatternMatcher支持两个具有特殊含义的通配符:问号("?")互相之间准确匹配,星号("*")可以匹配0个或多个字符。请注意,在上述配置中,所有模式(pattern)均以星号结尾,有效地将它们作为前缀加在line前面。PatternMatcher总是匹配最为具体的模式,与配置次序无关。因此,如果"LINE*"和"LINEA*"均被列为模式,则"LINEA"将匹配模式"LINEA*",而"LINEB"将匹配"LINE*"。此外,单个星号("*")通过匹配未与任何其他模式相匹配的各个line而作为系统默认设置。

<entry key="*" value-ref="defaultLineTokenizer" />

 There is also a PatternMatchingCompositeLineTokenizer that can be used for tokenization alone.

It is also common for a flat file to contain records that each span multiple lines. To handle this situation, a more complex strategy is required. A demonstration of this common pattern can be found in Section 11.5, “Multi-Line Records”.

还有PatternMatchingCompositeLineTokenizer接口专门用于标记化(tokenization)。

许多情况下,平面文件包含的多个记录中,每个记录覆盖多个line。对此,需要更为复杂的处理策略。这种常见情况的论证请见第11.5节“Multi-Line Records” (“多行记录”)。

6.6.2.10. Exception Handling in Flat Files

There are many scenarios when tokenizing a line may cause exceptions to be thrown. Many flat files are imperfect and contain records that aren't formatted correctly. Many users choose to skip these erroneous lines, logging out the issue, original line, and line number. These logs can later be inspected manually or by another batch job. For this reason, Spring Batch provides a hierarchy of exceptions for handling parse exceptions:FlatFileParseException and FlatFileFormatExceptionFlatFileParseException is thrown by the FlatFileItemReader when any errors are encountered while trying to read a file. FlatFileFormatException is thrown by implementations of the LineTokenizer interface, and indicates a more specific error encountered while tokenizing.

许多情况下,标记一个line可能会导致抛出一个异常。许多平面文件存在瑕疵,包括的记录的格式不当。许多用户选择跳转这些错误line,退出问题(issue)、初始line和line号。这些日志可供日后人工检查或其他批作业检查。为此,Spring Batch提供了分层次异常以处理解析异常:FlatFileParseException和FlatFileFormatException。只要在读取文件过程中遇到了错误,便由FlatFileItemReader发出FlatFileParseException异常。FlatFileFormatException异常由LineTokenizer接口部署抛出,表明在标记化期间遇到了一个更具体的错误。

 

6.6.2.10.1. IncorrectTokenCountException

Both DelimitedLineTokenizer and FixedLengthLineTokenizer have the ability to specify column names that can be used for creating a FieldSet. However, if the number of column names doesn't match the number of columns found while tokenizing a line the FieldSet can't be created, and aIncorrectTokenCountException is thrown, which contains the number of tokens encountered, and the number expected:

DelimitedLineTokenizer和FixedLengthLineTokenizer可以指定创建FieldSet时用到的列名称。然而,如果列名称数量与标记一个line时遇到的列数量不匹配,则FieldSet无法被创建,IncorrectTokenCountException异常被抛出,该异常包括遇到的标记(token)数量及预期数量:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});
  
try{
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

 Because the tokenizer was configured with 4 column names, but only 3 tokens were found in the file, an IncorrectTokenCountException was thrown.

因为标记器(tokenizer)用4个列名称配置,但是在文件中只找到了3个标记,所以抛出一个IncorrectTokenCountException异常。

 

6.6.2.10.2. IncorrectLineLengthException

Files formatted in a fixed length format have additional requirements when parsing because, unlike a delimited format, each column must strictly adhere to its predefined width. If the total line length doesn't add up to the widest value of this column, an exception is thrown:

采用固定长度格式的文件在解析时有些额外要求,因为与定界格式不同,每一列必须要严格指定其预定义宽度。如果总的line长度加起来没有达到该列的最大值,则抛出异常:

tokenizer.setColumns(new Range[] { new Range(1, 5), 
                                   new Range(6, 10), 
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

 The configured ranges for the tokenizer above are: 1-5, 6-10, and 11-15, thus the total length of the line expected is 15. However, in this case a line of length 5 was passed in, causing an IncorrectLineLengthException to be thrown. Throwing an exception here rather than only mapping the first column allows the processing of the line to fail earlier, and with more information than it would if it failed while trying to read in column 2 in a FieldSetMapper. However, there are scenarios where the length of the line isn't always constant. For this reason, validation of line length can be turned off via the 'strict' property:

标记器配置范围为:1-5, 6-10及11-15,因此预期line总长度为15。然而,在该情况下,传递的line长度为5,导致抛出异常IncorrectLineLengthException。由于此时抛出异常而不是映射第一列,所以line处理时发生故障的时间要早于它读取FieldSetMapper第2列时发生故障的时间,且信息量也更大。然而,有些情况下line的长度并不总是恒定的。此时,通过'strict'属性可以关闭line长度验证:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

 The above example is almost identical to the one before it, except that tokenizer.setStrict(false) was called. This setting tells the tokenizer to not enforce line lengths when tokenizing the line. A FieldSet is now correctly created and returned. However, it will only contain empty tokens for the remaining values.

上面例子与前面例子基本完全相同,唯一区别就是调用了tokenizer.setStrict(false)。这一设置是在告诉标记器,在标记line时不要强行验证line长度。现在,便正确创建并返回了一个FieldSet。然而,它只包含剩余值的空标记。

6.6.3. FlatFileItemWriter

Writing out to flat files has the same problems and issues that reading in from a file must overcome. A step must be able to write out in either delimited or fixed length formats in a transactional manner.

向平面文件写入数据与从文件中读取数据具有相同的问题需要解决。Step必须能够用事务性方式以定界格式或固定长度格式写出数据。

6.6.3.1. LineAggregator

Just as the LineTokenizer interface is necessary to take an item and turn it into a String, file writing must have a way to aggregate multiple fields into a single string for writing to a file. In Spring Batch this is the LineAggregator:

就好像LineTokenizer接口必须要接收一个项目并将其转换为一个String,向文件写入时也必须要有一种方法,将多个字段聚集成为一个可以向文件写入的字符串。在Spring Batch中,有LineAggregator接口负责这一任务:

public interface LineAggregator<T> {

    public String aggregate(T item);

}

 The LineAggregator is the opposite of a LineTokenizerLineTokenizer takes a String and returns a FieldSet, whereas LineAggregator takes an item and returns a String.

LineAggregator与LineTokenizer相反。LineTokenizer接收一个String,返回一个FieldSet;而LineAggregator接收一个item(项目),返回一个String(字符串)。

 

6.6.3.1.1. PassThroughLineAggregator

The most basic implementation of the LineAggregator interface is the PassThroughLineAggregator, which simply assumes that the object is already a string, or that its string representation is acceptable for writing:

LineAggregator接口最简单的一种部署就是PassThroughLineAggregator,它假设对象已经是一个字符串,或者它的字符串表示法可以用于数据写出:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

 The above implementation is useful if direct control of creating the string is required, but the advantages of a FlatFileItemWriter, such as transaction and restart support, are necessary.

如果需要对字符串创建直接进行控制,则上面的部署可以派上用场。但是FlatFileItemWriter的优点,比如支持事务和重启,仍然必不可少。

6.6.3.2. Simplified File Writing Example

Now that the LineAggregator interface and its most basic implementation, PassThroughLineAggregator, have been defined, the basic flow of writing can be explained:

  • The object to be written is passed to the LineAggregator in order to obtain a String.
  • The returned String is written to the configured file.
The following excerpt from the FlatFileItemWriter expresses this in code:
现在已经定义了LineAggregator接口及其最基本的部署接口PassThroughLineAggregator,写入基本流程解释如下:
  • 将被写入的对象传递给LineAggregator,以获得一个String。
  • 返回的String被写入配置文件。
如下FlatFileItemWriter片断可以用代码对此进行解释:
public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
 A simple configuration would look like the following:
一个简单的配置过程如下:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" value="file:target/test-outputs/output.txt" />
    <property name="lineAggregator">
        <bean class="org.spr...PassThroughLineAggregator"/>
    </property>
</bean>

6.6.3.3. FieldExtractor

The above example may be useful for the most basic uses of a writing to a file. However, most users of the FlatFileItemWriter will have a domain object that needs to be written out, and thus must be converted into a line. In file reading, the following was required:

  1. Read one line from the file.

  2. Pass the string line into the LineTokenizer#tokenize() method, in order to retrieve a FieldSet

  3. Pass the FieldSet returned from tokenizing to a FieldSetMapper, returning the result from the ItemReader#read() method

以上示例可能对最基本的文件写入应用有所帮助。然而,FlatFileItemWriter大多数用户将会有一个需要被写出的域对象,且该对象必须要转换为一个line。在文件读取时,需要如下过程:
  • 从文件读取一个line
  • 将字符串line传递给LineTokenizer#tokenize()方法,以检索FieldSet
  • 将标记化过程返回的FieldSet传递给FieldSetMapper,返回ItemReader#read()过程生成的结果

File writing has similar, but inverse steps:

  1. Pass the item to be written to the writer

  2. convert the fields on the item into an array

  3. aggregate the resulting array into a line

文件写入需要类似但相反的步骤:
  • 把将要写入的项目传递给写入器
  • 将项目的字段转换为一个数组
  • 将生成的数组聚集为一个line
Because there is no way for the framework to know which fields from the object need to be written out, a FieldExtractor must be written to accomplish the task of turning the item into an array:
因为框架无法知道对象的哪些字段需要写出,所以必须要编写(write)FieldExtractor以将项目转换为一个数组:
public interface FieldExtractor<T> {

    Object[] extract(T item);

}
 Implementations of the FieldExtractor interface should create an array from the fields of the provided object, which can then be written out with a delimiter between the elements, or as part of a field-width line.
FieldExtractor接口部署应该根据提供的对象的字段创建一个数组,然后用元素间的分隔符将数组写出,或者作为字段宽度line的一部分将数组写出。
6.6.3.3.1. PassThroughFieldExtractor

There are many cases where a collection, such as an array, Collection, or FieldSet, needs to be written out. "Extracting" an array from a one of these collection types is very straightforward: simply convert the collection to an array. Therefore, the PassThroughFieldExtractor should be used in this scenario. It should be noted, that if the object passed in is not a type of collection, then the PassThroughFieldExtractor will return an array containing solely the item to be extracted.

许多情况下,需要将数组、Collection、或FieldSet等集合(collection)写出。从某种类型的集合“提取”一个数组其实非常简单:将集合转换为一个数组即可。因此,此时应该使用PassThroughFieldExtractor接口。请注意,如果传递来的对象不属于集合的类型,则PassThroughFieldExtractor将返回只包括将被提取的项目的数组。

 

6.6.3.3.2. BeanWrapperFieldExtractor

As with the BeanWrapperFieldSetMapper described in the file reading section, it is often preferable to configure how to convert a domain object to an object array, rather than writing the conversion yourself. The BeanWrapperFieldExtractor provides just this type of functionality:

与文件读取章节的BeanWrapperFieldSetMapper类似,人们往往希望对域对象向对象数组的转换过程进行配置,而不是自己编写转换过程。BeanWrapperFieldExtractor可以提供这方面的功能:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

 This extractor implementation has only one required property, the names of the fields to map. Just as the BeanWrapperFieldSetMapper needs field names to map fields on the FieldSet to setters on the provided object, the BeanWrapperFieldExtractor needs names to map to getters for creating an object array. It is worth noting that the order of the names determines the order of the fields within the array.

该提取器部署只需要一种属性,即:将要映射的字段名称。就好像BeanWrapperFieldSetMapper需要字段名称以将FieldSet的字段映射到所提供的对象的调节器(setter)上,BeanWrapperFieldExtractor也需要名称来映射到获取器(getter)上以创建一个对象数组。请注意,名称的顺序决定了数组内字段的顺序。

6.6.3.4. Delimited File Writing Example

The most basic flat file format is one in which all fields are separated by a delimiter. This can be accomplished using a DelimitedLineAggregator. The example below writes out a simple domain object that represents a credit to a customer account:

最基本的平面文件格式中,所有字段用一个分隔符隔开。通过DelimitedLineAggregator可以实现这一点。下面的例子写出一个简单的域对象;该对象表示客户账户信誉:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

 Because a domain object is being used, an implementation of the FieldExtractor interface must be provided, along with the delimiter to use:

因为正使用一个域对象,所以必须提供一个FieldExtractor接口部署以及将要使用的分隔符:

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...DelimitedLineAggregator">
            <property name="delimiter" value=","/>
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

 In this case, the BeanWrapperFieldExtractor described earlier in this chapter is used to turn the name and credit fields within CustomerCredit into an object array, which is then written out with commas between each field.

在该例子中,使用了本章先前描述的BeanWrapperFieldExtractor来将CustomerCredit内的名称和信誉字段(credit field)转换为一个对象数组,然后在每个字段间添加逗号将数组写出。

6.6.3.5. Fixed Width File Writing Example

Delimited is not the only type of flat file format. Many prefer to use a set width for each column to delineate between fields, which is usually referred to as 'fixed width'. Spring Batch supports this in file writing via the FormatterLineAggregator. Using the same CustomerCredit domain object described above, it can be configured as follows:

分隔符并不是平面文件的唯一格式。许多人喜欢为每一列设置宽度以将字段区分开,这种方式称为“固定宽度”方式。Spring Batch通过FormatterLineAggregator接口在文件写入时支持这种格式。使用上文描述的同样的CustomerCredit域对象,它可被配置如下:

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...FormatterLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit" />
                </bean>
            </property>
            <property name="format" value="%-9s%-2.0f" />
        </bean>
    </property>
</bean>

 Most of the above example should look familiar. However, the value of the format property is new:

上面大多数例子看起来比较类似。然而,格式属性值却是新特点:

<property name="format" value="%-9s%-2.0f" />

 The underlying implementation is built using the same Formatter added as part of Java 5. The Java Formatter is based on the printf functionality of the C programming language. Most details on how to configure a formatter can be found in the javadoc of Formatter.

利用作为Java 5的一部分的相同的Formatter来完成底层部署。Java的Formatter以C语言printf功能为基础。欲知格式器(formatter)如何配置,请见《Formatter》javadoc文档相关内容。

 

6.6.3.6. Handling File Creation

FlatFileItemReader has a very simple relationship with file resources. When the reader is initialized, it opens the file if it exists, and throws an exception if it does not. File writing isn't quite so simple. At first glance it seems like a similar straight forward contract should exist forFlatFileItemWriter: if the file already exists, throw an exception, and if it does not, create it and start writing. However, potentially restarting aJob can cause issues. In normal restart scenarios, the contract is reversed: if the file exists, start writing to it from the last known good position, and if it does not, throw an exception. However, what happens if the file name for this job is always the same? In this case, you would want to delete the file if it exists, unless it's a restart. Because of this possibility, the FlatFileItemWriter contains the property, shouldDeleteIfExists. Setting this property to true will cause an existing file with the same name to be deleted when the writer is opened.

FlatFileItemReader与文件资源的关系非常简单。当读取器初始化后,它将打开文件(如果它存在),如果不存在则抛出一个异常。文件写入并不这样简单。初看上去,它的原则比较简单,并且与FlatFileItemWriter类似:如果文件已经存在,抛出异常,如果不存在,创建并且写入。然而,强行重启一个job可能会引发问题。在正常的重启场景下,原则(contract)刚好相反:如果文件存在,从已知的最后一个正确位置开始向其写入,如果它不存在,抛出异常。然而,如果该作业的文件名始终相同时,将会如何?此时,如果文件存在的话,你可能想要删除文件,除非是重启情况。正因为存在这种可能性,FlatFileItemWriter包括shouldDeleteIfExists属性。将该属性设为真,那么在开启读取器后,将会删除具有相同名称的当前文件。

6.7. XML Item Readers and Writers

Spring Batch provides transactional infrastructure for both reading XML records and mapping them to Java objects as well as writing Java objects as XML records.

Spring Batch提供了事务性基础结构,以读取XML记录,将记录映射到Java对象,以及将Java对象写为XML记录。

Constraints on streaming XML

The StAX API is used for I/O as other standard XML parsing APIs do not fit batch processing requirements (DOM loads the whole input into memory at once and SAX controls the parsing process allowing the user only to provide callbacks).

当其他标准的XML解析API不符合批处理要求时(DOM将整个输入一次性载入内存,SAX控制解析过程,只允许用户提供回叫),StAX API可用于I/O(输入/输出)。

Lets take a closer look how XML input and output works in Spring Batch. First, there are a few concepts that vary from file reading and writing but are common across Spring Batch XML processing. With XML processing, instead of lines of records (FieldSets) that need to be tokenized, it is assumed an XML resource is a collection of 'fragments' corresponding to individual records:

让我们更详细地分析Spring Batch中的XML输入和输出是如何工作的。首先,有一些概念对不同的文件读取和写入有不同的含义,但是对Spring Batch XML处理则没有差异。对XML处理,我们假设XML资源是一组与各个记录相对应的“片断”的集合,而不是需要被标记的多行(line)记录(FieldSets):

 

Figure 3.1: XML Input

The 'trade' tag is defined as the 'root element' in the scenario above. Everything between '<trade>' and '</trade>' is considered one 'fragment'. Spring Batch uses Object/XML Mapping (OXM) to bind fragments to objects. However, Spring Batch is not tied to any particular XML binding technology. Typical use is to delegate to Spring OXM, which provides uniform abstraction for the most popular OXM technologies. The dependency on Spring OXM is optional and you can choose to implement Spring Batch specific interfaces if desired. The relationship to the technologies that OXM supports can be shown as the following:

在以上场景中,'trade'标记被定义为'root element'('根元素')。'<trade>'和'</trade>'间的一切将看作一个“片断”。 Spring Batch使用Object/XML Mapping (OXM)(对象/XML映射)来将片断与对象绑定。然而,Spring Batch并没有受限于任何一种XML绑定技术。典型的用途就是委托给Spring OXM,它可以为大多数常用的OXM技术提供统一的抽象(接口)。你既可以选择是否依赖于Spring OXM,也可以根据需要选择部署具体的Spring Batch接口。下面给出与OXM支持技术间的关系:

 

 Figure 3.2: OXM Binding

Now with an introduction to OXM and how one can use XML fragments to represent records, let's take a closer look at readers and writers.

现在简单描述了OXM及如何使用XML片断来表示记录,下面让我们详细讨论读取器和写入器。

6.7.1. StaxEventItemReader

The StaxEventItemReader configuration provides a typical setup for the processing of records from an XML input stream. First, lets examine a set of XML records that the StaxEventItemReader can process.

StaxEventItemReader配置为处理XML输入流记录提供了典型的配置方法。首先,让我们讨论StaxEventItemReader能够处理的一组XML记录。

 

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>
 To be able to process the XML records the following is needed:

 

  • Root Element Name - Name of the root element of the fragment that constitutes the object to be mapped. The example configuration demonstrates this with the value of trade.
  • Resource - Spring Resource that represents the file to be read.
  • Unmarshaller - Unmarshalling facility provided by Spring OXM for mapping the XML fragment to an object.
为了能够处理XML记录,需要如下条件:
  • Root Element Name—构成待映射对象的片断的根元素名称。配置示例用trade值对此进行了阐述。
  • Resource - 表示待读取文件的Spring资源
  • Unmarshaller—Spring OXM 提供的Unmarshaller功能,用于将XML片断映射到一个对象上。

 

<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
    <property name="fragmentRootElementName" value="trade" />
    <property name="resource" value="data/iosample/input/input.xml" />
    <property name="unmarshaller" ref="tradeMarshaller" />
</bean>
 Notice that in this example we have chosen to use an XStreamMarshaller which accepts an alias passed in as a map with the first key and value being the name of the fragment (i.e. root element) and the object type to bind. Then, similar to a FieldSet, the names of the other elements that map to fields within the object type are described as key/value pairs in the map. In the configuration file we can use a Spring configuration utility to describe the required alias as follows:

 

use a Spring configuration utility to describe the required alias as follows:

请注意,在这个例子中,我们选择使用一个XStreamMarshaller接口,该接口可以将传递过来的别名(alias)作为一个映射,第一个键和值是片断的名称(即根元素)和待绑定对象类型。然后,与FieldSet类似,映射到对象类型范围内字段的其他元素的名称将被描述为映射中的键/值对。在配置文件中,我们可以使用Spring配置程序将需要的别名描述如下:

 

<bean id="tradeMarshaller" 
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="trade"
                   value="org.springframework.batch.sample.domain.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="name" value="java.lang.String" />
        </util:map>
    </property>
</bean>
 On input the reader reads the XML resource until it recognizes that a new fragment is about to start (by matching the tag name by default). The reader creates a standalone XML document from the fragment (or at least makes it appear so) and passes the document to a deserializer (typically a wrapper around a Spring OXM Unmarshaller) to map the XML to a Java object.

 

In summary, this procedure is analogous to the following scripted Java code which uses the injection provided by the Spring configuration:

在输入时,读取器将读取XML资源,直到它识别到有一个新的片断将要开启(通过匹配默认的标签名称)。读取器根据片断来创建一个独立的XML文件(或者至少使它看上去如此),然后把文件传递给串并转换器(一般而言是Spring OXM Unmarshaller周围的一个wrapper包装对象),以将XML映射到一个Java对象。

总体来说,这一步骤与如下Java脚本类似,该脚本使用了Spring配置提供的注入:

 

StaxEventItemReader xmlStaxEventItemReader = new StaxEventItemReader()
Resource resource = new ByteArrayResource(xmlResource.getBytes()) 

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(marshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true

CustomerCredit credit = null;

while (hasNext) {
    credit = xmlStaxEventItemReader.read();
    if (credit == null) {
        hasNext = false;
    }
    else {
        System.out.println(credit);
    }
}
 

 

6.7.2. StaxEventItemWriter

Output works symmetrically to input. The StaxEventItemWriter needs a Resource, a marshaller, and a rootTagName. A Java object is passed to a marshaller (typically a standard Spring OXM Marshaller) which writes to a Resource using a custom event writer that filters the StartDocument andEndDocument events produced for each fragment by the OXM tools. We'll show this in an example using the MarshallingEventWriterSerializer. The Spring configuration for this setup looks as follows:

MarshallingEventWriterSerializer. The Spring configuration for this setup looks as follows:

输出与输入是对称的。StaxEventItemWriter需要一个Resource、marshaller及rootTagName。一个Java对象传递给marshaller(一般情况下是一个标准的Spring OXM Marshaller), marshaller然后使用一个专门的事件读取器向Resource写入,该读取器会对OXM工具为每个片断生成的StartDocument和EndDocument事件进行过滤。我们将在一个例子中使用MarshallingEventWriterSerializer来证明这一点。此时,Spring配置如下:

 

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="marshaller" ref="customerCreditMarshaller" />
    <property name="rootTagName" value="customers" />
    <property name="overwriteOutput" value="true" />
</bean>
 The configuration sets up the three required properties and optionally sets the overwriteOutput=true, mentioned earlier in the chapter for specifying whether an existing file can be overwritten. It should be noted the marshaller used for the writer is the exact same as the one used in the reading example from earlier in the chapter:

 

配置过程可以设置需要的3种属性,并且可以根据选择设置overwriteOutput=true,根据本章前面内容,这可以明确是否可以覆盖现有文件。请注意,读取器使用的marshaller与本章前面读取示例使用的marshaller完全相同。

 

<bean id="customerCreditMarshaller" 
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="customer"
                   value="org.springframework.batch.sample.domain.CustomerCredit" />
            <entry key="credit" value="java.math.BigDecimal" />
            <entry key="name" value="java.lang.String" />
        </util:map>
    </property>
</bean>
 To summarize with a Java example, the following code illustrates all of the points discussed, demonstrating the programmatic setup of the required properties:

 

用一个java例子做总结,下面代码阐述了上面讨论的所有要点,并演示了要求属性的编程设置:

 

StaxEventItemWriter staxItemWriter = new StaxEventItemWriter()
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("customer","org.springframework.batch.sample.domain.CustomerCredit");
aliases.put("credit","java.math.BigDecimal");
aliases.put("name","java.lang.String");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

staxItemWriter.setResource(resource);
staxItemWriter.setMarshaller(marshaller);
staxItemWriter.setRootTagName("trades");
staxItemWriter.setOverwriteOutput(true);

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
CustomerCredit Credit = new CustomerCredit();
trade.setPrice(11.39); 
credit.setName("Customer1");
staxItemWriter.write(trade);
 

 

6.8. Multi-File Input

It is a common requirement to process multiple files within a single Step. Assuming the files all have the same formatting, theMultiResourceItemReader supports this type of input for both XML and flat file processing. Consider the following files in a directory:

经常需要在一个step内处理多个文件。假设所有文件的格式相同,MultiResourceItemReader为XML和平面文件处理支持这种类型的输入。考虑如下一个目录中的文件:

 

file-1.txt  file-2.txt  ignored.txt
 file-1.txt and file-2.txt are formatted the same and for business reasons should be processed together. The MuliResourceItemReader can be used to read in both files by using wildcards:

 

file-1.txt和file-2.txt的格式相同,出于业务考虑需要同时处理。可以使用MuliResourceItemReader及通配符实现双份文件读取:

 

<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
    <property name="resources" value="classpath:data/input/file-*.txt" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>
 The referenced delegate is a simple FlatFileItemReader. The above configuration will read input from both files, handling rollback and restart scenarios. It should be noted that, as with any ItemReader, adding extra input (in this case a file) could cause potential issues when restarting. It is recommended that batch jobs work with their own individual directories until completed successfully.

 

被引用的受托方(delegate)是一个简单的FlatFileItemReader。上述配置将从双份文件中读取输入,处理回转(rollback)和重启情况。请注意,与ItemReader类似,重启时添加其他文件(在这个例子中是一份文件)可能会引发问题。建议批作业在各自目录内运行,直到成功完成为止。

6.9. Database

Like most enterprise application styles, a database is the central storage mechanism for batch. However, batch differs from other application styles due to the sheer size of the datasets with which the system must work. If a SQL statement returns 1 million rows, the result set probably holds all returned results in memory until all rows have been read. Spring Batch provides two types of solutions for this problem: Cursor and Paging database ItemReaders.

与大多数企业应用风格类似,数据库是批作业的中央存储场所。然而,批作业与其他应用风格又有所不同,因为系统运行所依赖的数据库非常庞大。如果一条SQL语句返回1百万行,则结果集很有可能在内存中存放返回的结果,直到所有行均被读取完毕。Spring Batch为该问题提供了两种解决方案:Cursor和Paging数据库ItemReaders。

6.9.1. Cursor Based ItemReaders

Using a database cursor is generally the default approach of most batch developers, because it is the database's solution to the problem of 'streaming' relational data. The Java ResultSet class is essentially an object orientated mechanism for manipulating a cursor. A ResultSetmaintains a cursor to the current row of data. Calling next on a ResultSet moves this cursor to the next row. Spring Batch cursor based ItemReaders open the a cursor on initialization, and move the cursor forward one row for every call to read, returning a mapped object that can be used for processing. The close method will then be called to ensure all resources are freed up. The Spring core JdbcTemplate gets around this problem by using the callback pattern to completely map all rows in a ResultSet and close before returning control back to the method caller. However, in batch this must wait until the step is complete. Below is a generic diagram of how a cursor based ItemReader works, and while a SQL statement is used as an example since it is so widely known, any technology could implement the basic approach:

使用数据库光标基本上是大多数批作业开发人员的默认选择,因为它是数据库针对关系数据”流”问题的一种解决方案。Java的ResultSet类实际上提供了一种控制光标的一种面向对象的机制。ResultSet维护一个指向当前数据行的一个光标。针对ResultSet调用next可以将该光标移向下一行。基于Spring Batch光标的ItemReaders在初始化时开启光标,每调用一次read就将光标向前移动一行,返回的被映射对象可以用于处理。然后调用close方法,以确保释放了所有资源。Spring内核JdbcTemplate对该问题的解决方案如下:使用回调模式对ResultSet中的所有行全部映射,并在将控制权返回给方法调用程序前关闭。然而,对批作业情况,这必须要一直等待,直到step结束。下面给出了基于光标的ItemReader接口的通用原理图。鉴于SQL的知名度,使用一条SQL语句作为示例,实际上其他技术均可用来部署这一基本方法:

 

This example illustrates the basic pattern. Given a 'FOO' table, which has three columns: ID, NAME, and BAR, select all rows with an ID greater than 1 but less than 7. This puts the beginning of the cursor (row 1) on ID 2. The result of this row should be a completely mapped Foo object. Calling read() again moves the cursor to the next row, which is the Foo with an ID of 3. The results of these reads will be written out after eachread, thus allowing the objects to be garbage collected (assuming no instance variables are maintaining references to them).

该示例阐述了基本模式。已知一个有3个列的'FOO'表:ID、NAME、BAR,选择ID大于1但小于7的所有行。这将使光标(第1行)从ID 2开始。这一行的结果应该是完全映射的Foo对象。再次调用read()将把光标移到下一行,即ID为3的Foo。这些读取的结果在每次read之后都将被写出,因此允许对象的无用单元被收集起来(garbage collected)(假设没有实例变量维护对它们的引用)。

6.9.1.1. JdbcCursorItemReader

JdbcCursorItemReader is the Jdbc implementation of the cursor based technique. It works directly with a ResultSet and requires a SQL statement to run against a connection obtained from a DataSource. The following database schema will be used as an example:

JdbcCursorItemReader是基于光标技术的一种Jdbc部署。它直接与ResultSet配合使用,要求SQL语句运行于从DataSource获得的连接。使用如下的数据库模式作为示例:

 

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,  
   NAME VARCHAR(45),
   CREDIT FLOAT
);
 Many people prefer to use a domain object for each row, so we'll use an implementation of the RowMapper interface to map a CustomerCredit object:

 

许多人喜欢为每一行使用一个域对象,因此我们通过RowMapper接口的一个部署来映射CustomerCredit对象:

 

public class CustomerCreditRowMapper implements RowMapper {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}
 Because JdbcTemplate is so familiar to users of Spring, and the JdbcCursorItemReader shares key interfaces with it, it is useful to see an example of how to read in this data with JdbcTemplate, in order to contrast it with the ItemReader. For the purposes of this example, let's assume there are 1,000 rows in the CUSTOMER database. The first example will be using JdbcTemplate:

 

因为Spring用户对JdbcTemplate非常熟悉,JdbcCursorItemReader与JdbcTemplate共享关键接口,所以我们将讨论如何用JdbcTemplate读取该数据,以与ItemReader做比较。就这一例子来说,让我们假设CUSTOMER数据库有1000行。第一个例子将使用JdbcTemplate:

 

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER", 
                                          new CustomerCreditRowMapper());
 After running this code snippet the customerCredits list will contain 1,000 CustomerCredit objects. In the query method, a connection will be obtained from the DataSource, the provided SQL will be run against it, and the mapRow method will be called for each row in the ResultSet. Let's contrast this with the approach of the JdbcCursorItemReader:

 

运行该代码片断后,customerCredits列表将包含1000个CustomerCredit对象。在该检索方法中,将从DataSource获得一个连接,提供的SQL将基于该连接运行,并且将会为ResultSet中的每一行调用mapRow方法。让我们将其与JdbcCursorItemReader方法进行比较:

 

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close(executionContext);
 After running this code snippet the counter will equal 1,000. If the code above had put the returned customerCredit into a list, the result would have been exactly the same as with the JdbcTemplate example. However, the big advantage of the ItemReader is that it allows items to be 'streamed'. The read method can be called once, and the item written out via an ItemWriter, and then the next item obtained via read. This allows item reading and writing to be done in 'chunks' and committed periodically, which is the essence of high performance batch processing. Furthermore, it is very easily configured for injection into a Spring Batch Step:

 

运行完该代码片断后,计算器将等于1000。如果上述代码将返回的customerCredit放入一个列表,则结果将与JdbcTemplate示例完全相同。然而,ItemReader的一个重要特点就是,它允许项目”入流”(stream)。Read方法可以调用一次,利用ItemWriter将项目写出,然后利用read获得下一个项目。这允许项目被”大块”读取和写出并被周期性提交,这是高性能批量处理的本质。此外,为了注入进Spring Batch step而进行的配置非常简单:

 

<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>
 

 

6.9.1.1.1. Additional Properties

Because there are so many varying options for opening a cursor in Java, there are many properties on the JdbcCustorItemReader that can be set:

因为开启Java光标的选项非常多,所以可被设置的JdbcCustorItemReader属性也非常多:


Table 6.2. JdbcCursorItemReader Properties

ignoreWarnings Determines whether or not SQLWarnings are logged or cause an exception - default is true
fetchSize Gives the Jdbc driver a hint as to the number of rows that should be fetched from the database when more rows are needed by the ResultSet object used by the ItemReader. By default, no hint is given.
maxRows Sets the limit for the maximum number of rows the underlying ResultSet can hold at any one time.
queryTimeout Sets the number of seconds the driver will wait for a Statement object to execute to the given number of seconds. If the limit is exceeded, a DataAccessEception is thrown. (Consult your driver vendor documentation for details).
verifyCursorPosition Because the same ResultSet held by the ItemReader is passed to the RowMapper, it is possible for users to call ResultSet.next() themselves, which could cause issues with the reader's internal count. Setting this value to true will cause an exception to be thrown if the cursor position is not the same after the RowMapper call as it was before.
saveState Indicates whether or not the reader's state should be saved in the ExecutionContext provided byItemStream#update(ExecutionContext) The default value is true.
driverSupportsAbsolute Defaults to false. Indicates whether the Jdbc driver supports setting the absolute row on aResultSet. It is recommended that this is set to true for Jdbc drivers that supportsResultSet.absolute() as it may improve performance, especially if a step fails while working with a large data set.
setUseSharedExtendedConnection Defaults to false. Indicates whether the connection used for the cursor should be used by all other processing thus sharing the same transaction. If this is set to false, which is the default, then the cursor will be opened using its own connection and will not participate in any transactions started for the rest of the step processing. If you set this flag to true then you must wrap the DataSource in an ExtendedConnectionDataSourceProxy to prevent the connection from being closed and released after each commit. When you set this option to true then the statement used to open the cursor will be created with both 'READ_ONLY' and 'HOLD_CUSORS_OVER_COMMIT' options. This allows holding the cursor open over transaction start and commits performed in the step processing. To use this feature you need a database that supports this and a Jdbc driver supporting Jdbc 3.0 or later.

6.9.1.2. HibernateCursorItemReader

Just as normal Spring users make important decisions about whether or not to use ORM solutions, which affect whether or not they use aJdbcTemplate or a HibernateTemplate, Spring Batch users have the same options. HibernateCursorItemReader is the Hibernate implementation of the cursor technique. Hibernate's usage in batch has been fairly controversial. This has largely been because Hibernate was originally developed to support online application styles. However, that doesn't mean it can't be used for batch processing. The easiest approach for solving this problem is to use a StatelessSession rather than a standard session. This removes all of the caching and dirty checking hibernate employs that can cause issues in a batch scenario. For more information on the differences between stateless and normal hibernate sessions, refer to the documentation of your specific hibernate release. The HibernateCursorItemReader allows you to declare an HQL statement and pass in a SessionFactory, which will pass back one item per call to read in the same basic fashion as the JdbcCursorItemReader. Below is an example configuration using the same 'customer credit' example as the JDBC reader:

普通的Spring用户需要对是否使用ORM解决方案做出重大决策,这将影响他们是否使用JdbcTemplate或HibernateTemplate,与此类似,Spring Batch用户面临同样的选择。HibernateCursorItemReader是光标技术的一种Hibernate部署。Hibernate在批作业中的应用极具争议性。主要原因是Hibernate的初始开发目的是为了支持在线应用。然而,这并不是说它就不能用于批处理。解决这一问题的最简单方法就是使用StatelessSession而不是标准会话。这就去除了可能给批处理带来问题的所有缓存和Hibernate脏数据检查机制。欲知无状态(stateless)和正常状态hibernate会话之间的差别,请见具体hibernate版本相关文档。通过HibernateCursorItemReader,你可以声明一个HQL语句,传递一个SessionFactory,SessionFactory将在每次调用read时返回一个项目,并且返回的基本方式与JdbcCursorItemReader相同。以下配置示例使用了与JDBC读取器相同的'customer credit'示例:

 

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close(executionContext);
 This configured ItemReader will return CustomerCredit objects in the exact same manner as described by the JdbcCursorItemReader, assuming hibernate mapping files have been created correctly for the Customer table. The 'useStatelessSession' property defaults to true, but has been added here to draw attention to the ability to switch it on or off. It is also worth noting that the fetchSize of the underlying cursor can be set via the setFetchSize property. As with JdbcCursorItemReader, configuration is straightforward:

 

假设已经为Customer表正确创建了hibernate映射文件,则这个经过配置的ItemReader将返回CustomerCredit对象,且返回的方式与JdbcCursorItemReader描述的完全相同。'useStatelessSession'属性默认为真,放在这里的目的就是引起注意,它可以将属性打开或关闭。还请注意,通过setFetchSize属性可以设置底层光标的fetchSize。与JdbcCursorItemReader类似,配置方法很简单:

 

<bean id="itemReader"
      class="org.springframework.batch.item.database.HibernateCursorItemReader">
    <property name="sessionFactory" ref="sessionFactory" />
    <property name="queryString" value="from CustomerCredit" />
</bean>
 

 

6.9.1.3. StoredProcedureItemReader

Sometimes it is necessary to obtain the cursor data using a stored procedure. The StoredProcedureItemReader works like the JdbcCursorItemReaderexcept that instead of executing a query to obtain a cursor we execute a stored procedure that returns a cursor. The stored procedure can return the cursor in three different ways:

  • as a returned ResultSet (used by SQL Server, Sybase, DB2, Derby and MySQL)
  • as a ref-cursor returned as an out parameter (used by Oracle and PostgreSQL)
  • as the return value of a stored function call

Below is a basic example configuration using the same 'customer credit' example as earlier:

有时候必须要使用被存储的程序来获得光标数据。StoredProcedureItemReader的工作方式与JdbcCursorItemReader类似,唯一区别就是,我们不是通过检索来获得光标,而是运行一段被存储的程序来返回一个光标。被存储的程序可以用三种方式来返回光标:

  • 作为一个返回的ResultSet(被SQL Server, Sybase, DB2, Derby和MySQL采用)
  • 作为一个参考光标,该光标作为一个输出参数而返回(被Oracle和PostgreSQL采用)
  • 作为调用存储函数的返回值

以下是一个基本配置示例,使用的'customer credit'例子与上文相同:

 

<bean id="reader" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>
 This example relies on the stored procedure to provide a ResultSet as a returned result (option 1 above).

 

该示例依赖存储程序将ResultSet作为返回结果而提供(上文选项1):

If the stored procedure returned a ref-cursor (option 2) then we would need to provide the position of the out parameter that is the returned ref-cursor. Here is an example where the first parameter is the returned ref-cursor:

如果存储程序返回一个参考光标(选项2),则我们需要提供返回的参考光标的输出参数位置。下面例子中,第一个参数是返回的参考光标:

 

<bean id="reader" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>
If the cursor was returned from a stored function (option 3) we would need to set the property "function" to true. It defaults to false. Here is what that would look like:

 

如果光标是从存储函数返回(选项3),则我们需要将属性"function"设为true。它默认值是false。情况应该如下文所示:

 

<bean id="reader" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>
 In all of these cases we need to define a RowMapper as well as a DataSource and the actual procedure name.

 

If the stored procedure or function takes in parameter then they must be declared and set via the parameters property. Here is an example for Oracle that declares three parameters. The first one is the out parameter that returns the ref-cursor, the second and third are in parameters that takes a value of type INTEGER:

在所有这些情况中,我们需要定义RowMapper、DataSource及实际程序名称。

如果存储程序或函数接纳参数,则它们必须通过参数属性进行声明和设置。下面是宣布3个参数的Oracle示例。第一个参数是返回参考光标的输出参数,第二个和第三个参数是类型为INTEGER的输入参数:

 

<bean id="reader" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
 In addition to the parameter declarations we need to specify a PreparedStatementSetter implementation that sets the parameter values for the call. This works the same as for the JdbcCursorItemReader above. All the additional properties listed in Section 6.9.1.1.1, “Additional Properties” apply to the StoredProcedureItemReader as well.

 

除了参数声明外,我们还需要明确为每次调用设置参数值的PreparedStatementSetter接口部署。它的工作原理与上文JdbcCursorItemReader相同。6.9.1.1.1节“Additional Properties”(“其他属性”)列出的所有其他属性也适用于StoredProcedureItemReader。

6.9.2. Paging ItemReaders

An alternative to using a database cursor is executing multiple queries where each query is bringing back a portion of the results. We refer to this portion as a page. Each query that is executed must specify the starting row number and the number of rows that we want returned for the page.

使用数据库光标的另一种方式就是进行多次检索,每次检索返回一段结果。我们将这一段结果称为一页。每次运行检索时,我们必须要明确起始行号以及我们希望为该页返回的行号。

6.9.2.1. JdbcPagingItemReader

One implementation of a paging ItemReader is the JdbcPagingItemReader. The JdbcPagingItemReader needs a PagingQueryProvider responsible for providing the SQL queries used to retrieve the rows making up a page. Since each database has its own strategy for providing paging support, we need to use a different PagingQueryProvider for each supported database type. There is also the SqlPagingQueryProviderFactoryBean that will auto-detect the database that is being used and determine the appropriate PagingQueryProvider implementation. This simplifies the configuration and is the recommended best practice.

JdbcPagingItemReader是分页ItemReader的一种部署。JdbcPagingItemReader需要PagingQueryProvider能够提供SQL查询,且它提供的查询能够用于检索构成一个页面的行。因为每个数据都有自身的分布支持策略,我们需要为每种数据库支持类型使用不同的PagingQueryProvider。还有SqlPagingQueryProviderFactoryBean接口可以自动检测出正在使用的数据库,并且确定合适的PagingQueryProvider部署。这简化了配置过程,建议用户多加使用。

 

The SqlPagingQueryProviderFactoryBean requires that you specify a select clause and a from clause. You can also provide an optional where clause. These clauses will be used to build an SQL statement combined with the required sortKey.

After the reader has been opened, it will pass back one item per call to read in the same basic fashion as any other ItemReader. The paging happens behind the scenes when additional rows are needed.

Below is an example configuration using a similar 'customer credit' example as the cursor based ItemReaders above:

SqlPagingQueryProviderFactoryBean需要你明确选择子句和from子句。你还可以根据选择提供where子句。这些子句将被用于构建与要求的sortKey相结合的SQL语句。

已经开启读取器后,它在每次调用read时将传回一个项目,基本传回方式与其他各种ItemReader相同。当需要其他行时,分页将在后台进行。

以下配置示例使用的'customer credit'例子与上文基于光标的ItemReaders类似:

 

<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>
 This configured ItemReader will return CustomerCredit objects using the RowMapper that must be specified. The 'pageSize' property determines the number of entities read from the database for each query execution.

 

The 'parameterValues' property can be used to specify a Map of parameter values for the query. If you use named parameters in the where clause the key for each entry should match the name of the named parameter. If you use a traditional '?' placeholder then the key for each entry should be the number of the placeholder, starting with 1.

这一经过配置的ItemReader通过使用必须被明确的RowMapper,可以返回CustomerCredit对象。'pageSize'属性决定了每次查询操作从数据库中读取的实体(entity)数量。

可以使用'parameterValues'属性来明确查询时的Map参数值。如果你在where子句使用了命名参数,则每个条目(entry)的键值应该与命名参数的命名相匹配。如果你使用了传统的'?'占位符,则每个条目的键值应该是占位符的数量,从1开始。

6.9.2.2. JpaPagingItemReader

Another implementation of a paging ItemReader is the JpaPagingItemReader. JPA doesn't have a concept similar to the Hibernate StatelessSession so we have to use other features provided by the JPA specification. Since JPA supports paging, this is a natural choice when it comes to using JPA for batch processing. After each page is read, the entities will become detached and the persistence context will be cleared in order to allow the entities to be garbage collected once the page is processed.

分页ItemReader的另一种部署是JpaPagingItemReader。JAP的概念与Hibernate StatelessSession不同,所以我们必须要使用JPA规格提供的其他特征。既然JPA支持分页,因此在使用JPA进行批处理时自然会选择它。每个页面读取之后,实体将被分离,持久化上下文将被清除,进而当页面被处理时使实体进行无用单元收集。

The JpaPagingItemReader allows you to declare a JPQL statement and pass in a EntityManagerFactory. It will then pass back one item per call to readin the same basic fashion as any other ItemReader. The paging happens behind the scenes when additional entities are needed. Below is an example configuration using the same 'customer credit' example as the JDBC reader above:

你可以利用JpaPagingItemReader来声明一个JPQL语句,传递EntityManagerFactory。然后,它将在每次调用read时返回一个项目,且返回的基本方式与其它各种ItemReader相同。当需要其他实体时,分页在后台进行。以下配置示例使用的'customer credit'例子与上文JDBC读取器相同:

 

<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>
 This configured ItemReader will return CustomerCredit objects in the exact same manner as described by the JdbcPagingItemReader above, assuming the Customer object has the correct JPA annotations or ORM mapping file. The 'pageSize' property determines the number of entities read from the database for each query execution.

 

假设Customer对象具有合适的annotation或者ORM映射文件,则这一经过配置的ItemReader将返回CustomerCredit对象,且返回方式与上文JdbcPagingItemReader描述完全相同。'pageSize'属性决定了每次查询操作从数据库中读取的实体(entity)数量。

6.9.2.3. IbatisPagingItemReader

If you use IBATIS for your data access then you can use the IbatisPagingItemReader which, as the name indicates, is an implementation of a paging ItemReader. IBATIS doesn't have direct support for reading rows in pages but by providing a couple of standard variables you can add paging support to your IBATIS queries.

Here is an example of a configuration for a IbatisPagingItemReader reading CustomerCredits as in the examples above:

如果你在数据访问时使用IBATIS,则你可以使用IbatisPagingItemReader;如名所示,IbatisPagingItemReader是分页ItemReader的一种部署。IBATIS并不直接支持读取页面行,但是通过提供几个标准变量,你可以为你的IBATIS查询提供分页支持。

以下是IbatisPagingItemReader读取CustomerCredits的配置示例,与上面例子类似:

 

<bean id="itemReader" class="org.spr...IbatisPagingItemReader">
    <property name="sqlMapClient" ref="sqlMapClient"/>
    <property name="queryId" value="getPagedCustomerCredits"/>
    <property name="pageSize" value="1000"/>
</bean>
 The IbatisPagingItemReader configuration above references an IBATIS query called "getPagedCustomerCredits". Here is an example of what that query should look like for MySQL.

 

上面IbatisPagingItemReader配置引用了称为"getPagedCustomerCredits"的IBATIS查询。下面是结合MySQL的查询示例。

 

<select id="getPagedCustomerCredits" resultMap="customerCreditResult">
    select id, name, credit from customer order by id asc LIMIT #_skiprows#, #_pagesize#
</select>
 The _skiprows and _pagesize variables are provided by the IbatisPagingItemReader and there is also a _page variable that can be used if necessary. The syntax for the paging queries varies with the database used. Here is an example for Oracle (unfortunately we need to use CDATA for some operators since this belongs in an XML document):

 

IbatisPagingItemReader提供_skiprows和_pagesize变量,需要时还可以使用_page变量。使用的数据库不同,分页查询的语法也不相同。下面是Oracle示例(然而,由于这是在XML文档,所以对部分操作符我们需要使用CDATA):

 

<select id="getPagedCustomerCredits" resultMap="customerCreditResult">
    select * from (
      select * from (
        select t.id, t.name, t.credit, ROWNUM ROWNUM_ from customer t order by id
       )) where ROWNUM_ <![CDATA[ > ]]> ( #_page# * #_pagesize# )
    ) where ROWNUM <![CDATA[ <= ]]> #_pagesize#
  </select>
 

 

6.9.3. Database ItemWriters

While both Flat Files and XML have specific ItemWriters, there is no exact equivalent in the database world. This is because transactions provide all the functionality that is needed. ItemWriters are necessary for files because they must act as if they're transactional, keeping track of written items and flushing or clearing at the appropriate times. Databases have no need for this functionality, since the write is already contained in a transaction. Users can create their own DAOs that implement the ItemWriter interface or use one from a custom ItemWriter that's written for generic processing concerns, either way, they should work without any issues. One thing to look out for is the performance and error handling capabilities that are provided by batching the outputs. This is most common when using hibernate as an ItemWriter, but could have the same issues when using Jdbc batch mode. Batching database output doesn't have any inherent flaws, assuming we are careful to flush and there are no errors in the data. However, any errors while writing out can cause confusion because there is no way to know which individual item caused an exception, or even if any individual item was responsible, as illustrated below:

因为平面文件和XML均有具体的ItemWriters,但是对数据库却没有完全的等价接口。这是因为事务提供了需要的所有功能。对文件而言,ItemWriters必不可少,因为它们运行时必须具有事务性(transactional),对写入项目进行跟踪,并在合适的时候执行清洗或清除操作。数据库不需要这一功能,因为事务中已经包含了写入(操作)。用户可以自己创建部署ItemWriter接口的DAO,或者从为了通用处理而编写的定制ItemWriter中选择一个,无论哪种情况,它们均可以正常运行。此时需要注意的一点就是批量输出时提供的性能和差错处理能力。在将hibernate作为ItemWriter使用时这一情况非常普遍,但是可能会与使用JDBC批量模式遇到相同的问题。如果我们对清洗操作非常仔细,并且数据中没有错误,则数据库批量输出本身并不会有问题。然而,如果写出时出现错误,则将导致困惑,因为即使是某个项目引发了差错,也没有办法弄清到底是哪个项目导致了异常,请见下文:

If items are buffered before being written out, any errors encountered will not be thrown until the buffer is flushed just before a commit. For example, let's assume that 20 items will be written per chunk, and the 15th item throws a DataIntegrityViolationException. As far as the Step is concerned, all 20 item will be written out successfully, since there's no way to know that an error will occur until they are actually written out. Once Session#flush() is called, the buffer will be emptied and the exception will be hit. At this point, there's nothing the Step can do, the transaction must be rolled back. Normally, this exception might cause the Item to be skipped (depending upon the skip/retry policies), and then it won't be written out again. However, in the batched scenario, there's no way for it to know which item caused the issue, the whole buffer was being written out when the failure happened. The only way to solve this issue is to flush after each item:

如果项目在写出前被缓存,则遇到的错误将不会被抛出,直到缓存在提交前被清洗。例如,我们假设每个块(chunk)有20个待写项目,第15个项目抛出DataIntegrityViolationException。就Step来说,所有20个项目将被成功写出,因为直到它们被写出也没有办法知道出现错误。当Session#flush()被调用时,缓存将被清空并出现异常。此时,Step束手无策,事务必须被回转(重新运行)。正常情况下,该异常可能导致项目被跳转(取决于跳转/重试策略),并且不会被再次写出。然而,在批处理情况下,它没有办法知道到底是哪个项目导致了问题,当出现故障时整个缓存区将被写出。解决这一问题的唯一方法就是每个项目后进行清洗:

This is a common use case, especially when using Hibernate, and the simple guideline for implementations of ItemWriter, is to flush on each call to write(). Doing so allows for items to be skipped reliably, with Spring Batch taking care internally of the granularity of the calls to ItemWriterafter an error.

有一种常见的应用场景,尤其是在使用Hibernate时,ItemWriter部署的指导原则就是每次调用write()时进行清洗。这样做可以提高项目被跳转的可靠性,Spring Batch内部已经考虑了出现错误后调用ItemWriter的频率(granularity)问题。

6.10. Reusing Existing Services

Batch systems are often used in conjunction with other application styles. The most common is an online system, but it may also support integration or even a thick client application by moving necessary bulk data that each application style uses. For this reason, it is common that many users want to reuse existing DAOs or other services within their batch jobs. The Spring container itself makes this fairly easy by allowing any necessary class to be injected. However, there may be cases where the existing service needs to act as an ItemReader or ItemWriter, either to satisfy the dependency of another Spring Batch class, or because it truly is the main ItemReader for a step. It is fairly trivial to write an adaptor class for each service that needs wrapping, but because it is such a common concern, Spring Batch provides implementations: ItemReaderAdapterand ItemWriterAdapter. Both classes implement the standard Spring method invoking the delegate pattern and are fairly simple to set up. Below is an example of the reader:

批处理系统经常与其他应用联合使用。最普遍的情况就是在线系统,但是通过移动每种应用所必需的批数据,它也可以为集成应用或胖客户端应用提供支持。为此,许多用户想在他们的批作业中重用当前DAO或其他服务。Spring容器通过允许注入必需的类,对此进行了大幅简化。然而,有些情况下当前服务需要作为ItemReader或ItemWriter运行,以满足其他Spring Batch类的相关性(dependency),或者因为它实际上是Step的主ItemReader。为需要wrapping(包装)的每个服务编写一个转换器实在太不值得,但是因为这个问题太过普遍,所以Spring Batch提供了如下两个类的部署:ItemReaderAdapter 和ItemWriterAdapter。这两个类部署了可以调用委托模式的标准Spring方法,配置过程也非常简单。下面是一个读取器示例:

 

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="generateFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
 One important point to note is that the contract of the targetMethod must be the same as the contract for read: when exhausted it will return null, otherwise an Object. Anything else will prevent the framework from knowing when processing should end, either causing an infinite loop or incorrect failure, depending upon the implementation of the ItemWriter. The ItemWriter implementation is equally as simple:

 

有一点必须要指出:targetMethod的原则(contract)必须与read的原则(contract)相同:当耗尽(exhaust)时它将返回null,否则返回一个对象。其他各种情况将使框架无法知晓该何时结束处理过程,此时要么导致无限循环,要么导致错误故障,具体取决于ItemWriter的部署。ItemWriter的部署也非常简单:

 

<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="processFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
 

 

6.11. Validating Input

During the course of this chapter, multiple approaches to parsing input have been discussed. Each major implementation will throw an exception if it is not 'well-formed'. The FixedLengthTokenizer will throw an exception if a range of data is missing. Similarly, attempting to access an index in a RowMapper of FieldSetMapper that doesn't exist or is in a different format than the one expected will cause an exception to be thrown. All of these types of exceptions will be thrown before read returns. However, they don't address the issue of whether or not the returned item is valid. For example, if one of the fields is an age, it obviously cannot be negative. It will parse correctly, because it existed and is a number, but it won't cause an exception. Since there are already a plethora of Validation frameworks, Spring Batch does not attempt to provide yet another, but rather provides a very simple interface that can be implemented by any number of frameworks:

本章已经讨论了多种解析输入方法。每种部署如果“不够良好”,便会抛出异常。如果数据范围丢失,则FixedLengthTokenizer会抛出异常。类似地,如果试图访问FieldSetMapper中RowMapper的一个索引但它并不存在或者与预期的格式不同,则也会导致异常。所有这些异常均是在read返回前抛出。然而,它们并没有解决返回的项目到底是否合法这一问题。例如,如果一个字段表示年龄,则很明显它不能为负。如果它存在并且确实是一个数据,则它将被正确解析,但是它不会抛出异常。因为已经存在过多的验证框架,Spring Batch不会再提供一种验证框架,但是它将提供一种简单的接口,可以部署于任意数量的框架上。

 

public interface Validator {
  
    void validate(Object value) throws ValidationException;

}
 The contract is that the validate method will throw an exception if the object is invalid, and return normally if it is valid. Spring Batch provides an out of the box ItemProcessor:

 

原则是:如果对象非法,则validate方法抛出一个异常,如果合法则正常返回。Spring Batch提供一个立即可用的接口ItemProcessor:

<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator"
      class="org.springframework.batch.item.validator.SpringValidator">
    <property name="validator">
        <bean id="orderValidator"
              class="org.springmodules.validation.valang.ValangValidator">
            <property name="valang">
                <value>
                    <![CDATA[
           { orderId : ? > 0 AND ? <= 9999999999 : 'Incorrect order ID' : 'error.order.id' }
           { totalLines : ? = size(lineItems) : 'Bad count of order lines' 
                                              : 'error.order.lines.badcount'}
           { customer.registered : customer.businessCustomer = FALSE OR ? = TRUE 
                                 : 'Business customer must be registered' 
                                 : 'error.customer.registration'}
           { customer.companyName : customer.businessCustomer = FALSE OR ? HAS TEXT 
                                  : 'Company name for business customer is mandatory' 
                                  :'error.customer.companyname'}
                    ]]>
                </value>
            </property>
        </bean>
    </property>
</bean>

 This simple example shows a simple ValangValidator that is used to validate an order object. The intent is not to show Valang functionality as much as to show how a validator could be added.

在该例子中,ValangValidator被用于验证order对象。目的不是为了像演示如何添加验证器那样演示Valang的功能。

6.12. Preventing State Persistence

By default, all of the ItemReader and ItemWriter implementations store their current state in the ExecutionContext before it is committed. However, this may not always be the desired behavior. For example, many developers choose to make their database readers 'rerunnable' by using a process indicator. An extra column is added to the input data to indicate whether or not it has been processed. When a particular record is being read (or written out) the processed flag is flipped from false to true. The SQL statement can then contain an extra statement in the where clause, such as "where PROCESSED_IND = false", thereby ensuring that only unprocessed records will be returned in the case of a restart. In this scenario, it is preferable to not store any state, such as the current row number, since it will be irrelevant upon restart. For this reason, all readers and writers include the 'saveState' property:

默认情况下,ItemReader和ItemWriter的所有部署在提交前把它们的当前状态存储在ExecutionContext中。然而,我们有时候并不希望如此。例如,许多开发人员使用进程指示器(process indicator)来使它们的数据库读取器“可重新运行”。输入数据被额外增加了一列,以表明它是否已经被处理过。当某条记录正被读取(或写出)时,处理标记将从false转为true。然后,SQL语句可以在where子句中多包括一条语句,比如"where PROCESSED_IND = false",以确保在重启时只有未被处理的记录被返回。此时,任何状态,比如当前行号,最好都不要存储,因为它与重启无关。为此,所有的读取器和写出器均包括'saveState'属性:

<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
    </property>
    <property name="saveState" value="false" />
    <property name="sql">
        <value>
            SELECT games.player_id, games.year_no, SUM(COMPLETES),
            SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
            SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
            SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
            from games, players where players.player_id =
            games.player_id group by games.player_id, games.year_no
        </value>
    </property>
</bean>

 The ItemReader configured above will not make any entries in the ExecutionContext for any executions in which it participates.

上面配置的ItemReader不会为它所参与的各个执行过程生成ExecutionContext条目。

6.13. Creating Custom ItemReaders and ItemWriters

 

So far in this chapter the basic contracts that exist for reading and writing in Spring Batch and some common implementations have been discussed. However, these are all fairly generic, and there are many potential scenarios that may not be covered by out of the box implementations. This section will show, using a simple example, how to create a custom ItemReader and ItemWriter implementation and implement their contracts correctly. The ItemReader will also implement ItemStream, in order to illustrate how to make a reader or writer restartable.

本章迄今已经讨论了Spring Batch读取和写入基本原则及常见部署。这些内容的通用性很高,但仍有部分情况没有被立即可用的接口部署覆盖到。本节将结合一个简单示例,讨论如何创建一个定制型ItemReader和ItemWriter部署以及如何正确部署它们的原则(contract)。ItemReader也将部署ItemStream,以阐述如何使一个读取器或写入器可被重启。

6.13.1. Custom ItemReader Example

For the purpose of this example, a simple ItemReader implementation that reads from a provided list will be created. We'll start out by implementing the most basic contract of ItemReaderread:

在该例子中,将创建一个简单的ItemReader部署,以从提供的列表中读取数据。我们从部署ItemReader最基本的原则也就是部署read开始:

public class CustomItemReader<T> implements ItemReader<T>{

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NoWorkFoundException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

 This very simple class takes a list of items, and returns them one at a time, removing each from the list. When the list is empty, it returns null, thus satisfying the most basic requirements of an ItemReader, as illustrated below:

这个类以项目列表作为输入,并以一个一次的速度返回项目,并从列表中去除。当列表为空时,它将返回null,因此满足了ItemReader的最基本要求,具体内容如下:

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

 

6.13.1.1. Making the ItemReader Restartable

The final challenge now is to make the ItemReader restartable. Currently, if the power goes out, and processing begins again, the ItemReader must start at the beginning. This is actually valid in many scenarios, but it is sometimes preferable that a batch job starts where it left off. The key discriminant is often whether the reader is stateful or stateless. A stateless reader does not need to worry about restartability, but a stateful one has to try and reconstitute its last known state on restart. For this reason, we recommend that you keep custom readers stateless if possible, so you don't have to worry about restartability.

If you do need to store state, then the ItemStream interface should be used:

现在最后一个问题就是使ItemReader可重启。当前,如果电力中断,处理过程再次开始,则ItemReader必须要从头开始。在许多情况下,这是合法的,但有时候更希望批作业接着离开的地方开始。主要区别就是读取器是有状态还是无状态。无状态读取器不需担心其可重启性,但是有状态读取器必须要在重启时尝试重建上次最近的已知状态。为此,我们建议你在可能的情况下将定制型读取器保持为无状态,这样便不需要担心其可重启性。

如果你需要恢复状态,可以使用ItemStream接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }
      
        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey(CURRENT_INDEX)){
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else{
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

 On each call to the ItemStream update method, the current index of the ItemReader will be stored in the provided ExecutionContext with a key of 'current.index'. When the ItemStream open method is called, the ExecutionContext is checked to see if it contains an entry with that key. If the key is found, then the current index is moved to that location. This is a fairly trivial example, but it still meets the general contract:

每次调用ItemStream update方法时,ItemReader的当前索引将被存储于提供的键值为'current.index'的ExecutionContext中。当调用ItemStream open方法时,ExecutionContext将被检查,以确认其是否包含一个带有那个键的条目。如果找到该键,则当前索引被移到那个位置。这个例子虽然简单,但是仍然满足了总体原则:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<String>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

 Most ItemReaders have much more sophisticated restart logic. The JdbcCursorItemReader, for example, stores the row id of the last processed row in the Cursor.

It is also worth noting that the key used within the ExecutionContext should not be trivial. That is because the same ExecutionContext is used for allItemStreams within a Step. In most cases, simply prepending the key with the class name should be enough to guarantee uniqueness. However, in the rare cases where two of the same type of ItemStream are used in the same step (which can happen if two files are need for output) then a more unique name will be needed. For this reason, many of the Spring Batch ItemReader and ItemWriter implementations have a setName() property that allows this key name to be overridden.

许多ItemReaders的重启逻辑更为复杂。例如,JdbcCursorItemReader在Cursor中存储了最近被处理的行的ID。

请注意,ExecutionContext中使用的键不应被忽视。这是因为,step内的所有ItemStream使用了相同的ExecutionContext。在大多数情况下,将类名加在键值前足以保证唯一性。然而,在少部分情况下,相同的step使用了两个相同类型的ItemStream(如果输出需要两个文件,则有可能发生这种情况),则需要更加个性的名称。为此,许多Spring Batch ItemReader和ItemWriter部署都有一个setName()属性,允许该键名被覆盖。

6.13.2. Custom ItemWriter Example

Implementing a Custom ItemWriter is similar in many ways to the ItemReader example above, but differs in enough ways as to warrant its own example. However, adding restartability is essentially the same, so it won't be covered in this example. As with the ItemReader example, a Listwill be used in order to keep the example as simple as possible:

部署定制型(Custom) ItemWriter与上文ItemReader既有许多相似之处,也有很大本质性差异。但是,添加可重启性是相同的,所以在该例子中没有涉及。与ItemReader例子类似,将使用一个List以使例子尽量简单:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(List<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

 

6.13.2.1. Making the ItemWriter Restartable

To make the ItemWriter restartable we would follow the same process as for the ItemReader, adding and implementing the ItemStream interface to synchronize the execution context. In the example we might have to count the number of items processed and add that as a footer record. If we needed to do that, we could implement ItemStream in our ItemWriter so that the counter was reconstituted from the execution context if the stream was re-opened.

In many realistic cases, custom ItemWriters also delegate to another writer that itself is restartable (e.g. when writing to a file), or else it writes to a transactional resource so doesn't need to be restartable because it is stateless. When you have a stateful writer you should probably also be sure to implement ItemStream as well as ItemWriter. Remember also that the client of the writer needs to be aware of the ItemStream, so you may need to register it as a stream in the configuration xml.

为了使ItemWriter具有可重启性,我们应该遵循与ItemReader相同的步骤,添加并部署ItemStream接口以使执行上下文同步。在例子中,我们可能需要计算被处理项目的数量,并将其添加为一个脚注记录。如果确实需要,我们可以在ItemWriter中部署ItemStream,于是当stream被重启时可以从执行上下文重建计数器。

在许多现实情况中,定制型ItemWriters委派给另一个可重启写入器(比如当写入一个文件时),或者向事务性资源写入数据,这样一来,它便为无状态接口,不再需要可重启性。如果你有一个有状态写入器,你也许应该部署ItemStream及ItemWriter。请注意,写入器的客户端需要知道ItemStream,因此你可能需要在xml配置中将其注册为一个stream。

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics