前面几篇文章的梳理让我对hadoop新yarn 框架有了一个大概的认识,今天开始回归老本行---开始coding。
因为涉及到linux系统部署,所以今天安了一个linux 的 lszrz 插件
下载并解压缩 lrzsz-0.12.20.tar.gz
安装之前,需要检查系统是否有gcc 若没有请安装 yum install gcc
安装lrzsz ./configure && make && make install
上面安装过程默认把lsz和lrz安装到了/usr/local/bin/目录下, 下面创建软链接, 并命名为rz/sz:
# cd /usr/bin
# ln -s /usr/local/bin/lrz rz
# ln -s /usr/local/bin/lsz sz
开始写代码 首先导入相应的包
commons-beanutils-1.7.0.jar
commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
commons-codec-1.4.jar
commons-collections-3.2.1.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
commons-digester-1.8.jar
commons-el-1.0.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-logging-1.0.4.jar
commons-logging.jar
guava-11.0.2.jar
hadoop-common-2.5.2.jar
hadoop-mapreduce-client-core-2.5.2.jar
log4j-1.2.14.jar
mockito-all-1.8.5.jar
mrunit-1.1.0-hadoop2.jar
powermock-mockito-1.4.9-full.jar
在此我们写一个分析每年最高气温的任务,气温数据格式如下
1901 01 01 06 -38 -9999 10200 270 159 8 -9999 -9999
其中1901 为年份 01 01 为月份 -38为气温
开始编写mapper 代码如下
package com.snwz.mapreduce; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; public class MyMapper { private static final Log logger = LogFactory.getLog(MyMapper.class); public static class myMapper extends Mapper<Object, Text, IntWritable, IntWritable> { private static final IntWritable one = new IntWritable(1); private IntWritable key = new IntWritable(); private IntWritable record = new IntWritable(); private IntWritable year = new IntWritable(); private Context context; /** * key 数据偏移量 * value 数据 * context 上下文对象 * * 注:由于要计算每年最高的气温,所以在此我们将年份作为key 气温作为value * 都作为整形来计算 */ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if("".equals(line)||null==line){ return; } line = line.replace(" ", "%"); String array[] = line.split("%"); if(array==null || array.length<22){ logger.info("line : "+key+" array length error "+line); return; } if("-9999".equals(array[5])){ logger.info("line : "+key+" temperature error -9999"); return; } year.set(Integer.parseInt(array[0])); int temperature = Integer.parseInt(array[9]); record.set(temperature); context.write(year, record); } } public static void main(String[] args) { } }
reducer 代码如下:
package com.snwz.mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer { public static class myReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{ @Override protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //当年最大气温 int maxTem = 0; for(IntWritable i : values){ maxTem = Math.max(maxTem, i.get()); } context.write(key, new IntWritable(maxTem)); } } }
完成之后我们通过一个方便的测试工具mrunit 来进行测试
package com.snwz.mapreduce; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mrunit.mapreduce.MapDriver; import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; import org.junit.Before; import org.junit.Test; import com.snwz.mapreduce.MyMapper.myMapper; import com.snwz.mapreduce.MyReducer.myReducer; public class MpTest { MapDriver<Object, Text, IntWritable, IntWritable> mapDriver; ReduceDriver<IntWritable, IntWritable, IntWritable, IntWritable> reduceDriver; MapReduceDriver<IntWritable, Text, IntWritable, IntWritable, IntWritable, IntWritable> mapReduceDriver; @Before public void setUp(){ System.setProperty("hadoop.home.dir", "E:\\hadoop\\hadoop-2.5.2"); myMapper mapper = new myMapper(); myReducer reducer = new myReducer(); mapDriver = MapDriver.newMapDriver(mapper); reduceDriver = ReduceDriver.newReduceDriver(reducer); } @Test public void testMapper() throws IOException { mapDriver.withInput(new LongWritable(), new Text("1901 01 01 06 -78 -9999 10200 270 159 8 -9999 -9999")); mapDriver.withOutput(new IntWritable(1901), new IntWritable(-78)); mapDriver.runTest(); } @Test public void testReducer() throws IOException { List<IntWritable> values = new ArrayList<IntWritable>(); values.add(new IntWritable(1)); values.add(new IntWritable(2)); values.add(new IntWritable(-48)); values.add(new IntWritable(-12)); reduceDriver.withInput(new IntWritable(1940), values) .withOutput(new IntWritable(1940), new IntWritable(2)) .runTest(); } }
测试通过后 开始编写job 任务
package com.snwz.mapreduce; import java.io.File; import java.util.Date; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.snwz.mapreduce.MyMapper.myMapper; import com.snwz.mapreduce.MyReducer.myReducer; public class MyJob extends Configured implements Tool { private static final Log logger = LogFactory.getLog(MyJob.class); public static void main(String[] args) { try { int res; res = ToolRunner.run(new Configuration(), new MyJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } public int run(String[] args) throws Exception { if (args == null || args.length != 2) { System.out.println("need inputpath and outputpath"); return 1; } // hdfs 输入路径 String inputpath = args[0]; // reduce 结果集输出路径 String outputpath = args[1]; String shortin = args[0]; String shortout = args[1]; if (shortin.indexOf(File.separator) >= 0) shortin = shortin.substring(shortin.lastIndexOf(File.separator)); if (shortout.indexOf(File.separator) >= 0) shortout = shortout.substring(shortout.lastIndexOf(File.separator)); File inputdir = new File(inputpath); File outputdir = new File(outputpath); if (!inputdir.exists() || !inputdir.isDirectory()) { System.out.println("inputpath not exist or isn't dir!"); return 0; } if (!outputdir.exists()) { new File(outputpath).mkdirs(); } Job job = new Job(new JobConf()); job.setJarByClass(MyJob.class); job.setJobName("MyJob"); job.setOutputKeyClass(IntWritable.class);// 输出的 key 类型,在 OutputFormat 会检查 job.setOutputValueClass(IntWritable.class); // 输出的 value 类型,在 OutputFormat 会检查 job.setMapperClass(myMapper.class); job.setCombinerClass(myReducer.class); job.setReducerClass(myReducer.class); FileInputFormat.setInputPaths(job, new Path(shortin));//hdfs 中的输入路径 FileOutputFormat.setOutputPath(job,new Path(shortout));//hdfs 中输出路径 Date startTime = new Date(); logger.info("Job started: " + startTime); job.waitForCompletion(true); Date end_time = new Date(); logger.info("Job ended: " + end_time); logger.info("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); return 0; } }
编写完成后,一个简单的mapreduce就编写完成了,然后通过打包工具将编写的类打成jar包,关联的jar就不需要了,因为hadoop里面的 jar命令会自己去关联相应的jar文件。,打包时 main 方法指定为job即可,将包存放在hadoop根目录,然后将需要分析的文件存放在hdfs系统
清空输出路径 ./bin/hadoop dfs -rmr /output
建立输入路径 ./bin/hadoop dfs -mkdir /input
上传文件 ./bin/hadoop dfs -copyFromLocal 本地路径 hdfs路径
运行jar文件 ./bin/hadoop jar myJob.jar /input /output
运行完成后 进入输出路径 查看输出结果即可。
相关推荐
本文将详细阐述如何搭建Hadoop集群以及进行MapReduce程序的关键点个性化开发。 首先,我们来看任务1——Hadoop集群的部署。这一步至关重要,因为它为整个大数据处理系统提供了基础架构。在虚拟机中安装Ubuntu Kylin...
### 使用IBM MapReduce Tools简化Hadoop开发和部署 在大数据处理领域,Hadoop凭借其强大的分布式数据处理能力占据了举足轻重的地位。然而,对于初学者来说,Hadoop的复杂性和配置过程可能会成为一道难以逾越的门槛...
《Hadoop集群配置及MapReduce开发手册》是针对大数据处理领域的重要参考资料,主要涵盖了Hadoop分布式计算框架的安装、配置以及MapReduce编程模型的详细解析。Hadoop作为Apache基金会的一个开源项目,因其分布式存储...
【IBM的MapReduce Tools for Eclipse插件】是IBM推出的一款集成开发环境插件,专为简化Hadoop MapReduce应用程序的开发、调试和部署而设计。它整合了Eclipse的功能,使得开发者无需深入理解Hadoop集群的复杂配置,就...
该指南涵盖了 E-MapReduce 的基本概念、安装部署、开发指南、常见问题解答等方面的内容。 法律声明是指南的开头部分,该声明明确了使用该指南的法律责任和义务,包括保密义务、禁止擅自摘抄、翻译、复制、传播等...
在Hadoop MapReduce开发的过程中,工程化的方法是必不可少的,这涉及到了从编写代码、单元测试、本地测试、集群测试到性能优化的完整流程。 首先,MapReduce应用的开发流程可以细分为以下步骤: 1. 编写Map和...
在Hadoop生态系统中,Eclipse是一个常用的集成开发环境(IDE),用于编写MapReduce程序。...通过熟练掌握这些JAR包和开发流程,你将能够充分利用Hadoop Eclipse插件进行高效、便捷的MapReduce开发。
总的来说,"mapreduce_eclipse开发需要的所有包"是指一系列工具和库的集合,它们共同构建了一个完整的MapReduce开发环境,使开发者能够在Eclipse中高效地编写、测试和部署MapReduce应用。这些工具涵盖了开发、测试、...
标题提到的“hadoop eclipse mapreduce下开发所有需要用到的JAR包”是指为了在Eclipse中进行MapReduce开发,我们需要引入一系列的依赖库,这些库通常以JAR(Java Archive)文件的形式存在。这些JAR包提供了Hadoop ...
阿里云专有云企业版V3.9.0的E-MapReduce开发指南是一份详细介绍如何在阿里云平台上开发和管理MapReduce作业的重要文档。E-MapReduce是阿里云提供的一个大数据处理服务,它基于开源的Apache Hadoop和Spark框架,为...
阿里云专有云企业版V3.7.0的E-MapReduce开发指南是一份针对开发者和管理员的重要参考资料,旨在帮助用户理解和利用E-MapReduce服务进行大数据处理。E-MapReduce是阿里云提供的基于Hadoop和Spark的云计算服务,它简化...
阿里云专有云Enterprise版V3.5.0的E-MapReduce开发指南是一份详细介绍如何在阿里云平台上使用E-MapReduce服务的文档。E-MapReduce是基于开源Hadoop和Spark生态构建的企业级大数据处理服务,它为企业提供了便捷、高效...
安装后,Eclipse会添加新的项目类型、构建配置以及调试选项,以适应MapReduce开发的需求。开发者可以创建MapReduce项目,导入相关的Hadoop库,设置输入和输出路径,甚至可以直接在Eclipse内部查看作业的执行日志和...
大数据MapReduce和YARN二次开发 大数据MapReduce和YARN二次开发是大数据处理技术的重要组成部分,本文档将详细介绍MapReduce的过程、搭建开发环境、运行程序和MR开发接口介绍。 MapReduce的过程 MapReduce是...
阿里云专有云企业版V3.6.1的E-MapReduce开发指南是一份详细的技术文档,旨在帮助开发者和用户更好地理解和使用阿里云的E-MapReduce服务。E-MapReduce是基于开源Hadoop和Spark框架构建的,为企业提供了大规模数据处理...
总的来说,这个实验报告详细介绍了如何在Eclipse环境中配置MapReduce开发环境,以及如何创建和运行基本的MapReduce作业。理解并掌握这些步骤对于学习和实践大数据处理至关重要,因为MapReduce是处理大规模数据集的...
这个插件允许开发者在Eclipse的工作空间内创建Hadoop项目,并将MapReduce作业部署到远程Hadoop集群上。安装该插件后,Eclipse会增加新的菜单选项和视图,以便用户可以方便地管理Hadoop作业。 接着,`hadoop.dll`和`...
《华为MapReduce服务应用开发指南》是一份详细阐述如何在华为云平台上开发和部署MapReduce应用的教程。MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。华为云服务提供了对MapReduce...