`
hongliangpan
  • 浏览: 319694 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

kettle api 执行转换

阅读更多


import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Date;

import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

/**
 *
 * <p>Title:
 * 本文描述了以下操作:

1)           建立一个新的转换(transformation)

2)           把转换(transformation)存储为XML文件

3)           生成需要在目标表运行的SQL语句

4)           执行转换(transformation)

5)           删除目标表,可以使测试程序可以反复执行(这一点可根据需要修改)。
</p>
 * <p>Description: TODO 类的功能描述</p>
 * <p>Copyright: Copyright (c) 2003</p>

 * @author <a href="mailto: hongliangpan@gmail.com">洪亮</a>
 * @version 1.0
 * <p>------------------------------------------------------------</p>
 * <p> 修改历史 </p>
 * <p>  序号    日期       时间        修 改 人    修 改 原 因</p>
 * <p>   1    2006-9-20   下午05:59:06     洪亮       创建  </p>
 *
 */

public class TransBuilderME
{
    public static final String[] databasesXML = {
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
            "<name>target</name>" +
            "<server>192.168.169.220</server>" +
            "<type>ORACLE</type>" +
            "<access>Native</access>" +
            "<database>NMSDB</database>" +
            "<port>1521</port>" +
            "<username>UCP</username>" +
            "<password>UCP</password>" +
          "</connection>",
         
          "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
          "<connection>" +
              "<name>source</name>" +
              "<server>192.168.169.220</server>" +
              "<type>ORACLE</type>" +
              "<access>Native</access>" +
              "<database>NMSDB</database>" +
              "<port>1521</port>" +
              "<username>UCP</username>" +
              "<password>UCP</password>" +
            "</connection>" 
    };

    /**
     * Creates a new Transformation using input parameters such as the tablename to read from.
     * @param transformationName The name of the transformation
     * @param sourceDatabaseName The name of the database to read from
     * @param sourceTableName The name of the table to read from
     * @param sourceFields The field names we want to read from the source table
     * @param targetDatabaseName The name of the target database
     * @param targetTableName The name of the target table we want to write to
     * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
     * @return A new transformation
     * @throws KettleException In the rare case something goes wrong
     */
    public static final TransMeta buildCopyTable(String transformationName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException
    {
        LogWriter log = LogWriter.getInstance();
       
        try
        {
            //
            // Create a new transformation...
            //传输元信息
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);//传输名称
           
            // Add the database connections
            for (int i=0;i<databasesXML.length;i++)//数据库配置信息数组
            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);//数据库元信息
                transMeta.addDatabase(databaseMeta);//传输元  中加入数据库元信息
            }
           
            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);//查找源数据库元信息
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);//查找目标数据库元信息

           
            //
            // Add a note
            //
            String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;
            note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);//注释信息
            transMeta.addNote(ni);

            //
            // create the source step...
            //
            String fromstepname = "read from [" + sourceTableName + "]";//from步骤名称
            TableInputMeta tii = new TableInputMeta();//表输入元数据信息
            tii.setDatabaseMeta(sourceDBInfo);//为表输入 指定 数据库
            String selectSQL = "SELECT "+Const.CR;//拼接查询sql语句
            for (int i=0;i<sourceFields.length;i++)
            {
                if (i>0) selectSQL+=", "; else selectSQL+="  ";
                selectSQL+=sourceFields[i]+Const.CR;
            }
            selectSQL+="FROM "+sourceTableName;
            tii.setSQL(selectSQL);//设置查询sql语句

            StepLoader steploader = StepLoader.getInstance();//???

            String fromstepid = steploader.getStepPluginID(tii);
            //步骤元数据信息
            StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
            fromstep.setLocation(150, 100);
            fromstep.setDraw(true);
            fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
            //传输中 添加步骤
            transMeta.addStep(fromstep);
            //
            // add logic to rename fields
            // Use metadata logic in SelectValues, use SelectValueInfo...
            //选择字段(重命名)
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i < sourceFields.length; i++)
            {
             //设置源字段和目标字段
                svi.getMetaName()[i] = sourceFields[i];
                svi.getMetaRename()[i] = targetFields[i];
            }

            String selstepname = "Rename field names";
            //获取步骤插件ID
            String selstepid = steploader.getStepPluginID(svi);
            //创建步骤元数据信息
            StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
            selstep.setLocation(350, 100);
            selstep.setDraw(true);
            selstep.setDescription("Rename field names");
            //添加步骤
            transMeta.addStep(selstep);

            //传输连接元数据信息(连接from和select)
            TransHopMeta shi = new TransHopMeta(fromstep, selstep);
            transMeta.addTransHop(shi);//添加到传输元对象
            fromstep = selstep;//然后设置from步骤为select步骤

            //
            // Create the target step...
            //
            //
            // Add the TableOutputMeta step...
            //设置目标步骤名称
            String tostepname = "write to [" + targetTableName + "]";
            //表输出元对象
            TableOutputMeta toi = new TableOutputMeta();
            toi.setDatabase(targetDBInfo);//设置数据库
            toi.setTablename(targetTableName);//设置表名
            toi.setCommitSize(3000);//设置批量提交数
            toi.setTruncateTable(true);//是否清除原有数据

            //获取步骤ID
            String tostepid = steploader.getStepPluginID(toi);
            //创建to步骤
            StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
            tostep.setLocation(550, 100);
            tostep.setDraw(true);
            tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
            transMeta.addStep(tostep);//添加步骤

            //
            // Add a hop between the two steps...
            //
            //创建连接 from--to
            TransHopMeta hi = new TransHopMeta(fromstep, tostep);
            transMeta.addTransHop(hi);

            // OK, if we're still here: overwrite the current transformation...
            return transMeta;
        }
        catch (Exception e)
        {
            throw new KettleException("An unexpected error occurred creating the new transformation", e);
        }
    }

    /**
     * 1) create a new transformation
     * 2) save the transformation as XML file
     * 3) generate the SQL for the target table
     * 4) Execute the transformation
     * 5) drop the target table to make this program repeatable
     *
     * @param args
     */
    public static void main(String[] args) throws Exception
    {
        long start = new Date().getTime();
        // Init the logging...
        LogWriter log = LogWriter.getInstance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);
       
        // Load the Kettle steps & plugins
        StepLoader stloader = StepLoader.getInstance();
        if (!stloader.read())
        {
            log.logError("TransBuilder",  "Error loading Kettle steps & plugins... stopping now!");
            return;
        }
       
        // The parameters we want, optionally this can be
        String fileName = "./NewTrans.xml";
        String transformationName = "Test Transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "emp_collect";
        String sourceFields[] = {
          "empno",      
          "ename",      
          "job",        
          "mgr",        
          "comm",       
          "sal",        
          "deptno",     
          "birthday"   
 
            };

        String targetDatabaseName = "target";
        String targetTableName = "emp_kettle01";
        String targetFields[] = {
          "empno01",      
          "ename01",      
          "job01",        
          "mgr01",        
          "comm",       
          "sal",        
          "deptno",     
          "birthday"
            };

       
        // Generate the transformation.
        //创建转换元对象
        TransMeta transMeta = TransBuilderME.buildCopyTable(
                transformationName,
                sourceDatabaseName,
                sourceTableName,
                sourceFields,
                targetDatabaseName,
                targetTableName,
                targetFields
                );
//        transMeta = new  TransMeta();
        // Save it as a file:
        //传输元对象 中获得XML,并输出
        String xml = transMeta.getXML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
        dos.write(xml.getBytes("UTF-8"));
        dos.close();
        System.out.println("Saved transformation to file: "+fileName);

        // OK, What's the SQL we need to execute to generate the target table?
        //获得sql语句,创建表语句
        String sql = transMeta.getSQLStatementsString();

        // Execute the SQL on the target table:
        //创建表
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        targetDatabase.connect();//连接数据库
        targetDatabase.execStatements(sql);//执行sql
       
        // Now execute the transformation...
        //执行传输任务
        Trans trans = new Trans(log, transMeta);
        trans.execute(null);
        trans.waitUntilFinished();//等待执行完毕
       
        // For testing/repeatability, we drop the target table again
//        targetDatabase.execStatement("drop table "+targetTableName);
        targetDatabase.disconnect();//断开数据库连接
       
        long end = new Date().getTime();
        System.out.println("运行时间:" + (end - start) / 1000 + "秒");
        long min = (end - start) / 1000 / 60;
        long second = (end - start) / 1000 % 60;
        System.out.println("运行时间:" + min + "分钟" + second + "秒");
    }


}
 

分享到:
评论

相关推荐

    Java调用Kettle API执行转换和作业,Java代码生成Kettle转换。.zip

    本篇笔记主要探讨如何利用Java调用Kettle API来执行转换(Transformation)和作业(Job),以及如何通过Java代码生成Kettle转换。 首先,理解Kettle的基本概念是必要的。转换是数据清洗、转换和加载过程的逻辑单元...

    kettle执行转换每一行数据

    在这个场景中,我们要探讨的是如何使用Kettle来执行转换,特别是针对数据库中的每一行数据。 标题“kettle执行转换每一行数据”暗示了我们将关注Kettle如何处理数据库表中的记录。在Kettle中,这个过程通常涉及到...

    利用kettle的api运用Java代码完成数据转换到excel和数据库源码

    Kettle 的四大块分别是 Chef、Kitchen、Spoon 和 Span,分别负责工作设计、工作执行、转换设计和转换执行。 在 Java 中,可以利用 Kettle 的 API 来对数据进行转换。下面是一个使用 Kettle 的 API 完成数据库与...

    Kettle API(HTML格式)

    这些内容有助于开发者理解如何利用API来与Kettle进行交互,比如创建和执行转换、作业,管理数据存储,或者与其他系统进行集成。HTML格式使得这些文档易于浏览和搜索,便于开发人员快速定位所需的信息。 Kettle API...

    Kettle API - Java调用示例

    Kettle API 是一个强大的工具,允许开发者通过Java代码与Pentaho Data Integration (Kettle) 进行交互,实现数据提取、转换和加载(ETL)过程的自动化。在给定的示例中,主要展示了如何使用Kettle API 创建、保存和...

    Kettle java API

    Kettle引擎是整个平台的核心,负责执行转换(Transformations)和作业(Jobs)。通过Java API,开发者可以创建、运行和控制这些实体。例如,`Trans`和`Job`类分别代表转换和作业对象,提供了启动、暂停、停止和监控...

    java调用kettle提供的API实现数据抽取DEMO

    Java调用Kettle API实现数据抽取DEMO是一个典型的ETL(提取、转换、加载)过程,其中Kettle(又称Pentaho Data Integration或KDI)是一个强大的数据集成工具,允许开发者通过图形化界面或者编程方式处理数据迁移和...

    开源ETL工具-kettle API 使用手册 下载

    Kettle API是Kettle框架的一部分,允许开发者通过编写Java代码来创建、运行和管理Kettle的数据转换和工作流。API提供了对内部对象(如步骤、转换和工作流)的直接访问,以及对Kettle元数据的控制,使得开发人员能够...

    java调用kettle中的job与转换-源码

    注意,执行转换前需要创建`TransExecutionConfiguration`对象,对于作业则是`JobExecutionConfiguration`。例如: ```java Trans trans = new Trans(transMeta); trans.execute(new String[0]); // 执行转换 Job...

    kettle作业及转换脚本.zip

    这个.kjb文件可以在Kettle的Job Designer中打开和编辑,也可以通过命令行或API执行。 4. **KTR文件**: KTR是Kettle转换文件的扩展名,它包含了转换的所有细节,如步骤类型、参数设置、字段映射等。与.kjb文件类似...

    Kettle 4.4.0_api

    7. **Pan and Kitchen**:Pan用于执行转换,而Kitchen用于执行工作。这两个命令行工具使得Kettle可以在无人值守的情况下运行,常用于批处理和自动化场景。 8. **Plugin系统**:Kettle的插件系统非常强大,允许...

    kettle_7.1.0_API.rar

    1. **核心API**:这是Kettle的核心组件,包括Job和Transformation的执行引擎,Step和JobEntry的实现,以及数据流的管理。其中,Transformation负责数据的转换过程,Job则用于管理工作流程。开发者可以通过API创建、...

    KETTLE JAVA API学习

    - `Trans` 类提供了执行转换的方法,如`trans.execute()`启动转换,`trans.waitUntilFinished()`等待转换完成。 - `TransHopMeta` 用于定义步骤之间的连接,包括数据流和控制流。 6. **日志与调试**: - `...

    java kettle api pom maven jar

    Java Kettle API是用于在Java应用程序中与Pentaho Data Integration (Kettle)进行交互的接口,它允许开发者在Java代码中执行Kettle的工作流和转换。Pentaho Data Integration,通常简称为Kettle,是一个开源的数据...

    Kettle-API FOR java

    4. 执行转换和作业:有了配置好的转换和作业,可以通过`Trans`和`Job`对象进行执行。`Trans.execute()`方法用于启动转换,`Job.run()`方法执行作业。这些方法会启动Kettle引擎并执行实际的数据处理。 三、Kettle ...

    KettleAPI-Java调用示例.pdf

    在执行转换后,通过`execStatement()`方法执行DROP TABLE语句,断开与数据库的连接,完成清理工作。 创建转换的方法`buildCopyTable()`是一个静态工厂方法,接受源和目标数据库的名称、表名以及字段列表作为参数。...

    kettleAPI文档

    Kettle API文档主要聚焦于Pentaho Data Integration(也称为Kettle或ETL工具)的编程接口,它允许开发人员通过代码与Kettle进行交互,实现自动化数据转换和集成任务。Ambari是一个用于Hadoop集群管理和监控的开源...

    Kettle运行jar及其mvn脚本及配置文件.rar

    - 在Java代码中导入Kettle相关的类和API,创建并执行Kettle作业或转换。 - 构建和运行Java项目,Kettle的功能将被无缝集成到项目中。 以上就是关于"Kettle运行jar及其mvn脚本及配置文件.rar"的详细知识点,涵盖了...

    Kettle中文教程(含3.2中文教程,API调用说明等)

    用户可以通过手册学习如何设计和执行转换、作业,管理连接,设置日志和监控,以及利用Kettle的各种高级特性,如ETL元数据注入和数据仓库建模等。 `KETTLE.pdf`可能是Kettle的官方文档或者是某一特定版本的详细指南...

    kettle3.2.0 java API

    6. **Execution and Control API**:这些API允许开发人员在程序中控制Kettle作业和转换的执行,比如暂停、继续、停止等操作,以及实时获取执行状态和进度。 7. **Data Access API**:Kettle支持多种数据源,包括...

Global site tag (gtag.js) - Google Analytics