`
bupt04406
  • 浏览: 348818 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hive 中间结果和结果数据压缩

 
阅读更多
Hadoop.The.Definitive.Guide.2nd.Edition    79页
hadoop默认的压缩算法。
DEFLATE org.apache.hadoop.io.compress.DefaultCodec

结果数据压缩是否开启,下面的配置为true,所以开启。
这个是最终的结果数据:
<property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
  <description> This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>

mapred.output.compression.codec 这个选项确定压缩算法

这个是中间的结果数据是否压缩,也就是一个sql,生成多道MR,除了最后一道MR的结果数据外,前面的MR的结果数据可以压缩。
<property>
  <name>hive.exec.compress.intermediate</name>
  <value>true</value>
  <description> This controls whether intermediate files produced by hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>
中间结果数据压缩使用的算法
<property>
<name>hive.intermediate.compression.codec</name>
<value>org.apache.hadoop.io.compress.LzoCodec</value>
</property>
默认的文件格式是SequenceFile
<property>
  <name>hive.default.fileformat</name>
  <value>SequenceFile</value>
  <description>Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS &lt;TEXTFILE|SEQUENCEFILE&gt; to override</description>
</property>

HiveConf里面:
    COMPRESSRESULT("hive.exec.compress.output", false),
    COMPRESSINTERMEDIATE("hive.exec.compress.intermediate", false),
    COMPRESSINTERMEDIATECODEC("hive.intermediate.compression.codec", ""),
    COMPRESSINTERMEDIATETYPE("hive.intermediate.compression.type", ""),


hive.exec.compress.output
SemanticAnalyzer:
  private Operator genFileSinkPlan(String dest, QB qb, Operator input)
      throws SemanticException {

        Operator output = putOpInsertMap(
        OperatorFactory.getAndMakeChild(
            new FileSinkDesc(
                queryTmpdir,
                table_desc,
                conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), //结果数据是否压缩
                currentTableId,
                rsCtx.isMultiFileSpray(),
                rsCtx.getNumFiles(),
                rsCtx.getTotalFiles(),
                rsCtx.getPartnCols(),
                dpCtx),
            fsRS, input), inputRR);     

  }

FileSinkOperator:
  private void createEmptyBuckets(Configuration hconf, ArrayList<String> paths)
      throws HiveException, IOException {
    
    for (String p: paths) {
      Path path = new Path(p);
      RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
          jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);//创建RecordWriter
      writer.close(false); 
      LOG.info("created empty bucket for enforcing bucketing at " + path);
    }
     
  }

HiveFileFormatUtils:
  public static RecordWriter getRecordWriter(JobConf jc,
      HiveOutputFormat<?, ?> hiveOutputFormat,
      final Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProp, Path outPath) throws IOException, HiveException {
    if (hiveOutputFormat != null) {
      return hiveOutputFormat.getHiveRecordWriter(jc, outPath, valueClass,
          isCompressed, tableProp, null);
    }
    return null;
  }

HiveSequenceFileOutputFormat:
public class HiveSequenceFileOutputFormat extends SequenceFileOutputFormat
    implements HiveOutputFormat<WritableComparable, Writable> {
  
      public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProperties, Progressable progress) throws IOException {

         FileSystem fs = finalOutPath.getFileSystem(jc);
         final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
             fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);

      }

}

Utilities:
  public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file,
      Class<?> keyClass, Class<?> valClass, boolean isCompressed) throws IOException {
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    Class codecClass = null;
    if (isCompressed) {
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(jc);
      codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); //默认的压缩算法是DefaultCodec   org.apache.hadoop.io.compress.DefaultCodec
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
    }
    return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec));

  }

FileOutputFormat:
  public static Class<? extends CompressionCodec>
  getOutputCompressorClass(JobConf conf,
                       Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
   
    String name = conf.get("mapred.output.compression.codec"); //可以经过这个选项进行配置
    if (name != null) {
      try {
        codecClass =
        conf.getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }

中间结果数据压缩:
GenMapRedUtils.splitTasks:
  public static void splitTasks(Operator<? extends Serializable> op,
      Task<? extends Serializable> parentTask,
      Task<? extends Serializable> childTask, GenMRProcContext opProcCtx,
      boolean setReducer, boolean local, int posn) throws SemanticException {
   
    // Create a file sink operator for this file name
    boolean compressIntermediate = parseCtx.getConf().getBoolVar(
        HiveConf.ConfVars.COMPRESSINTERMEDIATE);
    FileSinkDesc desc = new FileSinkDesc(taskTmpDir, tt_desc,
        compressIntermediate);
    if (compressIntermediate) {
      desc.setCompressCodec(parseCtx.getConf().getVar(
          HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
      desc.setCompressType(parseCtx.getConf().getVar(
          HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
    }
    Operator<? extends Serializable> fs_op = putOpInsertMap(OperatorFactory
        .get(desc, parent.getSchema()), null, parseCtx);
  
}
分享到:
评论

相关推荐

    Hive优化方法整理

    Hive 优化方法整理是 Hive 数据处理过程中的重要步骤,涉及到 Hive 的类 SQL 语句本身进行调优、参数调优、Hadoop 的 HDFS 参数调优和 Map/Reduce 调优等多个方面。 Hive 类 SQL 语句优化 1. 尽量尽早地过滤数据...

    CDH550@hive存储格式和HQL材料

    - **应用场景**: 适合作为MapReduce作业的中间结果数据压缩格式,或作为连续作业的输入输出。 #### 四、Hive数据存储格式 Hive允许用户自定义数据格式,主要通过指定列分隔符、行分隔符以及读取数据的方法来实现。...

    hive常见的优化方案ppt

    可以开启Hive的中间数据和最终数据压缩,使用`hive.exec.compress.output=true`和`hive.exec.compress.intermediate=true`,并选择合适的压缩编码器如LZO、GZIP或Snappy。 2. **处理数据倾斜**:数据倾斜发生在某些...

    hive参数配置说明大全

    该参数决定了HDFS路径,用于存储不同map/reduce阶段的执行计划和这些阶段的中间输出结果,默认值为/tmp/&lt;user.name&gt;/hive。 8. hive.metastore.warehouse.dir 该参数决定了Hive默认的数据文件存储路径,通常为HDFS...

    Hive学习笔记

    - 除了设置 `hive.map.aggr`,还可以调整其他参数如 `hive.exec.parallel` 和 `hive.exec.compress.intermediate` 来并行执行任务和压缩中间结果。 13. **Hive 增加列**: - 可以使用 ALTER TABLE 命令向已有的表...

    Hive Summit 2011-join

    这种操作对于小数据集来说效率较高,但是当涉及到大数据量时,它的性能会迅速下降,因为它需要处理大量的中间数据,并且在Map和Reduce阶段都要进行数据的全量扫描。 2. Map Join(映射Join) Map Join是针对大数据...

    《企业级Hive实战课程》大纲

    - 多job间共享中间结果集的技巧; - 执行计划的深入分析; - 几种Join方式(ReduceJoin、MapJoin、SMBJoin)的工作原理与适用场景; - PredicatePushdown(PPD)的作用与实现; - 数据倾斜现象的诊断与解决策略...

    hive优化(ppt)

    此外,启用压缩临时文件`mapred.compress.map.output=true`可以进一步减少中间数据的磁盘占用和网络传输时间,从而加速整体流程。 综上所述,Hive优化是一个综合性的过程,涉及查询设计、数据存储和压缩等多个方面...

    Hive及Hadoop作业调优

    这包括了控制Map任务的数量、合理设置Reduce任务的数量、压缩中间数据以及减少磁盘IO。配置参数调优的目的是提高数据处理的效率和速度,降低不必要的资源消耗。 2. 自定义模块:调优过程中,用户可能需要根据自身...

    Hive用户指南(Hive_user_guide)_中文版

    除了基本的数据存储和查询功能外,Hive还提供了丰富的数据管理功能,包括但不限于表的创建、修改、删除,以及数据的导入导出等操作。 #### 二、Hive基本操作 **2.1 createtable** - **总述**:`CREATE TABLE`命令...

    站在hadoop上看hive

    - 通过压缩中间数据(`mapred.compress.map.output`)减少磁盘I/O操作和网络传输数据量。 - 配置合适的Shuffle参数,如`io.sort.mb`、`io.sort.spill.percent`等,以提高Shuffle过程的效率。 ### 知识点三:Hive与...

    Hive配置和基本操作 (2).docx

    - `&lt;property&gt;&lt;name&gt;hive.exec.scratchdir&lt;/name&gt;&lt;value&gt;/tmp/hive&lt;/value&gt;&lt;/property&gt;`: 用于存储执行过程中的中间结果。 - 其他配置项,如元数据存储、连接数据库的相关参数等,确保配置正确无误。 **二、Hive...

    Hive配置和基本操作 (2).pdf

    - `&lt;name&gt;hive.exec.local.scratchdir&lt;/name&gt;`:本地临时目录,用于存放中间计算结果。 - `&lt;name&gt;hive.exec.scratchdir&lt;/name&gt;`:分布式临时目录,用于跨节点的临时文件。 - `&lt;name&gt;hive.server2.logging....

    配电网监测数据的分布式Map压缩-查询技术.pdf

    在传统的模型中,数据压缩通常在Reduce阶段进行,但作者提出了一种新的思路,即在Map阶段即刻进行数据压缩,以此来减少后续步骤中的数据量,进而提升整个查询和处理过程的效率。 最后,文章中提到的时延性相关公式...

    Hive脚本任务参数优化配置.doc

    ` 开启中间结果数据压缩,减少数据传输时间。 #### 三、总结 通过上述对Hive脚本任务参数的优化配置,我们可以显著提高Hive查询的执行效率。然而,需要注意的是,在实际应用中还需根据具体场景灵活调整这些参数。...

    hive配置说明

    - **含义**:定义了HDFS路径,用于存储不同map/reduce阶段的执行计划和这些阶段的中间输出结果。 - **默认值**:`/tmp/&lt;user.name&gt;/hive` - **建议设置**:确保有足够的空间,并且访问权限正确。 8. **hive....

    Hive高级编程

    - **Map 阶段**:负责对输入数据进行处理并产生中间结果。每个 Map 任务只处理输入数据的一部分。 - **Shuffle 阶段**:将所有 Map 任务产生的中间结果按键排序后发送到 Reduce 任务。 - **Reduce 阶段**:负责对...

Global site tag (gtag.js) - Google Analytics