`

MapReduce : 新版API 自定义InputFormat 把整个文件作为一条记录处理

 
阅读更多

自定义InputFormat 新版API 把真个文件当成一条输入

主要参考 源代码LineRecordReader里面的内容  有些细节还没有理解

WholeFileInputFormat

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

	@Override
	public RecordReader<Text, BytesWritable> createRecordReader(
			InputSplit arg0, TaskAttemptContext arg1) throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		return new WholeFileRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		// TODO Auto-generated method stub
		return false;
	}

}

WholeFileRecordReader

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;


public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

	private FileSplit fileSplit;
	private FSDataInputStream fis;
	
	private Text key = null;
	private BytesWritable value = null;

	private boolean processed = false;

	
	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		//fis.close();
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return this.key;
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		return this.value;
	}

	

	@Override
	public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
			throws IOException, InterruptedException {
		
		fileSplit = (FileSplit) inputSplit;
		Configuration job = tacontext.getConfiguration();
		Path file = fileSplit.getPath();
		FileSystem fs = file.getFileSystem(job);
		fis = fs.open(file);
	}

	@Override
	public boolean nextKeyValue() {
		
		if(key == null){
			key = new Text();
		}
		
		if(value == null){
			value = new BytesWritable();
		}
		
		if(!processed){
			byte[] content = new byte[(int) fileSplit.getLength()];
			
			Path file = fileSplit.getPath();
			
			System.out.println(file.getName());
			key.set(file.getName());
			
			try {
				IOUtils.readFully(fis, content, 0, content.length);
				//value.set(content, 0, content.length);
				value.set(new BytesWritable(content));
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally{
				IOUtils.closeStream(fis);
			}
			
			  
			processed = true;
			return true;
		}
		
		return false;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return processed? fileSplit.getLength():0;
	}

	

}

验证
public static class mapper extends Mapper<Text, BytesWritable, Text, Text>{

		@Override
		protected void map(Text key, BytesWritable value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub	
			context.write(key, new Text(value.getBytes()));
		}
	}

note:value是BytesWritable类型,显示为十六进制数, new Text(value.getBytes()) 变成字符串形式
 
0
0
分享到:
评论

相关推荐

    自定义MapReduce的InputFormat

    例如,如果你正在处理一种结构化的日志文件,其中每个事件由开始和结束的特定字符串包围,你可以在RecordReader中寻找这些限定符,然后将限定符之间的内容作为键值对返回。这样,Mapper就可以直接处理事件数据,而...

    MapReduce: Simplified Data Processing on Large Clusters 英文原文

    这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686

    论文:MapReduce: Simplified Data Processing on Large Clusters

    ### MapReduce: 简化的大型集群数据处理 #### 一、引言 《MapReduce: Simplified Data Processing on Large Clusters》这篇论文由Google的研究员Jeffrey Dean和Sanjay Ghemawat撰写,旨在介绍一种名为MapReduce的...

    MapReduce: Simplified Data Processing on Large Clusters中文版

    MapReduce 是一种编程模型,由 Jeffrey Dean 和 Sanjay Ghemawat 于 2004 年提出,用于处理大规模数据集的分布式计算。该模型将计算任务分解成两个主要阶段:Map 和 Reduce。Map 阶段将输入数据处理成中间键值对,而...

    MapReduce: Simplified Data Processing on Large Clusters

    ### MapReduce:简化大型集群上的数据处理 #### 概述 MapReduce是一种编程模型及其相应的实现方式,旨在处理和生成大型数据集。该技术由谷歌的Jeffrey Dean和Sanjay Ghemawat提出,用于解决大规模数据处理的问题。...

    MapReduce: Simplified Data Processing on Large Clusters翻译

    ### MapReduce:简化大型集群上的数据处理 #### 概述 MapReduce是一种高效的数据处理模型,主要用于处理和生成大规模数据集。它通过将数据处理任务分解为“映射(Map)”和“归并(Reduce)”两个阶段,极大地简化...

    mapreduce wc单词计数 自定义分区 自定义排序实现

    在MapReduce框架中,"WordCount"是一个经典的例子,用于演示如何处理大数据并进行简单的统计。这个任务的主要目标是计算文本文件中每个单词出现的次数。在这个特定的案例中,我们不仅实现了基本的WordCount功能,还...

    MapReduce:超大机群上的简单数据处理

    MapReduce是一种编程模型,专为处理和生成大型数据集而设计。它简化了在超大机群中进行数据处理的复杂性,使程序员无需深入掌握并行分布式处理系统的细节就能编写程序。MapReduce的核心思想是将计算过程分为两个主要...

    【MapReduce篇03】MapReduce之InputFormat数据输入1

    在MapReduce编程模型中,InputFormat是至关重要的组件,它负责将存储在HDFS(Hadoop Distributed File System)上的数据转化为可以被MapTask处理的键值对。本文将深入讲解MapReduce的InputFormat,特别是默认的...

    MapReduce2中自定义排序分组

    在大数据处理领域,Apache Hadoop 的 MapReduce 框架是一个关键组件,它为大规模数据集的并行处理提供了高效且可扩展的解决方案。MapReduce2(也称为 YARN)是 Hadoop 2.x 版本引入的重要改进,旨在解决原 MapReduce...

    MapReduce: Simplified Data Processing on Large Clusters.pdf

    MapReduce是Google公司开发的一种编程模型和实现方法,用于处理和生成大规模数据集。该模型允许用户指定一个Map函数,以处理键值对,并生成中间键值对;然后,指定一个Reduce函数,以合并所有与同一个中间键关联的...

    MapReduce:Nkeys,Nfiles终极解决方案.docx

    MapReduce是一种分布式编程模型,由Google开发,用于处理和生成大型数据集。它将复杂的并行数据处理任务分解为两个阶段:Map(映射)和Reduce(化简)。在这个Nkeys,Nfiles的终极解决方案中,主要关注的是MapReduce...

    MapReduce模型--自定义数据类型

    MapReduce模型是Hadoop框架的核心组件之一,它用于分布式地处理大量数据。MapReduce的主要思想是将复杂的、运行在一个分布式系统上的大规模数据处理任务分解成两个阶段:Map(映射)阶段和Reduce(归约)阶段。...

    mapreduce 自定义分隔符源码

    默认情况下,Hadoop的`LineRecordReader`类将每一行作为一个记录进行处理,而对行内数据的分隔则需要用户自定义。在处理格式复杂、分隔符不固定的日志文件时,我们需要对`LineRecordReader`进行扩展,以实现自定义...

    Hadoop源码解析---MapReduce之InputFormat

    在Hadoop的生态系统中,MapReduce是处理海量数据的一种编程模型,而InputFormat作为MapReduce编程模型的重要组成部分,是负责处理输入数据的关键接口。为了深入理解MapReduce工作原理,必须掌握InputFormat的设计和...

    MapReduce编程实例:单词计数

    假设文件的量比较大,每个文档又包含大量的单词,则无法使用传统的线性程序进行处理,而这类问题正是 MapReduce 可以发挥优势的地方。 在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词...

    MapReduce2.0程序设计多语言编程(理论+实践)

    MapReduce 2.0是Hadoop生态系统中的一个关键组件,用于大规模数据处理。它通过将复杂的计算任务分解为可并行执行的Map和Reduce阶段,实现了高效的数据处理能力。在这个框架下,程序员可以使用多种语言编写应用程序,...

    MapReduce_ Simplified Data Processing on Large Clusters.pdf

    MapReduce的翻译,我只是个搬运工qwq

    mapreduce解析网络日志文件(或从mysql数据库获取记录)并计算相邻日志记录间隔时长

    在大数据处理领域,MapReduce是一种广泛使用的编程模型,尤其适合处理和存储海量数据。本话题主要探讨如何利用MapReduce解析网络日志文件,或者从MySQL数据库中获取记录,并计算相邻日志记录之间的间隔时长。这涉及...

Global site tag (gtag.js) - Google Analytics