1 wordcount
2 倒排序
3 自定义分区(不同规则输出到不同的文件)
4 自定义文件输出
5 统计文件流
1 自定义输出类
package com.wzt.mapreduce.custom; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author root * * @param <Text> reduce 输出的key类型 value类型 * @param <LongWritable> */ public class MyCustomOutputFormat<Text, LongWritable> extends FileOutputFormat<Text, LongWritable>{ @Override public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); FileSystem hdfs = FileSystem.get(conf); FSDataOutputStream os1 = hdfs.create(new Path("/wc/output1/file1.log")); FSDataOutputStream os2 = hdfs.create(new Path("/wc/output2/file2.log")); return new MyRecordWriter<Text, LongWritable>( os1, os2); } public static class MyRecordWriter<Text, LongWritable> extends RecordWriter<Text, LongWritable>{ FSDataOutputStream os1 = null ; FSDataOutputStream os2 = null; public MyRecordWriter(FSDataOutputStream os1, FSDataOutputStream os2) { this.os1 = os1 ; this.os2 = os2 ; } @Override public void write(Text key, LongWritable value2) throws IOException, InterruptedException { Long hang = Long.parseLong( value2.toString()); if(hang%2==0){ os1.writeBytes(key.toString() ); }else{ os2.writeBytes(key.toString() ); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if(os1!=null){ os1.close(); } if(os2!=null){ os2.close(); } } } }
2 Mapper 数据整理类
package com.wzt.mapreduce.custom; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class CIOMapper extends Mapper<LongWritable, Text , Text, LongWritable >{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = StringUtils.split(line, " "); for(String word :words ){ context.write( new Text(word) , key );; } } }
3 运行的主类(Map中数据直接输出所以没有使用到reducer)
package com.wzt.mapreduce.custom; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CIORunner { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration() ; Job job = Job.getInstance(conf) ; job.setJarByClass(CIORunner.class ); job.setMapperClass( CIOMapper.class ); //job.setReducerClass( CIOReducer.class ); 没有reducer就不用了 job.setOutputKeyClass( Text.class ); job.setOutputValueClass(LongWritable.class ); job.setMapOutputKeyClass( Text.class); job.setMapOutputValueClass( LongWritable.class ); job.setOutputFormatClass(MyCustomOutputFormat.class); FileInputFormat.setInputPaths(job, "/wc/input/xiyou.txt"); FileOutputFormat.setOutputPath(job, new Path("/wc/outputcount")); // FileInputFormat.setInputPaths(job, "D:\\wordcount\\wordcount.txt"); // FileOutputFormat.setOutputPath(job, new Path("D:\\wordcount\\output")); job.waitForCompletion(true) ; } }
相关推荐
2. **实际编程经验积累**:通过编写MapReduce程序,积累了实际编程经验,熟悉了Hadoop和MapReduce的API。 3. **分布式计算的认识**:认识到分布式计算的局限性与优势,在实际应用中需要权衡数据规模和计算需求。 4. ...
在IT行业中,Hadoop是一个广泛使用的开源框架,用于处理和存储大规模数据。...通过遵循`README.md`中的指导,结合对这些技术的理解,我们可以成功构建出适配Hadoop 2.x的Eclipse插件,从而提升MapReduce开发效率。
可以从Github上下载`hadoop2x-eclipse-plugin`,解压缩后,将jar文件复制到Eclipse的plugins目录,并运行Eclipse的清理命令以启用插件。这一步骤只需在首次安装后执行一次。 配置Hadoop-Eclipse-Plugin是实验的关键...
总之,Spark-x.x.x-bin-hadoop包为本地调试提供了便利,涵盖了Spark的各种功能和与Hadoop的集成,使开发者能够在本地环境中高效地测试和优化大数据处理任务。通过理解Spark的组件和其与Hadoop的交互方式,你将能更好...
2. **JobBuilder**:用于构建MapReduce作业,支持添加Mapper、Reducer类,并能设置输入输出格式、分区器和排序器等。 3. **Job提交与监控**:插件集成了Hadoop作业提交的功能,开发者可以直接在Eclipse中启动作业,...
MapReduce2(也称为 YARN)是 Hadoop 2.x 版本引入的重要改进,旨在解决原 MapReduce 的资源管理和调度问题。本文将详细探讨在 MapReduce2 中如何实现自定义排序和分组,以满足特定的数据处理需求。 首先,了解 ...
为了方便Hadoop开发者在Eclipse中进行Hadoop应用的开发和测试,出现了Hadoop2x-eclipse-plugin插件。这个插件允许开发者直接在Eclipse环境中创建、编译、运行和调试Hadoop MapReduce项目,极大地提升了开发效率。 ...
1. **作业提交**:MapReduce 2.x(即Hadoop 2.x及以后版本)的作业提交使用与MapReduce 1.x相同的用户API。用户编写完成MapReduce程序后,通过Hadoop的客户端提交作业。 2. **获取作业ID**:作业客户端向资源管理器...
### MapReduce编程基础与实践 #### 一、MapReduce简介 MapReduce是一种编程模型,用于大规模数据集(通常是大于1TB)的并行运算。...通过这种方式,开发者可以快速地开始编写和测试MapReduce应用程序。
- 编写测试类:Mrunit提供了模拟MapReduce作业的类,如`org.apache.hadoop.mapreduce.lib.map.MockMapper`和`org.apache.hadoop.mapreduce.lib.reduce.MockReducer`。开发者可以通过继承这些类,覆盖必要的方法来...
YARN (Yet Another Resource Negotiator) 是 Hadoop 2.x 版本之后引入的一个资源管理系统。它负责为应用程序分配资源,并且监控应用程序的运行状态。YARN 提供了一个通用的平台,可以在该平台上运行各种类型的数据...
通过编写测试类,将Mapper和Reducer作为参数传递给测试方法,例如,可以使用`@Test`注解标记测试方法,并使用MRUnit提供的API来模拟输入数据和验证输出结果。 测试Mapper类时,MRUnit提供了直观的API,可以直接设置...
在Hadoop 0.19.x版本中,提供了一个名为`MultipleOutputFormat`的类,它允许MapReduce任务输出多个文件并自定义文件名。但是,从Hadoop 0.20.x开始,`MultipleOutputFormat`及其所在包的类被标记为"已过时",这意味...
在 Hadoop 0.19.X 版本中,提供了一个 org.apache.hadoop.mapred.lib.MultipleOutputFormat 类,可以输出多份文件且可以自定义文件名。但是,从 Hadoop 0.20.x 版本开始,MultipleOutputFormat 所在包的所有类被标记...
在使用"mrunit-1.1.0-hadoop2.jar"时,开发人员应确保他们的开发环境已经配置好Hadoop 2.x的相关依赖,然后将MRUnit库添加到项目中,编写测试类并继承MRUnit提供的基类,如`TestMapper`和`TestReducer`,利用提供的...
Hadoop2lib还可能包含Hadoop MapReduce库,这是实现MapReduce任务的关键,它提供了编写和执行MapReduce作业所需的类和接口。此外,YARN(Yet Another Resource Negotiator)作为Hadoop 2.x的新特性,是资源管理和...
2. 编写测试类:创建一个继承自`org.junit.Test`的类,并在其中定义各种测试方法,每个方法针对MapReduce作业的一个特定部分进行测试。 3. 使用MRUnit API:在测试方法中,使用MRUnit提供的`runTest()`、`assertMap...