`
风过无声
  • 浏览: 92534 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Hadoop MapReduce应用开发

阅读更多

1.开发流程

1)编写map函数和reduce函数,最好使用单元测试来确保函数的运行符合预期

2)写一个驱动程序来运行作业

3)通过在一个小的数据集上运行这个驱动程序来进行测试

2.配置API

1)Configuration

一个Configuration类的实例代表配置属性及其取值的一个集合。

每个属性由一个String来命名,而值类型可以是多种。

Configuration从XML文件中读取属性内容,常见的有core-site.xml,hdfs-site.xml,mapred-site.xml。e.g.

configuration-1.xml

<?xml version="1.0"?>
<configuration>
    <property>
        <name>color</name>
        <value>yellow</value>
        <description>Color</description>
    </property>
    <property>
        <name>size</name>
        <value>10</value>
        <description>Size</description>
    </property>
    <property>
        <name>sizew</name>
        <value>10w</value>
        <description>Size</description>
    </property>
    <property>
        <name>weight</name>
        <value>heavy</value>
        <final>true</final>
        <description>Weight</description>
    </property>
    <property>
        <name>size-weight</name>
        <value>${size},${weight}</value>
        <description>Size and weight</description>
    </property>
</configuration>

访问

public void testLoad() {
	Configuration conf = new Configuration();
	conf.addResource("configuration-1.xml");
	assertEquals(conf.get("color"), "yellow");
	assertEquals(conf.getInt("size", 0), 10);
	assertEquals(conf.getInt("sizew", 0), 0);
	assertEquals(conf.get("weight"), "heavy");
	assertEquals(conf.get("size-weight"), "10,heavy");
}

--属性的类型通过访问时的方法确定

--属性可以通过其他属性进行扩展,如size-weight

合并多个配置文件

configuration-2.xml

<?xml version="1.0"?>
<configuration>
  <property>
    <name>size</name>
    <value>12</value>
  </property>
  
  <property>
    <name>weight</name>
    <value>light</value>
  </property>
</configuration>

访问

public void testMerge() {
	Configuration conf = new Configuration();
	conf.addResource("configuration-1.xml");
	conf.addResource("configuration-2.xml");
	assertEquals(conf.getInt("size", 0), 12);
	assertEquals(conf.get("weight"), "heavy");
}

--后来添加到源文件的属性会覆盖之前定义的属性

--被标记为final的属性不能被后面的定义覆盖,并且会WARN提示

2)系统属性

系统属性System.setProperty(...)或者使用JVM参数-DXXX=XXX设置

配置属性可以通过系统属性来扩展,系统属性优先级高于配置文件中定义的属性。前提是配置属性中存在该属性。

public void testMerge() {
	Configuration conf = new Configuration();
	conf.addResource("configuration-1.xml");
	conf.addResource("configuration-2.xml");
	assertEquals(conf.getInt("size", 0), 12);
	assertEquals(conf.get("weight"), "heavy");
	System.setProperty("size", "14");
	assertEquals(conf.get("size-weight"), "14,heavy");
	//fail
	assertEquals(conf.getInt("size", 0), 14); 
}

3.辅助类GenericOptionsParser,Tool和ToolRunner

GenericOptionsParser一个用来解释常用的Hadoop命令行选项的类,但更方便的方式是实现Tool接口,通过ToolRunner来运行程序,ToolRunner内部调用GenericOptionsParser。e.g.

package com.siyuan.hadoop.test.dev;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurationPrinter extends Configured implements Tool {
	
	@Override
	public int run(String[] args) throws Exception {
		for (String arg : args) {
			System.out.println(arg);
		}
		return 0;
	}
	
	public static void main(String[] args) throws Exception {
		ConfigurationPrinter cfgPrinter = new ConfigurationPrinter();
		Configuration loaded = new Configuration(false);
		loaded.addResource("configuration-1.xml");
		cfgPrinter.setConf(loaded);
		int exitCode = ToolRunner.run(cfgPrinter , args);
		Configuration cfg = cfgPrinter.getConf();
		for (Map.Entry<String,String> property : cfg) {
			System.out.printf("Property: %s=%s\n", property.getKey(), property.getValue());
		}
		System.exit(exitCode);
	}
	
}

执行:

hadoop jar /home/hadoop/task/hadooptest.jar com.siyuan.hadoop.test.dev.ConfigurationPrinter -conf configuration-2.xml arg1 arg2

输出结果:

arg1
arg2
Property: weight=heavy
Property: sizew=10w
Property: color=yellow
Property: size-weight=${size},${weight}
Property: mapred.used.genericoptionsparser=true
Property: size=12

在程序中ToolRunner的静态run方法使用GenericOptionsParser来获取在hadoop命令行中指定的标准选项,然后在Configuration实例上进行设置,将非标准选项传递给Tool接口的run方法。


1)-D选项和系统属性不一样

2)-D选项的优先级要高于配置文件中的其他属性

3)并不是所有的属性都能通过-D改变

4)选项必须位于程序参数之前,如之前的-conf configuration-2.xml必须位于arg1,arg2,否则将被视为程序参数

4.程序编写

1)Mapper

MaxTemperatureMapper

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {
	
	private static final int MISSING = 9999;  
	
	@Override
	protected void map(LongWritable key, Text value,
			Context context)
			throws IOException, InterruptedException {
		String line = value.toString();  
        
		String year = line.substring(15, 19);  
		  
		int airTemperature;  
		if (line.charAt(87) == '+') {  
		    airTemperature = Integer.parseInt(line.substring(88, 92));  
		} else {  
		    airTemperature = Integer.parseInt(line.substring(87, 92));  
		}  
		  
		String quaility = line.substring(92, 93);  
		if (airTemperature != MISSING && quaility.matches("[01459]")) {  
		    context.write(new Text(year), new IntWritable(airTemperature));  
		}
		
	}
	
}

MaxTemperatureMapperTest

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import junit.framework.TestCase;
import static org.mockito.Mockito.*;

public class MaxTemperatureMapperTest extends TestCase {
	
	private MaxTemperatureMapper mapper;
	private Mapper<LongWritable, Text, Text, IntWritable>.Context ctxt;
	
	@Override
	protected void setUp() throws Exception {
		mapper = new MaxTemperatureMapper();
		ctxt = mock(Mapper.Context.class);
	}

	public void testMap() throws IOException, InterruptedException {
		Text value = new Text("0029029070999991901010106004+64333+023450FM-12+000599999V0"
				+ "202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999");
		mapper.map(null, value, ctxt);
		verify(ctxt).write(new Text("1901"), new IntWritable(-78));
	}
	
	public void testMapMissing() throws IOException, InterruptedException {
		Text value = new Text("0029029070999991901010106004+64333+023450FM-12+000599999V0"
				+ "202701N015919999999N0000001N9+99991+99999102001ADDGF108991999999999999999999");
		mapper.map(null, value, ctxt);
		verify(ctxt, never()).write(any(Text.class), any(IntWritable.class));
	}
	
}

2)Reducer

MaxTemperatureReducer

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context ctxt)
			throws IOException, InterruptedException {
		int maxAirTemperature = Integer.MIN_VALUE;  
        
        for (IntWritable airTemperature : values) {  
            maxAirTemperature = Math.max(maxAirTemperature, airTemperature.get());  
        }  
          
        ctxt.write(new Text(key), new IntWritable(maxAirTemperature));
	}
	
}

MaxTemperatureReducerTest

package com.siyuan.hadoop.test.dev;

import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import junit.framework.TestCase;
import static org.mockito.Mockito.*;

public class MaxTemperatureReducerTest extends TestCase {
	
	private MaxTemperatureReducer reducer;
	private Reducer<Text, IntWritable, Text, IntWritable>.Context ctxt;
	
	@Override
	protected void setUp() throws Exception {
		reducer = new MaxTemperatureReducer();
		ctxt = mock(Reducer.Context.class);
	}
	
	public void testReduce() throws IOException, InterruptedException {
		Text key = new Text("1901");
		List<IntWritable> values = new ArrayList<IntWritable>();
		values.add(new IntWritable(100));
		values.add(new IntWritable(50));
		values.add(new IntWritable(0));
		reducer.reduce(key, values, ctxt);
		verify(ctxt).write(new Text("1901"), new IntWritable(100));
	}
	
}

3)job driver

package com.siyuan.hadoop.test.dev;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureJob extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 2) {  
            System.err.println("Usage: MaxTemperatureJob <input path> <output path>");  
            System.exit(-1);  
        }
		
		Job job = new Job(getConf(), "Max Temperature Job");
		job.setJarByClass(getClass());
		
		FileInputFormat.addInputPath(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        
        // default: job.setInputFormatClass(TextInputFormat.class);
        
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MaxTemperatureJob(), args));
	}
	
}

运行结果:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.Text

修复:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

5.MapReduce的WEB页面

可以通过Hadoop提供的Web页面来浏览作业信息,对于跟踪作业进度,查找作业完成后的统计信息和日志非常有用。默认的端口号为50030。

mapred-site.xml

<property>
  <name>mapred.job.tracker.http.address</name>
  <value>0.0.0.0:50030</value>
  <description>
    The job tracker http server address and port the server will listen on.
    If the port is 0 then the server will start on a free port.
  </description>
</property>
1)作业,任务和task attempt ID

 

--作业ID:jobtracker(不是作业)开始的时间和唯一标识此作业的由jobtracker维护的增量计数器。

e.g. job_201403281304_0006

注:计数器从0001开始,达到10000时,不能重新设置。

--任务ID:在初始化时产生,不必是任务执行的顺序。格式为 作业ID_[mr]_000X,mr为任务类型,000X为任务计数器,从0000开始。

e.g. task_201403281304_0006_m_000000,task_201403281304_0006_r_000000

--task attempt ID

由于失败或者推测执行,任务可能会执行多次。为了标识任务执行的不同实例,会通过task attempt ID进行区分。格式为 任务ID_index,index从0开始。task attempt在作业运行时根据需要分配,所以,它们的顺序代表tasktracker产生并运行的先后顺序。

e.g.task_201403281304_0006_m_000000_0,task_201403281304_0006_r_000000_0

注:如果在jobtracker重启并恢复运行作业后,作业被重启,那么task attempt ID中的计数器将从1000开始。

2)WEB页面组成

--jobtracker页面

在作业存储到历史信息页之前,主页上只显示100个作业,作业历史是永久存储的。

mapred-site.xml

<property>
  <name>mapred.jobtracker.completeuserjobs.maximum</name>
  <value>100</value>
  <description>The maximum number of complete jobs per user to keep around 
  before delegating them to the job history.</description>
</property>

 

作业历史的保存路径,系统会保存30天,然后自动删除。

mapred-site.xml

<property>
  <name>hadoop.job.history.location</name>
  <value></value>
  <description> The location where jobtracker history files are stored.
  The value for this key is treated as a URI, meaning that the files 
  can be stored either on HDFS or the local file system.  If no value is 
  set here, the location defaults to the local file system, at 
  file:///${hadoop.log.dir}/history.  If the URI is missing a scheme,
  fs.default.name is used for the file system.
  </description>
</property>
作业输出目录的_logs/history子目录为用户存放第二个备份,该文件不会被系统删除

 

mapred-site.xml

<property>
  <name>hadoop.job.history.user.location</name>
  <value></value>
  <description> User can specify a location to store the history files of 
  a particular job. If nothing is specified, the logs are stored in 
  output directory. The files are stored in "_logs/history/" in the directory.
  User can stop logging by giving the value "none". 
  </description>
</property>

 

--job页面

--task页面

Actions列包括终止task attempt的连接,默认情况下为禁用的。

core-site.xml

<property>
  <name>webinterface.private.actions</name>
  <value>false</value>
  <description> If set to true, the web interfaces of JT and NN may contain 
                actions, such as kill job, delete file, etc., that should 
                not be exposed to public. Enable this option if the interfaces 
                are only reachable by those who have the right authorization.
  </description>
</property>
6.其它

 

1)获取结果

每个reducer会产生一个输出文件到输出目录。

--hadoop fs 命令中的-getmerge,可以得到源模式目录中的所有文件,并在本地系统上将它们合并成一个文件。e.g.

hadoop fs -getmerge output output-local.txt
2)作业调试

--将调试信息打印到标准错误中

--发送一个信息来更新任务的状态信息以提示我们查看错误日志

--创建一个自定义的计数器来统计错误总数

以上信息均可在WEB页面中查看

package com.siyuan.hadoop.test.dev;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {
	
	private static final int MISSING = 9999;  
	
	enum RECORD_FORMAT {
		ERROR
	}
	
	@Override
	protected void map(LongWritable key, Text value,
			Context context)
			throws IOException, InterruptedException {
		String line = value.toString();  
        
		String year = line.substring(15, 19);  
		  
		int airTemperature;  
		if (line.charAt(87) == '+') {  
		    airTemperature = Integer.parseInt(line.substring(88, 92));  
		} else {  
		    airTemperature = Integer.parseInt(line.substring(87, 92));  
		}  
		  
		String quaility = line.substring(92, 93);  
		if (airTemperature != MISSING && quaility.matches("[01459]")) {  
		    context.write(new Text(year), new IntWritable(airTemperature));  
		} else {
			//将调试信息打印到标准错误中
			System.err.println("Wrong format record has been found:" + line);
			//更改任务状态
			context.setStatus("Wrong format record has been found.");
			//创建一个自定义的计数器来统计错误总数
			context.getCounter(RECORD_FORMAT.ERROR).increment(1);
		}
		
	}
	
}

3)Hadoop用户日志

针对不同的用户,hadoop在不同的地方生成日志,如下表:

4)作业调优


5)MapReduce工作流

--JobControl类

--Oozie

  • 大小: 138.4 KB
  • 大小: 24.9 KB
  • 大小: 89.2 KB
  • 大小: 41.6 KB
分享到:
评论

相关推荐

    Hadoop MapReduce实现tfidf源码

    在Hadoop 2.7.7版本中,开发MapReduce程序通常需要使用Java编程语言。你需要导入Hadoop的相关库,创建Mapper和Reducer类,并实现它们的map()、reduce()方法。此外,还需要配置Job参数,如输入路径、输出路径、Mapper...

    大数据 hadoop mapreduce 词频统计

    总的来说,大数据Hadoop MapReduce词频统计是大数据分析的重要应用之一,它揭示了文本数据的内在结构,为文本挖掘、信息检索等应用提供了基础。通过理解和掌握这一技术,开发者可以更好地应对现代数据驱动决策的需求...

    基于Apriori算法的频繁项集Hadoop mapreduce

    将Apriori算法应用于Hadoop MapReduce时,我们可以按照以下步骤进行: 1. **映射阶段(Map)**:在这一阶段,原始数据被分割成多个块,并分配给不同的Mapper。每个Mapper负责扫描其分配的数据块,找出所有单个项...

    Hadoop应用系列2--MapReduce原理浅析(上)

    理解MapReduce的工作原理对于开发和优化大数据处理应用至关重要,这也是Hadoop生态系统中的核心技能之一。通过持续学习和实践,开发者可以更好地利用MapReduce解决实际问题,驾驭海量数据的挑战。

    hadoop eclipse mapreduce 下开发所有需要用到的 JAR 包

    10. **zookeeper.jar**:如果MapReduce应用使用了ZooKeeper,那么也需要这个JAR包,ZooKeeper是一个分布式协调服务。 在Eclipse中,你可以通过以下步骤来配置这些JAR包: 1. 创建一个Java项目,并选择"Dynamic Web...

    Hadoop_MapReduce教程

    - 使用 Java 开发 MapReduce 应用程序是最常见的做法。 **2. 其他编程语言支持** - **Hadoop Streaming**:允许用户使用任何可执行程序(如 Shell 脚本)作为 mapper 和 reducer。 - **Hadoop Pipes**:提供了 ...

    Hadoop MapReduce开发

    在分布式计算领域,Hadoop是一个开源框架,用于...总而言之,Hadoop MapReduce开发是一个完整的流程,从代码编写到性能优化,需要开发者综合运用编程技能、测试手段和性能分析工具来确保MapReduce应用的高效稳定运行。

    Hadoop_MapReduce教程.doc

    如Hadoop Streaming允许使用任意可执行程序(如Shell脚本)作为mapper和reducer,而Hadoop Pipes则是一个C++ API,可用于创建MapReduce应用程序。 7. **输入与输出**: MapReduce作业的输入和输出都是键值对的形式...

    Hadoop MapReduce教程.pdf

    Hadoop则是Apache基金会开发的一个开源框架,其核心组件包括HDFS(Hadoop Distributed File System)和MapReduce。Hadoop MapReduce是一种基于Java的分布式计算框架,用于在大量廉价硬件上处理海量数据。 #### ...

    hadoop mapreduce 例子项目,运行了单机wordcount

    5. 单机模式下的Hadoop运行环境配置和应用部署。 了解这些内容对于初学者来说是进入Hadoop世界的重要步骤,它提供了实践经验,帮助理解分布式计算的核心思想。随着对Hadoop的深入学习,可以进一步探索更复杂的数据...

    MapReduce应用开发

    最后,华为提供的这份培训资料强调,学习完MapReduce应用开发课程后,开发者将能够掌握MapReduce的业务过程,搭建开发环境,并进行实际的MapReduce应用开发。MapReduce作为大数据处理的核心技术之一,在云计算和...

    云计算技术实验报告三运行Hadoop MapReduce程序

    通过这次实验,参与者不仅掌握了Hadoop MR程序的开发流程,还深化了对Hadoop MapReduce工作流程的理解,包括Map阶段的数据分区、排序和Shuffle过程,以及Reduce阶段的聚合计算。同时,对于如何打包和执行jar文件也有...

    Python中Hadoop MapReduce的一个简单示例.zip

    我们将详细探讨MapReduce的基本概念、工作原理以及Python在Hadoop中的应用。 MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。它将复杂的数据处理任务拆分成两个主要阶段:Map...

    大数据-hadoop-mapreduce代码

    总的来说,这个“大数据-hadoop-mapreduce代码”资源对于想要深入理解Hadoop MapReduce机制,或者需要开发和优化MapReduce应用程序的人来说,是一个宝贵的参考资料。通过研究这些代码,你可以学习到如何处理Hadoop...

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    此外,Hadoop 还支持使用其他编程语言,如 Python 和 Scala,通过 Pig 或 Hive 等高级接口编写 MapReduce 作业,简化开发过程。然而,对于更复杂的逻辑,Java 仍然是首选,因为它提供了更大的灵活性和性能。 在项目...

    基于Hadoop MapReduce的高校考研分数线统计分析项目代码+数据集.rar

    该项目是关于利用Hadoop ...总的来说,"基于Hadoop MapReduce的高校考研分数线统计分析项目"是一个结合理论与实践的优秀案例,涵盖了大数据处理的基本流程,对于提升数据分析技能、理解和应用MapReduce具有很高的价值。

    Hadoop MapReduce Cookbook

    本书可以作为读者在实际工作中遇到问题时的参考书,也可以作为开发高性能大数据处理应用的教程。此外,通过本书提供的各种“食谱”(即具体的代码示例和操作步骤),读者可以快速学习如何部署和运行自己的MapReduce...

    Clustering Algorithms to Hadoop MapReduce Framework.pdf

    ### 聚类算法在Hadoop MapReduce框架下的应用 #### 一、引言与背景 随着大数据时代的到来,处理海量数据集的需求日益增长。在这样的背景下,Apache Hadoop作为一个强大的分布式计算平台,成为了处理大规模数据集的...

    Hadoop原理与技术MapReduce实验

    (1)熟悉Hadoop开发包 (2)编写MepReduce程序 (3)调试和运行MepReduce程序 (4)完成上课老师演示的内容 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 二、实验内容 1.单词计数实验...

Global site tag (gtag.js) - Google Analytics