`
qianshangding
  • 浏览: 129181 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Hadoop之SequenceFile

 
阅读更多

Hadoop序列化文件SequenceFile可以用于解决大量小文件(所谓小文件:泛指小于black大小的文件)问题,SequenceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key,value>对序列化到文件中,一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。


hadoop Archive也是一个高效地将小文件放入HDFS块中的文件存档文件格式,详情请看:hadoop Archive


但是SequenceFile文件不能追加写入,适用于一次性写入大量小文件的操作。

SequenceFile的压缩基于CompressType,请看源码:

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   * @see SequenceFile.Writer
   */
public static enum CompressionType {
    /** Do not compress records. */
    NONE, //不压缩
    /** Compress values only, each separately. */
    RECORD,  //只压缩values
    /** Compress sequences of records together in blocks. */
    BLOCK //压缩很多记录的key/value组成块
}

SequenceFile读写示例:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

/**
 * @version 1.0
 * @author Fish
 */
public class SequenceFileWriteDemo {
	private static final String[] DATA = { "fish1", "fish2", "fish3", "fish4" };

	public static void main(String[] args) throws IOException {
		/**
		 * 写SequenceFile
		 */
		String uri = "/test/fish/seq.txt";
		Configuration conf = new Configuration();
		Path path = new Path(uri);
		IntWritable key = new IntWritable();
		Text value = new Text();
		Writer writer = null;
		try {
			/**
			 * CompressionType.NONE 不压缩<br>
			 * CompressionType.RECORD 只压缩value<br>
			 * CompressionType.BLOCK 压缩很多记录的key/value组成块
			 */
			writer = SequenceFile.createWriter(conf, Writer.file(path), Writer.keyClass(key.getClass()),
					Writer.valueClass(value.getClass()), Writer.compression(CompressionType.BLOCK));

			for (int i = 0; i < 4; i++) {
				value.set(DATA[i]);
				key.set(i);
				System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
				writer.append(key, value);

			}
		} finally {
			IOUtils.closeStream(writer);
		}

		/**
		 * 读SequenceFile
		 */
		SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path));
		IntWritable key1 = new IntWritable();
		Text value1 = new Text();
		while (reader.next(key1, value1)) {
			System.out.println(key1 + "----" + value1);
		}
		IOUtils.closeStream(reader);// 关闭read流
		
		/**
		 * 用于排序
		 */
//		SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, comparator, IntWritable.class, Text.class, conf);
	}
}

以上程序执行多次,并不会出现数据append的情况,每次都是重新创建一个文件,且文件中仅仅只有四条数据。究其原因,可以查看SequenceFile.Writer类的构造方法源码:
out = fs.create(p, true, bufferSize, replication, blockSize, progress);

第二个参数为true,表示每次覆盖同名文件,如果为false会抛出异常。这样设计的目的可能是和HDFS一次写入多次读取有关,不提倡追加现有文件,所以构造方法写死了true。


SequenceFile文件的数据组成形式:



一,Header


写入头部的源码:

    /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);//版本号
      Text.writeString(out, keyClass.getName());//key的Class
      Text.writeString(out, valClass.getName());//val的Class

      out.writeBoolean(this.isCompressed());//是否压缩
      out.writeBoolean(this.isBlockCompressed());//是否是CompressionType.BLOCK类型的压缩
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());//压缩类的名称
      }
      this.metadata.write(out);//写入metadata
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }
版本号:
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

同步标识符的生成方式:
    byte[] sync;                          // 16 random bytes
    {
      try {                                       
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
二,Record

Writer有三个实现类,分别对应CompressType的NONE,RECOR,BLOCK。下面逐一介绍一下(结合上面的图看):

1,NONE SequenceFile

Record直接存Record 的长度,KEY的长度,key值,Value的值

2, BlockCompressWriter

/** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers 
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);//每调一次,都会累加keyLength

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);//每调一次,都会累加valLength      
      // Added another key/value pair
      ++noBufferedRecords;
      
      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
      //compressionBlockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000);
      //超过1000000就会写一个Sync
       sync();
      }
    

超过compressionBlockSize的大小,就会调用sync()方法,下面看看sync的源码(和上面的图对照):

会写入和图中所画的各个数据项。

/** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();
        
        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);
        
        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);
        
        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);
        
        // Flush the file-stream
        out.flush();
        
        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }
      
    }


2,RecordCompressWriter

/** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length record的长度
      out.writeInt(keyLength);                            // key portion length key的长度
      out.write(buffer.getData(), 0, buffer.getLength()); // data 数据
    }
写入Sync:
synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

SYNC_INTERVAL的定义:
  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
每2000个byte,就会写一个Sync。


总结:

Record:存储SequenceFile通用的KV数据格式,Key和Value都是二进制变长的数据。Record表示Key和Value的byte的总和。

Sync:主要是用来扫描和恢复数据的,以至于读取数据的Reader不会迷失。

Header:存储了如下信息:文件标识符SEQ,key和value的格式说明,以及压缩的相关信息,metadata等信息。

metadata包含文件头所需要的数据:文件标识、Sync标识、数据格式说明(含压缩)、文件元数据(时间、owner、权限等)、检验信息等

分享到:
评论

相关推荐

    Chinese2SequenceFile.rar_中文转Sequencefile

    标题 "Chinese2SequenceFile.rar_中文转Sequencefile" 指示了这个压缩包包含的是一组用于将中文文档转换为Hadoop支持的SequenceFile格式的资源。SequenceFile是Hadoop生态系统中的一个基础数据存储格式,它以键值对...

    sequenceFile打包多个小文件

    SequenceFile是一种基于Hadoop的文件格式,用于存储大量小文件。SequenceFile的优点是可以高效地存储和读取大量小文件,同时也可以对小文件进行排序、压缩和加密等操作。 SequenceFile的基本结构由三部分组成:...

    glibc-2.14 Hadoop专属glib

    升级glib解决Hadoop WARN util.NativeCodeLoader: ... 和 SequenceFile doesn't work with GzipCodec without native-hadoop code 问题, 具体请参见博文:https://blog.csdn.net/l1028386804/article/details/88420473

    sequencefile&mapfile代码

    在Hadoop生态系统中,SequenceFile和MapFile是两种常见的数据存储格式,它们为大数据处理提供了高效、可扩展的解决方案。这两个文件格式都是Hadoop原生支持的,用于存储大规模数据集,尤其适用于分布式环境。接下来...

    sequencify-CBIR-on-hadoop:将图像转换为 Hadoop SequenceFile 格式,适用于基于内容的图像检索系统

    在IT领域,尤其是在大数据处理和图像检索系统的设计中,Hadoop SequenceFile是一种广泛使用的存储格式。这个名为"sequencify-CBIR-on-hadoop"的项目专注于将图像数据转化为SequenceFile格式,以便在基于内容的图像...

    云计算技术实验报告六SequenceFile使用

    实验报告的主题是“云计算技术实验报告六SequenceFile使用”,主要涉及了云计算环境下的大数据处理技术,特别是Apache Hadoop中的SequenceFile。SequenceFile是一种高效的、序列化的文件格式,常用于存储和处理大...

    hadoop权威指南 ncdc2015年数据

    (2) 使用Hadoop的SequenceFile、Avro或Parquet等格式,这些格式能更高效地处理小文件;(3) 使用HBase等NoSQL数据库进行数据存储,它们更适用于大量小键值对。 5. **NCDC数据应用**:NCDC的气候数据可以用于多种分析...

    SequenceFile转换成MapFile

    在Hadoop大数据处理环境中,SequenceFile和MapFile都是常见的数据存储格式。SequenceFile是一种二进制文件格式,常用于存储键值对数据,适合大规模数据的处理和传输。MapFile则是SequenceFile的一种优化形式,它增加...

    hadoop-core-0.20.2 源码 hadoop-2.5.1-src.tar.gz 源码 hadoop 源码

    4. **数据存储与序列化**:Hadoop使用`org.apache.hadoop.io`包处理数据的存储和序列化,包括各种基本类型(如IntWritable、Text等)和复杂的可序列化对象(如SequenceFile、Avro等)。 5. **配置管理**:`org....

    TextFile转为SequenceFile

    业务需要hive读取SequenceFile文件,所以把TextFile类型转SequenceFile,再导入hive

    Hadoop权威指南 第二版(中文版)

     2.2.1 数据模型的“旋风之旅”  2.2.2 实现  2.3 安装  2.3.1 测试驱动  2.4 客户机  2.4.1 Java  2.4.2 Avro,REST,以及Thrift  2.5 示例  2.5.1 模式  2.5.2 加载数据  2.5.3 Web查询  2.6 HBase和...

    hadoop权威指南 中文版 英文版Hadoop: The Definitive Guide 带书签,无密码

    同时,你还会了解到数据的输入/输出(I/O)机制,如使用SequenceFile和Avro进行高效的数据序列化。 《Hadoop权威指南》不仅讲解了理论知识,还提供了大量实践案例和示例代码,帮助读者理解和掌握Hadoop的实际应用。...

    11、hadoop环境下的Sequence File的读写与合并

    import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop....

    Hadoop权威指南(中文版)2015上传.rar

    2.2.1 数据模型的"旋风之旅" 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift 2.5 示例 2.5.1 模式 2.5.2 加载数据 2.5.3 Web查询 2.6 HBase和RDBMS的比较 2.6.1 成功的...

    hadoop-api中文说明文档

    `SequenceFile`和`Avro`等格式支持高效的分桶处理。 8. **YARN(Yet Another Resource Negotiator)**:Hadoop 2.0引入了YARN,作为资源管理和调度的中心,分离了计算和资源管理,提高了集群的利用率和可扩展性。 ...

    hadoop2.6-api.zip

    8. **IO组件**:Hadoop的IO组件,如`SequenceFile`、`Text`、`BytesWritable`等,提供了高效的数据存储和传输格式。 通过解压并使用“hadoop2.6-api.zip”,开发者可以直接引用这些API,构建自己的Hadoop应用程序,...

    Hadoop在电信大数据平台的研究与设计.pdf

    - 使用Hadoop的SequenceFile或CompositeFileFormat等格式,将小文件批量写入到大文件中,减少I/O操作。 3. Hadoop技术框架详解 Hadoop的核心由两大部分组成:MapReduce和HDFS。MapReduce是一种编程模型,用于大规模...

    Hadoop权威指南(原版)

    7. **数据存储**:在Hadoop中,数据通常以文件的形式存储,支持多种文件格式,如SequenceFile、TextFile等,每种格式都有其特定的使用场景和优势。 8. **数据处理**:MapReduce编程模型,包括编写Map函数和Reduce...

    Hadoop: The Definitive Guide 中英两版

    5. **数据输入与输出**:了解多种数据源的接入方式,如SequenceFile、TextFile等,以及如何通过InputFormat和OutputFormat自定义处理。 6. **Hadoop的监控与调试**:学习如何使用Nagios、Ganglia等工具监控集群状态...

    content.zip

    SequenceFile是Hadoop生态系统中的一个重要组件,是一种高效、可靠的二进制文件格式,常用于存储大规模数据集。本篇将深入探讨SequenceFile及其在Java环境下的操作,结合给定的"content.zip"压缩包,我们将分析如何...

Global site tag (gtag.js) - Google Analytics