`

2.x MapReduce的测试类

 
阅读更多

1 wordcount

2 倒排序

3 自定义分区(不同规则输出到不同的文件)

4 自定义文件输出

5 统计文件流 

 

 

1 自定义输出类

  

package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * @author root
 *
 * @param <Text> reduce 输出的key类型     value类型  
 * @param <LongWritable>
 */
public class MyCustomOutputFormat<Text, LongWritable> extends FileOutputFormat<Text, LongWritable>{
	
	
	@Override
	public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();  
		FileSystem hdfs = FileSystem.get(conf);
		FSDataOutputStream os1 = hdfs.create(new Path("/wc/output1/file1.log"));
		FSDataOutputStream os2 = hdfs.create(new Path("/wc/output2/file2.log"));
		
		return new MyRecordWriter<Text, LongWritable>( os1, os2);
	}

	public static class MyRecordWriter<Text, LongWritable> extends RecordWriter<Text, LongWritable>{
		FSDataOutputStream os1 =  null ;
		FSDataOutputStream os2  = null;
		
		public MyRecordWriter(FSDataOutputStream os1, FSDataOutputStream os2) {
			 this.os1 = os1 ;
			 this.os2 = os2 ; 
		}

		@Override
		public void write(Text key, LongWritable value2) throws IOException,
				InterruptedException {

			Long hang = Long.parseLong( value2.toString());
			
			if(hang%2==0){
				os1.writeBytes(key.toString() );
			}else{
				os2.writeBytes(key.toString() );
			}
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			 if(os1!=null){
				 os1.close();
			 }
			 if(os2!=null){
				 os2.close();
			 }
			
		}
	}
}

 2 Mapper 数据整理类

 

package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CIOMapper extends Mapper<LongWritable, Text , Text, LongWritable >{

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		 
		String line = value.toString();
		String[] words  = StringUtils.split(line, " ");
		for(String word :words ){
			context.write( new Text(word) , key );; 
		}
	}
}

3 运行的主类(Map中数据直接输出所以没有使用到reducer)

 

 

package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class CIORunner {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration() ; 
		Job job = Job.getInstance(conf) ;
		
		job.setJarByClass(CIORunner.class );
		
		job.setMapperClass( CIOMapper.class );
		//job.setReducerClass( CIOReducer.class ); 没有reducer就不用了 
		
		job.setOutputKeyClass( Text.class );
		job.setOutputValueClass(LongWritable.class );
		
		job.setMapOutputKeyClass( Text.class);
		job.setMapOutputValueClass( LongWritable.class );
		
		job.setOutputFormatClass(MyCustomOutputFormat.class);
		
		FileInputFormat.setInputPaths(job,  "/wc/input/xiyou.txt");
		FileOutputFormat.setOutputPath(job,  new Path("/wc/outputcount"));
//		FileInputFormat.setInputPaths(job,  "D:\\wordcount\\wordcount.txt");
//		FileOutputFormat.setOutputPath(job,  new Path("D:\\wordcount\\output"));
 		job.waitForCompletion(true) ; 
		
	}
}

  

 

分享到:
评论

相关推荐

    大数据实验四-MapReduce编程实践

    2. **实际编程经验积累**:通过编写MapReduce程序,积累了实际编程经验,熟悉了Hadoop和MapReduce的API。 3. **分布式计算的认识**:认识到分布式计算的局限性与优势,在实际应用中需要权衡数据规模和计算需求。 4. ...

    生成 hadoop-eclipse-plugin-2.x 插件工具代码

    在IT行业中,Hadoop是一个广泛使用的开源框架,用于处理和存储大规模数据。...通过遵循`README.md`中的指导,结合对这些技术的理解,我们可以成功构建出适配Hadoop 2.x的Eclipse插件,从而提升MapReduce开发效率。

    大数据技术基础实验报告-MapReduce编程.doc

    可以从Github上下载`hadoop2x-eclipse-plugin`,解压缩后,将jar文件复制到Eclipse的plugins目录,并运行Eclipse的清理命令以启用插件。这一步骤只需在首次安装后执行一次。 配置Hadoop-Eclipse-Plugin是实验的关键...

    本地调试所需spark-x.x.x-bin-hadoop包

    总之,Spark-x.x.x-bin-hadoop包为本地调试提供了便利,涵盖了Spark的各种功能和与Hadoop的集成,使开发者能够在本地环境中高效地测试和优化大数据处理任务。通过理解Spark的组件和其与Hadoop的交互方式,你将能更好...

    hadoop1.x的eclipse插件

    2. **JobBuilder**:用于构建MapReduce作业,支持添加Mapper、Reducer类,并能设置输入输出格式、分区器和排序器等。 3. **Job提交与监控**:插件集成了Hadoop作业提交的功能,开发者可以直接在Eclipse中启动作业,...

    MapReduce2中自定义排序分组

    MapReduce2(也称为 YARN)是 Hadoop 2.x 版本引入的重要改进,旨在解决原 MapReduce 的资源管理和调度问题。本文将详细探讨在 MapReduce2 中如何实现自定义排序和分组,以满足特定的数据处理需求。 首先,了解 ...

    hadoop2x-eclipse-plugin

    为了方便Hadoop开发者在Eclipse中进行Hadoop应用的开发和测试,出现了Hadoop2x-eclipse-plugin插件。这个插件允许开发者直接在Eclipse环境中创建、编译、运行和调试Hadoop MapReduce项目,极大地提升了开发效率。 ...

    大数据平台构建:YARN中运行Mapreduce程序.pptx

    1. **作业提交**:MapReduce 2.x(即Hadoop 2.x及以后版本)的作业提交使用与MapReduce 1.x相同的用户API。用户编写完成MapReduce程序后,通过Hadoop的客户端提交作业。 2. **获取作业ID**:作业客户端向资源管理器...

    Mapreduce编程.docx

    ### MapReduce编程基础与实践 #### 一、MapReduce简介 MapReduce是一种编程模型,用于大规模数据集(通常是大于1TB)的并行运算。...通过这种方式,开发者可以快速地开始编写和测试MapReduce应用程序。

    Mrunit-1.1.0-hadoop2

    - 编写测试类:Mrunit提供了模拟MapReduce作业的类,如`org.apache.hadoop.mapreduce.lib.map.MockMapper`和`org.apache.hadoop.mapreduce.lib.reduce.MockReducer`。开发者可以通过继承这些类,覆盖必要的方法来...

    MapReduce学习笔记,亲自测试写出来的,1000分都不贵

    YARN (Yet Another Resource Negotiator) 是 Hadoop 2.x 版本之后引入的一个资源管理系统。它负责为应用程序分配资源,并且监控应用程序的运行状态。YARN 提供了一个通用的平台,可以在该平台上运行各种类型的数据...

    Hadoop专业解决方案-第5章开发可靠的MapReduce应用.docx

    通过编写测试类,将Mapper和Reducer作为参数传递给测试方法,例如,可以使用`@Test`注解标记测试方法,并使用MRUnit提供的API来模拟输入数据和验证输出结果。 测试Mapper类时,MRUnit提供了直观的API,可以直接设置...

    大数据与云计算培训学习资料 Hadoop的MapReduce中多文件输出 共9页.pdf

    在Hadoop 0.19.x版本中,提供了一个名为`MultipleOutputFormat`的类,它允许MapReduce任务输出多个文件并自定义文件名。但是,从Hadoop 0.20.x开始,`MultipleOutputFormat`及其所在包的类被标记为"已过时",这意味...

    Hadoop的MapReduce中多文件输出.pdf

    在 Hadoop 0.19.X 版本中,提供了一个 org.apache.hadoop.mapred.lib.MultipleOutputFormat 类,可以输出多份文件且可以自定义文件名。但是,从 Hadoop 0.20.x 版本开始,MultipleOutputFormat 所在包的所有类被标记...

    hadoop2lib.tar.gz

    Hadoop2lib还可能包含Hadoop MapReduce库,这是实现MapReduce任务的关键,它提供了编写和执行MapReduce作业所需的类和接口。此外,YARN(Yet Another Resource Negotiator)作为Hadoop 2.x的新特性,是资源管理和...

    mrunit-1.1.0.jar

    在使用"mrunit-1.1.0-hadoop2.jar"时,开发人员应确保他们的开发环境已经配置好Hadoop 2.x的相关依赖,然后将MRUnit库添加到项目中,编写测试类并继承MRUnit提供的基类,如`TestMapper`和`TestReducer`,利用提供的...

    MR Unit test需要的相关jar包

    2. 编写测试类:创建一个继承自`org.junit.Test`的类,并在其中定义各种测试方法,每个方法针对MapReduce作业的一个特定部分进行测试。 3. 使用MRUnit API:在测试方法中,使用MRUnit提供的`runTest()`、`assertMap...

Global site tag (gtag.js) - Google Analytics