`
laserdance
  • 浏览: 92527 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

日志统计平台2

阅读更多
ImportTask里面我们就实现了ftp下载日志文件,然后批量导入数据库,其实该task是一个线程的实现类,
public class ImportTask implements Runnable {
	final static Logger logger = LoggerFactory.getLogger(ImportTask.class);	
         private Element taskEle;
	private String taskName;
	/**
	 * Construct for ImportTask.java.
	 */
	public ImportTask(Element ele ) {
		this.taskEle = ele;
		this.taskName = taskEle.getAttribute("name");
	}
	
	protected void doTask(Date preDate) {
		try {
			//批量导数据
			BatchImport2DB bidb = new BatchImport2DB();
//设置数据源,连接数据库			
bidb.setDataSource( GloalDataUtil.getDataSource());
			bidb.init(taskEle);
			//下载日志文件
			DownloadFileLog dflog = new DownloadFileLog();//初始化下载日志的一些参数
			dflog.init(taskEle);
			long logStart = System.currentTimeMillis();
//返加根据日期下载的日志文件名列表,downloadlogfile就是apache的软件实现的ftp下载			
String[] arrFileName = dflog.downloadLogFile(preDate);
			long logEnd = System.currentTimeMillis();
			logger.info(taskName+"查询符合条件的日志共用时(ms)[ " + (logEnd - logStart) + " ]");
			for(String fileName: arrFileName){
				// 调用BatchImport2DB.class
				long batchStart = System.currentTimeMillis();
				ICSVReader csvReader = (ICSVReader) Class.forName(taskEle.getAttribute("csvReader")).newInstance();
				String csvpattern = taskEle.getAttribute("csvpattern");
//批导入
				if(csvReader.beginReader(fileName,csvpattern)) bidb.batchImport(csvReader);		
				long batchCost = (System.currentTimeMillis()- batchStart);
				logger.info("将[ " + fileName + " ]导入数据库共用时[ " + (batchCost) + " ]ms");
			}		
		} catch (Exception e) {
			logger.error(e.toString(),e);
		}
	}

	public void run() {
		//only use for test
//		doTask( new Date(1216098921590l - 24 * 60 *60 *1000L));
		//该任务是得到当前日期的前一天的日志.
		doTask( new Date(System.currentTimeMillis()- 24 * 60 *60 *1000L));
	}
}

该类就实现了sftp或ftp下载,然后再将这些文件导入数据库中,具体的是由csvreader来读取,然后则batchimport2db来导入数据库,读取就不在这写了不外是流读写,然后一行一行的读.
BatchImport2DB.java
public class BatchImport2DB {
	// 得到DB,连接数据库,操作sql
	private List<Field> fieldList = new ArrayList<Field>();
	private String sql;
	private static int BATCH_SIZE = 100;
	protected static final Logger logger = LoggerFactory
			.getLogger(BatchImport2DB.class);
	/**
	 * spring container invoke and the data source is setter IOC
	 */
	private JdbcTemplate jdbcTemplate;//spring最佳实践
	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}
	public void init(Element ele) {
		// 得到insertsql语句
		Element sqlEle = (Element) ele.getElementsByTagName("sql").item(0);
		sql = sqlEle.getAttribute("insert");
		BATCH_SIZE = Integer.parseInt(sqlEle.getAttribute("maxCommitNumber"));
		Element tabEle = (Element) XmlManager.querySingle("table", ele);
		NodeList nlc = tabEle.getElementsByTagName("column");
		for (int i = 0; i < nlc.getLength(); i++) {
			Element nmEle = (Element) nlc.item(i);
			// 将字段名称加入list表中
			String name = nmEle.getAttribute("name");
			String type = nmEle.getAttribute("type");
			int csvindex = Integer.parseInt(nmEle.getAttribute("csvindex"));
			String format = nmEle.getAttribute("format");
			fieldList.add(new Field(name, type, csvindex, format));
		}
	}
	/**
	 * Method for batchImport 用spring jdbcTemplate来插入sql.
	 * 
	 * @param csvReader
	 */
	public void batchImport(final ICSVReader csvReader) {
		// 得到sql clause
		InterruptibleBatchPreparedStatementSetter psset = new InterruptibleBatchPreparedStatementSetter() {
			public void setValues(PreparedStatement ps, int pos)
					throws SQLException {
				try {
					int leng = fieldList.size();
					String[] arrvalue = null;
					do {
						arrvalue = csvReader.readLine();
						if (!csvReader.lineIsNotNull()) {
							return;
						}

					} while (arrvalue.length < leng);
					// 处理结果数组
					for (int i = 0; i < leng; i++) {
						Field f = fieldList.get(i);
						if ("datetime".equals(f.type)) {
							ps.setTimestamp(i + 1, StringUtil.parse2Timestamp(
									arrvalue[f.csvindex], f.format));
						} else if ("int".equals(f.type)) {
							ps.setInt(i + 1, Integer
									.parseInt(arrvalue[f.csvindex]));
						} else if ("long".equals(f.type)) {
							ps.setLong(i + 1, Long
									.parseLong(arrvalue[f.csvindex]));
						} else if ("text".equals(f.type)) {
							ps.setString(i + 1, arrvalue[f.csvindex].substring(
									0, 1));
						} else {
							String content = arrvalue[f.csvindex];
							if (content.length() > 100) {
								content = content.substring(0, 99);
							}
							ps.setString(i + 1, content);
						}
					}
				} catch (SQLException ex) {
					logger.error(ex.toString(), ex);
					throw ex;
				} catch (Exception ex) {
					logger.error(ex.toString(), ex);
					throw new SQLException(ex.toString());
				}
			}
			public int getBatchSize() {
				return BATCH_SIZE;
			}
			 // 判断批处理是否完成.
			public boolean isBatchExhausted(int pos) {
				return !csvReader.lineIsNotNull();
			}
		};
		do {
			jdbcTemplate.batchUpdate(sql, psset);
		} while (csvReader.lineIsNotNull());
	}
	static class Field {
		String format;
		String name;
		String type;
		int csvindex;
		Field(String name, String type, int index, String format) {
			this.name = name;
			this.type = type;
			this.csvindex = index;
			this.format = format;
		}
	}
}



至此,所有的下载啊,导入啊全部完成.还未完成的就是SqlServer存储过程,以及web界面的图表显示.下面文章我们只讲jfreechart的图表显示.不讲Sqlserver存储过程.
分享到:
评论

相关推荐

    从日志统计到大数据分析.pdf

    在2008年,日志统计平台主要负责收集和处理各种系统的运行日志,这些日志包含了丰富的系统运行状态和用户行为信息。然而,当时的平台存在诸多问题,如需求响应周期长,运维成本高,运行速度慢,这不仅限制了工作效率...

    唯品会日志平台建设

    日志计算平台则涉及对收集到的日志数据进行深度加工和分析,如实时处理、存储和统计等。这部分内容在文档中没有详细的介绍,但可以推测,唯品会的日志计算平台可能包含了数据仓库、数据分析引擎等组件。数据可视化...

    企业级电商网站的大数据统计分析平台源码+项目说明(以 Spark 框架为核心,对电商网站的日志进行离线和实时分析).zip

    【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学...企业级电商网站的大数据统计分析平台源码+项目说明(以 Spark 框架为核心,对电商网站的日志进行离线和实时分析)...

    java基于spark streaming和kafka,hbase的日志统计分析系统.rar

    在现代大数据处理领域,Java、Kafka、Spark和HBase是四大关键组件,它们共同构建了一个高效、实时的日志统计分析系统。这个系统利用这些工具的特性,实现了从数据采集到处理再到存储的完整流程。 首先,Kafka是...

    唯品会日志平台架构介绍

    ### 唯品会日志平台架构介绍 #### 唯品会简介 唯品会是一家专注于特卖的电商平台,自2012年3月23日在纽约证券交易所上市以来,凭借其独特的商业模式和高效的运营策略,在电商领域取得了显著的成绩,并成为少数实现...

    从日志统计到大数据分析.pptx

    2008年,日志统计平台的出现是这个过程的起点。这些平台主要用于收集和处理来自各种系统的日志数据,以获取运营和性能指标。然而,这样的系统存在一些明显的局限性,比如需求响应周期长,运维成本高,运行速度慢,...

    日志易V2.0入门手册

    通过安装日志易Agent,可以方便地采集本地系统的日志数据,并将其传输到日志易平台进行处理。 2. **rsyslog Agent**:这种方式适用于已经使用rsyslog5.8.0或更高版本的环境。用户需要拥有sudo权限,并通过配置...

    02于俊大规模日志数据平台架构面临的问题与挑战.pdf

    在当前数字化转型深入发展的背景下,企业对于日志数据平台的需求日益增大,尤其是高并发、大规模、多维度的数据处理与分析能力。本资料中提到的“于俊大规模日志数据平台架构面临的问题与挑战”主要涉及到了大规模...

    c#log日志类和日志分析器(源码)

    3. **统计和报告**:生成统计图表和报告,展示日志信息的分布、趋势和异常情况。 4. **报警和通知**:当发现潜在问题或达到预设阈值时,发送报警通知。 5. **日志聚合**:对来自多个源的日志进行整合和分析,以全局...

    云智综合日志审计平台产品白皮书.docx

    2. 提高安全性:该平台提供了强大的分析功能,能够对大量分散设备的异构日志进行统一管理、集中存储、统计分析和快速查询,帮助用户检测和防范安全风险。 3. 提高效率:平台支持多维度日志审计、海量日志分类检索、...

    人人网数据服务平台:基于日志分析的数据系统架构

    2. **Mini-batch(迷你批处理)**:针对大量统计需求,Mini-batch提供了面向用户的系统、内部查询平台和中间数据源的支持。它旨在实现分钟级别的数据处理,适用于结果数据量较小(每日不超过百万行)的场景,特别...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    《基于Spark Streaming、Kafka与HBase的日志统计分析系统详解》 在现代大数据处理领域,实时数据分析成为了不可或缺的一部分。为了实现高效的日志统计分析,技术栈通常会结合多个组件,如Spark Streaming、Kafka...

    云智综合日志审计平台产品白皮书.pdf

    云智综合日志审计平台是针对企业日志审计需求量身打造的解决方案,旨在帮助企业管理者通过日志审计功能来提高安全风险管控能力和满足合规性要求。平台具备高性能日志采集能力、强大的分析功能、多维度日志审计、多样...

    Web服务器日志统计分析完全解决方案[定义].pdf

    【Web服务器日志统计分析完全解决方案】 Web服务器日志统计分析是互联网服务提供商(ICP)和网站管理员不可或缺的一项工作,旨在了解网站的运行状态、访问量和用户行为。通过对Web服务器日志文件的深入分析,可以...

    Webiase-日志分析工具 使用手册

    ### Webiase - 强大的日志分析工具 #### 一、概述 Webiase是一款基于Java技术构建的日志分析工具。其设计理念强调简洁与易用性,旨在为用户提供高效且直观的日志分析体验。该工具的核心优势在于其友好的用户界面、...

Global site tag (gtag.js) - Google Analytics