- 浏览: 581550 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
maleking:
太感谢了。新手搭建hadoop集群环境,dat ...
启动hadoop后没有datanodes的问题 -
system_mush:
NoClassDefFoundError: com/google/common/collect/Maps -
di1984HIT:
呵呵,我学习一下。
Katta源码分析 -
di1984HIT:
呵呵, 不管怎么说,挺好的。
zookeeper3.3学习笔记2:配置参数介绍 -
zoezhang:
谢谢了,可以解决
maven2报cannot be cast to javax.servlet.Filter错误解决
hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类进行实现的。 那么,FileInputFormat是怎样将他们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。 hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。 但大多数情况下,回车键或换行符作为输入文件的行分隔符并不能满足我们的需求,通常用户很有可能会输入回车键、换行符,所以通常我们会定义不可见字符(即用户无法输入的字符)为行分隔符,这种情况下,就需要新写一个InputFormat。 又或者,一条记录的分隔符不是字符,而是字符串,这种情况相对麻烦;还有一种情况,输入文件的主键key已经是排好序的了,需要hadoop做的只是把相同的key作为一个数据块进行逻辑处理,这种情况更麻烦,相当于免去了mapper的过程,直接进去reduce,那么InputFormat的逻辑就相对较为复杂了,但并不是不能实现。 1、改变一条记录的分隔符,不用默认的回车或换行符作为记录分隔符,甚至可以采用字符串作为记录分隔符。 public class FileInputFormatB extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) { } @Override 2)关键在于定义一个新的SearchRecordReader继承RecordReader,支持自定义的行分隔符,即一条记录的分隔符。标红的地方为与hadoop默认的LineRecordReader不同的地方。 public class IsearchRecordReader extends RecordReader<LongWritable, Text> { public IsearchRecordReader(){ public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { this.start = split.getStart(); // open the file and seek to the start of the split this.pos = this.start; public boolean nextKeyValue() throws IOException { if (newSize == 0) { LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize)); if (newSize == 0) { public LongWritable getCurrentKey() { public Text getCurrentValue() { public float getProgress() { public synchronized void close() throws IOException { } 3)重写SearchRecordReader需要的LineReader,可作为SearchRecordReader内部类。特别需要注意的地方就是,读取文件的方式是按指定大小的buffer来读,必定就会遇到一条完整的记录被切成两半,甚至如果分隔符大于1个字符时分隔符也会被切成两半的情况,这种情况一定要加以拼接处理。 public class LineReader { public LineReader(InputStream in, Configuration conf) throws IOException { public void close() throws IOException { public int readLine(Text str, int maxLineLength) throws IOException { public int readLine(Text str) throws IOException { //以下是需要改写的部分_start,核心代码 public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{ bytesConsumed += readLength; } while (!newline && (bytesConsumed < maxBytesToConsume)); if (bytesConsumed > (long)Integer.MAX_VALUE) { //以下是需要改写的部分_end //以下是hadoop-core中LineReader的源码_start public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{ if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed); //以下是hadoop-core中LineReader的源码_end } 2、已经按主键key排好序了,并保证相同主键key一定是在一起的,假设每条记录的第一个字段为主键,那么如果沿用上面的LineReader,需要在核心方法readLine中对前后两条记录的id进行equals判断,如果不同才进行split,如果相同继续下一条记录的判断。代码就不再贴了,但需要注意的地方,依旧是前后两个buffer进行交接的时候,非常有可能一条记录被切成了两半,一半在前一个buffer中,一半在后一个buffer中。 这种方式的好处在于少去了reduce操作,会大大地提高效率,其实mapper的过程相当的快,费时的通常是reduce。 转自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html
1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:
return new SearchRecordReader("\b");
protected boolean isSplitable(FileSystem fs, Path filename) {
// 输入文件不分片
return false;
}
}
private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
//行分隔符,即一条记录的分隔符
private byte[] separator = {'\b'};
private int sepLength = 1;
}
public IsearchRecordReader(String seps){
this.separator = seps.getBytes();
sepLength = separator.length;
}
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
this.end = (this.start + split.getLength());
Path file = split.getPath();
this.compressionCodecs = new CompressionCodecFactory(job);
CompressionCodec codec = this.compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
this.in = new LineReader(codec.createInputStream(fileIn), job);
this.end = Long.MAX_VALUE;
} else {
if (this.start != 0L) {
skipFirstLine = true;
this.start -= sepLength;
fileIn.seek(this.start);
}
this.in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
if(newSize > 0){
start += newSize;
}
}
}
if (this.key == null) {
this.key = new LongWritable();
}
this.key.set(this.pos);
if (this.value == null) {
this.value = new Text();
}
int newSize = 0;
while (this.pos < this.end) {
newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));
break;
}
this.pos += newSize;
if (newSize < this.maxLineLength) {
break;
}
}
//读下一个buffer
this.key = null;
this.value = null;
return false;
}
//读同一个buffer的下一个记录
return true;
}
return this.key;
}
return this.value;
}
if (this.start == this.end) {
return 0.0F;
}
return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));
}
if (this.in != null)
this.in.close();
}
//回车键(hadoop默认)
//private static final byte CR = 13;
//换行符(hadoop默认)
//private static final byte LF = 10;
//按buffer进行文件读取
private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private InputStream in;
private byte[] buffer;
private int bufferLength = 0;
private int bufferPosn = 0;
LineReader(InputStream in, int bufferSize) {
this.bufferLength = 0;
this.bufferPosn = 0;
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
}
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
}
in.close();
}
return readLine(str, maxLineLength, Integer.MAX_VALUE);
}
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
str.clear();
Text record = new Text();
int txtLength = 0;
long bytesConsumed = 0L;
boolean newline = false;
int sepPosn = 0;
do {
//已经读到buffer的末尾了,读下一个buffer
if (this.bufferPosn >= this.bufferLength) {
bufferPosn = 0;
bufferLength = in.read(buffer);
//读到文件末尾了,则跳出,进行下一个文件的读取
if (bufferLength <= 0) {
break;
}
}
int startPosn = this.bufferPosn;
for (; bufferPosn < bufferLength; bufferPosn ++) {
//处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
sepPosn = 0;
}
//遇到行分隔符的第一个字符
if (buffer[bufferPosn] == separator[sepPosn]) {
bufferPosn ++;
int i = 0;
//判断接下来的字符是否也是行分隔符中的字符
for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
//buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
if(bufferPosn + i >= bufferLength){
bufferPosn += i - 1;
break;
}
//一旦其中有一个字符不相同,就判定为不是分隔符
if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
sepPosn = 0;
break;
}
}
//的确遇到了行分隔符
if(sepPosn == sepLength){
bufferPosn += i;
newline = true;
sepPosn = 0;
break;
}
}
}
int readLength = this.bufferPosn - startPosn;
//行分隔符不放入块中
//int appendLength = readLength - newlineLength;
if (readLength > maxLineLength - txtLength) {
readLength = maxLineLength - txtLength;
}
if (readLength > 0) {
record.append(this.buffer, startPosn, readLength);
txtLength += readLength;
//去掉记录的分隔符
if(newline){
str.set(record.getBytes(), 0, record.getLength() - sepLength);
}
}
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int) bytesConsumed;
}
str.clear();
int txtLength = 0;
int newlineLength = 0;
boolean prevCharCR = false;
long bytesConsumed = 0L;
do {
int startPosn = this.bufferPosn;
if (this.bufferPosn >= this.bufferLength) {
startPosn = this.bufferPosn = 0;
if (prevCharCR) bytesConsumed ++;
this.bufferLength = this.in.read(this.buffer);
if (this.bufferLength <= 0) break;
}
for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {
if (this.buffer[this.bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
this.bufferPosn ++;
break;
}
if (prevCharCR) {
newlineLength = 1;
break;
}
prevCharCR = this.buffer[this.bufferPosn] == CR;
}
int readLength = this.bufferPosn - startPosn;
if ((prevCharCR) && (newlineLength == 0))
--readLength;
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(this.buffer, startPosn, appendLength);
txtLength += appendLength; }
}
while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));
return (int)bytesConsumed;
}
发表评论
-
apache hadoop 2
2012-06-14 00:54 1162apache hadoop 2.x 是在1.x版本上做了重 ... -
hadoop乱码
2011-12-12 14:36 2023文件存入hadoop出现乱码,尤其是在windows下的c ... -
Partitioner, SortComparator and GroupingComparator in Hadoop
2011-12-12 14:15 1312hadoop 0.20.2 api里面,作业被重新定义 ... -
HDFS Federation设计动机与基本原理
2011-12-06 10:50 1283HDFS Federation是Hadoop最新发布版本H ... -
Apache Hadoop 0.23 MapReduce 2.0 (MRv2 or YARN) 介绍
2011-12-05 15:27 2697MapReduce 在hadoop 0.23版本中经历了一次大 ... -
Apache Hadoop 0.23 HDFS Federation介绍
2011-12-04 23:31 2845HDFS Federation 为了 ... -
读hadoop0.23源码(1):Job
2011-11-23 10:47 1208每次配置job的时候,最后一步总是 System.ex ... -
MapReduce名词解释
2011-11-08 10:23 1480在网上收集了一些mapreduce中常用的一些名词的解释, ... -
hadoop问题汇总
2011-11-02 09:39 10991.系统时钟。zookeeper会根据系统时钟判断两台机器多久 ... -
进程间通信IPC、LPC、RPC
2011-09-06 11:20 976进程间通信(IPC,I ... -
hadoop的一个恶心错误
2011-09-02 10:17 911今早机器被网管重启了,启动hadoop发现节点都启动不了 s ... -
Hadoop的配置类 Configuration
2011-08-04 14:11 1956Hadoop的配置类是由资源指定 ... -
hadoop错误:"failed to report status for 600 seconds"
2011-07-19 14:39 2683<property> <name ... -
hadoop/mapred 优化方法
2011-07-14 08:30 1154从三个方面着手优化 : 1. hadoop配置 2. ... -
Hadoop传递参数的方法总结
2011-07-07 14:39 3199写MapReduce程序通常要传递各种各样的参数,选择合 ... -
hadoop hdfs的一些用法
2011-07-04 09:25 1455Example 3-1. Displaying files f ... -
Changes of Hadoop 0.20笔记
2011-07-01 13:21 1100最近学习hadoop 0.20.1,网上找到一篇文章《Wh ... -
hadoop0.18.3 到 0.20.2
2011-07-01 13:10 1795以前用的是0.18.3,现在改用0.20.2,結果发现ma ... -
Hadoop开发常用的InputFormat和OutputFormat
2011-07-01 11:02 1513Hadoop中的Map Reduce框架依赖InputFo ... -
hadoop inputformat
2011-07-01 10:09 2303作业的输入 InputFormat 为Map/Red ...
相关推荐
- **自定义类**:用户可以自定义 `InputFormat` 和 `OutputFormat` 类来控制输入输出格式。 #### 四、Hadoop MapReduce 性能调优 - **内存管理**:合理设置 MapReduce 任务的内存大小可以提高整体性能。 - **数据...
2. **资源管理**:开发者可以方便地将源代码、配置文件以及输入数据集组织到HDFS(Hadoop Distributed File System)中。通过插件,可以直接在Eclipse中浏览HDFS文件系统,并上传或下载文件。 3. **运行与调试**:...
- **输入分片信息写入**:通过输入格式化器 (`InputFormat`) 获得分片信息,默认类型为 `FileSplit`,并将这些信息写入 `job.split` 文件。 - **配置信息写入**:将所有任务配置信息写入 `job.xml` 文件。 - **...
-inputformat org.apache.hadoop.mapred.TextInputFormat \ -outputformat org.apache.hadoop.mapred.TextOutputFormat 在本示例中,我们使用了BZip2Codec对数据进行压缩。该方法可以大大减少存储空间。
Map阶段是MapReduce的核心部分,它将输入数据分割成独立的块(通常为文件),然后对每个块执行一个Map函数。Map函数接受键值对作为输入,产生一系列中间键值对。这个过程是并行的,因此可以在多个节点上同时运行,...
《Hadoop MapReduce实战指南——基于<hadoop-map-reduce-demo>项目解析》 在大数据处理领域,Hadoop MapReduce作为核心组件,承担着数据分布式计算的任务。本篇将通过一个名为“hadoop-map-reduce-demo”的示例项目...
Mapper读取`job.split`文件中的输入分片信息,然后调用用户自定义的`map()`函数处理输入数据,并生成中间键值对。 - **Shuffle和Sort阶段**:Mapper生成的中间结果会被分区和排序,以便Reducer可以正确处理。这个...
6. **数据输入与输出**:Hadoop通过InputFormat和OutputFormat定义数据的输入和输出格式。项目中的代码需要配置适当的InputFormat来读取电影数据,如CSV或JSON,同时定义OutputFormat来决定结果如何存储,可能是HDFS...
`map()`函数负责将输入数据分割并转化为键值对,而`reduce()`函数则对这些键值对进行聚合,从而实现数据去重。在这个特定的案例中,`Uniq.java`可能包含了逻辑来识别重复的键,并确保在输出中只保留一个副本。 `...
3. **InputFormat与OutputFormat**:InputFormat负责将原始输入数据分割成RecordReader可以处理的块,而OutputFormat则定义如何将Reducer的输出写入到最终的文件系统。用户可以通过自定义InputFormat和OutputFormat...
Map阶段将输入数据拆分成键值对并进行局部处理,Reduce阶段则负责收集Map阶段的结果,执行聚合操作。这一章将详细解释Map和Reduce函数的编写,以及Shuffle和Sort过程。 第三章“Hadoop分布式文件系统(HDFS)”详细...
通常,这个程序会用Java编写,利用Hadoop的API,如`Job`类和`InputFormat`/`OutputFormat`接口。 总的来说,理解并掌握如何在Windows环境下搭建Hadoop以及使用MapReduce进行数据预处理,对于大数据分析和挖掘具有...
此外,代码中可能还会涉及到Hadoop的相关API,如`InputFormat`、`OutputFormat`和`Partitioner`等接口,它们分别用于定义输入数据的格式、输出数据的格式以及数据分区策略。 MapReduce非常适合处理批处理任务,例如...
Map任务的数量依赖于输入数据的切割(split)数量,压缩文件不可切割,因此Map数等于输入的文件数。非压缩文件及Sequence文件可以被切割。HDFS上,一个文件物理上以一个或多个block存储,一个block对应一个Linux文件...
6. **InputFormat 和 OutputFormat**:`InputFormat`定义如何将输入数据分割为键值对,而`OutputFormat`规定如何将Reduce阶段的结果写回。常见的实现有`TextInputFormat`和`TextOutputFormat`,但可以根据需求自定义...
- 通过阅读源码,开发者可以自定义Hadoop的行为,例如编写自定义InputFormat、OutputFormat或Partitioner。 - 调试工具,如Hadoop的日志系统和JMX监控,可以帮助定位和解决问题。 6. 性能优化 - 通过对源码的...
从高层次角度来看,整个过程就是Hadoop接收输入文件、使用自定义转换(Map-Reduce步骤)获得内容流,以及将输出文件的结果写回磁盘。上个月InfoQ展示了怎样在第一个步骤中,使用InputFormat类来更好地对接收输入文件...
- **Hadoop I/O框架**:提供了一组API和工具,使得开发者可以方便地处理Hadoop中的数据输入和输出操作。 - **MapReduce应用程序开发**:主要包括编写Mapper和Reducer类,以及设置作业参数等步骤。开发者需要关注数据...
对于开发者来说,理解Hadoop的基本原理和编程模型非常重要,包括MapReduce的map和reduce阶段、InputFormat和OutputFormat接口、RecordReader和RecordWriter的概念,以及如何编写自定义分区器和Combiner。此外,学习...
MapReduce计算模型详讲(结合源码深入解读) MapReduce是Hadoop中的一种编程模型,用于处理大规模数据...它通过InputFormat、InputSplit、RecordReader和Map/Reduce阶段来处理输入数据,并将输出结果进行聚合和处理。