- 浏览: 27286 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
dalongxn:
这是什么问题的,我现在也遇到了 求教、
Stop Tomcat: java.net.ConnectException
作者 岑文初 发布于 2008年8月13日 上午1时54分
── 分布式计算开源框架Hadoop入门实践(三)
Hadoop基本流程
一个图片太大了,只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。
- 在分布式环境中客户端创建任务并提交。
- InputFormat做Map前的预处理,主要负责以下工作:
- 验证输入的格式是否符合JobConfig的输入定义,这个在实现Map和构建Conf的时候就会知道,不定义可以是Writable的任意子类。
- 将input的文件切分为逻辑上的输入InputSplit,其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个block。
- 通过RecordReader来再次处理inputsplit为一组records,输出给Map。(inputsplit只是逻辑切分的第一步,但是如何根据文件中的信息来切分还需要RecordReader来实现,例如最简单的默认方式就是回车换行的切分)
- RecordReader处理后的结果作为Map的输入,Map执行定义的Map逻辑,输出处理后的key和value对应到临时中间文件。
- Combiner可选择配置,主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。
- Partitioner可选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。(后面的代码实例中有介绍使用场景)
- Reduce执行具体的业务逻辑,并且将处理结果输出给OutputFormat。
- OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如Config中配置,最后输出Reduce汇总后的结果。
业务场景和代码范例
业务场景描述: 可设定输入和输出路径(操作系统的路径非HDFS路径),根据访问日志分析某一个应用访问某一个API的总次数和总流量,统计后分别输出到两个文件中。这里仅仅为了测试,没有去细分很多类,将所有的类都归并于一个类便于说明问题。
测试代码类图
LogAnalysiser就是主类,主要负责创建、提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断:
LogAnalysiser::MapClass
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, LongWritable>
{
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();//没有配置RecordReader,所以默认采用line的实现,key就是行号,value就是行内容
if (line == null || line.equals(""))
return;
String[] words = line.split(",");
if (words == null || words.length < 8)
return;
String appid = words[1];
String apiName = words[2];
LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
Text record = new Text();
record.set(new StringBuffer("flow::").append(appid)
.append("::").append(apiName).toString());
reporter.progress();
output.collect(record, recbytes);//输出流量的统计结果,通过flow::作为前缀来标示。
record.clear();
record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
output.collect(record, new LongWritable(1));//输出次数的统计结果,通过count::作为前缀来标示
}
}
LogAnalysiser:: PartitionerClass
public static class PartitionerClass implements Partitioner<Text, LongWritable>
{
public int getPartition(Text key, LongWritable value, int numPartitions)
{
if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统计分配到不同的Reduce
if (key.toString().startsWith("flow::"))
return 0;
else
return 1;
else
return 0;
}
public void configure(JobConf job){}
}
LogAnalysiser:: CombinerClass
参看ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在ReduceClass中蓝色的行表示在CombinerClass中不存在。
LogAnalysiser:: ReduceClass
public static class ReduceClass extends MapReduceBase
implements Reducer<Text, LongWritable,Text, LongWritable>
{
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
{
Text newkey = new Text();
newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
LongWritable result = new LongWritable();
long tmp = 0;
int counter = 0;
while(values.hasNext())//累加同一个key的统计结果
{
tmp = tmp + values.next().get();
counter = counter +1;//担心处理太久,JobTracker长时间没有收到报告会认为TaskTracker已经失效,因此定时报告一下
if (counter == 1000)
{
counter = 0;
reporter.progress();
}
}
result.set(tmp);
output.collect(newkey, result);//输出最后的汇总结果
}
}
LogAnalysiser
public static void main(String[] args)
{
try
{
run(args);
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();
if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/root" + shortin;
shortout = "/user/root" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);
if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn't dir!");
return;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}
JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//构建Config
FileSystem fileSys = FileSystem.get(conf);
fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//将本地文件系统的文件拷贝到HDFS中
conf.setJobName("analysisjob");
conf.setOutputKeyClass(Text.class);//输出的key类型,在OutputFormat会检查
conf.setOutputValueClass(LongWritable.class); //输出的value类型,在OutputFormat会检查
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(CombinerClass.class);
conf.setReducerClass(ReduceClass.class);
conf.setPartitionerClass(PartitionerClass.class);
conf.set("mapred.reduce.tasks", "2");//强制需要有两个Reduce来分别处理流量和次数的统计
FileInputFormat.setInputPaths(conf, shortin);//hdfs中的输入路径
FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中输出路径
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(conf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
//删除输入和输出的临时文件
fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
fileSys.delete(new Path(shortin),true);
fileSys.delete(new Path(shortout),true);
}
以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务Class为一个可标示的命令,让hadoop jar可以执行。
public class ExampleDriver {
public static void main(String argv[]){
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
pgd.driver(argv);
}
catch(Throwable e){
e.printStackTrace();
}
}
}
将代码打成jar,并且设置jar的mainClass为ExampleDriver这个类。在分布式环境启动以后执行如下语句:
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
在/home/wenchu/test-in中是需要分析的日志文件,执行后就会看见整个执行过程,包括了Map和Reduce的进度。执行完毕会 在/home/wenchu/test-out下看到输出的内容。有两个文件:part-00000和part-00001分别记录了统计后的结果。 如果需要看执行的具体情况,可以看在输出目录下的_logs/history/xxxx_analysisjob,里面罗列了所有的Map,Reduce 的创建情况以及执行情况。在运行期也可以通过浏览器来查看Map,Reduce的情况:http://MasterIP:50030 /jobtracker.jsp
Hadoop集群测试
首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看集群的效果,以及一些参数配置的影响。
文件复制数为1,blocksize 5M
Slave数 | 处理记录数(万条) | 执行时间(秒) |
2 | 95 | 38 |
2 | 950 | 337 |
4 | 95 | 24 |
4 | 950 | 178 |
6 | 95 | 21 |
6 | 950 | 114 |
Blocksize 5M
Slave数 | 处理记录数(万条) | 执行时间(秒) |
2(文件复制数为1) | 950 | 337 |
2(文件复制数为3) | 950 | 339 |
6(文件复制数为1) | 950 | 114 |
6(文件复制数为3) | 950 | 117 |
文件复制数为1
Slave数 | 处理记录数(万条) | 执行时间(秒) |
6(blocksize 5M) | 95 | 21 |
6(blocksize 77M) | 95 | 26 |
4(blocksize 5M) | 950 | 178 |
4(blocksize 50M) | 950 | 54 |
6(blocksize 5M) | 950 | 114 |
6(blocksize 50M) | 950 | 44 |
6(blocksize 77M) | 950 | 74 |
测试的数据结果很稳定,基本测几次同样条件下都是一样。通过测试结果可以看出以下几点:
- 机器数对于性能还是有帮助的(等于没说^_^)。
- 文件复制数的增加只对安全性有帮助,但是对于性能没有太多帮助。而且现在采取的是将操作系统文件拷贝到HDFS中,所以备份多了,准备的时间很长。
- blocksize对于性能影响很大,首先如果将block划分的太小,那么将会增加job的数量,同时也增加了协作的代价,降低了性能,但是配置的太大也会让job不能最大化并行处理。所以这个值的配置需要根据数据处理的量来考虑。
- 最后就是除了这个表里面列出来的结果,应该去仔细看输出目录中的_logs/history中的xxx_analysisjob这个文件,里面记录了全部的执行过程以及读写情况。这个可以更加清楚地了解哪里可能会更加耗时。
随想
“云计算”热的烫手,就和SAAS、Web2及SNS等一样,往往都是在搞概念,只有真正踏踏实实的大型互联网公司,才会投入人力物力去研究符合自 己的分布式计算。其实当你的数据量没有那么大的时候,这种分布式计算也就仅仅只是一个玩具而已,只有在真正解决问题的过程中,它深层次的问题才会被挖掘出 来。
这三篇文章(分布式计算开源框架Hadoop介绍,Hadoop中的集群配置和使用技巧)仅仅是为了给对分布式计算有兴趣的朋友抛个砖,要想真的掘到金 子,那么就踏踏实实的去用、去想、去分析。或者自己也会更进一步地去研究框架中的实现机制,在解决自己问题的同时,也能够贡献一些什么。
前几日看到有人跪求成为架构师的方式,看了有些可悲,有些可笑,其实有多少架构师知道什么叫做架构?架构师的职责是什么?与其追求这么一个名号,还不如踏踏实实地做块石头沉到水底。要知道,积累和沉淀的过程就是一种成长。
相关阅读:
作者介绍: 岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。没有什么擅长或者精通,工作到现在唯一提升的就是学习能力和速度。个人Blog为:http://blog.csdn.net/cenwenchu79 。
志愿参与InfoQ中文站内容建设,请邮件至editors@cn.infoq.com 。也欢迎大家到InfoQ中文站用户讨论组 参与我们的线上讨论。
相关推荐
总之,《Hadoop集群程序设计与开发教材最终代码》是深入学习Hadoop及其应用的宝贵资源,涵盖了从基础概念到实际编程的各个方面。通过研究这些代码,学习者可以提升自己在大数据处理领域的技能,掌握分布式计算的关键...
《Hadoop应用开发与案例实战(慕课版)》是一门深入探讨大数据处理技术的课程,主要聚焦在Hadoop平台上进行应用开发的实践操作。这门课通过丰富的PPT课件,旨在帮助学习者理解Hadoop的核心概念,掌握其开发技巧,并...
通过深入学习这份《Hadoop集群程序设计与开发》的PPT,开发者不仅能了解Hadoop的基本概念,还能掌握实际项目中的应用技巧,从而在大数据时代中发挥出更大的价值。对于希望从事大数据处理和分析的IT专业人士来说,这...
从文件中提供的信息来看,董西成在其关于Hadoop YARN程序设计与应用案例的演讲中,涵盖了Hadoop YARN的定义、架构、API和开发步骤、应用类型、以及YARN API所涉及的通信协议和客户端库等核心知识点。以下是对这些...
【Hadoop集群程序设计与开发】是一门针对大数据技术类专业的必修课程,旨在让学生全面理解和掌握Hadoop框架。这门课程总共64学时,4.0学分,涵盖了从Hadoop的基本概念到实际应用的各个层面。 课程首先介绍了大数据...
- 大数据的处理流程主要包括数据采集与预处理、数据存储与数据分析、以及数据可视化与应用。 - 技术层面,数据采集通常使用ETL工具将来自不同数据源的数据抽取到临时存储区进行清洗、转换和集成,之后加载至数据...
MapReduce是Hadoop的主要计算框架,它的核心流程分为Map、Shuffle、Reduce三个阶段: 1. Map阶段:输入数据被拆分成多个块(Splitting),然后分配给多个Mapper进行处理。Mapper实现用户定义的逻辑,将输入的key/...
基于 Eclipse 的 Hadoop 应用开发环境配置是指在 Eclipse 中配置 Hadoop 开发环境,以便开发和运行 Hadoop 应用程序。本节将对基于 Eclipse 的 Hadoop 应用开发环境配置进行详细介绍。 一、Hadoop 概述 Hadoop 是...
InfoQ 提供的相关资料如"InfoQ Hadoop基本流程与应用开发"、"InfoQ 分布式计算开源框架Hadoop介绍"和"InfoQ Hadoop中的集群配置和使用技巧"将帮助你深入学习这些主题,为你的Hadoop之旅提供坚实的理论基础和实践指导...
【Yarn资源调度器】是Hadoop大数据处理框架的核心组件之一,主要负责集群资源的管理和分配,...此外,课程还提供了实训文档和演示视频,以帮助学员将理论知识转化为实践能力,提升在Hadoop集群程序设计与开发中的技能。
课程的重点在于Hadoop的介绍以及离线数据分析流程的理解,难点可能在于如何将这些理论知识与实际问题相结合,形成有效的数据处理策略。 教学目标不仅要求学生理解大数据的基本概念,还期望他们能掌握大数据学习的...
### Hadoop在雅虎的应用详解 #### 一、引言 随着互联网的飞速发展,海量数据处理成为了各个大型互联网公司的必备技术能力。雅虎作为全球知名的互联网公司,在早期便开始采用并发展Hadoop这一开源分布式计算框架来...
因此,为了简化开发流程,提高效率,通常会搭建一个专门的开发环境来编写和测试Hadoop应用程序。本文档将详细介绍如何搭建这样一个环境。 #### 二、HadoopEclipse插件介绍 HadoopEclipse插件是专为Eclipse开发环境...
标题中的“Hadoop应用程序”指的是基于Hadoop框架开发的应用,这个框架主要用于大数据处理。Hadoop是Apache软件基金会下的一个开源项目,它提供了分布式文件系统(HDFS)和数据处理工具(MapReduce)来处理和存储大...
【标题】"Hadoop Shell操作与程序开发"涵盖了在分布式计算环境Hadoop中进行命令行交互和编写应用程序的核心概念。Hadoop是一个开源框架,专为处理和存储大量数据而设计,它利用分布式文件系统(HDFS)和MapReduce...
本篇将详细介绍如何配置基于Eclipse的Hadoop应用开发环境。 首先,我们需要了解Hadoop的基本构成。Hadoop主要由两个核心部分组成:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是分布式文件系统,用于...
从Eclipse官网下载适合Windows 7的Eclipse IDE版本,如Java EE版本,因为它包含了对Web和企业级应用开发的支持。 3. **安装Hadoop**: 下载Hadoop的稳定版本,解压到一个合适的目录,并配置`HADOOP_HOME`环境变量...
通过对Hadoop的基本原理、国内应用现状以及开发环境搭建等内容的学习,可以帮助开发者更好地利用Hadoop进行大数据处理工作。随着技术的不断发展,Hadoop及相关组件也在不断演进,未来将会更加成熟和完善。