`

自定义OutputFormat

 
阅读更多

 

 

 

0 目标:

自定义OutputFormat, 指定输出文件名,并对输出的key-value在不同key下分别输出到不同目标文件

 

1 代码:

 

package outputformat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

/**
 * 自定义OutputFormat, 指定输出文件名,并对输出的key-value在不同key下分别输出到不同目标文件
 * 执行结果:
 * [root@master hadoop]# hadoop fs -lsr /
Warning: $HADOOP_HOME is deprecated.

-rw-r--r--   3 zm supergroup         19 2014-12-02 04:17 /hello
-rw-r--r--   3 zm supergroup         19 2014-12-02 04:16 /hello2
drwxr-xr-x   - zm supergroup          0 2014-12-04 00:17 /out
-rw-r--r--   3 zm supergroup          8 2014-12-04 00:17 /out/abc
-rw-r--r--   3 zm supergroup         11 2014-12-04 00:17 /out/other


[root@master hadoop]# hadoop fs -text /out/other
Warning: $HADOOP_HOME is deprecated.

me      1
you     1
[root@master hadoop]# hadoop fs -text /out/abc
Warning: $HADOOP_HOME is deprecated.

hello   2
 * @author zm
 *
 */
public class MySlefOutputFormatApp {
	private static final String INPUT_PATH = "hdfs://master:9000/hello";
	private static final String OUT_PATH = "hdfs://master:9000/out";
	private static final String OUT_FIE_NAME = "/abc";
	private static final String OUT_FIE_NAME1 = "/other";

	public static void main(String[] args) throws Exception{
		// 定义conf
		Configuration conf = new Configuration();
		final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf);
		if(filesystem.exists(new Path(OUT_PATH))){
			filesystem.delete(new Path(OUT_PATH), true);
		}
		// 定义job
		final Job job = new Job(conf , MySlefOutputFormatApp.class.getSimpleName());
		job.setJarByClass(MySlefOutputFormatApp.class);
		
		// 定义InputFormat
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);
		// 定义map
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		// 定义reduce
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		// 定义OutputFormat 自定义的MySelfTextOutputFormat中指定了输出文件名称
		job.setOutputFormatClass(MySelfTextOutputFormat.class);
		
		job.waitForCompletion(true);
	}
	
	public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		//
		protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
			//
			final String line = value.toString();
			final String[] splited = line.split("\t");
			
			//
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
			}
		};
	}
	
	//map产生的<k,v>分发到reduce的过程称作shuffle
	public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text key, java.lang.Iterable<LongWritable> values, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
			//
			long count = 0L;
			for (LongWritable times : values) {
				count += times.get();
			}
			context.write(key, new LongWritable(count));
		};
	}
	
	
	// 自定义OutputFormat,定义好输出数据的FSDataOutputStream位置
	public static class MySelfTextOutputFormat extends OutputFormat<Text, LongWritable>{
		FSDataOutputStream outputStream =  null;// 自定义OutputFormat中,通过FSDataOutputStream将数据写出到hdfs中
		FSDataOutputStream outputStream1 =  null;
		@Override
		public RecordWriter<Text, LongWritable> getRecordWriter(
				TaskAttemptContext context) throws IOException,
				InterruptedException {
			try {
				final FileSystem fileSystem = FileSystem.get(new URI(MySlefOutputFormatApp.OUT_PATH), context.getConfiguration());
				//指定的是输出文件路径
				final Path opath = new Path(MySlefOutputFormatApp.OUT_PATH+OUT_FIE_NAME);
				final Path opath1 = new Path(MySlefOutputFormatApp.OUT_PATH+OUT_FIE_NAME1);
				if(fileSystem.exists(opath)){
					fileSystem.delete(opath,true);
				}
				if(fileSystem.exists(opath1)){
					fileSystem.delete(opath1,true);
				}
				
				this.outputStream = fileSystem.create(opath, false);
				this.outputStream1 = fileSystem.create(opath1, false);
			} catch (URISyntaxException e) {
				e.printStackTrace();
			}
			// 自定义 RecordWriter
			return new MySlefRecordWriter(outputStream,outputStream1);
		}

		@Override
		public void checkOutputSpecs(JobContext context) throws IOException,
				InterruptedException {
			
		}

		@Override
		public OutputCommitter getOutputCommitter(TaskAttemptContext context)
				throws IOException, InterruptedException {
			return new FileOutputCommitter(new Path(MySlefOutputFormatApp.OUT_PATH), context);
		}

	}
	
	// 自定义 RecordWriter
	public static class MySlefRecordWriter extends RecordWriter<Text, LongWritable>{
		FSDataOutputStream outputStream = null;
		FSDataOutputStream outputStream1 = null;

		public MySlefRecordWriter(FSDataOutputStream outputStream,FSDataOutputStream outputStream1) {
			this.outputStream = outputStream;
			this.outputStream1 = outputStream1;
		}

		@Override
		public void write(Text key, LongWritable value) throws IOException,
				InterruptedException {
			
			if(key.toString().equals("hello")){
				outToHdfs(outputStream,key, value);
			}else{
				outToHdfs(outputStream1,key, value);
			}
			
		}

		private void outToHdfs(FSDataOutputStream outputStream,Text key, LongWritable value) throws IOException {
			outputStream.writeBytes(key.toString());
			outputStream.writeBytes("\t");
			//this.outputStream.writeLong(value.get()); 这种方式输出的话,在hdfs中是不会正常显示的 需要用如下方式才可
			outputStream.writeBytes(value.toString());
			outputStream.writeBytes("\n");
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			this.outputStream.close();
		}
		
	}
}

 

 

2 说明:

MySelfTextOutputFormat    指定输出文件名路径并创建,后和outputStream关联上,并将outputStream传递给MySlefRecordWriter
MySlefRecordWriter.write(key,value)中内做业务判断,将每次传来的key做判断,将key,value通过outputStream输出到不同文件中

分享到:
评论
1 楼 jaingbei 2018-02-06  
可以通过继承FileOutputFormat来简化相关代码

相关推荐

    MapReduce之自定义OutPutFormat.md

    MapReduce之自定义 OutPutFormat,通过一个案例,实现自定义的一个OutPutFormat,来更加的深刻的理解MR的过程

    java 中自定义OutputFormat的实例详解

    Java 中自定义 OutputFormat 的实例详解 Java 中的 OutputFormat 是 Hadoop 的 MapReduce 框架中的一种输出格式,它负责将 MapReduce 任务的输出结果写入到文件系统中。 OutputFormat 是一个抽象类,需要继承该类并...

    【MapReduce篇04】MapReduce之OutputFormat数据输出1

    比如,当需要控制输出路径或格式时,或者当希望数据以特定方式进行编码和解码时,就需要自定义OutputFormat。实现自定义OutputFormat通常包括以下步骤: - (1) 创建一个新的类继承自FileOutputFormat。 - (2) 重写...

    自定义inputFormat&&outputFormat1

    自定义inputFormat&&outputFormat1

    MapReduce:Nkeys,Nfiles终极解决方案.docx

    4. **配置和使用自定义OutputFormat**:在实际的MapReduce作业中,需要设置`mapreduce.job.outputformat.class`配置项,将其值设置为`OrcReNameFileOutputFormat`,这样作业就会使用这个定制的OutputFormat,从而...

    mapreduce高级特性3

    结合案例讲解mr重要知识点1.1 多表连接1.2 mr各组件之间数据传递1.3 mr中压缩设置1.4 多个job之间有序执行1.5 自定义outputFormat

    大数据框架整理.pdf

    自定义JavaBean作为MapReduce的数据类型需实现WritableComparable接口,同时可自定义OutputFormat以适应特定的输出需求。MapReduce适用于多种场景,如排序、统计TopN、join操作和寻找共同好友问题。 Hive是一个基于...

    拓思爱诺大数据第五天-mapreduce编程

    自定义OutputFormat可以将结果写入各种存储系统。 #### 总结 通过本章的学习,我们了解到MapReduce作为一种分布式计算框架的重要性和其实现原理。MapReduce不仅简化了大规模数据处理的复杂性,还通过其灵活的设计...

    mapreduce_training:用于教学目的的MapReduce应用程序集

    MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外,请查看,了解如何实现MapReduce联接。 包装库 ...

    22_尚硅谷大数据之MapReduce_常见错误及解决方案1

    10. 自定义outputformat时,注意在recordWirter中的close方法必须关闭流资源。否则输出的文件内容中数据为空。解决方案:在recordWirter中的close方法中关闭流资源,确保正确地关闭了流资源。 这些错误和解决方案将...

    Hadoop-Multiple-Output:一个使用hadoop处理数据的例子,实现结果的多输出

    多输出在Hadoop中的实现通常涉及自定义OutputFormat类,该类扩展了Hadoop的`org.apache.hadoop.mapreduce.OutputFormat`。通过重写`getRecordWriter`方法,我们可以为每个输出创建一个RecordWriter实例,分别负责...

    【Android】自定义录音、播放动画View,让你的录音浪起来

    mediaRecorder.setOutputFormat(MediaRecorder.OutputFormat.THREE_GPP); mediaRecorder.setAudioEncoder(MediaRecorder.AudioEncoder.AMR_NB); mediaRecorder.setOutputFile("/path/to/output/file.3gp"); ``` ...

    Android (系统+自定义)短视频录制

    mediaRecorder.setOutputFormat(MediaRecorder.OutputFormat.MPEG_4); mediaRecorder.setOutputFile("/path/to/output.mp4"); mediaRecorder.setVideoEncoder(MediaRecorder.VideoEncoder.H264); mediaRecorder....

    自定义view 录音 圆形进度条

    1. **初始化**:在开始录音前,需实例化`MediaRecorder`,并设置音频源(如`MediaRecorder.AudioSource.MIC`)、输出格式(如`MediaRecorder.OutputFormat.THREE_GPP`)和音频编码(如`MediaRecorder.AudioEncoder....

    android自定义Camera实现录像和拍照

    mRecorder.setOutputFormat(MediaRecorder.OutputFormat.THREE_GPP); mRecorder.setVideoEncoder(MediaRecorder.VideoEncoder.H264); mRecorder.setAudioEncoder(MediaRecorder.AudioEncoder.AAC); mRecorder....

    Android自定义录制视频功能

    - `mediaRecorder.setOutputFormat(MediaRecorder.OutputFormat.THREE_GPP)`:设置输出文件的格式,这里选择3GPP格式,一个轻量级的视频格式。 - `mediaRecorder.setVideoFrameRate(3)`:设置每秒录制的帧数,3帧...

    探索HadoopOutputFormat

    Hadoop常常被用作大型数据处理生态系统中的一部分。它的优势在于能够批量地处理大量数据,并将结果以最好的方式与其他系统相集成。...OutputFormat将Map/Reduce作业的输出结果转换为其他应用程序可读的方式,从而

    基于Java的Hadoop HDFS和MapReduce实践案例设计源码

    内容涵盖HDFS的JAVA API操作,如文件读取、写入、删除、元数据查询和文件列表等,以及MapReduce编程模型的多个应用,包括求平均数、Join操作、TopK算法、二次排序,并涉及自定义InputFormat、OutputFormat和shuflle...

Global site tag (gtag.js) - Google Analytics