拖拽控件User Defined Java Class
import java.io.File; import java.text.SimpleDateFormat; import java.util.*; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.plugins.PluginRegistry; import org.pentaho.di.core.plugins.RepositoryPluginType; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.repository.RepositoriesMeta; import org.pentaho.di.repository.Repository; import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.RepositoryElementMetaInterface; import org.pentaho.di.repository.RepositoryMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.aliyun.openservices.oss.OSSClient; import com.aliyun.openservices.oss.model.GetObjectRequest; import com.aliyun.openservices.oss.model.OSSObjectSummary; import com.aliyun.openservices.oss.model.ObjectListing; private int accessIdIndex; private int accessKeyIndex; private int bucketNameIndex; private int prefixIndex; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r=getRow(); if (r==null) { setOutputDone(); return false; } if (first) { accessIdIndex = getInputRowMeta().indexOfValue(getParameter("ACCESS_ID")); if (accessIdIndex<0) { throw new KettleException("accessId field not found in the input row, check parameter 'accessId'!"); } accessKeyIndex = getInputRowMeta().indexOfValue(getParameter("ACCESS_KEY")); if (accessKeyIndex<0) { throw new KettleException("accessKey field not found in the input row, check parameter 'accessKey'!"); } bucketNameIndex = getInputRowMeta().indexOfValue(getParameter("BUCKET_NAME")); if (bucketNameIndex<0) { throw new KettleException("bucketName field not found in the input row, check parameter 'bucketName'!"); } prefixIndex = getInputRowMeta().indexOfValue(getParameter("PREFIX")); if (prefixIndex<0) { throw new KettleException("prefix field not found in the input row, check 
parameter 'prefix'!"); } first=false; } Object[] outputRowData = RowDataUtil.resizeArray(r, data.outputRowMeta.size()); int outputIndex = getInputRowMeta().size(); String accessId = getInputRowMeta().getString(r, accessIdIndex); String accessKey = getInputRowMeta().getString(r, accessKeyIndex); String bucketName = getInputRowMeta().getString(r, bucketNameIndex); String prefix = getInputRowMeta().getString(r, prefixIndex); downAliyunLog(accessId, accessKey, bucketName, prefix); putRow(data.outputRowMeta, outputRowData); return true; } public void downAliyunLog(String accessId, String accessKey, String bucketName, String prefix) { OSSClient client = new OSSClient(accessId, accessKey); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.DAY_OF_YEAR, -1); String date = new SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime()); ObjectListing ol = client.listObjects(bucketName, prefix + date); List yesterdayList = ol.getObjectSummaries(); date = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); ol = client.listObjects(bucketName, prefix + date); List todayList = ol.getObjectSummaries(); List list = new ArrayList(); list.addAll(yesterdayList); list.addAll(todayList); File downloadPath = new File(System.getProperty("LOG_FOLDER")); if(!downloadPath.exists()) downloadPath.mkdirs(); File downloadPathBak = new File(System.getProperty("DOWN_LOG_FOLDER")); if(!downloadPathBak.exists()) downloadPathBak.mkdirs(); List file = Arrays.asList(downloadPath.list()); List fileBak = Arrays.asList(downloadPathBak.list()); for (int i = 0; i < list.size(); i++) { OSSObjectSummary ossObjectSummary = (OSSObjectSummary) list.get(i); if(!file.contains(ossObjectSummary.getKey()) && !fileBak.contains(ossObjectSummary.getKey())) client.getObject(new GetObjectRequest(bucketName, ossObjectSummary.getKey()), new File(downloadPath, ossObjectSummary.getKey())); } System.out.println(list.size()); }
相关推荐
描述中提到"由于文件大小超过1000MB,这里分成两部分上传,下载之后合并成一个文件即可。" 这意味着这个安装包非常大,可能包含了大量的组件、库和资源,以支持各种复杂的数据操作。用户在下载时需要注意,必须获取...
在安装过程中,可能需要更换软件源,例如将阿里云的软件源添加到系统,以提高下载速度和稳定性。安装完毕后,启动MySQL Workbench,可以进行数据库的设计和管理。 接着,我们需要安装Java开发环境,因为许多ETL工具...
在当前数字化转型深入发展的背景下,企业对于日志数据平台的需求日益增大,尤其是高并发...在实践分享中,阿里云栖开发者沙龙合肥专场分享了企业级应用架构实践,这为本资料中的架构演进之路提供了实际案例和应用背景。
数据存储方面,分布式文件/对象存储如COS(腾讯云)、OSS(阿里云)、OBS(华为云)、KODO(七牛云)、UFile(UCloud)和HDFS提供了高可用性和扩展性的存储解决方案。这些系统通常支持大规模的非结构化数据。同时,...
而“工具”标签则表明可能讨论了一种现成的软件工具,如ETL工具(如Kettle)、数据库同步工具(如阿里云的DTS)等。 压缩包内的“表增量同步.ktr”文件名,根据Kettle(也称为Pentaho Data Integration,简称PDI)...