如果你需要在自己的Java应用程序中集成Kettle , 一般来说有两种应用需求,一种是通过纯设计器来设计ETL转换任务,然后保存成某种格式,比如xml或者在数据库中都可以,然后自己调用程序解析这个格式,执行这种转换,是比较抽象的一种执行方式,ETL里面转换了什么东西我们并不关心,只关心它有没有正常执行。另一种是通过完全编程的方式来实现,详细的控制每一个步骤,需要知道转换执行的成功与否,这种方式可能需要更多的理解kettle的API 以便更好的跟你的应用程序紧密结合,不过难度也比较大,可以很好的定制你的应用程序,代价自然是入门门槛比较高。本文主要向你解释第一种Kettle的集成方式,文中所列出的代码节选自pentaho ,不过应用程序本身跟pentaho 没有什么关系。
Pentaho 集成kettle的代码主要是两个类,KettleSystemListener和 KettleComponent,看名字就猜出KettleSystemListener 主要是起监听器的作用,它主要负责初始化kettle的一些环境变量,这个类主要包含四个方法: startup() , readProperties(),environmentInit(),shutdown(),程序入口自然是startup()方法,然后它会调用environmentInit() 方法,这个方法就调用readProperties()方法读一个配置文件kettle.properties,这个文件主要记录者kettle运行时可以调用的一些环境变量,关于kettle.properties文件怎么用,第二篇文章“使用Kettle设计动态转换”有提到,readProperties()方法读完这个文件之后就把里面的键值对转换成变量传给kettle运行环境.当kettle运行完了之后就调用shutdown()方法结束转换. KettleSystemListener相对逻辑比较简单,就不多介绍,下面主要介绍重点类:
KettleComponent
KettleComponent的方法主要有三种类型,一类是用来初始化工作,做一些验证工作,第二类是执行转换的方法,也是主要需要讨论的方法,第三类是取得数据结果的,有时候你需要得到转换的结果交给下一个步骤处理.下面分别讨论这三类方法。
初始化
KettleComponent的初始化工作主要是验证这个转换,包括有validateSystemSettings(),init(),validateAction(),全部都是public 方法,validateSystemSettings()会检查kettle 使用何种方式来连接资源库。
kettle有两种方式连接资源库,一种是纯数据库式,也就是你所有的转换全部都保存在一个数据库中,一般你在开始使用kettle的时候,它都会要求你建立一个资源仓库,这个资源仓库的连接方式就是你的数据库连接,你需要能够有相应的数据库驱动和对应的连接用户名和密码。另外一种连接方式是使用文本文件,也就是xml文件,在做完任何转换之后,我们都可以把转换或者Job变成xml文件输出,这个输出文件包含你所有转换的全部信息。
在示例应用中使用的是文件的连接方式,下面看一下初始化的一段代码:
Boolean useRepository = PentahoSystem.getSystemSetting("kettle/settings.xml",
"repository.type","files").equals("rdbms");
PentahoSystem.getSystemSetting()方法只是返回一个字符串,使用的xpath读一个xml的对应字段,下面列出settings.xml文件:
<kettle-repository>
<!-- The values within <properties> are passed directly to the Kettle Pentaho components. -->
<!-- This is the location of the Kettle repositories.xml file, leave empty if the default is used: $HOME/.kettle/repositories.xml -->
<repositories.xml.file></repositories.xml.file>
<repository.type>files</repository.type>
<!--The name of the repository to use -->
<repository.name></repository.name>
<repository.userid>admin</repository.userid>
<repository.password>admin</repository.password>
</kettle-repository>
可以看到其中的repositories.xml.file 上面的一段注释,如果这个值为空会默认使用$HOME/.kettle/repository.xml文件当作资源库的连接文件,由于示例中使用的是文本文件所以没有用数据库连接,下面的repository.userid和repository.password是指的kettle的资源库连接的用户名和密码,一般默认安装就两个,admin/admin和guest/guest , 这里的用户名和密码不是连接数据库的用户名和密码,连接数据库的用户名和密码是在另外一个文件repositories.xml.file指定的值所定义的
一般默认的kettle安装并且运行了一段时间之后,会在$HOME/.kettle 目录下创建一些文件,如果你要在自己的系统中集成kettle的话,也需要保留这些文件,当然不一定位置是在原来的位置,关键是要让kettle知道这些文件放在哪。
执行转换
当读完了这些配置文件并且验证了之后,KettleComponent就开始把前面读到的转换文件或者资源库类型变成Kettle的API,这主要是在executeAction()方法里面进行,它当然根据连接方式也分两种执行类型:
1. 文本执行方式
2. 资源库连接方式
文本执行方式需要接受一个你指定的运行转换的文件或者Job的文件,然后把这个xml文件解析成Kettle能够执行的模式,
根据执行的类型又可以分成两种:
1. Trans任务
2. Job任务
两个执行的逻辑差不多,下面先介绍Trans的执行方式:
执行Trans任务
transMeta = new TransMeta(fileAddress, repository, true);
transMeta.setFilename(fileAddress);
然后它会调用:
executeTransformation(TransMeta transMeta, LogWriter logWriter)
这个方法是真正的把前面的来的transMeta转换成trans对象,等待下一步的执行:
Trans trans = new Trans(logWriter, transMeta);
List stepList = trans.getSteps();
for (int stepNo = 0; stepNo < stepList.size(); stepNo++) {
StepMetaDataCombi step = (StepMetaDataCombi) stepList.get(stepNo);
if (step.stepname.equals(stepName)) {
①Row row = transMeta.getStepFields(stepName);
// create the metadata that the Pentaho result set needs
String fieldNames[] = row.getFieldNames();
String columns[][] = new String[1][fieldNames.length];
for (int column = 0; column < fieldNames.length; column++) {
columns[0][column] = fieldNames[column];
}
②MemoryMetaData metaData = new MemoryMetaData(columns, null);
results = new MemoryResultSet(metaData);
// add ourself as a row listener
③step.step.addRowListener(this);
foundStep = true;
break;
}
}
1. Row对象是kettle用来表示一行数据的标准对象,跟jdbc取出来的一条数据转化后成为的一个POJO是一样的。里面可以包含多个字段。
2 . MemoryMetaData对象是pentaho特有的,是专门用来返回ETL任务执行后的结果的,与标准的JDBC里面的resultSet 对应的resultSetMetaData是一样的。
3. 对于如何处理数据的一个Listener,实现的是一个RowListener,数据是每一行每一行处理的,后面会介绍如果需要输出数据怎么取得这些输出数据。如果不需要放回任何对象,则从1处开始都可以不要,只要初始化step对象即可。
所有的step对象都已经初始化之后就可以开始执行了,
trans.startThreads();
trans.waitUntilFinished();
结束之后还有一些清理工作就不列出了。
执行Job任务
执行Job任务之前还是会读取Job任务的描述文件,然后把这个描述文件(kettle的 .ktr文件)变成一个xml文档的dom :
org.w3c.dom.Document doc = XmlW3CHelper.getDomFromString(jobXmlStr);
之后也是初始化对应的元数据对象JobMeta
jobMeta = new JobMeta(logWriter, doc.getFirstChild(), repository);
得到了jobMeta 之后就可以执行这个Job了,这里跟trans是一样的。
job = new Job(logWriter, StepLoader.getInstance(), repository, jobMeta);
由于Job一般都没有什么返回值,所以Job不需要初始化它下面的对象,直接开始运行就可以了
job.start();
job.waitUntilFinished(5000000);
连接资源库
连接资源库使用的是connectToRepository()方法,先取得RepositoriesMeta对象,然后根据你在setting.xml文件里面定义的repository的名字来连接对应的repository.理论上来说我们一般都只使用一个repository ,但如果在产品中需要使用多个repository的话,你需要自己配置多个repository的名字和对应的用户名和密码。只列出几行关键代码,
repositoriesMeta = new RepositoriesMeta(logWriter);
repositoriesMeta.readData(); // 从$HOME/.kettle/repositories.xml 读数据.
repositoryMeta = repositoriesMeta.findRepository(repositoryName);
repository = new Repository(logWriter, repositoryMeta, userInfo);
userInfo = new UserInfo(repository, username, password);
从资源库读取Trans
连接到资源库之后自然是想办法读取数据库的表,把里面的记录转换成为Trans 对象,使用的是loadTransformFromRepository,这个方法的函数原型需要解释一下:
TransMetaloadTransformFromRepository(String directoryName, String transformationName, Repository repository,LogWriter logWriter)
第一个参数String directoryName 代表是你储存转换的目录,当你使用kettle 图形界面的时候,点击repository菜单的explorer repository , 你会发现你所有的东西都是存储在一个虚拟的类似与目录结构的地方,其中包括database connections , transformations , job , users 等,所以你需要的是指定你连接的目录位置,你也可以在目录里面再创建目录。
String transformationName 自然指的就是你转换的名字.
Repository repository 指的是你连接的资源库。
LogWriter logWriter 指定你的日志输出,这个log 指的是你kettle 转换的日志输出,不是应用程序本身的输出。
读取TransMeta的步骤也相对比较简单
repositoryDirectory=repository.getDirectoryTree().findDirectory(directoryName);
transMeta = new TransMeta(repository, transformationName, repositoryDirectory);
从资源库读取Job
从资源库读取Job跟Trans 的步骤基本是一样的,同样需要指定你存储Job的目录位置.
JobMeta loadJobFromRepository(String directoryName, String jobName,
Repository repository, LogWriter logWriter)
读取结果集
一般Job都是不会返回任何结果集的,大部分Trans也不会返回结果集,应为结果集一般都会直接从一个数据库到另一个数据库了,但是如果你需要返回转换的结果集,那么这一小结将会向你解释如何从一个Trans里面读取这些结果集
首先,你需要一个容纳Result的容器,就是类似与JDBC里面的resultSet, resultSet当然会有一个resultSetMetadata跟它相关联,在本文所举的实例中,使用的是pentaho私有的memoryResultSet,
你可以不用关心它的细节,并且它的类型正如它的名字一样是存在与Memory的,所以它不能被持久化,这个里面储存的是一个二维的Object数组,里面的数据就是从kettle转化之后来的。
要从kettle的转换中读取结果集,要实现RowListener 接口,Row 是kettle里面表示一行数据的一个类,RowListener 自然是指在转换数据转换的时候发生的事件,它有三个方法需要实现,
void rowReadEvent(Row)
void rowWrittenEvent(Row)
void errorRowWrittenEvent(Row)
分别对应读数据时的事件,写数据事的时间,出错时的时间,我们需要取得结果集,所以只需要实现rowWrittenEvent(Row)就可以了,Row对象是通过TransMeta取得的,
Row row = transMeta.getStepFields(stepName);
下面给出具体实现取Row转换成resultSet的代码:
Object pentahoRow[] = new Object[results.getColumnCount()];
for (int columnNo = 0; columnNo < results.getColumnCount(); columnNo++) {
Value value = row.getValue(columnNo);
switch (value.getType()) {
case Value.VALUE_TYPE_BIGNUMBER:
pentahoRow[columnNo] = value.getBigNumber();
break;
........
results.addRow(pentahoRow);
默认的数据类型是String 类型(在省略部分).
整个代码最重要的一行是Value value = row.getValue(columnNo);
这是真正取得实际数据的一行。有时候你会觉得实现一个resultSet比较麻烦,尤其是你还要实现相关的resultSetMetaData,怎么把数据转换成你自己的类型,你大可以就用一个List of List 来实现,里面的List 就代表Row 的对应数据,外面一层List 就是result , 整个代码会简单一些,当然,你要自己知道最后这个List怎么用.
本文有意隐藏了一些跟pentaho有关的细节,比如validateSystemSettings(),init(),validateAction()方法,这些都是pentaho私有的,有些方法比如rowWrittenEvent(Row) 是用来取结果集的,但是很多时候我们不需要取转换的结果集,文中很多代码都只列出主要的部分,省略一些判断,调试,log部分的代码,大家可以自己下载这些代码来研究,
本文并没有给出一个可以独立运行的示例,因为这个示例一定会太过于简单(不超过15行代码),但是却并不能考虑到各种情况,连接资源库还是文件,运行转换还是Job ,metadata怎么得来的,需不需要转换之后的结果。
关于在本文一开始提到的使用kettle的两种方式,对于第二种使用方式:使用完全编程的方式来运行转换,其实它的与第一种方式的区别就好像一个用设计器来写xml文件,一个用纯手工方式写xml文件(用代码的xml),大家可以参考官方网站上的一段示例代码,地址如下:
http://kettle.pentaho.org/downloads/api.php
分享到:
相关推荐
然而,在Java应用程序中集成Kettle,需要将相关的Jar包添加到项目的类路径中,以便能够调用其API来执行工作流或转换。 首先,要理解Java与Kettle的集成,我们需要知道Kettle的主要组件。Kettle由以下几部分组成: ...
标题 "如何将JAVA程序集成到KETTLE中" 提示了我们,这个主题主要讨论的是如何在数据处理工具Kettle(也称为Pentaho Data Integration,简称PDI)中利用Java代码进行扩展和功能增强。Kettle是一个开源的数据集成平台...
在java应用程序中集成kettle,需要在项目中引入执行kettle所需要的jar包,包括kettle对应的目录下找到的外部jar包。另外,将kettle目录下plugins目录拷贝到应用程序根目录下,这一点很重要,否则在用transformation...
Java集成Kettle执行作业文件是将Java程序与Pentaho Data Integration(Kettle)工具结合,以实现数据处理和转换的自动化。Kettle是一款强大的ETL(Extract, Transform, Load)工具,它允许用户通过图形化界面创建...
- "4连接资源库的包"可能指的是Kettle中用于连接不同数据库的JDBC驱动,这些驱动是Kettle连接到各种数据库系统所必需的。Kettle支持多种数据库,如MySQL、Oracle、SQL Server等,因此,确保正确的JDBC驱动被添加到...
Java调用Kettle 5.3任务是一种在Java应用程序中集成Kettle(Pentaho Data Integration,也称为ETL工具)的方式,以便利用其强大的数据转换和加载功能。以下是对这个主题的详细解释: 1. **Kettle简介**:Kettle是一...
2. 集成性:可以将Kettle作业无缝集成到现有的Java应用程序或服务中。 3. 自动化:便于自动化部署和执行,例如在持续集成/持续部署(CI/CD)流程中。 总结起来,这个压缩包提供的Java代码示例展示了如何使用Kettle...
标题 "springboot整合kettle项目源码" 描述了一个基于Spring Boot框架的集成Kettle(Pentaho Data Integration,简称KDI)的工程实例。Kettle是一款强大的ETL(提取、转换、加载)工具,它允许开发者通过编写Java...
它提供了一套全面的Java API,允许开发人员在Java应用程序中直接集成和操作Kettle的功能。这篇详细的知识点解释将深入探讨Kettle Java API的核心概念、使用场景和主要功能。 1. **Kettle Engine API**: Kettle...
首先,我们要理解Kettle转换(KTR)文件是Kettle中的核心组件,它保存了由用户设计的数据转换逻辑。这些文件通常是由Kettle的 Spoon 工具创建的,但通过Java API,我们可以自定义生成这些文件,这对于批量创建或动态...
Java调用Kettle是将Java程序与Pentaho Kettle(也称为Spoon)集成的过程,Kettle是一款强大的数据转换和ETL(提取、转换、加载)工具。在Java环境中,我们可以通过API来启动和控制Kettle的工作流或转换。这个压缩包...
总之,Java调用Kettle通过引入相关jar包并利用其API,能够无缝地将强大的Kettle ETL功能集成到Java应用程序中,从而实现更复杂的业务逻辑和数据处理需求。这为开发人员提供了更大的灵活性,同时利用了Kettle的可视化...
通过`TestKettle.java`,我们可以了解到Kettle在实际应用中的集成方法,掌握如何在Java程序中启动和控制数据转换流程。这仅仅是开始,深入学习Kettle的API和最佳实践,将帮助我们更好地利用其强大功能,解决复杂的...
在Java环境中,我们可以利用Kettle的API来编写自定义程序,实现数据抽取、清洗、转换和加载到目标系统。 首先,了解Kettle API的基础知识至关重要。Kettle API是Kettle的核心组件之一,它提供了一系列接口和类,...
Java调用Kettle是一种常见的数据集成方案,Kettle(也称为Pentaho Data Integration或PDI)是一款强大的ETL(Extract, Transform, Load)工具,它允许用户通过图形化界面设计数据转换流程,并能被其他应用程序如Java...
Kettle提供了API和执行引擎,允许外部应用程序如Java程序与其交互。通过使用Kettle的`trans`和`job` API,可以启动、监控和控制转换(Transformation)和作业(Job)的执行。Kettle的核心类库包括`kettle-engine`和...
在Java开发环境中,引入这些JAR包后,开发者可以编写自定义的Java程序来操作Kettle。例如,你可以创建一个Java应用来动态生成Kettle的作业或转换,或者通过API来控制现有作业的执行状态。这为自动化数据处理流程提供...
在Kettle中,可以将作业和转换保存为.kjb和.ktr文件,然后通过执行这些文件来运行相应的任务。 在Java中调用Kettle的jar包,我们通常会依赖Kettle的API,如`org.pentaho.di.core.KettleClientEnvironment`和`org....
JAR文件在Java应用程序中是代码的打包形式,它包含了一组类文件和其他资源。在Kettle中,JAR文件通常包含了特定的功能扩展或插件,例如自定义的数据转换步骤、连接器或者特殊功能实现。这些JAR文件被添加到Kettle的...
通过引入这些库,开发者可以在Java程序中灵活地调用Kettle的功能,实现复杂的数据处理逻辑,提升数据处理的效率和灵活性。在使用过程中,还需要注意处理好依赖关系,避免版本冲突,以及适当地进行错误处理和日志记录...