基于kettle版本5.4.0.1-130
1:实现类
2:JSONToStringMeta
说明:配置标签@Step(),初始化程序
extends BaseStepMeta :继承BaseStepMeta
implements StepMetaInterface :实现接口StepMetaInterface
整个代码的执行过程是:先运行配置标签meta类型,通过meta类找到Dialog类,当运行脚本时调用实现类
3:JSONToStringData:定义共享对象类
4:JSONToStringDialog:kettle的ui管理界面
说明:extends BaseStepDialog implements StepDialogInterface
5:JSONToString
说明:实现json转string
1:实现类
2:JSONToStringMeta
说明:配置标签@Step(),初始化程序
extends BaseStepMeta :继承BaseStepMeta
implements StepMetaInterface :实现接口StepMetaInterface
整个代码的执行过程是:先运行配置标签meta类型,通过meta类找到Dialog类,当运行脚本时调用实现类
@Step(id="bsoft-json", image="JSO.png", name="JSONToString", description="json格式转string", categoryDescription="Bsoft-Dc") public class JSONToStringMeta extends BaseStepMeta implements StepMetaInterface { private XMLField[] inputFields; private ValueMetaAndData value; private String outputName; public JSONToStringMeta() { super(); // allocate BaseStepInfo } //流程中新增列字段 @SuppressWarnings("deprecation") @Override public void getFields(RowMetaInterface r, String origin, RowMetaInterface[] info, StepMeta nextStep, VariableSpace space) { ValueMetaInterface v = new ValueMeta(outputName); v.setType(ValueMetaInterface.TYPE_STRING); r.addValueMeta(v); } @Override public Object clone() { JSONToStringMeta retval = (JSONToStringMeta) super.clone(); int nrfields = inputFields.length; retval.allocateInput(nrfields); for (int i = 0; i < nrfields; i++) { retval.inputFields[i] = (XMLField) inputFields[i].clone(); } return retval; } public void allocateInput(int nrfields) { inputFields = new XMLField[nrfields]; } //读取ktr脚本的xml数据 @Override public void loadXML(Node stepnode, List<DatabaseMeta> databases, Map<String, Counter> counters) throws KettleXMLException { outputName = XMLHandler.getTagValue(stepnode, "outputName"); //$NON-NLS-1$ Node infields = XMLHandler.getSubNode(stepnode, "inFields"); //$NON-NLS-1$ int num = XMLHandler.countNodes(infields, "field"); //$NON-NLS-1$ allocateInput(num); for (int i = 0; i < num; i++) { Node fnode = XMLHandler.getSubNodeByNr(infields, "field", i); //$NON-NLS-1$ inputFields[i] = new XMLField(); inputFields[i].setType(XMLHandler.getTagValue(fnode, "type")); //$NON-NLS-1$ inputFields[i].setFieldName(XMLHandler.getTagValue(fnode, "name")); //$NON-NLS-1$ inputFields[i].setFormat(XMLHandler.getTagValue(fnode, "format")); //$NON-NLS-1$ inputFields[i].setLength(Const.toInt( XMLHandler.getTagValue(fnode, "length"), -1)); //$NON-NLS-1$ inputFields[i].setPrecision(Const.toInt( XMLHandler.getTagValue(fnode, "precision"), -1)); //$NON-NLS-1$ } } //将对象Meta数据转成xml返回 @Override public String getXML() { StringBuffer retval = new StringBuffer(300); retval.append(" " + XMLHandler.addTagValue("outputName", outputName == null ? "" : outputName)); XMLField[] infields = getInputField(); retval.append(" <inFields>"); //$NON-NLS-1$ for (int i = 0; i < infields.length; i++) { inputFields[i] = infields[i]; retval.append(" <field>"); //$NON-NLS-1$ retval.append(" ").append(XMLHandler.addTagValue("name", inputFields[i].getFieldName())); //$NON-NLS-1$ //$NON-NLS-2$ retval.append(" ").append(XMLHandler.addTagValue("type", inputFields[i].getTypeDesc())); //$NON-NLS-1$ //$NON-NLS-2$ retval.append(" ").append(XMLHandler.addTagValue("format", inputFields[i].getFormat())); //$NON-NLS-1$ //$NON-NLS-2$ retval.append(" ").append(XMLHandler.addTagValue("length", inputFields[i].getLength())); //$NON-NLS-1$ //$NON-NLS-2$ retval.append(" ").append(XMLHandler.addTagValue("precision", inputFields[i].getPrecision())); //$NON-NLS-1$ //$NON-NLS-2$ retval.append(" </field>"); //$NON-NLS-1$ } retval.append(" </inFields>"); //$NON-NLS-1$ return retval.toString(); } //界面调试 @Override public void check(List<CheckResultInterface> remarks, TransMeta transmeta, StepMeta stepMeta, RowMetaInterface prev, String input[], String output[], RowMetaInterface info) { CheckResult cr; if (prev == null || prev.size() == 0) { cr = new CheckResult(CheckResultInterface.TYPE_RESULT_WARNING, "Not receiving any fields from previous steps!", stepMeta); remarks.add(cr); } } //获得缓存中的数据 @Override public void readRep(Repository rep, ObjectId id_step, List<DatabaseMeta> databases, Map<String, Counter> counters) throws KettleStepException { try { outputName = rep.getStepAttributeString(id_step, "outputName"); int inFields = rep.countNrStepAttributes(id_step, "inFields"); //$NON-NLS-1$ allocateInput(inFields); for (int i = 0; i < inFields; i++) { inputFields[i] = new XMLField(); inputFields[i].setFieldName(rep.getStepAttributeString(id_step, i, "field_name")); //$NON-NLS-1$ inputFields[i].setType(rep.getStepAttributeString(id_step, i, "field_type")); //$NON-NLS-1$ inputFields[i].setFormat(rep.getStepAttributeString(id_step, i, "field_format")); //$NON-NLS-1$ inputFields[i].setLength((int) rep.getStepAttributeInteger( id_step, i, "field_length")); //$NON-NLS-1$ inputFields[i].setPrecision((int) rep.getStepAttributeInteger( id_step, i, "field_precision")); //$NON-NLS-1$ } } catch (KettleException e) { throw new KettleStepException(e.getMessage()); } } //保存数据到缓存中 @Override public void saveRep(Repository rep, ObjectId id_transformation, ObjectId id_step) throws KettleException { try { rep.saveStepAttribute(id_transformation, id_step, "outputName", outputName); XMLField[] nrfields = getInputFields(); for (int i = 0; i < nrfields.length; i++) { rep.saveStepAttribute(id_transformation, id_step, i, "field_name", nrfields[i].getFieldName()); //$NON-NLS-1$ rep.saveStepAttribute(id_transformation, id_step, i, "field_type", nrfields[i].getTypeDesc()); //$NON-NLS-1$ rep.saveStepAttribute(id_transformation, id_step, i, "field_format", nrfields[i].getFormat()); //$NON-NLS-1$ rep.saveStepAttribute(id_transformation, id_step, i, "field_length", nrfields[i].getLength()); //$NON-NLS-1$ rep.saveStepAttribute(id_transformation, id_step, i, "field_precision", nrfields[i].getPrecision()); //$NON-NLS-1$ } } catch (KettleException e) { throw new KettleStepException(e.getMessage()); } } /** * @return inputFields */ public XMLField[] getInputFields() { return inputFields; } /** * @param inputFields * inputFields */ public void setInputFields(XMLField[] inputFields) { this.inputFields = inputFields; } @Override public void setDefault() { int nrfields = 0; allocateInput(nrfields); XMLField[] xml = new XMLField[nrfields]; for (int i = 0; i < nrfields; i++) { XMLField inPutField = new XMLField(); xml[i] = inPutField; } setInputField(xml); } public StepDialogInterface getDialog(Shell shell, StepMetaInterface meta, TransMeta transMeta, String name) { return new JSONToStringDialog(shell, meta, transMeta, name); } @Override public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, Trans disp) { return new JSONToString(stepMeta, stepDataInterface, cnr, transMeta, disp); } @Override public StepDataInterface getStepData() { return new JSONToStringData(); } //控制错误输出 @Override public boolean supportsErrorHandling() { return true; } /** * @return url */ public String getOutputName() { return outputName; } /** * @param url * url */ public void setOutputName(String packageName) { this.outputName = packageName; } /** * @return filterField */ public XMLField[] getInputField() { return inputFields; } /** * @param filterField * filterField */ public void setInputField(XMLField[] inputFields) { this.inputFields = inputFields; } public ValueMetaAndData getValue() { return value; } public void setValue(ValueMetaAndData value) { this.value = value; } }
3:JSONToStringData:定义共享对象类
public class JSONToStringData extends BaseStepData implements StepDataInterface { public RowMetaInterface outputRowMeta; public SAXReader saxReader=null; public Configuration conf =null; public List<String> typeList=null; public FileSystem fs=null; public JSONToStringData() { super(); saxReader = new SAXReader(); conf= new Configuration(); typeList=new ArrayList<String>(); } }
4:JSONToStringDialog:kettle的ui管理界面
说明:extends BaseStepDialog implements StepDialogInterface
public class JSONToStringDialog extends BaseStepDialog implements StepDialogInterface { private static Class<?> PKG = JSONToStringMeta.class; // for i18n private JSONToStringMeta input; private ModifyListener lsMod; private CTabFolder wTabFolder; private FormData fdTabFolder; private CTabItem inputTab,outputTab,dbTab; private MessageBox msgError; private Label className_lable; private CCombo className_text; private FormData className_lform, className_tform; private Label input_lable; private TableView input_view; private FormData input_lform, input_tform; public JSONToStringDialog(Shell parent, Object in, TransMeta transMeta, String sname) { super(parent, (BaseStepMeta) in, transMeta, sname); input = (JSONToStringMeta) in; } @Override public String open() {// 初始化控制台 Shell parent = getParent(); Display display = parent.getDisplay(); shell = new Shell(parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN); props.setLook(shell); setShellImage(shell, input); lsMod = new ModifyListener() { @Override public void modifyText(ModifyEvent e) { input.setChanged(); } }; changed = input.hasChanged(); FormLayout formLayout = new FormLayout (); formLayout.marginWidth = Const.FORM_MARGIN; formLayout.marginHeight = Const.FORM_MARGIN; shell.setLayout(formLayout); shell.setText("json转string"); //$NON-NLS-1$ int middle = props.getMiddlePct(); int margin = Const.MARGIN; // 参数输入 wlStepname=new Label(shell, SWT.RIGHT); wlStepname.setText("名称"); //$NON-NLS-1$ props.setLook(wlStepname); fdlStepname=new FormData(); fdlStepname.left = new FormAttachment(0, 0); fdlStepname.right = new FormAttachment(middle, -margin); fdlStepname.top = new FormAttachment(0, 0); wlStepname.setLayoutData(fdlStepname); wStepname=new Text(shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER); wStepname.setText(stepname); props.setLook(wStepname); wStepname.addModifyListener(lsMod); fdStepname=new FormData(); fdStepname.left = new FormAttachment(middle, 0); fdStepname.top = new FormAttachment(0, margin); fdStepname.right= new FormAttachment(100, 0); wStepname.setLayoutData(fdStepname); wTabFolder = new CTabFolder(shell, SWT.BORDER); props.setLook(wTabFolder, Props.WIDGET_STYLE_TAB); wTabFolder.setSimple(false); FormLayout fieldsLayout = new FormLayout (); fieldsLayout.marginWidth = Const.FORM_MARGIN; fieldsLayout.marginHeight = Const.FORM_MARGIN; Composite inputComp = new Composite(wTabFolder, SWT.NONE); inputComp.setLayout(fieldsLayout); props.setLook(inputComp); inputTab = new CTabItem(wTabFolder, SWT.NONE); inputTab.setText("参数输入"); inputComp.layout(); inputTab.setControl(inputComp); fdTabFolder = new FormData(); fdTabFolder.left = new FormAttachment(0, 0); fdTabFolder.top = new FormAttachment(wStepname, margin); fdTabFolder.right = new FormAttachment(100, 0); fdTabFolder.bottom= new FormAttachment(100, -50); wTabFolder.setLayoutData(fdTabFolder); //输入包路径 className_lable = new Label(inputComp, SWT.RIGHT); className_lable.setText("输出字段:"); props.setLook(className_lable); className_lform = new FormData(); className_lform.left = new FormAttachment(0, 0); className_lform.right = new FormAttachment(middle, -margin); className_lform.top = new FormAttachment(wOK, margin); className_lable.setLayoutData(className_lform); className_text = new CCombo(inputComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER); props.setLook(className_text); className_text.addModifyListener(lsMod); className_tform = new FormData(); className_tform.left = new FormAttachment(middle, 0); className_tform.right = new FormAttachment(100, 0); className_tform.top = new FormAttachment(wOK, margin); className_text.setLayoutData(className_tform); input_lable=new Label(inputComp, SWT.NONE); input_lable.setText("输入参数配置:"); //$NON-NLS-1$ props.setLook(input_lable); input_lform=new FormData(); input_lform.left = new FormAttachment(0, 0); input_lform.right = new FormAttachment(100, 0); input_lform.top = new FormAttachment(className_text, -margin+2); input_lable.setLayoutData(input_lform); int FieldsRows=0; if(input.getInputField()!=null){ FieldsRows=input.getInputField().length; } // ColumnInfo[] colinf = new ColumnInfo[] { new ColumnInfo("字段", ColumnInfo.COLUMN_TYPE_CCOMBO, setRowMeta()), //$NON-NLS-1$ new ColumnInfo("改名为", ColumnInfo.COLUMN_TYPE_TEXT, false), //$NON-NLS-1$ new ColumnInfo("类型", ColumnInfo.COLUMN_TYPE_CCOMBO, ValueMeta.getTypes()), //$NON-NLS-1$ new ColumnInfo("长度", ColumnInfo.COLUMN_TYPE_TEXT, false), //$NON-NLS-1$ new ColumnInfo("精度", ColumnInfo.COLUMN_TYPE_TEXT, false), //$NON-NLS-1$ }; input_view = new TableView(transMeta, inputComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI | SWT.V_SCROLL | SWT.H_SCROLL, colinf, FieldsRows, lsMod, props); input_tform = new FormData(); input_tform.left = new FormAttachment(0, 0); input_tform.top = new FormAttachment(input_lable, margin); input_tform.right = new FormAttachment(100, 0); input_tform.bottom = new FormAttachment(100, 0); input_view.setLayoutData(input_tform); wOK=new Button(shell, SWT.PUSH); wOK.setText(BaseMessages.getString(PKG, "System.Button.OK")); wCancel=new Button(shell, SWT.PUSH); wCancel.setText(BaseMessages.getString(PKG, "System.Button.Cancel")); setButtonPositions(new Button[] { wOK, wCancel }, margin, wTabFolder); msgError = new MessageBox(shell, SWT.ICON_ERROR); setSize(); initInfo(); Listener lsCancel = new Listener() { @Override public void handleEvent(Event e) { cancel(); } }; Listener lsOk = new Listener() { @Override public void handleEvent(Event e) { ok(); } }; wCancel.addListener(SWT.Selection, lsCancel); wOK.addListener(SWT.Selection, lsOk); wTabFolder.setSelection(0); shell.open(); setRowMeta(); while (!shell.isDisposed()) { if (!display.readAndDispatch()) display.sleep(); } return stepname; } private String[] getDBMeta(){ return transMeta.getDatabaseNames(); } //初始化数据 private void initInfo(){ if(input.getOutputName()==null){ className_text.setText(""); }else{ className_text.setText(input.getOutputName()); } XMLField[] nrfields = input.getInputField(); for (int i = 0; i < nrfields.length; i++) { XMLField field = nrfields[i]; TableItem item = input_view.table.getItem(i); if (field.getFieldName()!=null) item.setText(1, field.getFieldName()); if (field.getFormat()!=null) item.setText(2, field.getFormat()); item.setText(3, field.getTypeDesc()); if (field.getLength()>=0) item.setText(4, ""+field.getLength()); if (field.getPrecision()>=0) item.setText(5, ""+field.getPrecision()); } } //设置列名 private String[] setRowMeta() { String[] rowNames = null; StepMeta stepMeta = transMeta.findStep(stepname); if (stepMeta != null) { try { RowMetaInterface row = transMeta.getPrevStepFields(stepMeta); if (row == null) { return null; } rowNames = new String[row.size()]; for (int i = 0; i < row.size(); i++) { rowNames[i] = row.getValueMeta(i).getName(); } } catch (KettleException e) { logError(BaseMessages.getString(PKG, "System.Dialog.GetFieldsFailed.Message")); } } return rowNames; } private void cancel() { dispose(); } //点击确定按钮操作,并保存数据 private void ok() { if (!"".equals(className_text.getText())) { input.setOutputName(className_text.getText()); int infields = input_view.nrNonEmpty(); input.allocateInput(infields); for (int i=0;i<infields;i++) { XMLField field = new XMLField(); TableItem item = input_view.getNonEmpty(i); field.setFieldName( item.getText(1) ); field.setFormat( item.getText(2) ); field.setType( item.getText(3) ); field.setLength( Const.toInt(item.getText(4), -1) ); field.setPrecision( Const.toInt(item.getText(5), -1) ); input.getInputFields()[i] = field; } dispose(); return; }else{ msgError.setMessage("请输入需要解析的对象"); } msgError.setMessage("请输入需要解析的对象"); msgError.open(); // } }
5:JSONToString
说明:实现json转string
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { meta = (JSONToStringMeta) smi; data = (JSONToStringData) sdi; //获得转换流程中的数据 Object[] row = getRow(); if (row == null) { setOutputDone(); return false; } // 初始数据 if (first) { first = false; //获得转换流程中的例字段 data.outputRowMeta = getInputRowMeta().clone(); //往转换流程中新增字段 meta.getFields(data.outputRowMeta, getStepname(), null, null, this); } if(meta.getOutputName()==null){ logError("no find class,class name is ["+meta.getOutputName()+ "]"); return false; } String msg = "Success"; String code = "200"; Map<String, Object> res = new HashMap<String, Object>(); res.put(RES_CODE, code); res.put(RES_MESSAGE, msg); Map<String,Object> items=getInputRows( row); //格式转换 String out=JSON.toJSONString(items); //往转换流程中新增数据 Object[] outData = RowDataUtil.addValueData(row, getInputRowMeta().size(), out); getRes(res, outData, row); return true; }
相关推荐
### Kettle插件开发 #### 一、Kettle简介与组件 Kettle,又称为PDI(Pentaho Data Integration),是一款开源的数据集成工具,它主要用于数据抽取、转换和加载(ETL)。Kettle最初由一家名为Pentaho的公司开发,并...
官方提供的Kettle插件开发示例,如"pdi-sdk-plugins-8.1.0.0-R"这个压缩包,涵盖了五种类型的插件示例,这为我们深入了解Kettle插件开发提供了宝贵的学习资源。下面将详细介绍这些类型以及它们在Kettle中的作用: 1...
- Kettle支持多种数据源,包括关系型数据库、XML、CSV等格式的数据文件,以及复杂的JSON文件。 #### 2. **JSON解析的重要性** - JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和...
标题 "kettle rabbitmq 插件开发" 涉及的是如何在 Pentaho Kettle(也称为 Spoon)中创建和使用 RabbitMQ 插件。Kettle 是一个开源的数据集成工具,它允许用户进行数据抽取、转换和加载(ETL)操作。RabbitMQ 是一个...
Kettle插件开发是其核心功能之一,允许用户根据需求扩展和定制工具,以满足特定的数据处理任务。下面将详细介绍Kettle插件开发的基本概念、步骤以及一些关键知识点。 1. **Kettle插件架构** Kettle插件基于Java...
平时使用kettle进行推送数据比较方便,可视化操作,不需要写代码,但是有些时候提供的插件不满足我们的需求,我们需要进行自定义插件的开发来满足我们的需求,所以官方提供了开发插件的demo例子,方便我们进行二次...
在这个步骤中,我们将前一步骤中的开始时间和结束时间拼接成json格式的参数。这个参数将被用于后续的http请求中。 第四步:获取参数中想要的数据,通过 jsonpath 语法获取上个步骤的数据 在这个步骤中,我们使用...
2 Kettle转换步骤插件开发 2.1 Kettle转换步骤插件开发概述 2.1.1 Kettle转换步骤插件至少需要实现四个接口 2.1.2 Kettle转换步骤插件各个类命名推荐规则 2.2 Kettle转换步骤插件开发例子 2.2.1 Kettle转换步骤...
kettle自定义拼接json格式输出,arcgis的json格式为例子,速度的话我跑了20万数据,16个字段,14000条/秒,机器是自己笔记本i5处理器,kettle给了4g内存
开发Kettle插件能够满足特定的数据处理需求,增强Kettle的功能,使其能够处理更复杂的数据集成场景。这包括与新的数据源集成、实现特定的数据转换算法或者优化特定的工作流程控制。 总结,Pentaho Kettle通过其直观...
- **开发新插件**:如果现有插件无法满足需求,可以自行开发新的插件来扩展 Kettle 的功能。开发新的插件一般需要熟悉 Kettle 的插件架构和 API。 - **调试现有插件**:在使用现有插件时遇到问题时,可以通过源码...
"Kettle插件的制作是Pentaho Data Integration(也称为Kettle或Spoon)中的一个重要环节,允许用户自定义转换和作业的行为,以满足特定的业务需求。下面将详细介绍如何制作Kettle插件,包括核心接口的实现、类结构...
Kettle支持插件开发,通过编写Java代码扩展其功能。要创建一个连接Clickhouse的插件,你需要遵循以下步骤: - 创建一个新的Java项目,并添加Kettle的API库以及clickhouse JDBC驱动到项目的类路径。 - 编写一个...
### Kettle转换步骤插件开发知识点详解 #### 1. Kettle介绍 Kettle是一款开源的数据集成工具,原名PDI(Pentaho Data Integration),它由Pentaho公司维护和支持。Kettle最初是由一位荷兰程序员开发的,后来在2006...
在数据整合和分析的场景中,Kettle连接SAP插件显得尤为重要,因为它可以方便地从SAP系统中提取数据,进行清洗、转换,然后加载到其他系统或数据仓库中。 ITN ERP Connector是专为Kettle设计的SAP连接组件,它的主要...
Kettle,又称Pentaho Data Integration(PDI),是一款强大的ETL(Extract, Transform, Load)工具,专门用于数据抽取、转换和加载。本示例将深入探讨如何使用Kettle进行简单数据转换。 在Kettle中,数据转换是通过...
《eteenterprisejava.pdf》可能涉及与Java企业级应用相关的知识,虽然不是直接关于Pentaho Kettle,但理解Java基础对于掌握Pentaho Kettle的自定义插件开发非常重要。因为Kettle是用Java编写并支持Java插件扩展,...
kettle插件开发必备,可以当作入门级材料。
使用这些插件,开发人员可以构建高度定制化的数据处理解决方案,满足企业级数据集成的需求。在Java应用程序中调用Kettle插件通常涉及以下步骤: 1. 添加Kettle库和插件到项目类路径。 2. 创建并配置Kettle的`...