`

mapreduce 开发以及部署

阅读更多

前面几篇文章的梳理让我对hadoop新yarn 框架有了一个大概的认识,今天开始回归老本行---开始coding。

因为涉及到linux系统部署,所以今天安了一个linux 的 lszrz 插件

下载并解压缩 lrzsz-0.12.20.tar.gz

安装之前,需要检查系统是否有gcc  若没有请安装 yum install gcc

安装lrzsz  ./configure && make && make install
上面安装过程默认把lsz和lrz安装到了/usr/local/bin/目录下, 下面创建软链接, 并命名为rz/sz:
# cd /usr/bin
# ln -s /usr/local/bin/lrz rz
# ln -s /usr/local/bin/lsz sz

 

开始写代码 首先导入相应的包
  commons-beanutils-1.7.0.jar
 commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.4.jar
 commons-collections-3.2.1.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-digester-1.8.jar
 commons-el-1.0.jar
 commons-httpclient-3.1.jar
 commons-io-2.4.jar
 commons-lang-2.6.jar
 commons-logging-1.0.4.jar
 commons-logging.jar
 guava-11.0.2.jar
 hadoop-common-2.5.2.jar
 hadoop-mapreduce-client-core-2.5.2.jar
 log4j-1.2.14.jar
 mockito-all-1.8.5.jar
 mrunit-1.1.0-hadoop2.jar
 powermock-mockito-1.4.9-full.jar

 

在此我们写一个分析每年最高气温的任务,气温数据格式如下

1901 01 01 06   -38 -9999 10200   270   159     8 -9999 -9999

其中1901 为年份 01 01 为月份 -38为气温

开始编写mapper 代码如下

package com.snwz.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MyMapper {

	private static final Log logger = LogFactory.getLog(MyMapper.class);
	public static class myMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
		
		private static final IntWritable one = new IntWritable(1);
		private IntWritable key = new IntWritable(); 
		private IntWritable record = new IntWritable();
		private IntWritable year = new IntWritable(); 
		private Context context; 
		/**
		 * key 数据偏移量
		 * value 数据
		 * context 上下文对象
		 * 
		 * 注:由于要计算每年最高的气温,所以在此我们将年份作为key 气温作为value
		 * 都作为整形来计算
		 */
	    @Override
		protected void map(Object key, Text value, Context context)
	    		throws IOException, InterruptedException {
			String line = value.toString();
	    	if("".equals(line)||null==line){
	    		return;
	    	}
	    	line = line.replace(" ", "%");
	    	String array[] = line.split("%");
	    	if(array==null || array.length<22){
	    		logger.info("line : "+key+" array length error "+line);
	    		return;
	    	}
	    	if("-9999".equals(array[5])){
	    		logger.info("line : "+key+" temperature error -9999");
	    		return;
	    	}
	    	year.set(Integer.parseInt(array[0]));
	    	int temperature = Integer.parseInt(array[9]);
	    	record.set(temperature);
	    	context.write(year, record); 
	    }
	}
	
	
	public static void main(String[] args) {
		
	}
	
}

 

reducer 代码如下:

package com.snwz.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer {
	
	public static class myReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
		@Override
		protected void reduce(IntWritable key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			//当年最大气温
			int maxTem = 0;
			for(IntWritable i : values){
				maxTem = Math.max(maxTem, i.get());
			}
			context.write(key, new IntWritable(maxTem));
		}
	}

}

 完成之后我们通过一个方便的测试工具mrunit 来进行测试

package com.snwz.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import com.snwz.mapreduce.MyMapper.myMapper;
import com.snwz.mapreduce.MyReducer.myReducer;

public class MpTest {

	MapDriver<Object, Text, IntWritable, IntWritable> mapDriver;  
	ReduceDriver<IntWritable, IntWritable, IntWritable, IntWritable> reduceDriver;  
	MapReduceDriver<IntWritable, Text, IntWritable, IntWritable, IntWritable, IntWritable> mapReduceDriver;
	
	
	@Before
	public void setUp(){
		System.setProperty("hadoop.home.dir", "E:\\hadoop\\hadoop-2.5.2");
		myMapper  mapper = new myMapper();
		myReducer reducer = new myReducer();
		mapDriver = MapDriver.newMapDriver(mapper);
		reduceDriver = ReduceDriver.newReduceDriver(reducer);
	}
	
	 @Test  
	 public void testMapper() throws IOException {
		 mapDriver.withInput(new LongWritable(), 
				 new Text("1901 01 01 06   -78 -9999 10200   270   159     8 -9999 -9999"));    
		 mapDriver.withOutput(new IntWritable(1901), new IntWritable(-78));    
		 mapDriver.runTest();
     }

     @Test
     public void testReducer() throws IOException {
    	 List<IntWritable> values = new ArrayList<IntWritable>();
    	 values.add(new IntWritable(1));
    	 values.add(new IntWritable(2));
    	 values.add(new IntWritable(-48));
    	 values.add(new IntWritable(-12));
    	 reduceDriver.withInput(new IntWritable(1940), values)
    	 .withOutput(new IntWritable(1940), new IntWritable(2))
    	 .runTest();
     }
}

 测试通过后 开始编写job 任务

package com.snwz.mapreduce;

import java.io.File;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.snwz.mapreduce.MyMapper.myMapper;
import com.snwz.mapreduce.MyReducer.myReducer;

public class MyJob extends Configured implements Tool {
	
	private static final Log logger = LogFactory.getLog(MyJob.class);

	public static void main(String[] args) {
		try {
			int res;
			res = ToolRunner.run(new Configuration(), new MyJob(), args);
			System.exit(res);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public int run(String[] args) throws Exception {
		if (args == null || args.length != 2) {
			System.out.println("need inputpath and outputpath");
			return 1;
		}
		// hdfs 输入路径
		String inputpath = args[0];
		// reduce 结果集输出路径
		String outputpath = args[1];
		
		String shortin = args[0]; 
		String shortout = args[1]; 
		if (shortin.indexOf(File.separator) >= 0) 
		shortin = shortin.substring(shortin.lastIndexOf(File.separator)); 
		if (shortout.indexOf(File.separator) >= 0) 
		shortout = shortout.substring(shortout.lastIndexOf(File.separator)); 

		File inputdir = new File(inputpath);
		File outputdir = new File(outputpath);
		if (!inputdir.exists() || !inputdir.isDirectory()) {
			System.out.println("inputpath not exist or isn't dir!");
			return 0;
		}
		if (!outputdir.exists()) {
			new File(outputpath).mkdirs();
		}
		
		Job job = new Job(new JobConf());
		job.setJarByClass(MyJob.class); 
		job.setJobName("MyJob"); 
		job.setOutputKeyClass(IntWritable.class);// 输出的 key 类型,在 OutputFormat 会检查
	    job.setOutputValueClass(IntWritable.class); // 输出的 value 类型,在 OutputFormat 会检查
	    job.setMapperClass(myMapper.class); 
	    job.setCombinerClass(myReducer.class); 
	    job.setReducerClass(myReducer.class); 
	    FileInputFormat.setInputPaths(job, new Path(shortin));//hdfs 中的输入路径
	    FileOutputFormat.setOutputPath(job,new Path(shortout));//hdfs 中输出路径

	    Date startTime = new Date(); 
	    logger.info("Job started: " + startTime); 
	    job.waitForCompletion(true);    
	    Date end_time = new Date(); 
	    logger.info("Job ended: " + end_time); 
	    logger.info("The job took " + 
	    (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); 
	    return 0; 

	}
}

 编写完成后,一个简单的mapreduce就编写完成了,然后通过打包工具将编写的类打成jar包,关联的jar就不需要了,因为hadoop里面的 jar命令会自己去关联相应的jar文件。,打包时 main 方法指定为job即可,将包存放在hadoop根目录,然后将需要分析的文件存放在hdfs系统

清空输出路径 ./bin/hadoop dfs -rmr /output

建立输入路径 ./bin/hadoop dfs -mkdir /input

上传文件 ./bin/hadoop dfs -copyFromLocal 本地路径 hdfs路径

运行jar文件 ./bin/hadoop jar myJob.jar /input /output

运行完成后 进入输出路径 查看输出结果即可。

 

分享到:
评论

相关推荐

    Hadoop集群搭建部署与MapReduce程序关键点个性化开发.doc

    本文将详细阐述如何搭建Hadoop集群以及进行MapReduce程序的关键点个性化开发。 首先,我们来看任务1——Hadoop集群的部署。这一步至关重要,因为它为整个大数据处理系统提供了基础架构。在虚拟机中安装Ubuntu Kylin...

    Hadoop集群配置及MapReduce开发手册

    《Hadoop集群配置及MapReduce开发手册》是针对大数据处理领域的重要参考资料,主要涵盖了Hadoop分布式计算框架的安装、配置以及MapReduce编程模型的详细解析。Hadoop作为Apache基金会的一个开源项目,因其分布式存储...

    使用IBM的MapReduce Tools for Eclipse插件简化Hadoop开发和部署文档

    【IBM的MapReduce Tools for Eclipse插件】是IBM推出的一款集成开发环境插件,专为简化Hadoop MapReduce应用程序的开发、调试和部署而设计。它整合了Eclipse的功能,使得开发者无需深入理解Hadoop集群的复杂配置,就...

    Hadoop MapReduce开发

    在Hadoop MapReduce开发的过程中,工程化的方法是必不可少的,这涉及到了从编写代码、单元测试、本地测试、集群测试到性能优化的完整流程。 首先,MapReduce应用的开发流程可以细分为以下步骤: 1. 编写Map和...

    hadoop eclipse mapreduce 下开发所有需要用到的 JAR 包

    在Hadoop生态系统中,Eclipse是一个常用的集成开发环境(IDE),用于编写MapReduce程序。...通过熟练掌握这些JAR包和开发流程,你将能够充分利用Hadoop Eclipse插件进行高效、便捷的MapReduce开发。

    mapreduce_eclipse开发需要的所有包

    总的来说,"mapreduce_eclipse开发需要的所有包"是指一系列工具和库的集合,它们共同构建了一个完整的MapReduce开发环境,使开发者能够在Eclipse中高效地编写、测试和部署MapReduce应用。这些工具涵盖了开发、测试、...

    hadoop eclipse mapreduce下开发所有需要用到的JAR包

    标题提到的“hadoop eclipse mapreduce下开发所有需要用到的JAR包”是指为了在Eclipse中进行MapReduce开发,我们需要引入一系列的依赖库,这些库通常以JAR(Java Archive)文件的形式存在。这些JAR包提供了Hadoop ...

    阿里云 专有云企业版 V3.5.2 E-MapReduce 开发指南 - 20190326.pdf

    这些步骤是使用E-MapReduce进行开发和部署的前提,为开发者快速上手提供了必要的准备指导。 接下来,指南对E-MapReduce的基本概念进行了说明,包括E-MapReduce的定义、架构以及运行机制等。在这一部分,开发者可以...

    阿里云 专有云企业版 V3.9.0 E-MapReduce 开发指南 20191017.pdf

    阿里云专有云企业版V3.9.0的E-MapReduce开发指南是一份详细介绍如何在阿里云平台上开发和管理MapReduce作业的重要文档。E-MapReduce是阿里云提供的一个大数据处理服务,它基于开源的Apache Hadoop和Spark框架,为...

    阿里云 专有云企业版 V3.7.0 E-MapReduce 开发指南 20190320.pdf

    阿里云专有云企业版V3.7.0的E-MapReduce开发指南是一份针对开发者和管理员的重要参考资料,旨在帮助用户理解和利用E-MapReduce服务进行大数据处理。E-MapReduce是阿里云提供的基于Hadoop和Spark的云计算服务,它简化...

    阿里云 专有云Enterprise版 V3.5.0 E-MapReduce 开发指南 - 20190327.pdf

    阿里云专有云Enterprise版V3.5.0的E-MapReduce开发指南是一份详细介绍如何在阿里云平台上使用E-MapReduce服务的文档。E-MapReduce是基于开源Hadoop和Spark生态构建的企业级大数据处理服务,它为企业提供了便捷、高效...

    eclipse運行mapreduce的插件

    安装后,Eclipse会添加新的项目类型、构建配置以及调试选项,以适应MapReduce开发的需求。开发者可以创建MapReduce项目,导入相关的Hadoop库,设置输入和输出路径,甚至可以直接在Eclipse内部查看作业的执行日志和...

    大数据MapReduce和YARN二次开发.pdf

    大数据MapReduce和YARN二次开发 大数据MapReduce和YARN二次开发是大数据处理技术的重要组成部分,本文档将详细介绍MapReduce的过程、搭建开发环境、运行程序和MR开发接口介绍。 MapReduce的过程 MapReduce是...

    阿里云 专有云企业版 V3.6.1 E-MapReduce 开发指南 - 20190326.pdf

    阿里云专有云企业版V3.6.1的E-MapReduce开发指南是一份详细的技术文档,旨在帮助开发者和用户更好地理解和使用阿里云的E-MapReduce服务。E-MapReduce是基于开源Hadoop和Spark框架构建的,为企业提供了大规模数据处理...

    大数据技术基础实验报告-MapReduce编程.doc

    总的来说,这个实验报告详细介绍了如何在Eclipse环境中配置MapReduce开发环境,以及如何创建和运行基本的MapReduce作业。理解并掌握这些步骤对于学习和实践大数据处理至关重要,因为MapReduce是处理大规模数据集的...

    eclipse运行MapReduce架包

    这个插件允许开发者在Eclipse的工作空间内创建Hadoop项目,并将MapReduce作业部署到远程Hadoop集群上。安装该插件后,Eclipse会增加新的菜单选项和视图,以便用户可以方便地管理Hadoop作业。 接着,`hadoop.dll`和`...

    华为MapReduce服务应用开发指南.rar

    《华为MapReduce服务应用开发指南》是一份详细阐述如何在华为云平台上开发和部署MapReduce应用的教程。MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。华为云服务提供了对MapReduce...

Global site tag (gtag.js) - Google Analytics