`

Apache Commons Pipeline 使用学习(一)

阅读更多

本系列源于对commons-pipeline 使用的学习:

 

首先是:翻译官方文档

 

本文是针对使用 Apache Commons Pipeline工作流程框架的基础介绍,本文档目标读者为需要组装现有stages或编写自己的stages的开发人员。该项目提供了一个Java类库旨在提升使用和重用模块化的stage的易用性。

 

一、Pipeline结构:

  • Stages:stages在Pipeline中代表来处理数据所需的逻辑单元。每一个stage代表一个高层次的处理概念:如查找文件,读取文件格式,从数据计算产品,或者将数据写入到数据库。使用工作流架构并且构建处理单元到Stages的主要优点是提升Stages在其他Pipeline的可重用性。


 

  • Pipeline:一个Pipeline由stages构建而成,这些stages可以将数据传递给后续的stages。上图中箭头所标记的“EMIT”表明一个stage的数据输出被传递到下一个stage。从代码层面看,有一个 EMIT() 方法将数据发送到下一个stage。数据流开始于左侧,在那里有一个标为“FEED”的箭头。FEED通常通过一个配置文件开始一个Pipeline,这点在后面讨论。Stage自己不关心输入的数据是来自FEED或前一阶段的EMIT()。

      Pipeline也可发送相同或不同的数据到不同的分支(branch),使数据沿不同的处理路线流转。



 

  • 通过Digester 或者 Spring 配置:有两种方法来配置Pipeline,都是是基于XML的控制文件。其中更简单的方法是使用Digester,Pipeline的最终用户可以修改该配置来满足自己的需求。 Spring框架也被用于配置Pipeline,但是由于他的结构与java编程对象更加接近,使得配置更复杂,同时也更灵活。通过XML配置文件的顺序来控制stage的顺序,以及stage的特殊参数都在该文件中进行设置。这些控制文件还允许以环境遍历的形式设置对所有stage可见全部参数,这种配置的方式使得完全不需要重新编译Java代码即可实现很多改变Pipeline布局和行为的需求。

      本教程将介绍使用Digester配置的pipelines,因为这是比较简单的方法。

 

 二、Stages使用

     一个标准的stage都有一个队列来缓存输入的数据对象。当某些stage比其他stage具有不同吞吐量或不规则的处理速度时,排队是一种有效的处理手段,特别是那些数据依赖于网络连接或近线媒体的stage。这个队列并不是stage本身的一部分,而是由stage driver来管理,stage driver负责当数据准备好的时候,将其送入stage。stage将一个数据对象传给下一级stage的时候,它可能在一个队列中排队等待(按顺序接收),直到下一个stage准备好来处理它。通常情况下,每个stage运行一个单独的处理线程。当然,对于一些应用也可以配置pipelines,对于同一个对象一次处理的所以stage运行一个单一的线程,也就是说,除非前一个对象以及完成了所有stage。下一个对象不会开始处理。

     Stages都继承自抽象类(org.apache.commons.pipeline.stage.BaseStage),有许多现有的stage可以直接使用,以满足各种加工要求。你还可以通过扩展BaseStage或其他现有的stage之一创建自定义的stage。

     下图展示了各种类性和数量。


 

上图中stage说明:

  • 通常所有进入一个stage的这些对象是同一类型的。避免在stage的代码中重复的写switch语句去区分对象,可以通过使用分支来隔离不同的对象类型。
  • 一个数据对象输入stage并不总是产生一个输出对象。
  1.  终端stage不传递(EMIT)任何数据对象。应当避免创建这种类型的阶段,因为他们限制了你建造pipelines时的可能性。 (这很容易做到,将数据传递到下一个阶段只需要一行代码。)
  2. 传递(EMIT)对象到一个以上的后续stage的stage被称为分支stage。
  3. 如果stages传递的对象与他们解释的对象为同一类型的对象,仅仅是做选择条件过滤,被称为过滤stage。
  4. 通常都有读取数据的stage和写数据的stage,他们将数据读入pipeline或者pipeline作为的输出。
  5. 创造一个与传递给他们的不同的对象的stage被称为转换stage。
  • 传递的(emitted)对象类型没有必要保持一致。
  • 当碰到分支的时候,进入到不同stage中的对象不必是相同类型的,或有相同的量。注意上图中的“FileReader”stage对于每个到来的文件产生100单元的数据对象,但是通过只有一个边界形状被传递给给了分支。

其他注意事项(不一定是从上面的图中很明显):

  • 虽然数据被传递道stage是Java对象的方式,但是stage接受到他们往往期待是一个更具体的数据类型,如文件或数据记录。通常对接受的对象进行检查,看它们是否是期望的数据类型的实例,然后在具体工作之前转换为该类性的对象。
  • 您可以为你的pipeline的每个stage设置 stage driver。他们可以限制队列的大小来控制内存和资源使用。对于有界队列来说,上游stage将阻塞并等待,直到下游stage的队列有足够的空间。

StageDriver的作用

      StageDriver是一个Java接口,他控制将数据提交(feeding)给stage和不同stages之间的通信。因此,stage的生命周期和不同stage之间的相互作用是非常依赖于通过这些stage drivers来控制。这些StageDriver的工厂实现StageDriverFactory接口。在pipeline初始化的时候,StageDrivers是由产生特定类型的StageDriver的工厂类提供。每个stage都会有其自己的 StageDriver的实例,并在pipeline内的不同stage可以使用不同类型的StageDrivers,虽然常见的情况是pipeline中所有stage使用相同类型StageDriver(全部共享相同的StageDriverFactory实现)。

     下面是一些通用的StageDriver

DedicatedThreadStageDriver 为每一个stage生成一个单独的线程,DedicatedThreadStageDriverFactory()提供
SynchronousStageDriver 这个是非线程的StageDriverSynchronousStageDriverFactory()提供
ThreadPoolStageDriver 使用一个线程池处理输入的对象,ThreadPoolStageDriverFactory()提供

 本教程将介绍DedicatedThreadStageDriver,因为这是一个很好的通用驱动程序。某些时候你肯希望编写自己的StageDriver实现,这里没有涉及到一个高级的主题。

 

Stage内部解析

     如果你需要编写自己的stage,本节给出了一些为了实现Stage接口你需要了解的方法的概述。

     stage本身定义org.apache.commons.pipeline.Stage接口,它具有下列方法:

Stage Interface Methods
 
init(StageContext)  关联stage和environment。在生命周期中运行一次。
preprocess()  做任何必要的设置。在生命周期中运行一次。
process(Object)  处理数据对象和传递结果到下一stage。跑N次,为每一个传递到其中的对象运行一次。
postprocess()  处理汇总数据等,在生命周期中运行一次。
release()  清理该stage持有的资源。在生命周期中运行一次。

      一个可用的抽象类为org.apache.commons.pipeline.BaseStage,许多其它的stage都是从来衍生出来的。你可以扩展这个类或者在BaseStage之上建造的其他stage。它提供了stage接口的所有方法的无操作实现。您可以按照需求重写这些方法。对于简单的处理,你可能并不需要重写init(StageContext),postprocess(), nor release()方法 。你几乎只需要提供自己的process(Object)方法。从软件设计的角度来看,认为反转控制的,因为你不是在写一个自定义的主程序来调用标准的子程序,而是,写自定义的子程序通过一个标准的主程序调用。

      BaseStage提供了一个emit(Object obj)方法和提交到分支的emit(String branch, Object obj) 方法来传递对象到下一个stage。因此,通常会在process()方法的结束的附近调用EMIT()。终端stage不调用EMIT(),所以没有对象被传递。这也很容易改变,通过添加EMIT()的代码,终端stage变成一个普通的stage。注意,一个stage提交一个数据对象没有后续stage使用它,这样提交的对象只是没有使用而已并没有什么坏处。有时EMIT()方法在postprocess中调用。当处理缓存,或输入和输出对象的汇总,则process()方法通常仅仅存储进入对象的信息,postprocess()完成了工作之后并提交一个新的对象。

 

                              接下一篇:Apache Commons Pipeline 使用学习(二)

  • 大小: 38.2 KB
  • 大小: 61.8 KB
  • 大小: 102.8 KB
分享到:
评论

相关推荐

    commons-digester.jar

    例如,以下代码展示了如何使用Apache Commons Digester创建一个简单的Java对象: ```java import org.apache.commons.digester.Digester; import org.apache.commons.digester.ObjectCreateRule; import org.apache...

    commons-collections4-4.1.jar

    Commons Collections是Apache软件基金会开发的一个Java库,主要提供对集合框架的增强和扩展。这个库是Java标准集合接口的补充,增加了许多实用的功能,提高了代码的可读性和效率。"commons-collections4-4.1.jar"是...

    stepchain-master.zip

    Java Pipeline Step Chain like Apache Commons Chain and Commons Pipeline.A popular technique for organizing the execution of complex processing flows is the "Chain of Responsibility" pattern

    Redis Java客户端Jedis 2.9.0 jar+commons-pool2-2.4.2

    这里提到的`commons-pool2-2.4.2-bin.zip`就是Apache Commons Pool 2库,它是一个通用的对象池服务,用于创建和维护Jedis实例。Jedis通过这个库可以实现高效的连接复用,降低系统资源消耗。 9. **订阅/发布机制**:...

    stepchain 通用业务流程流水线处理框架

    类似于Commons Chain和Commons Pipeline这样的Java Pipeline Step Chain用于组织复杂处理流程执行的流行技术。支持通用业务job、services子流程无限制拆分。支持业务子流程串行化、业务子流程并行化,可配置化。...

    redis整合Spring 需要jar包

    另一个关键的jar包是`commons-pool2-2.4.2.jar`,这是Apache Commons Pool的第二版,一个对象池设计,用于管理和复用特定类型对象,如数据库连接、网络连接等。在与Redis整合时,我们通常会使用它来管理Jedis实例,...

    redis数据库开发所需jar包

    总结来说,"redis数据库开发所需jar包"主要是指Jedis和Apache Commons Pool2这两个Java库,它们为Java开发者提供了一个高效、便捷的接口来与Redis数据库进行交互,是开发基于Redis的应用必不可少的工具。正确理解和...

    PyPI 官网下载 | cidc_ngs_pipeline_api-0.1.10.tar.gz

    Zookeeper是由Apache开发的分布式协调服务,它提供了一种集中式的、高度可靠的配置管理、命名服务、分布式同步和组服务。cidc_ngs_pipeline_api可能利用Zookeeper来管理和协调分布在不同节点上的任务,确保整个管道...

    redis2.9.0+pool2.2

    Apache Commons Pool是Java的一个对象池库,它提供了一种通用的对象池服务,可以被各种组件用来管理资源。在Jedis中,Commons Pool 2.2用于实现连接池管理,它负责维护和复用Jedis实例,以提高性能和效率。 - **...

    redis驱动包

    2. **Apache Commons Pool**: `commons-pool.jar` 是Apache Commons项目中的一个组件,它提供了一个通用的对象池服务。在Jedis中,这个库被用来管理Redis连接池。对象池技术可以有效地减少创建和销毁对象的开销,...

    redis java 客户端

    5. **管道(Pipeline)**:通过一次性发送多条命令并一次性读取所有响应,提高了通信效率。 6. **发布/订阅**:Jedis支持消息发布`publish(channel, message)`和订阅`subscribe(JedisPubSub subscriber, channels.....

    jedis-2.9.0-java

    描述中提到的"commons-pool2-2.0.jar"是Apache Commons Pool 2.0的jar包,这是一个对象池库,主要用于管理资源,比如数据库连接或线程。在Jedis中,它被用来实现Redis连接池,以提高应用程序的效率和性能,避免频繁...

    jedis2.x使用指南.pdf

    jedis的性能优化主要依赖于对象池的配置,jedis使用Apache的commons-pool实现对象池。可以通过配置对象池的参数来优化jedis的性能。同时,jedis还提供了一些扩展包,可以在特定场景下增强jedis的能力。

    tomcat源码学习并添加注释学习

    2. **源码结构**:Tomcat的源代码主要分为几个关键模块,包括Catalina(核心处理引擎)、Commons、NIO(非阻塞I/O)、apr(Apache Portable Runtime,用于提高性能)、Cluster(集群支持)等。 3. **Catalina模块**...

    管道演示

    Java作为一种强大的编程语言,提供了多种方式来实现数据管道,如使用流(Stream)API、Apache Commons Lang的Pipeline工具等。 首先,我们来了解一下Java的流(Stream)API。自Java 8引入以来,流API已经成为处理...

    jedis-3.1.0.jar 最新

    3. `commons-pool2-2.4.3.jar`:这是一个Apache Commons Pool库的版本,它是Jedis依赖的连接池组件,用于管理Redis连接,提高性能和资源利用率。 **Redis与Jedis** Redis是一个高性能的键值存储系统,常用于数据...

    jedis-2.6.3

    `commons-pool2-2.3.jar`是Apache Commons Pool库的版本2.3,这是一个通用的对象池服务,Jedis为了提高性能和资源管理,会利用这个库实现连接池。对象池允许重复使用已创建的对象,避免频繁创建和销毁带来的开销,这...

    jedis开发使用包

    1. **commons-pool-1.6.jar**:这是Apache Commons Pool库的1.6版本,它是一个通用的对象池服务。在Jedis中,它用于管理Redis连接的池化,避免频繁地创建和销毁连接,提高性能。对象池允许程序预先创建一定数量的...

    redis 需要的jar

    `commons-pool-1.5.4.jar`是Apache Commons Pool库的一个旧版本,它是Java对象池设计模式的实现。在Jedis中,这个库被用来管理Redis连接的池化,提高连接的复用性和效率,避免频繁创建和销毁连接带来的性能开销。...

    pipeline-demo

    7. **第三方库的使用**:Java社区中有许多库(如Apache Commons IO、Guava等)提供了丰富的工具和类来简化流操作,学习如何利用这些库提升代码质量。 通过这个“pipeline-demo”项目,开发者可以深入理解数据处理的...

Global site tag (gtag.js) - Google Analytics