Hadoop: The Definitive Guide Summary, Chapter 5: Developing a MapReduce Application

 

 

There is a standard workflow for writing MapReduce programs: first write the map and reduce functions, ideally backed by unit tests that confirm they behave as expected; then write a driver program to run the job, check that the driver runs, and debug and refine the program locally in an IDE.

Note that some of the configuration in the book is now out of date, so much of it is not covered here.

 

1. The Configuration API

Hadoop has many XML configuration files, which follow the usual XML conventions. An example:

<!--Example:5-1. A simple configuration file, configuration-1.xml-->
<?xml version="1.0"?>
<configuration>
    <property>
        <name>color</name>
        <value>yellow</value>
        <description>Color</description>
    </property>
    <property>
        <name>size</name>
        <value>10</value>
        <description>Size</description>
    </property>
    <property>
        <name>weight</name>
        <value>heavy</value>
        <final>true</final>
        <description>Weight</description>
    </property>
    <property>
        <name>size-weight</name>
        <value>${size},${weight}</value>
        <description>Size and weight</description>
    </property>
</configuration>

Accessing properties:

Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
assertThat(conf.get("color"), is("yellow"));
assertThat(conf.getInt("size", 0), is(10));
assertThat(conf.get("breadth", "wide"), is("wide"));


Hadoop allows multiple configuration files to be merged:

<!--Example 5-2. A second configuration file, configuration-2.xml -->
<?xml version="1.0"?>
<configuration>
    <property>
        <name>size</name>
        <value>12</value>
    </property>
    <property>
        <name>weight</name>
        <value>light</value>
    </property>
</configuration>

Resources are added to a Configuration in order:

Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
conf.addResource("configuration-2.xml");

Properties defined in resources that are added later override earlier definitions. However, a property marked final (such as weight above) cannot be overridden by a later resource; attempting to do so is reported as a configuration error. Marking a property final signals that clients should not change it.
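So with both files loaded, size takes the later value while the final weight keeps its first definition. A minimal sketch of the resulting assertions, following the book's example:

assertThat(conf.getInt("size", 0), is(12));   // overridden by configuration-2.xml
assertThat(conf.get("weight"), is("heavy"));  // final, so the attempted override is ignored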

Variable expansion: configuration properties can be defined in terms of other properties or system properties, and system properties take priority over properties defined in resource files:

System.setProperty("size", "14");
assertThat(conf.get("size-weight"), is("14,heavy"));

This feature is useful for overriding a property on the command line with the JVM argument -Dproperty=value.
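Note, however, that system properties are not accessible through the configuration API unless a configuration property redefines them; a sketch of the book's illustration:

System.setProperty("length", "2");
assertThat(conf.get("length"), is((String) null)); // no "length" config property, so the system property is invisible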

 

 

2. Setting Up the Development Environment

1) Managing Configuration

The book gives examples, but the Hadoop website is the more authoritative source; for Hadoop 2.0 configuration examples see: http://hadoop.apache.org/common/docs/r2.0.0-alpha/

2) Helper Classes: GenericOptionsParser, Tool, and ToolRunner

Hadoop provides helper classes for this. GenericOptionsParser interprets the common Hadoop command-line options, but the more usual approach is to implement the Tool interface and run the program through ToolRunner, which calls GenericOptionsParser internally.

The Tool interface, and an implementation that prints the properties of a Configuration object:

public interface Tool extends Configurable {
    int run(String[] args) throws Exception;
}

import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurationPrinter extends Configured implements Tool {
    static {
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        for (Entry<String, String> entry : conf) {
            System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
        System.exit(exitCode);
    }
}


In Hadoop, default properties can be kept in configuration files and then overridden when needed with the -D option. Note that this differs from setting a JVM system property with java -Dproperty=value: Hadoop's -D option takes a space between -D and the property, while the JVM's -D does not.
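For example, a sketch of overriding the color property when running the ConfigurationPrinter above:

% hadoop ConfigurationPrinter -D color=yellow | grep color
color=yellow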

 

The GenericOptionsParser and ToolRunner options are summarized in a table in the book (not reproduced here); the most commonly used are -conf <file> to add a configuration file, -D property=value to set a property, -fs <uri> to set the default filesystem, -jt <host:port> to set the jobtracker, and -files, -archives, and -libjars to ship files, archives, or JAR files to the cluster.

 

3) Writing Unit Tests

The following tests can be run in an IDE such as Eclipse.

[Note: make sure your MRUnit release supports the new API; the tests below use the drivers from the org.apache.hadoop.mrunit.mapreduce package, while early MRUnit releases only supported the old pre-0.20 mapred API.]

A test for the mapper:

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.*;

public class MaxTemperatureMapperTest {
    @Test
    public void processesValidRecord() throws IOException, InterruptedException {
        Text value = new Text(
                "0043011990999991950051518004+68750+023550FM-12+0382" +
                // Year ^^^^
                        "99999V0203201N00261220001CN9999999N9-00111+99999999999");
        // Temperature ^^^^^
        new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new MaxTemperatureMapper()).withInputValue(value)
                .withOutput(new Text("1950"), new IntWritable(-11)).runTest();
    }
}


The final Mapper:

public class MaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        String temp = line.substring(87, 92);
        if (!missing(temp)) {
            int airTemperature = Integer.parseInt(temp);
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }

    private boolean missing(String temp) {
        return temp.equals("+9999");
    }
}


A test for the reducer:

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.*;

public class MaxTemperatureReducerTest {

    @Test
    public void returnsMaximumIntegerInValues() throws IOException,
            InterruptedException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                .withReducer(new MaxTemperatureReducer())
                .withInputKey(new Text("1950"))
                .withInputValues(
                        Arrays.asList(new IntWritable(10), new IntWritable(5)))
                .withOutput(new Text("1950"), new IntWritable(10)).runTest();
    }
}

 

The final Reducer implementation:

public class MaxTemperatureReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

 

 

3. Running Locally on Test Data

1) Running a Job Locally

A job driver that finds the maximum temperature:

public class MaxTemperatureDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Job job = new Job(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}

Running the driver from the command line:

% mvn compile
% export HADOOP_CLASSPATH=target/classes/
% hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml input/ncdc/micro output

Here is the parser class from the book:

public class NcdcRecordParser {
    private static final int MISSING_TEMPERATURE = 9999;
    private String year;
    private int airTemperature;
    private String quality;

    public void parse(String record) {
        year = record.substring(15, 19);
        String airTemperatureString;
        // Remove leading plus sign as parseInt doesn't like them
        if (record.charAt(87) == '+') {

            airTemperatureString = record.substring(88, 92);
        } else {
            airTemperatureString = record.substring(87, 92);
        }
        airTemperature = Integer.parseInt(airTemperatureString);
        quality = record.substring(92, 93);
    }

    public void parse(Text record) {
        parse(record.toString());
    }

    public boolean isValidTemperature() {
        return airTemperature != MISSING_TEMPERATURE
                && quality.matches("[01459]");
    }

    public String getYear() {
        return year;
    }

    public int getAirTemperature() {
        return airTemperature;
    }
}

Using the parser above, the mapper can be written as:

public class MaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        parser.parse(value);
        if (parser.isValidTemperature()) {
            context.write(new Text(parser.getYear()),
                    new IntWritable(parser.getAirTemperature()));
        }
    }
}

 

 


2) Testing the Driver

Note that in the test below, the checkOutput() method is called to compare the actual output with the expected output line by line.

@Test
public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        Path input = new Path("input/ncdc/micro");
        Path output = new Path("output");
    
        FileSystem fs = FileSystem.getLocal(conf);
        fs.delete(output, true); // delete old output
        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);
        int exitCode = driver.run(
                new String[] { input.toString(), output.toString() });
        assertThat(exitCode, is(0));
        checkOutput(conf, output);
}
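The checkOutput() helper is not shown in the excerpt above; a hypothetical sketch that compares the job's part file against a file of expected results line by line (the expected-output path is an assumption) might look like this:

private void checkOutput(Configuration conf, Path output) throws IOException {
    FileSystem fs = FileSystem.getLocal(conf);
    // hypothetical location of the expected results
    Path expectedFile = new Path("expected/ncdc/micro/expected.txt");
    Path actualFile = new Path(output, "part-r-00000");
    BufferedReader expected = new BufferedReader(
            new InputStreamReader(fs.open(expectedFile)));
    BufferedReader actual = new BufferedReader(
            new InputStreamReader(fs.open(actualFile)));
    String expectedLine;
    while ((expectedLine = expected.readLine()) != null) {
        assertThat(actual.readLine(), is(expectedLine)); // line-by-line comparison
    }
    assertThat(actual.readLine(), nullValue()); // no extra lines in the actual output
    expected.close();
    actual.close();
}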

 

 

4. Running on a Cluster

Some commands are listed below, but the Hadoop website remains the best reference.

1) Packaging

With the newer Hadoop 2.0, jobs are packaged with mvn; Eclipse can also build the JAR, and both approaches work in practice. The mvn command:

% mvn package -DskipTests

During packaging, pay attention to setting HADOOP_CLASSPATH and pulling in dependencies; see the Hadoop website (linked above) for details.
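A sketch of launching the packaged job on a cluster, following the book's pattern (the JAR name and cluster configuration file are assumptions):

% unset HADOOP_CLASSPATH
% hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver \
    -conf conf/hadoop-cluster.xml input/ncdc/all max-temp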

 

2) Launching a Job

The waitForCompletion() method on the Job class launches the job and polls its progress.

 

3) Job, Task, and Task Attempt IDs

A job ID is derived from the time the jobtracker started plus a counter; for example: job_200904110811_0002 (job IDs are numbered from 1, so this is the second job).

Tasks belong to a job, so a task ID takes the job ID as its prefix plus a suffix identifying the task within the job; for example: task_200904110811_0002_m_000003 (task IDs are numbered from 0; the m marks a map task).

Task attempts are spawned from tasks, so a task attempt ID is prefixed by the task ID, with a final suffix counting the attempts; for example: attempt_200904110811_0002_m_000003_0 (attempt IDs are numbered from 0).
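These IDs can be parsed and inspected programmatically; a sketch using Hadoop's ID classes:

import org.apache.hadoop.mapreduce.TaskAttemptID;

TaskAttemptID attemptId =
        TaskAttemptID.forName("attempt_200904110811_0002_m_000003_0");
System.out.println(attemptId.getJobID());  // job_200904110811_0002
System.out.println(attemptId.getTaskID()); // task_200904110811_0002_m_000003
System.out.println(attemptId.getId());     // 0, the attempt number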

 

4) The MapReduce Web UI

The web page URLs have changed as Hadoop has been reworked, so consult the Hadoop website.

 

5) Retrieving the Results

The -getmerge option of the hadoop fs command retrieves all files matching the source pattern and merges them into a single file on the local filesystem; for example:

% hadoop fs -getmerge max-temp max-temp-local
% sort max-temp-local | tail

1991           607
1992           605
1993           567
1994           568
1995           567
1996           561
1997           565
1998           568
1999           568
2000           558

 

6) Debugging a Job

You can debug using the log files Hadoop produces and other facilities such as counters, and view the results in the web UI.
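A common pattern from the book is to flag suspicious records in the mapper with a custom counter and a task status message; a sketch (the enum name and threshold are illustrative):

enum Temperature { OVER_100 } // custom counter

// inside map(): temperatures are stored in tenths of a degree
if (airTemperature > 1000) {
    System.err.println("Temperature over 100 degrees for input: " + value);
    context.setStatus("Detected possibly corrupt record: see logs.");
    context.getCounter(Temperature.OVER_100).increment(1);
}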

As for remote debugging: options include JVM debug options, Java profiling tools, and the IsolationRunner tool; and to keep the intermediate files of failed tasks for inspection, set keep.failed.task.files to true.

 

 

5. Tuning a Job

The job tuning checklist (a table in the book, not reproduced here) covers areas such as the number of mappers and reducers, use of combiners, intermediate compression, custom serialization, and shuffle tweaks.

Profiling with the HPROF tool can be enabled by modifying the job configuration, and other analysis tools can help with tuning as well; a sketch follows.
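A sketch of turning on HPROF through the pre-YARN job properties the book describes:

Configuration conf = getConf();
conf.setBoolean("mapred.task.profile", true);
conf.set("mapred.task.profile.params",
        "-agentlib:hprof=cpu=samples,heap=sites,depth=6," +
        "force=n,thread=y,verbose=n,file=%s");
conf.set("mapred.task.profile.maps", "0-2"); // profile only the first three map tasks
conf.set("mapred.task.profile.reduces", ""); // profile no reduce tasks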

 

 

6. MapReduce Workflows

1) Decomposing a Problem into MapReduce Jobs

Note that for very complex problems, you can use Hadoop's ChainMapper library class to join several mappers into one; combined with ChainReducer, a single MapReduce job can run a chain of mappers, then a reducer, then another chain of mappers. A sketch follows.
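A minimal sketch of the new-API chaining calls (Map1, Map2, Map3, and Reduce are placeholder classes):

import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

Job job = new Job(conf, "chained job");
// pipeline: map1 -> map2 -> reduce -> map3, all in one MapReduce job
ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
        Text.class, Text.class, new Configuration(false));
ChainMapper.addMapper(job, Map2.class, Text.class, Text.class,
        Text.class, IntWritable.class, new Configuration(false));
ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, new Configuration(false));
ChainReducer.addMapper(job, Map3.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, new Configuration(false));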

 

2) Running Independent Jobs

The main issue here is managing the execution order of the jobs: whether you have a linear chain of jobs or a more complex directed acyclic graph (DAG) of jobs. See the sketch below.
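For a linear chain, the simplest approach is to run the drivers one after another, each waiting for the previous job to finish; for a DAG, the JobControl class tracks dependencies. A sketch with hypothetical job1 and job2:

import java.util.Arrays;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// DAG: job2 runs only after job1 succeeds
ControlledJob cj1 = new ControlledJob(job1, null);
ControlledJob cj2 = new ControlledJob(job2, Arrays.asList(cj1));
JobControl control = new JobControl("max-temp workflow");
control.addJob(cj1);
control.addJob(cj2);
new Thread(control).start(); // JobControl implements Runnable
while (!control.allFinished()) {
    Thread.sleep(500);
}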

 

To finish, here is a NewMaxTemperatureDriver I wrote myself:

 

package com.hadoop.definitive.guid.NewMaXTemperature;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.definitive.guid.customobject.TextPair;
import com.hadoop.definitive.guid.customobject.TextPairFileInputStream;

public class NewMaxTemperatureDriver  extends Configured implements Tool{

	static class NewMaxTemperatureMapper extends
			Mapper<LongWritable, TextPair, Text, IntWritable>{

		private static final int MISSING = 9999;

		public void map(LongWritable key, TextPair value, Context context)
				throws IOException, InterruptedException {

			//System.out.println(value.getFirst().toString() + "  " + value.getSecond().toString());
			String line = value.getSecond().toString();
			String year = line.substring(15, 19);
			int airTemperature;
			if (line.charAt(87) == '+') {
				airTemperature = Integer.parseInt(line.substring(88, 92));
			} else {
				airTemperature = Integer.parseInt(line.substring(87, 92));
			}
			String quality = line.substring(92, 93);
			if (airTemperature != MISSING && quality.matches("[01459]")) {
				context.write(new Text(year), new IntWritable(airTemperature));
			}
		}
	}

	static class NewMaxTemperatureReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {

			int maxValue = Integer.MIN_VALUE;
			for (IntWritable value : values) {
				maxValue = Math.max(maxValue, value.get());
			}
			context.write(key, new IntWritable(maxValue));
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		 if (args.length != 2) {
	            System.err.printf("Usage: %s [generic options] <input> <output>\n",
	                    getClass().getSimpleName());
	            ToolRunner.printGenericCommandUsage(System.err);
	            return -1;
	        }

		// use the configuration set up by ToolRunner so generic options are honored
		Configuration conf = getConf();
		
		//add client-side conf; otherwise the default (local) filesystem is used
		conf.addResource("conf/core-site.xml");
		
		//setup compress format for mapper output
		//conf.setBoolean("mapred.compress.map.output", true);
		//conf.setClass("mapred.map.output.compression.codec", BZip2Codec.class, CompressionCodec.class);

		//upload the weather data files
		FileSystem fs = FileSystem.get(conf);
		//1st way: upload plain text files to HDFS
		//upload(fs, "/mnt/hgfs/Shared/1901", args[0]);
		//upload(fs, "/mnt/hgfs/Shared/1902", args[0]);
		
		//2nd way: upload two gzipped files to HDFS
		upload(fs, "/mnt/hgfs/Shared/1901.gz", args[0]);
		upload(fs, "/mnt/hgfs/Shared/1902.gz", args[0]);
		
		if (fs.exists(new Path(args[1])))
			fs.delete(new Path(args[1]), true);

		Job job = new Job(conf);
		job.setJarByClass(NewMaxTemperatureDriver.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		//setup special input format, default is TextInputFormat
		job.setInputFormatClass(TextPairFileInputStream.class);
		
		job.setMapperClass(NewMaxTemperatureMapper.class);
		job.setReducerClass(NewMaxTemperatureReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//setup output compress format for whole job
		//FileOutputFormat.setCompressOutput(job, true);
		//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

		return job.waitForCompletion(true) ? 0 : 1;
	}
	
	public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new NewMaxTemperatureDriver(), args);
        System.exit(exitCode);
    }

	public synchronized static void upload(FileSystem fs, String local,
			String remote) {

		Path dst = new Path(remote);
		Path src = new Path(local);
		try {
			fs.copyFromLocalFile(false, true, src, dst);
			System.out.println("upload " + local + " to  " + remote
					+ " successed. ");
		} catch (Exception e) {
			System.err.println("upload " + local + " to  " + remote
					+ " failed :" + ExceptionUtils.getFullStackTrace(e));
		}
	}
	
	public synchronized static void uploadCompressFile(FileSystem fs, String local,
			String remote) {

		Path dst = new Path(remote);
		Path src = new Path(local);
		
		CompressionCodecFactory factory = new CompressionCodecFactory(fs.getConf());
        CompressionCodec codec = factory.getCodec(src);
		
        InputStream in = null;
        OutputStream out = null;
		try {
			in = codec.createInputStream(FileSystem.getLocal(fs.getConf()).open(src));
			out = fs.create(dst, true);
	        IOUtils.copyBytes(in, out, fs.getConf(), true);
	        
	        System.out.println("upload " + local + " to  " + remote
					+ " successed. ");
		} catch (Exception e) {
			System.err.println("upload " + local + " to  " + remote
					+ " failed :" + ExceptionUtils.getFullStackTrace(e));
		}
	}
}

 

 

A simple test for the driver:

import static org.junit.Assert.assertThat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class NewMaxTemperatureDriverTest {
	
	@Test
	public void test() throws Exception {
	        Configuration conf = new Configuration();
	        //conf.set("fs.default.name", "file:///");
	        //conf.set("mapred.job.tracker", "local");
	        Path input = new Path("/definitive_guid/ncdc/input");
	        Path output = new Path("/definitive_guid/ncdc/output/");
 
	        FileSystem fs = FileSystem.getLocal(conf);
	        fs.delete(output, true); // delete old output
	        NewMaxTemperatureDriver driver = new NewMaxTemperatureDriver();
	        driver.setConf(conf);
	        int exitCode = driver.run(
	                new String[] { input.toString(), output.toString() });
	        
	        System.out.println("exitCode= " + exitCode);
	        //assertThat(exitCode, is(0));
	        //checkOutput(conf, output);
	}

}
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/usr/local/hadoop/tmp</value>
        <!-- Note: create the tmp directory under /usr/local/hadoop first -->
        <description>A base for other temporary directories.</description>
    </property>
</configuration>