Hadoop被设计用来处理海量数据,这种数据可以是结构化的,半结构化的,甚至是一些无结构化的文本数据(这些数据可能存储在HDFS文件中,也可能存放在DB中)。它处理数据的核心就是map-reduce模型,但是,无论是map还是reduce,它们的输入输出数据都是key-value对的形式,这种key-value对的形式我们可以看做是结构化的数据。同时,对于reduce的输入,当然就是map的输出,而reduce、map的输出又直接可以在map和reduce处理函数中定义,那么这就只剩下map的输入了,也就是说,Hadoop如何把输入文件包装成key-value对的形式交给map来处理,同时hadoop又是如何切割作业的输入文件来结果不同的TaskTracker同时来处理的呢?这两个问题就是本文将要重点讲述的内容——作业的输入文件格式化器(InputFormat)。
在Hadoop对Map-Reduce实现设计中,作业的输入文件格式化器包括两个组件:文件读取器(RecordReader)和文件切割器(Spliter)。其中,文件切割器用来对作业的所有输入数据进行分片切割,最后有多少个切片就有多少个map任务,文件读取器用来读取切片中的数据,并按照一定的格式把读取的数据包装成一个个key-value对。而在具体的对应实现中这个输入文件格式化器被定义了一个抽先类,这样它把如何切割输入数据以及如何读取数据并把数据包装成key-value对交给了用户来实现,因为只有用户才知道输入的数据是如何组织的,map函数需要什么样的key-value值作为输入值。这个输入文件格式化器对应的是org.apache.hadoop.mapreduce.InputFormat类:
public abstract class InputFormat<K, V> { /** * Logically split the set of input files for the job. */ public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; /** * Create a record reader for a given split. The framework will call */ public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; }
显然,在InputFormat类中,getSplits()方法是让用户定义如何对作业的输入数据进行切割分的,createRecordReader方法是定义如何读取输入数据,并包装成一个若干个key-value对的,即定义一个记录读取器。另外,对于一个输入数据切片信息(数据的长度、数据保存在哪些DataNode节点上)被保存在一个对应的InputSplit对象中。顺带需要提一下的是,JobClient在调用InputFormat的getSplits()方法时,对返回的InputSplit数组又使用JobClient.RawSplit进行了一次封装,并将其序列化到文件中。下面就来看看hadoop在其内部有哪些相关的默认实现的。

