`
PowerNTT
  • 浏览: 27181 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

Hadoop基本流程与应用开发

阅读更多

作者 岑文初 发布于 2008年8月13日 上午1时54分

社区
Java
主题
网格计算 ,
集群与缓存
标签
Hadoop

── 分布式计算开源框架Hadoop入门实践(三)

Hadoop基本流程

一个图片太大了,只好分割成为两部分。根据流程图来说一下具体一个任务执行的情况。

  1. 在分布式环境中客户端创建任务并提交。
  2. InputFormat做Map前的预处理,主要负责以下工作:
    1. 验证输入的格式是否符合JobConfig的输入定义,这个在实现Map和构建Conf的时候就会知道,不定义可以是Writable的任意子类。
    2. 将input的文件切分为逻辑上的输入InputSplit,其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个block。
    3. 通过RecordReader来再次处理inputsplit为一组records,输出给Map。(inputsplit只是逻辑切分的第一步,但是如何根据文件中的信息来切分还需要RecordReader来实现,例如最简单的默认方式就是回车换行的切分)
  3. RecordReader处理后的结果作为Map的输入,Map执行定义的Map逻辑,输出处理后的key和value对应到临时中间文件。
  4. Combiner可选择配置,主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。
  5. Partitioner可选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。(后面的代码实例中有介绍使用场景)
  6. Reduce执行具体的业务逻辑,并且将处理结果输出给OutputFormat。
  7. OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如Config中配置,最后输出Reduce汇总后的结果。

业务场景和代码范例

业务场景描述: 可设定输入和输出路径(操作系统的路径非HDFS路径),根据访问日志分析某一个应用访问某一个API的总次数和总流量,统计后分别输出到两个文件中。这里仅仅为了测试,没有去细分很多类,将所有的类都归并于一个类便于说明问题。


测试代码类图

LogAnalysiser就是主类,主要负责创建、提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体地看看几个类和方法的代码片断:

LogAnalysiser::MapClass

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> 
    {
        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException
        {    
            String line = value.toString();//没有配置RecordReader,所以默认采用line的实现,key就是行号,value就是行内容
            if (line == null || line.equals(""))
                return;
            String[] words = line.split(",");
            if (words == null || words.length < 8)
                return;
            String appid = words[1];
            String apiName = words[2];
            LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
            Text record = new Text();
            record.set(new StringBuffer("flow::").append(appid)
                            .append("::").append(apiName).toString());
            reporter.progress();
            output.collect(record, recbytes);//输出流量的统计结果,通过flow::作为前缀来标示。
            record.clear();
            record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
            output.collect(record, new LongWritable(1));//输出次数的统计结果,通过count::作为前缀来标示
        }    
    }


LogAnalysiser:: PartitionerClass

    public static class PartitionerClass implements Partitioner<Text, LongWritable>
    {
        public int getPartition(Text key, LongWritable value, int numPartitions)
        {
            if (numPartitions >= 2)//Reduce 个数,判断流量还是次数的统计分配到不同的Reduce
                if (key.toString().startsWith("flow::"))
                    return 0;
                else
                    return 1;
            else
                return 0;
        }
        public void configure(JobConf job){}    
}

LogAnalysiser:: CombinerClass

参看ReduceClass,通常两者可以使用一个,不过这里有些不同的处理就分成了两个。在ReduceClass中蓝色的行表示在CombinerClass中不存在。

LogAnalysiser:: ReduceClass

    public static class ReduceClass extends MapReduceBase
        implements Reducer<Text, LongWritable,Text, LongWritable> 
    {
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
        {
            Text newkey = new Text();
            newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
            LongWritable result = new LongWritable();
            long tmp = 0;
            int counter = 0;
            while(values.hasNext())//累加同一个key的统计结果
            {
                tmp = tmp + values.next().get();
                
                counter = counter +1;//担心处理太久,JobTracker长时间没有收到报告会认为TaskTracker已经失效,因此定时报告一下
                if (counter == 1000)
                {
                    counter = 0;
                    reporter.progress();
                }
            }
            result.set(tmp);
            output.collect(newkey, result);//输出最后的汇总结果
        }    
    }

LogAnalysiser

	public static void main(String[] args)
	{
		try
		{
			run(args);
		} catch (Exception e)
		{
			e.printStackTrace();
		}
	}
	public static void run(String[] args) throws Exception
	{
		if (args == null || args.length <2)
		{
			System.out.println("need inputpath and outputpath");
			return;
		}
		String inputpath = args[0];
		String outputpath = args[1];
		String shortin = args[0];
		String shortout = args[1];
		if (shortin.indexOf(File.separator) >= 0)
			shortin = shortin.substring(shortin.lastIndexOf(File.separator));
		if (shortout.indexOf(File.separator) >= 0)
			shortout = shortout.substring(shortout.lastIndexOf(File.separator));
		SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
		shortout = new StringBuffer(shortout).append("-")
			.append(formater.format(new Date())).toString();
		
		
		if (!shortin.startsWith("/"))
			shortin = "/" + shortin;
		if (!shortout.startsWith("/"))
			shortout = "/" + shortout;
		shortin = "/user/root" + shortin;
		shortout = "/user/root" + shortout;			
		File inputdir = new File(inputpath);
		File outputdir = new File(outputpath);
		if (!inputdir.exists() || !inputdir.isDirectory())
		{
			System.out.println("inputpath not exist or isn't dir!");
			return;
		}
		if (!outputdir.exists())
		{
			new File(outputpath).mkdirs();
		}
		
		JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//构建Config
		FileSystem fileSys = FileSystem.get(conf);
		fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//将本地文件系统的文件拷贝到HDFS中

		conf.setJobName("analysisjob");
		conf.setOutputKeyClass(Text.class);//输出的key类型,在OutputFormat会检查
		conf.setOutputValueClass(LongWritable.class); //输出的value类型,在OutputFormat会检查
		conf.setMapperClass(MapClass.class);
		conf.setCombinerClass(CombinerClass.class);
		conf.setReducerClass(ReduceClass.class);
		conf.setPartitionerClass(PartitionerClass.class);
		conf.set("mapred.reduce.tasks", "2");//强制需要有两个Reduce来分别处理流量和次数的统计
		FileInputFormat.setInputPaths(conf, shortin);//hdfs中的输入路径
		FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中输出路径
		
		Date startTime = new Date();
	    	System.out.println("Job started: " + startTime);
	    	JobClient.runJob(conf);
	    	Date end_time = new Date();
	    	System.out.println("Job ended: " + end_time);
	    	System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
	    	//删除输入和输出的临时文件
		fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
		fileSys.delete(new Path(shortin),true);
		fileSys.delete(new Path(shortout),true);
	}


以上的代码就完成了所有的逻辑性代码,然后还需要一个注册驱动类来注册业务Class为一个可标示的命令,让hadoop jar可以执行。

public class ExampleDriver {
  public static void main(String argv[]){
    ProgramDriver pgd = new ProgramDriver();
    try {
      pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
      pgd.driver(argv);
    }
    catch(Throwable e){
      e.printStackTrace();
    }
  }
}

将代码打成jar,并且设置jar的mainClass为ExampleDriver这个类。在分布式环境启动以后执行如下语句:

hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out

在/home/wenchu/test-in中是需要分析的日志文件,执行后就会看见整个执行过程,包括了Map和Reduce的进度。执行完毕会 在/home/wenchu/test-out下看到输出的内容。有两个文件:part-00000和part-00001分别记录了统计后的结果。 如果需要看执行的具体情况,可以看在输出目录下的_logs/history/xxxx_analysisjob,里面罗列了所有的Map,Reduce 的创建情况以及执行情况。在运行期也可以通过浏览器来查看Map,Reduce的情况:http://MasterIP:50030 /jobtracker.jsp

Hadoop集群测试

首先这里使用上面的范例作为测试,也没有做太多的优化配置,这个测试结果只是为了看看集群的效果,以及一些参数配置的影响。

文件复制数为1,blocksize 5M

Slave数 处理记录数(万条) 执行时间(秒)
2 95 38
2 950 337
4 95 24
4 950 178
6 95 21
6 950 114

Blocksize 5M

Slave数 处理记录数(万条) 执行时间(秒)
2(文件复制数为1) 950 337
2(文件复制数为3) 950 339
6(文件复制数为1) 950 114
6(文件复制数为3) 950 117

文件复制数为1

Slave数 处理记录数(万条) 执行时间(秒)
6(blocksize 5M) 95 21
6(blocksize 77M) 95 26
4(blocksize 5M) 950 178
4(blocksize 50M) 950 54
6(blocksize 5M) 950 114
6(blocksize 50M) 950 44
6(blocksize 77M) 950 74

测试的数据结果很稳定,基本测几次同样条件下都是一样。通过测试结果可以看出以下几点:

  1. 机器数对于性能还是有帮助的(等于没说^_^)。
  2. 文件复制数的增加只对安全性有帮助,但是对于性能没有太多帮助。而且现在采取的是将操作系统文件拷贝到HDFS中,所以备份多了,准备的时间很长。
  3. blocksize对于性能影响很大,首先如果将block划分的太小,那么将会增加job的数量,同时也增加了协作的代价,降低了性能,但是配置的太大也会让job不能最大化并行处理。所以这个值的配置需要根据数据处理的量来考虑。
  4. 最后就是除了这个表里面列出来的结果,应该去仔细看输出目录中的_logs/history中的xxx_analysisjob这个文件,里面记录了全部的执行过程以及读写情况。这个可以更加清楚地了解哪里可能会更加耗时。

随想

“云计算”热的烫手,就和SAAS、Web2及SNS等一样,往往都是在搞概念,只有真正踏踏实实的大型互联网公司,才会投入人力物力去研究符合自 己的分布式计算。其实当你的数据量没有那么大的时候,这种分布式计算也就仅仅只是一个玩具而已,只有在真正解决问题的过程中,它深层次的问题才会被挖掘出 来。

这三篇文章(分布式计算开源框架Hadoop介绍,Hadoop中的集群配置和使用技巧)仅仅是为了给对分布式计算有兴趣的朋友抛个砖,要想真的掘到金 子,那么就踏踏实实的去用、去想、去分析。或者自己也会更进一步地去研究框架中的实现机制,在解决自己问题的同时,也能够贡献一些什么。

前几日看到有人跪求成为架构师的方式,看了有些可悲,有些可笑,其实有多少架构师知道什么叫做架构?架构师的职责是什么?与其追求这么一个名号,还不如踏踏实实地做块石头沉到水底。要知道,积累和沉淀的过程就是一种成长。

相关阅读:

  1. 分布式计算开源框架Hadoop介绍――分布式计算开源框架Hadoop入门实践(一)
  2. Hadoop中的集群配置和使用技巧――分布式计算开源框架Hadoop入门实践(二)

作者介绍: 岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。没有什么擅长或者精通,工作到现在唯一提升的就是学习能力和速度。个人Blog为:http://blog.csdn.net/cenwenchu79

志愿参与InfoQ中文站内容建设,请邮件至editors@cn.infoq.com 。也欢迎大家到InfoQ中文站用户讨论组 参与我们的线上讨论。

分享到:
评论

相关推荐

    Hadoop集群程序设计与开发教材最终代码.zip

    总之,《Hadoop集群程序设计与开发教材最终代码》是深入学习Hadoop及其应用的宝贵资源,涵盖了从基础概念到实际编程的各个方面。通过研究这些代码,学习者可以提升自己在大数据处理领域的技能,掌握分布式计算的关键...

    Hadoop应用开发与案例实战(慕课版)-课件PPT.rar

    《Hadoop应用开发与案例实战(慕课版)》是一门深入探讨大数据处理技术的课程,主要聚焦在Hadoop平台上进行应用开发的实践操作。这门课通过丰富的PPT课件,旨在帮助学习者理解Hadoop的核心概念,掌握其开发技巧,并...

    Hadoop集群程序设计与开发PPT.rar

    通过深入学习这份《Hadoop集群程序设计与开发》的PPT,开发者不仅能了解Hadoop的基本概念,还能掌握实际项目中的应用技巧,从而在大数据时代中发挥出更大的价值。对于希望从事大数据处理和分析的IT专业人士来说,这...

    董西成:Hadoop YARN程序设计与应用案例

    从文件中提供的信息来看,董西成在其关于Hadoop YARN程序设计与应用案例的演讲中,涵盖了Hadoop YARN的定义、架构、API和开发步骤、应用类型、以及YARN API所涉及的通信协议和客户端库等核心知识点。以下是对这些...

    Hadoop集群程序设计与开发教学大纲.docx

    【Hadoop集群程序设计与开发】是一门针对大数据技术类专业的必修课程,旨在让学生全面理解和掌握Hadoop框架。这门课程总共64学时,4.0学分,涵盖了从Hadoop的基本概念到实际应用的各个层面。 课程首先介绍了大数据...

    hadoop大数据平台技术与应用 --课后习题参考答案.pdf

    - 大数据的处理流程主要包括数据采集与预处理、数据存储与数据分析、以及数据可视化与应用。 - 技术层面,数据采集通常使用ETL工具将来自不同数据源的数据抽取到临时存储区进行清洗、转换和集成,之后加载至数据...

    hadoop基本流程与mapReduce应用开发.pdf

    MapReduce是Hadoop的主要计算框架,它的核心流程分为Map、Shuffle、Reduce三个阶段: 1. Map阶段:输入数据被拆分成多个块(Splitting),然后分配给多个Mapper进行处理。Mapper实现用户定义的逻辑,将输入的key/...

    基于Eclipse的Hadoop应用开发环境配置

    基于 Eclipse 的 Hadoop 应用开发环境配置是指在 Eclipse 中配置 Hadoop 开发环境,以便开发和运行 Hadoop 应用程序。本节将对基于 Eclipse 的 Hadoop 应用开发环境配置进行详细介绍。 一、Hadoop 概述 Hadoop 是...

    hadoop 入门

    InfoQ 提供的相关资料如"InfoQ Hadoop基本流程与应用开发"、"InfoQ 分布式计算开源框架Hadoop介绍"和"InfoQ Hadoop中的集群配置和使用技巧"将帮助你深入学习这些主题,为你的Hadoop之旅提供坚实的理论基础和实践指导...

    大数据课程-Hadoop集群程序设计与开发-5.Yarn资源调度器_lk_edit.pptx

    【Yarn资源调度器】是Hadoop大数据处理框架的核心组件之一,主要负责集群资源的管理和分配,...此外,课程还提供了实训文档和演示视频,以帮助学员将理论知识转化为实践能力,提升在Hadoop集群程序设计与开发中的技能。

    《Hadoop大数据开发实战》教学教案—01初识Hadoop.pdf

    课程的重点在于Hadoop的介绍以及离线数据分析流程的理解,难点可能在于如何将这些理论知识与实际问题相结合,形成有效的数据处理策略。 教学目标不仅要求学生理解大数据的基本概念,还期望他们能掌握大数据学习的...

    Hadoop在雅虎的应用

    ### Hadoop在雅虎的应用详解 #### 一、引言 随着互联网的飞速发展,海量数据处理成为了各个大型互联网公司的必备技术能力。雅虎作为全球知名的互联网公司,在早期便开始采用并发展Hadoop这一开源分布式计算框架来...

    hadoop开发环境搭建

    因此,为了简化开发流程,提高效率,通常会搭建一个专门的开发环境来编写和测试Hadoop应用程序。本文档将详细介绍如何搭建这样一个环境。 #### 二、HadoopEclipse插件介绍 HadoopEclipse插件是专为Eclipse开发环境...

    Hadoop应用程序

    标题中的“Hadoop应用程序”指的是基于Hadoop框架开发的应用,这个框架主要用于大数据处理。Hadoop是Apache软件基金会下的一个开源项目,它提供了分布式文件系统(HDFS)和数据处理工具(MapReduce)来处理和存储大...

    hadoop shell操作与程式开发

    【标题】"Hadoop Shell操作与程序开发"涵盖了在分布式计算环境Hadoop中进行命令行交互和编写应用程序的核心概念。Hadoop是一个开源框架,专为处理和存储大量数据而设计,它利用分布式文件系统(HDFS)和MapReduce...

    基于Eclipse的Hadoop应用开发环境的配置

    本篇将详细介绍如何配置基于Eclipse的Hadoop应用开发环境。 首先,我们需要了解Hadoop的基本构成。Hadoop主要由两个核心部分组成:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是分布式文件系统,用于...

    win7下Eclipse开发Hadoop应用程序环境搭建

    从Eclipse官网下载适合Windows 7的Eclipse IDE版本,如Java EE版本,因为它包含了对Web和企业级应用开发的支持。 3. **安装Hadoop**: 下载Hadoop的稳定版本,解压到一个合适的目录,并配置`HADOOP_HOME`环境变量...

    Hadoop开发

    通过对Hadoop的基本原理、国内应用现状以及开发环境搭建等内容的学习,可以帮助开发者更好地利用Hadoop进行大数据处理工作。随着技术的不断发展,Hadoop及相关组件也在不断演进,未来将会更加成熟和完善。

Global site tag (gtag.js) - Google Analytics