`

2.x MapReduce的测试类

 
阅读更多

1 wordcount

2 倒排序

3 自定义分区(不同规则输出到不同的文件)

4 自定义文件输出

5 统计文件流 

 

 

1 自定义输出类

  

package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Custom OutputFormat that routes each record to one of two fixed HDFS files:
 * keys whose value is even go to /wc/output1/file1.log, odd to /wc/output2/file2.log.
 *
 * NOTE(review): the original declared generic type parameters named
 * {@code <Text, LongWritable>}, which shadowed the Hadoop classes of the same
 * names — they were really unbounded type variables. The parameters are removed
 * so the real org.apache.hadoop.io.Text / LongWritable types are used.
 *
 * @author root
 */
public class MyCustomOutputFormat extends FileOutputFormat<Text, LongWritable> {

	@Override
	public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {

		Configuration conf = job.getConfiguration();
		FileSystem hdfs = FileSystem.get(conf);

		// Create both target files up front; if the second create fails, close
		// the first so we do not leak an open stream.
		FSDataOutputStream os1 = hdfs.create(new Path("/wc/output1/file1.log"));
		FSDataOutputStream os2;
		try {
			os2 = hdfs.create(new Path("/wc/output2/file2.log"));
		} catch (IOException e) {
			os1.close();
			throw e;
		}

		return new MyRecordWriter(os1, os2);
	}

	/** Writes each key to one of two streams depending on the parity of its value. */
	public static class MyRecordWriter extends RecordWriter<Text, LongWritable> {
		private final FSDataOutputStream os1;
		private final FSDataOutputStream os2;

		public MyRecordWriter(FSDataOutputStream os1, FSDataOutputStream os2) {
			this.os1 = os1;
			this.os2 = os2;
		}

		@Override
		public void write(Text key, LongWritable value) throws IOException,
				InterruptedException {
			// The value carries the number supplied by the mapper; even goes to
			// file1, odd to file2. A newline is appended so records do not run
			// together in the output file (the original wrote bare keys
			// back-to-back with no separator).
			long line = value.get();
			if (line % 2 == 0) {
				os1.writeBytes(key.toString() + "\n");
			} else {
				os2.writeBytes(key.toString() + "\n");
			}
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			// Close both streams even if closing the first one throws.
			try {
				if (os1 != null) {
					os1.close();
				}
			} finally {
				if (os2 != null) {
					os2.close();
				}
			}
		}
	}
}

 2 Mapper 数据整理类

 

package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that splits each input line on single spaces and emits
 * (word, input key) pairs, where the key is the LongWritable offset the
 * framework supplies for the line.
 *
 * NOTE(review): downstream (MyCustomOutputFormat) treats this value as a
 * "line number" for an even/odd split — it is actually a byte offset, not a
 * line count; confirm that parity-of-offset is the intended split rule.
 */
public class CIOMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

	// Reused output key: avoids allocating a new Text per emitted word
	// (the framework serializes the object during write, so reuse is safe).
	private final Text outKey = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		String line = value.toString();
		String[] words = StringUtils.split(line, " ");
		for (String word : words) {
			outKey.set(word);
			context.write(outKey, key);
		}
	}
}

3 运行的主类(Map中数据直接输出所以没有使用到reducer)

 

 

package com.wzt.mapreduce.custom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
/**
 * Driver for the custom-OutputFormat example. The mapper's output goes
 * straight to MyCustomOutputFormat, so no reducer is configured.
 */
public class CIORunner {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(CIORunner.class);

		job.setMapperClass(CIOMapper.class);
		// No reducer is set: map output is written directly by the OutputFormat.

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		job.setOutputFormatClass(MyCustomOutputFormat.class);

		FileInputFormat.setInputPaths(job, "/wc/input/xiyou.txt");
		FileOutputFormat.setOutputPath(job, new Path("/wc/outputcount"));
//		FileInputFormat.setInputPaths(job,  "D:\\wordcount\\wordcount.txt");
//		FileOutputFormat.setOutputPath(job,  new Path("D:\\wordcount\\output"));

		// Propagate the job result as the process exit code — the original
		// ignored waitForCompletion's return value, so failures exited 0.
		boolean success = job.waitForCompletion(true);
		System.exit(success ? 0 : 1);
	}
}

  

 

分享到:
评论

相关推荐

    大数据实验四-MapReduce编程实践

    2. **实际编程经验积累**:通过编写MapReduce程序,积累了实际编程经验,熟悉了Hadoop和MapReduce的API。 3. **分布式计算的认识**:认识到分布式计算的局限性与优势,在实际应用中需要权衡数据规模和计算需求。 4. ...

    生成 hadoop-eclipse-plugin-2.x 插件工具代码

    在IT行业中,Hadoop是一个广泛使用的开源框架,用于处理和存储大规模数据。...通过遵循`README.md`中的指导,结合对这些技术的理解,我们可以成功构建出适配Hadoop 2.x的Eclipse插件,从而提升MapReduce开发效率。

    大数据技术基础实验报告-MapReduce编程.doc

    可以从Github上下载`hadoop2x-eclipse-plugin`,解压缩后,将jar文件复制到Eclipse的plugins目录,并运行Eclipse的清理命令以启用插件。这一步骤只需在首次安装后执行一次。 配置Hadoop-Eclipse-Plugin是实验的关键...

    本地调试所需spark-x.x.x-bin-hadoop包

    总之,Spark-x.x.x-bin-hadoop包为本地调试提供了便利,涵盖了Spark的各种功能和与Hadoop的集成,使开发者能够在本地环境中高效地测试和优化大数据处理任务。通过理解Spark的组件和其与Hadoop的交互方式,你将能更好...

    hadoop1.x的eclipse插件

    2. **JobBuilder**:用于构建MapReduce作业,支持添加Mapper、Reducer类,并能设置输入输出格式、分区器和排序器等。 3. **Job提交与监控**:插件集成了Hadoop作业提交的功能,开发者可以直接在Eclipse中启动作业,...

Global site tag (gtag.js) - Google Analytics