从上面的类图可以看出,Hadoop在抽象类FileInputFormat中实现了一个基于文件的数据分片切割器,所以在这里我先主要谈谈它是如何实现的。先来看源码:
protected long getFormatMinSplitSize() { return 1; } public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong("mapred.min.split.size", 1L); } public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong("mapred.max.split.size", Long.MAX_VALUE); } protected long computeSplitSize(long blockSize, long minSize,long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } public List<InputSplit> getSplits(JobContext job) throws IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//计算允许的最小切片大小 long maxSize = getMaxSplitSize(job);//计算允许的最大切片大小 // generate splits LOG.debug("start to split all input files for Job["+job.getJobName()+"]"); List<InputSplit> splits = new ArrayList<InputSplit>(); for (FileStatus file: listStatus(job)) { Path path = file.getPath(); FileSystem fs = path.getFileSystem(job.getConfiguration()); long length = file.getLen(); BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); if ((length != 0) && isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize);//计算该输入文件一个切片最终大小 long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkLocations.length-1].getHosts())); } } else if (length != 0) { splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); } else { //Create empty hosts array for zero length files splits.add(new FileSplit(path, 0, length, new String[0])); } } LOG.debug("Total # of splits in Job["+job.getJobName()+"]'s input files: " + splits.size()); return splits; } /*是否允许对一个文件进行切片*/ protected boolean isSplitable(JobContext context, Path filename) { return true; }
上面的输入数据切割器是支持多输入文件的,而且还要着重注意的是这个输入数据切割器是如何计算一个数据切片大小的,因为在很多情况下,切片的大小对一个作业的执行性能有着至关重要的影响,应为至少切片的数量决定了map任务的数量。试想一下,如果3个数据块被切成两个数据片和被切成三个数据块,哪一种情况下耗费的网络I/O时间要多一些呢?在作业没有配置数据切割器的情况下,默认的是TextInputFormat,对应的配置文件的设置项为:mapreduce.inputformat.class。
最后,以LineRecordReader为例来简单的讲解一下记录读取器的实现,这个记录读取器是按文本文件中的行来读取数据的,它的key-value中为:行号一行文本。
public class LineRecordReader extends RecordReader<LongWritable, Text> { private static final Log LOG = LogFactory.getLog(LineRecordReader.class); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); Path _inputFile = split.getPath(); FSDataInputStream fileIn = fs.open(_inputFile); boolean skipFirstLine = false; if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } // line too long. try again LOG.debug("Skipped this line because the line is too long: lineLength["+newSize+"]>maxLineLength["+maxLineLength+"] at position[" + (pos - newSize)+"]."); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } @Override public LongWritable getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } /** * Get the progress within the split */ public float getProgress() { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float)(end - start)); } } public synchronized void close() throws IOException { if (in != null) { in.close(); } } }
在记录读取器中,getProgress()被用来报告当前读取输入文件的进度,因为Hadoop为客户端查看当前作业执行进度的API。另外,由于LineRecordReader是按照行来读取的,由于切割器的分割,可能使得某一行在两个数据片中,所以在初始化的时候有一个是否跳过第一行的操作。
相关推荐
然而,对于特定的数据格式或需求,可能需要自定义InputFormat以更高效或精确地解析输入数据。 自定义InputFormat的步骤通常包括以下几个部分: 1. **创建一个新的类**:首先,你需要创建一个继承自`org.apache....
1. 输入数据类型:Map阶段所处理的原始数据类型,通常为文本文件或其他可序列化的数据格式。 2. Map输出类型:Map阶段处理后输出的键值对类型。 3. Reduce输入类型:Reduce阶段处理的键值对类型,必须与Map输出类型...
- **输入分片信息写入**:通过输入格式化器 (`InputFormat`) 获得分片信息,默认类型为 `FileSplit`,并将这些信息写入 `job.split` 文件。 - **配置信息写入**:将所有任务配置信息写入 `job.xml` 文件。 - **...
- InputFormat和OutputFormat: 分别定义输入数据和输出数据的格式,例如TextInputFormat和TextOutputFormat分别用于处理文本数据。 - Partitioner: 控制数据如何被分配到不同的Reducer,默认是HashPartitioner,可以...
- 配置Job:设置输入输出路径,指定InputFormat和OutputFormat。 - 定义Mapper和Reducer:Mapper负责转换输入数据,Reducer则聚合Mapper的输出。 - 设置Job参数:如是否启用压缩,压缩算法等。 - 提交Job并等待...
- `InputFormat`和`OutputFormat`:定义输入数据如何被分割成键值对供Mapper处理,以及Reducer处理后的结果如何写回。例如,`TextInputFormat`用于读取文本文件,`TextOutputFormat`将结果写回为文本格式。 - `Job...
- `InputFormat`和`OutputFormat`接口:定义输入数据的分割方式和输出数据的格式,有多种预定义实现,如`TextInputFormat`和`TextOutputFormat`。 - `Partitioner`接口:用于控制Reduce任务的数量和数据分配,确保...
5. **格式化NameNode**:首次启动Hadoop时,需要对NameNode进行格式化,这会初始化HDFS的元数据。 6. **启动Hadoop**:运行`start-all.cmd`命令启动所有Hadoop守护进程,包括DataNodes和NameNodes。 7. **测试...
3. **InputFormat**:定义了数据的输入格式,例如如何将原始天气源数据分割成适合Mapper处理的记录。可能使用`TextInputFormat`或自定义的输入格式,如`WeatherDataInputFormat`,以便正确解析天气日志。 4. **...
- 输入格式类(InputFormat)负责将输入数据分割成一系列记录,这些记录再进一步划分为可处理的块(Split)。Split的大小可以根据集群配置进行调整,通常与HDFS的Block大小相匹配。 6. **RecordReader**: - ...
OutputFormat 负责将最终结果格式化并输出。 - **示例**:在这个例子中,由于输入输出都是简单的文本文件,所以没有显式定义 InputFormat 和 OutputFormat,默认使用 TextInputFormat 和 TextOutputFormat。 4. **...
InputFormat定义输入数据的分块方式,OutputFormat定义输出格式。 - YARN集成:配置和提交MapReduce作业到YARN,管理应用程序生命周期,如监控作业状态和资源使用情况。 - 配置与优化:通过Configuration类设置...
InputFormat负责定义数据的输入方式和解析方式,而OutputFormat定义了输出数据的存储格式和输出方式。自定义InputFormat和OutputFormat可以实现对数据格式的特定需求。 分区是MapReduce中的一个重要概念,它决定map...
- **InputFormat**: 负责定义如何将输入数据切分成逻辑片段(InputSplit),以及如何读取这些片段。常见的InputFormat包括`TextInputFormat`、`SequenceFileInputFormat`等。 - **InputSplit**: 表示输入数据的一个...
此外,还有Partitioner用于控制数据分发,Combiner用于本地聚合,以及InputFormat和OutputFormat用于定义输入和输出格式。 3. **Java编程**:Java是MapReduce的原生语言,使用Hadoop的API可以直接创建MapReduce程序...
- **MapReduce Job**:用户可以编写MapReduce程序来处理HBase中的数据。 - **InputFormat**:定义如何从HBase中读取数据并将其转换为键值对。 - **OutputFormat**:定义如何将处理结果写回到HBase。 #### 五、HBase...
InputFormat 用于描述输入数据的格式,常用的为 TextInputFormat,提供数据切分和为 Mapper 提供数据的功能。OutputFormat 用于描述输出数据的格式,可以将用户提供的 key/value 对写入特定格式的文件中。Mapper/...
- `InputFormat`和`OutputFormat`:定义输入数据的格式和输出结果的格式,例如,`TextInputFormat`和`TextOutputFormat`分别用于处理文本数据的输入和输出。 - `RecordReader`和`RecordWriter`:读取输入数据和...
例如,`MapReduceBase`是许多关键组件的基类,`JobConf`用于配置作业,`InputFormat`和`OutputFormat`定义输入和输出格式,而`Mapper`和`Reducer`则是用户处理数据的主要接口。 四、工具支持 为了方便开发和调试...
Hadoop集群的搭建过程包括安装Java环境、配置Hadoop环境变量、修改配置文件(如core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml等)、格式化NameNode、启动服务等步骤。集群搭建的关键在于合理分配...