接上篇:Apache Commons Pipeline 使用学习(一)
stage生命周期
当pipeline组装、运行的时候,每个stage通常是在它自己的线程中运行(pipeline的所有线程由同一个JVM实例所拥有)。这种多线程的方法在多处理器系统中有处理上的优势。对于给定的stage,各种stage的方法按照顺序运行: init(), preprocess(), process(), postprocess() and release()。然而,stage之间各方法开始和完成的顺序是不确定性。换言之,在具有多个stage的pipeline不能指望任何特定stage的preprocess()方法开始或者完成在另一stage的任何方法之后。如果你的stage之间有依赖关系,请参阅下面部分讨论的stage之前的Events和Listeners。
pipeline中stage的顺序由配置文件来确定。配置文件通过Digester定义,这是一个XML文件,其中会列出使用的stage和初始化参数。每个stage被添加到pipeline中,并执行其init()方法。当所有stage都被装入到pipeline中,pipeline被设置为开始运行。调用的各个stage的preprocess()方法。当使用DedicatedThreadStageDriver每个stage在它自己的线程中运行,并且preprocess()方法被异步运行。
当pipeline的第一个stage的preprocess方法完成,将在由关联的stage driver传入的数据对象上开始运行process()方法。当第一stage完成处理后,数据对象将传递到下一个stage。如果此时下一stage没有完成自己的preprocess()方法,传递的数据对象将会在第二stage的stage driver中排队。当所有的初始化的对象被第一个stage的process()完成之后,将调用postprocess()方法。当postprocess()方法完成后,STOP_REQUESTED信号被发送到下一个stage,以表明没有更多的对象进入pipeline。下一stage将处理队列中的对象,然后调用它自己的postprocess()方法。这中处理完队列中的数据然后调用postprocess方法向下传播。每个stage完成 postprocess方法后,运行它的release()方法()。init()和release()不依赖自己stage之外的任何东西。
每个stage在处理过程中发生异常时可以配置为停止或继续。stage在preprocess(), process(), or postprocess()中可能抛出一个StageException()。如果配置为继续运行,stage将处理下一个数据对象。如果配置停止,stage将结束处理,并且任何后续process() 、postprocess() 方法将不会被调用。release()方法总是被调用,因为它写在stage处理代码try-catch结构中的finally块中。
stage之间的通信
stage彼此通信有两种主要机制。为了保持数据流和“管道(Pipeline)”的比喻,两种都是发送消息到“下游”到后续stage。
- 正常EMIT()到下一stage(的队列) - 有序的传递数据对象。这些对象通常实现为Java bean,并且有时被称为"data beans".
- 事件和监听器 - 通常传递控制或stage之间同步元数据。使用此机制时,在Pipeline中较晚的stage需要的信息只能由较早的stage提供,不属于数据bean提供。
作为事件和监听器的例子,假设你有一个从数据库表中读取数据的stage,而后面的stage将数据写入到另一个数据库。读取该表的stage需要将表的布局信息传递给写表操作的stage,这样当目标表不存在时,写表的stage可以通过事件中的信息创建一个表。TableReader.preprocess()方法触发一个事件,并且携带表的布局数据。TableWriter stage的 preprocess()方法设置为侦听表事件,并且等待该事件发生之后,才处理数据,这样的TableWriter不会处理对象,直到目标表已准备就绪。
三、使用Digester 配置 Pipeline
现在是时候展示的Pipeline的配置文件,当使用Digester时为XML格式。
例子一:
下面是一个展示基本结构的例子。这个Pipeline有三个stage,一个环境变量。示例代码如下。
<?xml version="1.0" encoding="UTF-8"?> <!-- Document : configMyPipeline.xml Description: An example Pipeline configuration file --> <pipeline> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/> <!-- The <env> element can be used to add global environment variable values to the pipeline. In this instance almost all of the stages need a key to tell them what type of data to process. --> <env> <value key="dataType">STLD</value> </env> <!-- The initial stage traverses a directory so that it can feed the filenames of the files to be processed to the subsequent stages. The directory path to be traversed is in the feed block following this stage. The filePattern in the stage block is the pattern to look for within that directory. --> <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/> <feed> <value>/mnt/data2/gdsg/sst/npr</value> </feed> <stage className="gov.noaa.eds.example.Stage2" driverFactoryId="df0" /> <!-- Write the data from the SstFileReader stage into the Rich Inventory database. --> <stage className="gov.noaa.eds.sst2ri.SstWriterRI" driverFactoryId="df0"/> </pipeline>
下面是上面例子的总结:
<?xml version="1.0" encoding="UTF-8"?>
这些pipeline的配置文件总是以这个XML声明开始。
<pipeline>...</pipeline>
顶级元素是<pipeline>包围其余部分。
<driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/>
设置一个StageDriverFactory来输入和控制stage。stage被DedicatedThreadStageDriver控制,它从一个名为“DF0”的工厂中获得。
<env> <value key="dataType">STLD</value> </env>
设置一个名为“dataType”的常量,各个stage都可以访问“STLD”数据并且运行中使用。如果有分支,环境常量是局部的,他们只是在它们所在分支中有效,分支之间不共享。但是,你可以定义相同的环境在不同分支。
<stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/>
定义stage,FileFinderStage将为下一stage的处理选择文件。本例中有一个“filePattern”的参数限制了传递到下一stage的文件。仅仅匹配到给定的正则表达式的文件会被使用。注意,“driverFactoryId”是“DF0”,它匹配给先前在此文件中的driverFactory元素的名称。
<feed> <value>/mnt/data2/gdsg/sst/npr</value> </feed>
<feed> 中的值用于第一个stage的初始数据。在这个例子中,FileFinderStage期望获取的文件,至少是这些开始的目录。注意, <feed>必须配置在的pipeline中的第一个stage之后。在stage创建时,如果之前没有任何stage,feed的值将被舍弃。
例二:
第二个示例显示了两个stage的最小的pipeline。第一个stage是FileFinderStage,它从起始目录“"/data/sample" 中读取的文件名和匹配任何已“HelloWorld”开头的文件。第二个status是LogStage,它在通常用在调试过程中。 LogStage调用输入对象的toString方法,然后写到日志文件,然后传递它所接收到对象到下一个stage,因此很容易在任意两个stage之间使用,在不改变它们之间传递的对象的情况下记录日志文件。
对应上图,配置文件有一些彩色文本,使其更容易匹配到的图像中的对象。
<?xml version="1.0" encoding="UTF-8"?> <!-- Document : configSimplePipeline.xml Description: A sample configuration file for a very simple pipeline --> <pipeline> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="driverFactory"/> <!-- ((1)) The first stage recursively searches the directory given in the feed statement. The filePattern given will match any files beginning with "HelloWorld". --> <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="driverFactory" filePattern="HelloWorld.*"/><!-- ((3)) --> <!-- Starting directory for the first stage. --> <feed> <value>/data/sample</value> <!-- ((4)) --> </feed> <!-- ((2)) Report the files found. --> <stage className="org.apache.commons.pipeline.stage.LogStage" driverFactoryId="driverFactory" /> </pipeline>
一个driver factory 服务两个stage。driver factory ID是“driverFactory”,并且这个值被用于两个stage上
理论上,pipeline可以仅仅有一个stage,但是这中退化的情况与普通的程序没有什么不同,只是它可以方便的扩展为多个stage。
例三:
带颜色的配置文件如下:
<?xml version="1.0" encoding="UTF-8"?> <!-- Document : branchingPipeline.xml Description: Configuration file for a pipeline that takes user provided files as input, and from that both generates HTML files and puts data into a database. --> <pipeline> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df1"> <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory" capacity="4" fair="false"/> </driverFactory> <!-- The <env> element can be used to add global environment variable values to the pipeline. In this instance almost all of the stages need a key to tell them what type of data to process. --> <env> <value key="division">West</value> <!-- ((9)) --> </env> <!-- ((1)) The initial stage traverses a directory so that it can feed the filenames of of the files to be processed to the subsequent stages. The directory path to be traversed is in the feed block at the end of this file. The filePattern in the stage block is the pattern to look for within that directory. --> <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/><!-- ((8)) --><feed> <value>/data/INPUT/raw</value> <!-- ((7)), ((11)) --> </feed> <!-- ((2)) This stage is going to select a subset of the files from the previous stage and orders them for time sequential processing using the date embedded in the last several characters of the file name. The filesToProcess is the number of files to emit to the next stage, before terminating processing. Zero (0) has the special meaning that ALL available files should be processed. --> <stage className="com.demo.pipeline.stages.FileSorterStage" driverFactoryId="df1" filesToProcess="0"/> <!-- ((3)) Read the files and create the objects to be passed to stage that writes to the database and to the stage that writes the data to HTML files. WARNING: The value for htmlPipelineKey in the stage declaration here must exactly match the branch pipeline key further down in this file. --> <stage className="com.demo.pipeline.stages.FileReaderStage" driverFactoryId="df1" htmlPipelineKey="sales2html"/> <!-- ((4)) Write the data from the FileReaderStage stage into the database. --> <stage className="com.demo.pipeline.stages.DatabaseWriterStage" driverFactoryId="df1"> <datasource user="test" password="abc123" type="oracle" host="brain.demo.com" port="1521" database="SALES" /> <database-proxy className="gov.noaa.gdsg.sql.oracle.OracleDatabaseProxy" /> <tablePath path="summary.inventory" /> <!-- ((13)) --> </stage> <!-- Write the data from the FileReaderStage stage to HTML files. The outputFilePath is the path to which we will be writing our summary HTML files. WARNING: The value for the branch pipeline key declaration here must exactly match the htmlPipelineKey in the FileReaderStage stage in this file. --> <branch> <pipeline key="sales2html"> <!-- ((10)) --><env> <value key="division">West</value> <!-- ((14)) --> </env> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df2"> <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory" capacity="4" fair="false"/> </driverFactory> <!-- ((5)) HTMLWriterStage --> <stage className="com.demo.pipeline.stages.HTMLWriterStage" driverFactoryId="df2" outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) --> <!-- ((6)) StatPlotterStage --> <stage className="com.demo.pipeline.stages.StatPlotterStage" driverFactoryId="df2" outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) --></pipeline> </branch> </pipeline>
注:在这个例子中配置为“West” 的常量“division”,定义在两个地方。在主pipeline和分支pipeline都是相同的值。这是因为分支不共享相同的环境常数。
该driverFactories“DF1”和“DF2”通过指定ArrayBlockingQueueFactory覆盖默认queueFactory。设置容量为4个对象,这样做是为了限制使用DF1或DF2的stage的队列大小。这通常是要限制pipeline使用的资源,并且是必要的,防止无界队列使用了所有可用的java存储或超过了所允许打开的文件句柄的数量。创建队列之后的队列大小不能被改变。因为只有一个线程正在访问的队列,公平属性可以被设置为“false”。如果公平=“true”,则有额外的开销,以确保访问队列中的所有线程的顺序处理(FIFO)。
相关推荐
例如,以下代码展示了如何使用Apache Commons Digester创建一个简单的Java对象: ```java import org.apache.commons.digester.Digester; import org.apache.commons.digester.ObjectCreateRule; import org.apache...
Commons Collections是Apache软件基金会开发的一个Java库,主要提供对集合框架的增强和扩展。这个库是Java标准集合接口的补充,增加了许多实用的功能,提高了代码的可读性和效率。"commons-collections4-4.1.jar"是...
Java Pipeline Step Chain like Apache Commons Chain and Commons Pipeline.A popular technique for organizing the execution of complex processing flows is the "Chain of Responsibility" pattern
这里提到的`commons-pool2-2.4.2-bin.zip`就是Apache Commons Pool 2库,它是一个通用的对象池服务,用于创建和维护Jedis实例。Jedis通过这个库可以实现高效的连接复用,降低系统资源消耗。 9. **订阅/发布机制**:...
类似于Commons Chain和Commons Pipeline这样的Java Pipeline Step Chain用于组织复杂处理流程执行的流行技术。支持通用业务job、services子流程无限制拆分。支持业务子流程串行化、业务子流程并行化,可配置化。...
另一个关键的jar包是`commons-pool2-2.4.2.jar`,这是Apache Commons Pool的第二版,一个对象池设计,用于管理和复用特定类型对象,如数据库连接、网络连接等。在与Redis整合时,我们通常会使用它来管理Jedis实例,...
在本场景中,我们关注的是Redis的Java客户端库Jedis以及连接池管理库Apache Commons Pool 2,具体版本为Jedis 2.9.0和Commons Pool 2.2。 **Jedis 2.9.0** Jedis是Java开发人员广泛使用的Redis客户端,它提供了丰富...
7. **性能优化与最佳实践**: 在使用Jedis时,了解一些最佳实践是必要的,比如合理设置连接池参数,使用Pipeline或Transaction批量处理命令以减少网络通信,以及利用Redis的持久化和复制功能来提升系统可靠性。...
cidc_ngs_pipeline_api-0.1.10.tar.gz 是一个在Python生态系统中的软件包,它可以从...通过理解和使用cidc_ngs_pipeline_api,开发者和研究人员能够更好地整合和分析NGS数据,从而加速对癌症治疗的理解和新疗法的发现。
本篇将深入探讨`redis java 客户端`,特别是Jedis的使用以及与其相关的`commons-pool-1.5.6.jar`。 **Jedis** Jedis是Java语言编写的一个Redis客户端,它提供了丰富的API,可以方便地执行Redis命令,包括字符串、...
在Java环境下开发与Redis交互的应用时,常常需要依赖特定的Java库,如Jedis和Apache Commons Pool。下面将详细阐述这两个关键组件以及它们在Redis开发中的作用。 Jedis是Java语言的Redis客户端,版本2.7.2是其中的...
jedis的性能优化主要依赖于对象池的配置,jedis使用Apache的commons-pool实现对象池。可以通过配置对象池的参数来优化jedis的性能。同时,jedis还提供了一些扩展包,可以在特定场景下增强jedis的能力。
描述中提到的"commons-pool2-2.0.jar"是Apache Commons Pool 2.0的jar包,这是一个对象池库,主要用于管理资源,比如数据库连接或线程。在Jedis中,它被用来实现Redis连接池,以提高应用程序的效率和性能,避免频繁...
3. `commons-pool2-2.4.3.jar`:这是一个Apache Commons Pool库的版本,它是Jedis依赖的连接池组件,用于管理Redis连接,提高性能和资源利用率。 **Redis与Jedis** Redis是一个高性能的键值存储系统,常用于数据...
Java作为一种强大的编程语言,提供了多种方式来实现数据管道,如使用流(Stream)API、Apache Commons Lang的Pipeline工具等。 首先,我们来了解一下Java的流(Stream)API。自Java 8引入以来,流API已经成为处理...
`commons-pool2-2.3.jar`是Apache Commons Pool库的版本2.3,这是一个通用的对象池服务,Jedis为了提高性能和资源管理,会利用这个库实现连接池。对象池允许重复使用已创建的对象,避免频繁创建和销毁带来的开销,这...
2. **源码结构**:Tomcat的源代码主要分为几个关键模块,包括Catalina(核心处理引擎)、Commons、NIO(非阻塞I/O)、apr(Apache Portable Runtime,用于提高性能)、Cluster(集群支持)等。 3. **Catalina模块**...
1. **commons-pool-1.6.jar**:这是Apache Commons Pool库的1.6版本,它是一个通用的对象池服务。在Jedis中,它用于管理Redis连接的池化,避免频繁地创建和销毁连接,提高性能。对象池允许程序预先创建一定数量的...
7. **第三方库的使用**:Java社区中有许多库(如Apache Commons IO、Guava等)提供了丰富的工具和类来简化流操作,学习如何利用这些库提升代码质量。 通过这个“pipeline-demo”项目,开发者可以深入理解数据处理的...
为了提高性能和资源管理,Jedis使用Apache Commons Pool进行对象池配置。你需要根据实际需求调整池的大小、超时时间等参数。例如: ```java GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig...