`

自定义InputFormat

 
阅读更多

 

 

0 引子:

 

InputFormt的各种实现类是针对不同数据源来定义的,

比如针对文件类型的FileInputFormat ,针对DB的DBInputFormat,

如果数据源是别的样子的,那么该怎么做?

这时候就需要自定义InputFormat了

 

参考图:

 

 

 

需要明确的一个流程:

 

一个split ----> 对应一个map任务,

一个split-----> 在经过redordreader处理后,会产生很多<k1,v1>对,每一对都会调用mapp.class.map()方法

 

 

 

 

1 模拟FileInputFormat , FileSplit , LineRecordReader 来自定义 InputFormat实例,

从内存中获取数值进行mapreduce计算:

 

package inputformat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
   自定义InputFormat,用于解析指定类型的数据源,下面案例中
 * 数据源来自于内存(不是来在于hdfs / db )
 * 在执行map  reduce任务时,因为数据都是随机产生的, 因此执行的map 和 reduce任务时,  没有合并
 * 
 * 下面案例中 自定义 split时,产生了三个split, 每个split数据源为 [长度为10的数组],每个split都会对应一个map任务,
 * 每个split的数组最后通过自定义recordreader后,最后生成10个<k,v>,这10个<k,v>会依次调用mapper.map方法
 * 这个流程可以通过运行此案例,看System.out.println("MyMapper map value is " + line);即可推到到
 * 
 * 
 *fileInputFormat中,
 *一个split对应64M大小的hdfs文件,类比单词计数功能中一个hello文件占64M大小,加入者64M有6400行
 *那么最后通过LineRecordReader生成 6400个<k,v> 每个<k,v>分别代表<当前行起始位置,当前行文本内容>
 *这 6400个<k,v>会调用mapper.class.map()方法6400次
 * 
 * 结果如下:
 * MyMapper map value is Text53348
14/12/03 13:17:49 INFO mapred.MapTask: Starting flush of map output
MyMapper map value is Text320473
MyMapper map value is Text320021
MyMapper map value is Text768245
MyMapper map value is Text642733
MyMapper map value is Text48407
MyMapper map value is Text789931
MyMapper map value is Text651215
MyMapper map value is Text552616
MyMapper map value is Text968669
14/12/03 13:17:50 INFO mapred.MapTask: Finished spill 0
14/12/03 13:17:50 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/03 13:17:50 INFO mapred.LocalJobRunner: 
14/12/03 13:17:50 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/12/03 13:17:50 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/12/03 13:17:50 INFO mapred.MapTask: io.sort.mb = 100
14/12/03 13:17:50 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/03 13:17:50 INFO mapred.MapTask: record buffer = 262144/327680
MyMapper map value is Text150861
MyMapper map value is Text428272
MyMapper map value is Text695122
MyMapper map value is Text401944
MyMapper map value is Text405576
MyMapper map value is Text651821
MyMapper map value is Text497050
MyMapper map value is Text447011
MyMapper map value is Text918767
MyMapper map value is Text567241
14/12/03 13:17:50 INFO mapred.MapTask: Starting flush of map output
14/12/03 13:17:50 INFO mapred.MapTask: Finished spill 0
14/12/03 13:17:50 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/03 13:17:50 INFO mapred.LocalJobRunner: 
14/12/03 13:17:50 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
14/12/03 13:17:50 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/12/03 13:17:50 INFO mapred.MapTask: io.sort.mb = 100
14/12/03 13:17:50 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/03 13:17:50 INFO mapred.MapTask: record buffer = 262144/327680
MyMapper map value is Text403321
MyMapper map value is Text600217
MyMapper map value is Text387387
MyMapper map value is Text63766
MyMapper map value is Text240805
MyMapper map value is Text247539
MyMapper map value is Text901107
MyMapper map value is Text766337
MyMapper map value is Text523199
MyMapper map value is Text780722





Text205845      1
Text213736      1
Text23425       1
Text267465      1
Text287345      1
Text287679      1
Text297910      1
Text311346      1
Text341091      1
Text418038      1
Text491331      1
Text523116      1
Text528894      1
Text621959      1
Text641714      1
Text64916       1
Text660309      1
Text699375      1
Text713395      1
Text754231      1
Text788194      1
Text812630      1
Text817771      1
Text862128      1
Text870210      1
Text916419      1
Text919783      1
Text932819      1
Text93461       1
Text974656      1

 */
public class MyselInputFormatApp {
	private static final String OUT_PATH = "hdfs://master:9000/out";

	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf);
		if(filesystem.exists(new Path(OUT_PATH))){
			filesystem.delete(new Path(OUT_PATH), true);
		}
		
		final Job job = new Job(conf , MyselInputFormatApp.class.getSimpleName());
		job.setJarByClass(MyselInputFormatApp.class);
		
		job.setInputFormatClass(MyselfMemoryInputFormat.class);
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		
		job.waitForCompletion(true);
	}
	
	public static class MyMapper extends Mapper<NullWritable, Text, Text, LongWritable>{
		//
		protected void map(NullWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<NullWritable,Text,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
			
			/**
			 * 按下面自定义inputformat 自定义split(构造产生内存中的数据,加入为1,2,3..10)和自定义 redordreader 最后 这个split到 map方法时是数据为:
			 * <null,1><null,2>...<null,10>
			 * 那么在执行map后最终输出结果为: <1,1> <2,1>  ....<10,1>
			 */
			final String line = value.toString();
			System.out.println("MyMapper map value is " + line);
			final String[] splited = line.split("\t");
			
			
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
			}
		};
	}
	
	
	
	
	//map产生的<k,v>分发到reduce的过程称作shuffle
	public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text key, java.lang.Iterable<LongWritable> values, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
			//count表示单词key在整个文件中的出现次数
			long count = 0L;
			for (LongWritable times : values) {
				count += times.get();
			}
			context.write(key, new LongWritable(count));
		};
	}
	
	/**
	 * 从内存中产生数据,然后解析成一个个的键值对
	 * 这里自定义MyselfMemoryInputFormat时,指定最后处理好形成的<key1,value1>类型为 <NullWritable,Text>
	 */
	public static class MyselfMemoryInputFormat extends InputFormat<NullWritable, Text>{

		@Override
		public List<InputSplit> getSplits(JobContext context)
				throws IOException, InterruptedException {
			final ArrayList<InputSplit> result = new ArrayList<InputSplit>();
			// splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));  参考FileInputFormat.getSplits的写法,构造出如下
			// 人工构造三个split,这样会产生三个map任务
			result.add(new MemoryInputSplit());
			result.add(new MemoryInputSplit());
			result.add(new MemoryInputSplit());
			
			return result;
		}

		// 指定解析split成<k1,v1>的处理类
		@Override
		public RecordReader<NullWritable, Text> createRecordReader(
				InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			return new MemoryRecordReader();
		}

	}
	// 参考FileInputFormat.getSplits的写法, splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
	// 上面是通过读取hdfs获取数据源,下面我们通过产生一些随机数字来模拟产生数据源放在内存中。
	// 需要实现序列化 split需要在节点之间传递
	public static class MemoryInputSplit extends InputSplit implements Writable{
		final int SIZE = 10;
		final ArrayWritable arrayWritable = new ArrayWritable(Text.class);
		
		/**
		 * 先创建一个java数组类型,然后转化为hadoop的数组类型 ,一个split内能获取诸多数据,这里自定义中,我们将随机产生的10个数来模拟split读取到的数据
		 */
		public MemoryInputSplit() {
			Text[] array = new Text[SIZE];
			
			final Random random = new Random();
			for (int i = 0; i < SIZE; i++) {
				final int nextInt = random.nextInt(999999);
				final Text text = new Text("Text"+nextInt);
				array[i] = text;
			}
			
			arrayWritable.set(array);
		}
		
		@Override
		public long getLength() throws IOException, InterruptedException {
			return SIZE;
		}

		@Override
		public String[] getLocations() throws IOException, InterruptedException {
			return new String[] {"localhost"};
		}

		public ArrayWritable getValues() {
			return arrayWritable;
		}

		// 针对 Writable 需要重写的两个方法  将需要传输的数据进行读入和写出
		/**
		 *  参考FileSplit的 write read 方法,是将目标数据的文件路径和文件读写起始位置写出去,实际上就是讲目标地址内的数据写出去 因此我们自定义中用arrayWritable.write(out);
		 *  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, file.toString());
    out.writeLong(start);
    out.writeLong(length);
  }
		 */
		@Override
		public void write(DataOutput out) throws IOException {
			arrayWritable.write(out);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			arrayWritable.readFields(in);
		}
		
		
	}
	
	public static class MemoryRecordReader extends RecordReader<NullWritable, Text>{
		Writable[] values = null;
		Text value = null;
		int i = 0;
		// 自定义RecordReader中,在初始化后,需要得到对接过来的split的数据, 因此在自定义split中增加自己的方法getValues()
		@Override
		public void initialize(InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			MemoryInputSplit inputSplit = (MemoryInputSplit)split;
			ArrayWritable writables = inputSplit.getValues();
			this.values = writables.get();
			this.i = 0;
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			if(i>=values.length) {//values-->[1,2,3,4,5,6,7,8,9,10]那么在recordreader时,产生键值对为<null,1> <null,2>...<null,10>
				return false;
			}
			if(this.value==null) {
				this.value = new Text();
			}
			this.value.set((Text)values[i]);
			i++;
			return true;
		}

		@Override
		public NullWritable getCurrentKey() throws IOException,
				InterruptedException {
			return NullWritable.get();
		}

		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			return value;
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			return 0;
		}

		@Override
		public void close() throws IOException {
			
		}
		
	}
}

 

 

分享到:
评论

相关推荐

    自定义inputFormat&&outputFormat1

    自定义inputFormat&&outputFormat1

    自定义MapReduce的InputFormat

    5. **配置和使用自定义InputFormat**:在你的MapReduce作业中,通过设置`job.setInputFormatClass()`方法指定你的自定义InputFormat类。同时,如果需要,你还可以在JobConf中添加额外的配置参数来指导InputFormat和...

    hive inputformat

    开发者也可以根据需求自定义InputFormat以适应特定的数据格式。 2. **TextFile InputFormat** 默认情况下,Hive将所有数据视为TextFile格式,这意味着每个行被视为一个记录,每一行由制表符或逗号等分隔符拆分成...

    Hive多字节分隔符解决方案.docx

    本文将介绍Hive多字节分隔符问题的解决方案,包括替换分隔符、RegexSerDe正则加载和自定义InputFormat三种方法。 应用场景 在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如,我们会遇到以下两种情况...

    基于Java的Hadoop HDFS和MapReduce实践案例设计源码

    内容涵盖HDFS的JAVA API操作,如文件读取、写入、删除、元数据查询和文件列表等,以及MapReduce编程模型的多个应用,包括求平均数、Join操作、TopK算法、二次排序,并涉及自定义InputFormat、OutputFormat和shuflle...

    hive数据表-小文件合并代码(java)

    1. **编写自定义InputFormat**:你需要创建一个继承自Hive的`org.apache.hadoop.hive.ql.io.HiveInputFormat`的类。在这个类中,重写`getSplits()`方法,该方法用于决定如何将输入分区为多个工作单元(split)。在...

    MapReduceV2笔记

    自定义InputFormat和OutputFormat可以实现对数据格式的特定需求。 分区是MapReduce中的一个重要概念,它决定map输出的键值对分配给哪个reduce任务。Hadoop提供了默认的分区方法,但用户也可以自定义分区函数以满足...

    hadoop 开发者入门专刊 1-4

    6. Hadoop数据输入与输出:学习如何使用InputFormat和OutputFormat进行数据读取和写入,以及自定义InputFormat和OutputFormat的方法。 7. Hadoop作业提交与监控:了解如何使用Hadoop命令行工具提交作业,以及如何...

    云计算技术实验报告七MapReduce数据统计

    - 自定义InputFormat:可以按需定制键值对的解析规则。 2. Map阶段: - 对于默认的TextInputFormat,需要在map方法中解析value,根据“,”分隔键(被叫号码)和值(通话时间)并转换类型。 - 输出的键值对形式为...

    尚硅谷大数据技术之Hadoop

    2. 自定义InputFormat和OutputFormat:展示如何根据数据格式定制输入输出格式,以适应不同的数据源和需求。 3. 键值对处理:通过自定义Partitioner、Comparator和Reducer,实现更复杂的键值对排序和分区逻辑。 4. ...

    拓思爱诺大数据第五天-mapreduce编程

    例如,自定义InputFormat可以根据数据特点定制切片策略;自定义OutputFormat可以将结果写入各种存储系统。 #### 总结 通过本章的学习,我们了解到MapReduce作为一种分布式计算框架的重要性和其实现原理。MapReduce...

    hadoop 2.5.2 源码

    - 通过阅读源码,开发者可以自定义Hadoop的行为,例如编写自定义InputFormat、OutputFormat或Partitioner。 - 调试工具,如Hadoop的日志系统和JMX监控,可以帮助定位和解决问题。 6. 性能优化 - 通过对源码的...

    MapReduce2.0源码分析与实战编程

    例如,自定义InputFormat处理非标准输入,OutputFormat定义结果存储方式。此外,还需关注作业提交、监控和调试技巧。 7. **Hadoop YARN**:MapReduce2.0引入了YARN(Yet Another Resource Negotiator),作为资源...

    hadoop项目--网站流量日志分析--5.docx

    通过自定义InputFormat和OutputFormat,Sqoop能够适应不同的数据源和目标格式。例如,你可以使用Sqoop将MySQL、Oracle等传统数据库中的数据导入到HDFS或Hive,反之亦然,将Hadoop中的数据导出回关系数据库。 在数据...

    mapreduce_training:用于教学目的的MapReduce应用程序集

    MapReduce自定义InputFormat和RecordReader实现 MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外...

    Hadoop源代码分析

    用户可以通过自定义InputFormat和OutputFormat来处理特定格式的数据。 4. **Mapper与Reducer**:Mapper是用户编写的数据处理逻辑,通常用于过滤、转换数据。Reducer则负责聚合Mapper的输出,进行汇总或计算。在某些...

    Hadoop源码分析

    这可能涉及自定义InputFormat来解析特定格式的日志,编写Map和Reduce函数进行业务逻辑处理,以及利用OutputFormat将结果写入到文件或数据库中。 学习Hadoop源码不仅有助于理解其内部工作原理,还能帮助你在遇到性能...

    MapReduce高阶实现

    - **MapReduce库的扩展**:通过自定义InputFormat、OutputFormat、Partitioner和Comparator,可以适应不同的数据源和格式,以及自定义数据分区和排序策略。 - **多路归并(Multiple Reducers)**:使用多个Reducer...

    《实战Hadoop--开启通向云计算的捷径》源码

    3. **MapReduce编程**:讲解如何编写Map和Reduce函数,包括键值对的处理、分区、排序和归约过程,以及自定义InputFormat和OutputFormat。 4. **Hadoop生态组件**:如HBase(分布式数据库)、Hive(数据仓库工具)、...

Global site tag (gtag.js) - Google Analytics