项目需求:
需要统计一下线上日志中某些信息每天出现的频率,举个简单的例子,统计线上每天的请求总数和异常请求数。线上大概几十台
服务器,每台服务器大概每天产生4到5G左右的日志,假设有30台,每台5G的,一天产生的日志总量为150G。
处理方案:
方案1:传统的处理方式,写个JAVA日志分析代码,部署到每台服务器进行处理,这种方式部署起来耗时费力,又不好维护。
方案2:采用Hadoop分布式处理,日志分析是Hadoop集群系统的拿手好戏。150G每天的日志也算是比较大的数据量了,搭个简
单的Hadoop集群来处理这些日志是再好不过的了。
Hadoop集群的搭建:
参见这两篇文章:http://www.cnblogs.com/cstar/archive/2012/12/16/2820209.html
http://www.cnblogs.com/cstar/archive/2012/12/16/2820220.html
我们这里的集群就采用了两台机器,配置每台8核,32G内存,500G磁盘空间。
日志准备工作:
由于日志分散在各个服务器,所以我们先需要将所有的日志拷贝到我们的集群系统当中,这个可以通过linux服务器下rsync或者scp
服务来执行。这里我们通过scp服务来拷贝,由于都是内网的机器,所以拷贝几个G的日志可以很快就完成。下面是拷贝日志的脚本,脚本
还是有一些需要注意的地方,我们只需要拷贝前一天的数据,实际保存的数据可能是好几天的,所以我们只要把我们需要的这一天的数据
SCP过去就可以了。
#!/bin/sh workdir=/home/myproj/bin/log/ files=`ls $workdir` pre1date=`date + "%Y%m%d" -d "-1 days" `
pre1date1=`date + "%Y-%m-%d" -d "-1 days" `
curdate=`date + "%Y%m%d" `
hostname=`uname -n` echo $pre1date $curdate uploadpath= "/home/hadoop/hadoop/mytest/log/" $pre1date1 "/" $hostname
echo $uploadpath cd $workdir mintime= 240000
secondmintime= 0
for file in $files; do
filedate=`stat $file | grep Modify| awk '{print $2}' |sed -e 's/-//g' `
filetime=`stat $file | grep Modify| awk '{print $3}' |cut -d "." -f1 | sed -e 's/://g' | sed 's/^0\+//' `
if [ $filedate -eq $curdate ]; then
if [ $filetime -lt $mintime ]; then
secondmintime=$mintime
mintime=$filetime
fi
fi
done echo "mintime:" $mintime
step= 1000
mintime=`expr $mintime + $step` echo "mintime+1000:" $mintime
for file in $files; do
filedate=`stat $file | grep Modify| awk '{print $2}' |sed -e 's/-//g' `
filetime=`stat $file | grep Modify| awk '{print $3}' |cut -d "." -f1 | sed -e 's/://g' | sed 's/^0\+//' `
filename=`echo $file | cut -c 1 - 8 `
startchars= "info.log"
#echo $filename
if [ $filename == $startchars ]; then
if [ $filedate -eq $pre1date ]; then
scp -rp $file dir @antix2 :$uploadpath
#echo $file
elif [ $filedate -eq $curdate ]; then
if [ $filetime -lt $mintime ]; then
scp -rp $file dir @antix2 :$uploadpath
#echo $file
fi
fi
fi
#echo $filedate $filetime
done |
MapReduce代码
接下来就是编写MapReduce的代码了。使用Eclipse环境来编写,需要安装hadoop插件,我们hadoop机器采用的是1.1.1版本,所以插
件使用hadoop-eclipse-plugin-1.1.1.jar,将插件拷贝到eclipse的plugins目录下就可以了。然后新建一个MapReduce项目:
工程新建好了然后我们就可以编写我们的MapReduce代码了。
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class LogAnalysis { public static class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private Text hourWord = new Text(); public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); SimpleDateFormat formatter2 = new SimpleDateFormat("yy-MM-dd"); java.util.Date d1 =new Date(); d1.setTime(System.currentTimeMillis()-1*24*3600*1000); String strDate =formatter2.format(d1); if(line.contains(strDate)){ String[] strArr = line.split(","); int len = strArr[0].length(); String time = strArr[0].substring(1,len-1); String[] timeArr = time.split(":"); String strHour = timeArr[0]; String hour = strHour.substring(strHour.length()-2,strHour.length()); String hourKey = ""; if(line.contains("StartASocket")){ word.set("SocketCount"); context.write(word, one); hourKey = "SocketCount:" + hour; hourWord.set(hourKey); context.write(hourWord, one); word.clear(); hourWord.clear(); } if(line.contains("SocketException")){ word.set("SocketExceptionCount"); context.write(word, one); hourKey = "SocketExceptionCount:" + hour; hourWord.set(hourKey); context.write(hourWord, one); word.clear(); hourWord.clear(); }
} } public static class LogReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static int run(String[] args) throws Exception{ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: loganalysis <in> <out>"); System.exit(2); } FileSystem fileSys = FileSystem.get(conf); String inputPath = "input/" + args[0]; fileSys.copyFromLocalFile(new Path(args[0]), new Path(inputPath));//将本地文件系统的文件拷贝到HDFS中 Job job = new Job(conf, "loganalysis"); job.setJarByClass(LogAnalysis.class); job.setMapperClass(LogMapper.class); job.setCombinerClass(LogReducer.class); job.setReducerClass(LogReducer.class); // 设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); Date startTime = new Date(); System.out.println("Job started: " + startTime); int ret = job.waitForCompletion(true)? 0 : 1; fileSys.copyToLocalFile(new Path(otherArgs[1]), new Path(otherArgs[1])); fileSys.delete(new Path(inputPath), true); fileSys.delete(new Path(otherArgs[1]), true); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); return ret; } public static void main(String[] args) { try { int ret = run(args); System.exit(ret); } catch (Exception e) { e.printStackTrace(); System.out.println(e.getMessage()); } } }
部署到Hadoop集群:
代码完成后测试没有问题后,部署到集群当中去执行,我们有几十台服务器,所以每台的服务器的日志当成一个任务来执行。
workdir="/home/hadoop/hadoop/mytest" cd $workdir pre1date=`date +"%Y-%m-%d" -d "-1 days"` servers=(mach1 mach2 mach3 ) for i in ${servers[@]};do inputPath="log/"$pre1date"/"$i outputPath="output/log/"$pre1date"/"$i echo $inputPath $outputPath echo "start job "$i" date:"`date` hadoop jar LogAnalysis.jar loganalysis $inputPath $outputPath echo "end job "$i" date:"`date` done
相关推荐
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
本次要实践的数据日志来源于国内某技术学习论坛,该论坛由某培训机构主办,汇聚了众多技术学习者,每天都有人发帖、回帖。...两个日志文件,一共有200MB,符合大数据量级,可以作为推荐系统数据集和hadoop测试集。
- **日志处理**:大量日志文件的收集和分析也是HDFS的一个常见应用场景。 - **备份与归档**:由于HDFS的高容错性和低成本特性,它也适用于数据的长期存储和备份。 #### 总结 Hadoop分布式文件系统(HDFS)作为大...
4. **日志分析**:最后一步是对存储的日志文件进行分析,提取有价值的信息,帮助管理员诊断问题和优化系统性能。 #### 基于Hadoop的网络日志分析系统设计与实现 针对日志数据的特点,设计了一套基于Hadoop的网络...
本文将详细阐述Hadoop日志的存放位置、存储规则以及如何进行修改。 Hadoop的日志通常包括各个组件的日志,如ResourceManager、MapReduce、Container等。这些日志的默认存储位置可能会因环境和配置的不同而有所变化...
### 基于Hadoop集群的分布式日志分析系统研究 #### 一、Hadoop及其在日志分析中的应用背景 随着互联网技术的飞速发展,各类Web2.0网站、电子商务平台以及大型网络游戏产生了空前的数据量。这些系统在运行过程中会...
《基于Hadoop的数据分析系统详解》 在当今大数据时代,数据的价值日益凸显,而有效处理海量数据的关键技术之一就是Hadoop。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据,尤其适合...
《HDFS——Hadoop分布式文件系统深度实践》这本书是针对Hadoop分布式文件系统(HDFS)的详尽指南,旨在帮助读者深入理解HDFS的工作原理、设计思想以及在实际应用中的最佳实践。HDFS是Apache Hadoop项目的核心组件之...
在日志分析中,Map阶段通常负责解析日志文件,抽取出关键字段,而Reduce阶段则聚合这些数据,进行统计分析。 4. **Hive**:Hive是一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL-like查询...
3.1.2 元数据更新及日志写入情景分析 3.1.3 Checkpoint过程情景分析 3.1.4 元数据可靠性机制 3.1.5 元数据一致性机制 3.2 使用说明 第4章 Hadoop的Backup Node方案 4.1 Backup Node概述 4.1.1 系统架构 4.1.2 使用...
本文将深入探讨"基于Hadoop的日志行为分析系统",结合人工智能技术,来理解如何利用Hadoop进行大规模日志数据的收集、存储、处理和分析。 一、Hadoop基础 Hadoop是由Apache基金会开发的分布式计算框架,它允许在...
- 源数据文件准备:下载日志文件并将其复制到Hadoop虚拟机,使用hdfs命令将文件上传到HDFS,创建对应目录并确保文件正确无误。 - Python MapReduce脚本开发:编写mapper和reducer程序,用于清洗日志数据,提取关键...
标题 "Hadoop日志分析的数据包" 涉及到的核心技术是大数据处理与分析,主要使用了Hadoop框架。Hadoop是Apache软件基金会开发的一个开源分布式计算平台,它允许在低成本硬件上处理大规模数据。在这个数据包中,我们有...
【标题】:“2022毕业设计,基于 Hadoop 的游戏数据分析系统源码” 这个毕业设计项目主要聚焦于使用Hadoop框架开发一个游戏数据分析系统。Hadoop是Apache软件基金会的一个开源分布式计算平台,专为处理和存储大规模...
2. 数据清洗模块:对采集到的日志数据进行预处理,包括去除无关数据、解析日志文件中的关键信息,如用户访问时间、IP地址、请求的URL等。 3. 数据存储模块:将清洗后的数据存储在Hive提供的数据仓库中,通常需要...
此外,Hadoop社区还开发了一些专门用于Hadoop日志分析的工具,如Apache Hadoop Log4j Viewer,方便用户快速定位和解析日志。 总之,理解和分析Hadoop启动日志是Hadoop运维工作的重要一环,通过深入挖掘这些日志,...
利用HQL语言以及Hadoop分布式文件系统(HDFS)和MapReduce编程模式对海量搜索日志进行分析处理, 对用户搜索行为进行了分析研究。对用户搜索行为中的查询热点主题、用户点击数和URL排名、查询会话的分析结果对于搜索...
为了验证基于Hadoop的Web日志挖掘平台的有效性和效率,研究者们在Hadoop集群上进行了实验,使用改进后的混合算法对大量的Web日志文件进行了处理。实验结果表明,相比于传统单一节点的数据挖掘系统,基于Hadoop的Web...