1. SequenceFileInputFormat and SequenceFileRecordReader
2. SequenceFileAsBinaryInputFormat and SequenceFileAsBinaryRecordReader
3. SequenceFileAsTextInputFormat and SequenceFileAsTextRecordReader

1. SequenceFileInputFormat and SequenceFileRecordReader
/** An {@link InputFormat} for {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {

  @Override
  public RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    return new SequenceFileRecordReader<K,V>();
  }

  @Override
  protected long getFormatMinSplitSize() {
    return SequenceFile.SYNC_INTERVAL;
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> files = super.listStatus(job);
    int len = files.size();
    for (int i = 0; i < len; ++i) {
      FileStatus file = files.get(i);
      if (file.isDirectory()) {     // it's a MapFile
        Path p = file.getPath();
        FileSystem fs = p.getFileSystem(job.getConfiguration());
        // use the data file
        files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
      }
    }
    return files;
  }
}
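The `getFormatMinSplitSize()` override matters because `FileInputFormat` folds it into the split-size calculation: the effective minimum is the larger of the format minimum and the configured minimum, and the split size is `max(minSize, min(maxSize, blockSize))`. The following is a minimal plain-Java sketch of that arithmetic, not Hadoop code. The class name `SplitSizeSketch` is hypothetical, and `ASSUMED_SYNC_INTERVAL` is an assumption (2000 bytes, which is what `SequenceFile.SYNC_INTERVAL` works out to in the Hadoop 2.x sources as far as I recall; check the value in your release).

```java
// Plain-Java sketch of the split-size arithmetic; it does not use Hadoop classes.
final class SplitSizeSketch {
    // Assumption: stands in for SequenceFile.SYNC_INTERVAL (2000 bytes in Hadoop 2.x).
    static final long ASSUMED_SYNC_INTERVAL = 2000L;

    /** Effective minimum: the larger of the format minimum and the configured minimum. */
    static long effectiveMinSize(long formatMinSize, long configuredMinSize) {
        return Math.max(formatMinSize, configuredMinSize);
    }

    /** Split size: block size clamped between the minimum and maximum sizes. */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;
        long minSize = effectiveMinSize(ASSUMED_SYNC_INTERVAL, 1L);
        // With no maximum configured, the block size decides the split size.
        System.out.println(computeSplitSize(blockSize, minSize, Long.MAX_VALUE));
        // A configured maximum below the sync interval is overridden by the minimum.
        System.out.println(computeSplitSize(blockSize, minSize, 1000L));
    }
}
```

The likely motivation for the override is that a split shorter than the sync interval may contain no sync marker at all, in which case its reader would have nothing to start from.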
/** An {@link RecordReader} for {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  private K key = null;
  private V value = null;
  protected Configuration conf;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) split;
    conf = context.getConfiguration();
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = fileSplit.getStart() + fileSplit.getLength();

    if (fileSplit.getStart() > in.getPosition()) {
      in.sync(fileSplit.getStart());                  // sync to start
    }

    this.start = in.getPosition();
    more = start < end;
  }

  @Override
  @SuppressWarnings("unchecked")
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!more) {
      return false;
    }
    long pos = in.getPosition();
    key = (K) in.next(key);
    if (key == null || (pos >= end && in.syncSeen())) {
      more = false;
      key = null;
      value = null;
    } else {
      value = (V) in.getCurrentValue(value);
    }
    return more;
  }

  @Override
  public K getCurrentKey() {
    return key;
  }

  @Override
  public V getCurrentValue() {
    return value;
  }

  /**
   * Return the progress within the input split
   * @return 0.0 to 1.0 of the input byte range
   */
  public float getProgress() throws IOException {
    if (end == start) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
    }
  }

  public synchronized void close() throws IOException {
    in.close();
  }
}
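The reader above assigns each record to exactly one split with two rules: `initialize()` skips forward to the first sync marker at or after the split start (unless the split begins at the file header), and `nextKeyValue()` stops once it reads a record that follows a sync marker located at or past `end`. The sketch below simulates those two rules over an in-memory list, assuming a simplified file model. `SplitBoundarySketch`, `Entry`, `readSplit` and `sampleFile` are hypothetical names, and the offsets are made up for illustration; none of this is the Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified simulation of the split-boundary rules; it does not use Hadoop classes.
final class SplitBoundarySketch {

    /** One record of a hypothetical file. */
    static final class Entry {
        final long offset;        // where the entry starts, including its sync marker if any
        final boolean syncBefore; // true if a sync marker precedes this record
        final String key;

        Entry(long offset, boolean syncBefore, String key) {
            this.offset = offset;
            this.syncBefore = syncBefore;
            this.key = key;
        }
    }

    /** Returns the keys owned by the split [splitStart, splitStart + length). */
    static List<String> readSplit(List<Entry> file, long splitStart, long length) {
        long end = splitStart + length;
        int i = 0;
        // Mirrors in.sync(start): a split that begins past the first record
        // skips forward to the first sync marker at or after its start.
        if (!file.isEmpty() && splitStart > file.get(0).offset) {
            while (i < file.size()
                    && !(file.get(i).syncBefore && file.get(i).offset >= splitStart)) {
                i++;
            }
        }
        List<String> keys = new ArrayList<String>();
        for (; i < file.size(); i++) {
            Entry e = file.get(i);
            // Mirrors nextKeyValue(): a record behind a sync marker at or past
            // 'end' belongs to the next split, so reading stops here.
            if (e.offset >= end && e.syncBefore) {
                break;
            }
            keys.add(e.key);
        }
        return keys;
    }

    /** Made-up layout: header ends at offset 10, sync markers sit before k2 and k4. */
    static List<Entry> sampleFile() {
        return Arrays.asList(
            new Entry(10, false, "k0"), new Entry(30, false, "k1"),
            new Entry(50, true, "k2"), new Entry(75, false, "k3"),
            new Entry(95, true, "k4"), new Entry(120, false, "k5"));
    }

    public static void main(String[] args) {
        List<Entry> file = sampleFile();
        System.out.println(readSplit(file, 0, 60));
        System.out.println(readSplit(file, 60, 60));
        System.out.println(readSplit(file, 120, 20));
    }
}
```

In this layout the first split reads past its nominal end (offset 60) until it meets the sync marker at offset 95, and the second split starts from that same marker, so no record is read twice or dropped. A split that contains no sync marker, like the third one here, yields no records.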
2. SequenceFileAsBinaryInputFormat and SequenceFileAsBinaryRecordReader
/**
 * InputFormat reading keys, values from SequenceFiles in binary (raw)
 * format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryInputFormat
    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {

  public SequenceFileAsBinaryInputFormat() {
    super();
  }

  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new SequenceFileAsBinaryRecordReader();
  }

  /**
   * Read records from a SequenceFile as binary (raw) bytes.
   */
  public static class SequenceFileAsBinaryRecordReader
      extends RecordReader<BytesWritable,BytesWritable> {
    private SequenceFile.Reader in;
    private long start;
    private long end;
    private boolean done = false;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private SequenceFile.ValueBytes vbytes;
    private BytesWritable key = null;
    private BytesWritable value = null;

    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      Path path = ((FileSplit)split).getPath();
      Configuration conf = context.getConfiguration();
      FileSystem fs = path.getFileSystem(conf);
      this.in = new SequenceFile.Reader(fs, path, conf);
      this.end = ((FileSplit)split).getStart() + split.getLength();
      if (((FileSplit)split).getStart() > in.getPosition()) {
        in.sync(((FileSplit)split).getStart());    // sync to start
      }
      this.start = in.getPosition();
      vbytes = in.createValueBytes();
      done = start >= end;
    }

    @Override
    public BytesWritable getCurrentKey()
        throws IOException, InterruptedException {
      return key;
    }

    @Override
    public BytesWritable getCurrentValue()
        throws IOException, InterruptedException {
      return value;
    }

    /**
     * Retrieve the name of the key class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
     */
    public String getKeyClassName() {
      return in.getKeyClassName();
    }

    /**
     * Retrieve the name of the value class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
     */
    public String getValueClassName() {
      return in.getValueClassName();
    }

    /**
     * Read raw bytes from a SequenceFile.
     */
    public synchronized boolean nextKeyValue()
        throws IOException, InterruptedException {
      if (done) {
        return false;
      }
      long pos = in.getPosition();
      boolean eof = -1 == in.nextRawKey(buffer);
      if (!eof) {
        if (key == null) {
          key = new BytesWritable();
        }
        if (value == null) {
          value = new BytesWritable();
        }
        key.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
        in.nextRawValue(vbytes);
        vbytes.writeUncompressedBytes(buffer);
        value.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
      }
      return !(done = (eof || (pos >= end && in.syncSeen())));
    }

    public void close() throws IOException {
      in.close();
    }

    /**
     * Return the progress within the input split
     * @return 0.0 to 1.0 of the input byte range
     */
    public float getProgress() throws IOException, InterruptedException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (float)((in.getPosition() - start) /
                                      (double)(end - start)));
      }
    }
  }
}
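Because this reader hands back the serialized bytes of each key and value, the consumer has to decode them itself, guided by `getKeyClassName()` and `getValueClassName()`. The sketch below shows the idea in plain Java under two assumptions: the keys were written as `IntWritable` and the values as `LongWritable`, whose serialized forms are a 4-byte and an 8-byte big-endian integer respectively. `RawBytesDecodeSketch` and its methods are hypothetical names, and the byte arrays are produced locally to stand in for what the reader would return.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Plain-Java sketch of decoding raw key/value bytes; it does not use Hadoop classes.
final class RawBytesDecodeSketch {

    /** Decodes a 4-byte big-endian int from the first 'length' bytes of 'data'. */
    static int decodeInt(byte[] data, int length) {
        // ByteBuffer is big-endian by default, matching DataOutput.writeInt.
        return ByteBuffer.wrap(data, 0, length).getInt();
    }

    /** Decodes an 8-byte big-endian long from the first 'length' bytes of 'data'. */
    static long decodeLong(byte[] data, int length) {
        return ByteBuffer.wrap(data, 0, length).getLong();
    }

    /** Stand-in for the serialized form of an int key (assumption for the demo). */
    static byte[] encodeInt(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    /** Stand-in for the serialized form of a long value (assumption for the demo). */
    static byte[] encodeLong(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }

    public static void main(String[] args) {
        // A backing array may be longer than the valid data, so the valid
        // length is always passed explicitly instead of using data.length.
        byte[] paddedKey = Arrays.copyOf(encodeInt(42), 16);
        System.out.println(decodeInt(paddedKey, 4));
        System.out.println(decodeLong(encodeLong(1234567890123L), 8));
    }
}
```

The explicit length argument reflects how `BytesWritable` behaves: its backing array can be larger than the valid data, so only the first `getLength()` bytes should be decoded.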
3. SequenceFileAsTextInputFormat and SequenceFileAsTextRecordReader
/**
 * This class is similar to SequenceFileInputFormat, except it generates
 * SequenceFileAsTextRecordReader which converts the input keys and values
 * to their String forms by calling toString() method.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextInputFormat
    extends SequenceFileInputFormat<Text, Text> {

  public SequenceFileAsTextInputFormat() {
    super();
  }

  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    context.setStatus(split.toString());
    return new SequenceFileAsTextRecordReader();
  }
}
/**
 * This class converts the input keys and values to their String forms by
 * calling toString() method. This class to SequenceFileAsTextInputFormat
 * class is as LineRecordReader class to TextInputFormat class.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextRecordReader
    extends RecordReader<Text, Text> {

  private final SequenceFileRecordReader<WritableComparable<?>, Writable>
      sequenceFileRecordReader;

  private Text key;
  private Text value;

  public SequenceFileAsTextRecordReader() throws IOException {
    sequenceFileRecordReader =
        new SequenceFileRecordReader<WritableComparable<?>, Writable>();
  }

  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    sequenceFileRecordReader.initialize(split, context);
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /** Read key/value pair in a line. */
  public synchronized boolean nextKeyValue()
      throws IOException, InterruptedException {
    if (!sequenceFileRecordReader.nextKeyValue()) {
      return false;
    }
    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }
    key.set(sequenceFileRecordReader.getCurrentKey().toString());
    value.set(sequenceFileRecordReader.getCurrentValue().toString());
    return true;
  }

  public float getProgress() throws IOException, InterruptedException {
    return sequenceFileRecordReader.getProgress();
  }

  public synchronized void close() throws IOException {
    sequenceFileRecordReader.close();
  }
}
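The text reader above is a thin adapter: it delegates all reading to a typed reader and only converts each key and value with `toString()`. The sketch below shows the same delegation pattern in plain Java, assuming a much smaller reader interface than Hadoop's. `PairReader`, `ArrayReader`, `AsTextReader` and `demo` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the toString() adapter pattern; it does not use Hadoop classes.
final class AsTextAdapterSketch {

    /** Minimal stand-in for a record reader (hypothetical interface). */
    interface PairReader<K, V> {
        boolean next();
        K currentKey();
        V currentValue();
    }

    /** Typed source reader backed by an in-memory array of {key, value} rows. */
    static final class ArrayReader implements PairReader<Object, Object> {
        private final Object[][] rows;
        private int index = -1;

        ArrayReader(Object[][] rows) {
            this.rows = rows;
        }

        public boolean next() {
            return ++index < rows.length;
        }

        public Object currentKey() {
            return rows[index][0];
        }

        public Object currentValue() {
            return rows[index][1];
        }
    }

    /** Adapter that exposes any reader's keys and values as strings. */
    static final class AsTextReader implements PairReader<String, String> {
        private final PairReader<?, ?> delegate;
        private String key;
        private String value;

        AsTextReader(PairReader<?, ?> delegate) {
            this.delegate = delegate;
        }

        public boolean next() {
            if (!delegate.next()) {
                return false;
            }
            // The text form is whatever the underlying classes' toString() returns.
            key = delegate.currentKey().toString();
            value = delegate.currentValue().toString();
            return true;
        }

        public String currentKey() {
            return key;
        }

        public String currentValue() {
            return value;
        }
    }

    /** Reads two mixed-type rows through the adapter and returns "key=value" strings. */
    static List<String> demo() {
        PairReader<String, String> reader = new AsTextReader(
            new ArrayReader(new Object[][] {{1, 2.5}, {2, true}}));
        List<String> out = new ArrayList<String>();
        while (reader.next()) {
            out.add(reader.currentKey() + "=" + reader.currentValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One consequence of this design is that the output is only as useful as the `toString()` implementations of the stored key and value classes.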