[hadoop2.7.1] I/O: background knowledge for SequenceFile, the "adobe brick"
Overview
A SequenceFile consists of a series of binary key/value pairs. If the key is a small file's name and the value is that file's content, a large batch of small files can be merged into one big file. A SequenceFile can therefore be seen as a container that organizes small files for unified storage, much like the adobe bricks used in construction: square, uniform, sized as needed, and very handy.
Why do this?
First, understand that a "small file" here means a file whose size is smaller than the HDFS block size (128 MB by default in Hadoop 2.x, 64 MB in 1.x), possibly much smaller. The problem is analyzed below from two angles: processing performance and storage capacity.
Processing performance:
HDFS was originally designed for streaming access to large files. Reading a large number of small files means constantly hopping from one datanode to another, which severely hurts performance; as a result, processing many small files is far slower than processing large files of the same total size. In MapReduce, each small file also occupies its own task slot, and starting tasks is expensive, so a large share of the time, sometimes most of it, goes into launching and tearing down tasks.
Storage capacity:
In HDFS, every block, file, and directory is represented in the NameNode's memory as an object; without compression enabled, each such Java object takes roughly 152 bytes (see http://www.cnblogs.com/magialmoon/p/3757767.html). With a huge number of small files, the metadata objects alone consume an enormous amount of memory: tens of millions of small files already mean tens of millions of file and block objects, on the order of gigabytes of NameNode heap, so no amount of memory is ever enough.
Besides SequenceFile, Hadoop offers two other solutions to the small-files problem: Hadoop Archives (HAR) and CombineFileInputFormat.
To make the use of SequenceFile clearer and more intuitive, the HDFS read and write processes are introduced first.
Some basics to start with: the NameNode and the DataNode.
NameNode
The NameNode manages the filesystem namespace. It maintains the filesystem tree and the metadata of every file and directory in that tree. This information is persisted on local disk as two files: the namespace image and the edit log. The NameNode also records which DataNodes hold the blocks of each file, but it does not persist the block locations, because they are rebuilt from DataNode reports when the system starts.
DataNode
DataNodes, as the name suggests, store the cluster's data (the actual file contents). They serve read and write requests from HDFS clients and carry out block creation, deletion, and replication commands from the NameNode. The NameNode relies on a periodic heartbeat from every DataNode; each heartbeat carries a block report with which the NameNode can verify the block mapping and other filesystem metadata. If a DataNode stops sending heartbeats, the NameNode takes corrective action and re-replicates the blocks that were lost on that node.
In short: one NameNode (the manager) manages many DataNodes (the workers).
Now let us walk through the HDFS read and write processes.
Read:
Step 1: The client asks the NameNode where to read the file from and receives, for each block, a list of the DataNodes holding it; the list has as many entries as the block has replicas.
Step 2: The DataNodes in the list are sorted by their distance from the reader, nearest first:
1. First, check whether a replica of the block exists locally; if so, the local DataNode becomes the first DataNode from which the block is read.
2. Next, look for a DataNode within the same rack that holds a replica of the block.
3. Finally, if neither is found, or the reading node is not itself a DataNode, a random ordering of the DataNode list is returned.
4. The NameNode sends the block information to the client, including the IP addresses of the DataNodes holding the replicas and the block IDs the DataNodes need to locate the blocks on their local disks.
Step 3: The client examines the block information, contacts the relevant DataNodes, and requests the blocks.
Step 4: The DataNodes return the file contents to the client and the connections are closed, completing the read.
A file can thus be seen as a set of blocks stored on DataNodes: the client fetches the blocks of a file from different DataNodes in parallel and then stitches them together into the complete file.
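To relate these steps to code, here is a minimal sketch of the client-side read path using the public FileSystem API; the input path is only illustrative and any HDFS file would do. The open() call performs the NameNode lookup described above, and the returned stream then pulls each block from a nearest-first DataNode.

// Minimal sketch of an HDFS read; the path is illustrative.
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();          // picks up core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);              // DistributedFileSystem for an HDFS URI
    InputStream in = null;
    try {
      // open() asks the NameNode for the block locations of this file,
      // then the stream reads each block from the chosen DataNodes.
      in = fs.open(new Path("/user/demo/input.txt"));
      IOUtils.copyBytes(in, System.out, 4096, false);  // stream the content to stdout
    } finally {
      IOUtils.closeStream(in);
    }
  }
}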
Write:
Step 1: The client sends a write request. The NameNode checks whether the file already exists; if it does, an error is returned immediately, otherwise the NameNode hands the client a set of usable DataNodes.
Step 2: The client splits the file into blocks and stores them across the DataNodes. Note that after the DataNode list is assembled, and before it is returned to the DFSClient, the NameNode first sorts it by the "distance" between the writing client and each DataNode, nearest first; if the writer is not itself a DataNode, the list's first entry simply stays first. The client then writes the blocks in this nearest-to-farthest order. When the transfer finishes, the client notifies both the NameNode and the DataNode. Hadoop has its own scheme for computing this distance.
Step 3: After receiving the client's notification, the NameNode sends a confirmation to the DataNode.
Step 4: Once the DataNode has received confirmations from both the NameNode and the client, the write is committed.
From this it is clear that when choosing the DataNode list for a block, the NameNode both spreads the replicas across different racks and, at the same time, keeps the network overhead described earlier as low as possible.
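Correspondingly, a minimal sketch of the write path through the same API; the target path is illustrative. create() asks the NameNode for a pipeline of DataNodes chosen as described above, and close() returns only after the DataNodes have acknowledged the data.

// Minimal sketch of an HDFS write; the path is illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // create() fails on an existing file when overwrite is false,
    // matching step 1 of the write flow above.
    FSDataOutputStream out = fs.create(new Path("/user/demo/output.txt"), false);
    try {
      out.writeUTF("hello hdfs");   // data is streamed block by block through the DataNode pipeline
    } finally {
      out.close();                  // waits for the pipeline to acknowledge the write
    }
  }
}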
Under this high-performance parallel processing model, SequenceFile plays the crucial role of "merging" small files. The SequenceFile class defines SequenceFile.Writer, SequenceFile.Reader, and SequenceFile.Sorter for writing, reading, and sorting respectively.
There are three kinds of SequenceFile writers, corresponding to how the key/value data is compressed. For this, SequenceFile defines a static enum:
/**
 * The compression type used to compress key/value pairs in the
 * {@link SequenceFile}.
 *
 * @see SequenceFile.Writer
 */
public static enum CompressionType {
  /** Do not compress records. */
  NONE,
  /** Compress values only, each separately. */
  RECORD,
  /** Compress sequences of records together in blocks. */
  BLOCK
}
Three writer classes are defined accordingly: Writer, RecordCompressWriter, and BlockCompressWriter.
Writer: records are written uncompressed:
/** Write key/value pairs to a sequence-format file. */ public static class Writer implements java.io.Closeable, Syncable { private Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; DataOutputBuffer buffer = new DataOutputBuffer(); Class keyClass; Class valClass; private final CompressionType compress; CompressionCodec codec = null; CompressionOutputStream deflateFilter = null; DataOutputStream deflateOut = null; Metadata metadata = null; Compressor compressor = null; protected Serializer keySerializer; protected Serializer uncompressedValSerializer; protected Serializer compressedValSerializer; // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record // starts and ends by scanning for this value. long lastSyncPos; // position of last sync byte[] sync; // 16 random bytes { try { MessageDigest digester = MessageDigest.getInstance("MD5"); long time = Time.now(); digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8)); sync = digester.digest(); } catch (Exception e) { throw new RuntimeException(e); } } public static interface Option {} static class FileOption extends Options.PathOption implements Option { FileOption(Path path) { super(path); } } /** * @deprecated only used for backwards-compatibility in the createWriter methods * that take FileSystem. */ @Deprecated private static class FileSystemOption implements Option { private final FileSystem value; protected FileSystemOption(FileSystem value) { this.value = value; } public FileSystem getValue() { return value; } } static class StreamOption extends Options.FSDataOutputStreamOption implements Option { StreamOption(FSDataOutputStream stream) { super(stream); } } static class BufferSizeOption extends Options.IntegerOption implements Option { BufferSizeOption(int value) { super(value); } } static class BlockSizeOption extends Options.LongOption implements Option { BlockSizeOption(long value) { super(value); } } static class ReplicationOption extends Options.IntegerOption implements Option { ReplicationOption(int value) { super(value); } } static class KeyClassOption extends Options.ClassOption implements Option { KeyClassOption(Class<?> value) { super(value); } } static class ValueClassOption extends Options.ClassOption implements Option { ValueClassOption(Class<?> value) { super(value); } } static class MetadataOption implements Option { private final Metadata value; MetadataOption(Metadata value) { this.value = value; } Metadata getValue() { return value; } } static class ProgressableOption extends Options.ProgressableOption implements Option { ProgressableOption(Progressable value) { super(value); } } private static class CompressionOption implements Option { private final CompressionType value; private final CompressionCodec codec; CompressionOption(CompressionType value) { this(value, null); } CompressionOption(CompressionType value, CompressionCodec codec) { this.value = value; this.codec = (CompressionType.NONE != value && null == codec) ? new DefaultCodec() : codec; } CompressionType getValue() { return value; } CompressionCodec getCodec() { return codec; } } public static Option file(Path value) { return new FileOption(value); } /** * @deprecated only used for backwards-compatibility in the createWriter methods * that take FileSystem. 
*/ @Deprecated private static Option filesystem(FileSystem fs) { return new SequenceFile.Writer.FileSystemOption(fs); } public static Option bufferSize(int value) { return new BufferSizeOption(value); } public static Option stream(FSDataOutputStream value) { return new StreamOption(value); } public static Option replication(short value) { return new ReplicationOption(value); } public static Option blockSize(long value) { return new BlockSizeOption(value); } public static Option progressable(Progressable value) { return new ProgressableOption(value); } public static Option keyClass(Class<?> value) { return new KeyClassOption(value); } public static Option valueClass(Class<?> value) { return new ValueClassOption(value); } public static Option metadata(Metadata value) { return new MetadataOption(value); } public static Option compression(CompressionType value) { return new CompressionOption(value); } public static Option compression(CompressionType value, CompressionCodec codec) { return new CompressionOption(value, codec); } /** * Construct a uncompressed writer from a set of options. * @param conf the configuration to use * @param options the options used when creating the writer * @throws IOException if it fails */ Writer(Configuration conf, Option... opts) throws IOException { BlockSizeOption blockSizeOption = Options.getOption(BlockSizeOption.class, opts); BufferSizeOption bufferSizeOption = Options.getOption(BufferSizeOption.class, opts); ReplicationOption replicationOption = Options.getOption(ReplicationOption.class, opts); ProgressableOption progressOption = Options.getOption(ProgressableOption.class, opts); FileOption fileOption = Options.getOption(FileOption.class, opts); FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); StreamOption streamOption = Options.getOption(StreamOption.class, opts); KeyClassOption keyClassOption = Options.getOption(KeyClassOption.class, opts); ValueClassOption valueClassOption = Options.getOption(ValueClassOption.class, opts); MetadataOption metadataOption = Options.getOption(MetadataOption.class, opts); CompressionOption compressionTypeOption = Options.getOption(CompressionOption.class, opts); // check consistency of options if ((fileOption == null) == (streamOption == null)) { throw new IllegalArgumentException("file or stream must be specified"); } if (fileOption == null && (blockSizeOption != null || bufferSizeOption != null || replicationOption != null || progressOption != null)) { throw new IllegalArgumentException("file modifier options not " + "compatible with stream"); } FSDataOutputStream out; boolean ownStream = fileOption != null; if (ownStream) { Path p = fileOption.getValue(); FileSystem fs; if (fsOption != null) { fs = fsOption.getValue(); } else { fs = p.getFileSystem(conf); } int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : bufferSizeOption.getValue(); short replication = replicationOption == null ? fs.getDefaultReplication(p) : (short) replicationOption.getValue(); long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : blockSizeOption.getValue(); Progressable progress = progressOption == null ? null : progressOption.getValue(); out = fs.create(p, true, bufferSize, replication, blockSize, progress); } else { out = streamOption.getValue(); } Class<?> keyClass = keyClassOption == null ? Object.class : keyClassOption.getValue(); Class<?> valueClass = valueClassOption == null ? Object.class : valueClassOption.getValue(); Metadata metadata = metadataOption == null ? 
new Metadata() : metadataOption.getValue(); this.compress = compressionTypeOption.getValue(); final CompressionCodec codec = compressionTypeOption.getCodec(); if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) { throw new IllegalArgumentException("SequenceFile doesn't work with " + "GzipCodec without native-hadoop " + "code!"); } init(conf, out, ownStream, keyClass, valueClass, codec, metadata); } /** Create the named file. * @deprecated Use * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} * instead. */ @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name), true, keyClass, valClass, null, new Metadata()); } /** Create the named file with write-progress reporter. * @deprecated Use * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} * instead. */ @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, Progressable progress, Metadata metadata) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name, progress), true, keyClass, valClass, null, metadata); } /** Create the named file with write-progress reporter. * @deprecated Use * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} * instead. */ @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize, Progressable progress, Metadata metadata) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name, true, bufferSize, replication, blockSize, progress), true, keyClass, valClass, null, metadata); } boolean isCompressed() { return compress != CompressionType.NONE; } boolean isBlockCompressed() { return compress == CompressionType.BLOCK; } Writer ownStream() { this.ownOutputStream = true; return this; } /** Write and flush the file header. */ private void writeFileHeader() throws IOException { out.write(VERSION); Text.writeString(out, keyClass.getName()); Text.writeString(out, valClass.getName()); out.writeBoolean(this.isCompressed()); out.writeBoolean(this.isBlockCompressed()); if (this.isCompressed()) { Text.writeString(out, (codec.getClass()).getName()); } this.metadata.write(out); out.write(sync); // write the sync bytes out.flush(); // flush header } /** Initialize. */ @SuppressWarnings("unchecked") void init(Configuration conf, FSDataOutputStream out, boolean ownStream, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) throws IOException { this.conf = conf; this.out = out; this.ownOutputStream = ownStream; this.keyClass = keyClass; this.valClass = valClass; this.codec = codec; this.metadata = metadata; SerializationFactory serializationFactory = new SerializationFactory(conf); this.keySerializer = serializationFactory.getSerializer(keyClass); if (this.keySerializer == null) { throw new IOException( "Could not find a serializer for the Key class: '" + keyClass.getCanonicalName() + "'. 
" + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using" + "custom serialization."); } this.keySerializer.open(buffer); this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); if (this.uncompressedValSerializer == null) { throw new IOException( "Could not find a serializer for the Value class: '" + valClass.getCanonicalName() + "'. " + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using" + "custom serialization."); } this.uncompressedValSerializer.open(buffer); if (this.codec != null) { ReflectionUtils.setConf(this.codec, this.conf); this.compressor = CodecPool.getCompressor(this.codec); this.deflateFilter = this.codec.createOutputStream(buffer, compressor); this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); this.compressedValSerializer = serializationFactory.getSerializer(valClass); if (this.compressedValSerializer == null) { throw new IOException( "Could not find a serializer for the Value class: '" + valClass.getCanonicalName() + "'. " + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using" + "custom serialization."); } this.compressedValSerializer.open(deflateOut); } writeFileHeader(); } /** Returns the class of keys in this file. */ public Class getKeyClass() { return keyClass; } /** Returns the class of values in this file. */ public Class getValueClass() { return valClass; } /** Returns the compression codec of data in this file. */ public CompressionCodec getCompressionCodec() { return codec; } /** create a sync point */ public void sync() throws IOException { if (sync != null && lastSyncPos != out.getPos()) { out.writeInt(SYNC_ESCAPE); // mark the start of the sync out.write(sync); // write sync lastSyncPos = out.getPos(); // update lastSyncPos } } /** * flush all currently written data to the file system * @deprecated Use {@link #hsync()} or {@link #hflush()} instead */ @Deprecated public void syncFs() throws IOException { if (out != null) { out.sync(); // flush contents to file system } } @Override public void hsync() throws IOException { if (out != null) { out.hsync(); } } @Override public void hflush() throws IOException { if (out != null) { out.hflush(); } } /** Returns the configuration of this file. */ Configuration getConf() { return conf; } /** Close the file. */ @Override public synchronized void close() throws IOException { keySerializer.close(); uncompressedValSerializer.close(); if (compressedValSerializer != null) { compressedValSerializer.close(); } CodecPool.returnCompressor(compressor); compressor = null; if (out != null) { // Close the underlying stream iff we own it... if (ownOutputStream) { out.close(); } else { out.flush(); } out = null; } } synchronized void checkAndWriteSync() throws IOException { if (sync != null && out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync sync(); } } /** Append a key/value pair. */ public void append(Writable key, Writable val) throws IOException { append((Object) key, (Object) val); } /** Append a key/value pair. 
*/ @SuppressWarnings("unchecked") public synchronized void append(Object key, Object val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass); buffer.reset(); // Append the 'key' keySerializer.serialize(key); int keyLength = buffer.getLength(); if (keyLength < 0) throw new IOException("negative length keys not allowed: " + key); // Append the 'value' if (compress == CompressionType.RECORD) { deflateFilter.resetState(); compressedValSerializer.serialize(val); deflateOut.flush(); deflateFilter.finish(); } else { uncompressedValSerializer.serialize(val); } // Write the record out checkAndWriteSync(); // sync out.writeInt(buffer.getLength()); // total record length out.writeInt(keyLength); // key portion length out.write(buffer.getData(), 0, buffer.getLength()); // data } public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException { if (keyLength < 0) throw new IOException("negative length keys not allowed: " + keyLength); int valLength = val.getSize(); checkAndWriteSync(); out.writeInt(keyLength+valLength); // total record length out.writeInt(keyLength); // key portion length out.write(keyData, keyOffset, keyLength); // key val.writeUncompressedBytes(out); // value } /** Returns the current length of the output file. * * <p>This always returns a synchronized position. In other words, * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However * the key may be earlier in the file than key last written when this * method was called (e.g., with block-compression, it may be the first key * in the block that was being written when this method was called). */ public synchronized long getLength() throws IOException { return out.getPos(); } } // class Writer /** Write key/compressed-value pairs to a sequence-format file. */ static class RecordCompressWriter extends Writer { RecordCompressWriter(Configuration conf, Option... options) throws IOException { super(conf, options); } /** Append a key/value pair. */ @Override @SuppressWarnings("unchecked") public synchronized void append(Object key, Object val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass); buffer.reset(); // Append the 'key' keySerializer.serialize(key); int keyLength = buffer.getLength(); if (keyLength < 0) throw new IOException("negative length keys not allowed: " + key); // Compress 'value' and append it deflateFilter.resetState(); compressedValSerializer.serialize(val); deflateOut.flush(); deflateFilter.finish(); // Write the record out checkAndWriteSync(); // sync out.writeInt(buffer.getLength()); // total record length out.writeInt(keyLength); // key portion length out.write(buffer.getData(), 0, buffer.getLength()); // data } /** Append a key/value pair. 
*/ @Override public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException { if (keyLength < 0) throw new IOException("negative length keys not allowed: " + keyLength); int valLength = val.getSize(); checkAndWriteSync(); // sync out.writeInt(keyLength+valLength); // total record length out.writeInt(keyLength); // key portion length out.write(keyData, keyOffset, keyLength); // 'key' data val.writeCompressedBytes(out); // 'value' data } }
RecordCompressWriter: only the value of each record is compressed.
/** Write compressed key/value blocks to a sequence-format file. */ static class BlockCompressWriter extends Writer { private int noBufferedRecords = 0; private DataOutputBuffer keyLenBuffer = new DataOutputBuffer(); private DataOutputBuffer keyBuffer = new DataOutputBuffer(); private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); private DataOutputBuffer valBuffer = new DataOutputBuffer(); private final int compressionBlockSize; BlockCompressWriter(Configuration conf, Option... options) throws IOException { super(conf, options); compressionBlockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000); keySerializer.close(); keySerializer.open(keyBuffer); uncompressedValSerializer.close(); uncompressedValSerializer.open(valBuffer); } /** Workhorse to check and write out compressed data/lengths */ private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException { deflateFilter.resetState(); buffer.reset(); deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength()); deflateOut.flush(); deflateFilter.finish(); WritableUtils.writeVInt(out, buffer.getLength()); out.write(buffer.getData(), 0, buffer.getLength()); } /** Compress and flush contents to dfs */ @Override public synchronized void sync() throws IOException { if (noBufferedRecords > 0) { super.sync(); // No. of records WritableUtils.writeVInt(out, noBufferedRecords); // Write 'keys' and lengths writeBuffer(keyLenBuffer); writeBuffer(keyBuffer); // Write 'values' and lengths writeBuffer(valLenBuffer); writeBuffer(valBuffer); // Flush the file-stream out.flush(); // Reset internal states keyLenBuffer.reset(); keyBuffer.reset(); valLenBuffer.reset(); valBuffer.reset(); noBufferedRecords = 0; } } /** Close the file. */ @Override public synchronized void close() throws IOException { if (out != null) { sync(); } super.close(); } /** Append a key/value pair. */ @Override @SuppressWarnings("unchecked") public synchronized void append(Object key, Object val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key+" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val+" is not "+valClass); // Save key/value into respective buffers int oldKeyLength = keyBuffer.getLength(); keySerializer.serialize(key); int keyLength = keyBuffer.getLength() - oldKeyLength; if (keyLength < 0) throw new IOException("negative length keys not allowed: " + key); WritableUtils.writeVInt(keyLenBuffer, keyLength); int oldValLength = valBuffer.getLength(); uncompressedValSerializer.serialize(val); int valLength = valBuffer.getLength() - oldValLength; WritableUtils.writeVInt(valLenBuffer, valLength); // Added another key/value pair ++noBufferedRecords; // Compress and flush? int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); if (currentBlockSize >= compressionBlockSize) { sync(); } } /** Append a key/value pair. */ @Override public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException { if (keyLength < 0) throw new IOException("negative length keys not allowed"); int valLength = val.getSize(); // Save key/value data in relevant buffers WritableUtils.writeVInt(keyLenBuffer, keyLength); keyBuffer.write(keyData, keyOffset, keyLength); WritableUtils.writeVInt(valLenBuffer, valLength); val.writeUncompressedBytes(valBuffer); // Added another key/value pair ++noBufferedRecords; // Compress and flush? 
int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); if (currentBlockSize >= compressionBlockSize) { sync(); } } }
BlockCompressWriter: keys and values are buffered and compressed in blocks (keys together, values together); the block size is configurable.
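As the BlockCompressWriter constructor above shows, the block size comes from io.seqfile.compress.blocksize (1,000,000 bytes by default). Below is a small sketch, with an illustrative output path and codec choice, of raising that threshold before creating a block-compressed writer through the option-based createWriter() API. The SequenceFile.Reader class that follows it is the counterpart used to read all three formats back.

// Sketch: configure a larger block-compression buffer before writing.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class BlockCompressedWriterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Buffer roughly 4 MB of key/value data per compressed block instead of the 1 MB default.
    conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);

    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/user/demo/block.seq")),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
    try {
      writer.append(new Text("key"), new Text("value"));
    } finally {
      writer.close();   // flushes and compresses any buffered block
    }
  }
}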
/** Get the configured buffer size */ private static int getBufferSize(Configuration conf) { return conf.getInt("io.file.buffer.size", 4096); } /** Reads key/value pairs from a sequence-format file. */ public static class Reader implements java.io.Closeable { private String filename; private FSDataInputStream in; private DataOutputBuffer outBuf = new DataOutputBuffer(); private byte version; private String keyClassName; private String valClassName; private Class keyClass; private Class valClass; private CompressionCodec codec = null; private Metadata metadata = null; private byte[] sync = new byte[SYNC_HASH_SIZE]; private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; private boolean syncSeen; private long headerEnd; private long end; private int keyLength; private int recordLength; private boolean decompress; private boolean blockCompressed; private Configuration conf; private int noBufferedRecords = 0; private boolean lazyDecompress = true; private boolean valuesDecompressed = true; private int noBufferedKeys = 0; private int noBufferedValues = 0; private DataInputBuffer keyLenBuffer = null; private CompressionInputStream keyLenInFilter = null; private DataInputStream keyLenIn = null; private Decompressor keyLenDecompressor = null; private DataInputBuffer keyBuffer = null; private CompressionInputStream keyInFilter = null; private DataInputStream keyIn = null; private Decompressor keyDecompressor = null; private DataInputBuffer valLenBuffer = null; private CompressionInputStream valLenInFilter = null; private DataInputStream valLenIn = null; private Decompressor valLenDecompressor = null; private DataInputBuffer valBuffer = null; private CompressionInputStream valInFilter = null; private DataInputStream valIn = null; private Decompressor valDecompressor = null; private Deserializer keyDeserializer; private Deserializer valDeserializer; /** * A tag interface for all of the Reader options */ public static interface Option {} /** * Create an option to specify the path name of the sequence file. * @param value the path to read * @return a new option */ public static Option file(Path value) { return new FileOption(value); } /** * Create an option to specify the stream with the sequence file. * @param value the stream to read. * @return a new option */ public static Option stream(FSDataInputStream value) { return new InputStreamOption(value); } /** * Create an option to specify the starting byte to read. * @param value the number of bytes to skip over * @return a new option */ public static Option start(long value) { return new StartOption(value); } /** * Create an option to specify the number of bytes to read. * @param value the number of bytes to read * @return a new option */ public static Option length(long value) { return new LengthOption(value); } /** * Create an option with the buffer size for reading the given pathname. 
* @param value the number of bytes to buffer * @return a new option */ public static Option bufferSize(int value) { return new BufferSizeOption(value); } private static class FileOption extends Options.PathOption implements Option { private FileOption(Path value) { super(value); } } private static class InputStreamOption extends Options.FSDataInputStreamOption implements Option { private InputStreamOption(FSDataInputStream value) { super(value); } } private static class StartOption extends Options.LongOption implements Option { private StartOption(long value) { super(value); } } private static class LengthOption extends Options.LongOption implements Option { private LengthOption(long value) { super(value); } } private static class BufferSizeOption extends Options.IntegerOption implements Option { private BufferSizeOption(int value) { super(value); } } // only used directly private static class OnlyHeaderOption extends Options.BooleanOption implements Option { private OnlyHeaderOption() { super(true); } } public Reader(Configuration conf, Option... opts) throws IOException { // Look up the options, these are null if not set FileOption fileOpt = Options.getOption(FileOption.class, opts); InputStreamOption streamOpt = Options.getOption(InputStreamOption.class, opts); StartOption startOpt = Options.getOption(StartOption.class, opts); LengthOption lenOpt = Options.getOption(LengthOption.class, opts); BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts); // check for consistency if ((fileOpt == null) == (streamOpt == null)) { throw new IllegalArgumentException("File or stream option must be specified"); } if (fileOpt == null && bufOpt != null) { throw new IllegalArgumentException("buffer size can only be set when" + " a file is specified."); } // figure out the real values Path filename = null; FSDataInputStream file; final long len; if (fileOpt != null) { filename = fileOpt.getValue(); FileSystem fs = filename.getFileSystem(conf); int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); len = null == lenOpt ? fs.getFileStatus(filename).getLen() : lenOpt.getValue(); file = openFile(fs, filename, bufSize, len); } else { len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); file = streamOpt.getValue(); } long start = startOpt == null ? 0 : startOpt.getValue(); // really set up initialize(filename, file, start, len, conf, headerOnly != null); } /** * Construct a reader by opening a file from the given file system. * @param fs The file system used to open the file. * @param file The file being read. * @param conf Configuration * @throws IOException * @deprecated Use Reader(Configuration, Option...) instead. */ @Deprecated public Reader(FileSystem fs, Path file, Configuration conf) throws IOException { this(conf, file(file.makeQualified(fs))); } /** * Construct a reader by the given input stream. * @param in An input stream. * @param buffersize unused * @param start The starting position. * @param length The length being read. * @param conf Configuration * @throws IOException * @deprecated Use Reader(Configuration, Reader.Option...) instead. */ @Deprecated public Reader(FSDataInputStream in, int buffersize, long start, long length, Configuration conf) throws IOException { this(conf, stream(in), start(start), length(length)); } /** Common work of the constructors. 
*/ private void initialize(Path filename, FSDataInputStream in, long start, long length, Configuration conf, boolean tempReader) throws IOException { if (in == null) { throw new IllegalArgumentException("in == null"); } this.filename = filename == null ? "<unknown>" : filename.toString(); this.in = in; this.conf = conf; boolean succeeded = false; try { seek(start); this.end = this.in.getPos() + length; // if it wrapped around, use the max if (end < length) { end = Long.MAX_VALUE; } init(tempReader); succeeded = true; } finally { if (!succeeded) { IOUtils.cleanup(LOG, this.in); } } } /** * Override this method to specialize the type of * {@link FSDataInputStream} returned. * @param fs The file system used to open the file. * @param file The file being read. * @param bufferSize The buffer size used to read the file. * @param length The length being read if it is >= 0. Otherwise, * the length is not available. * @return The opened stream. * @throws IOException */ protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize, long length) throws IOException { return fs.open(file, bufferSize); } /** * Initialize the {@link Reader} * @param tmpReader <code>true</code> if we are constructing a temporary * reader {@link SequenceFile.Sorter.cloneFileAttributes}, * and hence do not initialize every component; * <code>false</code> otherwise. * @throws IOException */ private void init(boolean tempReader) throws IOException { byte[] versionBlock = new byte[VERSION.length]; in.readFully(versionBlock); if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1]) || (versionBlock[2] != VERSION[2])) throw new IOException(this + " not a SequenceFile"); // Set 'version' version = versionBlock[3]; if (version > VERSION[3]) throw new VersionMismatchException(VERSION[3], version); if (version < BLOCK_COMPRESS_VERSION) { UTF8 className = new UTF8(); className.readFields(in); keyClassName = className.toStringChecked(); // key class name className.readFields(in); valClassName = className.toStringChecked(); // val class name } else { keyClassName = Text.readString(in); valClassName = Text.readString(in); } if (version > 2) { // if version > 2 this.decompress = in.readBoolean(); // is compressed? } else { decompress = false; } if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 this.blockCompressed = in.readBoolean(); // is block-compressed? } else { blockCompressed = false; } // if version >= 5 // setup the compression codec if (decompress) { if (version >= CUSTOM_COMPRESS_VERSION) { String codecClassname = Text.readString(in); try { Class<? extends CompressionCodec> codecClass = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); this.codec = ReflectionUtils.newInstance(codecClass, conf); } catch (ClassNotFoundException cnfe) { throw new IllegalArgumentException("Unknown codec: " + codecClassname, cnfe); } } else { codec = new DefaultCodec(); ((Configurable)codec).setConf(conf); } } this.metadata = new Metadata(); if (version >= VERSION_WITH_METADATA) { // if version >= 6 this.metadata.readFields(in); } if (version > 1) { // if version > 1 in.readFully(sync); // read sync bytes headerEnd = in.getPos(); // record end of header } // Initialize... 
*not* if this we are constructing a temporary Reader if (!tempReader) { valBuffer = new DataInputBuffer(); if (decompress) { valDecompressor = CodecPool.getDecompressor(codec); valInFilter = codec.createInputStream(valBuffer, valDecompressor); valIn = new DataInputStream(valInFilter); } else { valIn = valBuffer; } if (blockCompressed) { keyLenBuffer = new DataInputBuffer(); keyBuffer = new DataInputBuffer(); valLenBuffer = new DataInputBuffer(); keyLenDecompressor = CodecPool.getDecompressor(codec); keyLenInFilter = codec.createInputStream(keyLenBuffer, keyLenDecompressor); keyLenIn = new DataInputStream(keyLenInFilter); keyDecompressor = CodecPool.getDecompressor(codec); keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); keyIn = new DataInputStream(keyInFilter); valLenDecompressor = CodecPool.getDecompressor(codec); valLenInFilter = codec.createInputStream(valLenBuffer, valLenDecompressor); valLenIn = new DataInputStream(valLenInFilter); } SerializationFactory serializationFactory = new SerializationFactory(conf); this.keyDeserializer = getDeserializer(serializationFactory, getKeyClass()); if (this.keyDeserializer == null) { throw new IOException( "Could not find a deserializer for the Key class: '" + getKeyClass().getCanonicalName() + "'. " + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using " + "custom serialization."); } if (!blockCompressed) { this.keyDeserializer.open(valBuffer); } else { this.keyDeserializer.open(keyIn); } this.valDeserializer = getDeserializer(serializationFactory, getValueClass()); if (this.valDeserializer == null) { throw new IOException( "Could not find a deserializer for the Value class: '" + getValueClass().getCanonicalName() + "'. " + "Please ensure that the configuration '" + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " + "properly configured, if you're using " + "custom serialization."); } this.valDeserializer.open(valIn); } } @SuppressWarnings("unchecked") private Deserializer getDeserializer(SerializationFactory sf, Class c) { return sf.getDeserializer(c); } /** Close the file. */ @Override public synchronized void close() throws IOException { // Return the decompressors to the pool CodecPool.returnDecompressor(keyLenDecompressor); CodecPool.returnDecompressor(keyDecompressor); CodecPool.returnDecompressor(valLenDecompressor); CodecPool.returnDecompressor(valDecompressor); keyLenDecompressor = keyDecompressor = null; valLenDecompressor = valDecompressor = null; if (keyDeserializer != null) { keyDeserializer.close(); } if (valDeserializer != null) { valDeserializer.close(); } // Close the input-stream in.close(); } /** Returns the name of the key class. */ public String getKeyClassName() { return keyClassName; } /** Returns the class of keys in this file. */ public synchronized Class<?> getKeyClass() { if (null == keyClass) { try { keyClass = WritableName.getClass(getKeyClassName(), conf); } catch (IOException e) { throw new RuntimeException(e); } } return keyClass; } /** Returns the name of the value class. */ public String getValueClassName() { return valClassName; } /** Returns the class of values in this file. */ public synchronized Class<?> getValueClass() { if (null == valClass) { try { valClass = WritableName.getClass(getValueClassName(), conf); } catch (IOException e) { throw new RuntimeException(e); } } return valClass; } /** Returns true if values are compressed. 
*/ public boolean isCompressed() { return decompress; } /** Returns true if records are block-compressed. */ public boolean isBlockCompressed() { return blockCompressed; } /** Returns the compression codec of data in this file. */ public CompressionCodec getCompressionCodec() { return codec; } /** * Get the compression type for this file. * @return the compression type */ public CompressionType getCompressionType() { if (decompress) { return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; } else { return CompressionType.NONE; } } /** Returns the metadata object of the file */ public Metadata getMetadata() { return this.metadata; } /** Returns the configuration used for this file. */ Configuration getConf() { return conf; } /** Read a compressed buffer */ private synchronized void readBuffer(DataInputBuffer buffer, CompressionInputStream filter) throws IOException { // Read data into a temporary buffer DataOutputBuffer dataBuffer = new DataOutputBuffer(); try { int dataBufferLength = WritableUtils.readVInt(in); dataBuffer.write(in, dataBufferLength); // Set up 'buffer' connected to the input-stream buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); } finally { dataBuffer.close(); } // Reset the codec filter.resetState(); } /** Read the next 'compressed' block */ private synchronized void readBlock() throws IOException { // Check if we need to throw away a whole block of // 'values' due to 'lazy decompression' if (lazyDecompress && !valuesDecompressed) { in.seek(WritableUtils.readVInt(in)+in.getPos()); in.seek(WritableUtils.readVInt(in)+in.getPos()); } // Reset internal states noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; valuesDecompressed = false; //Process sync if (sync != null) { in.readInt(); in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it throw new IOException("File is corrupt!"); } syncSeen = true; // Read number of records in this block noBufferedRecords = WritableUtils.readVInt(in); // Read key lengths and keys readBuffer(keyLenBuffer, keyLenInFilter); readBuffer(keyBuffer, keyInFilter); noBufferedKeys = noBufferedRecords; // Read value lengths and values if (!lazyDecompress) { readBuffer(valLenBuffer, valLenInFilter); readBuffer(valBuffer, valInFilter); noBufferedValues = noBufferedRecords; valuesDecompressed = true; } } /** * Position valLenIn/valIn to the 'value' * corresponding to the 'current' key */ private synchronized void seekToCurrentValue() throws IOException { if (!blockCompressed) { if (decompress) { valInFilter.resetState(); } valBuffer.reset(); } else { // Check if this is the first value in the 'block' to be read if (lazyDecompress && !valuesDecompressed) { // Read the value lengths and values readBuffer(valLenBuffer, valLenInFilter); readBuffer(valBuffer, valInFilter); noBufferedValues = noBufferedRecords; valuesDecompressed = true; } // Calculate the no. of bytes to skip // Note: 'current' key has already been read! int skipValBytes = 0; int currentKey = noBufferedKeys + 1; for (int i=noBufferedValues; i > currentKey; --i) { skipValBytes += WritableUtils.readVInt(valLenIn); --noBufferedValues; } // Skip to the 'val' corresponding to 'current' key if (skipValBytes > 0) { if (valIn.skipBytes(skipValBytes) != skipValBytes) { throw new IOException("Failed to seek to " + currentKey + "(th) value!"); } } } } /** * Get the 'value' corresponding to the last read 'key'. * @param val : The 'value' to be read. 
* @throws IOException */ public synchronized void getCurrentValue(Writable val) throws IOException { if (val instanceof Configurable) { ((Configurable) val).setConf(this.conf); } // Position stream to 'current' value seekToCurrentValue(); if (!blockCompressed) { val.readFields(valIn); if (valIn.read() > 0) { LOG.info("available bytes: " + valIn.available()); throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) + " bytes, should read " + (valBuffer.getLength()-keyLength)); } } else { // Get the value int valLength = WritableUtils.readVInt(valLenIn); val.readFields(valIn); // Read another compressed 'value' --noBufferedValues; // Sanity check if ((valLength < 0) && LOG.isDebugEnabled()) { LOG.debug(val + " is a zero-length value"); } } } /** * Get the 'value' corresponding to the last read 'key'. * @param val : The 'value' to be read. * @throws IOException */ public synchronized Object getCurrentValue(Object val) throws IOException { if (val instanceof Configurable) { ((Configurable) val).setConf(this.conf); } // Position stream to 'current' value seekToCurrentValue(); if (!blockCompressed) { val = deserializeValue(val); if (valIn.read() > 0) { LOG.info("available bytes: " + valIn.available()); throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) + " bytes, should read " + (valBuffer.getLength()-keyLength)); } } else { // Get the value int valLength = WritableUtils.readVInt(valLenIn); val = deserializeValue(val); // Read another compressed 'value' --noBufferedValues; // Sanity check if ((valLength < 0) && LOG.isDebugEnabled()) { LOG.debug(val + " is a zero-length value"); } } return val; } @SuppressWarnings("unchecked") private Object deserializeValue(Object val) throws IOException { return valDeserializer.deserialize(val); } /** Read the next key in the file into <code>key</code>, skipping its * value. True if another entry exists, and false at end of file. */ public synchronized boolean next(Writable key) throws IOException { if (key.getClass() != getKeyClass()) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (!blockCompressed) { outBuf.reset(); keyLength = next(outBuf); if (keyLength < 0) return false; valBuffer.reset(outBuf.getData(), outBuf.getLength()); key.readFields(valBuffer); valBuffer.mark(0); if (valBuffer.getPosition() != keyLength) throw new IOException(key + " read " + valBuffer.getPosition() + " bytes, should read " + keyLength); } else { //Reset syncSeen syncSeen = false; if (noBufferedKeys == 0) { try { readBlock(); } catch (EOFException eof) { return false; } } int keyLength = WritableUtils.readVInt(keyLenIn); // Sanity check if (keyLength < 0) { return false; } //Read another compressed 'key' key.readFields(keyIn); --noBufferedKeys; } return true; } /** Read the next key/value pair in the file into <code>key</code> and * <code>val</code>. Returns true if such a pair exists and false when at * end of file */ public synchronized boolean next(Writable key, Writable val) throws IOException { if (val.getClass() != getValueClass()) throw new IOException("wrong value class: "+val+" is not "+valClass); boolean more = next(key); if (more) { getCurrentValue(val); } return more; } /** * Read and return the next record length, potentially skipping over * a sync block. 
* @return the length of the next record or -1 if there is no next record * @throws IOException */ private synchronized int readRecordLength() throws IOException { if (in.getPos() >= end) { return -1; } int length = in.readInt(); if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process a sync entry in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it throw new IOException("File is corrupt!"); syncSeen = true; if (in.getPos() >= end) { return -1; } length = in.readInt(); // re-read length } else { syncSeen = false; } return length; } /** Read the next key/value pair in the file into <code>buffer</code>. * Returns the length of the key read, or -1 if at end of file. The length * of the value may be computed by calling buffer.getLength() before and * after calls to this method. */ /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ @Deprecated synchronized int next(DataOutputBuffer buffer) throws IOException { // Unsupported for block-compressed sequence files if (blockCompressed) { throw new IOException("Unsupported call for block-compressed" + " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); } try { int length = readRecordLength(); if (length == -1) { return -1; } int keyLength = in.readInt(); buffer.write(in, length); return keyLength; } catch (ChecksumException e) { // checksum failure handleChecksumException(e); return next(buffer); } } public ValueBytes createValueBytes() { ValueBytes val = null; if (!decompress || blockCompressed) { val = new UncompressedBytes(); } else { val = new CompressedBytes(codec); } return val; } /** * Read 'raw' records. * @param key - The buffer into which the key is read * @param val - The 'raw' value * @return Returns the total record length or -1 for end of file * @throws IOException */ public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) throws IOException { if (!blockCompressed) { int length = readRecordLength(); if (length == -1) { return -1; } int keyLength = in.readInt(); int valLength = length - keyLength; key.write(in, keyLength); if (decompress) { CompressedBytes value = (CompressedBytes)val; value.reset(in, valLength); } else { UncompressedBytes value = (UncompressedBytes)val; value.reset(in, valLength); } return length; } else { //Reset syncSeen syncSeen = false; // Read 'key' if (noBufferedKeys == 0) { if (in.getPos() >= end) return -1; try { readBlock(); } catch (EOFException eof) { return -1; } } int keyLength = WritableUtils.readVInt(keyLenIn); if (keyLength < 0) { throw new IOException("zero length key found!"); } key.write(keyIn, keyLength); --noBufferedKeys; // Read raw 'value' seekToCurrentValue(); int valLength = WritableUtils.readVInt(valLenIn); UncompressedBytes rawValue = (UncompressedBytes)val; rawValue.reset(valIn, valLength); --noBufferedValues; return (keyLength+valLength); } } /** * Read 'raw' keys. 
* @param key - The buffer into which the key is read * @return Returns the key length or -1 for end of file * @throws IOException */ public synchronized int nextRawKey(DataOutputBuffer key) throws IOException { if (!blockCompressed) { recordLength = readRecordLength(); if (recordLength == -1) { return -1; } keyLength = in.readInt(); key.write(in, keyLength); return keyLength; } else { //Reset syncSeen syncSeen = false; // Read 'key' if (noBufferedKeys == 0) { if (in.getPos() >= end) return -1; try { readBlock(); } catch (EOFException eof) { return -1; } } int keyLength = WritableUtils.readVInt(keyLenIn); if (keyLength < 0) { throw new IOException("zero length key found!"); } key.write(keyIn, keyLength); --noBufferedKeys; return keyLength; } } /** Read the next key in the file, skipping its * value. Return null at end of file. */ public synchronized Object next(Object key) throws IOException { if (key != null && key.getClass() != getKeyClass()) { throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); } if (!blockCompressed) { outBuf.reset(); keyLength = next(outBuf); if (keyLength < 0) return null; valBuffer.reset(outBuf.getData(), outBuf.getLength()); key = deserializeKey(key); valBuffer.mark(0); if (valBuffer.getPosition() != keyLength) throw new IOException(key + " read " + valBuffer.getPosition() + " bytes, should read " + keyLength); } else { //Reset syncSeen syncSeen = false; if (noBufferedKeys == 0) { try { readBlock(); } catch (EOFException eof) { return null; } } int keyLength = WritableUtils.readVInt(keyLenIn); // Sanity check if (keyLength < 0) { return null; } //Read another compressed 'key' key = deserializeKey(key); --noBufferedKeys; } return key; } @SuppressWarnings("unchecked") private Object deserializeKey(Object key) throws IOException { return keyDeserializer.deserialize(key); } /** * Read 'raw' values. * @param val - The 'raw' value * @return Returns the value length * @throws IOException */ public synchronized int nextRawValue(ValueBytes val) throws IOException { // Position stream to current value seekToCurrentValue(); if (!blockCompressed) { int valLength = recordLength - keyLength; if (decompress) { CompressedBytes value = (CompressedBytes)val; value.reset(in, valLength); } else { UncompressedBytes value = (UncompressedBytes)val; value.reset(in, valLength); } return valLength; } else { int valLength = WritableUtils.readVInt(valLenIn); UncompressedBytes rawValue = (UncompressedBytes)val; rawValue.reset(valIn, valLength); --noBufferedValues; return valLength; } } private void handleChecksumException(ChecksumException e) throws IOException { if (this.conf.getBoolean("io.skip.checksum.errors", false)) { LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); } else { throw e; } } /** disables sync. often invoked for tmp files */ synchronized void ignoreSync() { sync = null; } /** Set the current byte position in the input file. * * <p>The position passed must be a position returned by {@link * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary * position, use {@link SequenceFile.Reader#sync(long)}. 
*/ public synchronized void seek(long position) throws IOException { in.seek(position); if (blockCompressed) { // trigger block read noBufferedKeys = 0; valuesDecompressed = true; } } /** Seek to the next sync mark past a given position.*/ public synchronized void sync(long position) throws IOException { if (position+SYNC_SIZE >= end) { seek(end); return; } if (position < headerEnd) { // seek directly to first record in.seek(headerEnd); // note the sync marker "seen" in the header syncSeen = true; return; } try { seek(position+4); // skip escape in.readFully(syncCheck); int syncLen = sync.length; for (int i = 0; in.getPos() < end; i++) { int j = 0; for (; j < syncLen; j++) { if (sync[j] != syncCheck[(i+j)%syncLen]) break; } if (j == syncLen) { in.seek(in.getPos() - SYNC_SIZE); // position before sync return; } syncCheck[i%syncLen] = in.readByte(); } } catch (ChecksumException e) { // checksum failure handleChecksumException(e); } } /** Returns true iff the previous call to next passed a sync mark.*/ public synchronized boolean syncSeen() { return syncSeen; } /** Return the current byte position in the input file. */ public synchronized long getPosition() throws IOException { return in.getPos(); } /** Returns the name of the file. */ @Override public String toString() { return filename; } }
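Reading such a file back is symmetrical. Here is a minimal sketch using SequenceFile.Reader and its next(key, value) method; the path and the Text/BytesWritable key and value classes are assumptions that must match whatever is recorded in the file's header.

// Sketch: iterate over all key/value pairs of a SequenceFile.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(new Path("/user/demo/smallfiles.seq")));
    try {
      Text key = new Text();                 // must match the key class in the header
      BytesWritable value = new BytesWritable();
      while (reader.next(key, value)) {      // next() returns false at end of file
        System.out.println(key + "\t" + value.getLength() + " bytes");
      }
    } finally {
      reader.close();
    }
  }
}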
SequenceFile data format
As just described, SequenceFile supports three compression forms, so there are three corresponding on-disk formats. All three share the same header, whose fields are listed below (a short sketch after the field list shows how to inspect them through SequenceFile.Reader):
SequenceFile Header
- version - 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)
- keyClassName - key class
- valueClassName - value class
- compression - A boolean which specifies if compression is turned on for keys/values in this file.
- blockCompression - A boolean which specifies if block-compression is turned on for keys/values in this file.
- compression codec - CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).
- metadata - SequenceFile.Metadata for this file.
- sync - A sync marker to denote end of the header.
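All of these header fields are exposed through SequenceFile.Reader getters, so a quick way to confirm what a given file contains is a sketch like the following (the path is illustrative):

// Sketch: print the header metadata of an existing SequenceFile.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class HeaderInspector {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(new Path("/user/demo/smallfiles.seq")));
    try {
      // These getters expose the header fields listed above.
      System.out.println("key class   : " + reader.getKeyClassName());
      System.out.println("value class : " + reader.getValueClassName());
      System.out.println("compression : " + reader.getCompressionType());
      System.out.println("codec       : " + reader.getCompressionCodec());
      System.out.println("metadata    : " + reader.getMetadata());
    } finally {
      reader.close();
    }
  }
}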
Uncompressed SequenceFile Format
- Header
- Record
  - Record length
  - Key length
  - Key
  - Value
- A sync-marker every few 100 bytes or so.
Record-Compressed SequenceFile Format
- Header
- Record
  - Record length
  - Key length
  - Key
  - Compressed Value
- A sync-marker every few 100 bytes or so.
Block-Compressed SequenceFile Format
- Header
- Record Block
  - Uncompressed number of records in the block
  - Compressed key-lengths block-size
  - Compressed key-lengths block
  - Compressed keys block-size
  - Compressed keys block
  - Compressed value-lengths block-size
  - Compressed value-lengths block
  - Compressed values block-size
  - Compressed values block
- A sync-marker every block.
In Hadoop 2.x, creating a SequenceFile.Writer through createWriter() was simplified further: the static createWriter() method needs only a Configuration and a set of Options, uses those options to decide, among other things, which compression type to apply, and returns a SequenceFile.Writer instance. The keys and values stored in a SequenceFile need not be Writable types; any type that can be serialized and deserialized through a Serialization class can be used. Once you have a SequenceFile.Writer instance, key/value pairs can be appended to the end of the file with append().
The createWriter() method defined in the SequenceFile class:

static org.apache.hadoop.io.SequenceFile.Writer createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts)
Create a new Writer with the given options.
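Putting the pieces together, here is a hedged sketch of the "adobe brick" idea from the overview: pack a local directory of small files into one block-compressed SequenceFile, with the file name as the key and the raw bytes as the value. The local directory and the HDFS output path are only examples.

// Sketch: merge a directory of small local files into a single SequenceFile.
import java.io.File;
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SmallFilePacker {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/user/demo/smallfiles.seq")),
        SequenceFile.Writer.keyClass(Text.class),             // key: original file name
        SequenceFile.Writer.valueClass(BytesWritable.class),  // value: file contents
        SequenceFile.Writer.compression(CompressionType.BLOCK));
    try {
      File[] files = new File("/tmp/smallfiles").listFiles(); // example local directory
      if (files == null) {
        throw new IllegalStateException("directory not found: /tmp/smallfiles");
      }
      for (File f : files) {
        byte[] data = Files.readAllBytes(f.toPath());
        writer.append(new Text(f.getName()), new BytesWritable(data));
      }
    } finally {
      writer.close();
    }
  }
}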