`

hadoop 自定义inputformat和outputformat

阅读更多

 

hadoop的inputformat和outputformat

 

最好的例子vertica :虽然是在pig中实现的udf,但是就是hadoop的inputformat和outputformat,在hive里也可以照用,贴个下载的地址:http://blackproof.iteye.com/blog/1791995

 

再贴一个项目中,在实现hadoop join时,用的inputformat和outputformat的简单实例:

hadoop join在http://blackproof.iteye.com/blog/1757530

   自定义inputformat(泛型是maper的input)

public class MyInputFormat extends FileInputFormat<MultiKey,Employee> {
	
	public MyInputFormat(){}

	@Override
	public RecordReader<MultiKey, Employee> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		return new MyRecordReader();
	}
	
	public static class MyRecordReader extends RecordReader<MultiKey, Employee>{

		public LineReader in;
		public MultiKey key;
		public Employee value;
		public StringTokenizer token = null;
		
		public Text line;
		
		@Override
		public void initialize(InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			FileSplit fileSplit = (FileSplit)split;
			Configuration job = context.getConfiguration();
			Path file = fileSplit.getPath();
			FileSystem fs = file.getFileSystem(job);
			
			FSDataInputStream filein = fs.open(file);
			in = new LineReader(filein, job);
			
			key = new MultiKey();
			value = new Employee();
			line = new Text();
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {

			int linesize = in.readLine(line);
			if(linesize==0)
				return false;
			String[] pieces = line.toString().split(",");
			int i = Integer.valueOf(pieces[0]);
			switch (i) {
			case 1:
				value.setEmpName(pieces[1]);
				value.setFlag(1);
				break;

			default:
				value.setDepartName(pieces[1]);
				value.setFlag(2);
				break;
			}
			value.setDepartId(pieces[2]);
			value.setDepartNo(pieces[3]);
			
			key.setDepartId(value.getDepartId());
			key.setDepartNo(value.getDepartNo());
			return true;
		}

		@Override
		public MultiKey getCurrentKey() throws IOException,
				InterruptedException {
			// TODO Auto-generated method stub
			return key;
		}

		@Override
		public Employee getCurrentValue() throws IOException,
				InterruptedException {
			// TODO Auto-generated method stub
			return value;
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			return 0;
		}

		@Override
		public void close() throws IOException {
			// TODO Auto-generated method stub
			
		}
		
	}

}

 

 

   自定义outputformat(泛型是reduce的输出)

public class MyOutputFormat extends FileOutputFormat<Text, Employee> {

	@Override
	public RecordWriter<Text, Employee> getRecordWriter(
			TaskAttemptContext job) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf = job.getConfiguration();
		Path file = getDefaultWorkFile(job, "");
		FileSystem fs = file.getFileSystem(conf);
		FSDataOutputStream fileOut = fs.create(file, false);
		return new MyRecordWriter(fileOut);
	}
	
	public static class MyRecordWriter extends RecordWriter<Text, Employee>{

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;
		 public static final String NEW_LINE = System.getProperty("line.separator");
		
		public MyRecordWriter(DataOutputStream out){
			this(out,":");
		}
		
		public MyRecordWriter(DataOutputStream out,String keyValueSeparator){
			this.out = out;
			this.keyValueSeparator = keyValueSeparator.getBytes();
		}
		
		@Override
		public void write(Text key, Employee value) throws IOException,
				InterruptedException {
			if(key!=null){
				out.write(key.toString().getBytes());
				out.write(keyValueSeparator);
			}
			out.write(value.toString().getBytes());
			out.write(NEW_LINE.getBytes());
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			out.close();
		}
		
	}

}

 

分享到:
评论

相关推荐

    自定义inputFormat&&outputFormat1

    自定义inputFormat&&outputFormat1

    基于Java的Hadoop HDFS和MapReduce实践案例设计源码

    内容涵盖HDFS的JAVA API操作,如文件读取、写入、删除、元数据查询和文件列表等,以及MapReduce编程模型的多个应用,包括求平均数、Join操作、TopK算法、二次排序,并涉及自定义InputFormat、OutputFormat和shuflle...

    hadoop-lzo-master.zip

    Hadoop-LZO不仅提供了对LZO压缩格式的支持,还包含了用于Hadoop的InputFormat和OutputFormat,使得Hadoop可以直接处理LZO压缩的数据。 二、LZO压缩算法 LZO(Lempel-Ziv-Oberhumer)是一种实时数据压缩算法,它...

    hadoop 开发者入门专刊 1-4

    6. Hadoop数据输入与输出:学习如何使用InputFormat和OutputFormat进行数据读取和写入,以及自定义InputFormat和OutputFormat的方法。 7. Hadoop作业提交与监控:了解如何使用Hadoop命令行工具提交作业,以及如何...

    java 中自定义OutputFormat的实例详解

    在 Hadoop 中, OutputFormat 的实现类主要有两个:TextInputFormat 和 TextOutputFormat。 TextInputFormat 用于读取文本文件,而 TextOutputFormat 用于将文本数据写入到文件中。但是,在实际应用中,我们可能需要...

    hadoop2lib.tar.gz

    例如,使用Hadoop的InputFormat和OutputFormat接口,开发者可以定义自定义的数据输入和输出格式。同时,Hadoop的Configuration类使得配置参数变得简单,而FileSystem API则允许开发者操作HDFS上的文件。 在实际开发...

    Hadoop源代码分析

    用户可以通过自定义InputFormat和OutputFormat来处理特定格式的数据。 4. **Mapper与Reducer**:Mapper是用户编写的数据处理逻辑,通常用于过滤、转换数据。Reducer则负责聚合Mapper的输出,进行汇总或计算。在某些...

    实战hadoop中的源码

    7. **扩展性与插件开发**:学习如何为Hadoop开发自定义InputFormat、OutputFormat、Partitioner、Combiner等组件。 8. **实战项目**:结合实际案例,运用所学知识解决大数据处理问题,如日志分析、推荐系统等。 ...

    Hadoop技术文档

    此外,还需要处理输入输出格式,如自定义InputFormat和OutputFormat。 5. **Hadoop核心API**: Hadoop提供了一套丰富的API,包括FileSystem API用于文件操作,InputFormat和OutputFormat接口用于定制数据读写,...

    hadoop-2.7.3源码和安装包.zip

    5. **数据输入与输出**:学习如何使用Hadoop的InputFormat和OutputFormat接口自定义数据格式,以及如何使用`hadoop fs`命令操作HDFS。 6. **应用程序开发**:掌握如何编写MapReduce程序,理解Mapper和Reducer的工作...

    Hadoop高级编程- 构建与实现大数据解决方案

    6. **数据输入和输出格式**:学习自定义InputFormat和OutputFormat,以处理非标准格式的数据,如CSV、JSON或其他定制格式。 7. **错误处理和容错机制**:理解Hadoop的检查点、故障检测和恢复策略,以及如何在代码中...

    mapreduce在hadoop实现词统计和列式统计

    此外,Hadoop支持自定义InputFormat和OutputFormat,以适应不同格式的数据源和结果输出需求。 总结,MapReduce通过分布式计算能力,使得在Hadoop平台上处理大规模数据变得更加高效和便捷。无论是简单的词统计还是...

    尚硅谷大数据技术之Hadoop

    2. 自定义InputFormat和OutputFormat:展示如何根据数据格式定制输入输出格式,以适应不同的数据源和需求。 3. 键值对处理:通过自定义Partitioner、Comparator和Reducer,实现更复杂的键值对排序和分区逻辑。 4. ...

    Hadoop Real-World Solutions Cookbook 源代码

    2. **Chap 2 - 数据输入与输出**:这章可能包含如何使用Hadoop的InputFormat和OutputFormat类来定义数据的读取和写入方式。读者可以学习如何自定义输入分片(Splits)和Mapper/Reducer任务。 3. **Chap 3 - ...

    hadoop2.7.3源码包,hadoop2.7.3zip源码包

    同时,源码包也方便了开发者进行扩展和优化,例如自定义InputFormat、OutputFormat、Partitioner、Reducer等,以适应特定的业务需求。 此外,由于这个源码包是基于Maven结构生成的,所以它应该包含了所有依赖项的...

    hadoop 2.5.2 源码

    - 通过阅读源码,开发者可以自定义Hadoop的行为,例如编写自定义InputFormat、OutputFormat或Partitioner。 - 调试工具,如Hadoop的日志系统和JMX监控,可以帮助定位和解决问题。 6. 性能优化 - 通过对源码的...

    Hadoop源代码分析(一)

    7. **扩展性与插件机制**:Hadoop允许用户自定义InputFormat、OutputFormat、Partitioner和Reducer等,源码分析可以帮助我们理解这些接口的实现,以及如何为特定需求定制Hadoop组件。 8. **Hadoop与其他组件的集成*...

    Hadoop权威指南(2)

    这一章会讨论Hadoop的InputFormat和OutputFormat接口,它们是定制数据源和目标的关键。同时,还会介绍RecordReader和RecordWriter的概念,以及如何使用各种分隔符和编码格式。 第五章“MapReduce应用开发”将引导...

    elasticsearch-hadoop-5.2.1

    9. **RESTful接口**:除了传统的MapReduce和Spark支持,Elasticsearch-Hadoop还支持通过Hadoop的InputFormat和OutputFormat使用Hadoop的通用工具(如Hive和Pig)来与Elasticsearch交互,这些工具可以直接利用Elastic...

    Hadoop源码分析 第一章 Hadoop脚本

    然后逐步深入源码,结合实际案例分析,例如研究如何自定义InputFormat、OutputFormat、Mapper和Reducer等组件。此外,熟悉Java编程语言和面向对象设计是必不可少的,因为Hadoop主要用Java实现。 总之,Hadoop脚本的...

Global site tag (gtag.js) - Google Analytics