`
laserdance
  • 浏览: 92172 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

日志统计平台2

阅读更多
ImportTask里面我们就实现了ftp下载日志文件,然后批量导入数据库,其实该task是一个线程的实现类,
public class ImportTask implements Runnable {
	final static Logger logger = LoggerFactory.getLogger(ImportTask.class);	
         private Element taskEle;
	private String taskName;
	/**
	 * Construct for ImportTask.java.
	 */
	public ImportTask(Element ele ) {
		this.taskEle = ele;
		this.taskName = taskEle.getAttribute("name");
	}
	
	protected void doTask(Date preDate) {
		try {
			//批量导数据
			BatchImport2DB bidb = new BatchImport2DB();
//设置数据源,连接数据库			
bidb.setDataSource( GloalDataUtil.getDataSource());
			bidb.init(taskEle);
			//下载日志文件
			DownloadFileLog dflog = new DownloadFileLog();//初始化下载日志的一些参数
			dflog.init(taskEle);
			long logStart = System.currentTimeMillis();
//返加根据日期下载的日志文件名列表,downloadlogfile就是apache的软件实现的ftp下载			
String[] arrFileName = dflog.downloadLogFile(preDate);
			long logEnd = System.currentTimeMillis();
			logger.info(taskName+"查询符合条件的日志共用时(ms)[ " + (logEnd - logStart) + " ]");
			for(String fileName: arrFileName){
				// 调用BatchImport2DB.class
				long batchStart = System.currentTimeMillis();
				ICSVReader csvReader = (ICSVReader) Class.forName(taskEle.getAttribute("csvReader")).newInstance();
				String csvpattern = taskEle.getAttribute("csvpattern");
//批导入
				if(csvReader.beginReader(fileName,csvpattern)) bidb.batchImport(csvReader);		
				long batchCost = (System.currentTimeMillis()- batchStart);
				logger.info("将[ " + fileName + " ]导入数据库共用时[ " + (batchCost) + " ]ms");
			}		
		} catch (Exception e) {
			logger.error(e.toString(),e);
		}
	}

	public void run() {
		//only use for test
//		doTask( new Date(1216098921590l - 24 * 60 *60 *1000L));
		//该任务是得到当前日期的前一天的日志.
		doTask( new Date(System.currentTimeMillis()- 24 * 60 *60 *1000L));
	}
}

该类就实现了sftp或ftp下载,然后再将这些文件导入数据库中,具体的是由csvreader来读取,然后则batchimport2db来导入数据库,读取就不在这写了不外是流读写,然后一行一行的读.
BatchImport2DB.java
public class BatchImport2DB {
	// 得到DB,连接数据库,操作sql
	private List<Field> fieldList = new ArrayList<Field>();
	private String sql;
	private static int BATCH_SIZE = 100;
	protected static final Logger logger = LoggerFactory
			.getLogger(BatchImport2DB.class);
	/**
	 * spring container invoke and the data source is setter IOC
	 */
	private JdbcTemplate jdbcTemplate;//spring最佳实践
	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}
	public void init(Element ele) {
		// 得到insertsql语句
		Element sqlEle = (Element) ele.getElementsByTagName("sql").item(0);
		sql = sqlEle.getAttribute("insert");
		BATCH_SIZE = Integer.parseInt(sqlEle.getAttribute("maxCommitNumber"));
		Element tabEle = (Element) XmlManager.querySingle("table", ele);
		NodeList nlc = tabEle.getElementsByTagName("column");
		for (int i = 0; i < nlc.getLength(); i++) {
			Element nmEle = (Element) nlc.item(i);
			// 将字段名称加入list表中
			String name = nmEle.getAttribute("name");
			String type = nmEle.getAttribute("type");
			int csvindex = Integer.parseInt(nmEle.getAttribute("csvindex"));
			String format = nmEle.getAttribute("format");
			fieldList.add(new Field(name, type, csvindex, format));
		}
	}
	/**
	 * Method for batchImport 用spring jdbcTemplate来插入sql.
	 * 
	 * @param csvReader
	 */
	public void batchImport(final ICSVReader csvReader) {
		// 得到sql clause
		InterruptibleBatchPreparedStatementSetter psset = new InterruptibleBatchPreparedStatementSetter() {
			public void setValues(PreparedStatement ps, int pos)
					throws SQLException {
				try {
					int leng = fieldList.size();
					String[] arrvalue = null;
					do {
						arrvalue = csvReader.readLine();
						if (!csvReader.lineIsNotNull()) {
							return;
						}

					} while (arrvalue.length < leng);
					// 处理结果数组
					for (int i = 0; i < leng; i++) {
						Field f = fieldList.get(i);
						if ("datetime".equals(f.type)) {
							ps.setTimestamp(i + 1, StringUtil.parse2Timestamp(
									arrvalue[f.csvindex], f.format));
						} else if ("int".equals(f.type)) {
							ps.setInt(i + 1, Integer
									.parseInt(arrvalue[f.csvindex]));
						} else if ("long".equals(f.type)) {
							ps.setLong(i + 1, Long
									.parseLong(arrvalue[f.csvindex]));
						} else if ("text".equals(f.type)) {
							ps.setString(i + 1, arrvalue[f.csvindex].substring(
									0, 1));
						} else {
							String content = arrvalue[f.csvindex];
							if (content.length() > 100) {
								content = content.substring(0, 99);
							}
							ps.setString(i + 1, content);
						}
					}
				} catch (SQLException ex) {
					logger.error(ex.toString(), ex);
					throw ex;
				} catch (Exception ex) {
					logger.error(ex.toString(), ex);
					throw new SQLException(ex.toString());
				}
			}
			public int getBatchSize() {
				return BATCH_SIZE;
			}
			 // 判断批处理是否完成.
			public boolean isBatchExhausted(int pos) {
				return !csvReader.lineIsNotNull();
			}
		};
		do {
			jdbcTemplate.batchUpdate(sql, psset);
		} while (csvReader.lineIsNotNull());
	}
	static class Field {
		String format;
		String name;
		String type;
		int csvindex;
		Field(String name, String type, int index, String format) {
			this.name = name;
			this.type = type;
			this.csvindex = index;
			this.format = format;
		}
	}
}



至此,所有的下载啊,导入啊全部完成.还未完成的就是SqlServer存储过程,以及web界面的图表显示.下面文章我们只讲jfreechart的图表显示.不讲Sqlserver存储过程.
分享到:
评论

相关推荐

    从日志统计到大数据分析.pdf

    在2008年,日志统计平台主要负责收集和处理各种系统的运行日志,这些日志包含了丰富的系统运行状态和用户行为信息。然而,当时的平台存在诸多问题,如需求响应周期长,运维成本高,运行速度慢,这不仅限制了工作效率...

    唯品会日志平台建设

    日志计算平台则涉及对收集到的日志数据进行深度加工和分析,如实时处理、存储和统计等。这部分内容在文档中没有详细的介绍,但可以推测,唯品会的日志计算平台可能包含了数据仓库、数据分析引擎等组件。数据可视化...

    企业级电商网站的大数据统计分析平台源码+项目说明(以 Spark 框架为核心,对电商网站的日志进行离线和实时分析).zip

    【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学...企业级电商网站的大数据统计分析平台源码+项目说明(以 Spark 框架为核心,对电商网站的日志进行离线和实时分析)...

    java基于spark streaming和kafka,hbase的日志统计分析系统.rar

    在现代大数据处理领域,Java、Kafka、Spark和HBase是四大关键组件,它们共同构建了一个高效、实时的日志统计分析系统。这个系统利用这些工具的特性,实现了从数据采集到处理再到存储的完整流程。 首先,Kafka是...

    唯品会日志平台架构介绍

    ### 唯品会日志平台架构介绍 #### 唯品会简介 唯品会是一家专注于特卖的电商平台,自2012年3月23日在纽约证券交易所上市以来,凭借其独特的商业模式和高效的运营策略,在电商领域取得了显著的成绩,并成为少数实现...

    从日志统计到大数据分析.pptx

    2008年,日志统计平台的出现是这个过程的起点。这些平台主要用于收集和处理来自各种系统的日志数据,以获取运营和性能指标。然而,这样的系统存在一些明显的局限性,比如需求响应周期长,运维成本高,运行速度慢,...

    日志易V2.0入门手册

    通过安装日志易Agent,可以方便地采集本地系统的日志数据,并将其传输到日志易平台进行处理。 2. **rsyslog Agent**:这种方式适用于已经使用rsyslog5.8.0或更高版本的环境。用户需要拥有sudo权限,并通过配置...

    02于俊大规模日志数据平台架构面临的问题与挑战.pdf

    在当前数字化转型深入发展的背景下,企业对于日志数据平台的需求日益增大,尤其是高并发、大规模、多维度的数据处理与分析能力。本资料中提到的“于俊大规模日志数据平台架构面临的问题与挑战”主要涉及到了大规模...

    云智综合日志审计平台产品白皮书.pdf

    该平台可以对大量分散设备的异构日志进行统一管理、集中存储、统计分析、快速查询,并提供真正可靠的事件追责依据和业务运行的深度安全。 云智综合日志审计平台的核心特点包括: 1. 多类型数据采集:支持多种网络...

    c#log日志类和日志分析器(源码)

    3. **统计和报告**:生成统计图表和报告,展示日志信息的分布、趋势和异常情况。 4. **报警和通知**:当发现潜在问题或达到预设阈值时,发送报警通知。 5. **日志聚合**:对来自多个源的日志进行整合和分析,以全局...

    云智综合日志审计平台产品白皮书.docx

    2. 提高安全性:该平台提供了强大的分析功能,能够对大量分散设备的异构日志进行统一管理、集中存储、统计分析和快速查询,帮助用户检测和防范安全风险。 3. 提高效率:平台支持多维度日志审计、海量日志分类检索、...

    人人网数据服务平台:基于日志分析的数据系统架构

    2. **Mini-batch(迷你批处理)**:针对大量统计需求,Mini-batch提供了面向用户的系统、内部查询平台和中间数据源的支持。它旨在实现分钟级别的数据处理,适用于结果数据量较小(每日不超过百万行)的场景,特别...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    《基于Spark Streaming、Kafka与HBase的日志统计分析系统详解》 在现代大数据处理领域,实时数据分析成为了不可或缺的一部分。为了实现高效的日志统计分析,技术栈通常会结合多个组件,如Spark Streaming、Kafka...

    Web服务器日志统计分析完全解决方案[定义].pdf

    【Web服务器日志统计分析完全解决方案】 Web服务器日志统计分析是互联网服务提供商(ICP)和网站管理员不可或缺的一项工作,旨在了解网站的运行状态、访问量和用户行为。通过对Web服务器日志文件的深入分析,可以...

    Webiase-日志分析工具 使用手册

    ### Webiase - 强大的日志分析工具 #### 一、概述 Webiase是一款基于Java技术构建的日志分析工具。其设计理念强调简洁与易用性,旨在为用户提供高效且直观的日志分析体验。该工具的核心优势在于其友好的用户界面、...

Global site tag (gtag.js) - Google Analytics