Kettle：从阿里云 OSS 下载日志文件

 
阅读更多

拖拽控件User Defined Java Class

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.*;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.RepositoryPluginType;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoriesMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.RepositoryElementMetaInterface;
import org.pentaho.di.repository.RepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aliyun.openservices.oss.OSSClient;
import com.aliyun.openservices.oss.model.GetObjectRequest;
import com.aliyun.openservices.oss.model.OSSObjectSummary;
import com.aliyun.openservices.oss.model.ObjectListing;

// Cached indexes of the input-row fields named by the step parameters
// ACCESS_ID / ACCESS_KEY / BUCKET_NAME / PREFIX. Resolved once, on the
// first row, inside processRow().
private int accessIdIndex;
private int accessKeyIndex;
private int bucketNameIndex;
private int prefixIndex;

/**
 * Processes one input row: on the first row, resolves the indexes of the
 * fields named by the step parameters ACCESS_ID / ACCESS_KEY / BUCKET_NAME /
 * PREFIX; then reads those values from the row, downloads the matching OSS
 * log files, and passes the row through unchanged.
 *
 * @return false when the input stream is exhausted, true otherwise
 * @throws KettleException if any required field is missing from the input row
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  Object[] r = getRow();
  if (r == null) {
    // No more input rows: signal downstream steps and stop.
    setOutputDone();
    return false;
  }

  if (first) {
    // Look up each field index once. The step parameter holds the NAME of
    // the row field carrying the value. (FIX: the error messages previously
    // told the user to check 'accessId'/'accessKey'/... although the actual
    // parameter names are the upper-case ones used below.)
    accessIdIndex = getInputRowMeta().indexOfValue(getParameter("ACCESS_ID"));
    if (accessIdIndex < 0) {
      throw new KettleException("accessId field not found in the input row, check parameter 'ACCESS_ID'!");
    }
    accessKeyIndex = getInputRowMeta().indexOfValue(getParameter("ACCESS_KEY"));
    if (accessKeyIndex < 0) {
      throw new KettleException("accessKey field not found in the input row, check parameter 'ACCESS_KEY'!");
    }
    bucketNameIndex = getInputRowMeta().indexOfValue(getParameter("BUCKET_NAME"));
    if (bucketNameIndex < 0) {
      throw new KettleException("bucketName field not found in the input row, check parameter 'BUCKET_NAME'!");
    }
    prefixIndex = getInputRowMeta().indexOfValue(getParameter("PREFIX"));
    if (prefixIndex < 0) {
      throw new KettleException("prefix field not found in the input row, check parameter 'PREFIX'!");
    }
    first = false;
  }

  // Row passes through unchanged; resize only so the output row meta fits.
  Object[] outputRowData = RowDataUtil.resizeArray(r, data.outputRowMeta.size());

  String accessId = getInputRowMeta().getString(r, accessIdIndex);
  String accessKey = getInputRowMeta().getString(r, accessKeyIndex);
  String bucketName = getInputRowMeta().getString(r, bucketNameIndex);
  String prefix = getInputRowMeta().getString(r, prefixIndex);
  downAliyunLog(accessId, accessKey, bucketName, prefix);

  putRow(data.outputRowMeta, outputRowData);

  return true;
}

	/**
	 * Downloads yesterday's and today's log objects (keys starting with
	 * prefix + "yyyy-MM-dd") from the given OSS bucket into LOG_FOLDER,
	 * skipping any object already present in LOG_FOLDER or DOWN_LOG_FOLDER.
	 *
	 * NOTE(review): listObjects() returns at most one page of results; if a
	 * day's prefix holds more objects than the service page size, the rest is
	 * silently skipped — confirm against the OSS SDK docs.
	 * NOTE: raw types and an indexed loop are kept deliberately — Kettle's
	 * UDJC step compiles with Janino, which does not support generics.
	 */
	public void downAliyunLog(String accessId, String accessKey, String bucketName, String prefix) {
		OSSClient client = new OSSClient(accessId, accessKey);

		// BUG FIX: the original used calendar.set(Calendar.DAY_OF_YEAR, -1),
		// which a lenient Calendar normalizes to late December of the PREVIOUS
		// year, not "yesterday". add(..., -1) rolls back exactly one day.
		Calendar calendar = Calendar.getInstance();
		calendar.add(Calendar.DAY_OF_YEAR, -1);

		List list = new ArrayList();

		// Yesterday's log objects.
		String date = new SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
		list.addAll(client.listObjects(bucketName, prefix + date).getObjectSummaries());

		// Today's log objects.
		date = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
		list.addAll(client.listObjects(bucketName, prefix + date).getObjectSummaries());

		File downloadPath = new File(System.getProperty("LOG_FOLDER"));
		if (!downloadPath.exists()) {
			downloadPath.mkdirs();
		}
		File downloadPathBak = new File(System.getProperty("DOWN_LOG_FOLDER"));
		if (!downloadPathBak.exists()) {
			downloadPathBak.mkdirs();
		}

		// File.list() returns null when the path is not a readable directory;
		// guard it instead of risking an NPE in Arrays.asList().
		String[] existing = downloadPath.list();
		String[] archived = downloadPathBak.list();
		List file = existing == null ? new ArrayList() : Arrays.asList(existing);
		List fileBak = archived == null ? new ArrayList() : Arrays.asList(archived);

		for (int i = 0; i < list.size(); i++) {
			OSSObjectSummary ossObjectSummary = (OSSObjectSummary) list.get(i);
			String key = ossObjectSummary.getKey();
			// Only fetch objects not already downloaded or archived.
			if (!file.contains(key) && !fileBak.contains(key)) {
				client.getObject(new GetObjectRequest(bucketName, key), new File(downloadPath, key));
			}
		}
		// Use the step's logger instead of System.out so the count shows up in
		// the Kettle log.
		logBasic("OSS log objects listed: " + list.size());
	}

 

分享到:
评论

相关推荐

    pdi-ce-9.3.0.0-428a安装包-1(kettle)

    描述中提到"由于文件大小超过1000mb,这里分成两部分上传,下载之后合并成一个文件即可。" 这意味着这个安装包非常大,可能包含了大量的组件、库和资源,以支持各种复杂的数据操作。用户在下载时需要注意,必须获取...

    ETL实验1-搭建环境

    在安装过程中,可能需要更换软件源,例如将阿里云的软件源添加到系统,以提高下载速度和稳定性。安装完毕后,启动MySQL Workbench,可以进行数据库的设计和管理。 接着,我们需要安装Java开发环境,因为许多ETL工具...

    02于俊大规模日志数据平台架构面临的问题与挑战.pdf

    在当前数字化转型深入发展的背景下,企业对于日志数据平台的需求日益增大,尤其是高并发...在实践分享中,阿里云栖开发者沙龙合肥专场分享了企业级应用架构实践,这为本资料中的架构演进之路提供了实际案例和应用背景。

    大数据技术体系图谱.pptx

    数据存储方面,分布式文件/对象存储如COS(腾讯云)、OSS(阿里云)、OBS(华为云)、KODO(七牛云)、UFile(UCloud)和HDFS提供了高可用性和扩展性的存储解决方案。这些系统通常支持大规模的非结构化数据。同时,...

    异构表同步

    而“工具”标签则表明可能讨论了一种现成的软件工具,如ETL工具(如Kettle)、数据库同步工具(如阿里云的DTS)等。 压缩包内的“表增量同步.ktr”文件名,根据Kettle(也称为Pentaho Data Integration,简称PDI)...

Global site tag (gtag.js) - Google Analytics